Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.tsfile.read.reader.series.PaginationController;

import java.util.Arrays;
import java.util.function.Consumer;

public class TsBlockUtil {

Expand Down Expand Up @@ -75,10 +76,32 @@ public static TsBlock applyFilterAndLimitOffsetToTsBlock(
TsBlockBuilder builder,
Filter pushDownFilter,
PaginationController paginationController) {
return applyFilterAndLimitOffsetToTsBlock(
unFilteredBlock, builder, pushDownFilter, paginationController, null);
}

public static TsBlock applyFilterAndLimitOffsetToTsBlock(
TsBlock unFilteredBlock,
TsBlockBuilder builder,
Filter pushDownFilter,
PaginationController paginationController,
Consumer<Long> filterRowsRecorder) {
Comment thread
alpass163 marked this conversation as resolved.
Outdated

boolean[] selection = new boolean[unFilteredBlock.getPositionCount()];
Arrays.fill(selection, true);
boolean[] keepCurrentRow = pushDownFilter.satisfyTsBlock(selection, unFilteredBlock);
boolean[] keepCurrentRow =
filterRowsRecorder == null
? pushDownFilter.satisfyTsBlock(selection, unFilteredBlock)
: pushDownFilter.satisfyTsBlock(selection, unFilteredBlock, filterRowsRecorder);

return buildFilteredTsBlock(unFilteredBlock, builder, keepCurrentRow, paginationController);
}

private static TsBlock buildFilteredTsBlock(
TsBlock unFilteredBlock,
TsBlockBuilder builder,
boolean[] keepCurrentRow,
PaginationController paginationController) {
// construct time column
int readEndIndex =
buildTimeColumnWithPagination(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.function.Consumer;

/**
* A Filter is an executable expression tree describing the criteria for which records to keep when
Expand Down Expand Up @@ -117,6 +118,44 @@ public abstract class Filter {
*/
public abstract boolean[] satisfyTsBlock(boolean[] selection, TsBlock tsBlock);

public final boolean[] satisfyTsBlock(
boolean[] selection, TsBlock tsBlock, Consumer<Long> filterRowsRecorder) {
Comment thread
alpass163 marked this conversation as resolved.
Outdated

int inputCount = countSelectedRows(selection);
boolean[] result = satisfyTsBlock(selection, tsBlock);
int outputCount = countSelectedRows(result);
if (inputCount > outputCount) {
filterRowsRecorder.accept((long) (inputCount - outputCount));
}

return result;
}

private static int countSelectedRows(boolean[] selection) {
if (selection == null) return 0;
int count = 0;
int length = selection.length;
int i = 0;

// calculate multi times
Comment thread
alpass163 marked this conversation as resolved.
Outdated
for (; i < length - 7; i += 8) {
count +=
(selection[i] ? 1 : 0)
+ (selection[i + 1] ? 1 : 0)
+ (selection[i + 2] ? 1 : 0)
+ (selection[i + 3] ? 1 : 0)
+ (selection[i + 4] ? 1 : 0)
+ (selection[i + 5] ? 1 : 0)
+ (selection[i + 6] ? 1 : 0)
+ (selection[i + 7] ? 1 : 0);
}
for (; i < length; i++) {
count += (selection[i] ? 1 : 0);
}
Comment thread
alpass163 marked this conversation as resolved.
Outdated

return count;
}

/**
* To examine whether the block can be skipped.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import java.io.IOException;
import java.util.List;
import java.util.function.Consumer;

public interface IPageReader extends IMetadata {

Expand All @@ -39,6 +40,8 @@ default BatchData getAllSatisfiedPageData() throws IOException {

TsBlock getAllSatisfiedData() throws IOException;

TsBlock getAllSatisfiedData(Consumer<Long> filterRowsRecorder) throws IOException;
Comment thread
alpass163 marked this conversation as resolved.
Outdated
Comment thread
alpass163 marked this conversation as resolved.
Outdated

void addRecordFilter(Filter filter);

// The 'modified' property is also true when a data type need to be modified in query and
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;

import static java.util.Objects.requireNonNull;
import static org.apache.tsfile.read.reader.series.PaginationController.UNLIMITED_PAGINATION_CONTROLLER;

public abstract class AbstractAlignedPageReader implements IPageReader {
Expand Down Expand Up @@ -215,6 +217,55 @@ public TsBlock getAllSatisfiedData() throws IOException {
unFilteredBlock, builder, pushDownFilter, paginationController);
}

/**
* get all satisfied data while record the number of filter rows. if one tuple is not satisfy by
* the filter and is deleted at the same time, the tuple cannot be considered as a filtered data.
*/
@Override
public TsBlock getAllSatisfiedData(Consumer<Long> filterRowsRecorder) throws IOException {
Comment thread
alpass163 marked this conversation as resolved.
Outdated
requireNonNull(filterRowsRecorder, "filterRowsRecorder is null");
long[] timeBatch = timePageReader.getNextTimeBatch();

if (allPageDataSatisfy()) {
buildResultWithoutAnyFilterAndDelete(timeBatch);
return builder.build();
}

long allFilteredRows = 0;
// if !filter.satisfy, discard this row
boolean[] keepCurrentRow = new boolean[timeBatch.length];
boolean globalTimeFilterAllSatisfy = globalTimeFilterAllSatisfy();
if (globalTimeFilterAllSatisfy) {
Arrays.fill(keepCurrentRow, true);
} else {
// record the filtered rows number
long filteredRows =
updateKeepCurrentRowThroughGlobalTimeFilterWithRecord(keepCurrentRow, timeBatch);
allFilteredRows += filteredRows;
}

if (timePageReader.isModified()) {
// if one row is deleted, it can't be considered as the filtered row
long deletedAndFilteredRows =
updateKeepCurrentRowThroughDeletionWithRecord(keepCurrentRow, timeBatch);
allFilteredRows -= deletedAndFilteredRows;
}
if (allFilteredRows != 0) {
filterRowsRecorder.accept(allFilteredRows);
}
boolean pushDownFilterAllSatisfy = pushDownFilterAllSatisfy();
constructResult(keepCurrentRow, timeBatch, pushDownFilterAllSatisfy);

TsBlock unFilteredBlock = builder.build();
if (pushDownFilterAllSatisfy) {
// OFFSET & LIMIT has been consumed in buildTimeColumn
return unFilteredBlock;
}
builder.reset();
return TsBlockUtil.applyFilterAndLimitOffsetToTsBlock(
unFilteredBlock, builder, pushDownFilter, paginationController, filterRowsRecorder);
}

private void buildResultWithoutAnyFilterAndDelete(long[] timeBatch) throws IOException {
if (paginationController.hasCurOffset(timeBatch.length)) {
paginationController.consumeOffset(timeBatch.length);
Expand Down Expand Up @@ -264,6 +315,17 @@ private void updateKeepCurrentRowThroughGlobalTimeFilter(
}
}

private long updateKeepCurrentRowThroughGlobalTimeFilterWithRecord(
boolean[] keepCurrentRow, long[] timeBatch) {

long filteredRows = 0;
for (int i = 0, n = timeBatch.length; i < n; i++) {
keepCurrentRow[i] = globalTimeFilter.satisfy(timeBatch[i], null);
filteredRows += keepCurrentRow[i] ? 0 : 1;
}
return filteredRows;
}

private void updateKeepCurrentRowThroughDeletion(boolean[] keepCurrentRow, long[] timeBatch) {
for (int i = 0, n = timeBatch.length; i < n; i++) {
if (keepCurrentRow[i]) {
Expand All @@ -272,6 +334,19 @@ private void updateKeepCurrentRowThroughDeletion(boolean[] keepCurrentRow, long[
}
}

private long updateKeepCurrentRowThroughDeletionWithRecord(
boolean[] keepCurrentRow, long[] timeBatch) {
long deletedAndFilteredRows = 0;
for (int i = 0, n = timeBatch.length; i < n; i++) {
if (keepCurrentRow[i]) {
keepCurrentRow[i] = !timePageReader.isDeleted(timeBatch[i]);
} else {
deletedAndFilteredRows += timePageReader.isDeleted(timeBatch[i]) ? 1 : 0;
}
}
return deletedAndFilteredRows;
}

protected int buildTimeColumn(
long[] timeBatch, boolean[] keepCurrentRow, boolean pushDownFilterAllSatisfy) {
if (pushDownFilterAllSatisfy) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;

import static org.apache.tsfile.read.reader.series.PaginationController.UNLIMITED_PAGINATION_CONTROLLER;
import static org.apache.tsfile.utils.Preconditions.checkArgument;
Expand Down Expand Up @@ -212,6 +213,11 @@ public BatchData getAllSatisfiedPageData(boolean ascending) throws IOException {

@Override
public TsBlock getAllSatisfiedData() throws IOException {
return getAllSatisfiedData(null);
}

@Override
public TsBlock getAllSatisfiedData(Consumer<Long> filterRowsRecorder) throws IOException {
Comment thread
alpass163 marked this conversation as resolved.
Outdated
uncompressDataIfNecessary();
TsBlockBuilder builder;
int initialExpectedEntries = (int) pageHeader.getStatistics().getCount();
Expand All @@ -223,14 +229,18 @@ public TsBlock getAllSatisfiedData() throws IOException {

TimeColumnBuilder timeBuilder = builder.getTimeColumnBuilder();
ColumnBuilder valueBuilder = builder.getColumnBuilder(0);
long allFilteredRows = 0;
boolean allSatisfy = recordFilter == null || recordFilter.allSatisfy(this);
switch (dataType) {
case BOOLEAN:
while (timeDecoder.hasNext(timeBuffer)) {
long timestamp = timeDecoder.readLong(timeBuffer);
boolean aBoolean = valueDecoder.readBoolean(valueBuffer);
if (isDeleted(timestamp)
|| (!allSatisfy && !recordFilter.satisfyBoolean(timestamp, aBoolean))) {
if (isDeleted(timestamp)) {
continue;
}
if (!allSatisfy && !recordFilter.satisfyBoolean(timestamp, aBoolean)) {
allFilteredRows++;
continue;
}
if (paginationController.hasCurOffset()) {
Expand All @@ -252,8 +262,11 @@ public TsBlock getAllSatisfiedData() throws IOException {
while (timeDecoder.hasNext(timeBuffer)) {
long timestamp = timeDecoder.readLong(timeBuffer);
int anInt = valueDecoder.readInt(valueBuffer);
if (isDeleted(timestamp)
|| (!allSatisfy && !recordFilter.satisfyInteger(timestamp, anInt))) {
if (isDeleted(timestamp)) {
continue;
}
if (!allSatisfy && !recordFilter.satisfyInteger(timestamp, anInt)) {
allFilteredRows++;
continue;
}
if (paginationController.hasCurOffset()) {
Expand All @@ -275,8 +288,11 @@ public TsBlock getAllSatisfiedData() throws IOException {
while (timeDecoder.hasNext(timeBuffer)) {
long timestamp = timeDecoder.readLong(timeBuffer);
long aLong = valueDecoder.readLong(valueBuffer);
if (isDeleted(timestamp)
|| (!allSatisfy && !recordFilter.satisfyLong(timestamp, aLong))) {
if (isDeleted(timestamp)) {
continue;
}
if (!allSatisfy && !recordFilter.satisfyLong(timestamp, aLong)) {
allFilteredRows++;
continue;
}
if (paginationController.hasCurOffset()) {
Expand All @@ -297,8 +313,11 @@ public TsBlock getAllSatisfiedData() throws IOException {
while (timeDecoder.hasNext(timeBuffer)) {
long timestamp = timeDecoder.readLong(timeBuffer);
float aFloat = valueDecoder.readFloat(valueBuffer);
if (isDeleted(timestamp)
|| (!allSatisfy && !recordFilter.satisfyFloat(timestamp, aFloat))) {
if (isDeleted(timestamp)) {
continue;
}
if (!allSatisfy && !recordFilter.satisfyFloat(timestamp, aFloat)) {
allFilteredRows++;
continue;
}
if (paginationController.hasCurOffset()) {
Expand All @@ -319,8 +338,11 @@ public TsBlock getAllSatisfiedData() throws IOException {
while (timeDecoder.hasNext(timeBuffer)) {
long timestamp = timeDecoder.readLong(timeBuffer);
double aDouble = valueDecoder.readDouble(valueBuffer);
if (isDeleted(timestamp)
|| (!allSatisfy && !recordFilter.satisfyDouble(timestamp, aDouble))) {
if (isDeleted(timestamp)) {
continue;
}
if (!allSatisfy && !recordFilter.satisfyDouble(timestamp, aDouble)) {
allFilteredRows++;
continue;
}
if (paginationController.hasCurOffset()) {
Expand All @@ -344,8 +366,11 @@ public TsBlock getAllSatisfiedData() throws IOException {
while (timeDecoder.hasNext(timeBuffer)) {
long timestamp = timeDecoder.readLong(timeBuffer);
Binary aBinary = valueDecoder.readBinary(valueBuffer);
if (isDeleted(timestamp)
|| (!allSatisfy && !recordFilter.satisfyBinary(timestamp, aBinary))) {
if (isDeleted(timestamp)) {
continue;
}
if (!allSatisfy && !recordFilter.satisfyBinary(timestamp, aBinary)) {
allFilteredRows++;
continue;
}
if (paginationController.hasCurOffset()) {
Expand All @@ -365,6 +390,9 @@ public TsBlock getAllSatisfiedData() throws IOException {
default:
throw new UnSupportedDataTypeException(String.valueOf(dataType));
}
if (filterRowsRecorder != null && allFilteredRows > 0) {
filterRowsRecorder.accept(allFilteredRows);
}
return builder.build();
}

Expand Down
Loading