|
| 1 | +from pathlib import Path |
| 2 | + |
| 3 | +import kuzu |
| 4 | +from loguru import logger as log |
| 5 | + |
| 6 | +from shared.lakehouse import Lakehouse |
| 7 | +from shared.settings import LOCAL_DIR, env |
| 8 | + |
| 9 | + |
class GraphAnalytics:
    """Run analytics queries against an existing local Kuzu graph database.

    The database directory name is read from the environment variable
    ``<SCHEMA>_GRAPH_DB`` (schema upper-cased) and resolved relative to
    ``LOCAL_DIR``.
    """

    def __init__(self, schema: str) -> None:
        """Open a connection to the graph database configured for *schema*.

        Args:
            schema: Schema name; upper-cased to build the ``*_GRAPH_DB``
                environment variable that holds the database name.

        Raises:
            FileNotFoundError: If the resolved database path does not exist.
        """
        dbname = env.str(f"{schema.upper()}_GRAPH_DB")
        db_path = Path(LOCAL_DIR) / dbname

        # Fail early with a clear message rather than handing a missing
        # path to kuzu.Database.
        if not db_path.exists():
            raise FileNotFoundError(f"db not found: {db_path}")

        db = kuzu.Database(db_path)
        self.conn = kuzu.Connection(db)

        self.lh = Lakehouse()

    def compute_con_scores(
        self,
        node_label: str,
        rel_label: str,
        column_name: str = "con_score",
    ) -> None:
        """Compute a per-node CON score and store it as a node property.

        For every ordered pair of distinct nodes ``(a, b)`` that both have an
        outgoing ``rel_label`` relationship to the same node ``c``, the smaller
        of the two relationships' ``esi`` values is taken. These minima are
        summed over all shared targets and all partners ``b``, and the total is
        written to ``a.<column_name>``. Nodes without any such partner keep the
        value ``0.0`` assigned by the reset step.

        Args:
            node_label: Node table whose nodes are scored.
            rel_label: Relationship table linking ``node_label`` nodes; its
                relationships must carry an ``esi`` property.
            column_name: Name of the DOUBLE property that receives the score.
                It is added to the node table if it does not exist yet.

        Note:
            ``node_label`` and ``rel_label`` are interpolated into the query
            text as-is, so they must be trusted, valid identifiers.
        """
        log.info(
            "Computing CON scores for {} nodes via {} rels, storing to {} property",
            node_label,
            rel_label,
            column_name,
        )

        log.debug("Adding {} to {}, if not exists", column_name, node_label)

        # The column name is backtick-escaped here exactly as in the SET
        # statements below, so a name that needs escaping (e.g. a reserved
        # keyword) works in all three statements instead of failing at ALTER.
        self.conn.execute(
            f"""
            ALTER TABLE {node_label}
            ADD IF NOT EXISTS `{column_name}` DOUBLE
            """
        )

        log.debug("Resetting {} on {}", column_name, node_label)

        # Reset first: the scoring query only touches nodes that have at least
        # one partner, so stale values from an earlier run would otherwise
        # survive on nodes that no longer match.
        self.conn.execute(
            f"""
            MATCH (c:{node_label})
            SET c.`{column_name}` = 0.0
            """
        )

        log.debug("Computing CON scores")

        self.conn.execute(
            f"""
            MATCH (a:{node_label})-[ac:{rel_label}]->(c:{node_label})
            MATCH (b:{node_label})-[bc:{rel_label}]->(c:{node_label})
            WHERE a <> b
            WITH a, b,
                CASE
                    WHEN ac.esi < bc.esi
                    THEN ac.esi
                    ELSE bc.esi
                END AS min_esi
            WITH a, b, sum(min_esi) AS con_pair
            WITH a, sum(con_pair) AS con_score
            SET a.`{column_name}` = con_score
            """
        )
0 commit comments