-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmyvcs.py
More file actions
354 lines (304 loc) · 13.3 KB
/
myvcs.py
File metadata and controls
354 lines (304 loc) · 13.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
#Importing the modules required
import mysql.connector
from mysql.connector import Error
import logging
import hashlib
from psycopg2 import Error
logging.basicConfig(level=logging.INFO)
#Logic for connection to database
def create_connection(host_name, user_name, user_password, db_name):
"""Create a database connection to a MySQL database."""
conn = None
try:
conn = mysql.connector.connect(
host=host_name,
user=user_name,
password=user_password,
database=db_name
)
if conn.is_connected():
logging.info("Connection established")
initialize_main_branch(conn)
except Error as e:
logging.error(f"Error: '{e}'")
return conn
#Initializing the default branch as main
def initialize_main_branch(conn):
with conn.cursor() as cursor:
query_check = "SELECT * FROM branches WHERE name = 'main'"
cursor.execute(query_check)
existing_branch = cursor.fetchone()
if existing_branch:
logging.info("Main branch already exists. No need to create.")
return False
query = "INSERT INTO branches (name, latest_commit) VALUES ('main', NULL)"
try:
cursor.execute(query)
conn.commit()
logging.info("Main branch created successfully.")
return True
except Error as e:
logging.error(f"Error: '{e}'")
return False
#Logic for addition of files
def add_file(conn, file_hash, content):
with conn.cursor() as cursor:
query_check = "SELECT * FROM files WHERE hash = %s"
cursor.execute(query_check, (file_hash,))
existing_file = cursor.fetchone()
if existing_file:
logging.info("File with this hash already exists. Skipping insert.")
return False
query = "INSERT INTO files (hash, content) VALUES (%s, %s)"
try:
cursor.execute(query, (file_hash, content))
conn.commit()
logging.info("File added successfully.")
return True
except Error as e:
logging.error(f"Error: '{e}'")
return False
#Logic for creation of commits with automated updation
def create_commit(conn, commit_hash, message, parent_commit, branch_name):
"""Create a new commit and update the corresponding branch's latest commit."""
with conn.cursor() as cursor:
if commit_exists(conn, commit_hash):
logging.info(f"Commit with hash '{commit_hash}' already exists. Skipping insert.")
return False
if parent_commit and not commit_exists(conn, parent_commit):
logging.error(f"Cannot create commit '{commit_hash}': Parent commit '{parent_commit}' does not exist.")
return False
query = "INSERT INTO commits (commit_hash, message, parent_commit, branch_name) VALUES (%s, %s, %s, %s)"
try:
cursor.execute(query, (commit_hash, message, parent_commit, branch_name))
conn.commit()
logging.info(f"Commit '{commit_hash}' created successfully.")
cursor.execute("SELECT 1 FROM branches WHERE name = %s", (branch_name,))
branch_exists = cursor.fetchone() is not None
if branch_exists:
cursor.execute("UPDATE branches SET latest_commit = %s WHERE name = %s",
(commit_hash, branch_name))
conn.commit()
logging.info(f"Branch '{branch_name}' updated with latest commit '{commit_hash}'.")
else:
cursor.execute("INSERT INTO branches (name, latest_commit) VALUES (%s, %s)",
(branch_name, commit_hash))
conn.commit()
logging.info(f"Branch '{branch_name}' created with latest commit '{commit_hash}'.")
return True
except Error as e:
logging.error(f"Error in commit creation or branch update: {e}")
conn.rollback()
return False
#Verification of commit existence
def commit_exists(conn, commit_hash):
"""Check if a commit exists in the commits table."""
with conn.cursor() as cursor:
cursor.execute("SELECT 1 FROM commits WHERE commit_hash = %s", (commit_hash,))
return cursor.fetchone() is not None
#Logic for creation of branches
def create_branch(conn, branch_name, latest_commit):
with conn.cursor() as cursor:
query_check = "SELECT * FROM branches WHERE name = %s"
cursor.execute(query_check, (branch_name,))
existing_branch = cursor.fetchone()
if existing_branch:
logging.info(f"Branch with name '{branch_name}' already exists. Skipping insert.")
return False
if latest_commit:
query_commit_check = "SELECT * FROM commits WHERE commit_hash = %s"
cursor.execute(query_commit_check, (latest_commit,))
commit_exists = cursor.fetchone()
if not commit_exists:
logging.error(f"Cannot create branch '{branch_name}': Latest commit '{latest_commit}' does not exist.")
return False
query = "INSERT INTO branches (name, latest_commit) VALUES (%s, %s)"
try:
cursor.execute(query, (branch_name, latest_commit if latest_commit else None))
conn.commit()
logging.info(f"Branch '{branch_name}' created successfully.")
return True
except Error as e:
logging.error(f"Error creating branch '{branch_name}': '{e}'")
return False
#Logic for file retrieval by hash
def get_file_by_hash(conn, file_hash):
with conn.cursor() as cursor:
query = "SELECT * FROM files WHERE hash = %s"
try:
cursor.execute(query, (file_hash,))
file = cursor.fetchone()
if file:
logging.info(f"File found: {file}")
return file
else:
logging.info("File not found.")
return None
except Error as e:
logging.error(f"Error: '{e}'")
return None
#Retrieval of commit info
def get_commit_info(conn, commit_hash):
"""Check if a commit exists in the commits table."""
cursor = conn.cursor()
cursor.execute("SELECT * FROM commits WHERE commit_hash = %s", (commit_hash,))
return cursor.fetchone()
#Logic for retrieval of commit history
def get_commit_history(conn, branch_name):
with conn.cursor() as cursor:
query = "SELECT commit_hash, message, timestamp FROM commits WHERE branch_name = %s ORDER BY timestamp ASC"
try:
cursor.execute(query, (branch_name,))
commits = cursor.fetchall()
if commits:
logging.info(f"Commit history for branch '{branch_name}':")
for commit in commits:
logging.info(f"Commit Hash: {commit[0]}, Message: {commit[1]}, Timestamp: {commit[2]}")
return commits
else:
logging.info(f"No commits found for branch '{branch_name}'.")
return []
except Error as e:
logging.error(f"Error: '{e}'")
return []
#Logic for retrieval of branch info
def get_branch_info(conn, branch_name):
with conn.cursor() as cursor:
query = "SELECT name, latest_commit FROM branches WHERE name = %s"
try:
cursor.execute(query, (branch_name,))
branch = cursor.fetchone()
if branch:
logging.info(f"Branch: {branch[0]}, Latest Commit: {branch[1]}")
return branch
else:
logging.info("Branch not found.")
return None
except Error as e:
logging.error(f"Error: '{e}'")
return None
#Logic for deletion of files
def delete_file(conn, file_hash):
with conn.cursor() as cursor:
query = "DELETE FROM files WHERE hash = %s"
try:
cursor.execute(query, (file_hash,))
conn.commit()
logging.info("File deleted successfully.")
return True
except Error as e:
logging.error(f"Error: '{e}'")
return False
#Logic for deletion of commits
def delete_commit(conn, commit_hash):
with conn.cursor() as cursor:
query = "DELETE FROM commits WHERE commit_hash = %s"
try:
cursor.execute(query, (commit_hash,))
conn.commit()
logging.info("Commit deleted successfully.")
return True
except Error as e:
logging.error(f"Error: '{e}'")
return False
#Logic for deletion of branches
def delete_branch(conn, branch_name):
"""Delete a branch from the database."""
with conn.cursor() as cursor:
query_check = "SELECT * FROM branches WHERE name = %s"
cursor.execute(query_check, (branch_name,))
existing_branch = cursor.fetchone()
if not existing_branch:
logging.warning(f"Branch '{branch_name}' does not exist. Deletion failed.")
return False
query_commit_check = "SELECT * FROM commits WHERE branch_name = %s"
cursor.execute(query_commit_check, (branch_name,))
associated_commits = cursor.fetchall()
if associated_commits:
logging.warning(f"Branch '{branch_name}' has associated commits. Deletion failed.")
return False
query_delete = "DELETE FROM branches WHERE name = %s"
try:
cursor.execute(query_delete, (branch_name,))
conn.commit()
logging.info(f"Branch '{branch_name}' deleted successfully.")
return True
except Error as e:
logging.error(f"Error: '{e}'")
return False
#Logic for merging of branches
def merge_branches(conn, source_branch, target_branch):
"""Merge source_branch into target_branch."""
source_commit_hash = get_latest_commit_hash(conn, source_branch)
target_commit_hash = get_latest_commit_hash(conn, target_branch)
if not source_commit_hash or not target_commit_hash:
logging.warning("One or both branches not found.")
return False
common_ancestor_commit = find_common_ancestor(conn, source_commit_hash, target_commit_hash)
if not common_ancestor_commit:
logging.warning("No common ancestor found, unable to merge.")
return False
logging.info(f"Common ancestor commit: {common_ancestor_commit}")
if check_for_conflicts(conn, common_ancestor_commit, source_commit_hash, target_commit_hash):
logging.warning("Conflicts detected, need to resolve manually.")
return False
merge_commit_hash = create_merge_commit(conn, source_commit_hash, target_commit_hash, target_branch)
logging.info(f"Merge commit created: {merge_commit_hash}")
update_branch(conn, target_branch, merge_commit_hash)
return True
#Functions involved in branch merging
#Fetching the latest commit hash
def get_latest_commit_hash(conn, branch_name):
"""Fetch the latest commit hash for a given branch."""
with conn.cursor() as cursor:
cursor.execute("SELECT latest_commit FROM branches WHERE name = %s", (branch_name,))
result = cursor.fetchone()
if result:
return result[0]
logging.warning(f"No commit found for branch '{branch_name}'")
return None
#Finding common ancestor between two commits
def find_common_ancestor(conn, commit_hash1, commit_hash2):
"""Find the common ancestor of two commits."""
visited = set()
while commit_hash1:
visited.add(commit_hash1)
commit_hash1 = get_parent_commit(conn, commit_hash1)
while commit_hash2:
if commit_hash2 in visited:
return commit_hash2
commit_hash2 = get_parent_commit(conn, commit_hash2)
return None
#Retrieval of parent commit
def get_parent_commit(conn, commit_hash):
"""Fetch the parent commit for a given commit hash."""
with conn.cursor() as cursor:
cursor.execute("SELECT parent_commit FROM commits WHERE commit_hash = %s", (commit_hash,))
parent = cursor.fetchone()
return parent[0] if parent else None
#Checking for any conflicts
def check_for_conflicts(conn, ancestor_commit, source_commit, target_commit):
"""Check if there are conflicts between source and target commits."""
return False
#Creation of the merged commits
def create_merge_commit(conn, source_commit, target_commit, branch_name):
"""Create a merge commit."""
merge_commit_hash = hashlib.sha256(f"{source_commit}{target_commit}{branch_name}".encode()).hexdigest()
with conn.cursor() as cursor:
try:
cursor.execute(
"INSERT INTO commits (commit_hash, message, parent_commit, branch_name) VALUES (%s, %s, %s, %s)",
(merge_commit_hash, f"Merge {source_commit} into {target_commit}", source_commit, branch_name)
)
conn.commit()
except Exception as e:
logging.error(f"Failed to create merge commit: {e}")
return None
return merge_commit_hash
#Updation of the branch
def update_branch(conn, branch_name, commit_hash):
"""Update the latest commit for a branch."""
with conn.cursor() as cursor:
cursor.execute("UPDATE branches SET latest_commit = %s WHERE name = %s", (commit_hash, branch_name))
conn.commit()
#Ending the code