-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathserver.py
More file actions
331 lines (261 loc) · 9.97 KB
/
server.py
File metadata and controls
331 lines (261 loc) · 9.97 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
"""Lightning Memory MCP server: 9 tools for agent memory, intelligence, and sync."""
from __future__ import annotations
from mcp.server.fastmcp import FastMCP
from .intelligence import IntelligenceEngine
from .memory import MemoryEngine
# Guidance text passed to FastMCP as the server's `instructions` argument.
_SERVER_INSTRUCTIONS = " ".join(
    (
        "Decentralized agent memory for the Lightning economy.",
        "Store, query, and list memories with Nostr identity and Lightning payments.",
        "Agents remember transactions, vendor reputations, spending patterns, and decisions.",
    )
)

# Server instance; the tool functions in this module register on it
# through the @mcp.tool() decorator.
mcp = FastMCP("Lightning Memory", instructions=_SERVER_INSTRUCTIONS)
# Module-level MemoryEngine slot; stays None until the first tool call needs it.
_engine: MemoryEngine | None = None


def _get_engine() -> MemoryEngine:
    """Return the shared MemoryEngine, constructing it on first use."""
    global _engine
    engine = _engine
    if engine is None:
        # First caller pays the construction cost; later calls reuse the instance.
        engine = _engine = MemoryEngine()
    return engine
@mcp.tool()
def memory_store(
    content: str,
    type: str = "general",
    metadata: str = "{}",
) -> dict:
    """Store a memory for later retrieval.

    Use this to remember transactions, vendor experiences, decisions,
    spending patterns, API responses, or any information worth recalling.

    Args:
        content: The memory content to store. Be descriptive.
            Examples:
            - "Paid 500 sats to bitrefill.com for a $5 Amazon gift card via L402. Fast, reliable."
            - "OpenAI API returned 429 rate limit after 50 requests/min. Backoff to 30/min."
            - "User prefers to cap spending at 10,000 sats per session."
        type: Memory category. One of:
            - general: Default, uncategorized
            - transaction: Payment records, invoices, L402 purchases
            - vendor: Service/API reputation and reliability notes
            - preference: User or agent preferences and settings
            - error: Error patterns and failure modes
            - decision: Key decisions and their reasoning
        metadata: JSON string of additional key-value pairs.
            Example: '{"vendor": "bitrefill.com", "amount_sats": 500}'
            An empty string is treated as no metadata.

    Returns:
        Confirmation with status, the stored memory's id and type,
        and the agent's public key.

    Raises:
        ValueError: If metadata is not valid JSON (json.JSONDecodeError is a
            ValueError subclass) or does not decode to a JSON object.
    """
    import json

    if isinstance(metadata, str):
        # A blank string means "no metadata"; json.loads("") would raise.
        meta = json.loads(metadata) if metadata.strip() else {}
    else:
        # Non-string values (e.g. an already-decoded dict) pass through as-is.
        meta = metadata

    # Metadata is documented as key-value pairs; reject lists, numbers, etc.
    # here instead of storing a shape that readers of the record cannot use.
    if meta is not None and not isinstance(meta, dict):
        raise ValueError(
            f"metadata must be a JSON object, got {meta.__class__.__name__}"
        )

    engine = _get_engine()
    result = engine.store(content=content, memory_type=type, metadata=meta)
    return {
        "status": "stored",
        "id": result["id"],
        "type": result["type"],
        "agent_pubkey": engine.identity.public_key_hex,
    }
@mcp.tool()
def memory_query(
    query: str,
    limit: int = 10,
    type: str | None = None,
) -> dict:
    """Search memories by relevance. Returns the most relevant matches.

    Use this to recall past transactions, check vendor reputation,
    retrieve spending patterns, or find any previously stored information.

    Args:
        query: Natural language search query.
            Examples:
            - "bitrefill payment history"
            - "which APIs gave rate limit errors?"
            - "spending decisions this week"
        limit: Maximum number of results (default 10, max 100).
            Negative values are treated as 0.
        type: Optional filter by memory type (transaction, vendor, preference, error, decision, general).

    Returns:
        List of matching memories ranked by relevance, with scores.
    """
    engine = _get_engine()
    # Clamp into [0, 100]. Without the lower bound a negative limit reaches
    # the engine unchanged and is not governed by the documented maximum.
    limit = max(0, min(limit, 100))
    results = engine.query(query=query, limit=limit, memory_type=type)
    return {
        "count": len(results),
        "memories": results,
    }
