@@ -2,23 +2,30 @@ package runtime
22
33import (
44 "context"
5+ "encoding/json"
56 "fmt"
67 "log/slog"
8+ "time"
9+
10+ "golang.org/x/oauth2"
711
812 "github.com/docker/cagent/pkg/api"
913 "github.com/docker/cagent/pkg/chat"
1014 latest "github.com/docker/cagent/pkg/config/v2"
1115 "github.com/docker/cagent/pkg/session"
1216 "github.com/docker/cagent/pkg/team"
17+ "github.com/docker/cagent/pkg/tools"
18+ "github.com/docker/cagent/pkg/tools/mcp"
1319)
1420
1521// RemoteRuntime implements the Interface using a remote client
1622type RemoteRuntime struct {
17- client * Client
18- currentAgent string
19- agentFilename string
20- sessionID string
21- team * team.Team
23+ client * Client
24+ currentAgent string
25+ agentFilename string
26+ sessionID string
27+ team * team.Team
28+ pendingOAuthElicitation * ElicitationRequestEvent
2229}
2330
2431// RemoteRuntimeOption is a function for configuring the RemoteRuntime
@@ -115,6 +122,10 @@ func (r *RemoteRuntime) RunStream(ctx context.Context, sess *session.Session) <-
115122 }
116123
117124 for streamEvent := range streamChan {
125+ if elicitationRequest , ok := streamEvent .(* ElicitationRequestEvent ); ok {
126+ // Store pending OAuth elicitation request
127+ r .pendingOAuthElicitation = elicitationRequest
128+ }
118129 events <- streamEvent
119130 }
120131 }()
@@ -176,51 +187,186 @@ func (r *RemoteRuntime) convertSessionMessages(sess *session.Session) []api.Mess
176187 return messages
177188}
178189
179- // ResumeStartAuthorizationFlow allows resuming execution after user confirmation
180- func (r * RemoteRuntime ) ResumeStartAuthorizationFlow (ctx context.Context , confirmationType bool ) {
181- slog .Debug ("Resuming remote runtime" , "agent" , r .currentAgent , "confirmation_type " , confirmationType , "session_id" , r .sessionID )
190+ // ResumeElicitation sends an elicitation response back to a waiting elicitation request
191+ func (r * RemoteRuntime ) ResumeElicitation (ctx context.Context , action tools. ElicitationAction , content map [ string ] any ) error {
192+ slog .Debug ("Resuming remote runtime with elicitation response " , "agent" , r .currentAgent , "action " , action , "session_id" , r .sessionID )
182193
183- if r . sessionID == "" {
184- slog . Error ( "Cannot resume: no session ID available" )
185- return
194+ err := r . handleOAuthElicitation ( ctx , r . pendingOAuthElicitation )
195+ if err != nil {
196+ return err
186197 }
198+ // TODO: once we get here and the elicitation is the OAuth type, we need to start the managed OAuth flow
187199
188- if err := r .client .ResumeStartAuthorizationFlow (ctx , r .sessionID , confirmationType ); err != nil {
189- slog . Error ( "Failed to resume remote session" , "error" , err , "session_id" , r . sessionID )
200+ if err := r .client .ResumeElicitation (ctx , r .sessionID , action , content ); err != nil {
201+ return err
190202 }
203+
204+ return nil
191205}
192206
193- // ResumeCodeReceived allows resuming execution after user confirmation
194- func (r * RemoteRuntime ) ResumeCodeReceived (ctx context.Context , code , state string ) error {
195- slog .Debug ("Resuming remote runtime " , "agent " , r . currentAgent , "code" , code , "state" , state , "session_id" , r . sessionID )
207+ // HandleOAuthElicitation handles OAuth elicitation requests from remote MCP servers
208+ func (r * RemoteRuntime ) handleOAuthElicitation (ctx context.Context , req * ElicitationRequestEvent ) error {
209+ slog .Debug ("Handling OAuth elicitation request " , "server_url " , req . Meta [ "cagent/server_url" ] )
196210
197- if r .sessionID == "" {
198- slog .Error ("Cannot resume: no session ID available" )
199- return fmt .Errorf ("session ID cannot be empty" )
211+ // Extract OAuth parameters from metadata
212+ serverURL , ok := req .Meta ["cagent/server_url" ].(string )
213+ if ! ok {
214+ err := fmt .Errorf ("server_url missing from elicitation metadata" )
215+ slog .Error ("Failed to extract server_url" , "error" , err )
216+ _ = r .client .ResumeElicitation (ctx , r .sessionID , "decline" , nil )
217+ return err
200218 }
201219
202- if err := r .client .ResumeCodeReceived (ctx , code , state ); err != nil {
203- slog .Error ("Failed to resume remote session" , "error" , err , "session_id" , r .sessionID )
220+ // Extract authorization server metadata
221+ authServerMetadata , ok := req .Meta ["auth_server_metadata" ].(map [string ]any )
222+ if ! ok {
223+ err := fmt .Errorf ("auth_server_metadata missing from elicitation metadata" )
224+ slog .Error ("Failed to extract auth_server_metadata" , "error" , err )
225+ _ = r .client .ResumeElicitation (ctx , r .sessionID , "decline" , nil )
204226 return err
205227 }
206228
207- return nil
208- }
229+ // Unmarshal authorization server metadata
230+ var authMetadata mcp.AuthorizationServerMetadata
231+ metadataBytes , err := json .Marshal (authServerMetadata )
232+ if err != nil {
233+ slog .Error ("Failed to marshal auth_server_metadata" , "error" , err )
234+ _ = r .client .ResumeElicitation (ctx , r .sessionID , "decline" , nil )
235+ return fmt .Errorf ("failed to marshal auth_server_metadata: %w" , err )
236+ }
237+ if err := json .Unmarshal (metadataBytes , & authMetadata ); err != nil {
238+ slog .Error ("Failed to unmarshal auth_server_metadata" , "error" , err )
239+ _ = r .client .ResumeElicitation (ctx , r .sessionID , "decline" , nil )
240+ return fmt .Errorf ("failed to unmarshal auth_server_metadata: %w" , err )
241+ }
209242
210- // ResumeElicitation sends an elicitation response back to a waiting elicitation request
211- func (r * RemoteRuntime ) ResumeElicitation (ctx context.Context , action string , content map [string ]any ) error {
212- slog .Debug ("Resuming remote runtime with elicitation response" , "agent" , r .currentAgent , "action" , action , "session_id" , r .sessionID )
243+ slog .Debug ("Authorization server metadata extracted" , "issuer" , authMetadata .Issuer )
213244
214- if r .sessionID == "" {
215- slog .Error ("Cannot resume: no session ID available" )
216- return fmt .Errorf ("session ID cannot be empty" )
245+ // Create timeout context for OAuth flow (5 minutes)
246+ oauthCtx , cancel := context .WithTimeout (ctx , 5 * time .Minute )
247+ defer cancel ()
248+
249+ // Create and start callback server
250+ slog .Debug ("Creating OAuth callback server" )
251+ callbackServer , err := mcp .NewCallbackServer ()
252+ if err != nil {
253+ slog .Error ("Failed to create callback server" , "error" , err )
254+ _ = r .client .ResumeElicitation (ctx , r .sessionID , "decline" , nil )
255+ return fmt .Errorf ("failed to create callback server: %w" , err )
217256 }
257+ defer func () {
258+ shutdownCtx , shutdownCancel := context .WithTimeout (context .Background (), 5 * time .Second )
259+ defer shutdownCancel ()
260+ if err := callbackServer .Shutdown (shutdownCtx ); err != nil {
261+ slog .Error ("Failed to shutdown callback server" , "error" , err )
262+ }
263+ }()
264+
265+ if err := callbackServer .Start (); err != nil {
266+ slog .Error ("Failed to start callback server" , "error" , err )
267+ _ = r .client .ResumeElicitation (ctx , r .sessionID , "decline" , nil )
268+ return fmt .Errorf ("failed to start callback server: %w" , err )
269+ }
270+
271+ redirectURI := callbackServer .GetRedirectURI ()
272+ slog .Debug ("Callback server started" , "redirect_uri" , redirectURI )
273+
274+ // Register client
275+ var clientID , clientSecret string
276+ if authMetadata .RegistrationEndpoint != "" {
277+ slog .Debug ("Attempting dynamic client registration" )
278+ clientID , clientSecret , err = mcp .RegisterClient (oauthCtx , & authMetadata , redirectURI , nil )
279+ if err != nil {
280+ slog .Error ("Dynamic client registration failed" , "error" , err )
281+ _ = r .client .ResumeElicitation (ctx , r .sessionID , "decline" , nil )
282+ return fmt .Errorf ("failed to register client: %w" , err )
283+ }
284+ slog .Debug ("Client registered successfully" , "client_id" , clientID )
285+ } else {
286+ err := fmt .Errorf ("authorization server does not support dynamic client registration" )
287+ slog .Error ("Client registration not supported" , "error" , err )
288+ _ = r .client .ResumeElicitation (ctx , r .sessionID , "decline" , nil )
289+ return err
290+ }
291+
292+ // Generate state and PKCE verifier
293+ state , err := mcp .GenerateState ()
294+ if err != nil {
295+ slog .Error ("Failed to generate state" , "error" , err )
296+ _ = r .client .ResumeElicitation (ctx , r .sessionID , "decline" , nil )
297+ return fmt .Errorf ("failed to generate state: %w" , err )
298+ }
299+
300+ callbackServer .SetExpectedState (state )
301+ verifier := mcp .GeneratePKCEVerifier ()
302+
303+ // Build authorization URL
304+ authURL := mcp .BuildAuthorizationURL (
305+ authMetadata .AuthorizationEndpoint ,
306+ clientID ,
307+ redirectURI ,
308+ state ,
309+ oauth2 .S256ChallengeFromVerifier (verifier ),
310+ serverURL ,
311+ )
218312
219- if err := r .client .ResumeElicitation (ctx , action , content ); err != nil {
220- slog .Error ("Failed to resume remote session with elicitation" , "error" , err , "session_id" , r .sessionID )
313+ slog .Debug ("Authorization URL built" , "url" , authURL )
314+
315+ // Request authorization code (this opens the browser)
316+ slog .Debug ("Requesting authorization code" )
317+ code , receivedState , err := mcp .RequestAuthorizationCode (oauthCtx , authURL , callbackServer , state )
318+ if err != nil {
319+ slog .Error ("Failed to get authorization code" , "error" , err )
320+ _ = r .client .ResumeElicitation (ctx , r .sessionID , "decline" , nil )
321+ return fmt .Errorf ("failed to get authorization code: %w" , err )
322+ }
323+
324+ if receivedState != state {
325+ err := fmt .Errorf ("state mismatch: expected %s, got %s" , state , receivedState )
326+ slog .Error ("State mismatch in authorization response" , "error" , err )
327+ _ = r .client .ResumeElicitation (ctx , r .sessionID , "decline" , nil )
221328 return err
222329 }
223330
331+ slog .Debug ("Authorization code received, exchanging for token" )
332+
333+ // Exchange code for token
334+ token , err := mcp .ExchangeCodeForToken (
335+ oauthCtx ,
336+ authMetadata .TokenEndpoint ,
337+ code ,
338+ verifier ,
339+ clientID ,
340+ clientSecret ,
341+ redirectURI ,
342+ )
343+ if err != nil {
344+ slog .Error ("Failed to exchange code for token" , "error" , err )
345+ _ = r .client .ResumeElicitation (ctx , r .sessionID , "decline" , nil )
346+ return fmt .Errorf ("failed to exchange code for token: %w" , err )
347+ }
348+
349+ slog .Debug ("Token obtained successfully" , "token_type" , token .TokenType )
350+
351+ // Send token back to server via ResumeElicitation
352+ tokenData := map [string ]any {
353+ "access_token" : token .AccessToken ,
354+ "token_type" : token .TokenType ,
355+ }
356+ if token .ExpiresIn > 0 {
357+ tokenData ["expires_in" ] = token .ExpiresIn
358+ }
359+ if token .RefreshToken != "" {
360+ tokenData ["refresh_token" ] = token .RefreshToken
361+ }
362+
363+ slog .Debug ("Sending token to server" )
364+ if err := r .client .ResumeElicitation (ctx , r .sessionID , tools .ElicitationActionAccept , tokenData ); err != nil {
365+ slog .Error ("Failed to send token to server" , "error" , err )
366+ return fmt .Errorf ("failed to send token to server: %w" , err )
367+ }
368+
369+ slog .Debug ("OAuth flow completed successfully" )
224370 return nil
225371}
226372
0 commit comments