Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import com.google.adk.models.LlmRegistry;
import com.google.adk.models.LlmRequest;
import com.google.adk.models.LlmResponse;
import com.google.adk.sessions.SessionUtils;
import com.google.adk.telemetry.Tracing;
import com.google.adk.tools.BaseTool;
import com.google.adk.tools.BaseToolset;
Expand Down Expand Up @@ -503,7 +504,30 @@ public Flowable<Event> run(InvocationContext invocationContext) {

private Flowable<Event> run(
Context spanContext, InvocationContext invocationContext, int stepsCompleted) {
Flowable<Event> currentStepEvents = runOneStep(spanContext, invocationContext).cache();
// Persist each event to the session synchronously within the step so that the next step does
// not start before the previous step's events have been appended. Without this, the deferred
// continuation (concatWith below) subscribes synchronously on runOneStep's upstream onComplete
// signal, which can race with the downstream consumer's appendEvent calls in Runner.
//
// The Runner-side appendEvent still runs and deduplicates this event by id, so plugin
// onEventCallback and non-LlmAgent paths are unaffected.
//
// Events emitted by a transferred sub-agent's nested BaseLlmFlow.run() have already been
// appended by that nested flow, so skip them here to avoid duplicates. Deduplication is done
// by event id against the session's existing events.
Flowable<Event> currentStepEvents =
runOneStep(spanContext, invocationContext)
.concatMap(
event -> {
if (SessionUtils.isEventAlreadyAppended(invocationContext.session(), event)) {
return Flowable.just(event);
}
return invocationContext
.sessionService()
.appendEvent(invocationContext.session(), event)
.toFlowable();
})
.cache();
if (stepsCompleted + 1 >= maxSteps) {
logger.debug("Ending flow execution because max steps reached.");
return currentStepEvents;
Expand Down
29 changes: 18 additions & 11 deletions core/src/main/java/com/google/adk/runner/Runner.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import com.google.adk.sessions.InMemorySessionService;
import com.google.adk.sessions.Session;
import com.google.adk.sessions.SessionKey;
import com.google.adk.sessions.SessionUtils;
import com.google.adk.summarizer.EventsCompactionConfig;
import com.google.adk.summarizer.LlmEventSummarizer;
import com.google.adk.summarizer.SlidingWindowEventCompactor;
Expand Down Expand Up @@ -581,19 +582,25 @@ private Flowable<Event> runAgentWithUpdatedSession(
.agent()
.runAsync(contextWithUpdatedSession)
.concatMap(
agentEvent ->
this.sessionService
.appendEvent(updatedSession, agentEvent)
.flatMap(
registeredEvent -> {
// TODO: remove this hack after deprecating runAsync with Session.
copySessionStates(updatedSession, initialContext.session());
return contextWithUpdatedSession
agentEvent -> {
// TODO: remove this hack after deprecating runAsync with Session.
copySessionStates(updatedSession, initialContext.session());
// BaseLlmFlow appends events synchronously to fix a race where the next LLM
// step would otherwise start before the previous step's events were persisted.
// Skip the duplicate append here so the event is not added twice.
Single<Event> appendOrSkip =
SessionUtils.isEventAlreadyAppended(updatedSession, agentEvent)
? Single.just(agentEvent)
: this.sessionService.appendEvent(updatedSession, agentEvent);
return appendOrSkip
.flatMap(
registeredEvent ->
contextWithUpdatedSession
.pluginManager()
.onEventCallback(contextWithUpdatedSession, registeredEvent)
.defaultIfEmpty(registeredEvent);
})
.toFlowable());
.defaultIfEmpty(registeredEvent))
.toFlowable();
});

// If beforeRunCallback returns content, emit it and skip agent
Context capturedContext = Context.current();
Expand Down
27 changes: 27 additions & 0 deletions core/src/main/java/com/google/adk/sessions/SessionUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.google.adk.sessions;

import com.google.adk.events.Event;
import com.google.common.collect.ImmutableList;
import com.google.genai.types.Blob;
import com.google.genai.types.Content;
Expand All @@ -31,6 +32,32 @@ public final class SessionUtils {

public SessionUtils() {}

/**
* Returns true if an event with the same id is already present in {@code session.events()}.
*
* <p>Used to deduplicate {@code appendEvent} calls when the same event flows through multiple
* append points (e.g. {@code BaseLlmFlow.run} for a transferred sub-agent and the parent flow, or
* {@code BaseLlmFlow.run} and {@code Runner}).
*/
public static boolean isEventAlreadyAppended(Session session, Event event) {
String eventId = event.id();
if (eventId == null) {
return false;
}
List<Event> events = session.events();
if (events == null || events.isEmpty()) {
return false;
}
synchronized (events) {
for (Event existing : events) {
if (eventId.equals(existing.id())) {
return true;
}
}
}
return false;
}

/** Base64-encodes inline blobs in content. */
public static Content encodeContent(Content content) {
List<Part> encodedParts = new ArrayList<>();
Expand Down
147 changes: 147 additions & 0 deletions core/src/test/java/com/google/adk/runner/RunnerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import static org.mockito.Mockito.when;

import com.google.adk.agents.BaseAgent;
import com.google.adk.agents.Callbacks.AfterModelCallback;
import com.google.adk.agents.InvocationContext;
import com.google.adk.agents.LiveRequestQueue;
import com.google.adk.agents.LlmAgent;
Expand All @@ -47,9 +48,14 @@
import com.google.adk.artifacts.BaseArtifactService;
import com.google.adk.events.Event;
import com.google.adk.flows.llmflows.Functions;
import com.google.adk.models.LlmRequest;
import com.google.adk.models.LlmResponse;
import com.google.adk.plugins.BasePlugin;
import com.google.adk.sessions.BaseSessionService;
import com.google.adk.sessions.GetSessionConfig;
import com.google.adk.sessions.InMemorySessionService;
import com.google.adk.sessions.ListEventsResponse;
import com.google.adk.sessions.ListSessionsResponse;
import com.google.adk.sessions.Session;
import com.google.adk.sessions.SessionKey;
import com.google.adk.summarizer.EventsCompactionConfig;
Expand Down Expand Up @@ -85,6 +91,7 @@
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.After;
Expand Down Expand Up @@ -860,6 +867,146 @@ public void runAsync_concurrentCalls_firstFails_secondSucceeds() throws Exceptio
subscriber2.assertValue(agentEvent);
}

/**
* A slow appendEvent must not let the next LLM step start with a stale session missing the
* previous step's function-response event.
*/
@Test
public void runAsync_slowAppendEvent_doesNotCauseStaleSessionInNextStep() throws Exception {
TestLlm raceTestLlm =
createTestLlm(
createFunctionCallLlmResponse("call_1", echoTool.name(), ImmutableMap.of("arg", "v1")),
createTextLlmResponse("done"));

LlmAgent agentForRace =
createTestAgentBuilder(raceTestLlm).tools(ImmutableList.of(echoTool)).build();

BaseSessionService delayedSessionService =
new AppendDelayingSessionService(new InMemorySessionService(), 50);

Runner runnerForRace =
Runner.builder()
.app(App.builder().name("test").rootAgent(agentForRace).build())
.sessionService(delayedSessionService)
.build();
Session raceSession =
runnerForRace.sessionService().createSession("test", "user").blockingGet();

var unused =
runnerForRace
.runAsync("user", raceSession.id(), createContent("start"))
.toList()
.blockingGet();

ImmutableList<LlmRequest> requests = ImmutableList.copyOf(raceTestLlm.getRequests());
assertThat(requests).hasSize(2);

// Second LLM request must see the function response from step 1.
boolean foundToolResponse =
requests.get(1).contents().stream()
.flatMap(c -> c.parts().stream().flatMap(List::stream))
.anyMatch(part -> part.functionResponse().isPresent());
assertThat(foundToolResponse).isTrue();
}

/**
* When an LlmAgent transfers control to a sub-LlmAgent, the sub-agent's events flow back up
* through the parent's {@code BaseLlmFlow.run()} pipeline. Each event must be appended to the
* session exactly once.
*/
@Test
public void runAsync_transferToSubAgent_eventsAppendedOnce() throws Exception {
LlmAgent subAgent =
createTestAgentBuilder(createTestLlm(createTextLlmResponse("sub response")))
.name("sub-agent")
.build();

// Force a transfer to sub-agent using an afterModelCallback.
AfterModelCallback transferCallback =
(ctx, response) -> {
ctx.eventActions().setTransferToAgent(subAgent.name());
return Maybe.empty();
};

TestLlm rootTestLlm = createTestLlm(createTextLlmResponse("initial"));
LlmAgent rootAgent =
createTestAgentBuilder(rootTestLlm)
.subAgents(subAgent)
.afterModelCallback(ImmutableList.of(transferCallback))
.build();

Runner transferRunner =
Runner.builder().app(App.builder().name("test").rootAgent(rootAgent).build()).build();
Session transferSession =
transferRunner.sessionService().createSession("test", "user").blockingGet();

var unused =
transferRunner
.runAsync("user", transferSession.id(), createContent("start"))
.toList()
.blockingGet();

Session finalSession =
transferRunner
.sessionService()
.getSession(
transferSession.appName(),
transferSession.userId(),
transferSession.id(),
Optional.empty())
.blockingGet();

// Each event id should appear at most once in the session.
List<String> eventIds = finalSession.events().stream().map(Event::id).toList();
assertThat(eventIds).containsNoDuplicates();
}

/** {@link BaseSessionService} that delays {@link #appendEvent} to surface ordering bugs. */
private static final class AppendDelayingSessionService implements BaseSessionService {
private final BaseSessionService delegate;
private final long appendDelayMs;

AppendDelayingSessionService(BaseSessionService delegate, long appendDelayMs) {
this.delegate = delegate;
this.appendDelayMs = appendDelayMs;
}

// Delegates to the underlying BaseSessionService createSession overload, which is itself
// deprecated; suppressed because the wrapper must preserve the same signature.
@SuppressWarnings("deprecation")
@Override
public Single<Session> createSession(
String appName, String userId, ConcurrentMap<String, Object> state, String sessionId) {
return delegate.createSession(appName, userId, state, sessionId);
}

@Override
public Maybe<Session> getSession(
String appName, String userId, String sessionId, Optional<GetSessionConfig> config) {
return delegate.getSession(appName, userId, sessionId, config);
}

@Override
public Single<ListSessionsResponse> listSessions(String appName, String userId) {
return delegate.listSessions(appName, userId);
}

@Override
public Completable deleteSession(String appName, String userId, String sessionId) {
return delegate.deleteSession(appName, userId, sessionId);
}

@Override
public Single<ListEventsResponse> listEvents(String appName, String userId, String sessionId) {
return delegate.listEvents(appName, userId, sessionId);
}

@Override
public Single<Event> appendEvent(Session session, Event event) {
return delegate.appendEvent(session, event).delay(appendDelayMs, MILLISECONDS);
}
}

@Test
public void runAsync_withSessionKey_success() {
var events =
Expand Down