Skip to content

Commit bb8265e

Browse files
fix: use prefix field instead of regex for subscription prefix filters (#378)
The addPrefix methods in SubscriptionFilter were incorrectly calling builder.setRegex() instead of builder.addPrefix(), causing prefix filters to be sent as regex matches on the wire. This could match streams that contain the prefix anywhere in the name rather than only at the start. Fixes #374 Co-authored-by: William Chong <william-chong@outlook.com>
1 parent 4b70fe4 commit bb8265e

2 files changed

Lines changed: 96 additions & 2 deletions

File tree

src/main/java/io/kurrent/dbclient/SubscriptionFilter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ Persistent.CreateReq.AllOptions.FilterOptions.Expression.Builder setRegEx(String
8181
}
8282

8383
void addPrefix(Persistent.CreateReq.AllOptions.FilterOptions.Expression.Builder builder, String prefix) {
84-
builder.setRegex(prefix);
84+
builder.addPrefix(prefix);
8585
}
8686

8787
void setNoFilter() {
@@ -133,7 +133,7 @@ StreamsOuterClass.ReadReq.Options.FilterOptions.Expression.Builder setRegEx(Stri
133133
}
134134

135135
void addPrefix(StreamsOuterClass.ReadReq.Options.FilterOptions.Expression.Builder builder, String prefix) {
136-
builder.setRegex(prefix);
136+
builder.addPrefix(prefix);
137137
}
138138

139139
void setNoFilter() {

src/test/java/io/kurrent/dbclient/streams/SubscriptionTests.java

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,100 @@ public void onCancelled(Subscription subscription, Throwable throwable) {
139139
subscription.stop();
140140
}
141141

142+
@Test
143+
@Timeout(value = 2, unit = TimeUnit.MINUTES)
144+
default void testSubscribeToAllByStreamPrefix() throws Throwable {
145+
String streamPrefix = generateName();
146+
String matchingStream = streamPrefix + "-matching";
147+
String eventType = generateName();
148+
int expectedCount = 5;
149+
150+
final CountDownLatch receivedEvents = new CountDownLatch(expectedCount);
151+
final AtomicInteger count = new AtomicInteger(0);
152+
153+
Subscription subscription = getDefaultClient().subscribeToAll(new SubscriptionListener() {
154+
@Override
155+
public void onEvent(Subscription subscription, ResolvedEvent event) {
156+
count.incrementAndGet();
157+
receivedEvents.countDown();
158+
}
159+
160+
@Override
161+
public void onCancelled(Subscription subscription, Throwable throwable) {
162+
if (throwable == null)
163+
return;
164+
165+
Assertions.fail(throwable);
166+
}
167+
}, SubscribeToAllOptions.get()
168+
.filter(SubscriptionFilter
169+
.newBuilder()
170+
.addStreamNamePrefix(streamPrefix)
171+
.build())).get();
172+
173+
getDefaultClient().appendToStream(matchingStream, generateEvents(expectedCount, eventType).iterator()).get();
174+
175+
receivedEvents.await();
176+
Assertions.assertEquals(expectedCount, count.get());
177+
subscription.stop();
178+
}
179+
180+
@Test
181+
@Timeout(value = 2, unit = TimeUnit.MINUTES)
182+
default void testSubscribeToAllByStreamPrefixExcludesNonPrefix() throws Throwable {
183+
String streamPrefix = generateName();
184+
String matchingStream = streamPrefix + "-matching";
185+
String nonMatchingStream = "not-" + streamPrefix;
186+
String eventType = generateName();
187+
int expectedCount = 5;
188+
189+
final CountDownLatch receivedEvents = new CountDownLatch(expectedCount);
190+
final AtomicInteger count = new AtomicInteger(0);
191+
final ArrayList<String> receivedStreamIds = new ArrayList<>();
192+
193+
Subscription subscription = getDefaultClient().subscribeToAll(new SubscriptionListener() {
194+
@Override
195+
public void onEvent(Subscription subscription, ResolvedEvent event) {
196+
synchronized (receivedStreamIds) {
197+
receivedStreamIds.add(event.getOriginalEvent().getStreamId());
198+
}
199+
count.incrementAndGet();
200+
receivedEvents.countDown();
201+
}
202+
203+
@Override
204+
public void onCancelled(Subscription subscription, Throwable throwable) {
205+
if (throwable == null)
206+
return;
207+
208+
Assertions.fail(throwable);
209+
}
210+
}, SubscribeToAllOptions.get()
211+
.filter(SubscriptionFilter
212+
.newBuilder()
213+
.addStreamNamePrefix(streamPrefix)
214+
.build())).get();
215+
216+
getDefaultClient().appendToStream(matchingStream, generateEvents(expectedCount, eventType).iterator()).get();
217+
getDefaultClient().appendToStream(nonMatchingStream, generateEvents(3, eventType).iterator()).get();
218+
219+
receivedEvents.await();
220+
221+
Thread.sleep(2000);
222+
223+
Assertions.assertEquals(expectedCount, count.get(),
224+
"Filter should only match streams starting with the prefix, not streams containing it");
225+
226+
synchronized (receivedStreamIds) {
227+
for (String streamId : receivedStreamIds) {
228+
Assertions.assertTrue(streamId.startsWith(streamPrefix),
229+
"Received event from stream '" + streamId + "' which does not start with prefix '" + streamPrefix + "'");
230+
}
231+
}
232+
233+
subscription.stop();
234+
}
235+
142236
@Test
143237
default void testCancellingSubscriptionShouldNotRaiseAnException() throws Throwable {
144238
KurrentDBClient client = getDefaultClient();

0 commit comments

Comments
 (0)