Skip to content

Commit 803c4f3

Browse files
Checkpoint 4
1 parent 7ab0aff commit 803c4f3

7 files changed

Lines changed: 151 additions & 38 deletions

File tree

server/src/main/java/au/org/aodn/ogcapi/server/core/configuration/WfsWmsConfig.java renamed to server/src/main/java/au/org/aodn/ogcapi/server/core/configuration/GeoServerConfig.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import au.org.aodn.ogcapi.server.core.service.geoserver.wfs.WfsDefaultParam;
55
import au.org.aodn.ogcapi.server.core.service.geoserver.wfs.WfsServer;
66
import au.org.aodn.ogcapi.server.core.service.geoserver.wms.WmsServer;
7+
import au.org.aodn.ogcapi.server.core.service.geoserver.wps.WpsServer;
78
import au.org.aodn.ogcapi.server.core.util.RestTemplateUtils;
89
import org.springframework.beans.factory.annotation.Qualifier;
910
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
@@ -15,7 +16,7 @@
1516
import org.springframework.web.client.RestTemplate;
1617

1718
@Configuration
18-
public class WfsWmsConfig {
19+
public class GeoServerConfig {
1920

2021
@ConditionalOnMissingBean(name = "pretendUserEntity")
2122
@Bean("pretendUserEntity")
@@ -40,4 +41,12 @@ public WfsServer createWfsServer(Search search,
4041
public WmsServer createWmsServer(Search search, @Lazy WfsServer wfsServer, @Qualifier("pretendUserEntity") HttpEntity<?> entity) {
4142
return new WmsServer(search, wfsServer, entity);
4243
}
44+
45+
@Bean
46+
public WpsServer createWpsServer(WmsServer wmsServer,
47+
WfsServer wfsServer,
48+
RestTemplate restTemplate,
49+
@Qualifier("pretendUserEntity") HttpEntity<?> entity) {
50+
return new WpsServer(wmsServer, wfsServer, restTemplate, entity);
51+
}
4352
}

server/src/main/java/au/org/aodn/ogcapi/server/core/model/enumeration/ProcessIdEnum.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,23 @@
55
@Getter
66
public enum ProcessIdEnum {
77
DOWNLOAD_DATASET("download"),
8-
;
8+
DOWNLOAD_WFS_SSE("downloadWfs"),
9+
DOWNLOAD_WFS_SIZE("downloadWfsSize"),
10+
UNKNOWN("");
911

1012
private final String value;
1113

1214
ProcessIdEnum(String value) {
1315
this.value = value;
1416
}
17+
18+
public static ProcessIdEnum fromString(String text) {
19+
for (ProcessIdEnum e : values()) {
20+
if (e.value.equalsIgnoreCase(text)) {
21+
return e;
22+
}
23+
}
24+
return UNKNOWN;
25+
}
26+
1527
}

server/src/main/java/au/org/aodn/ogcapi/server/core/service/geoserver/wps/WpsServer.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@
44
import au.org.aodn.ogcapi.server.core.service.geoserver.Server;
55
import au.org.aodn.ogcapi.server.core.service.geoserver.wfs.WfsServer;
66
import au.org.aodn.ogcapi.server.core.service.geoserver.wms.WmsServer;
7+
import lombok.Getter;
8+
import lombok.Setter;
9+
import lombok.experimental.SuperBuilder;
710
import lombok.extern.slf4j.Slf4j;
811
import net.opengis.ows11.CodeType;
912
import net.opengis.ows11.Ows11Factory;
@@ -38,6 +41,9 @@ public class WpsServer implements Server {
3841
protected final WfsServer wfsServer;
3942
protected final HttpEntity<?> pretendUserEntity;
4043

44+
@Getter
45+
@Setter
46+
@SuperBuilder
4147
public static class WpsProcessRequest extends FeatureRequest {}
4248

4349
public WpsServer(WmsServer wmsServer, WfsServer wfsServer, RestTemplate restTemplate, HttpEntity<?> entity) {

server/src/main/java/au/org/aodn/ogcapi/server/processes/RestApi.java

Lines changed: 42 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import io.swagger.v3.oas.annotations.media.Schema;
1616
import jakarta.validation.Valid;
1717
import lombok.extern.slf4j.Slf4j;
18+
import org.apache.coyote.BadRequestException;
1819
import org.springframework.beans.factory.annotation.Autowired;
1920
import org.springframework.http.HttpStatus;
2021
import org.springframework.http.MediaType;
@@ -35,10 +36,12 @@ public class RestApi implements ProcessesApi {
3536
@Override
3637
// because the produces value in the interface declaration includes "/_" which may
3738
// cause exception thrown sometimes. So i re-declared the produces value here
38-
@RequestMapping(value = "/processes/{processID}/execution",
39-
produces = {"application/json", "text/html"},
40-
consumes = {"application/json"},
41-
method = RequestMethod.POST)
39+
@RequestMapping(
40+
value = "/processes/{processID}/execution",
41+
produces = { MediaType.APPLICATION_JSON_VALUE, MediaType.TEXT_HTML_VALUE },
42+
consumes = { MediaType.APPLICATION_JSON_VALUE },
43+
method = RequestMethod.POST
44+
)
4245
public ResponseEntity<InlineResponse200> execute(
4346
@Parameter(in = ParameterIn.PATH, required = true, schema = @Schema())
4447
@PathVariable("processID")
@@ -47,8 +50,9 @@ public ResponseEntity<InlineResponse200> execute(
4750
@Valid
4851
@RequestBody Execute body) {
4952

50-
if (processID.equals(ProcessIdEnum.DOWNLOAD_DATASET.getValue())) {
53+
ProcessIdEnum processId = ProcessIdEnum.fromString(processID);
5154

55+
if (processId == ProcessIdEnum.DOWNLOAD_DATASET) {
5256
try {
5357

5458
var uuid = (String) body.getInputs().get(DatasetDownloadEnums.Parameter.UUID.getValue());
@@ -111,16 +115,18 @@ public ResponseEntity<ProcessList> getProcesses() {
111115
/**
112116
* WFS download endpoint with SSE support to handle long-running operations and prevent timeouts
113117
*/
114-
@RequestMapping(value = "/processes/downloadWfs/execution",
115-
produces = MediaType.TEXT_EVENT_STREAM_VALUE,
116-
consumes = {"application/json"},
117-
method = RequestMethod.POST)
118-
public SseEmitter downloadWfsSse(
118+
@RequestMapping(
119+
value = "/processes/{processID}/execution",
120+
produces = { MediaType.TEXT_EVENT_STREAM_VALUE },
121+
consumes = { MediaType.APPLICATION_JSON_VALUE },
122+
method = RequestMethod.POST
123+
)
124+
public SseEmitter executeSse(
125+
@Parameter(in = ParameterIn.PATH, required = true, schema = @Schema())
126+
@PathVariable("processID") String processId,
119127
@Parameter(in = ParameterIn.DEFAULT, description = "Mandatory execute request JSON", required = true, schema = @Schema())
120128
@RequestBody Execute body) {
121129

122-
final SseEmitter emitter = new SseEmitter(0L);
123-
124130
try {
125131
String uuid = body.getInputs().get(DatasetDownloadEnums.Parameter.UUID.getValue()).toString();
126132
String startDate = body.getInputs().get(DatasetDownloadEnums.Parameter.START_DATE.getValue()).toString();
@@ -137,12 +143,33 @@ public SseEmitter downloadWfsSse(
137143
throw new IllegalArgumentException(String.format("Output format %s not supported", outputFormat));
138144
}
139145

140-
return restServices.downloadWfsDataWithSse(
141-
uuid, startDate, endDate, multiPolygon, fields, layerName, outputFormat, emitter
142-
);
146+
ProcessIdEnum id = ProcessIdEnum.fromString(processId);
147+
SseEmitter emitter;
148+
149+
switch (id) {
150+
case DOWNLOAD_WFS_SSE: {
151+
emitter = restServices.downloadWfsDataWithSse(
152+
uuid, startDate, endDate, multiPolygon, fields, layerName, outputFormat
153+
);
154+
break;
155+
}
156+
case DOWNLOAD_WFS_SIZE: {
157+
emitter = restServices.downloadWfsSizeWithSse(uuid, layerName, startDate, endDate, multiPolygon, fields);
158+
break;
159+
}
160+
default: {
161+
emitter = new SseEmitter(0L);
162+
emitter.completeWithError(new BadRequestException(
163+
String.format("Unknown process Id for SSE type download [%s]", processId)
164+
));
165+
}
166+
}
167+
168+
return emitter;
143169

144170
} catch (Exception e) {
145171
log.error("Download wfs data failed with unhandled error {}", e.getMessage());
172+
final SseEmitter emitter = new SseEmitter(0L);
146173
emitter.completeWithError(e);
147174
return emitter;
148175
}

server/src/main/java/au/org/aodn/ogcapi/server/processes/RestServices.java

Lines changed: 69 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
import au.org.aodn.ogcapi.server.core.exception.wfs.WfsErrorHandler;
44
import au.org.aodn.ogcapi.server.core.model.enumeration.DatasetDownloadEnums;
55
import au.org.aodn.ogcapi.server.core.service.geoserver.wfs.DownloadWfsDataService;
6+
import au.org.aodn.ogcapi.server.core.service.geoserver.wps.WpsServer;
7+
import au.org.aodn.ogcapi.server.core.util.DatetimeUtils;
68
import au.org.aodn.ogcapi.server.core.util.EmailUtils;
79
import com.fasterxml.jackson.core.JsonProcessingException;
810
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -35,6 +37,9 @@ public class RestServices {
3537
@Autowired
3638
private DownloadWfsDataService downloadWfsDataService;
3739

40+
@Autowired
41+
protected WpsServer wpsServer;
42+
3843
public RestServices(BatchClient batchClient, ObjectMapper objectMapper) {
3944
this.batchClient = batchClient;
4045
this.objectMapper = objectMapper;
@@ -175,15 +180,15 @@ private String generateStartedEmailContent(
175180
}
176181
}
177182

178-
public SseEmitter downloadWfsDataWithSse(
179-
String uuid,
180-
String startDate,
181-
String endDate,
182-
Object multiPolygon,
183-
List<String> fields,
184-
String layerName,
185-
String outputFormat,
186-
SseEmitter emitter) {
183+
public SseEmitter downloadWfsDataWithSse(String uuid,
184+
String startDate,
185+
String endDate,
186+
Object multiPolygon,
187+
List<String> fields,
188+
String layerName,
189+
String outputFormat) {
190+
191+
final SseEmitter emitter = new SseEmitter(0L);
187192

188193
// Set up references for resources that need to be cleaned up
189194
AtomicReference<ScheduledFuture<?>> keepAliveTaskRef = new AtomicReference<>();
@@ -295,4 +300,59 @@ public SseEmitter downloadWfsDataWithSse(
295300
});
296301
return emitter;
297302
}
303+
304+
public SseEmitter downloadWfsSizeWithSse(String uuid,
305+
String layerName,
306+
String startDate,
307+
String endDate,
308+
Object multiPolygon,
309+
List<String> fields) {
310+
311+
312+
final SseEmitter emitter = new SseEmitter(0L);
313+
314+
CompletableFuture.runAsync(() -> {
315+
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
316+
CountDownLatch done = new CountDownLatch(1);
317+
318+
// Send keep-alive every 20 seconds
319+
executor.execute(() -> {
320+
try {
321+
while (done.await(20, TimeUnit.SECONDS)) {
322+
emitter.send(SseEmitter.event()
323+
.name("keep-alive")
324+
.data(Map.of(
325+
"timestamp", System.currentTimeMillis(),
326+
"message", "Waiting for WFS server response..."
327+
)));
328+
}
329+
}
330+
catch(Exception e) {
331+
done.countDown();
332+
}
333+
});
334+
335+
WpsServer.WpsProcessRequest request = WpsServer.WpsProcessRequest.builder()
336+
.layerName(layerName)
337+
.datetime(DatetimeUtils.formatOGCDateTime(startDate, endDate))
338+
.properties(fields)
339+
.multiPolygon(multiPolygon)
340+
.build();
341+
342+
try {
343+
emitter.send(wpsServer.getEstimateDownloadSize(
344+
uuid,
345+
request
346+
));
347+
}
348+
catch(Exception e) {
349+
emitter.completeWithError(e);
350+
}
351+
finally {
352+
done.countDown();
353+
emitter.complete();
354+
}
355+
});
356+
return emitter;
357+
}
298358
}

server/src/test/java/au/org/aodn/ogcapi/server/core/service/geoserver/wms/WmsServerTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
import au.org.aodn.ogcapi.server.core.configuration.Config;
44
import au.org.aodn.ogcapi.server.core.configuration.TestConfig;
5-
import au.org.aodn.ogcapi.server.core.configuration.WfsWmsConfig;
5+
import au.org.aodn.ogcapi.server.core.configuration.GeoServerConfig;
66
import au.org.aodn.ogcapi.server.core.exception.GeoserverFieldsNotFoundException;
77
import au.org.aodn.ogcapi.server.core.model.LinkModel;
88
import au.org.aodn.ogcapi.server.core.model.StacCollectionModel;
@@ -41,7 +41,7 @@
4141
classes = {
4242
TestConfig.class,
4343
Config.class,
44-
WfsWmsConfig.class,
44+
GeoServerConfig.class,
4545
WmsDefaultParam.class,
4646
WfsDefaultParam.class,
4747
JacksonAutoConfiguration.class,

server/src/test/java/au/org/aodn/ogcapi/server/core/service/geoserver/wps/WpsServerTest.java

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,6 @@
1010
import org.mockito.junit.jupiter.MockitoExtension;
1111
import org.opengis.filter.Filter;
1212
import org.opengis.filter.FilterFactory2;
13-
import org.slf4j.Logger;
14-
import org.slf4j.LoggerFactory;
1513
import org.springframework.http.HttpEntity;
1614
import org.springframework.http.HttpMethod;
1715
import org.springframework.http.HttpStatus;
@@ -31,8 +29,6 @@
3129
@ExtendWith(MockitoExtension.class)
3230
public class WpsServerTest {
3331

34-
protected Logger log = LoggerFactory.getLogger(WpsServerTest.class);
35-
3632
@Mock
3733
protected WmsServer wmsServer;
3834

@@ -91,8 +87,9 @@ void getEstimateDownloadSize_success_returnsBody() throws Exception {
9187
String wfsUrl = "http://example.com/geoserver/wfs";
9288
AtomicReference<String> xml = new AtomicReference<>();
9389

94-
WpsServer.WpsProcessRequest request = new WpsServer.WpsProcessRequest();
95-
request.setLayerName(layerName);
90+
WpsServer.WpsProcessRequest request = WpsServer.WpsProcessRequest.builder()
91+
.layerName(layerName)
92+
.build();
9693

9794
when(wfsServer.getFeatureServerUrl(uuid, layerName)).thenReturn(Optional.of(wfsUrl));
9895
when(wfsServer.buildCqlFilter(eq(uuid), any())).thenReturn("state = 'TAS'");
@@ -159,8 +156,9 @@ void getEstimateDownloadSize_success_returnsBody() throws Exception {
159156
@Test
160157
void getEstimateDownloadSize_noWfsUrl_throwsException() {
161158
String uuid = "test-uuid";
162-
WpsServer.WpsProcessRequest request = new WpsServer.WpsProcessRequest();
163-
request.setLayerName("test:layer");
159+
WpsServer.WpsProcessRequest request = WpsServer.WpsProcessRequest.builder()
160+
.layerName("test:layer")
161+
.build();
164162

165163
when(wfsServer.getFeatureServerUrl(anyString(), anyString())).thenReturn(Optional.empty());
166164

@@ -174,8 +172,9 @@ void getEstimateDownloadSize_non2xx_returnsEmpty() throws Exception {
174172
String layerName = "test:layer";
175173
String wfsUrl = "http://example.com/geoserver/wfs";
176174

177-
WpsServer.WpsProcessRequest request = new WpsServer.WpsProcessRequest();
178-
request.setLayerName(layerName);
175+
WpsServer.WpsProcessRequest request = WpsServer.WpsProcessRequest.builder()
176+
.layerName(layerName)
177+
.build();
179178

180179
when(wfsServer.getFeatureServerUrl(uuid, layerName)).thenReturn(Optional.of(wfsUrl));
181180
when(wfsServer.buildCqlFilter(eq(uuid), any())).thenReturn("state = 'TAS'");

0 commit comments

Comments
 (0)