-
Notifications
You must be signed in to change notification settings - Fork 62
Expand file tree
/
Copy pathsession_start_command.py
More file actions
106 lines (91 loc) · 3.85 KB
/
session_start_command.py
File metadata and controls
106 lines (91 loc) · 3.85 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
import sys
from typing import Annotated
import typer
from cycode.cli.apps.ai_guardrails.consts import AIIDEType
from cycode.cli.apps.ai_guardrails.scan.claude_config import get_mcp_servers, get_user_email, load_claude_config
from cycode.cli.apps.ai_guardrails.scan.payload import AIHookPayload, _extract_from_claude_transcript
from cycode.cli.apps.ai_guardrails.scan.utils import safe_json_parse
from cycode.cli.apps.auth.auth_common import get_authorization_info
from cycode.cli.apps.auth.auth_manager import AuthManager
from cycode.cli.exceptions.handle_auth_errors import handle_auth_exception
from cycode.cli.utils.get_api_client import get_ai_security_manager_client
from cycode.logger import get_logger
# Module-level logger used by the session-start command below; all of its
# diagnostics are emitted under the 'AI Guardrails' logger name.
logger = get_logger('AI Guardrails')
def _build_session_payload(payload: dict, ide: str) -> AIHookPayload:
    """Translate a session-start hook payload (read from stdin) into an AIHookPayload.

    Args:
        payload: Parsed JSON object delivered by the IDE hook.
        ide: Identifier of the IDE that fired the hook. Claude Code gets
            dedicated handling; every other value is mapped as Cursor.

    Returns:
        An AIHookPayload whose ``event_name`` is ``'session_start'``.
    """
    is_claude_code = ide == AIIDEType.CLAUDE_CODE

    if not is_claude_code:
        # Cursor: every field is taken directly from the hook payload.
        return AIHookPayload(
            event_name='session_start',
            conversation_id=payload.get('conversation_id'),
            ide_user_email=payload.get('user_email'),
            model=payload.get('model'),
            ide_provider=AIIDEType.CURSOR.value,
            ide_version=payload.get('cursor_version'),
        )

    # Claude Code: IDE version and a fallback model come from the transcript
    # referenced by the payload; the user e-mail comes from the Claude config.
    transcript_path = payload.get('transcript_path')
    transcript_ide_version, transcript_model, _ = _extract_from_claude_transcript(transcript_path)

    config = load_claude_config()
    user_email = None
    if config:
        user_email = get_user_email(config)

    # A model named explicitly in the payload wins over the transcript's one.
    resolved_model = payload.get('model') or transcript_model

    return AIHookPayload(
        event_name='session_start',
        conversation_id=payload.get('session_id'),
        ide_user_email=user_email,
        model=resolved_model,
        ide_provider=AIIDEType.CLAUDE_CODE.value,
        ide_version=transcript_ide_version,
    )
def session_start_command(
    ctx: typer.Context,
    ide: Annotated[
        str,
        typer.Option(
            '--ide',
            help='IDE that triggered the session start.',
            hidden=True,
        ),
    ] = AIIDEType.CURSOR.value,
) -> None:
    """Handle session start: ensure auth, create conversation, report data flow."""
    # NOTE: Typer uses a command function's docstring as its help text, so the
    # docstring above is intentionally left short; details live in comments.
    #
    # Parameters:
    #   ctx: Typer context, forwarded to the auth and API-client helpers.
    #   ide: Value of the hidden ``--ide`` option; defaults to Cursor.
    #
    # The command is best-effort: failures after authentication are logged at
    # debug level and the function returns (or continues) instead of raising.

    # Step 1: Ensure authentication
    auth_info = get_authorization_info(ctx)
    if auth_info is None:
        logger.debug('Not authenticated, starting authentication')
        try:
            auth_manager = AuthManager()
            auth_manager.authenticate()
        except Exception as err:
            # Delegate reporting to the shared auth error handler, then stop:
            # nothing below is attempted after a failed authentication.
            handle_auth_exception(ctx, err)
            return
    else:
        logger.debug('Already authenticated')

    # Step 2: Read stdin payload (backward compat: old hooks pipe no stdin)
    if sys.stdin.isatty():
        logger.debug('No stdin payload (TTY), skipping session initialization')
        return

    stdin_data = sys.stdin.read().strip()
    payload = safe_json_parse(stdin_data)
    # Well-formed JSON is not necessarily an object: a non-empty array, string
    # or number is truthy but has no ``.get``, which would raise further down
    # when the session payload is built. Treat any non-dict as invalid.
    if not payload or not isinstance(payload, dict):
        logger.debug('Empty or invalid stdin payload, skipping session initialization')
        return

    # Step 3: Build session payload and initialize API client
    session_payload = _build_session_payload(payload, ide)

    try:
        ai_client = get_ai_security_manager_client(ctx)
    except Exception as e:
        logger.debug('Failed to initialize AI security client', exc_info=e)
        return

    # Step 4: Create conversation
    try:
        ai_client.create_conversation(session_payload)
    except Exception as e:
        # Deliberately not returning: data-flow reporting below is still
        # attempted even when conversation creation fails.
        logger.debug('Failed to create conversation during session start', exc_info=e)

    # Step 5: Report data flow (MCP servers, Claude Code only)
    if ide == AIIDEType.CLAUDE_CODE:
        claude_config = load_claude_config()
        if claude_config:
            mcp_servers = get_mcp_servers(claude_config)
            if mcp_servers:
                try:
                    ai_client.report_data_flow(mcp_servers)
                except Exception as e:
                    logger.debug('Failed to report MCP servers', exc_info=e)