Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,20 +51,20 @@
import java.util.stream.Collectors;

public class AlignedChunkGroupWriterImpl implements IChunkGroupWriter {
private static final Logger LOG = LoggerFactory.getLogger(AlignedChunkGroupWriterImpl.class);
protected static final Logger LOG = LoggerFactory.getLogger(AlignedChunkGroupWriterImpl.class);

private final IDeviceID deviceId;
protected final IDeviceID deviceId;

// measurementID -> ValueChunkWriter
private final Map<String, ValueChunkWriter> valueChunkWriterMap = new LinkedHashMap<>();
protected final Map<String, ValueChunkWriter> valueChunkWriterMap = new LinkedHashMap<>();

private final TimeChunkWriter timeChunkWriter;
protected final TimeChunkWriter timeChunkWriter;

private final EncryptParameter encryprParam;
protected final EncryptParameter encryprParam;

private long lastTime = Long.MIN_VALUE;
private boolean isInitLastTime = false;
private boolean convertColumnNameToLowerCase = false;
protected long lastTime = Long.MIN_VALUE;
protected boolean isInitLastTime = false;
protected boolean convertColumnNameToLowerCase = false;

public AlignedChunkGroupWriterImpl(IDeviceID deviceId) {
this.deviceId = deviceId;
Expand Down Expand Up @@ -392,7 +392,7 @@ private void writeEmptyDataInOneRow(List<ValueChunkWriter> valueChunkWriterList)
* check occupied memory size, if it exceeds the PageSize threshold, construct a page and put it
* to pageBuffer
*/
private boolean checkPageSizeAndMayOpenANewPage() {
protected boolean checkPageSizeAndMayOpenANewPage() {
if (timeChunkWriter.checkPageSizeAndMayOpenANewPage()) {
return true;
}
Expand All @@ -404,21 +404,21 @@ private boolean checkPageSizeAndMayOpenANewPage() {
return false;
}

private void writePageToPageBuffer() {
protected void writePageToPageBuffer() {
timeChunkWriter.writePageToPageBuffer();
for (ValueChunkWriter valueChunkWriter : valueChunkWriterMap.values()) {
valueChunkWriter.writePageToPageBuffer();
}
}

private void sealAllChunks() {
protected void sealAllChunks() {
timeChunkWriter.sealCurrentPage();
for (ValueChunkWriter valueChunkWriter : valueChunkWriterMap.values()) {
valueChunkWriter.sealCurrentPage();
}
}

private void checkIsHistoryData(long time) throws WriteProcessException {
protected void checkIsHistoryData(long time) throws WriteProcessException {
if (isInitLastTime && time <= lastTime) {
throw new WriteProcessException(
"Not allowed to write out-of-order data in timeseries "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,14 @@

package org.apache.tsfile.write.chunk;

import org.apache.tsfile.block.column.Column;
import org.apache.tsfile.encrypt.EncryptParameter;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.write.UnSupportedDataTypeException;
import org.apache.tsfile.write.schema.IMeasurementSchema;

import java.io.IOException;
import java.util.List;

public class TableChunkGroupWriterImpl extends AlignedChunkGroupWriterImpl {

Expand All @@ -33,4 +39,65 @@ public TableChunkGroupWriterImpl(IDeviceID deviceId, EncryptParameter encryptPar
super(deviceId, encryptParam);
setConvertColumnNameToLowerCase(true);
}

public int write(
Column timeColumn,
Column[] valueColumns,
List<IMeasurementSchema> measurementSchemas,
int startRowIndex,
int endRowIndex)
throws IOException {
int pointCount = 0;
ValueChunkWriter[] valueChunkWriters = new ValueChunkWriter[valueColumns.length];
for (int i = 0; i < measurementSchemas.size(); i++) {
valueChunkWriters[i] = tryToAddSeriesWriterInternal(measurementSchemas.get(i));
}
for (int rowIndex = startRowIndex; rowIndex < endRowIndex; rowIndex++) {
long time = timeColumn.getLong(rowIndex);
for (int valueColumnIndex = 0; valueColumnIndex < valueColumns.length; valueColumnIndex++) {
Column valueColumn = valueColumns[valueColumnIndex];
IMeasurementSchema measurementSchema = measurementSchemas.get(valueColumnIndex);
ValueChunkWriter valueChunkWriter = valueChunkWriters[rowIndex];
boolean isNull = valueColumn.isNull(rowIndex);
Copy link

Copilot AI Mar 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In write(...), the ValueChunkWriter is indexed by rowIndex (valueChunkWriters[rowIndex]) instead of by valueColumnIndex. This will throw ArrayIndexOutOfBoundsException as soon as endRowIndex exceeds the number of columns, and it also writes data into the wrong series. Use the value-column index when selecting the writer, and consider validating measurementSchemas.size() matches valueColumns.length before initializing/iterating to avoid AIOOB or null writers. Also, this method currently skips the out-of-order check (checkIsHistoryData(time)) that other chunk writers perform, so out-of-order writes may silently corrupt state; it should apply the same check (and adjust the throws signature accordingly).

Copilot uses AI. Check for mistakes.
switch (measurementSchema.getType()) {
case BOOLEAN:
valueChunkWriter.write(time, isNull ? false : valueColumn.getBoolean(rowIndex), isNull);
break;
case INT32:
case DATE:
valueChunkWriter.write(time, isNull ? 0 : valueColumn.getInt(rowIndex), isNull);
break;
case INT64:
case TIMESTAMP:
valueChunkWriter.write(time, isNull ? 0 : valueColumn.getLong(rowIndex), isNull);
break;
case FLOAT:
valueChunkWriter.write(time, isNull ? 0 : valueColumn.getFloat(rowIndex), isNull);
break;
case DOUBLE:
valueChunkWriter.write(time, isNull ? 0 : valueColumn.getDouble(rowIndex), isNull);
break;
case TEXT:
case BLOB:
case STRING:
case OBJECT:
valueChunkWriter.write(time, isNull ? null : valueColumn.getBinary(rowIndex), isNull);
break;
default:
throw new UnSupportedDataTypeException(
String.format(
"Data type %s is not supported.",
measurementSchemas.get(valueColumnIndex).getType()));
}
}
timeChunkWriter.write(time);
lastTime = time;
isInitLastTime = true;
if (checkPageSizeAndMayOpenANewPage()) {
writePageToPageBuffer();
}
pointCount++;
}
return pointCount;
}
}
Loading
Loading