Skip to content

Commit d0388c6

Browse files
feat(services): upload strategy internal internal handler (#190)
* feat(upload): add StreamUploadHandler interface for clean upload workflow * feat(upload): add CatalogStreamUploadHandler with 3-step workflow * refactor(queue): add StreamUploadHandler constructor with single-path flushAndPush - Add handler-based constructor to StreamDocumentUploadQueue - Add no-arg constructor for test compatibility - Extract clearQueue() private method - Add flushAndPush() method with handler delegation - Maintain backward compatibility with uploader-based path * refactor(service): wire UpdateStreamService with handler-based queue - Remove fileContainer management from UpdateStreamService - Remove fileContainer management from UpdateStreamServiceInternal - Create CatalogStreamUploadHandler in UpdateStreamService constructor - Use handler-based queue factory method forStreamSource() - Delegate document operations to queue instead of managing containers - Simplify close() to call queue.flushAndPush() directly - Handler now owns file container lifecycle (separation of concerns) * test(upload): update queue and service tests for handler-based architecture - Update StreamDocumentUploadQueueTest: mock UploadStrategy<StreamUpdate> - Add test for handler-based constructor path - Update UpdateStreamServiceInternalTest: remove file container tests - Remove obsolete tests for createUploadAndPush() method - Update tests to validate queue delegation pattern - All 154 tests pass with new handler-based queue architecture * feat(upload): add StreamUploadHandler interface for clean upload workflow * feat(upload): add StreamUploadHandler interface for clean upload workflow * remove flushAndPush * add back public class * remove unused import * lint * lint * lint * fix last response issue --------- Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
1 parent 8a6116d commit d0388c6

16 files changed

Lines changed: 1153 additions & 133 deletions

CONFIGURATION.md

Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
# Configuration Guide
2+
3+
This document describes the available configuration options for the Coveo Push API Java Client.
4+
5+
## Batch Size Configuration
6+
7+
The batch size controls how much data is accumulated before creating a file container and pushing to Coveo. The default is **5 MB**. The maximum allowed is **256 MB** (Stream API limit).
8+
9+
### Configuration Methods
10+
11+
There are two ways to configure the batch size:
12+
13+
#### 1. System Property (Runtime Configuration)
14+
15+
Set the `coveo.push.batchSize` system property to configure the default batch size globally for all service instances:
16+
17+
**Java Command Line:**
18+
19+
```bash
20+
java -Dcoveo.push.batchSize=134217728 -jar your-application.jar
21+
```
22+
23+
**Within Java Code:**
24+
25+
```java
26+
// Set before creating any service instances
27+
System.setProperty("coveo.push.batchSize", "134217728"); // 128 MB in bytes
28+
```
29+
30+
**Maven/Gradle Build:**
31+
32+
```xml
33+
<!-- pom.xml -->
34+
<properties>
35+
<argLine>-Dcoveo.push.batchSize=134217728</argLine>
36+
</properties>
37+
```
38+
39+
```groovy
40+
// build.gradle
41+
test {
42+
systemProperty 'coveo.push.batchSize', '134217728'
43+
}
44+
```
45+
46+
**Example Values:**
47+
48+
- `5242880` = 5 MB (default)
49+
- `268435456` = 256 MB (maximum)
50+
- `134217728` = 128 MB
51+
- `67108864` = 64 MB
52+
- `33554432` = 32 MB
53+
- `10485760` = 10 MB
54+
55+
#### 2. Constructor Parameter (Per-Instance Configuration)
56+
57+
Pass the `maxQueueSize` parameter when creating service instances:
58+
59+
```java
60+
// UpdateStreamService with custom 128 MB batch size
61+
UpdateStreamService service = new UpdateStreamService(
62+
catalogSource,
63+
backoffOptions,
64+
null, // userAgents (optional)
65+
128 * 1024 * 1024 // 128 MB in bytes
66+
);
67+
68+
// PushService with custom batch size
69+
PushService pushService = new PushService(
70+
pushEnabledSource,
71+
backoffOptions,
72+
128 * 1024 * 1024 // 128 MB
73+
);
74+
75+
// StreamService with custom batch size
76+
StreamService streamService = new StreamService(
77+
streamEnabledSource,
78+
backoffOptions,
79+
null, // userAgents (optional)
80+
128 * 1024 * 1024 // 128 MB
81+
);
82+
```
83+
84+
### Configuration Priority
85+
86+
When both methods are used:
87+
88+
1. **Constructor parameter** takes precedence (if specified)
89+
2. **System property** is used as default (if set)
90+
3. **Built-in default** of 5 MB is used otherwise
91+
92+
### Validation Rules
93+
94+
All batch size values are validated:
95+
96+
-**Maximum:** 256 MB (268,435,456 bytes) - API limit
97+
-**Minimum:** Greater than 0
98+
- ❌ Values exceeding 256 MB will throw `IllegalArgumentException`
99+
- ❌ Invalid or negative values will throw `IllegalArgumentException`
100+
101+
### Examples
102+
103+
#### Example 1: Using System Property
104+
105+
```java
106+
// Configure globally via system property
107+
System.setProperty("coveo.push.batchSize", "134217728"); // 128 MB
108+
109+
// All services will use 128 MB by default
110+
UpdateStreamService updateService = new UpdateStreamService(catalogSource, backoffOptions);
111+
PushService pushService = new PushService(pushEnabledSource, backoffOptions);
112+
StreamService streamService = new StreamService(streamEnabledSource, backoffOptions);
113+
```
114+
115+
#### Example 2: Override Per Service
116+
117+
```java
118+
// Set global default to 128 MB
119+
System.setProperty("coveo.push.batchSize", "134217728");
120+
121+
// Update service uses global default (128 MB)
122+
UpdateStreamService updateService = new UpdateStreamService(catalogSource, backoffOptions);
123+
124+
// Push service overrides with 64 MB
125+
PushService pushService = new PushService(pushEnabledSource, backoffOptions, 64 * 1024 * 1024);
126+
127+
// Stream service uses global default (128 MB)
128+
StreamService streamService = new StreamService(streamEnabledSource, backoffOptions);
129+
```
130+
131+
### When to Adjust Batch Size
132+
133+
**Use smaller batches (32-64 MB) when:**
134+
135+
- Network bandwidth is limited
136+
- Memory is constrained
137+
- Processing many small documents
138+
- You want more frequent progress updates
139+
140+
**Use larger batches (128-256 MB) when:**
141+
142+
- Network bandwidth is high
143+
- Processing large documents or files
144+
- You want to minimize API calls
145+
- Maximum throughput is needed
146+
147+
**Keep default (5 MB) when:**
148+
149+
- You're unsure
150+
- Memory is a concern
151+
- You want predictable, frequent pushes
152+
153+
### Configuration Property Reference
154+
155+
| Property Name | Description | Default Value | Valid Range |
156+
| ---------------------- | --------------------------- | ---------------- | -------------- |
157+
| `coveo.push.batchSize` | Default batch size in bytes | `5242880` (5 MB) | 1 to 268435456 |
158+
159+
## Additional Configuration
160+
161+
### Environment Variables
162+
163+
The following environment variables can be used for general configuration:
164+
165+
- `COVEO_API_KEY` - API key for authentication
166+
- `COVEO_ORGANIZATION_ID` - Organization identifier
167+
- `COVEO_PLATFORM_URL` - Custom platform URL (if needed)
168+
169+
Refer to the Coveo Platform documentation for complete environment configuration options.

README.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,14 @@ public class PushOneDocument {
200200
}
201201
```
202202

203+
## Configuration
204+
205+
### Batch Size Configuration
206+
207+
The SDK uses a default batch size of **5 MB** before automatically creating a file container and pushing documents. The maximum allowed batch size is **256 MB** (matching the Coveo Stream API limit). You can configure this globally via system property or per-service via constructor.
208+
209+
For complete configuration details, examples, and best practices, see **[CONFIGURATION.md](CONFIGURATION.md)**.
210+
203211
### Exponential Backoff Retry Configuration
204212

205213
By default, the SDK leverages an exponential backoff retry mechanism. Exponential backoff allows for the SDK to make multiple attempts to resolve throttled requests, increasing the amount of time to wait for each subsequent attempt. Outgoing requests will retry when a `429` status code is returned from the platform.

samples/ConfigureBatchSize.class

2.06 KB
Binary file not shown.

samples/ConfigureBatchSize.java

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
import com.coveo.pushapiclient.*;
2+
import com.coveo.pushapiclient.exceptions.NoOpenFileContainerException;
3+
4+
import java.io.IOException;
5+
6+
/**
7+
* Demonstrates how to configure the batch size for document uploads.
8+
*
9+
* The batch size controls how much data accumulates before automatically
10+
* creating a file container and pushing to Coveo. Default is 5 MB, max is 256 MB.
11+
*/
12+
public class ConfigureBatchSize {
13+
14+
public static void main(String[] args) throws IOException, InterruptedException, NoOpenFileContainerException {
15+
16+
PlatformUrl platformUrl = new PlatformUrlBuilder()
17+
.withEnvironment(Environment.PRODUCTION)
18+
.withRegion(Region.US)
19+
.build();
20+
21+
CatalogSource catalogSource = CatalogSource.fromPlatformUrl(
22+
"my_api_key", "my_org_id", "my_source_id", platformUrl);
23+
24+
// Option 1: Use default batch size (5 MB)
25+
// This creates an UpdateStreamService with the built-in 5 MB limit
26+
UpdateStreamService defaultService = new UpdateStreamService(catalogSource);
27+
28+
// Option 2: Configure batch size via constructor (50 MB)
29+
// Pass the custom batch size directly as an integer parameter
30+
int fiftyMegabytes = 50 * 1024 * 1024;
31+
UpdateStreamService customService = new UpdateStreamService(
32+
catalogSource,
33+
new BackoffOptionsBuilder().build(),
34+
null,
35+
fiftyMegabytes);
36+
37+
// Option 3: Configure globally via system property (affects all services)
38+
// Run with: java -Dcoveo.push.batchSize=52428800 ConfigureBatchSize
39+
// This sets 50 MB for all service instances that don't specify a size
40+
// This approach allows configuration at runtime without code changes
41+
42+
// Use the service
43+
DocumentBuilder document = new DocumentBuilder("https://my.document.uri", "My document title")
44+
.withData("these words will be searchable");
45+
46+
customService.addOrUpdate(document);
47+
customService.close();
48+
}
49+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package com.coveo.pushapiclient;
2+
3+
import com.google.gson.Gson;
4+
import java.io.IOException;
5+
import java.net.http.HttpResponse;
6+
import org.apache.logging.log4j.LogManager;
7+
import org.apache.logging.log4j.Logger;
8+
9+
class CatalogStreamUploadHandler implements StreamUploadHandler {
10+
private static final Logger logger = LogManager.getLogger(CatalogStreamUploadHandler.class);
11+
private final StreamEnabledSource source;
12+
private final PlatformClient platformClient;
13+
14+
CatalogStreamUploadHandler(StreamEnabledSource source, PlatformClient platformClient) {
15+
this.source = source;
16+
this.platformClient = platformClient;
17+
}
18+
19+
@Override
20+
public HttpResponse<String> uploadAndPush(StreamUpdate stream)
21+
throws IOException, InterruptedException {
22+
// Step 1: Create file container
23+
logger.debug("Creating file container for stream upload");
24+
HttpResponse<String> containerResponse = platformClient.createFileContainer();
25+
FileContainer container = new Gson().fromJson(containerResponse.body(), FileContainer.class);
26+
27+
// Step 2: Upload content to container
28+
String batchUpdateJson = new Gson().toJson(stream.marshal());
29+
logger.debug("Uploading stream content to file container: {}", container.fileId);
30+
platformClient.uploadContentToFileContainer(container, batchUpdateJson);
31+
32+
// Step 3: Push container to stream source
33+
logger.info("Pushing file container to stream source: {}", source.getId());
34+
return platformClient.pushFileContainerContentToStreamSource(source.getId(), container);
35+
}
36+
}

src/main/java/com/coveo/pushapiclient/PushService.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,32 @@ public PushService(PushEnabledSource source) {
1414
}
1515

1616
public PushService(PushEnabledSource source, BackoffOptions options) {
17+
this(source, options, DocumentUploadQueue.getConfiguredBatchSize());
18+
}
19+
20+
/**
21+
* Creates a new PushService with configurable batch size.
22+
*
23+
* <p>Example batch sizes in bytes:
24+
*
25+
* <ul>
26+
* <li>5 MB (default): {@code 5 * 1024 * 1024} = {@code 5242880}
27+
* <li>50 MB: {@code 50 * 1024 * 1024} = {@code 52428800}
28+
* <li>256 MB (max): {@code 256 * 1024 * 1024} = {@code 268435456}
29+
* </ul>
30+
*
31+
* @param source The source to push documents to.
32+
* @param options The configuration options for exponential backoff.
33+
* @param maxQueueSize The maximum batch size in bytes before auto-flushing (default: 5MB, max:
34+
* 256MB).
35+
* @throws IllegalArgumentException if maxQueueSize exceeds 256MB or is not positive.
36+
*/
37+
public PushService(PushEnabledSource source, BackoffOptions options, int maxQueueSize) {
1738
String apiKey = source.getApiKey();
1839
String organizationId = source.getOrganizationId();
1940
PlatformUrl platformUrl = source.getPlatformUrl();
2041
UploadStrategy uploader = this.getUploadStrategy();
21-
DocumentUploadQueue queue = new DocumentUploadQueue(uploader);
42+
DocumentUploadQueue queue = new DocumentUploadQueue(uploader, maxQueueSize);
2243

2344
this.platformClient = new PlatformClient(apiKey, organizationId, platformUrl, options);
2445
this.service = new PushServiceInternal(queue);

src/main/java/com/coveo/pushapiclient/StreamDocumentUploadQueue.java

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,21 @@
11
package com.coveo.pushapiclient;
22

33
import java.io.IOException;
4+
import java.net.http.HttpResponse;
45
import java.util.ArrayList;
56
import org.apache.logging.log4j.LogManager;
67
import org.apache.logging.log4j.Logger;
78

89
public class StreamDocumentUploadQueue extends DocumentUploadQueue {
910

1011
private static final Logger logger = LogManager.getLogger(StreamDocumentUploadQueue.class);
12+
private StreamUploadHandler streamHandler;
1113
protected ArrayList<PartialUpdateDocument> documentToPartiallyUpdateList;
14+
private HttpResponse<String> lastResponse;
1215

13-
public StreamDocumentUploadQueue(UploadStrategy uploader) {
14-
super(uploader);
16+
public StreamDocumentUploadQueue(StreamUploadHandler handler, int maxQueueSize) {
17+
super(null, maxQueueSize);
18+
this.streamHandler = handler;
1519
this.documentToPartiallyUpdateList = new ArrayList<>();
1620
}
1721

@@ -25,13 +29,19 @@ public StreamDocumentUploadQueue(UploadStrategy uploader) {
2529
public void flush() throws IOException, InterruptedException {
2630
if (this.isEmpty()) {
2731
logger.debug("Empty batch. Skipping upload");
32+
this.lastResponse = null;
2833
return;
2934
}
3035
// TODO: LENS-871: support concurrent requests
3136
StreamUpdate stream = this.getStream();
3237
logger.info("Uploading document Stream");
33-
this.uploader.apply(stream);
3438

39+
this.lastResponse = this.streamHandler.uploadAndPush(stream);
40+
41+
clearQueue();
42+
}
43+
44+
private void clearQueue() {
3545
this.size = 0;
3646
this.documentToAddList.clear();
3747
this.documentToDeleteList.clear();
@@ -78,4 +88,13 @@ public BatchUpdate getBatch() {
7888
public boolean isEmpty() {
7989
return super.isEmpty() && documentToPartiallyUpdateList.isEmpty();
8090
}
91+
92+
/**
93+
* Returns the HTTP response from the last flush operation.
94+
*
95+
* @return The last response, or null if no flush has occurred or queue was empty.
96+
*/
97+
HttpResponse<String> getLastResponse() {
98+
return this.lastResponse;
99+
}
81100
}

0 commit comments

Comments
 (0)