-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmcp_server_http.py
More file actions
89 lines (68 loc) · 2.34 KB
/
Copy pathmcp_server_http.py
File metadata and controls
89 lines (68 loc) · 2.34 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
#!/usr/bin/env python3
"""FGIP MCP Server (HTTP/SSE) - For claude.ai and remote clients.
Run with:
python3 mcp_server_http.py [--port 8080]
Then connect claude.ai to:
http://your-ip:8080/sse
"""
import asyncio
import json
import sqlite3
import argparse
from pathlib import Path
from typing import Any
from datetime import datetime
from mcp.server import Server
from mcp.server.sse import SseServerTransport
from starlette.applications import Starlette
from starlette.routing import Route
from starlette.responses import JSONResponse
import uvicorn
# FGIP paths
# FGIP_ROOT is the directory that contains this file.
FGIP_ROOT = Path(__file__).parent
# File named "fgip.db" located next to this script.
# NOTE(review): presumably the SQLite database (sqlite3 is imported above), but
# DB_PATH is not referenced anywhere in the visible part of this file - confirm.
DB_PATH = FGIP_ROOT / "fgip.db"
# Import tool definitions from stdio server
# `server` is the object run by the SSE handler below; `list_tools` and
# `call_tool` are imported but not referenced in the visible part of this file
# (NOTE(review): possibly imported only for their registration side effects).
from mcp_server import server, list_tools, call_tool
# Create SSE transport
# The "/messages" argument matches the POST route registered on the Starlette
# app; presumably it is the path clients are told to POST messages to -
# confirm against the SseServerTransport documentation.
sse = SseServerTransport("/messages")
async def handle_sse(request):
    """Serve one SSE session for a remote MCP client (e.g. claude.ai).

    Opens the SSE transport for this request and runs the shared MCP
    ``server`` over the resulting pair of streams until the session ends.

    Args:
        request: The incoming Starlette request for the ``/sse`` route.

    Returns:
        An empty ``Response``. Starlette invokes whatever a ``Route``
        endpoint returns as an ASGI response; returning ``None`` made it
        raise ``TypeError: 'NoneType' object is not callable`` every time a
        session closed.
    """
    # Imported locally so this block does not depend on a new module-level
    # import (only JSONResponse is imported at the top of the file).
    from starlette.responses import Response

    async with sse.connect_sse(
        request.scope, request.receive, request._send
    ) as streams:
        # streams[0] / streams[1] are passed to server.run in this order.
        await server.run(
            streams[0], streams[1], server.create_initialization_options()
        )
    # Return an empty response so Starlette has something callable to finish
    # the request with once the SSE session is over.
    return Response()
async def handle_messages(request):
    """Forward a client's POSTed message to the SSE transport.

    The request's raw ASGI triple (scope, receive, send) is handed straight
    to ``sse.handle_post_message``; this function itself returns nothing.

    Args:
        request: The incoming Starlette request for the ``/messages`` route.
    """
    # NOTE(review): this endpoint returns None while being registered through
    # a Starlette Route - confirm whether Starlette raises after the transport
    # has already replied, and whether mounting the handler as a raw ASGI app
    # would be preferable.
    scope, receive, send = request.scope, request.receive, request._send
    await sse.handle_post_message(scope, receive, send)
async def health(request):
    """Report liveness and static server metadata as a JSON response.

    Args:
        request: The incoming Starlette request (unused).

    Returns:
        A ``JSONResponse`` with the fixed keys ``status``, ``server``,
        ``transport`` and ``tools``.
    """
    payload = {
        "status": "ok",
        "server": "fgip-mcp",
        "transport": "sse",
        # Hard-coded tool count; last bumped when get_system_briefing was added.
        "tools": 16,
    }
    return JSONResponse(payload)
# Starlette app
# Routes: GET /health (status JSON), /sse (SSE session), POST /messages
# (client-to-server messages).
app = Starlette(
    # Debug mode makes Starlette return tracebacks in error responses. This
    # server binds to 0.0.0.0 by default and is meant to be reached from
    # remote clients, so tracebacks must not be exposed.
    debug=False,
    routes=[
        Route("/health", health),
        Route("/sse", handle_sse),
        Route("/messages", handle_messages, methods=["POST"]),
    ],
)
def main():
    """Parse command-line options, print the endpoint URLs, and run uvicorn.

    Options:
        --port: TCP port to listen on (default 8080).
        --host: Interface to bind to (default "0.0.0.0").
    """
    cli = argparse.ArgumentParser(description="FGIP MCP Server (HTTP/SSE)")
    cli.add_argument("--port", type=int, default=8080, help="Port to listen on")
    cli.add_argument("--host", default="0.0.0.0", help="Host to bind to")
    opts = cli.parse_args()
    host, port = opts.host, opts.port

    # Startup banner: one line per endpoint, all sharing the same base URL.
    base_url = f"http://{host}:{port}"
    print("Starting FGIP MCP Server (HTTP/SSE)")
    for label, path in (
        ("Health", "/health"),
        ("SSE", "/sse"),
        ("Messages", "/messages"),
    ):
        print(f" {label}: {base_url}{path}")
    print(f"\nFor claude.ai, use: http://YOUR_PUBLIC_IP:{port}/sse")

    uvicorn.run(app, host=host, port=port)
# Script entry point: start the server only when this file is executed
# directly, not when it is imported.
if __name__ == "__main__":
    main()