-
Notifications
You must be signed in to change notification settings - Fork 12
Expand file tree
/
Copy pathSql.py
More file actions
executable file
·102 lines (88 loc) · 3.8 KB
/
Sql.py
File metadata and controls
executable file
·102 lines (88 loc) · 3.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
import re
import warnings
# Third-party MySQL client: connection factory, cursor classes and the error
# type that is caught when a statement fails.
from pymysql import connect, cursors, OperationalError
# Ignore unhelpful MySQL warnings. Note that ``Warning`` is the base class of
# every warning category, so this filter is not limited to MySQL warnings.
warnings.filterwarnings("ignore", category=Warning)
class Sql(object):
    """Thin convenience wrapper around a PyMySQL connection.

    Constructing an instance makes sure the database named by
    ``args.temp_db_database`` exists and opens an autocommit connection to it.
    The methods run single statements or whole SQL scripts through that
    connection.

    Attributes set by ``__init__``:
        tag_count: Initialised to 0; not used by any method of this class.
        log: Logger-like object; only ``log.info(message)`` is called here.
        conn: The long-lived connection, opened on ``args.temp_db_database``.
        cursor: A cursor on ``conn`` shared by ``run_sql_file`` and
            ``col_exists``.
        database: Name of the default database (``args.temp_db_database``).
    """

    # Matches from a comment opener (``--``, ``#`` or ``/*``) up to and
    # including the end of that line. Used only to classify a statement
    # (empty / CREATE DATABASE / USE), never to rewrite what gets executed.
    _COMMENT_RE = re.compile(r"(--|#|\/\*).*?\n")

    @staticmethod
    def _quote_identifier(name):
        """Return ``name`` as a backtick-quoted MySQL identifier."""
        # Doubling embedded backticks is MySQL's escape for quoted identifiers.
        return "`{0}`".format(str(name).replace("`", "``"))

    def __init__(self, args, log):
        """Create the working database if needed and connect to it.

        :param args: Object exposing ``db_host``, ``db_user``, ``db_password``
            and ``temp_db_database`` attributes.
        :param log: Logger-like object providing ``info``.
        """
        self.tag_count = 0
        self.log = log
        self.database = args.temp_db_database

        # A connection without a default database is needed only to issue
        # CREATE DATABASE; close it afterwards instead of leaking it.
        bootstrap = connect(
            host=args.db_host, user=args.db_user, password=args.db_password
        )
        try:
            with bootstrap.cursor() as cursor:
                cursor.execute(
                    "CREATE DATABASE IF NOT EXISTS {0}".format(
                        self._quote_identifier(self.database)
                    )
                )
        finally:
            bootstrap.close()

        self.conn = connect(
            host=args.db_host,
            user=args.db_user,
            password=args.db_password,
            database=self.database,
            charset="utf8",
            use_unicode=True,
            autocommit=True,
        )
        self.cursor = self.conn.cursor()

    def execute(self, script, parameters=(), database=None):
        """Run one statement, discarding any result.

        :param script: SQL text, optionally containing driver placeholders.
        :param parameters: Values bound to the placeholders in ``script``.
        :param database: Database to switch to first; defaults to
            ``self.database`` when falsy.
        """
        target = database or self.database
        with self.conn.cursor() as cursor:
            # USE changes the connection's current database, so it also
            # affects later calls that do not select one themselves.
            cursor.execute("USE {0}".format(self._quote_identifier(target)))
            cursor.execute(script, parameters)
        self.conn.commit()

    def execute_dict(self, script, parameters=()):
        """Run one statement and return every row from a ``DictCursor``.

        No database is selected here: the statement runs against whatever
        database is currently selected on the shared connection.

        :param script: SQL text, optionally containing driver placeholders.
        :param parameters: Values bound to the placeholders in ``script``.
        :return: The result of ``fetchall()`` on a ``cursors.DictCursor``.
        """
        with self.conn.cursor(cursors.DictCursor) as dict_cursor:
            dict_cursor.execute(script, parameters)
            return dict_cursor.fetchall()

    def execute_and_fetchall(self, database: str, statement: str):
        """
        Execute a SQL statement and then fetch its results.
        :param database: The database to run the statement against.
        :param statement: The SQL statement to execute.
        :return: The rows returned by ``fetchall()`` on the connection's
            default cursor (use ``execute_dict`` for dictionary rows).
        """
        with self.conn.cursor() as cursor:
            cursor.execute("USE {0}".format(self._quote_identifier(database)))
            cursor.execute(statement)
            rows = cursor.fetchall()
        self.conn.commit()
        return rows

    def run_script_from_file(self, filename, database, initial_load=False):
        """Read ``filename`` as one buffer and pass it to ``run_sql_file``.

        :param filename: Path of the SQL script to read.
        :param database: Database the script is run against.
        :param initial_load: Forwarded unchanged to ``run_sql_file``.
        """
        # The context manager closes the handle even if read() raises. The
        # platform default encoding is kept, as no encoding was ever given.
        with open(filename, "r") as script_file:
            sql_text = script_file.read()
        self.run_sql_file(sql_text, database, initial_load)

    def run_sql_file(self, sqlFile, database, initial_load=False):
        """Execute every statement of a SQL script against ``database``.

        ``$DATABASE$`` placeholders in the script are replaced by
        ``database`` and the text is split into statements on ``";\\n"``.
        Statements raising ``OperationalError`` are logged and skipped, so a
        script may, for example, drop tables that do not exist yet.

        :param sqlFile: The complete script text (not a path).
        :param database: Database to create if missing and switch to.
        :param initial_load: When true, ``CREATE DATABASE`` and ``USE``
            statements found in the script are logged and not executed.
        """
        statements = sqlFile.replace("$DATABASE$", database).split(";\n")
        quoted_database = self._quote_identifier(database)

        # NOTE(review): CREATE DATABASE and other DDL cause an implicit commit
        # in MySQL, so this transaction probably does not make the script
        # atomic - confirm before relying on it.
        self.cursor.execute("START TRANSACTION")
        self.cursor.execute(
            "CREATE DATABASE IF NOT EXISTS {0}".format(quoted_database)
        )
        self.cursor.execute("USE {0}".format(quoted_database))

        for command in statements:
            # A comment-free, lower-cased, single-line copy is built purely to
            # decide what kind of statement this is.
            without_comments = self._COMMENT_RE.sub("", command)
            lc_command = without_comments.lower().strip().replace("\n", "")

            if initial_load and lc_command.startswith(
                ("create database ", "use ")
            ):
                self.log.info("Skipping command - {0}".format(lc_command))
                continue
            if not lc_command:
                self.log.info(lc_command)
                continue

            try:
                # Execute the statement as written. Running the normalised
                # copy would lower-case string literals and identifiers and
                # glue together tokens separated only by a line break.
                self.cursor.execute(command.strip())
            except OperationalError as e:
                self.log.info("Command skipped: {0} [{1}]".format(command, e))
        self.conn.commit()

    def col_exists(self, col, table, database):
        """Report whether ``database``.``table`` has a column named ``col``.

        :param col: Column name to look for.
        :param table: Table name.
        :param database: Schema (database) name.
        :return: ``True`` if ``information_schema.COLUMNS`` has a matching
            row, otherwise ``False``.
        """
        # The names are compared as values, so they are passed as driver
        # parameters rather than formatted into the SQL text.
        query = (
            "SELECT * FROM information_schema.COLUMNS "
            "WHERE TABLE_SCHEMA = %s AND TABLE_NAME = %s AND COLUMN_NAME = %s"
        )
        self.cursor.execute(query, (database, table, col))
        return self.cursor.fetchone() is not None