Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ async-trait = "0.1"
boxfnonce = "0.1.1"
chrono = "0.4"
derive_more = { version = "2", features = ["from"] }
futures = "0.3.31"
futures = "0.3.32"
futures-concurrency = "7.6.3"
fxhash = "0.2.1"
jsonrpcmsg = "0.1.2"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ impl McpBridgeConnectionActor {
to_mcp_client_rx,
} = self;

let client = mcp::Client
let result = mcp::Client
.builder()
.name(format!("mpc-client-to-conductor({connection_id})"))
// When we receive a message from the MCP client, forward it to the conductor
Expand All @@ -70,8 +70,8 @@ impl McpBridgeConnectionActor {
mcp_connection_to_client.send_proxied_message(message)?;
}
Ok(())
});
let result = Box::pin(client).await;
})
.await;

conductor_tx
.send(ConductorMessage::McpConnectionDisconnected {
Expand Down
18 changes: 8 additions & 10 deletions src/agent-client-protocol-conductor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,12 +319,10 @@ impl ConductorArgs {
(None, false) => (None, None),
};

Box::pin(
self.run(debug_logger.as_ref(), trace_writer)
.instrument(tracing::info_span!("conductor", pid = %pid, cwd = %cwd)),
)
.await
.map_err(|err| anyhow::anyhow!("{err}"))
self.run(debug_logger.as_ref(), trace_writer)
.instrument(tracing::info_span!("conductor", pid = %pid, cwd = %cwd))
.await
.map_err(|err| anyhow::anyhow!("{err}"))
}

async fn run(
Expand All @@ -334,23 +332,23 @@ impl ConductorArgs {
) -> Result<(), agent_client_protocol_core::Error> {
match self.command {
ConductorCommand::Agent { name, components } => {
Box::pin(initialize_conductor(
initialize_conductor(
debug_logger,
trace_writer,
name,
components,
ConductorImpl::new_agent,
))
)
.await
}
ConductorCommand::Proxy { name, proxies } => {
Box::pin(initialize_conductor(
initialize_conductor(
debug_logger,
trace_writer,
name,
proxies,
ConductorImpl::new_proxy,
))
)
.await
}
ConductorCommand::Mcp { port } => mcp_bridge::run_mcp_bridge(port).await,
Expand Down
22 changes: 10 additions & 12 deletions src/agent-client-protocol-conductor/tests/arrow_proxy_eliza.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,29 +29,27 @@ async fn test_conductor_with_arrow_proxy_and_test_agent()

// Spawn the conductor
let conductor_handle = tokio::spawn(async move {
Box::pin(
ConductorImpl::new_agent(
"conductor".to_string(),
ProxiesAndAgent::new(test_agent).proxy(arrow_proxy_agent),
McpBridgeMode::default(),
)
.run(agent_client_protocol_core::ByteStreams::new(
conductor_write.compat_write(),
conductor_read.compat(),
)),
ConductorImpl::new_agent(
"conductor".to_string(),
ProxiesAndAgent::new(test_agent).proxy(arrow_proxy_agent),
McpBridgeMode::default(),
)
.run(agent_client_protocol_core::ByteStreams::new(
conductor_write.compat_write(),
conductor_read.compat(),
))
.await
});

// Wait for editor to complete and get the result
let result = tokio::time::timeout(std::time::Duration::from_secs(30), async move {
let result = Box::pin(yopo::prompt(
let result = yopo::prompt(
agent_client_protocol_core::ByteStreams::new(
editor_write.compat_write(),
editor_read.compat(),
),
TestyCommand::Greet.to_prompt(),
))
)
.await?;

tracing::debug!(?result, "Received response from arrow proxy chain");
Expand Down
22 changes: 10 additions & 12 deletions src/agent-client-protocol-conductor/tests/empty_conductor_eliza.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,29 +54,27 @@ async fn test_conductor_with_empty_conductor_and_test_agent()

// Spawn the conductor
let conductor_handle = tokio::spawn(async move {
Box::pin(
ConductorImpl::new_agent(
"outer-conductor".to_string(),
ProxiesAndAgent::new(Testy::new()).proxy(MockEmptyConductor),
McpBridgeMode::default(),
)
.run(agent_client_protocol_core::ByteStreams::new(
conductor_write.compat_write(),
conductor_read.compat(),
)),
ConductorImpl::new_agent(
"outer-conductor".to_string(),
ProxiesAndAgent::new(Testy::new()).proxy(MockEmptyConductor),
McpBridgeMode::default(),
)
.run(agent_client_protocol_core::ByteStreams::new(
conductor_write.compat_write(),
conductor_read.compat(),
))
.await
});

// Wait for editor to complete and get the result
let result = tokio::time::timeout(std::time::Duration::from_secs(30), async move {
let result = Box::pin(yopo::prompt(
let result = yopo::prompt(
agent_client_protocol_core::ByteStreams::new(
editor_write.compat_write(),
editor_read.compat(),
),
TestyCommand::Greet.to_prompt(),
))
)
.await?;

tracing::debug!(?result, "Received response from empty conductor chain");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,17 +134,15 @@ async fn run_test_with_components(
.builder()
.name("editor-to-connector")
.with_spawned(|_cx| async move {
Box::pin(
ConductorImpl::new_agent(
"conductor".to_string(),
ProxiesAndAgent::new(Testy::new()).proxies(proxies),
McpBridgeMode::default(),
)
.run(agent_client_protocol_core::ByteStreams::new(
conductor_out.compat_write(),
conductor_in.compat(),
)),
ConductorImpl::new_agent(
"conductor".to_string(),
ProxiesAndAgent::new(Testy::new()).proxies(proxies),
McpBridgeMode::default(),
)
.run(agent_client_protocol_core::ByteStreams::new(
conductor_out.compat_write(),
conductor_in.compat(),
))
.await
})
.connect_with(transport, editor_task)
Expand All @@ -156,22 +154,19 @@ async fn test_single_component_gets_initialize_request()
-> Result<(), agent_client_protocol_core::Error> {
// Single component (agent) should receive InitializeRequest - we use ElizaAgent
// which properly handles InitializeRequest
Box::pin(run_test_with_components(
vec![],
async |connection_to_editor| {
let init_response = recv(
connection_to_editor.send_request(InitializeRequest::new(ProtocolVersion::LATEST)),
)
.await;

assert!(
init_response.is_ok(),
"Initialize should succeed: {init_response:?}"
);

Ok::<(), agent_client_protocol_core::Error>(())
},
))
run_test_with_components(vec![], async |connection_to_editor| {
let init_response = recv(
connection_to_editor.send_request(InitializeRequest::new(ProtocolVersion::LATEST)),
)
.await;

assert!(
init_response.is_ok(),
"Initialize should succeed: {init_response:?}"
);

Ok::<(), agent_client_protocol_core::Error>(())
})
.await?;

Ok(())
Expand All @@ -184,7 +179,7 @@ async fn test_two_components_proxy_gets_initialize_proxy()
// Second component (agent, ElizaAgent) gets InitializeRequest
let component1 = InitConfig::new();

Box::pin(run_test_with_components(
run_test_with_components(
vec![InitComponent::new(&component1)],
async |connection_to_editor| {
let init_response = recv(
Expand All @@ -199,7 +194,7 @@ async fn test_two_components_proxy_gets_initialize_proxy()

Ok::<(), agent_client_protocol_core::Error>(())
},
))
)
.await?;

// First component (proxy) should receive InitializeProxyRequest
Expand All @@ -222,7 +217,7 @@ async fn test_three_components_all_proxies_get_initialize_proxy()
let component1 = InitConfig::new();
let component2 = InitConfig::new();

Box::pin(run_test_with_components(
run_test_with_components(
vec![
InitComponent::new(&component1),
InitComponent::new(&component2),
Expand All @@ -240,7 +235,7 @@ async fn test_three_components_all_proxies_get_initialize_proxy()

Ok::<(), agent_client_protocol_core::Error>(())
},
))
)
.await?;

// First two components (proxies) should receive InitializeProxyRequest
Expand Down Expand Up @@ -307,17 +302,15 @@ async fn run_bad_proxy_test(
.builder()
.name("editor-to-connector")
.with_spawned(|_cx| async move {
Box::pin(
ConductorImpl::new_agent(
"conductor".to_string(),
ProxiesAndAgent::new(agent).proxies(proxies),
McpBridgeMode::default(),
)
.run(agent_client_protocol_core::ByteStreams::new(
conductor_out.compat_write(),
conductor_in.compat(),
)),
ConductorImpl::new_agent(
"conductor".to_string(),
ProxiesAndAgent::new(agent).proxies(proxies),
McpBridgeMode::default(),
)
.run(agent_client_protocol_core::ByteStreams::new(
conductor_out.compat_write(),
conductor_in.compat(),
))
.await
})
.connect_with(transport, editor_task)
Expand All @@ -329,7 +322,7 @@ async fn test_conductor_rejects_initialize_proxy_forwarded_to_agent()
-> Result<(), agent_client_protocol_core::Error> {
// BadProxy incorrectly forwards InitializeProxyRequest to the agent.
// The conductor should reject this with an error.
let result = Box::pin(run_bad_proxy_test(
let result = run_bad_proxy_test(
vec![DynConnectTo::new(BadProxy)],
DynConnectTo::new(Testy::new()),
async |connection_to_editor| {
Expand All @@ -347,7 +340,7 @@ async fn test_conductor_rejects_initialize_proxy_forwarded_to_agent()

Ok::<(), agent_client_protocol_core::Error>(())
},
))
)
.await;

match result {
Expand All @@ -368,7 +361,7 @@ async fn test_conductor_rejects_initialize_proxy_forwarded_to_proxy()
-> Result<(), agent_client_protocol_core::Error> {
// BadProxy incorrectly forwards InitializeProxyRequest to another proxy.
// The conductor should reject this with an error.
let result = Box::pin(run_bad_proxy_test(
let result = run_bad_proxy_test(
vec![
DynConnectTo::new(BadProxy),
DynConnectTo::new(InitComponent::new(&InitConfig::new())), // This proxy will receive the bad request
Expand All @@ -390,7 +383,7 @@ async fn test_conductor_rejects_initialize_proxy_forwarded_to_proxy()

Ok::<(), agent_client_protocol_core::Error>(())
},
))
)
.await;

// The error might bubble up through run_test_with_components instead
Expand Down
Loading
Loading