Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
160 changes: 117 additions & 43 deletions README.md

Large diffs are not rendered by default.

48 changes: 47 additions & 1 deletion behaviors/context-intelligence.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,17 @@ hooks:
- module: hook-context-intelligence
source: git+https://github.com/microsoft/amplifier-bundle-context-intelligence@main#subdirectory=modules/hook-context-intelligence
config:
# DEPRECATED: use `destinations` below for multi-server fan-out.
# When set (and no `destinations` is given), these synthesize a single "default"
# destination that matches all sessions (include: ["**"]).
# Legacy-scalar behavior when unset:
# - both unset → no destination → local-only.
# - url set, api_key unset → dispatch disabled, local-only (a WARNING is logged).
# (NOTE: a `destinations` entry with an empty url/api_key is instead a hard
# mount error — the two paths differ. See validate_destinations.)
context_intelligence_server_url: "${AMPLIFIER_CONTEXT_INTELLIGENCE_SERVER_URL:}"
context_intelligence_api_key: "${AMPLIFIER_CONTEXT_INTELLIGENCE_API_KEY:}"
# workspace: auto-resolved from coordinator project_slug → config → env var fallback
# workspace: auto-resolved from coordinator project_slug → config
workspace: "${AMPLIFIER_CONTEXT_INTELLIGENCE_WORKSPACE:}"
log_level: "${AMPLIFIER_CONTEXT_INTELLIGENCE_LOG_LEVEL:INFO}"
dispatch_timeout: "${AMPLIFIER_CONTEXT_INTELLIGENCE_DISPATCH_TIMEOUT:30}"
Expand All @@ -62,3 +70,41 @@ hooks:
# base_path: ~/.amplifier/projects (auto-resolved; uncomment to override)
# project_slug: (auto-resolved from working directory; uncomment to override)
# exclude_events: [] (optional fnmatch patterns; uncomment and list events to suppress)
#
# --- Multi-server fan-out (preferred) ----------------------------------
# Define fan-out destinations centrally; override per-project via a project
# .amplifier/settings.yaml under overrides.hook-context-intelligence.config.
# The legacy context_intelligence_server_url/api_key scalars above are
# DEPRECATED — when set (and no `destinations` is given) they synthesize a
# single "default" destination matching all sessions (include: ["**"]).
# Destination names are stable identifiers — renaming a destination silently
# drops any project-scope override that referenced the old name.
#
# Matching rules (include / exclude): patterns are matched against the
# session's working directory using .gitignore semantics (pathspec
# "gitignore"). A destination is active iff the working dir matches an
# `include` AND does not match an `exclude` (exclude wins, per destination).
# - "foo/" or "foo" or "**/foo/" → the directory foo AND everything beneath it
# - "foo/**" → the CONTENTS of foo only (not foo itself)
# - "**" → every session
# Prefer the trailing-slash directory form (e.g. "**/client-x/") to mean
# "this project and all its sessions" — it matches whether you launch from
# the project root or a subdirectory.
#
# IMPORTANT — default include semantics:
# A destination with NO `include` key has an empty pattern set and matches
# NOTHING (receives zero sessions). You must declare `include` explicitly.
# The legacy scalar path (context_intelligence_server_url + api_key, no
# `destinations:` key) still synthesizes include: ["**"] — existing
# single-server users are unaffected.
#
# destinations:
# personal:
# url: "${PERSONAL_CI_URL:}"
# api_key: "${PERSONAL_CI_KEY:}"
# include: ["**"] # all sessions...
# exclude: ["**/client-*/"] # ...except any client-* project dir and everything under it
# team:
# url: "${TEAM_CI_URL:}"
# api_key: "${TEAM_CI_KEY:}"
# include: ["**/client-x/"] # the client-x project dir and everything under it
Original file line number Diff line number Diff line change
@@ -1,15 +1,27 @@
"""Context Intelligence hook — thin event forwarder.

Writes session events to local JSONL and dispatches them to the
Context Intelligence server when ``context_intelligence_server_url``
is configured.
Writes session events to local JSONL and, when fan-out destinations are configured,
dispatches them concurrently to one or more Context Intelligence servers based on
the session's working directory.

Configuration keys
------------------
destinations : dict, optional
Fan-out destinations keyed by stable name. Each entry is a dict with:
url : str — base URL of the CI server (app expands ${VAR} before mount).
api_key : str — bearer token.
include : list[str], optional — pathspec (gitwildmatch) patterns; default ["**"].
exclude : list[str], optional — exclude-wins patterns; default [].
Configured via overrides.hook-context-intelligence.config.destinations in settings.yaml.
App-cli deep-merges project-over-user, so per-project overrides patch individual
destination sub-keys without clobbering others.

context_intelligence_server_url : str, optional
Base URL of the Context Intelligence server, e.g.
``http://localhost:8000``. When set, every event is POSTed
to ``{url}/events``.
DEPRECATED. When set (and no `destinations` is given) it synthesizes a single
"default" destination matching all sessions (include: ["**"]). Migrate to
`destinations` for multi-server fan-out.
context_intelligence_api_key : str, optional
DEPRECATED. Companion to context_intelligence_server_url.
workspace : str, optional
Workspace identifier used to scope graph data on the server.
Resolved automatically from the coordinator when not set
Expand All @@ -29,6 +41,10 @@
Event names to register unconditionally, regardless of capability
discovery order. Use to capture events from modules that mount after
this hook (e.g. ``delegate:agent_spawned``).

Note: Skills are fetched from the first configured destination (insertion order),
typically "default" from the legacy-synthesized single-server path. This preserves
today's single-server behavior for skill fetching, which is not per-destination.
"""

