Skip to content

Commit 7e5948f

Browse files
Checkpoint 4
1 parent f21643e commit 7e5948f

10 files changed

Lines changed: 198 additions & 54 deletions

File tree

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@
103103
</dependency>
104104
<dependency>
105105
<groupId>org.geotools</groupId>
106-
<artifactId>gt-wps</artifactId>
106+
<artifactId>gt-wfs-ng</artifactId>
107107
<version>${org.geotools.version}</version>
108108
</dependency>
109109
<dependency>

server/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@
156156
</dependency>
157157
<dependency>
158158
<groupId>org.geotools</groupId>
159-
<artifactId>gt-wps</artifactId>
159+
<artifactId>gt-wfs-ng</artifactId>
160160
</dependency>
161161
<dependency>
162162
<groupId>org.geotools</groupId>

server/src/main/java/au/org/aodn/ogcapi/server/core/model/enumeration/ProcessIdEnum.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
public enum ProcessIdEnum {
77
DOWNLOAD_DATASET("download"),
88
DOWNLOAD_WFS_SSE("downloadWfs"),
9-
DOWNLOAD_WFS_SIZE("downloadWfsSize"),
9+
DOWNLOAD_WFS_ESTIMATE("estimateWfsDownload"),
1010
UNKNOWN("");
1111

1212
private final String value;

server/src/main/java/au/org/aodn/ogcapi/server/core/service/geoserver/wfs/DownloadWfsDataService.java

Lines changed: 118 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,24 @@
22

33
import au.org.aodn.ogcapi.server.core.model.ogc.FeatureRequest;
44
import au.org.aodn.ogcapi.server.core.util.DatetimeUtils;
5+
import jakarta.annotation.Nonnull;
56
import lombok.extern.slf4j.Slf4j;
7+
import net.opengis.wfs.FeatureCollectionType;
8+
import org.geotools.wfs.v1_1.WFSConfiguration;
9+
import org.geotools.xsd.Parser;
610
import org.springframework.beans.factory.annotation.Qualifier;
711
import org.springframework.beans.factory.annotation.Value;
8-
import org.springframework.http.HttpEntity;
9-
import org.springframework.http.HttpMethod;
12+
import org.springframework.http.*;
13+
import org.springframework.http.client.ClientHttpResponse;
1014
import org.springframework.stereotype.Service;
1115
import org.springframework.web.client.RestTemplate;
1216
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
1317

1418
import java.io.ByteArrayOutputStream;
19+
import java.io.IOException;
1520
import java.io.InputStream;
21+
import java.io.StringReader;
22+
import java.math.BigInteger;
1623
import java.util.*;
1724
import java.util.concurrent.atomic.AtomicBoolean;
1825

@@ -23,6 +30,52 @@ public class DownloadWfsDataService {
2330
private final RestTemplate restTemplate;
2431
private final HttpEntity<?> pretendUserEntity;
2532
private final int chunkSize;
33+
private static final WFSConfiguration CONFIG = new WFSConfiguration();
34+
/**
35+
* Some wfs request contains non standard minetype which is not allow by the default rest template
36+
*/
37+
static class WfsCustomResponseWrapper implements ClientHttpResponse {
38+
private final ClientHttpResponse delegate;
39+
40+
public WfsCustomResponseWrapper(ClientHttpResponse delegate) {
41+
this.delegate = delegate;
42+
}
43+
44+
@Override
45+
@Nonnull
46+
public HttpStatusCode getStatusCode() throws IOException {
47+
return delegate.getStatusCode();
48+
}
49+
50+
@Override
51+
@Nonnull
52+
public String getStatusText() throws IOException {
53+
return delegate.getStatusText();
54+
}
55+
56+
@Override
57+
public void close() {
58+
delegate.close();
59+
}
60+
61+
@Override
62+
@Nonnull
63+
public InputStream getBody() throws IOException {
64+
return delegate.getBody();
65+
}
66+
67+
@Override
68+
@Nonnull
69+
public HttpHeaders getHeaders() {
70+
HttpHeaders headers = new HttpHeaders();
71+
headers.putAll(delegate.getHeaders());
72+
String ct = headers.getFirst(HttpHeaders.CONTENT_TYPE);
73+
if (ct != null && ct.contains("subtype=")) {
74+
headers.set(HttpHeaders.CONTENT_TYPE, "text/xml");
75+
}
76+
return headers;
77+
}
78+
}
2679

2780
public DownloadWfsDataService(
2881
WfsServer wfsServer,
@@ -31,9 +84,18 @@ public DownloadWfsDataService(
3184
@Value("${app.sse.chunkSize:16384}") int chunkSize
3285
) {
3386
this.wfsServer = wfsServer;
34-
this.restTemplate = restTemplate;
3587
this.pretendUserEntity = pretendUserEntity;
3688
this.chunkSize = chunkSize;
89+
// We need a custom rest template in order to deal with some non-standard minetype from wfs
90+
RestTemplate clone = new RestTemplate(restTemplate.getRequestFactory());
91+
clone.setInterceptors(new ArrayList<>(restTemplate.getInterceptors()));
92+
clone.setMessageConverters(new ArrayList<>(restTemplate.getMessageConverters()));
93+
clone.setErrorHandler(restTemplate.getErrorHandler());
94+
clone.getInterceptors().add((request, body, execution) -> {
95+
ClientHttpResponse resp = execution.execute(request, body);
96+
return new WfsCustomResponseWrapper(resp);
97+
});
98+
this.restTemplate = clone;
3799
}
38100
/**
39101
* Does collection lookup, WFS validation, field retrieval, and URL building
@@ -45,7 +107,9 @@ public String prepareWfsRequestUrl(
45107
Object multiPolygon,
46108
List<String> fields,
47109
String layerName,
48-
String outputFormat) {
110+
String outputFormat,
111+
long maxRecordCount,
112+
boolean estimateSizeOnly) {
49113

50114

51115
// Get WFS server URL and field model for the given UUID and layer name
@@ -68,15 +132,64 @@ public String prepareWfsRequestUrl(
68132
layerName,
69133
fields,
70134
wfsServer.buildCqlFilter(uuid, featureRequest),
71-
outputFormat);
135+
outputFormat,
136+
maxRecordCount,
137+
estimateSizeOnly);
72138

73139
log.info("Prepared WFS request URL: {}", wfsRequestUrl);
74140
return wfsRequestUrl;
75141
} else {
76142
throw new IllegalArgumentException("No WFS server URL found for the given UUID and layer name");
77143
}
78144
}
145+
/**
146+
* We just need to estimate the download size, the way we do it is issue two query:
147+
* a. Issue a query and get the number or record hit
148+
* b. Issue a query with data download but then limit the records size, and do a liner interpolation
149+
* @return
150+
*/
151+
public BigInteger estimateDownloadSize(
152+
String uuid,
153+
String layerName,
154+
String startDate,
155+
String endDate,
156+
Object multiPolygon,
157+
List<String> fields,
158+
String outputFormat) {
79159

160+
// Just get number of record, the reply will always in XML
161+
String wfsRequestUrl = prepareWfsRequestUrl(
162+
uuid, startDate, endDate, multiPolygon, fields, layerName, null, -1L, true
163+
);
164+
165+
ResponseEntity<String> response = restTemplate.exchange(wfsRequestUrl, HttpMethod.GET, pretendUserEntity, String.class);
166+
167+
if(response.getStatusCode().is2xxSuccessful() && response.getBody() != null) {
168+
Parser parser = new Parser(CONFIG);
169+
parser.setValidating(false);
170+
parser.setFailOnValidationError(false);
171+
172+
try {
173+
Object o = parser.parse(new StringReader(response.getBody()));
174+
if(o instanceof FeatureCollectionType hits) {
175+
BigInteger featureCount = hits.getNumberOfFeatures();
176+
177+
// Now we need to do another query where we limited the record count to something small
178+
wfsRequestUrl = prepareWfsRequestUrl(
179+
uuid, startDate, endDate, multiPolygon, fields, layerName, outputFormat, 50L, false
180+
);
181+
ResponseEntity<byte[]> bytes = restTemplate.exchange(wfsRequestUrl, HttpMethod.GET, pretendUserEntity, byte[].class);
182+
if(bytes.getStatusCode().is2xxSuccessful() && bytes.getBody() != null) {
183+
return featureCount.multiply(BigInteger.valueOf(bytes.getBody().length / 50));
184+
}
185+
}
186+
}
187+
catch(Exception e) {
188+
log.error("Fail to convert wfs hits result", e);
189+
}
190+
}
191+
return null;
192+
}
80193
/**
81194
* Execute WFS request with SSE support
82195
*/

server/src/main/java/au/org/aodn/ogcapi/server/core/service/geoserver/wfs/WfsServer.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ public String buildCqlFilter(String uuid, WfsFeatureRequest featureRequest) {
148148
/**
149149
* Build WFS GetFeature URL
150150
*/
151-
protected String createWfsRequestUrl(String wfsUrl, String layerName, List<String> fields, String cqlFilter, String outputFormat) {
151+
protected String createWfsRequestUrl(String wfsUrl, String layerName, List<String> fields, String cqlFilter, String outputFormat, long maxRecordNum, boolean estimateSizeOnly) {
152152
UriComponents components = UriComponentsBuilder.fromUriString(wfsUrl).build();
153153
UriComponentsBuilder builder = UriComponentsBuilder.newInstance()
154154
.scheme("https") // Force HTTPS to fix redirect
@@ -161,7 +161,19 @@ protected String createWfsRequestUrl(String wfsUrl, String layerName, List<Strin
161161

162162
Map<String, String> param = new HashMap<>(wfsDefaultParam.getDownload());
163163
param.put("typeName", layerName);
164-
param.put("outputFormat", outputFormat == null ? "text/csv" : outputFormat);
164+
165+
if(outputFormat != null) {
166+
param.put("outputFormat", outputFormat);
167+
}
168+
169+
if(maxRecordNum > 0) {
170+
param.put("maxFeatures", String.valueOf(maxRecordNum));
171+
}
172+
173+
if (estimateSizeOnly) {
174+
// Just get result count
175+
param.put("resultType", "hits");
176+
}
165177

166178
if (fields != null) {
167179
param.put("propertyName", String.join(",", fields));

server/src/main/java/au/org/aodn/ogcapi/server/processes/RestApi.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -144,12 +144,23 @@ public SseEmitter executeSse(
144144
SseEmitter emitter;
145145

146146
switch (id) {
147-
case DOWNLOAD_WFS_SSE: {
147+
case DOWNLOAD_WFS_SSE:
148+
case DOWNLOAD_WFS_ESTIMATE: {
148149
if(FeatureRequest.GeoServerOutputFormat.fromString(outputFormat) == FeatureRequest.GeoServerOutputFormat.UNKNOWN) {
149-
throw new IllegalArgumentException(String.format("Output format %s not supported", outputFormat));
150+
emitter = new SseEmitter(0L);
151+
emitter.completeWithError(new BadRequestException(
152+
String.format("Missing output format [%s]", processId)
153+
));
150154
}
151155
emitter = restServices.downloadWfsDataWithSse(
152-
uuid, startDate, endDate, multiPolygon, fields, layerName, outputFormat
156+
uuid,
157+
startDate,
158+
endDate,
159+
multiPolygon,
160+
fields,
161+
layerName,
162+
outputFormat,
163+
id == ProcessIdEnum.DOWNLOAD_WFS_ESTIMATE
153164
);
154165
break;
155166
}

server/src/main/java/au/org/aodn/ogcapi/server/processes/RestServices.java

Lines changed: 38 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import java.io.InputStream;
2020
import java.io.IOException;
21+
import java.math.BigInteger;
2122
import java.nio.charset.StandardCharsets;
2223
import java.util.HashMap;
2324
import java.util.List;
@@ -181,20 +182,18 @@ public SseEmitter downloadWfsDataWithSse(String uuid,
181182
Object multiPolygon,
182183
List<String> fields,
183184
String layerName,
184-
String outputFormat) {
185+
String outputFormat,
186+
boolean estimateSizeOnly) {
185187

186188
final SseEmitter emitter = new SseEmitter(0L);
187189

188190
// Set up references for resources that need to be cleaned up
189191
AtomicReference<ScheduledFuture<?>> keepAliveTaskRef = new AtomicReference<>();
190192
AtomicReference<ScheduledExecutorService> keepAliveExecutorRef = new AtomicReference<>();
191-
AtomicBoolean downloadCompleted = new AtomicBoolean(false);
192193

193194
// Set up cleanup function to clear up resources
194195
Runnable cleanupWfsResources = () -> {
195196
try {
196-
downloadCompleted.set(true);
197-
198197
ScheduledFuture<?> keepAliveTask = keepAliveTaskRef.get();
199198
if (keepAliveTask != null && !keepAliveTask.isCancelled()) {
200199
keepAliveTask.cancel(false);
@@ -248,17 +247,15 @@ public SseEmitter downloadWfsDataWithSse(String uuid,
248247
// Send keep-alive every 20 seconds
249248
ScheduledFuture<?> keepAliveTask = keepAliveExecutor.scheduleAtFixedRate(() -> {
250249
try {
251-
if (!downloadCompleted.get()) {
252-
String status = wfsServerResponded.get() ? "streaming" : "waiting-for-wfs-server";
253-
emitter.send(SseEmitter.event()
254-
.name("keep-alive")
255-
.data(Map.of(
256-
"status", status,
257-
"timestamp", System.currentTimeMillis(),
258-
"message", wfsServerResponded.get() ?
259-
"WFS data streaming in progress..." : "Waiting for WFS server response..."
260-
)));
261-
}
250+
String status = wfsServerResponded.get() ? "streaming" : "waiting-for-wfs-server";
251+
emitter.send(SseEmitter.event()
252+
.name("keep-alive")
253+
.data(Map.of(
254+
"status", status,
255+
"timestamp", System.currentTimeMillis(),
256+
"message", wfsServerResponded.get() ?
257+
"WFS data streaming in progress..." : "Waiting for WFS server response..."
258+
)));
262259
} catch (Exception e) {
263260
WfsErrorHandler.handleError(e, uuid, emitter, cleanupWfsResources);
264261
}
@@ -267,28 +264,39 @@ public SseEmitter downloadWfsDataWithSse(String uuid,
267264
keepAliveTaskRef.set(keepAliveTask);
268265
keepAliveExecutorRef.set(keepAliveExecutor);
269266

270-
// STEP 3: Do preparation work: Collection lookup from Elasticsearch, WFS validation, Field retrieval, URL building
271-
String wfsRequestUrl = downloadWfsDataService.prepareWfsRequestUrl(
272-
uuid, startDate, endDate, multiPolygon, fields, layerName, outputFormat
273-
);
274-
275267
emitter.send(SseEmitter.event()
276268
.name("wfs-request-ready")
277269
.data(Map.of(
278270
"message", "Connecting to WFS server...",
279271
"timestamp", System.currentTimeMillis()
280272
)));
281273

282-
// STEP 4: Make the WFS call: Streaming the response directly to client via SSE
283-
downloadWfsDataService.executeWfsRequestWithSse(
284-
wfsRequestUrl,
285-
uuid,
286-
layerName,
287-
outputFormat,
288-
emitter,
289-
wfsServerResponded
290-
);
291-
274+
if(!estimateSizeOnly) {
275+
// STEP 3: Do preparation work: Collection lookup from Elasticsearch, WFS validation, Field retrieval, URL building
276+
String wfsRequestUrl = downloadWfsDataService.prepareWfsRequestUrl(
277+
uuid, startDate, endDate, multiPolygon, fields, layerName, outputFormat, -1L, false
278+
);
279+
280+
// STEP 4: Make the WFS call: Streaming the response directly to client via SSE
281+
downloadWfsDataService.executeWfsRequestWithSse(
282+
wfsRequestUrl,
283+
uuid,
284+
layerName,
285+
outputFormat,
286+
emitter,
287+
wfsServerResponded
288+
);
289+
}
290+
else {
291+
BigInteger est = downloadWfsDataService.estimateDownloadSize(uuid, layerName, startDate, endDate, multiPolygon, fields, outputFormat);
292+
emitter.send(SseEmitter.event()
293+
.name(est != null ? "estimate-complete" : "estimate-failed")
294+
.data(Map.of(
295+
"size", est != null ? est : "",
296+
"timestamp", System.currentTimeMillis()
297+
)));
298+
emitter.complete();
299+
}
292300
} catch (Exception e) {
293301
WfsErrorHandler.handleError(e, uuid, emitter, cleanupWfsResources);
294302
}

server/src/main/resources/application.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ wfs-default-param:
2929
REQUEST: "DescribeFeatureType"
3030
download:
3131
SERVICE: "WFS"
32-
VERSION: "1.0.0" # TODO: Change to 2.0.0 after testing CQL query
32+
VERSION: "1.1.0" # Need at least 1.1.0 for resultType=hits to work TODO: Change to 2.0.0 after testing CQL query
3333
REQUEST: "GetFeature"
3434
capabilities:
3535
SERVICE: "WFS"

0 commit comments

Comments
 (0)