From 5d13869cbd6a316a9cafe09741017e4d9214d8b3 Mon Sep 17 00:00:00 2001 From: "yulin.deng" <1016068291@qq.com> Date: Fri, 15 Aug 2025 12:04:24 +0800 Subject: [PATCH 1/3] add fastapi --- mssql_mcp_server/config/settings.py | 4 +- mssql_mcp_server/server.py | 183 ++++++++++++++++++++++------ requirements.txt | 4 +- 3 files changed, 149 insertions(+), 42 deletions(-) diff --git a/mssql_mcp_server/config/settings.py b/mssql_mcp_server/config/settings.py index 411fd46..14a2dc7 100644 --- a/mssql_mcp_server/config/settings.py +++ b/mssql_mcp_server/config/settings.py @@ -89,6 +89,7 @@ class ServerConfig: max_rows_limit: int = 100 batch_rows_size: int = 100 mcp_port: int = 8000 + fastapi_port: int = 8001 # FastAPI app port enable_async: bool = True enable_dynamic_resources: bool = True @@ -169,7 +170,8 @@ def _load_server_config(self) -> ServerConfig: batch_rows_size=int(os.getenv("BATCH_ROWS_SIZE", "100")), enable_async=os.getenv("ENABLE_ASYNC", "true").lower() == "true", enable_dynamic_resources=os.getenv("ENABLE_DYNAMIC_RESOURCES", "true").lower() == "true", - mcp_port=int(os.getenv("FASTMCP_PORT", "8000")) + mcp_port=int(os.getenv("FASTMCP_PORT", "8000")), + fastapi_port=int(os.getenv("FASTAPI_PORT", "8001")) ) diff --git a/mssql_mcp_server/server.py b/mssql_mcp_server/server.py index 7b441e5..ddfc75c 100644 --- a/mssql_mcp_server/server.py +++ b/mssql_mcp_server/server.py @@ -2,8 +2,10 @@ import asyncio from typing import Optional from dotenv import load_dotenv +from fastapi import FastAPI, Query from fastmcp import FastMCP from fastmcp import Context +import uvicorn from mssql_mcp_server.config.settings import settings from mssql_mcp_server.database.async_connection import get_pool, close_pool from mssql_mcp_server.handlers.async_resources import AsyncResourceHandlers @@ -15,11 +17,56 @@ logger = Logger.get_logger(__name__) -app = FastMCP(name="mssql_mcp_server") +app = FastAPI() +mcp = FastMCP(name="mssql_mcp_server") + + +@app.get("/health") +async def health_check(timeout: Optional[int] = Query(default=None, description="睡眠时间(秒),每5秒打印一次日志。如果为空则直接返回结果")): + """ + Health check endpoint for the FastAPI application. + + Args: + timeout: 睡眠时间(秒),每5秒打印一次日志。如果为空则直接返回结果 + + Returns: + Health status response + """ + start_time = asyncio.get_event_loop().time() + + # 如果timeout为空,直接返回结果 + if timeout is None: + logger.info("健康检查请求 - 立即返回") + return {"status": "ok"} + + logger.info(f"健康检查开始,超时时间: {timeout} 秒") + + if timeout > 0: + elapsed = 0 + while elapsed < timeout: + # 计算剩余时间 + remaining = timeout - elapsed + # 等待时间:取剩余时间和5秒的较小值 + wait_time = min(5, remaining) + + await asyncio.sleep(wait_time) + elapsed = int(asyncio.get_event_loop().time() - start_time) + + if elapsed < timeout: + logger.info(f"健康检查运行中... 已用时: {elapsed} 秒,剩余: {timeout - elapsed} 秒") + + total_time = int(asyncio.get_event_loop().time() - start_time) + logger.info(f"健康检查完成,总耗时: {total_time} 秒") + + return { + "status": "ok", + "timeout": timeout, + "actual_duration": total_time + } # Static database-level resources -@app.resource("mssql://database/tables") +@mcp.resource("mssql://database/tables") async def get_database_tables() -> str: """List all tables in the database.""" try: @@ -30,7 +77,7 @@ async def get_database_tables() -> str: return f"Error: {str(e)}" -@app.resource("mssql://database/views") +@mcp.resource("mssql://database/views") async def get_database_views() -> str: """List all views in the database.""" try: @@ -41,7 +88,7 @@ async def get_database_views() -> str: return f"Error: {str(e)}" -@app.resource("mssql://database/info") +@mcp.resource("mssql://database/info") async def get_database_info_resource() -> str: """Get general database information.""" try: @@ -68,7 +115,7 @@ def create_object_data_resource(object_name: str, object_type: str): schema, name = object_name.split('.', 1) limit = 100 # Default limit for data retrieval - @app.resource(f"mssql://{object_type}/{schema}/{name}/data", + @mcp.resource(f"mssql://{object_type}/{schema}/{name}/data", name=f"{object_type.title()} Data: {object_name}", description=f"Data from {object_type} {object_name} (top {limit} rows)") async def get_object_data_func(): @@ -85,7 +132,7 @@ def create_object_schema_resource(object_name: str, object_type: str): """Factory function to create object schema resource.""" schema, name = object_name.split('.', 1) - @app.resource(f"mssql://{object_type}/{schema}/{name}/schema", + @mcp.resource(f"mssql://{object_type}/{schema}/{name}/schema", name=f"{object_type.title()} Schema: {object_name}", description=f"Schema information for {object_type} {object_name}") async def get_object_schema_func(): @@ -118,7 +165,7 @@ async def get_object_schema_func(): return 0 -@app.tool() +@mcp.tool() async def execute_sql(query: str, allow_modifications: bool = False, ctx: Optional[Context] = None) -> str: """ Execute an SQL query on the MSSQL server. @@ -138,7 +185,7 @@ async def execute_sql(query: str, allow_modifications: bool = False, ctx: Option return f"Error: {str(e)}" -@app.tool(enabled=False) +@mcp.tool(enabled=False) async def get_table_schema(table_name: str) -> str: """ Get schema information for a specific table. @@ -157,7 +204,7 @@ async def get_table_schema(table_name: str) -> str: return f"Error: {str(e)}" -@app.tool(enabled=False) +@mcp.tool(enabled=False) async def list_tables() -> str: """ Get a list of all tables in the database. @@ -174,7 +221,7 @@ async def list_tables() -> str: return f"Error: {str(e)}" -@app.tool(enabled=False) +@mcp.tool(enabled=False) async def get_table_data(table_name: str, limit: int = None) -> str: """ Get data from a specific table. @@ -194,7 +241,7 @@ async def get_table_data(table_name: str, limit: int = None) -> str: return f"Error: {str(e)}" -@app.tool(enabled=False) +@mcp.tool(enabled=False) async def test_connection() -> str: """ Test the database connection and get connection info. @@ -210,7 +257,7 @@ async def test_connection() -> str: return f"Error: {str(e)}" -@app.tool(enabled=False) +@mcp.tool(enabled=False) async def get_database_info() -> str: """ Get comprehensive database information. @@ -226,7 +273,7 @@ async def get_database_info() -> str: return f"Error: {str(e)}" -@app.tool(enabled=False) +@mcp.tool(enabled=False) async def clear_cache(pattern: str = "") -> str: """ Clear cache entries. @@ -245,7 +292,7 @@ async def clear_cache(pattern: str = "") -> str: return f"Error: {str(e)}" -@app.tool(enabled=False) +@mcp.tool(enabled=False) async def invalidate_table_cache(table_name: str = None) -> str: """ Invalidate cache for specific table or all tables. @@ -306,6 +353,55 @@ async def initialize_server() -> None: raise +async def start_fastapi_server(): + """Start the FastAPI server.""" + try: + fastapi_port = settings.server.fastapi_port + host = settings.server.host + logger.info(f"Starting FastAPI server on {host}:{fastapi_port}") + + config = uvicorn.Config( + app=app, + host=host, + port=fastapi_port, + log_level=settings.server.log_level.lower(), + access_log=True + ) + server = uvicorn.Server(config) + await server.serve() + + except Exception as e: + logger.error(f"FastAPI server error: {e}", exc_info=True) + raise + + +async def start_mcp_server(): + """Start the MCP server.""" + try: + transport = settings.server.transport + host = settings.server.host + port = settings.server.mcp_port + logger.info(f"Starting MCP server with transport: {transport}") + + if transport in ["http", "tcp", "sse"]: + logger.info(f"Using host: {host}, port: {port}") + await mcp.run_async(transport=transport, host=host, port=port, uvicorn_config={ + "workers": 1, + "timeout_keep_alive": 300, + "timeout_notify": 300, + "backlog": 2048, + "limit_concurrency": 50, + "limit_max_requests": None, + }) + else: + logger.info(f"Using {transport} transport") + await mcp.run_async(transport=transport) + + except Exception as e: + logger.error(f"MCP server error: {e}", exc_info=True) + raise + + async def cleanup_server() -> None: """Clean up server resources.""" try: @@ -325,39 +421,46 @@ async def cleanup_server() -> None: async def main(): - """Main entry point to run the MCP server.""" + """Main entry point to run both MCP and FastAPI servers concurrently.""" try: # Initialize all components await initialize_server() - # Get transport configuration + # Start both servers concurrently + logger.info("Starting both MCP and FastAPI servers concurrently...") + + # Create tasks for both servers + tasks = [] + + # Always start FastAPI server + fastapi_task = asyncio.create_task(start_fastapi_server()) + tasks.append(fastapi_task) + + # Start MCP server if not using stdio transport transport = settings.server.transport - host = settings.server.host - port = settings.server.mcp_port - logger.info(f"Starting server with transport: {transport}") - - if transport in ["http", "tcp", "sse"]: - logger.info(f"Using host: {host}, port: {port}") - # Explicitly pass host and port to override FastMCP's default behavior - try: - await app.run_async(transport=transport, host=host, port=port, uvicorn_config={ - "workers": 1, - "timeout_keep_alive": 300, - "timeout_notify": 300, - "backlog": 2048, - "limit_concurrency": 50, - "limit_max_requests": None, - }) - except Exception as e: - logger.error(f"FastMCP server error: {e}", exc_info=True) - raise + if transport != "stdio": + mcp_task = asyncio.create_task(start_mcp_server()) + tasks.append(mcp_task) else: - logger.info(f"Using {transport} transport") + logger.info("MCP server using stdio transport - starting inline") + await start_mcp_server() + return + + # Wait for any task to complete or fail + done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED) + + # Cancel pending tasks + for task in pending: + task.cancel() try: - await app.run_async(transport=transport) - except Exception as e: - logger.error(f"FastMCP server error: {e}", exc_info=True) - raise + await task + except asyncio.CancelledError: + pass + + # Check if any task failed + for task in done: + if task.exception(): + raise task.exception() except KeyboardInterrupt: logger.info("Server shutdown requested by user") diff --git a/requirements.txt b/requirements.txt index 8bffcf3..21863b4 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,4 +2,6 @@ fastmcp>=2.0.0 pyodbc>=4.0.0 python-dotenv>=1.0.0 pydantic>=2.0.0 -aioodbc>=0.4.0 \ No newline at end of file +aioodbc>=0.4.0 +fastapi>=0.116.1 +uvicorn>=0.30.0 \ No newline at end of file From a9c13ee4220b6ecfac97db1cd5904ae8cbf81de6 Mon Sep 17 00:00:00 2001 From: "yulin.deng" <1016068291@qq.com> Date: Fri, 15 Aug 2025 21:23:44 +0800 Subject: [PATCH 2/3] add /health api --- mssql_mcp_server/server.py | 40 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/mssql_mcp_server/server.py b/mssql_mcp_server/server.py index 7b441e5..68b9182 100644 --- a/mssql_mcp_server/server.py +++ b/mssql_mcp_server/server.py @@ -4,6 +4,8 @@ from dotenv import load_dotenv from fastmcp import FastMCP from fastmcp import Context +from starlette.requests import Request +from starlette.responses import JSONResponse from mssql_mcp_server.config.settings import settings from mssql_mcp_server.database.async_connection import get_pool, close_pool from mssql_mcp_server.handlers.async_resources import AsyncResourceHandlers @@ -118,6 +120,44 @@ async def get_object_schema_func(): return 0 +@app.custom_route("/health", methods=["GET"]) +async def health_check(request: Request) -> JSONResponse: + """ + Health check endpoint for the FastAPI application. + + Args: + timeout: 睡眠时间(秒),每5秒打印一次日志。如果为空则直接返回结果 + + Returns: + Health status response + """ + + timeout = request.query_params.get("timeout") + start_time = asyncio.get_event_loop().time() + if timeout is None: + logger.info("健康检查请求 - 立即返回") + return JSONResponse({"status": "ok"}) + logger.info(f"健康检查开始,超时时间: {timeout} 秒") + timeout = int(timeout) if timeout.isdigit() else 0 + if timeout > 0: + elapsed = 0 + while elapsed < timeout: + remaining = timeout - elapsed + wait_time = min(5, remaining) + await asyncio.sleep(wait_time) + elapsed = int(asyncio.get_event_loop().time() - start_time) + if elapsed < timeout: + logger.info(f"健康检查运行中... 已用时: {elapsed} 秒,剩余: {timeout - elapsed} 秒") + + total_time = int(asyncio.get_event_loop().time() - start_time) + logger.info(f"健康检查完成,总耗时: {total_time} 秒") + return JSONResponse({ + "status": "ok", + "timeout": timeout, + "actual_duration": total_time + }) + + @app.tool() async def execute_sql(query: str, allow_modifications: bool = False, ctx: Optional[Context] = None) -> str: """ From f39be451ea9dc85ccb107f6448b422451d6bc256 Mon Sep 17 00:00:00 2001 From: "yulin.deng" <1016068291@qq.com> Date: Fri, 15 Aug 2025 21:44:28 +0800 Subject: [PATCH 3/3] clean up the code --- mssql_mcp_server/config/settings.py | 2 -- requirements.txt | 4 +--- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/mssql_mcp_server/config/settings.py b/mssql_mcp_server/config/settings.py index 14a2dc7..6e0e89a 100644 --- a/mssql_mcp_server/config/settings.py +++ b/mssql_mcp_server/config/settings.py @@ -89,7 +89,6 @@ class ServerConfig: max_rows_limit: int = 100 batch_rows_size: int = 100 mcp_port: int = 8000 - fastapi_port: int = 8001 # FastAPI app port enable_async: bool = True enable_dynamic_resources: bool = True @@ -171,7 +170,6 @@ def _load_server_config(self) -> ServerConfig: enable_async=os.getenv("ENABLE_ASYNC", "true").lower() == "true", enable_dynamic_resources=os.getenv("ENABLE_DYNAMIC_RESOURCES", "true").lower() == "true", mcp_port=int(os.getenv("FASTMCP_PORT", "8000")), - fastapi_port=int(os.getenv("FASTAPI_PORT", "8001")) ) diff --git a/requirements.txt b/requirements.txt index 21863b4..8bffcf3 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,6 +2,4 @@ fastmcp>=2.0.0 pyodbc>=4.0.0 python-dotenv>=1.0.0 pydantic>=2.0.0 -aioodbc>=0.4.0 -fastapi>=0.116.1 -uvicorn>=0.30.0 \ No newline at end of file +aioodbc>=0.4.0 \ No newline at end of file