diff --git a/collectoss/application/cli/db.py b/collectoss/application/cli/db.py
index 7b6bc7c09..599dac517 100644
--- a/collectoss/application/cli/db.py
+++ b/collectoss/application/cli/db.py
@@ -21,6 +21,7 @@
 )
 
 from collectoss.application.db.session import DatabaseSession
+from collectoss.application.db.lib import merge_contributors
 from sqlalchemy import update
 from datetime import datetime
 from collectoss.application.db.models import Repo
@@ -151,6 +152,29 @@ def get_repo_groups(ctx: click.Context) -> pd.DataFrame:
     return df
 
 
+@cli.command("merge-contributors")
+@click.argument("primary_cntrb_id")
+@click.argument("duplicate_cntrb_id")
+@test_connection
+@test_db_connection
+@with_database
+@click.pass_context
+def merge_contributors_cmd(ctx: click.Context, primary_cntrb_id: str, duplicate_cntrb_id: str) -> None:
+    """Merge a duplicate contributor into the primary contributor."""
+    try:
+        merge_contributors(
+            ctx.obj.engine,
+            primary_cntrb_id,
+            duplicate_cntrb_id,
+        )
+        click.echo(
+            f"Merged duplicate contributor {duplicate_cntrb_id} into primary contributor {primary_cntrb_id}"
+        )
+    except Exception as e:
+        logger.error(f"Failed to merge contributors: {e}")
+        raise
+
+
 @cli.command("add-repo-groups")
 @click.argument("filename", type=click.Path(exists=True))
 @test_connection
diff --git a/collectoss/application/db/lib.py b/collectoss/application/db/lib.py
index c5394365d..3f7f5303a 100644
--- a/collectoss/application/db/lib.py
+++ b/collectoss/application/db/lib.py
@@ -92,6 +92,130 @@ def fetchall_data_from_sql_text(sql_text):
         result = connection.execute(sql_text)
         return [dict(row) for row in result.mappings()]
 
+
+def merge_contributors(engine, primary_cntrb_id: str, duplicate_cntrb_id: str) -> None:
+    """Merge a duplicate contributor record into a primary contributor record.
+
+    This updates all foreign-key references to the duplicate contributor id
+    to use the primary contributor id, merges contributor alias rows,
+    and deletes the duplicate contributor row. Every statement runs inside one
+    ``engine.begin()`` block.
+
+    Args:
+        engine: Engine whose ``begin()`` context manager yields the connection
+            used for every statement.
+        primary_cntrb_id: ``cntrb_id`` of the contributor row that is kept.
+        duplicate_cntrb_id: ``cntrb_id`` of the contributor row that is merged
+            into the primary row and then deleted.
+
+    Raises:
+        ValueError: If both ids are equal, or if either id has no row in
+            ``data.contributors``.
+    """
+
+    if primary_cntrb_id == duplicate_cntrb_id:
+        raise ValueError("Primary and duplicate contributor IDs must differ")
+
+    with engine.begin() as connection:
+        primary_exists = connection.execute(
+            s.sql.text(
+                "SELECT 1 FROM data.contributors WHERE cntrb_id = :id"
+            ),
+            {"id": primary_cntrb_id},
+        ).scalar()
+
+        duplicate_exists = connection.execute(
+            s.sql.text(
+                "SELECT 1 FROM data.contributors WHERE cntrb_id = :id"
+            ),
+            {"id": duplicate_cntrb_id},
+        ).scalar()
+
+        if not primary_exists:
+            raise ValueError(f"Primary contributor not found: {primary_cntrb_id}")
+        if not duplicate_exists:
+            raise ValueError(f"Duplicate contributor not found: {duplicate_cntrb_id}")
+
+        # Remove the duplicate's alias rows whose alias_email the primary already
+        # has, then re-point the remaining alias rows at the primary.
+        connection.execute(
+            s.sql.text(
+                """
+                DELETE FROM data.contributors_aliases
+                WHERE cntrb_id = :duplicate_id
+                    AND alias_email IN (
+                        SELECT alias_email
+                        FROM data.contributors_aliases
+                        WHERE cntrb_id = :primary_id
+                    )
+                """
+            ),
+            {"duplicate_id": duplicate_cntrb_id, "primary_id": primary_cntrb_id},
+        )
+
+        connection.execute(
+            s.sql.text(
+                """
+                UPDATE data.contributors_aliases
+                SET cntrb_id = :primary_id
+                WHERE cntrb_id = :duplicate_id
+                """
+            ),
+            {"duplicate_id": duplicate_cntrb_id, "primary_id": primary_cntrb_id},
+        )
+
+        # Find every foreign-key column that references data.contributors.cntrb_id.
+        # The joins use constraint_schema (the referencing table's schema) so that
+        # tables outside "data" are found too, and kcu is additionally matched on
+        # table_name because constraint names need not be unique within a schema
+        # (information_schema cannot fully disambiguate ccu in that case). DISTINCT
+        # drops repeated pairs; .all() materializes rows before the UPDATEs below.
+        referencing_fks = connection.execute(
+            s.sql.text(
+                """
+                SELECT DISTINCT kcu.table_schema, kcu.table_name, kcu.column_name
+                FROM information_schema.table_constraints tc
+                JOIN information_schema.key_column_usage kcu
+                    ON kcu.constraint_schema = tc.constraint_schema
+                    AND kcu.constraint_name = tc.constraint_name
+                    AND kcu.table_name = tc.table_name
+                JOIN information_schema.constraint_column_usage ccu
+                    ON ccu.constraint_schema = tc.constraint_schema
+                    AND ccu.constraint_name = tc.constraint_name
+                WHERE tc.constraint_type = 'FOREIGN KEY'
+                    AND ccu.table_schema = 'data'
+                    AND ccu.table_name = 'contributors'
+                    AND ccu.column_name = 'cntrb_id'
+                """
+            )
+        ).mappings().all()
+
+        for fk in referencing_fks:
+            table_schema = fk["table_schema"]
+            table_name = fk["table_name"]
+            column_name = fk["column_name"]
+
+            # Alias rows were already merged by the two statements above.
+            if table_schema == "data" and table_name == "contributors_aliases":
+                continue
+
+            table_ref = f'"{table_schema}"."{table_name}"'
+            sql = s.sql.text(
+                f"UPDATE {table_ref} SET \"{column_name}\" = :primary_id WHERE \"{column_name}\" = :duplicate_id"
+            )
+            connection.execute(
+                sql,
+                {"primary_id": primary_cntrb_id, "duplicate_id": duplicate_cntrb_id},
+            )
+
+        connection.execute(
+            s.sql.text(
+                "DELETE FROM data.contributors WHERE cntrb_id = :duplicate_id"
+            ),
+            {"duplicate_id": duplicate_cntrb_id},
+        )
+
+
 def get_repo_by_repo_git(repo_git: str):
 
     with get_session() as session: