Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
272 changes: 271 additions & 1 deletion tavro_api/api/routers/insights.py
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,46 @@ def _days_since(ts: Any) -> int:
return 0


def _to_dt(ts: Any) -> Optional[datetime]:
if not ts:
return None
try:
dt = ts if isinstance(ts, datetime) else datetime.fromisoformat(str(ts))
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt
except (TypeError, ValueError):
return None


def _relative_time(ts: Any) -> str:
dt = _to_dt(ts)
if not dt:
return "Recently"
seconds = max(0, int((datetime.now(timezone.utc) - dt).total_seconds()))
if seconds < 60:
return "Just now"
minutes = seconds // 60
if minutes < 60:
return f"{minutes}m ago"
hours = minutes // 60
if hours < 24:
return f"{hours}h ago"
days = hours // 24
if days == 1:
return "Yesterday"
if days < 7:
return f"{days} days ago"
weeks = days // 7
if weeks < 5:
return f"{weeks}w ago"
months = days // 30
if months < 12:
return f"{months}mo ago"
years = days // 365
return f"{years}y ago"


# KPI catalog — port of KPI_DEFINITIONS. Each returns (value, target, status).
def _kpi_task_completion(perf): # noqa: ANN001
v = 82 + perf * 16
Expand Down Expand Up @@ -406,6 +446,14 @@ def _pct(count: int, total: int) -> int:
("ESG & Sustainability", ["organisation", "technology"]),
]

def _profile_dimension_hint(categories: List[str], category_labels: Dict[str, str]) -> str:
labels = [_display(category_labels.get(cat), cat.title()) for cat in categories]
if not labels:
return "a Blueprint dimension"
if len(labels) == 1:
return f"a {labels[0]} dimension"
return f"{', '.join(labels[:-1])}, or {labels[-1]} dimensions"


# ---------------------------------------------------------------------------
# SQL
Expand All @@ -418,6 +466,8 @@ def _pct(count: int, total: int) -> int:
a.agent_name,
a.agent_description,
a.source_system,
a.created_ts,
a.updated_ts,
i.environment,
i.governance_status,
cfg.autonomy_level,
Expand Down Expand Up @@ -477,11 +527,27 @@ def _pct(count: int, total: int) -> int:
"""

_USECASES_SQL = f"""
SELECT name, status
SELECT ai_use_case_id, name, status, created_ts, updated_ts
FROM {CORE}.ai_use_cases
WHERE (tenant_id = :tid OR tenant_id IS NULL)
"""

_SPARK_COUNTS_SQL = f"""
SELECT
COUNT(*)::int AS total,
COUNT(*) FILTER (WHERE created_at >= NOW() - INTERVAL '7 days')::int AS this_week
FROM {CORE}.spark_ideas
WHERE company_id = :cid
"""

_RECENT_SPARK_SQL = f"""
SELECT idea_id, title, created_at, updated_at
FROM {CORE}.spark_ideas
WHERE company_id = :cid
ORDER BY COALESCE(updated_at, created_at) DESC NULLS LAST
LIMIT :limit
"""

_COMPANY_PICK_SQL = """
SELECT id FROM twin.company ORDER BY updated_at DESC NULLS LAST LIMIT 1
"""
Expand All @@ -493,6 +559,12 @@ def _pct(count: int, total: int) -> int:
WHERE dn.company_id = :cid AND dn.valid_to IS NULL
"""

_DIM_TYPE_LABELS_SQL = """
SELECT category::text AS category, name
FROM twin.dim_type
ORDER BY system_defined DESC NULLS LAST, name
"""


# ---------------------------------------------------------------------------
# Endpoint
Expand Down Expand Up @@ -614,6 +686,15 @@ def _to_risk_agent(a: Dict[str, Any]) -> Dict[str, Any]:
for a in agent_rows if _risk_not_triggered(a)
]

spark_total, spark_this_week = await _spark_counts(db, company_id)
use_cases_in_progress = sum(
1
for uc in uc_rows
if any(k in _norm(uc.get("status")) for k in ("progress", "build", "develop"))
)
live_agents = sum(1 for a in agent_rows if _is_prod_env(a.get("environment")))
need_review = sum(1 for a in agent_rows if _needs_human(a))