@mcp.tool()
def memory_list(
    type: str | None = None,
    since: str | None = None,
    limit: int = 50,
) -> dict:
    """List memories, optionally filtered by type and time range.

    Use this to browse recent memories, check all transactions,
    or review memories of a specific type.

    Args:
        type: Filter by memory type (transaction, vendor, preference, error, decision, general).
        since: Time filter. Relative: "1h", "24h", "7d", "30d". Or Unix timestamp.
        limit: Maximum number of results (default 50, max 200).
            Negative values are treated as 0.

    Returns:
        List of memories in reverse chronological order with stats.
    """
    engine = _get_engine()
    # Clamp into [0, 200]. Without the lower bound a negative limit reaches
    # the engine unchanged and is not governed by the documented maximum.
    limit = max(0, min(limit, 200))
    results = engine.list(memory_type=type, since=since, limit=limit)
    stats = engine.stats()
    return {
        "count": len(results),
        "total_memories": stats["total"],
        "by_type": stats["by_type"],
        "agent_pubkey": stats["agent_pubkey"],
        "memories": results,
    }
def _get_intelligence() -> IntelligenceEngine:
    """Build an IntelligenceEngine on the shared memory engine's connection."""
    return IntelligenceEngine(conn=_get_engine().conn)
@mcp.tool()
def ln_vendor_reputation(vendor: str) -> dict:
    """Check a vendor's reputation based on transaction history.

    Use this before paying a vendor to see if they're reliable.
    Aggregates all past transactions to build a reputation score.

    Args:
        vendor: Vendor name or domain (e.g., "bitrefill.com", "openai").

    Returns:
        Reputation report: total transactions, total sats spent,
        success rate, average payment size, and tags.
    """
    rep = _get_intelligence().vendor_report(vendor)
    reputation = rep.to_dict()

    # Checks run in priority order; the first match decides the label.
    if rep.success_rate >= 0.9 and rep.total_txns >= 3:
        verdict = "reliable"
    elif rep.total_txns == 0:
        verdict = "new"
    elif rep.success_rate < 0.7:
        verdict = "caution"
    else:
        verdict = "limited_data"

    return {
        "reputation": reputation,
        "recommendation": verdict,
    }
@mcp.tool()
def ln_spending_summary(since: str = "30d") -> dict:
    """Get a spending summary for budget awareness.

    Shows total sats spent, broken down by vendor and protocol.

    Args:
        since: Time period. Relative: "1h", "24h", "7d", "30d". Or Unix timestamp.

    Returns:
        Spending breakdown with totals by vendor and protocol.
    """
    # Delegate to the intelligence engine and wrap its serialized summary.
    return {"summary": _get_intelligence().spending_summary(since).to_dict()}
@mcp.tool()
def ln_anomaly_check(vendor: str, amount_sats: int) -> dict:
    """Check if a proposed payment amount is normal for a vendor.

    Use this before making a payment to catch price anomalies.
    Compares the proposed amount against historical averages.

    Args:
        vendor: Vendor name or domain.
        amount_sats: Proposed payment amount in satoshis.

    Returns:
        Anomaly report: verdict (normal/high/first_time), context, and historical average.
    """
    # Delegate to the intelligence engine and wrap its serialized report.
    return {"anomaly": _get_intelligence().anomaly_check(vendor, amount_sats).to_dict()}
