Skip to content

Commit e590e3d

Browse files
authored
stdio: keep subprocess alive by default (#25)
Add keep-alive mode to StdioTransport (with one-shot opt-out) and extend stdio demo + tests to cover persistence.
1 parent 3ce6213 commit e590e3d

5 files changed

Lines changed: 268 additions & 18 deletions

File tree

examples/stdio_mcp_server.cpp

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ int main()
1010
using Json = nlohmann::json;
1111

1212
fastmcpp::tools::ToolManager tm;
13+
int counter_value = 0;
1314
fastmcpp::tools::Tool add{
1415
"add",
1516
Json{{"type", "object"},
@@ -29,8 +30,27 @@ int main()
2930
}};
3031
tm.register_tool(add);
3132

33+
fastmcpp::tools::Tool counter{
34+
"counter",
35+
Json{{"type", "object"}, {"properties", Json::object()}},
36+
Json{{"type", "array"},
37+
{"items",
38+
Json::array({Json{{"type", "object"},
39+
{"properties", Json{{"type", Json{{"type", "string"}}},
40+
{"text", Json{{"type", "string"}}}}},
41+
{"required", Json::array({"type", "text"})}}})}},
42+
[&counter_value](const Json&) -> Json
43+
{
44+
counter_value += 1;
45+
return Json{{"content",
46+
Json::array({Json{{"type", "text"}, {"text", std::to_string(counter_value)}}})}};
47+
}};
48+
tm.register_tool(counter);
49+
3250
auto handler =
33-
fastmcpp::mcp::make_mcp_handler("demo_stdio", "0.1.0", tm, {{"add", "Add two numbers"}});
51+
fastmcpp::mcp::make_mcp_handler("demo_stdio", "0.1.0", tm,
52+
{{"add", "Add two numbers"},
53+
{"counter", "Increment and return an in-process counter"}});
3454
fastmcpp::server::StdioServerWrapper server(handler);
3555
server.run();
3656
return 0;

include/fastmcpp/client/transports.hpp

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,9 @@ class WebSocketTransport : public ITransport
5353
std::string url_;
5454
};
5555

56-
// Launches an MCP stdio server as a subprocess and performs
57-
// a single JSON-RPC request/response per call.
56+
// Launches an MCP stdio server as a subprocess and performs JSON-RPC requests
57+
// over its stdin/stdout. By default, the subprocess is kept alive between calls
58+
// to better match Python fastmcp behavior; pass keep_alive=false to spawn per call.
5859
class StdioTransport : public ITransport
5960
{
6061
public:
@@ -64,29 +65,44 @@ class StdioTransport : public ITransport
6465
/// @param log_file Optional path where subprocess stderr will be written.
6566
/// If provided, stderr is redirected to this file in append mode.
6667
/// If not provided, stderr is captured and included in error messages.
68+
/// @param keep_alive Whether to keep the subprocess alive between calls. Defaults to true.
6769
explicit StdioTransport(std::string command, std::vector<std::string> args = {},
68-
std::optional<std::filesystem::path> log_file = std::nullopt)
69-
: command_(std::move(command)), args_(std::move(args)), log_file_(std::move(log_file))
70-
{
71-
}
70+
std::optional<std::filesystem::path> log_file = std::nullopt,
71+
bool keep_alive = true);
7272

7373
/// Construct with ostream pointer for stderr (v2.13.0+)
7474
/// @param command The command to execute
7575
/// @param args Command-line arguments
7676
/// @param log_stream Stream pointer where subprocess stderr will be written
7777
/// Caller retains ownership; must remain valid during request()
78-
StdioTransport(std::string command, std::vector<std::string> args, std::ostream* log_stream)
79-
: command_(std::move(command)), args_(std::move(args)), log_stream_(log_stream)
78+
/// @param keep_alive Whether to keep the subprocess alive between calls. Defaults to true.
79+
StdioTransport(std::string command, std::vector<std::string> args, std::ostream* log_stream,
80+
bool keep_alive = true);
81+
82+
StdioTransport(const StdioTransport&) = delete;
83+
StdioTransport& operator=(const StdioTransport&) = delete;
84+
StdioTransport(StdioTransport&&) noexcept;
85+
StdioTransport& operator=(StdioTransport&&) noexcept;
86+
87+
~StdioTransport();
88+
89+
fastmcpp::Json request(const std::string& route, const fastmcpp::Json& payload) override;
90+
91+
bool keep_alive() const noexcept
8092
{
93+
return keep_alive_;
8194
}
8295

83-
fastmcpp::Json request(const std::string& route, const fastmcpp::Json& payload);
84-
8596
private:
8697
std::string command_;
8798
std::vector<std::string> args_;
8899
std::optional<std::filesystem::path> log_file_;
89100
std::ostream* log_stream_ = nullptr;
101+
bool keep_alive_{true};
102+
int64_t next_id_{1};
103+
104+
struct State;
105+
std::unique_ptr<State> state_;
90106
};
91107

92108
/// SSE client transport for connecting to MCP servers using Server-Sent Events protocol.