# --- stage gate blockers (every agent) ---
stage_gate_blockers = [
{
Expand Down Expand Up @@ -647,14 +728,22 @@ def _to_risk_agent(a: Dict[str, Any]) -> Dict[str, Any]:

# --- company profile (twin) ---
company_profile = await _build_company_profile(db, company_id)
recent_activity = await _home_recent_activity(db, company_id, agent_rows, uc_rows)
attention_items = _home_attention_items(agent_rows, uc_rows, company_profile)

return {
"totals": {
"sparkIdeas": spark_total,
"sparkIdeasThisWeek": spark_this_week,
"totalAgents": total_agents,
"liveAgents": live_agents,
"totalUseCases": len(uc_rows),
"useCasesInProgress": use_cases_in_progress,
"criticalCount": sum(1 for a in agent_rows if a["_risk"] == "critical"),
"highRiskCount": sum(1 for a in agent_rows if a["_risk"] == "high"),
"hitlOpen": len(hitl),
"openIssues": len(hitl),
"needReview": need_review,
},
"agentLifecycle": agent_lifecycle,
"useCaseLifecycle": usecase_lifecycle,
Expand All @@ -667,9 +756,181 @@ def _to_risk_agent(a: Dict[str, Any]) -> Dict[str, Any]:
"stageGateBlockers": stage_gate_blockers,
"successMetrics": success_metrics,
"companyProfile": company_profile,
"homeRecentActivity": recent_activity,
"homeAttentionItems": attention_items,
}


async def _spark_counts(db: AsyncSession, company_id: Optional[str]) -> tuple[int, int]:
try:
cid = company_id
if not cid:
row = (await db.execute(text(_COMPANY_PICK_SQL))).first()
cid = str(row[0]) if row else None
if not cid:
return 0, 0
row = (await db.execute(text(_SPARK_COUNTS_SQL), {"cid": cid})).mappings().first()
if not row:
return 0, 0
return int(row["total"] or 0), int(row["this_week"] or 0)
except Exception: # noqa: BLE001
return 0, 0


async def _resolve_company_id(db: AsyncSession, company_id: Optional[str]) -> Optional[str]:
if company_id:
return company_id
row = (await db.execute(text(_COMPANY_PICK_SQL))).first()
return str(row[0]) if row else None


async def _home_recent_activity(
db: AsyncSession,
company_id: Optional[str],
agent_rows: List[Dict[str, Any]],
uc_rows: List[Dict[str, Any]],
) -> List[Dict[str, Any]]:
events: List[Dict[str, Any]] = []

try:
cid = await _resolve_company_id(db, company_id)
if cid:
spark_rows = (await db.execute(text(_RECENT_SPARK_SQL), {"cid": cid, "limit": 4})).mappings().all()
for row in spark_rows:
ts = row.get("updated_at") or row.get("created_at")
title = _display(row.get("title"), "Spark idea")
events.append({
"id": f"spark:{row.get('idea_id')}",
"text": f"Spark idea added: {title}",
"time": _relative_time(ts),
"dot": "emerald",
"_ts": _to_dt(ts),
})
except Exception: # noqa: BLE001
pass

for uc in uc_rows:
ts = uc.get("updated_ts") or uc.get("created_ts")
status = _display(uc.get("status"), "")
name = _display(uc.get("name"), "AI use case")
if status:
text_value = f"{name} moved to {status} stage"
else:
text_value = f"AI use case updated: {name}"
events.append({
"id": f"usecase:{uc.get('ai_use_case_id')}",
"text": text_value,
"time": _relative_time(ts),
"dot": "violet",
"_ts": _to_dt(ts),
})

for agent in agent_rows:
risk_ts = agent.get("assessment_ts")
if risk_ts and _has_resolved_risk(agent):
risk = _display(agent.get("blended_risk_class") or agent.get("aivss_class"), "risk")
events.append({
"id": f"agent-risk:{agent.get('agent_id')}",
"text": f"{_display(agent.get('agent_name'), 'Agent')} risk classified as {risk}",
"time": _relative_time(risk_ts),
"dot": "amber" if agent.get("_risk") in ("critical", "high", "medium") else "emerald",
"_ts": _to_dt(risk_ts),
})

ts = agent.get("updated_ts") or agent.get("created_ts")
if ts:
events.append({
"id": f"agent:{agent.get('agent_id')}",
"text": f"Agent updated: {_display(agent.get('agent_name'), 'Untitled agent')}",
"time": _relative_time(ts),
"dot": "emerald" if _is_prod_env(agent.get("environment")) else "violet",
"_ts": _to_dt(ts),
})

events.sort(key=lambda e: e.get("_ts") or datetime.min.replace(tzinfo=timezone.utc), reverse=True)
return [{k: v for k, v in event.items() if k != "_ts"} for event in events[:4]]


def _home_attention_items(
agent_rows: List[Dict[str, Any]],
uc_rows: List[Dict[str, Any]],
company_profile: Dict[str, Any],
) -> List[Dict[str, Any]]:
items: List[Dict[str, Any]] = []

review_agents = sorted(
[a for a in agent_rows if _needs_human(a)],
key=lambda a: _to_dt(a.get("updated_ts")) or datetime.min.replace(tzinfo=timezone.utc),
reverse=True,
)
for agent in review_agents:
status = _display(agent.get("governance_status") or agent.get("risk_state"), "review required")
items.append({
"id": f"agent-review:{agent.get('agent_id')}",
"badge": "Approval",
"text": f"{_display(agent.get('agent_name'), 'Agent')} - {status}",
"action": "Review",
"route": f"/agent/{agent.get('agent_id')}",
})

risk_agents = sorted(
[a for a in agent_rows if a.get("_risk") in ("critical", "high")],
key=lambda a: a.get("_score") or 0,
reverse=True,
)
for agent in risk_agents:
items.append({
"id": f"agent-risk:{agent.get('agent_id')}",
"badge": "Risk",
"text": f"{_display(agent.get('agent_name'), 'Agent')} - {_display(agent.get('_risk'), 'risk')} risk requires review",
"action": "Review",
"route": f"/agent/{agent.get('agent_id')}",
})

for agent in [a for a in agent_rows if _risk_not_triggered(a)]:
items.append({
"id": f"agent-unassessed:{agent.get('agent_id')}",
"badge": "Issue",
"text": f"{_display(agent.get('agent_name'), 'Agent')} - risk assessment not yet triggered",
"action": "Review",
"route": f"/agent/{agent.get('agent_id')}",
})

for uc in uc_rows:
status = _norm(uc.get("status"))
if any(k in status for k in ("pending", "review", "approval", "approve")):
items.append({
"id": f"usecase-review:{uc.get('ai_use_case_id')}",
"badge": "Approval",
"text": f"{_display(uc.get('name'), 'AI use case')} - {uc.get('status') or 'review pending'}",
"action": "Review",
"route": f"/use-case/{uc.get('ai_use_case_id')}",
})

for gap in company_profile.get("gaps", [])[:2]:
area = _display(gap.get("area"), "Profile")
dimension_hint = _display(gap.get("dimensionHint"), "Blueprint dimension")
items.append({
"id": f"blueprint-gap:{gap.get('id')}",
"badge": "Incomplete",
"text": f"Blueprint - add {dimension_hint} for {area}",
"action": "Complete",
"route": "/blueprint",
})

seen: set[str] = set()
unique: List[Dict[str, Any]] = []
for item in items:
item_id = str(item.get("id") or item.get("text"))
if item_id in seen:
continue
seen.add(item_id)
unique.append(item)
if len(unique) >= 4:
break
return unique


async def _build_company_profile(db: AsyncSession, company_id: Optional[str]) -> Dict[str, Any]:
empty = {"hasActiveCompany": False, "overallPct": 0, "sections": [], "gaps": [], "refreshes": []}
try:
Expand All @@ -680,9 +941,17 @@ async def _build_company_profile(db: AsyncSession, company_id: Optional[str]) ->
if not cid:
return empty
nodes = (await db.execute(text(_PROFILE_NODES_SQL), {"cid": cid})).mappings().all()
dim_type_rows = (await db.execute(text(_DIM_TYPE_LABELS_SQL))).mappings().all()
except Exception: # noqa: BLE001
return empty

category_labels: Dict[str, str] = {}
for row in dim_type_rows:
category = _display(row.get("category"), "")
name = _display(row.get("name"), "")
if category and name and category not in category_labels:
category_labels[category] = name

by_category: Dict[str, List[Any]] = {}
for n in nodes:
by_category.setdefault(n["category"], []).append(n["updated_at"])
Expand All @@ -703,6 +972,7 @@ async def _build_company_profile(db: AsyncSession, company_id: Optional[str]) ->
"id": f"{s['label']}-{i}",
"gap": f"{s['label']} dimensions missing",
"area": s["label"],
"dimensionHint": _profile_dimension_hint(_PROFILE_SECTIONS[i][1], category_labels),
"severity": "high" if i < 2 else "medium",
}
for i, s in enumerate(sections) if s["pct"] == 0
Expand Down
Loading