diff --git a/source/imaer-gml/pom.xml b/source/imaer-gml/pom.xml index f3b3341e..8cdd3135 100644 --- a/source/imaer-gml/pom.xml +++ b/source/imaer-gml/pom.xml @@ -74,17 +74,6 @@ xml-resolver ${xml-resolver.version} - - com.github.rwitzel.streamflyer - streamflyer-core - ${streamflyer.version} - - - commons-io - commons-io - - - commons-io commons-io diff --git a/source/imaer-gml/src/main/java/nl/overheid/aerius/gml/filter/AbstractXMLFilteringReader.java b/source/imaer-gml/src/main/java/nl/overheid/aerius/gml/filter/AbstractXMLFilteringReader.java new file mode 100644 index 00000000..a0b61d29 --- /dev/null +++ b/source/imaer-gml/src/main/java/nl/overheid/aerius/gml/filter/AbstractXMLFilteringReader.java @@ -0,0 +1,370 @@ +/* + * Copyright the State of the Netherlands + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see http://www.gnu.org/licenses/. + */ +package nl.overheid.aerius.gml.filter; + +import java.io.IOException; +import java.io.Reader; +import java.nio.CharBuffer; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Abstract base class for filtering XML content based on regex patterns. + * This class implements a streaming approach to handle large XML files efficiently, + * processing data in chunks and maintaining proper state across buffer boundaries. + * + *

Sliding Window Algorithm: + * The reader processes the input using a fixed-size sliding window buffer instead of loading + * the entire file into memory. This allows processing files of arbitrary size with constant + * memory usage (typically under 1MB regardless of file size). + * + *

Boundary-Aware Chunking: + * When sliding the window forward, the algorithm ensures cuts are made at safe positions + * (after complete XML tags) to prevent splitting elements across chunks. The algorithm: + *

+ * + *

