Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 10 additions & 11 deletions src/routes/metrics.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
import { Router, type Response } from "express";
import { apiKeyStore, pauseState, servicesStore, usageStore } from "../store/state.js";
import {
apiKeyStore,
getUsageTotalRequests,
pauseState,
servicesStore,
usageByAgent,
} from "../store/state.js";

/**
* Builds operational metrics and aggregate stats routes.
Expand All @@ -8,8 +14,7 @@ export function createMetricsRouter(): Router {
const router = Router();

router.get("/api/v1/metrics", (_req, res: Response) => {
let totalRequests = 0;
for (const v of usageStore.values()) totalRequests += v;
const totalRequests = getUsageTotalRequests();
const lines = [
"# HELP agentpay_services_total Number of registered services.",
"# TYPE agentpay_services_total gauge",
Expand All @@ -29,17 +34,11 @@ export function createMetricsRouter(): Router {
});

router.get("/api/v1/stats", (_req, res: Response) => {
let totalRequests = 0;
const agents = new Set<string>();
for (const [key, total] of usageStore.entries()) {
totalRequests += total;
agents.add(key.split("::")[0]);
}
res.json({
totalServices: servicesStore.size,
totalApiKeys: apiKeyStore.size,
totalRequests,
uniqueAgents: agents.size,
totalRequests: getUsageTotalRequests(),
uniqueAgents: usageByAgent.size,
paused: pauseState.paused,
});
});
Expand Down
31 changes: 14 additions & 17 deletions src/routes/services.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import {
servicesDisabled,
servicesMetadata,
servicesStore,
usageByService,
usageStore,
usageTotalsByService,
} from "../store/state.js";
import { getRequestId } from "../types.js";

Expand Down Expand Up @@ -80,15 +82,8 @@ export function createServicesRouter(): Router {

router.get("/api/v1/services/:serviceId/usage", (req: Request, res: Response) => {
const { serviceId } = req.params;
const suffix = `::${serviceId}`;
let total = 0;
let agents = 0;
for (const [key, value] of usageStore.entries()) {
if (key.endsWith(suffix)) {
total += value;
agents++;
}
}
const total = usageTotalsByService.get(serviceId) ?? 0;
const agents = usageByService.get(serviceId)?.size ?? 0;
res.json({ serviceId, total, agents });
});

Expand All @@ -102,10 +97,11 @@ export function createServicesRouter(): Router {
);
const suffix = `::${serviceId}`;
const items: { agent: string; total: number }[] = [];
for (const [key, total] of usageStore.entries()) {
if (key.endsWith(suffix)) {
items.push({ agent: key.slice(0, key.length - suffix.length), total });
}
for (const key of usageByService.get(serviceId) ?? []) {
items.push({
agent: key.slice(0, key.length - suffix.length),
total: usageStore.get(key) ?? 0,
});
}
items.sort((a, b) => b.total - a.total);
res.json({ serviceId, items: items.slice(0, limit) });
Expand All @@ -116,10 +112,11 @@ export function createServicesRouter(): Router {
const { serviceId } = req.params;
const suffix = `::${serviceId}`;
const items: { agent: string; total: number }[] = [];
for (const [key, total] of usageStore.entries()) {
if (key.endsWith(suffix)) {
items.push({ agent: key.slice(0, key.length - suffix.length), total });
}
for (const key of usageByService.get(serviceId) ?? []) {
items.push({
agent: key.slice(0, key.length - suffix.length),
total: usageStore.get(key) ?? 0,
});
}
res.json({ serviceId, items });
});
Expand Down
25 changes: 11 additions & 14 deletions src/routes/usage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@ import { recordEvent } from "../events.js";
import {
servicesDisabled,
servicesStore,
usageByAgent,
usageKey,
usageStore,
usageTotalsByAgent,
usageTotalsByService,
} from "../store/state.js";
import { getRequestId } from "../types.js";

Expand Down Expand Up @@ -144,8 +147,7 @@ export function createUsageRouter(): Router {

router.get("/api/v1/billing/total", (_req, res: Response) => {
let totalStroops = 0;
for (const [key, requests] of usageStore.entries()) {
const [, serviceId] = key.split("::");
for (const [serviceId, requests] of usageTotalsByService.entries()) {
const price = servicesStore.get(serviceId)?.priceStroops ?? 0;
totalStroops += requests * price;
}
Expand Down Expand Up @@ -190,30 +192,25 @@ export function createUsageRouter(): Router {
1000,
Math.max(1, Number((req.query.limit as string) ?? 200))
);
const seen = new Set<string>();
for (const key of usageStore.keys()) seen.add(key.split("::")[0]);
const agents = Array.from(seen).slice(0, limit);
const agents = Array.from(usageByAgent.keys()).slice(0, limit);
res.json({ agents });
});

router.get("/api/v1/agents/:agent/total", (req: Request, res: Response) => {
const { agent } = req.params;
const prefix = `${agent}::`;
let total = 0;
for (const [key, n] of usageStore.entries()) {
if (key.startsWith(prefix)) total += n;
}
const total = usageTotalsByAgent.get(agent) ?? 0;
res.json({ agent, total });
});

router.get("/api/v1/agents/:agent/usage", (req: Request, res: Response) => {
const { agent } = req.params;
const prefix = `${agent}::`;
const items: { serviceId: string; total: number }[] = [];
for (const [key, total] of usageStore.entries()) {
if (key.startsWith(prefix)) {
items.push({ serviceId: key.slice(prefix.length), total });
}
for (const key of usageByAgent.get(agent) ?? []) {
items.push({
serviceId: key.slice(prefix.length),
total: usageStore.get(key) ?? 0,
});
}
res.json({ agent, items });
});
Expand Down
150 changes: 147 additions & 3 deletions src/store/state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,156 @@ export const config: Record<string, number> = {
/** Opaque API keys keyed by full secret token. */
export const apiKeyStore = new Map<string, ApiKeyRecord>();