src/cli/main.cpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ static int tasks_usage(int exit_code = 1)
5252
std::cout << " --ws <url> WebSocket URL (e.g. ws://127.0.0.1:8765)\n";
5353
std::cout << " --stdio <command> Spawn an MCP stdio server\n";
5454
std::cout << " --stdio-arg <arg> Repeatable args for --stdio\n";
55+
std::cout << " --stdio-one-shot Spawn a fresh process per request (disables keep-alive)\n";
5556
std::cout << "\n";
5657
std::cout << "Notes:\n";
5758
std::cout << " - Python fastmcp's `tasks` CLI is for Docket (distributed workers/Redis).\n";
@@ -75,6 +76,7 @@ struct TasksConnection
7576
std::string url_or_command;
7677
std::string mcp_path = "/mcp";
7778
std::vector<std::string> stdio_args;
79+
bool stdio_keep_alive = true;
7880
};
7981

8082
static bool is_flag(const std::string& s)
@@ -158,6 +160,8 @@ static std::optional<TasksConnection> parse_tasks_connection(std::vector<std::st
158160
conn.url_or_command = *stdio;
159161
saw_any = true;
160162
}
163+
if (consume_flag(args, "--stdio-one-shot"))
164+
conn.stdio_keep_alive = false;
161165

162166
while (true)
163167
{
@@ -185,7 +189,8 @@ static fastmcpp::client::Client make_client_from_connection(const TasksConnectio
185189
case TasksConnection::Kind::WebSocket:
186190
return Client(std::make_unique<WebSocketTransport>(conn.url_or_command));
187191
case TasksConnection::Kind::Stdio:
188-
return Client(std::make_unique<StdioTransport>(conn.url_or_command, conn.stdio_args));
192+
return Client(std::make_unique<StdioTransport>(conn.url_or_command, conn.stdio_args,
193+
std::nullopt, conn.stdio_keep_alive));
189194
}
190195
throw std::runtime_error("Unsupported transport kind");
191196
}

src/client/transports.cpp

Lines changed: 187 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
11
#include "fastmcpp/client/transports.hpp"
22

33
#include "fastmcpp/exceptions.hpp"
4-
#include "fastmcpp/util/json.hpp"
4+
#include "fastmcpp/util/json.hpp"
55

66
#include <chrono>
7-
#include <easywsclient.hpp>
7+
#include <condition_variable>
8+
#include <deque>
9+
#include <easywsclient.hpp>
810
#include <fstream>
911
#include <httplib.h>
12+
#include <mutex>
1013
#include <sstream>
1114
#include <thread>
1215
#ifdef FASTMCPP_POST_STREAMING
@@ -16,12 +19,28 @@
1619
#include <process.hpp>
1720
#endif
1821