from __future__ import annotations
Expand Down Expand Up @@ -154,7 +170,7 @@ async def mount(

Always:
- Registers ConfigResolver as ``context_intelligence.config_resolver`` capability
- LoggingHandler — writes events.jsonl + dispatches to CI server
- LoggingHandler — writes events.jsonl + dispatches to CI server(s)
"""
from .config_resolver import ConfigResolver
from .handlers.logging_handler import LoggingHandler
Expand All @@ -171,21 +187,38 @@ async def mount(

unregister_fns: list[Callable[[], None]] = []

# --- Fail-fast validation (C3) ---
all_destinations = resolver.validate_destinations() # raises ValueError on misconfiguration

# --- Migration warning (S1) ---
# Detect legacy scalar config key rather than env var (D1: hook no longer reads env).
# The app expands ${AMPLIFIER_CONTEXT_INTELLIGENCE_SERVER_URL} into config before mount.
if not config.get("destinations") and resolver.context_intelligence_server_url:
log.warning(
"context-intelligence: using legacy single-server config "
"(context_intelligence_server_url). Migrate to "
"overrides.hook-context-intelligence.config.destinations for multi-server fan-out."
)

# --- Skill-fetch server selection (spec §5.1.3) ---
# Skills are global, not per-destination. Use first destination with a non-empty url
# (insertion order). Synthesized "default" is always first on the legacy path.
skill_fetch_url = next((d.url for d in all_destinations.values() if d.url), None)

logging_handler = LoggingHandler(resolver)

# Skill fetch phase — deferred to skills:discovered event
server_url = resolver.context_intelligence_server_url
fetcher: SkillFetcher | None = None
skills_capable: bool = False

if not server_url:
if not skill_fetch_url:
log.info("skill_fetch_skipped: no server_url in config")
else:
_tentative_fetcher = SkillFetcher(server_url)
_tentative_fetcher = SkillFetcher(skill_fetch_url)
result = await _tentative_fetcher.check_server_version()
log.info(
"skill_version_check: server=%s reachable=%s version=%s",
server_url,
skill_fetch_url,
result.reachable,
result.version,
)
Expand All @@ -199,7 +232,7 @@ async def mount(
skills_capable = _is_skills_capable(result.version)

async def on_skills_discovered(event_name: str, data: dict[str, Any]) -> None:
await _refresh_watched_skills(coordinator, fetcher, skills_capable)
await _refresh_watched_skills(coordinator, fetcher, skills_capable) # type: ignore[arg-type]

unreg_skills_discovered = coordinator.hooks.register(
"skills:discovered",
Expand Down Expand Up @@ -238,6 +271,7 @@ async def on_skill_unloaded(event_name: str, data: dict[str, Any]) -> None:
"unregister_fns": unregister_fns,
"logging_handler": logging_handler,
"resolver": resolver,
"destinations": all_destinations,
}
coordinator.register_capability("context_intelligence._hook_state", _hook_state)

Expand Down Expand Up @@ -270,7 +304,14 @@ async def on_session_ready(coordinator: Any) -> None:
legacy capability + additional_events config) and registers the
LoggingHandler for every active event. Runs after every module has
mounted, so late-contributed events are captured.

Also selects active fan-out destinations based on session.working_dir
and installs per-destination dispatchers into the LoggingHandler.
"""
from .config_resolver import Destination
from .handlers.logging_handler import _DestinationDispatcher
from .fanout import normalize_match_key, select_active

state = coordinator.get_capability("context_intelligence._hook_state")
if state is None:
log.warning("on_session_ready: hook state not found — mount() may not have run")
Expand All @@ -279,6 +320,56 @@ async def on_session_ready(coordinator: Any) -> None:
resolver = state["resolver"]
logging_handler = state["logging_handler"]
unregister_fns = state["unregister_fns"]
destinations: dict[str, Destination] = state["destinations"]

# --- Destination selection (C2: working_dir capability ONLY, fail-loud) ---
active: dict[str, Destination] = {}
match_key: str = ""
if destinations:
get_cap = getattr(coordinator, "get_capability", None)
working_dir = get_cap("session.working_dir") if get_cap else None
if not working_dir:
# working_dir capability unavailable. Do NOT raise here: the kernel
# CATCHES on_session_ready exceptions (Phase 6, _session_init.py) and
# continues the session, so a raise is swallowed AND aborts the rest of
# this callback — silently disabling ALL capture, including the local
# JSONL the design guarantees is always written. Degrade to local-only
# (active = {}) with a discoverable WARNING and fall through so the
# LoggingHandler is still registered below.
log.warning(
"context-intelligence: session.working_dir capability is unavailable; "
"fan-out disabled for this session (local JSONL only)."
)
else:
match_key = normalize_match_key(str(working_dir))
active = select_active(destinations, match_key)

# Build one dispatcher per ACTIVE destination (D9).
dispatchers = [
_DestinationDispatcher(
name=d.name,
url=d.url,
api_key=d.api_key,
workspace=resolver.workspace,
dispatch_timeout=resolver.dispatch_timeout,
failure_threshold=resolver.dispatch_failure_threshold,
queue_capacity=resolver.dispatch_queue_capacity,
close_drain_timeout=resolver.close_drain_timeout,
)
for d in active.values()
]
await logging_handler.set_dispatchers(dispatchers)

# --- Fan-out log line (S2) ---
if not destinations:
log.info("context-intelligence fan-out: no destinations configured — local JSONL only")
elif active:
log.info("context-intelligence fan-out: active -> %s", ", ".join(sorted(active)))
else:
log.warning(
"context-intelligence fan-out: routed to none (local-only) for working_dir=%s",
match_key,
)

# Step 1: canonical kernel events + all module contributions
# _discover_events returns: set(ALL_EVENTS) + collect_contributions
Expand All @@ -290,17 +381,17 @@ async def on_session_ready(coordinator: Any) -> None:

# Step 3: conditional exclude filter
exclude = resolver.exclude_events
active = (
active_events = (
{e for e in events if not any(fnmatch.fnmatch(e, p) for p in exclude)}
if exclude
else events
)

# Step 4: register LoggingHandler for every active event
for event in sorted(active):
for event in sorted(active_events):
unreg = coordinator.hooks.register(
event, logging_handler, priority=100, name="LoggingHandler"
)
unregister_fns.append(unreg)

log.info("on_session_ready: registered %d events", len(active))
log.info("on_session_ready: registered %d events", len(active_events))
Loading
Loading