/** Outstanding usage counters keyed by `${agent}::${serviceId}`. */
export const usageStore = new Map<string, number>();

/** Builds the shared in-memory usage key for an agent/service pair. */
export const usageKey = (agent: string, serviceId: string) => `${agent}::${serviceId}`;

type UsageKeyParts = { agent: string; serviceId: string };

const parseUsageKey = (key: string): UsageKeyParts | undefined => {
const [agent, serviceId] = key.split("::");
if (!agent || serviceId === undefined) return undefined;
return { agent, serviceId };
};

const addKey = (index: Map<string, Set<string>>, bucket: string, key: string) => {
const keys = index.get(bucket);
if (keys) {
keys.add(key);
return;
}
index.set(bucket, new Set([key]));
};

const removeKey = (index: Map<string, Set<string>>, bucket: string, key: string) => {
const keys = index.get(bucket);
if (!keys) return;
keys.delete(key);
if (keys.size === 0) index.delete(bucket);
};

const addTotal = (totals: Map<string, number>, bucket: string, delta: number) => {
totals.set(bucket, (totals.get(bucket) ?? 0) + delta);
};

/**
* Usage keys grouped by agent so agent rollups avoid scanning the full store.
*/
export const usageByAgent = new Map<string, Set<string>>();

/**
* Usage keys grouped by service so service rollups avoid scanning the full store.
*/
export const usageByService = new Map<string, Set<string>>();

/** Outstanding request totals grouped by agent. */
export const usageTotalsByAgent = new Map<string, number>();

/** Outstanding request totals grouped by service. */
export const usageTotalsByService = new Map<string, number>();

let usageTotalRequests = 0;

/**
* Returns the protocol-wide outstanding request total maintained on writes.
*/
export const getUsageTotalRequests = () => usageTotalRequests;

/**
* Verifies the maintained usage indexes against a brute-force store scan.
* This is intended for regression tests and health checks, not request paths.
*/
export const assertUsageIndexesConsistent = () => {
const expectedByAgent = new Map<string, Set<string>>();
const expectedByService = new Map<string, Set<string>>();
const expectedAgentTotals = new Map<string, number>();
const expectedServiceTotals = new Map<string, number>();
let expectedTotal = 0;

for (const [key, total] of usageStore.entries()) {
expectedTotal += total;
const parts = parseUsageKey(key);
if (!parts) continue;
addKey(expectedByAgent, parts.agent, key);
addKey(expectedByService, parts.serviceId, key);
addTotal(expectedAgentTotals, parts.agent, total);
addTotal(expectedServiceTotals, parts.serviceId, total);
}

const serializeSets = (index: Map<string, Set<string>>) =>
JSON.stringify(
Array.from(index.entries()).map(([bucket, keys]) => [
bucket,
Array.from(keys).sort(),
])
);
const serializeTotals = (totals: Map<string, number>) =>
JSON.stringify(Array.from(totals.entries()));

if (
usageTotalRequests !== expectedTotal ||
serializeSets(usageByAgent) !== serializeSets(expectedByAgent) ||
serializeSets(usageByService) !== serializeSets(expectedByService) ||
serializeTotals(usageTotalsByAgent) !== serializeTotals(expectedAgentTotals) ||
serializeTotals(usageTotalsByService) !== serializeTotals(expectedServiceTotals)
) {
throw new Error("usage indexes are inconsistent with usageStore");
}
};

class IndexedUsageStore extends Map<string, number> {
set(key: string, value: number): this {
const previous = super.get(key) ?? 0;
const existed = super.has(key);
super.set(key, value);

const delta = value - previous;
usageTotalRequests += delta;
const parts = parseUsageKey(key);
if (!parts) return this;

if (!existed) {
addKey(usageByAgent, parts.agent, key);
addKey(usageByService, parts.serviceId, key);
}
addTotal(usageTotalsByAgent, parts.agent, delta);
addTotal(usageTotalsByService, parts.serviceId, delta);

return this;
}

delete(key: string): boolean {
const previous = super.get(key);
const deleted = super.delete(key);
if (!deleted) return false;

usageTotalRequests -= previous ?? 0;
const parts = parseUsageKey(key);
if (!parts) return true;

removeKey(usageByAgent, parts.agent, key);
removeKey(usageByService, parts.serviceId, key);
addTotal(usageTotalsByAgent, parts.agent, -(previous ?? 0));
addTotal(usageTotalsByService, parts.serviceId, -(previous ?? 0));
if (!usageByAgent.has(parts.agent)) usageTotalsByAgent.delete(parts.agent);
if (!usageByService.has(parts.serviceId))
usageTotalsByService.delete(parts.serviceId);

return true;
}

clear(): void {
super.clear();
usageByAgent.clear();
usageByService.clear();
usageTotalsByAgent.clear();
usageTotalsByService.clear();
usageTotalRequests = 0;
}
}

/** Outstanding usage counters keyed by `${agent}::${serviceId}`. */
export const usageStore = new IndexedUsageStore();

/** Registered services and their per-request prices. */
export const servicesStore = new Map<string, { priceStroops: number }>();

Expand Down
Loading
Loading