Skip to content

Commit 7d172ad

Browse files
feat(LENS-1390): add support for incremental catalog updates in Java SDK (#120)
1 parent 2048a32 commit 7d172ad

7 files changed

Lines changed: 414 additions & 23 deletions

File tree

samples/StreamDocuments.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ public static void main(String[] args) throws IOException, InterruptedException,
1313

1414
// Using the Stream Service will act as a source rebuild, therefore any currently indexed items not contained in the payload will be deleted.
1515
StreamService streamService = new StreamService(catalogSource);
16-
// To perform full document updates, use the PushService instead.
16+
// To perform full document updates, use the UpdateStreamService instead.
1717
// For more info, visit: https://docs.coveo.com/en/l62e0540/coveo-for-commerce/how-to-update-your-catalog#full-document-updates
1818

1919
DocumentBuilder document1 = new DocumentBuilder("https://my.document.uri", "My document title")

samples/UpdateStreamDocuments.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
import com.coveo.pushapiclient.*;
2+
import com.coveo.pushapiclient.exceptions.NoOpenStreamException;
3+
4+
import java.io.IOException;
5+
import java.util.HashMap;
6+
7+
public class StreamDocuments {
8+
9+
public static void main(String[] args) throws IOException, InterruptedException, NoOpenStreamException {
10+
11+
PlatformUrl platformUrl = new PlatformUrlBuilder().withEnvironment(Environment.PRODUCTION).withRegion(Region.US).build();
12+
CatalogSource catalogSource = CatalogSource.fromPlatformUrl("my_api_key","my_org_id","my_source_id", platformUrl);
13+
14+
// Using the Update Stream Service will act as an incremental change to the index, therefore any currently indexed items not contained in the payload will remain.
15+
UpdateStreamService updateStream = new UpdateStreamService(catalogSource);
16+
// To perform full index rebuild, use the StreamService instead.
17+
18+
DocumentBuilder document1 = new DocumentBuilder("https://my.document.uri", "My document title")
19+
.withData("these words will be searchable")
20+
.withAuthor("bob")
21+
.withClickableUri("https://my.document.click.com")
22+
.withFileExtension(".html")
23+
.withMetadata(new HashMap<>() {{
24+
put("tags", new String[]{"the_first_tag", "the_second_tag"});
25+
put("version", 1);
26+
put("somekey", "somevalue");
27+
}});
28+
29+
updateStreamService.addOrUpdate(document1);
30+
31+
DocumentBuilder document2 = new DocumentBuilder("https://my.document2.uri", "My document2 title");
32+
updateStreamService.addOrUpdate(document2);
33+
34+
DeleteDocument document3 = new DeleteDocument("https://my.document3.uri");
35+
updateStreamService.delete(document3);
36+
37+
updateStreamService.close();
38+
}
39+
}

src/main/java/com/coveo/pushapiclient/StreamService.java

Lines changed: 21 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,10 @@ public class StreamService {
1818
* Creates a service to stream your documents to the provided source by interacting with the
1919
* Stream API.
2020
*
21-
* <p>To perform <a href="https://docs.coveo.com/en/l62e0540">full document updates</a>, use the
22-
* {@PushService}, since pushing documents with the {@StreamService} is equivalent to triggering a
23-
* full source rebuild. The {@StreamService} can also be used for an initial catalog upload.
21+
* <p>To perform <a href="https://docs.coveo.com/en/l62e0540">full document updates or
22+
* deletions</a>, use the {@UpdateStreamService}, since pushing documents with the
23+
* {@StreamService} is equivalent to triggering a full source rebuild. The {@StreamService} can
24+
* also be used for an initial catalog upload.
2425
*
2526
* @param source The source to which you want to send your documents.
2627
*/
@@ -32,9 +33,10 @@ public StreamService(StreamEnabledSource source) {
3233
* Creates a service to stream your documents to the provided source by interacting with the
3334
* Stream API.
3435
*
35-
* <p>To perform <a href="https://docs.coveo.com/en/l62e0540">full document updates</a>, use the
36-
* {@PushService}, since pushing documents with the {@StreamService} is equivalent to triggering a
37-
* full source rebuild. The {@StreamService} can also be used for an initial catalog upload.
36+
* <p>To perform <a href="https://docs.coveo.com/en/l62e0540">full document updates or
37+
* deletions</a>, use the {@UpdateStreamService}, since pushing documents with the
38+
* {@StreamService} is equivalent to triggering a full source rebuild. The {@StreamService} can
39+
* also be used for an initial catalog upload.
3840
*
3941
* @param source The source to which you want to send your documents.
4042
* @param options The configuration options for exponential backoff.
@@ -54,33 +56,30 @@ public StreamService(StreamEnabledSource source, BackoffOptions options) {
5456
}
5557

5658
/**
57-
* Adds documents to the previously specified source.
58-
* This function will open a stream before uploading documents into it.
59+
* Adds documents to the previously specified source. This function will open a stream before
60+
* uploading documents into it.
5961
*
60-
* <p>
61-
* If called several times, the service will automatically batch documents and
62-
* create new stream chunks whenever the data payload exceeds the
63-
* <a href="https://docs.coveo.com/en/lb4a0344#stream-api-limits">batch size limit</a>
64-
* set for the Stream API.
62+
* <p>If called several times, the service will automatically batch documents and create new
63+
* stream chunks whenever the data payload exceeds the <a
64+
* href="https://docs.coveo.com/en/lb4a0344#stream-api-limits">batch size limit</a> set for the
65+
* Stream API.
6566
*
66-
* <p>
67-
* Once there are no more documents to add, it is important to call the {@link StreamService#close} function
68-
* in order to send any buffered documents and close the open stream.
69-
* Otherwise, changes will not be reflected in the index.
67+
* <p>Once there are no more documents to add, it is important to call the {@link
68+
* StreamService#close} function in order to send any buffered documents and close the open
69+
* stream. Otherwise, changes will not be reflected in the index.
7070
*
7171
* <p>
72-
* <pre>
73-
* {@code
72+
*
73+
* <pre>{@code
7474
* //...
7575
* StreamService service = new StreamService(source));
7676
* for (DocumentBuilder document : fictionalDocumentList) {
7777
* service.add(document);
7878
* }
7979
* service.close(document);
80-
* </pre>
80+
* }</pre>
8181
*
82-
* <p>
83-
* For more code samples, @see `samples/StreamDocuments.java`
82+
* <p>For more code samples, @see `samples/StreamDocuments.java`
8483
*
8584
* @param document The documentBuilder to add to your source
8685
* @throws InterruptedException
Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
package com.coveo.pushapiclient;
2+
3+
import com.coveo.pushapiclient.exceptions.NoOpenFileContainerException;
4+
import com.google.gson.Gson;
5+
import java.io.IOException;
6+
import java.net.http.HttpResponse;
7+
import org.apache.logging.log4j.LogManager;
8+
import org.apache.logging.log4j.Logger;
9+
10+
public class UpdateStreamService {
11+
12+
private final PlatformClient platformClient;
13+
private final UpdateStreamServiceInternal updateStreamServiceInternal;
14+
15+
private FileContainer fileContainer;
16+
17+
/**
18+
* Creates a service to stream your documents to the provided source by interacting with the
19+
* Stream API. This provides the ability to incrementally add, update, or delete documents via a
20+
* stream.
21+
*
22+
* <p>To perform <a href="https://docs.coveo.com/en/lb4a0344">a full source rebuild</a>, use the
23+
* {@StreamService}
24+
*
25+
* @param source The source to which you want to send your documents.
26+
*/
27+
public UpdateStreamService(StreamEnabledSource source) {
28+
this(source, new BackoffOptionsBuilder().build());
29+
}
30+
31+
/**
32+
* Creates a service to stream your documents to the provided source by interacting with the
33+
* Stream API. This provides the ability to incrementally add, update, or delete documents via a
34+
* stream.
35+
*
36+
* <p>To perform <a href="https://docs.coveo.com/en/lb4a0344">a full source rebuild</a>, use the
37+
* {@StreamService}
38+
*
39+
* @param source The source to which you want to send your documents.
40+
* @param options The configuration options for exponential backoff.
41+
*/
42+
public UpdateStreamService(StreamEnabledSource source, BackoffOptions options) {
43+
Logger logger = LogManager.getLogger(UpdateStreamService.class);
44+
this.platformClient =
45+
new PlatformClient(
46+
source.getApiKey(), source.getOrganizationId(), source.getPlatformUrl(), options);
47+
this.updateStreamServiceInternal =
48+
new UpdateStreamServiceInternal(
49+
source, new DocumentUploadQueue(this.getUploadStrategy()), this.platformClient, logger);
50+
}
51+
52+
/**
53+
* Adds documents to an open file container be created or updated. If there is no file container
54+
* open to receive the documents, this function will open a file container before uploading
55+
* documents into it.
56+
*
57+
* <p>If called several times, the service will automatically batch documents and create new
58+
* stream chunks whenever the data payload exceeds the <a
59+
* href="https://docs.coveo.com/en/lb4a0344#stream-api-limits">batch size limit</a> set for the
60+
* Stream API.
61+
*
62+
* <p>Once there are no more documents to add, it is important to call the {@link
63+
* UpdateStreamService#close} function in order to send any buffered documents and push the file
64+
* container. Otherwise, changes will not be reflected in the index.
65+
*
66+
* <p>
67+
*
68+
* <pre>{@code
69+
* //...
70+
* UpdateStreamService service = new UpdateStreamService(source));
71+
* for (DocumentBuilder document : fictionalDocumentList) {
72+
* service.addOrUpdate(document);
73+
* }
74+
* service.close(document);
75+
* }</pre>
76+
*
77+
* <p>For more code samples, @see `samples/UpdateStreamDocuments.java`
78+
*
79+
* @param document The documentBuilder to push to your file container
80+
* @throws InterruptedException If the creation of the file container or adding the document is
81+
* interrupted.
82+
* @throws IOException If the creation of the file container or adding the document fails.
83+
*/
84+
public void addOrUpdate(DocumentBuilder document) throws IOException, InterruptedException {
85+
fileContainer = updateStreamServiceInternal.addOrUpdate(document);
86+
}
87+
88+
/**
89+
* Adds documents to an open file container be deleted. If there is no file container open to
90+
* receive the documents, this function will open a file container before uploading documents into
91+
* it.
92+
*
93+
* <p>If called several times, the service will automatically batch documents and create new
94+
* stream chunks whenever the data payload exceeds the <a
95+
* href="https://docs.coveo.com/en/lb4a0344#stream-api-limits">batch size limit</a> set for the
96+
* Stream API.
97+
*
98+
* <p>Once there are no more documents to add, it is important to call the {@link
99+
* UpdateStreamService#close} function in order to send any buffered documents and push the file
100+
* container. Otherwise, changes will not be reflected in the index.
101+
*
102+
* <p>
103+
*
104+
* <pre>{@code
105+
* //...
106+
* UpdateStreamService service = new UpdateStreamService(source));
107+
* for (DeleteDocument document : fictionalDocumentList) {
108+
* service.delete(document);
109+
* }
110+
* service.close(document);
111+
* }</pre>
112+
*
113+
* <p>For more code samples, @see `samples/UpdateStreamDocuments.java`
114+
*
115+
* @param document The deleteDocument to push to your file container
116+
* @throws InterruptedException If the creation of the file container or adding the document is
117+
* interrupted.
118+
* @throws IOException If the creation of the file container or adding the document fails.
119+
*/
120+
public void delete(DeleteDocument document) throws IOException, InterruptedException {
121+
fileContainer = updateStreamServiceInternal.delete(document);
122+
}
123+
124+
/**
125+
* Sends any buffered documents and <a
126+
* href="https://docs.coveo.com/en/l62e0540/how-to-update-your-catalog#step-3-send-the-file-container-to-update-your-catalog">pushes
127+
* the file container</a>.
128+
*
129+
* <p>Upon invoking this method, any documents added to the file container will be pushed and
130+
* indexed.
131+
*
132+
* @return The HttpResponse from the platform.
133+
* @throws IOException If the pushing file container failed.
134+
* @throws InterruptedException If the pushing file container is interrupted.
135+
* @throws NoOpenFileContainerException If there is no open file container to push.
136+
*/
137+
public HttpResponse<String> close()
138+
throws IOException, InterruptedException, NoOpenFileContainerException {
139+
return updateStreamServiceInternal.close();
140+
}
141+
142+
private UploadStrategy getUploadStrategy() {
143+
return (batchUpdate) -> {
144+
String batchUpdateJson = new Gson().toJson(batchUpdate.marshal());
145+
return this.platformClient.uploadContentToFileContainer(fileContainer, batchUpdateJson);
146+
};
147+
}
148+
}
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
package com.coveo.pushapiclient;
2+
3+
import com.coveo.pushapiclient.exceptions.NoOpenFileContainerException;
4+
import com.google.gson.Gson;
5+
import java.io.IOException;
6+
import java.net.http.HttpResponse;
7+
import org.apache.logging.log4j.Logger;
8+
9+
/** For internal use only. Made to easily test the service without having to use PowerMock */
10+
class UpdateStreamServiceInternal {
11+
private final Logger logger;
12+
private final StreamEnabledSource source;
13+
private final PlatformClient platformClient;
14+
private final DocumentUploadQueue queue;
15+
private FileContainer fileContainer;
16+
17+
public UpdateStreamServiceInternal(
18+
final StreamEnabledSource source,
19+
final DocumentUploadQueue queue,
20+
final PlatformClient platformClient,
21+
final Logger logger) {
22+
this.source = source;
23+
this.queue = queue;
24+
this.platformClient = platformClient;
25+
this.logger = logger;
26+
}
27+
28+
public FileContainer addOrUpdate(DocumentBuilder document)
29+
throws IOException, InterruptedException {
30+
if (this.fileContainer == null) {
31+
this.fileContainer = this.createFileContainer();
32+
}
33+
queue.add(document);
34+
return this.fileContainer;
35+
}
36+
37+
public FileContainer delete(DeleteDocument document) throws IOException, InterruptedException {
38+
if (this.fileContainer == null) {
39+
this.fileContainer = this.createFileContainer();
40+
}
41+
queue.add(document);
42+
return this.fileContainer;
43+
}
44+
45+
public HttpResponse<String> close()
46+
throws IOException, InterruptedException, NoOpenFileContainerException {
47+
return this.pushFileContainer(this.getSourceId());
48+
}
49+
50+
private FileContainer createFileContainer() throws IOException, InterruptedException {
51+
this.logger.info("Creating new file container");
52+
HttpResponse<String> response = this.platformClient.createFileContainer();
53+
return new Gson().fromJson(response.body(), FileContainer.class);
54+
}
55+
56+
private HttpResponse<String> pushFileContainer(String sourceId)
57+
throws NoOpenFileContainerException, IOException, InterruptedException {
58+
if (this.fileContainer == null) {
59+
throw new NoOpenFileContainerException(
60+
"No open file container detected. A new container will automatically be created once you start adding, updating or deleting documents.");
61+
}
62+
queue.flush();
63+
this.logger.info("Pushing to file container " + this.fileContainer.fileId);
64+
return this.platformClient.pushFileContainerContentToStreamSource(sourceId, this.fileContainer);
65+
}
66+
67+
private String getSourceId() {
68+
return this.source.getId();
69+
}
70+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package com.coveo.pushapiclient.exceptions;
2+
3+
public class NoOpenFileContainerException extends Exception {
4+
public NoOpenFileContainerException(String errorMessage) {
5+
super(errorMessage);
6+
}
7+
}

0 commit comments

Comments
 (0)