-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpreparation.py
More file actions
161 lines (135 loc) · 6.11 KB
/
preparation.py
File metadata and controls
161 lines (135 loc) · 6.11 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
#!/usr/bin/env python3
"""
Функции подготовки для бенчмаркинга облачных PostgreSQL
Этот модуль содержит основные функции для подготовки и настройки баз данных
перед выполнением бенчмаркинга.
"""
import os
import sys
import subprocess
import time
import logging
import psycopg2
from config import benchmark_config
from preparation_utils import (
test_network_latency,
create_custom_workload_files,
cleanup_custom_workload_files,
vacuum_database,
drop_pgbench_tables
)
logger = logging.getLogger('pg_benchmark')
def verify_prerequisites() -> str:
"""
Проверка установки и доступности pgbench.
Returns:
str: версия pgbench или вызывает исключение, если не найден
"""
try:
result = subprocess.run(
["pgbench", "--version"],
capture_output=True,
text=True,
check=True
)
version = result.stdout.strip()
logger.info(f"pgbench найден: {version}")
return version
except subprocess.CalledProcessError as e:
logger.error(f"Ошибка выполнения pgbench: {e}")
raise RuntimeError("Ошибка выполнения pgbench. Проверьте правильность установки.")
except FileNotFoundError:
logger.error("pgbench не найден в PATH")
raise RuntimeError("pgbench не найден. Пожалуйста, установите инструменты клиента PostgreSQL.")
def test_connectivity(config: dict) -> tuple:
"""
Проверка соединения с сервисом базы данных.
Args:
config: Словарь конфигурации базы данных
Returns:
Tuple[bool, float]: Статус соединения и задержка в миллисекундах
"""
start_time = time.time()
connection = None
success = False
try:
connection_string = (
f"host={config['host']} "
f"port={config['port']} "
f"dbname={config['database']} "
f"user={config['user']} "
f"password={config['password']} "
f"connect_timeout={benchmark_config['connection_timeout']}"
)
if benchmark_config['use_ssl']:
connection_string += " sslmode=require"
logger.info(f"Проверка соединения с {config['name']} на {config['host']}")
for attempt in range(benchmark_config['connection_retries']):
try:
connection = psycopg2.connect(connection_string)
cursor = connection.cursor()
cursor.execute("SELECT version();")
version = cursor.fetchone()[0]
cursor.close()
end_time = time.time()
latency_ms = (end_time - start_time) * 1000
logger.info(f"Успешное соединение с {config['name']}")
logger.info(f"Версия PostgreSQL: {version}")
logger.info(f"Задержка соединения: {latency_ms:.2f} мс")
success = True
break
except psycopg2.OperationalError as e:
logger.warning(f"Попытка соединения {attempt+1} не удалась: {e}")
if attempt < benchmark_config['connection_retries'] - 1:
time.sleep(2) # Ожидание перед повторной попыткой
else:
logger.error(f"Не удалось соединиться после {benchmark_config['connection_retries']} попыток")
latency_ms = -1
except Exception as e:
logger.error(f"Непредвиденная ошибка во время проверки соединения: {e}")
latency_ms = -1
finally:
if connection:
connection.close()
return success, latency_ms
def prepare_database(config: dict, scale_factor: int) -> tuple:
"""
Подготовка базы данных для бенчмаркинга.
Args:
config: Словарь конфигурации базы данных
scale_factor: Фактор масштабирования для инициализации pgbench
Returns:
Tuple[bool, float]: Статус подготовки и время в секундах
"""
start_time = time.time()
# Создание команды pgbench для инициализации
pgbench_cmd = [
"pgbench",
"-h", config["host"],
"-p", str(config["port"]),
"-U", config["user"],
"-d", config["database"],
"-i", # Инициализация
"-s", str(scale_factor) # Фактор масштабирования
]
# Установка переменной окружения PGPASSWORD
env = os.environ.copy()
env["PGPASSWORD"] = config["password"]
try:
logger.info(f"Инициализация таблиц pgbench для {config['name']} с фактором масштабирования {scale_factor}")
result = subprocess.run(
pgbench_cmd,
env=env,
capture_output=True,
text=True,
check=True
)
end_time = time.time()
preparation_time = end_time - start_time
logger.info(f"Подготовка базы данных завершена за {preparation_time:.2f} секунд")
logger.debug(f"Вывод инициализации: {result.stdout}")
return True, preparation_time
except subprocess.CalledProcessError as e:
logger.error(f"Ошибка подготовки базы данных: {e}")
logger.error(f"Вывод ошибки: {e.stderr}")
return False, -1