Limitation: + * The content between {@code tagToFilter} and its corresponding end tag (including all nested + * elements) must not exceed {@value #MAX_MATCH_SIZE} characters (64KB by default). If a single + * block exceeds this size, the regex may fail to match and the block will not be filtered. + * This limit is sufficient for typical IMAER receptor points but may need consideration + * for elements with very large nested content. + * + *

Subclasses must provide the tag to filter and content regex pattern via the constructor: + *

+ * + *

The complete pattern is constructed as: {@code tagToFilter + contentRegex + endTag} + * where endTag is automatically derived from tagToFilter. + */ +public abstract class AbstractXMLFilteringReader extends Reader { + + private static final Logger LOG = LoggerFactory.getLogger(AbstractXMLFilteringReader.class); + + private static final int DEFAULT_BUFFER_SIZE = 8192; + private static final int MAX_MATCH_SIZE = 65536; + private static final int MIN_KEEP_RATIO = 4; + + private final Reader source; + private final StringBuilder buffer; + private final char[] readBuffer; + private final Pattern pattern; + private final String endTag; + private int bufferPosition; + private int bufferLength; + private boolean endOfSource; + private boolean closed; + + /** + * Creates a new AbstractXMLFilteringReader. + * + * @param source the source reader to read from + * @param tagToFilter the start tag of the container element to filter (e.g., "") + * @param contentRegex the regex pattern for the content between start and end tags + */ + protected AbstractXMLFilteringReader(final Reader source, final String tagToFilter, final String contentRegex) { + this.source = source; + this.buffer = new StringBuilder(MAX_MATCH_SIZE * 2); + this.readBuffer = new char[DEFAULT_BUFFER_SIZE]; + this.endTag = " 0 && canReadMore) { + if (hasBufferedData()) { + final int copied = copyFromBuffer(cbuf, currentOffset, remaining); + currentOffset += copied; + remaining -= copied; + } else { + canReadMore = tryToReadMoreData(); + } + } + + return currentOffset - off; + } + + private int determineReturnValue(final int bytesRead) { + if (bytesRead > 0) { + return bytesRead; + } else if (endOfSource) { + return -1; + } else { + return 0; + } + } + + private boolean tryToReadMoreData() throws IOException { + if (endOfSource) { + return false; + } + + fillAndProcessBuffer(); + return hasBufferedData() || !endOfSource; + } + + @Override + public int read(final CharBuffer target) throws IOException { + ensureOpen(); + + final int len = target.remaining(); + final char[] chars = new char[len]; + final int n = read(chars, 0, len); + + if (n > 0) { + target.put(chars, 0, n); + } + + return n; + } + + @Override + public long skip(final long n) throws IOException { + ensureOpen(); + + if (n <= 0) { + return 0; + } + + final int bufferSize = (int) Math.min(n, DEFAULT_BUFFER_SIZE); + final char[] skipBuffer = new char[bufferSize]; + long remaining = n; + int totalSkipped = 0; + boolean shouldContinue = true; + + while (remaining > 0 && shouldContinue) { + final int toRead = (int) Math.min(remaining, bufferSize); + final int read = read(skipBuffer, 0, toRead); + + if (read == -1) { + shouldContinue = false; + } else { + totalSkipped += read; + remaining -= read; + } + } + + return totalSkipped; + } + + @Override + public boolean ready() throws IOException { + ensureOpen(); + return bufferPosition < bufferLength || source.ready(); + } + + @Override + public boolean markSupported() { + return false; + } + + @Override + public void mark(final int readAheadLimit) throws IOException { + throw new IOException("mark/reset not supported"); + } + + @Override + public void reset() throws IOException { + throw new IOException("mark/reset not supported"); + } + + @Override + public void close() throws IOException { + if (!closed) { + closed = true; + source.close(); + } + } + + private void ensureOpen() throws IOException { + if (closed) { + throw new IOException("Reader is closed"); + } + } + + private boolean hasBufferedData() { + return bufferPosition < bufferLength; + } + + private int copyFromBuffer(final char[] cbuf, final int off, final int len) { + final int available = bufferLength - bufferPosition; + final int toCopy = Math.min(len, available); + buffer.getChars(bufferPosition, bufferPosition + toCopy, cbuf, off); + bufferPosition += toCopy; + return toCopy; + } + + private void fillAndProcessBuffer() throws IOException { + compactBuffer(); + readFromSource(); + + bufferLength = buffer.length(); + + if (bufferLength > 0) { + processBuffer(); + } + } + + private void compactBuffer() { + if (bufferPosition > 0) { + buffer.delete(0, bufferPosition); + bufferLength -= bufferPosition; + bufferPosition = 0; + } + } + + private void readFromSource() throws IOException { + int totalRead = 0; + boolean shouldContinue = true; + + while (shouldContinue && !endOfSource && buffer.length() < MAX_MATCH_SIZE * 2) { + final int bytesRead = source.read(readBuffer); + + if (bytesRead == -1) { + endOfSource = true; + shouldContinue = false; + } else { + buffer.append(readBuffer, 0, bytesRead); + totalRead += bytesRead; + + if (totalRead >= DEFAULT_BUFFER_SIZE) { + shouldContinue = false; + } + } + } + } + + private void processBuffer() { + final StringBuilder filteredContent = new StringBuilder(bufferLength); + final Matcher matcher = pattern.matcher(buffer); + int lastMatchEnd = 0; + + while (matcher.find()) { + filteredContent.append(buffer, lastMatchEnd, matcher.start()); + lastMatchEnd = matcher.end(); + } + + if (endOfSource) { + appendRemainingContent(filteredContent, lastMatchEnd); + replaceBufferContent(filteredContent); + } else { + final int safeCutPosition = findSafeCutPosition(lastMatchEnd); + filteredContent.append(buffer, lastMatchEnd, safeCutPosition); + // Replace buffer with filtered content, then append remaining unprocessed data + buffer.delete(0, safeCutPosition); + buffer.insert(0, filteredContent); + } + + updateBufferState(); + } + + private void appendRemainingContent(final StringBuilder filteredContent, final int lastMatchEnd) { + filteredContent.append(buffer, lastMatchEnd, bufferLength); + } + + private void replaceBufferContent(final StringBuilder filteredContent) { + buffer.setLength(0); + buffer.append(filteredContent); + } + + private void updateBufferState() { + bufferLength = buffer.length(); + bufferPosition = 0; + } + + private int findSafeCutPosition(final int lastMatchEnd) { + final int minKeep = calculateMinimumKeepSize(); + final int maxCut = bufferLength - minKeep; + + int safeCutPosition = lastMatchEnd; + + if (maxCut > lastMatchEnd) { + safeCutPosition = determineBestCutPosition(maxCut, lastMatchEnd); + } + + return safeCutPosition; + } + + private int determineBestCutPosition(final int maxCut, final int lastMatchEnd) { + int bestCutPosition = maxCut; + + final int endTagPosition = findEndTagPosition(maxCut); + if (isValidCutPosition(endTagPosition, lastMatchEnd)) { + bestCutPosition = endTagPosition + endTag.length(); + } else { + final int lastCloseTag = findLastCloseTag(maxCut); + if (isValidCutPosition(lastCloseTag, lastMatchEnd)) { + bestCutPosition = lastCloseTag + 1; + } + } + + return bestCutPosition; + } + + private int calculateMinimumKeepSize() { + return Math.min(MAX_MATCH_SIZE / MIN_KEEP_RATIO, bufferLength / MIN_KEEP_RATIO); + } + + private int findEndTagPosition(final int maxCut) { + return buffer.toString().lastIndexOf(endTag, maxCut); + } + + private int findLastCloseTag(final int maxCut) { + return buffer.toString().lastIndexOf(">", maxCut); + } + + private static boolean isValidCutPosition(final int cutPosition, final int lastMatchEnd) { + return cutPosition != -1 && cutPosition >= lastMatchEnd; + } +} diff --git a/source/imaer-gml/src/main/java/nl/overheid/aerius/gml/filter/ReceptorFilteringReader.java b/source/imaer-gml/src/main/java/nl/overheid/aerius/gml/filter/ReceptorFilteringReader.java new file mode 100644 index 00000000..ae323c7f --- /dev/null +++ b/source/imaer-gml/src/main/java/nl/overheid/aerius/gml/filter/ReceptorFilteringReader.java @@ -0,0 +1,38 @@ +/* + * Copyright the State of the Netherlands + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see http://www.gnu.org/licenses/. + */ +package nl.overheid.aerius.gml.filter; + +import java.io.Reader; + +/** + * A Reader that filters out XML featureMember blocks containing ReceptorPoint elements. + * This is used to skip parsing receptor points when results are not needed, + * significantly reducing memory usage and processing time for large GML files. + * + *

The filtering matches blocks of the form: + *

<imaer:featureMember>[whitespace]<imaer:ReceptorPoint...>...</imaer:featureMember>
+ * + */ +public class ReceptorFilteringReader extends AbstractXMLFilteringReader { + + private static final String TAG_TO_FILTER = ""; + private static final String CONTENT_REGEX = "[\\n\\r\\s]*]*>[\\s\\S]*?"; + + public ReceptorFilteringReader(final Reader source) { + super(source, TAG_TO_FILTER, CONTENT_REGEX); + } +} diff --git a/source/imaer-gml/src/main/java/nl/overheid/aerius/importer/ImaerImporter.java b/source/imaer-gml/src/main/java/nl/overheid/aerius/importer/ImaerImporter.java index fc3e96bf..f372573f 100644 --- a/source/imaer-gml/src/main/java/nl/overheid/aerius/importer/ImaerImporter.java +++ b/source/imaer-gml/src/main/java/nl/overheid/aerius/importer/ImaerImporter.java @@ -16,6 +16,7 @@ */ package nl.overheid.aerius.importer; +import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.io.Reader; @@ -24,16 +25,13 @@ import java.util.List; import java.util.Optional; import java.util.Set; -import java.util.regex.Pattern; import org.apache.commons.io.input.ReaderInputStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.github.rwitzel.streamflyer.core.ModifyingReader; -import com.github.rwitzel.streamflyer.regex.RegexModifier; - import nl.overheid.aerius.gml.GMLMetaDataReader; +import nl.overheid.aerius.gml.filter.ReceptorFilteringReader; import nl.overheid.aerius.gml.GMLReader; import nl.overheid.aerius.gml.GMLReaderFactory; import nl.overheid.aerius.gml.GMLValidator; @@ -243,17 +241,26 @@ private static void setCrsIf(final FeatureCollection featureCollection, final * * @param inputStream stream to filter. * @param importOptions determine if we want to filer out. - * @return - * @throws AeriusException + * @return filtered input stream + * @throws AeriusException when an I/O error occurs during filtering setup */ - private static InputStream filterResults(final InputStream inputStream, final Set importOptions) { + private static InputStream filterResults(final InputStream inputStream, final Set importOptions) throws AeriusException { if (importOptions.contains(ImportOption.INCLUDE_RESULTS)) { return inputStream; } else { - final RegexModifier myModifier = new RegexModifier("[\n].+([\\s\\S]*?)<\\/imaer:featureMember>", - Pattern.CASE_INSENSITIVE, ""); - final Reader reader = new ModifyingReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8), myModifier); - return new ReaderInputStream(reader, StandardCharsets.UTF_8); + final Reader sourceReader = new InputStreamReader(inputStream, StandardCharsets.UTF_8); + final Reader filteringReader = new ReceptorFilteringReader(sourceReader); + try { + return ReaderInputStream.builder() + .setReader(filteringReader) + .setCharset(StandardCharsets.UTF_8) + .get(); + } catch (final IOException e) { + final AeriusException aeriusException = new AeriusException(ImaerExceptionReason.INTERNAL_ERROR, + "Failed to create filtering input stream: " + e.getMessage()); + aeriusException.initCause(e); + throw aeriusException; + } } } diff --git a/source/imaer-gml/src/test/java/nl/overheid/aerius/gml/filter/ReceptorFilteringReaderTest.java b/source/imaer-gml/src/test/java/nl/overheid/aerius/gml/filter/ReceptorFilteringReaderTest.java new file mode 100644 index 00000000..a8928a82 --- /dev/null +++ b/source/imaer-gml/src/test/java/nl/overheid/aerius/gml/filter/ReceptorFilteringReaderTest.java @@ -0,0 +1,188 @@ +/* + * Copyright the State of the Netherlands + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see http://www.gnu.org/licenses/. + */ +package nl.overheid.aerius.gml.filter; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.io.IOException; +import java.io.Reader; +import java.io.StringReader; +import java.nio.CharBuffer; +import java.util.stream.Stream; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +/** + * Unit tests for {@link ReceptorFilteringReader}. + */ +class ReceptorFilteringReaderTest { + + @ParameterizedTest + @MethodSource("provideFilteringTestCases") + void testFiltering(final String input, final String expected, final String description) throws IOException { + try (final ReceptorFilteringReader reader = new ReceptorFilteringReader(new StringReader(input))) { + final String result = readAll(reader); + assertEquals(expected, result, description); + } + } + + static Stream provideFilteringTestCases() { + final String sourceBlock = "sourceContent"; + final String receptorBlock = "\nreceptorContent\n"; + + return Stream.of( + Arguments.of( + "content", + "content", + "Input without ReceptorPoint should not be filtered"), + Arguments.of( + "\ncontent\n", + "", + "Single receptor block should be completely filtered out"), + Arguments.of( + "\ncontent1\n\ncontent2\n", + "", + "Multiple receptor blocks should all be filtered out"), + Arguments.of( + sourceBlock + receptorBlock + sourceBlock, + sourceBlock + sourceBlock, + "Only receptor blocks should be filtered, source blocks should remain"), + Arguments.of( + "\ncontent\n", + "", + "Receptor blocks with attributes should be filtered out"), + Arguments.of( + "\ncontent\n", + "", + "Case-insensitive matching should filter uppercase receptor blocks"), + Arguments.of( + "", + "", + "Empty input should produce empty output"), + Arguments.of( + "\n\nNL.IMAERCP.1137558.0 456251.095.8\n", + "", + "Receptor blocks with nested content should be completely filtered out")); + } + + @Test + void testSingleCharRead() throws IOException { + final String input = "X"; + + try (final ReceptorFilteringReader reader = new ReceptorFilteringReader(new StringReader(input))) { + final StringBuilder result = new StringBuilder(); + int c; + while ((c = reader.read()) != -1) { + result.append((char) c); + } + assertEquals(input, result.toString(), "Single character read should produce original input when no filtering occurs"); + } + } + + @Test + void testReadIntoCharBuffer() throws IOException { + final String input = "content"; + + try (final ReceptorFilteringReader reader = new ReceptorFilteringReader(new StringReader(input))) { + final CharBuffer buffer = CharBuffer.allocate(input.length()); + final int read = reader.read(buffer); + buffer.flip(); + assertEquals(input.length(), read, "Should read all characters when no filtering occurs"); + assertEquals(input, buffer.toString(), "CharBuffer content should match original input"); + } + } + + @Test + void testSkip() throws IOException { + final String input = "content"; + + try (final ReceptorFilteringReader reader = new ReceptorFilteringReader(new StringReader(input))) { + final long skipped = reader.skip(10); + assertEquals(10, skipped, "Should skip exactly 10 characters"); + + final String remaining = readAll(reader); + assertEquals(input.substring(10), remaining, "Remaining content after skip should match input substring"); + } + } + + @Test + void testReady() throws IOException { + final String input = "test"; + + try (final ReceptorFilteringReader reader = new ReceptorFilteringReader(new StringReader(input))) { + assertTrue(reader.ready(), "Reader should be ready when there is data available"); + } + } + + @Test + void testMarkNotSupported() throws IOException { + try (final ReceptorFilteringReader reader = new ReceptorFilteringReader(new StringReader("test"))) { + assertThrows(IOException.class, () -> reader.mark(10), "mark() should throw IOException as it's not supported"); + assertThrows(IOException.class, reader::reset, "reset() should throw IOException as it's not supported"); + } + } + + @Test + void testClose() throws IOException { + final StringReader source = new StringReader("test"); + final ReceptorFilteringReader reader = new ReceptorFilteringReader(source); + + reader.close(); + + // Should throw IOException when trying to read after close + assertThrows(IOException.class, reader::read, "Reading from closed reader should throw IOException"); + } + + @Test + void testComplexGMLStructure() throws IOException { + final String header = "\n\n"; + final String metadata = "2024\n"; + final String sourceBlock = "Source1\n"; + final String receptorBlock = "\n\nvalue\n\n\n"; + final String footer = ""; + + final String input = header + metadata + sourceBlock + receptorBlock + sourceBlock + footer; + + try (final ReceptorFilteringReader reader = new ReceptorFilteringReader(new StringReader(input))) { + final String result = readAll(reader); + // Verify receptor block is removed and source blocks are preserved + assertFalse(result.contains("ReceptorPoint"), "ReceptorPoint should be filtered out"); + assertTrue(result.contains("Source1"), "Source1 should be preserved"); + assertTrue(result.contains(""), "Header should be preserved"); + assertTrue(result.contains(""), "Footer should be preserved"); + // Count source blocks - should have 2 + final int sourceCount = result.split("EmissionSource").length - 1; + assertEquals(2, sourceCount / 2, "Should have 2 source blocks (opening and closing tags)"); + } + } + + private String readAll(final Reader reader) throws IOException { + final StringBuilder result = new StringBuilder(); + final char[] buffer = new char[1024]; + int read; + while ((read = reader.read(buffer)) != -1) { + result.append(buffer, 0, read); + } + return result.toString(); + } +}