69 changes: 69 additions & 0 deletions examples/agents-of-all-shapes/README.md
@@ -0,0 +1,69 @@
# Agents of all shapes → one Tangle Intelligence pipe

Proof that Tangle Intelligence works with **any agent, not just our sandbox**.
Every shape — the Tangle runtime, an OpenAI-compatible router (tcloud /
OpenRouter), a Mastra agent, the Claude Agent SDK, a Python agno agent —
converges on the **same** canonical OpenTelemetry GenAI spans, and the **same**
in-process engine produces the decision packet:

```
your agent (any framework)
  → OTel GenAI spans  (gen_ai.request.model, gen_ai.usage.*, score)
  → fromOtelSpans()   → RunRecord[]
  → analyzeRuns()     → InsightReport (composite, lift CI, Pareto,
                        failureModes, recommendations)
```

No sandbox. No deploy. No server. The analysis runs **in-process**.
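
As a rough illustration of the span → record → report flow, here is a plain-Python sketch. It is not the package's implementation: `spans_to_records` and `summarize` are hypothetical stand-ins for `fromOtelSpans()` and `analyzeRuns()`, span attributes are simplified to a flat dict, and the report carries only a mean, a median, and failure counts.

```python
from collections import Counter
from statistics import mean, median


def spans_to_records(spans: list[dict]) -> list[dict]:
    """Group flat spans by trace id into one record per run (assumed shape)."""
    records: dict[str, dict] = {}
    for span in spans:
        rec = records.setdefault(
            span["traceId"], {"run_id": span["traceId"], "failure_mode": None}
        )
        attrs = span.get("attributes", {})
        if "score" in attrs:
            # The scored GenAI span supplies the run's outcome and model.
            rec["score"] = float(attrs["score"])
            rec["model"] = attrs.get("gen_ai.request.model")
        if span.get("status") == "ERROR" and "score" not in attrs:
            # A separate ERROR-status span names the failure mode for its run.
            rec["failure_mode"] = span["name"]
    return list(records.values())


def summarize(records: list[dict]) -> dict:
    """Reduce run records to a tiny report (far smaller than an InsightReport)."""
    scores = [r["score"] for r in records if "score" in r]
    failures = Counter(r["failure_mode"] for r in records if r["failure_mode"])
    return {
        "n": len(scores),
        "mean": mean(scores),
        "p50": median(scores),
        "failure_modes": dict(failures),
    }


# Made-up spans from three runs; run "b" also carries an error span.
EXAMPLE_SPANS = [
    {"traceId": "a", "name": "gen_ai.chat", "status": "OK",
     "attributes": {"gen_ai.request.model": "openai/gpt-4o", "score": 0.9}},
    {"traceId": "b", "name": "gen_ai.chat", "status": "ERROR",
     "attributes": {"gen_ai.request.model": "openai/gpt-4o", "score": 0.5}},
    {"traceId": "b", "name": "format_drift", "status": "ERROR", "attributes": {}},
    {"traceId": "c", "name": "gen_ai.chat", "status": "OK",
     "attributes": {"gen_ai.request.model": "openai/gpt-4o", "score": 0.7}},
]

report = summarize(spans_to_records(EXAMPLE_SPANS))
print(report)
```

The point of the sketch is only that nothing in the flow needs a server: spans in, records grouped per run, aggregate out.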

## Run it

```bash
# Verified QA path — in-process, no key, no infra:
npx tsx examples/agents-of-all-shapes/run.ts

# CI verification (what proves it):
pnpm test -- tests/agents-of-all-shapes.test.ts
```

Set `TANGLE_API_KEY=sk-tan-...` to *also* POST the same spans to the hosted
`/v1/otlp/v1/traces` ingest for the dashboard — identical analysis, server-side.

## The one contract every shape meets

`shared/intelligence.ts` is the whole integration surface. A shape only has to
emit OTel spans carrying the standard GenAI attributes plus a `score`:

| attribute | meaning |
|---|---|
| `gen_ai.request.model` | model snapshot (also `llm.model`, `tangle.model`) |
| `gen_ai.usage.input_tokens` / `output_tokens` | token usage |
| `gen_ai.usage.cost_usd` | cost (also `cost.usd`) |
| `score` | your eval/judge/rubric outcome 0..1 (also `tangle.score`, `eval.score`) |
| an `ERROR`-status span's `name` | → `RunRecord.failureMode` |

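The model and token-usage attributes are **standard OpenTelemetry GenAI
semantic conventions**, so most frameworks already emit them.
`gen_ai.usage.cost_usd` and `score` sit on top of those conventions; `score`
is the one you always add yourself.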
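
The alias columns in the table suggest a first-match lookup per field. A minimal sketch of that idea, assuming aliases are tried in the order the table lists them (the actual precedence inside `shared/intelligence.ts` is not shown here, and `ALIASES` and `resolve` are hypothetical names):

```python
# Alias order copied from the contract table; the precedence is an assumption.
ALIASES = {
    "model": ("gen_ai.request.model", "llm.model", "tangle.model"),
    "cost_usd": ("gen_ai.usage.cost_usd", "cost.usd"),
    "score": ("score", "tangle.score", "eval.score"),
}


def resolve(attrs: dict, field: str, default=None):
    """Return the first alias of `field` present in `attrs`, else `default`."""
    for key in ALIASES[field]:
        if attrs.get(key) is not None:
            return attrs[key]
    return default


# A span from a framework that uses the non-canonical names.
legacy_attrs = {"llm.model": "openai/gpt-4o", "cost.usd": 0.018, "eval.score": 0.83}
normalized = {field: resolve(legacy_attrs, field) for field in ALIASES}
print(normalized)
```

If a span carries both a canonical name and an alias, this sketch prefers the canonical one because it is listed first.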

## The shapes

| Shape | File | Live wiring |
|---|---|---|
| **Tangle runtime / router (tcloud)** | `shapes.ts` → `tangleRuntimeRuns` | `createOtelExporter` + `loopEventToOtelSpan` (see `examples/with-intelligence-export`) |
| **OpenAI-compatible** (tcloud / OpenRouter / OpenAI / vLLM) | `shapes.ts` → `openAiCompatibleRuns` | any OpenAI client at the router's `baseURL`; emit a GenAI span per call |
| **Mastra** | `shapes.ts` → `mastraRuns` | Mastra's native OTLP exporter → `${INTELLIGENCE_BASE}/v1/traces` (with `INTELLIGENCE_BASE` ending in `/v1/otlp`, as in `run.ts`) |
| **Claude Agent SDK** | `shapes.ts` → `claudeAgentSdkRuns` | wrap `query()`, one GenAI span per turn from `msg.usage` |
| **Python agno** | `python-agno/agno_to_intelligence.py` | agno run → OTLP/HTTP POST (or `pip install agent-eval-rpc`) |

The TypeScript shapes ship deterministic batches so the showcase is
**verifiable in CI with no key** (`tests/agents-of-all-shapes.test.ts`). Each
shape's header comment shows the exact live wiring — swap the batch for your
framework's real telemetry and it lands on the identical engine.
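
For the OpenAI-compatible shape, "emit a GenAI span per call" mostly means copying the response's `usage` block into the attribute names from the contract table. The sketch below assumes the response is the plain JSON dict of a chat completion (with the usual `usage.prompt_tokens` and `usage.completion_tokens` fields) and that `score` comes from your own acceptance check. `genai_attributes` is a hypothetical helper, not part of any SDK.

```python
from typing import Optional


def genai_attributes(
    response: dict, score: float, cost_usd: Optional[float] = None
) -> dict:
    """Map one chat-completion response to the span attributes the contract expects."""
    if not 0.0 <= score <= 1.0:
        raise ValueError("score must be within 0..1")
    usage = response.get("usage") or {}
    attrs = {
        "gen_ai.request.model": response.get("model", "unknown"),
        "gen_ai.usage.input_tokens": int(usage.get("prompt_tokens", 0)),
        "gen_ai.usage.output_tokens": int(usage.get("completion_tokens", 0)),
        "score": float(score),
    }
    if cost_usd is not None:
        # Cost is optional; routers that report it can pass it through.
        attrs["gen_ai.usage.cost_usd"] = float(cost_usd)
    return attrs


# Recorded response body, trimmed to the fields used above (values are made up).
sample_response = {
    "model": "openai/gpt-4o",
    "usage": {"prompt_tokens": 1240, "completion_tokens": 320},
}
attrs = genai_attributes(sample_response, score=0.83, cost_usd=0.018)
print(attrs)
```

The resulting dict can be attached to whatever span object your tracer gives you; the other shapes differ only in where the usage numbers come from.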

## Why this matters

The integration point is the **OTel wire**, not the Tangle SDK or sandbox. Any
team with agent traces — whatever framework, whatever runtime — gets the full
`InsightReport` (failure clustering, cost/quality Pareto, ranked
recommendations, and lift CI once they emit two cohorts) without adopting our
execution stack.
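
"Lift CI once they emit two cohorts" refers to a confidence interval on the score difference between two groups of runs. The README does not say how the engine computes it; the sketch below uses a percentile bootstrap purely as an illustration, with made-up scores and a hypothetical `lift_ci` helper.

```python
import random


def lift_ci(baseline, candidate, resamples=2000, alpha=0.05, seed=0):
    """Percentile-bootstrap CI for mean(candidate) - mean(baseline)."""
    rng = random.Random(seed)  # seeded so the sketch is reproducible
    diffs = []
    for _ in range(resamples):
        # Resample each cohort with replacement, keeping its original size.
        b = rng.choices(baseline, k=len(baseline))
        c = rng.choices(candidate, k=len(candidate))
        diffs.append(sum(c) / len(c) - sum(b) / len(b))
    diffs.sort()
    lower = diffs[int((alpha / 2) * resamples)]
    upper = diffs[int((1 - alpha / 2) * resamples) - 1]
    point = sum(candidate) / len(candidate) - sum(baseline) / len(baseline)
    return point, lower, upper


# Two cohorts of per-run scores, e.g. before and after a prompt change.
baseline_scores = [0.42, 0.55, 0.48, 0.60, 0.51, 0.45]
candidate_scores = [0.71, 0.80, 0.76, 0.88, 0.74, 0.79]

point, lower, upper = lift_ci(baseline_scores, candidate_scores)
print(f"lift={point:.3f} ci=[{lower:.3f}, {upper:.3f}]")
```

An interval that excludes zero is the usual signal that the candidate cohort's lift is not just noise; with a single cohort there is nothing to compare, which is why the README ties lift CI to emitting two.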
129 changes: 129 additions & 0 deletions examples/agents-of-all-shapes/python-agno/agno_to_intelligence.py
@@ -0,0 +1,129 @@
"""
Python agno agent -> Tangle Intelligence. No sandbox, no Tangle SDK.

The same canonical OTel GenAI spans the TypeScript shapes emit, from a
Python agno agent. Two ways, same engine:

1. Hosted: POST OTLP/HTTP-JSON straight to the ingest route. Works with
any Python agent; no Tangle dependency at all.
2. Substrate (via the published `agent-eval-rpc` client): judge/analyze
over the wire — `pip install agent-eval-rpc`.

Run (live): TANGLE_API_KEY=sk-tan-... python agno_to_intelligence.py
Without agno installed it falls back to a recorded batch so the wiring is
runnable as-is.
"""

import json
import os
import time
import urllib.request

INTELLIGENCE_BASE = os.environ.get(
    "INTELLIGENCE_BASE", "https://intelligence.tangle.tools/v1/otlp"
)
API_KEY = os.environ.get("TANGLE_API_KEY", "sk-tan-...")


def run_agno_agent(prompt: str) -> dict:
    """Run a real agno agent if installed; else a recorded run so this
    file is runnable without the dep. Live wiring shown inline."""
    try:
        from agno.agent import Agent  # type: ignore
        from agno.models.openai import OpenAIChat  # type: ignore

        agent = Agent(model=OpenAIChat(id="gpt-4o"))
        resp = agent.run(prompt)
        usage = getattr(resp, "metrics", {}) or {}
        return {
            "model": "openai/gpt-4o",
            "input_tokens": int(usage.get("input_tokens", 0) or 0),
            "output_tokens": int(usage.get("output_tokens", 0) or 0),
            "cost_usd": float(usage.get("cost", 0.0) or 0.0),
            # Your acceptance check / judge score in 0..1.
            "score": 1.0 if resp and getattr(resp, "content", None) else 0.0,
            "failure_mode": None if getattr(resp, "content", None) else "format_drift",
        }
    except Exception:
        # Recorded run — agno not installed or no key. Wiring stays valid.
        return {
            "model": "openai/gpt-4o",
            "input_tokens": 1240,
            "output_tokens": 320,
            "cost_usd": 0.018,
            "score": 0.83,
            "failure_mode": None,
        }


def otlp_spans_for_run(run_id: str, r: dict) -> list[dict]:
    now_ns = time.time_ns()
    attrs = [
        {"key": "gen_ai.request.model", "value": {"stringValue": r["model"]}},
        {"key": "gen_ai.usage.input_tokens", "value": {"doubleValue": r["input_tokens"]}},
        {"key": "gen_ai.usage.output_tokens", "value": {"doubleValue": r["output_tokens"]}},
        {"key": "gen_ai.usage.cost_usd", "value": {"doubleValue": r["cost_usd"]}},
        {"key": "score", "value": {"doubleValue": r["score"]}},
    ]
    spans = [
        {
            "traceId": run_id,
            "spanId": f"{run_id}-llm",
            "name": "gen_ai.chat",
            "startTimeUnixNano": str(now_ns),
            "endTimeUnixNano": str(now_ns + 800_000_000),
            "attributes": attrs,
            "status": {"code": "STATUS_CODE_ERROR" if r["failure_mode"] else "STATUS_CODE_OK"},
        }
    ]
    if r["failure_mode"]:
        spans.append(
            {
                "traceId": run_id,
                "spanId": f"{run_id}-err",
                "name": r["failure_mode"],
                "startTimeUnixNano": str(now_ns + 800_000_000),
                "endTimeUnixNano": str(now_ns + 800_000_000),
                "attributes": [],
                "status": {"code": "STATUS_CODE_ERROR"},
            }
        )
    return spans


def ship(spans: list[dict]) -> None:
    body = json.dumps(
        {
            "resourceSpans": [
                {
                    "resource": {
                        "attributes": [
                            {"key": "service.name", "value": {"stringValue": "agno-agent"}}
                        ]
                    },
                    "scopeSpans": [{"scope": {"name": "agno"}, "spans": spans}],
                }
            ]
        }
    ).encode()
    req = urllib.request.Request(
        f"{INTELLIGENCE_BASE}/v1/traces",
        data=body,
        headers={"content-type": "application/json", "authorization": f"Bearer {API_KEY}"},
        method="POST",
    )
    with urllib.request.urlopen(req) as resp:
        if resp.status >= 300:
            raise RuntimeError(f"ingest failed: {resp.status}")


if __name__ == "__main__":
    prompts = ["Summarise the Q3 report", "Draft a follow-up email", "Classify this ticket"]
    all_spans: list[dict] = []
    for i, p in enumerate(prompts):
        all_spans += otlp_spans_for_run(f"agno-{i}", run_agno_agent(p))
    if API_KEY != "sk-tan-...":
        ship(all_spans)
        print(f"Shipped {len(all_spans)} spans from agno → Tangle Intelligence.")
    else:
        print("(set TANGLE_API_KEY to ship; printing spans)\n", json.dumps(all_spans, indent=2)[:600])
59 changes: 59 additions & 0 deletions examples/agents-of-all-shapes/run.ts
@@ -0,0 +1,59 @@
/**
 * Agents of all shapes → one decision packet. No sandbox. No deploy.
 *
 *   pnpm tsx examples/agents-of-all-shapes/run.ts
 *
 * Runs every shape (Tangle runtime / OpenAI-compatible router / Mastra /
 * Claude Agent SDK), converts each to canonical OTel GenAI spans, and feeds
 * the merged stream through the in-process intelligence engine
 * (`fromOtelSpans → analyzeRuns`). Prints the fleet `InsightReport` plus a
 * per-shape breakdown.
 *
 * Optional hosted path: set TANGLE_API_KEY (and INTELLIGENCE_BASE) to also
 * POST the spans to the hosted OTLP ingest for the dashboard.
 */

import { allShapes } from './shapes'
import { shipToTangleOtlp, spansForRuns, toInsightReport } from './shared/intelligence'

async function main() {
  const shapes = allShapes()
  const allRuns = Object.values(shapes).flat()
  const allSpans = spansForRuns(allRuns)

  // The fleet view — every framework's runs in one vocabulary.
  const fleet = await toInsightReport(allSpans)
  console.log('=== Fleet InsightReport (all shapes) ===')
  console.log(`runs: ${fleet.composite.n}`)
  console.log(`composite mean: ${fleet.composite.mean.toFixed(3)}`)
  console.log(`composite p50: ${fleet.composite.p50.toFixed(3)}`)
  console.log(`failure modes: ${JSON.stringify(fleet.failureModes ?? [])}`)
  console.log(`recommendations: ${fleet.recommendations.length}`)
  for (const r of fleet.recommendations.slice(0, 3)) {
    console.log(` [${r.priority}] ${r.title}`)
  }

  // Per-shape — prove the SAME engine works on each framework alone.
  console.log('\n=== Per-shape composite ===')
  for (const [name, runs] of Object.entries(shapes)) {
    const report = await toInsightReport(spansForRuns(runs))
    console.log(
      `${name.padEnd(20)} n=${report.composite.n} mean=${report.composite.mean.toFixed(3)}`,
    )
  }

  // Optional: also ship to the hosted ingest for the dashboard.
  const apiKey = process.env.TANGLE_API_KEY
  if (apiKey) {
    const endpoint = process.env.INTELLIGENCE_BASE ?? 'https://intelligence.tangle.tools/v1/otlp'
    await shipToTangleOtlp(allSpans, { endpoint, apiKey })
    console.log(`\nShipped ${allSpans.length} spans to ${endpoint} for the dashboard.`)
  } else {
    console.log('\n(set TANGLE_API_KEY to also ship to the hosted dashboard)')
  }
}

main().catch((err) => {
  console.error(err)
  process.exit(1)
})