Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions collectoss/application/cli/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
)

from collectoss.application.db.session import DatabaseSession
from collectoss.application.db.lib import merge_contributors
from sqlalchemy import update
from datetime import datetime
from collectoss.application.db.models import Repo
Expand Down Expand Up @@ -151,6 +152,29 @@ def get_repo_groups(ctx: click.Context) -> pd.DataFrame:
return df


@cli.command("merge-contributors")
@click.argument("primary_cntrb_id")
@click.argument("duplicate_cntrb_id")
@test_connection
@test_db_connection
@with_database
@click.pass_context
def merge_contributors_cmd(ctx: click.Context, primary_cntrb_id: str, duplicate_cntrb_id: str) -> None:
    """Merge a duplicate contributor into the primary contributor."""
    # NOTE: the docstring above doubles as the command's --help text, so
    # implementation notes live in comments instead.
    #
    # Flow: hand the engine stored on ctx.obj plus both contributor ids to
    # merge_contributors(), then print a confirmation line. Any exception is
    # logged and re-raised unchanged so the caller still sees the original
    # error.
    try:
        db_engine = ctx.obj.engine
        merge_contributors(db_engine, primary_cntrb_id, duplicate_cntrb_id)
        confirmation = (
            f"Merged duplicate contributor {duplicate_cntrb_id} "
            f"into primary contributor {primary_cntrb_id}"
        )
        click.echo(confirmation)
    except Exception as exc:
        logger.error(f"Failed to merge contributors: {exc}")
        raise


@cli.command("add-repo-groups")
@click.argument("filename", type=click.Path(exists=True))
@test_connection
Expand Down
102 changes: 102 additions & 0 deletions collectoss/application/db/lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,108 @@ def fetchall_data_from_sql_text(sql_text):
result = connection.execute(sql_text)
return [dict(row) for row in result.mappings()]


def merge_contributors(engine, primary_cntrb_id: str, duplicate_cntrb_id: str) -> None:
    """Merge a duplicate contributor record into a primary contributor record.

    Everything runs inside one ``engine.begin()`` transaction block. The steps
    are, in order:

    1. Check that both contributor rows exist in ``data.contributors``.
    2. Delete the duplicate's rows in ``data.contributors_aliases`` whose
       ``alias_email`` the primary already has, then reassign the duplicate's
       remaining alias rows to the primary.
    3. Discover every foreign-key column that references
       ``data.contributors.cntrb_id`` and repoint rows from the duplicate id
       to the primary id.
    4. Delete the duplicate row from ``data.contributors``.

    Args:
        engine: Object exposing ``begin()`` that yields a connection with an
            ``execute`` method (a SQLAlchemy engine in the caller shown in
            the CLI).
        primary_cntrb_id: Id of the contributor row that is kept.
        duplicate_cntrb_id: Id of the contributor row that is merged away.

    Raises:
        ValueError: If both ids are equal, or if either contributor row does
            not exist.
    """
    if primary_cntrb_id == duplicate_cntrb_id:
        raise ValueError("Primary and duplicate contributor IDs must differ")

    merge_params = {"primary_id": primary_cntrb_id, "duplicate_id": duplicate_cntrb_id}

    with engine.begin() as connection:
        # Validate both rows up front; the primary is checked first so its
        # error takes precedence when both are missing.
        for label, cntrb_id in (
            ("Primary", primary_cntrb_id),
            ("Duplicate", duplicate_cntrb_id),
        ):
            exists = connection.execute(
                s.sql.text("SELECT 1 FROM data.contributors WHERE cntrb_id = :id"),
                {"id": cntrb_id},
            ).scalar()
            if not exists:
                raise ValueError(f"{label} contributor not found: {cntrb_id}")

        # Drop alias rows of the duplicate whose e-mail the primary already
        # owns, so the reassignment below does not create repeated
        # (primary, alias_email) pairs.
        connection.execute(
            s.sql.text(
                """
                DELETE FROM data.contributors_aliases
                WHERE cntrb_id = :duplicate_id
                  AND alias_email IN (
                      SELECT alias_email
                      FROM data.contributors_aliases
                      WHERE cntrb_id = :primary_id
                  )
                """
            ),
            merge_params,
        )

        connection.execute(
            s.sql.text(
                """
                UPDATE data.contributors_aliases
                SET cntrb_id = :primary_id
                WHERE cntrb_id = :duplicate_id
                """
            ),
            merge_params,
        )

        # Find every FK column pointing at data.contributors.cntrb_id.
        #  * constraint_column_usage.table_schema is the schema of the
        #    REFERENCED table, so the join must use constraint_schema;
        #    otherwise referencing tables outside "data" are never found.
        #  * key_column_usage is additionally matched on table_name because
        #    constraint names are not guaranteed unique within a schema.
        #  * DISTINCT avoids running the same UPDATE twice.
        # .all() materializes the rows before more statements are issued on
        # this connection.
        referencing_fks = connection.execute(
            s.sql.text(
                """
                SELECT DISTINCT kcu.table_schema, kcu.table_name, kcu.column_name
                FROM information_schema.table_constraints tc
                JOIN information_schema.key_column_usage kcu
                  ON kcu.constraint_schema = tc.constraint_schema
                 AND kcu.constraint_name = tc.constraint_name
                 AND kcu.table_schema = tc.table_schema
                 AND kcu.table_name = tc.table_name
                JOIN information_schema.constraint_column_usage ccu
                  ON ccu.constraint_schema = tc.constraint_schema
                 AND ccu.constraint_name = tc.constraint_name
                WHERE tc.constraint_type = 'FOREIGN KEY'
                  AND ccu.table_schema = 'data'
                  AND ccu.table_name = 'contributors'
                  AND ccu.column_name = 'cntrb_id'
                """
            )
        ).mappings().all()

        for fk in referencing_fks:
            table_schema = fk["table_schema"]
            table_name = fk["table_name"]

            # Alias rows were already merged (with de-duplication) above.
            if table_schema == "data" and table_name == "contributors_aliases":
                continue

            table_ref = f"{_quote_identifier(table_schema)}.{_quote_identifier(table_name)}"
            column_ref = _quote_identifier(fk["column_name"])
            connection.execute(
                s.sql.text(
                    f"UPDATE {table_ref} SET {column_ref} = :primary_id "
                    f"WHERE {column_ref} = :duplicate_id"
                ),
                merge_params,
            )

        # Nothing references the duplicate any more; remove it.
        connection.execute(
            s.sql.text("DELETE FROM data.contributors WHERE cntrb_id = :duplicate_id"),
            {"duplicate_id": duplicate_cntrb_id},
        )


def _quote_identifier(name: str) -> str:
    """Return *name* as a double-quoted SQL identifier with embedded quotes doubled."""
    return '"' + name.replace('"', '""') + '"'


def get_repo_by_repo_git(repo_git: str):

with get_session() as session:
Expand Down