Skip to content

Commit 0c48f72

Browse files
authored
feat!: 553:Adding pagination to JpaDatabasePushNotificationConfigStore (#564)
# Description - Added Pagination support to getInfo method in JpaDatabasePushNotificationConfigStore -- Follows the same pattern as that of JpaDatabaseTaskStore - Updated sendNotification(Task) in BasePushNotificationSender to paginate results if a nextToken is present Fixes #553 🦕
1 parent 7f5c6c8 commit 0c48f72

6 files changed

Lines changed: 282 additions & 75 deletions

File tree

extras/push-notification-config-store-database-jpa/src/main/java/io/a2a/extras/pushnotificationconfigstore/database/jpa/JpaDatabasePushNotificationConfigStore.java

Lines changed: 65 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package io.a2a.extras.pushnotificationconfigstore.database.jpa;
22

3-
import java.util.ArrayList;
4-
import java.util.Collections;
3+
import jakarta.persistence.TypedQuery;
4+
import java.time.Instant;
55
import java.util.List;
66

77
import jakarta.annotation.Priority;
@@ -17,6 +17,7 @@
1717
import io.a2a.spec.ListTaskPushNotificationConfigResult;
1818
import io.a2a.spec.PushNotificationConfig;
1919
import io.a2a.spec.TaskPushNotificationConfig;
20+
import java.util.stream.Collectors;
2021
import org.slf4j.Logger;
2122
import org.slf4j.LoggerFactory;
2223

@@ -26,7 +27,9 @@
2627
public class JpaDatabasePushNotificationConfigStore implements PushNotificationConfigStore {
2728

2829
private static final Logger LOGGER = LoggerFactory.getLogger(JpaDatabasePushNotificationConfigStore.class);
29-
30+
31+
private static final Instant NULL_TIMESTAMP_SENTINEL = Instant.EPOCH;
32+
3033
@PersistenceContext(unitName = "a2a-java")
3134
EntityManager em;
3235

@@ -36,6 +39,8 @@ public PushNotificationConfig setInfo(String taskId, PushNotificationConfig noti
3639
// Ensure config has an ID - default to taskId if not provided (mirroring InMemoryPushNotificationConfigStore behavior)
3740
PushNotificationConfig.Builder builder = PushNotificationConfig.builder(notificationConfig);
3841
if (notificationConfig.id() == null || notificationConfig.id().isEmpty()) {
42+
// This means the taskId and configId are same. This will not allow having multiple configs for a single Task.
43+
// The configId is a required field in the spec and should not be empty
3944
builder.id(taskId);
4045
}
4146
notificationConfig = builder.build();
@@ -72,15 +77,61 @@ public PushNotificationConfig setInfo(String taskId, PushNotificationConfig noti
7277
@Override
7378
public ListTaskPushNotificationConfigResult getInfo(ListTaskPushNotificationConfigParams params) {
7479
String taskId = params.id();
75-
LOGGER.debug("Retrieving PushNotificationConfigs for Task '{}'", taskId);
80+
LOGGER.debug("Retrieving PushNotificationConfigs for Task '{}' with params: pageSize={}, pageToken={}",
81+
taskId, params.pageSize(), params.pageToken());
7682
try {
77-
List<JpaPushNotificationConfig> jpaConfigs = em.createQuery(
78-
"SELECT c FROM JpaPushNotificationConfig c WHERE c.id.taskId = :taskId",
79-
JpaPushNotificationConfig.class)
80-
.setParameter("taskId", taskId)
81-
.getResultList();
83+
StringBuilder queryBuilder = new StringBuilder("SELECT c FROM JpaPushNotificationConfig c WHERE c.id.taskId = :taskId");
84+
85+
if (params.pageToken() != null && !params.pageToken().isEmpty()) {
86+
String[] tokenParts = params.pageToken().split(":", 2);
87+
if (tokenParts.length == 2) {
88+
// Keyset pagination: get tasks where timestamp < tokenTimestamp OR (timestamp = tokenTimestamp AND id > tokenId)
89+
// All tasks have timestamps (TaskStatus canonical constructor ensures this)
90+
queryBuilder.append(" AND (COALESCE(c.createdAt, :nullSentinel) < :tokenTimestamp OR (COALESCE(c.createdAt, :nullSentinel) = :tokenTimestamp AND c.id.configId > :tokenId))");
91+
} else {
92+
// Based on the comments in the test case, if the pageToken is invalid start from the beginning.
93+
}
94+
}
95+
96+
queryBuilder.append(" ORDER BY COALESCE(c.createdAt, :nullSentinel) DESC, c.id.configId ASC");
97+
98+
TypedQuery<JpaPushNotificationConfig> query = em.createQuery(queryBuilder.toString(), JpaPushNotificationConfig.class);
99+
query.setParameter("taskId", taskId);
100+
query.setParameter("nullSentinel", NULL_TIMESTAMP_SENTINEL);
101+
102+
if (params.pageToken() != null && !params.pageToken().isEmpty()) {
103+
String[] tokenParts = params.pageToken().split(":", 2);
104+
if (tokenParts.length == 2) {
105+
try {
106+
long timestampMillis = Long.parseLong(tokenParts[0]);
107+
String tokenId = tokenParts[1];
108+
109+
Instant tokenTimestamp = Instant.ofEpochMilli(timestampMillis);
110+
query.setParameter("tokenTimestamp", tokenTimestamp);
111+
query.setParameter("tokenId", tokenId);
112+
} catch (NumberFormatException e) {
113+
// Malformed timestamp in pageToken
114+
throw new io.a2a.spec.InvalidParamsError(null,
115+
"Invalid pageToken format: timestamp must be numeric milliseconds", null);
116+
}
117+
}
118+
}
119+
120+
int pageSize = params.getEffectivePageSize();
121+
query.setMaxResults(pageSize + 1);
122+
List<JpaPushNotificationConfig> jpaConfigsPage = query.getResultList();
123+
124+
String nextPageToken = null;
125+
if (jpaConfigsPage.size() > pageSize) {
126+
// There are more results than the page size, and in this case, a nextToken should be created with the last item.
127+
// Format: "timestamp_millis:taskId" for keyset pagination
128+
jpaConfigsPage = jpaConfigsPage.subList(0, pageSize);
129+
JpaPushNotificationConfig lastConfig = jpaConfigsPage.get(jpaConfigsPage.size() - 1);
130+
Instant timestamp = lastConfig.getCreatedAt() != null ? lastConfig.getCreatedAt() : NULL_TIMESTAMP_SENTINEL;
131+
nextPageToken = timestamp.toEpochMilli() + ":" + lastConfig.getId().getConfigId();
132+
}
82133

83-
List<PushNotificationConfig> configs = jpaConfigs.stream()
134+
List<PushNotificationConfig> configs = jpaConfigsPage.stream()
84135
.map(jpaConfig -> {
85136
try {
86137
return jpaConfig.getConfig();
@@ -95,57 +146,17 @@ public ListTaskPushNotificationConfigResult getInfo(ListTaskPushNotificationConf
95146

96147
LOGGER.debug("Successfully retrieved {} PushNotificationConfigs for Task '{}'", configs.size(), taskId);
97148

98-
// Handle pagination
99-
if (configs.isEmpty()) {
100-
return new ListTaskPushNotificationConfigResult(Collections.emptyList());
101-
}
102-
103-
if (params.pageSize() <= 0) {
104-
return new ListTaskPushNotificationConfigResult(convertPushNotificationConfig(configs, params), null);
105-
}
106-
107-
// Apply pageToken filtering if provided
108-
List<PushNotificationConfig> paginatedConfigs = configs;
109-
if (params.pageToken() != null && !params.pageToken().isBlank()) {
110-
int index = findFirstIndex(configs, params.pageToken());
111-
if (index < configs.size()) {
112-
paginatedConfigs = configs.subList(index, configs.size());
113-
}
114-
}
115-
116-
// Apply page size limit
117-
if (paginatedConfigs.size() <= params.pageSize()) {
118-
return new ListTaskPushNotificationConfigResult(convertPushNotificationConfig(paginatedConfigs, params), null);
119-
}
149+
List<TaskPushNotificationConfig> taskPushNotificationConfigs = configs.stream()
150+
.map(config -> new TaskPushNotificationConfig(params.id(), config, params.tenant()))
151+
.collect(Collectors.toList());
120152

121-
String nextToken = paginatedConfigs.get(params.pageSize()).token();
122-
return new ListTaskPushNotificationConfigResult(
123-
convertPushNotificationConfig(paginatedConfigs.subList(0, params.pageSize()), params),
124-
nextToken);
153+
return new ListTaskPushNotificationConfigResult(taskPushNotificationConfigs, nextPageToken);
125154
} catch (Exception e) {
126155
LOGGER.error("Failed to retrieve PushNotificationConfigs for Task '{}'", taskId, e);
127156
throw e;
128157
}
129158
}
130159

131-
private int findFirstIndex(List<PushNotificationConfig> configs, String token) {
132-
for (int i = 0; i < configs.size(); i++) {
133-
if (token.equals(configs.get(i).token())) {
134-
return i;
135-
}
136-
}
137-
return configs.size();
138-
}
139-
140-
private List<TaskPushNotificationConfig> convertPushNotificationConfig(List<PushNotificationConfig> pushNotificationConfigList, ListTaskPushNotificationConfigParams params) {
141-
List<TaskPushNotificationConfig> taskPushNotificationConfigList = new ArrayList<>(pushNotificationConfigList.size());
142-
for (PushNotificationConfig pushNotificationConfig : pushNotificationConfigList) {
143-
TaskPushNotificationConfig taskPushNotificationConfig = new TaskPushNotificationConfig(params.id(), pushNotificationConfig, params.tenant());
144-
taskPushNotificationConfigList.add(taskPushNotificationConfig);
145-
}
146-
return taskPushNotificationConfigList;
147-
}
148-
149160
@Transactional
150161
@Override
151162
public void deleteInfo(String taskId, String configId) {

extras/push-notification-config-store-database-jpa/src/main/java/io/a2a/extras/pushnotificationconfigstore/database/jpa/JpaPushNotificationConfig.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,14 @@
33
import jakarta.persistence.Column;
44
import jakarta.persistence.EmbeddedId;
55
import jakarta.persistence.Entity;
6+
import jakarta.persistence.PrePersist;
67
import jakarta.persistence.Table;
78
import jakarta.persistence.Transient;
89

910
import io.a2a.jsonrpc.common.json.JsonProcessingException;
1011
import io.a2a.jsonrpc.common.json.JsonUtil;
1112
import io.a2a.spec.PushNotificationConfig;
13+
import java.time.Instant;
1214

1315
@Entity
1416
@Table(name = "a2a_push_notification_configs")
@@ -19,6 +21,9 @@ public class JpaPushNotificationConfig {
1921
@Column(name = "task_data", columnDefinition = "TEXT", nullable = false)
2022
private String configJson;
2123

24+
@Column(name = "created_at")
25+
private Instant createdAt;
26+
2227
@Transient
2328
private PushNotificationConfig config;
2429

@@ -31,6 +36,12 @@ public JpaPushNotificationConfig(TaskConfigId id, String configJson) {
3136
this.configJson = configJson;
3237
}
3338

39+
@PrePersist
40+
protected void onCreate() {
41+
if (createdAt == null) {
42+
createdAt = Instant.now();
43+
}
44+
}
3445

3546
public TaskConfigId getId() {
3647
return id;
@@ -60,6 +71,14 @@ public void setConfig(PushNotificationConfig config) throws JsonProcessingExcept
6071
this.config = config;
6172
}
6273

74+
public Instant getCreatedAt() {
75+
return createdAt;
76+
}
77+
78+
public void setCreatedAt(Instant createdAt) {
79+
this.createdAt = createdAt;
80+
}
81+
6382
static JpaPushNotificationConfig createFromConfig(String taskId, PushNotificationConfig config) throws JsonProcessingException {
6483
String json = JsonUtil.toJson(config);
6584
JpaPushNotificationConfig jpaPushNotificationConfig =

0 commit comments

Comments
 (0)