@mcp.tool()
def memory_sync(direction: str = "both") -> dict:
    """Sync memories with Nostr relays.

    Push local memories to relays and/or pull remote memories to local.
    Requires secp256k1 for push (signing). Pull works with any identity.

    Args:
        direction: Sync direction. One of:
            - "push": Upload local memories to relays
            - "pull": Download memories from relays
            - "both": Push then pull (default)

    Returns:
        Sync result with counts of pushed/pulled memories and any errors.
        Status is "completed" for a recognized direction and "error"
        (with an explanatory entry in the errors) for any other value.
    """
    from .sync import pull_memories, push_memories, SyncResult

    combined = SyncResult()

    # Reject unknown directions explicitly: otherwise neither branch below
    # runs and the call reports "completed" without having synced anything.
    if direction not in ("push", "pull", "both"):
        combined.errors.append(
            f"Invalid direction {direction!r}: expected 'push', 'pull', or 'both'."
        )
        return {
            "status": "error",
            "direction": direction,
            **combined.to_dict(),
        }

    engine = _get_engine()

    if direction in ("push", "both"):
        push_result = push_memories(engine.conn, engine.identity)
        combined.pushed = push_result.pushed
        combined.errors.extend(push_result.errors)

    if direction in ("pull", "both"):
        pull_result = pull_memories(engine.conn, engine.identity)
        combined.pulled = pull_result.pulled
        combined.errors.extend(pull_result.errors)

    return {
        "status": "completed",
        "direction": direction,
        **combined.to_dict(),
    }
@mcp.tool()
def memory_export(limit: int = 100) -> dict:
    """Export memories as Nostr NIP-78 events.

    Converts local memories into portable Nostr event format.
    Events are signed if secp256k1 is available.
    Useful for backup, sharing, or manual relay publishing.

    Args:
        limit: Maximum number of memories to export (default 100, max 1000).
            Negative values are treated as 0.

    Returns:
        List of NIP-78 events with memory content.
    """
    from .sync import export_memories

    engine = _get_engine()
    # Clamp into [0, 1000]. Without the lower bound a negative limit reaches
    # the exporter unchanged and is not governed by the maximum.
    limit = max(0, min(limit, 1000))
    events = export_memories(engine.conn, engine.identity, limit)
    return {
        "count": len(events),
        "signed": engine.identity.has_signing,
        "agent_pubkey": engine.identity.public_key_hex,
        "events": events,
    }
@mcp.tool()
def ln_budget_status() -> dict:
    """Check L402 gateway earnings and payment stats.

    Shows total sats earned from L402 gateway payments, broken down by operation.
    Reads from locally stored payment records (logged by the gateway).
    At most 1000 payment records are read per call.

    Returns:
        Earnings summary: total sats, payment count, breakdown by operation.
    """
    import json as _json

    engine = _get_engine()
    payments = engine.list(memory_type="l402_payment", limit=1000)

    total_sats = 0
    by_operation: dict[str, int] = {}
    for payment in payments:
        # Metadata may be missing, None, an empty string, a JSON string, or
        # an already-decoded dict; normalize all of these to a dict.
        meta = payment.get("metadata") or {}
        if isinstance(meta, str):
            meta = _json.loads(meta)
        if not isinstance(meta, dict):
            meta = {}

        # A null or non-numeric amount counts as 0 so that one malformed
        # record cannot abort the whole summary.
        sats = meta.get("amount_sats") or 0
        if not isinstance(sats, (int, float)):
            sats = 0

        total_sats += sats
        op = meta.get("operation", "unknown")
        by_operation[op] = by_operation.get(op, 0) + sats

    return {
        "total_earned_sats": total_sats,
        "total_payments": len(payments),
        "by_operation": by_operation,
    }
def main() -> None:
    """Entry point: dispatch a CLI subcommand or start the MCP server.

    Subcommands:
        relay-status [--json]   Check relay connectivity and last sync state.

    A first argument of -h/--help prints usage and exits with status 0.
    Anything else, including no arguments, starts the MCP server.
    """
    import sys

    argv = sys.argv[1:]
    command = argv[0] if argv else None

    if command == "relay-status":
        # Imported lazily so the CLI module is only loaded when requested.
        from .cli import cmd_relay_status

        raise SystemExit(cmd_relay_status(argv[1:]))

    if command in ("-h", "--help"):
        usage_lines = [
            "Usage: lightning-memory [subcommand]",
            "",
            "Subcommands:",
            " relay-status [--json] Check relay connectivity and last sync state.",
            "",
            "With no subcommand, starts the MCP server.",
        ]
        print("\n".join(usage_lines))
        raise SystemExit(0)

    mcp.run()


if __name__ == "__main__":
    main()