-
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathip_geolocation_adapter.py
More file actions
122 lines (97 loc) · 3.44 KB
/
ip_geolocation_adapter.py
File metadata and controls
122 lines (97 loc) · 3.44 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
MySQL连接器适配器
该脚本提供了一个统一的接口来使用不同的MySQL连接器。
可以自动检测系统上可用的MySQL连接器库,并使用最合适的一个。
"""
import sys
import importlib.util
class MySQLAdapter:
"""MySQL连接器适配器类"""
def __init__(self):
self.connector_type = self._detect_connector()
if not self.connector_type:
print("错误: 未找到可用的MySQL连接器")
print("请安装以下连接器之一: mysql-connector-python, mysqlclient, pymysql")
sys.exit(1)
print(f"使用 {self.connector_type} 连接MySQL")
def _detect_connector(self):
"""检测系统上可用的MySQL连接器"""
connectors = [
{"name": "mysql.connector", "module": "mysql.connector"},
{"name": "MySQLdb", "module": "MySQLdb"},
{"name": "pymysql", "module": "pymysql"}
]
for connector in connectors:
if importlib.util.find_spec(connector["module"]):
return connector["name"]
return None
def connect(self, config):
"""连接到MySQL数据库"""
if self.connector_type == "mysql.connector":
import mysql.connector
return mysql.connector.connect(**config)
elif self.connector_type == "MySQLdb":
import MySQLdb
# MySQLdb使用不同的参数命名
adjusted_config = {
"host": config.get("host"),
"user": config.get("user"),
"passwd": config.get("password"),
"db": config.get("database")
}
return MySQLdb.connect(**adjusted_config)
elif self.connector_type == "pymysql":
import pymysql
return pymysql.connect(
host=config.get("host"),
user=config.get("user"),
password=config.get("password"),
database=config.get("database")
)
def execute_query(self, conn, query, params=None):
"""执行SQL查询"""
cursor = conn.cursor()
if params:
cursor.execute(query, params)
else:
cursor.execute(query)
return cursor
def commit(self, conn):
"""提交事务"""
conn.commit()
def close(self, conn):
"""关闭连接"""
conn.close()
def fetchall(self, cursor):
"""获取所有结果"""
return cursor.fetchall()
def rowcount(self, cursor):
"""获取影响的行数"""
return cursor.rowcount
# 使用示例
if __name__ == "__main__":
db = MySQLAdapter()
# 创建数据库配置
db_config = {
'host': 'localhost',
'user': 'root',
'password': 'password',
'database': 'test'
}
try:
# 连接数据库
conn = db.connect(db_config)
# 执行查询
cursor = db.execute_query(conn, "SELECT VERSION()")
# 获取结果
result = db.fetchall(cursor)
print(f"MySQL版本: {result[0][0]}")
# 关闭连接
db.close(conn)
except Exception as e:
print(f"数据库连接测试失败: {e}")
sys.exit(1)
print("数据库连接测试成功!")
sys.exit(0)