19-
namespace fastmcpp::client
20-
{
22+
namespace fastmcpp::client
23+
{
2124

22-
namespace
25+
struct StdioTransport::State
2326
{
24-
struct ParsedUrl
27+
#ifdef TINY_PROCESS_LIB_AVAILABLE
28+
std::unique_ptr<TinyProcessLib::Process> process;
29+
std::ofstream log_file_stream;
30+
std::ostream* stderr_target{nullptr};
31+
32+
std::mutex request_mutex;
33+
std::mutex mutex;
34+
std::condition_variable cv;
35+
std::string stdout_partial;
36+
std::deque<std::string> stdout_lines;
37+
std::string stderr_data;
38+
#endif
39+
};
40+
41+
namespace
42+
{
43+
struct ParsedUrl
2544
{
2645
std::string scheme; // "http" or "https"
2746
std::string host;
@@ -508,6 +527,20 @@ void WebSocketTransport::request_stream(const std::string& route, const fastmcpp
508527
ws->close();
509528
}
510529

530+
StdioTransport::StdioTransport(std::string command, std::vector<std::string> args,
531+
std::optional<std::filesystem::path> log_file, bool keep_alive)
532+
: command_(std::move(command)), args_(std::move(args)), log_file_(std::move(log_file)),
533+
keep_alive_(keep_alive)
534+
{
535+
}
536+
537+
StdioTransport::StdioTransport(std::string command, std::vector<std::string> args,
538+
std::ostream* log_stream, bool keep_alive)
539+
: command_(std::move(command)), args_(std::move(args)), log_stream_(log_stream),
540+
keep_alive_(keep_alive)
541+
{
542+
}
543+
511544
fastmcpp::Json StdioTransport::request(const std::string& route, const fastmcpp::Json& payload)
512545
{
513546
// Use TinyProcessLibrary (fetched via CMake) for cross-platform subprocess handling
@@ -519,6 +552,131 @@ fastmcpp::Json StdioTransport::request(const std::string& route, const fastmcpp:
519552

520553
#ifdef TINY_PROCESS_LIB_AVAILABLE
521554
using namespace TinyProcessLib;
555+
556+
if (keep_alive_)
557+
{
558+
if (!state_)
559+
{
560+
state_ = std::make_unique<State>();
561+
562+
if (log_file_.has_value())
563+
{
564+
state_->log_file_stream.open(log_file_.value(), std::ios::app);
565+
if (state_->log_file_stream.is_open())
566+
state_->stderr_target = &state_->log_file_stream;
567+
}
568+
else if (log_stream_ != nullptr)
569+
{
570+
state_->stderr_target = log_stream_;
571+
}
572+
573+
auto stdout_callback = [st_ptr = state_.get()](const char* bytes, size_t n)
574+
{
575+
std::lock_guard<std::mutex> lock(st_ptr->mutex);
576+
st_ptr->stdout_partial.append(bytes, n);
577+
578+
for (;;)
579+
{
580+
auto pos = st_ptr->stdout_partial.find('\n');
581+
if (pos == std::string::npos)
582+
break;
583+
584+
std::string line = st_ptr->stdout_partial.substr(0, pos);
585+
if (!line.empty() && line.back() == '\r')
586+
line.pop_back();
587+
st_ptr->stdout_lines.push_back(std::move(line));
588+
st_ptr->stdout_partial.erase(0, pos + 1);
589+
}
590+
591+
st_ptr->cv.notify_all();
592+
};
593+
594+
auto stderr_callback = [st_ptr = state_.get()](const char* bytes, size_t n)
595+
{
596+
std::lock_guard<std::mutex> lock(st_ptr->mutex);
597+
if (st_ptr->stderr_target != nullptr)
598+
{
599+
st_ptr->stderr_target->write(bytes, n);
600+
st_ptr->stderr_target->flush();
601+
}
602+
st_ptr->stderr_data.append(bytes, n);
603+
};
604+
605+
state_->process = std::make_unique<Process>(cmd.str(), "", stdout_callback,
606+
stderr_callback, /*open_stdin*/ true);
607+
}
608+
609+
auto* st = state_.get();
610+
std::lock_guard<std::mutex> request_lock(st->request_mutex);
611+
612+
const int64_t id = next_id_++;
613+
fastmcpp::Json request = {
614+
{"jsonrpc", "2.0"},
615+
{"id", id},
616+
{"method", route},
617+
{"params", payload},
618+
};
619+
620+
{
621+
std::lock_guard<std::mutex> lock(st->mutex);
622+
st->stderr_data.clear();
623+
}
624+
625+
if (!st->process->write(request.dump() + "\n"))
626+
throw fastmcpp::TransportError("StdioTransport: failed to write request");
627+
628+
// Wait for a response matching this ID.
629+
// Note: stdio servers may emit notifications or logs; ignore non-matching lines.
630+
for (;;)
631+
{
632+
int exit_status = 0;
633+
if (st->process->try_get_exit_status(exit_status))
634+
{
635+
std::lock_guard<std::mutex> lock(st->mutex);
636+
throw fastmcpp::TransportError(
637+
"StdioTransport process exited with code: " +
638+
std::to_string(exit_status) +
639+
(st->stderr_data.empty() ? std::string("")
640+
: ("; stderr: ") + st->stderr_data));
641+
}
642+
643+
std::unique_lock<std::mutex> lock(st->mutex);
644+
if (!st->cv.wait_for(lock, std::chrono::seconds(30),
645+
[&]() { return !st->stdout_lines.empty(); }))
646+
{
647+
throw fastmcpp::TransportError("StdioTransport: timed out waiting for response");
648+
}
649+
650+
while (!st->stdout_lines.empty())
651+
{
652+
auto line = std::move(st->stdout_lines.front());
653+
st->stdout_lines.pop_front();
654+
lock.unlock();
655+
656+
if (line.empty())
657+
{
658+
lock.lock();
659+
continue;
660+
}
661+
662+
try
663+
{
664+
auto parsed = fastmcpp::util::json::parse(line);
665+
if (parsed.contains("id") && parsed["id"].is_number_integer() &&
666+
parsed["id"].get<int64_t>() == id)
667+
{
668+
return parsed;
669+
}
670+
}
671+
catch (...)
672+
{
673+
// Ignore non-JSON stdout lines (e.g., server logs).
674+
}
675+
676+
lock.lock();
677+
}
678+
}
679+
}
522680
std::string stdout_data;
523681
std::string stderr_data;
524682

@@ -582,6 +740,29 @@ fastmcpp::Json StdioTransport::request(const std::string& route, const fastmcpp:
582740
#endif
583741
}
584742

743+
StdioTransport::StdioTransport(StdioTransport&&) noexcept = default;
744+
StdioTransport& StdioTransport::operator=(StdioTransport&&) noexcept = default;
745+
746+
StdioTransport::~StdioTransport()
747+
{
748+
#ifdef TINY_PROCESS_LIB_AVAILABLE
749+
if (state_ && state_->process)
750+
{
751+
state_->process->close_stdin();
752+
753+
int exit_status = 0;
754+
for (int i = 0; i < 10; i++)
755+
{
756+
if (state_->process->try_get_exit_status(exit_status))
757+
return;
758+
std::this_thread::sleep_for(std::chrono::milliseconds(10));
759+
}
760+
761+
state_->process->kill(false);
762+
}
763+
#endif
764+
}
765+
585766
// =============================================================================
586767
// SseClientTransport implementation
587768
// =============================================================================

0 commit comments

Comments
 (0)