Skip to content

Commit 2523ceb

Browse files
committed
Refactor correlation and regression accumulators to improve code clarity by removing unnecessary comments and whitespace
1 parent dedfbc5 commit 2523ceb

9 files changed

Lines changed: 28 additions & 60 deletions

File tree

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/CentralMomentAccumulator.java

Lines changed: 8 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ public enum MomentType {
3535
private final TSDataType seriesDataType;
3636
private final MomentType momentType;
3737

38-
// State variables: count, mean, M2, M3, M4
3938
private long count;
4039
private double mean;
4140
private double m2;
@@ -49,7 +48,7 @@ public CentralMomentAccumulator(TSDataType seriesDataType, MomentType momentType
4948

5049
@Override
5150
public void addInput(Column[] columns, BitMap bitMap) {
52-
// Tree model: columns[0] is Time, columns[1] is data
51+
5352
int size = columns[1].getPositionCount();
5453
for (int i = 0; i < size; i++) {
5554
if (bitMap != null && !bitMap.isMarked(i)) {
@@ -91,13 +90,10 @@ private void update(double value) {
9190

9291
mean += delta_n;
9392

94-
// 更新 M4 (顺序很重要,必须在更新 M3, M2 之前)
9593
m4 += term1 * delta_n2 * (count * count - 3 * count + 3) + 6 * delta_n2 * m2 - 4 * delta_n * m3;
9694

97-
// 更新 M3
9895
m3 += term1 * delta_n * (count - 2) - 3 * delta_n * m2;
9996

100-
// 更新 M2
10197
m2 += term1;
10298
}
10399

@@ -135,24 +131,19 @@ private void merge(long nB, double meanB, double m2B, double m3B, double m4B) {
135131
double delta3 = delta * delta2;
136132
double delta4 = delta2 * delta2;
137133

138-
// 合并公式 (Chan et al.)
139-
// M4 合并
140134
m4 +=
141135
m4B
142136
+ delta4 * nA * nB * (nA * nA - nA * nB + nB * nB) / (nTotal * nTotal * nTotal)
143137
+ 6.0 * delta2 * (nA * nA * m2B + nB * nB * m2) / (nTotal * nTotal)
144138
+ 4.0 * delta * (nA * m3B - nB * m3) / nTotal;
145139

146-
// M3 合并
147140
m3 +=
148141
m3B
149142
+ delta3 * nA * nB * (nA - nB) / (nTotal * nTotal)
150143
+ 3.0 * delta * (nA * m2B - nB * m2) / nTotal;
151144

152-
// M2 合并
153145
m2 += m2B + delta2 * nA * nB / nTotal;
154146

155-
// Mean 合并
156147
mean += delta * nB / nTotal;
157148
count = nTotal;
158149
}
@@ -164,7 +155,7 @@ public void outputIntermediate(ColumnBuilder[] columnBuilders) {
164155
if (count == 0) {
165156
columnBuilders[0].appendNull();
166157
} else {
167-
// 序列化: long + 4 * double = 40 bytes
158+
168159
byte[] bytes = new byte[40];
169160
ByteBuffer buffer = ByteBuffer.wrap(bytes);
170161
buffer.putLong(count);
@@ -178,27 +169,26 @@ public void outputIntermediate(ColumnBuilder[] columnBuilders) {
178169

179170
@Override
180171
public void outputFinal(ColumnBuilder columnBuilder) {
181-
if (count == 0 || m2 == 0) { // 方差为0或无数据
172+
if (count == 0 || m2 == 0) {
182173
columnBuilder.appendNull();
183174
return;
184175
}
185176

186177
if (momentType == MomentType.SKEWNESS) {
187-
if (count < 3) { // 偏度要求 N >= 3
178+
if (count < 3) {
188179
columnBuilder.appendNull();
189180
} else {
190-
// 无偏估计公式: (N * M3) / ((N-1)*(N-2) * sigma^3)
191-
// sigma = sqrt(M2 / (N-1))
181+
192182
double variance = m2 / (count - 1);
193183
double stdev = Math.sqrt(variance);
194184
double result = (count * m3) / ((count - 1) * (count - 2) * stdev * stdev * stdev);
195185
columnBuilder.writeDouble(result);
196186
}
197-
} else { // KURTOSIS
198-
if (count < 4) { // 峰度要求 N >= 4
187+
} else {
188+
if (count < 4) {
199189
columnBuilder.appendNull();
200190
} else {
201-
// 无偏估计公式 (超额峰度 Excess Kurtosis)
191+
202192
double variance = m2 / (count - 1);
203193
double term1 =
204194
(count * (count + 1) * m4)
@@ -209,7 +199,6 @@ public void outputFinal(ColumnBuilder columnBuilder) {
209199
}
210200
}
211201

212-
// 默认实现
213202
@Override
214203
public void removeIntermediate(Column[] input) {
215204
throw new UnsupportedOperationException();

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/CorrelationAccumulator.java

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,9 @@ public enum CorrelationType {
4444
private long count;
4545
private double meanX;
4646
private double meanY;
47-
private double m2X; // sum((x - meanX)^2)
48-
private double m2Y; // sum((y - meanY)^2)
49-
private double c2; // sum((x - meanX) * (y - meanY))
47+
private double m2X;
48+
private double m2Y;
49+
private double c2;
5050

5151
public CorrelationAccumulator(TSDataType[] seriesDataTypes, CorrelationType correlationType) {
5252
this.seriesDataTypes = seriesDataTypes;
@@ -55,8 +55,7 @@ public CorrelationAccumulator(TSDataType[] seriesDataTypes, CorrelationType corr
5555

5656
@Override
5757
public void addInput(Column[] columns, BitMap bitMap) {
58-
// columns[0] is time column
59-
// columns[1] and columns[2] are the two data columns
58+
6059
int size = columns[0].getPositionCount();
6160
for (int i = 0; i < size; i++) {
6261
if (bitMap != null && !bitMap.isMarked(i)) {
@@ -97,8 +96,6 @@ private void update(double x, double y) {
9796
meanX += deltaX / newCount;
9897
meanY += deltaY / newCount;
9998

100-
// Welford's algorithm for covariance and variance
101-
// C2_new = C2_old + (x - meanX_old) * (y - meanY_new)
10299
c2 += deltaX * (y - meanY);
103100
m2X += deltaX * (x - meanX);
104101
m2Y += deltaY * (y - meanY);
@@ -147,7 +144,6 @@ private void merge(
147144
double deltaX = otherMeanX - meanX;
148145
double deltaY = otherMeanY - meanY;
149146

150-
// Merge formulas
151147
c2 += otherC2 + deltaX * deltaY * count * otherCount / newCount;
152148
m2X += otherM2X + deltaX * deltaX * count * otherCount / newCount;
153149
m2Y += otherM2Y + deltaY * deltaY * count * otherCount / newCount;
@@ -180,10 +176,10 @@ public void outputFinal(ColumnBuilder columnBuilder) {
180176
switch (correlationType) {
181177
case CORR:
182178
if (count < 2) {
183-
// Not enough data to calculate correlation
179+
184180
columnBuilder.appendNull();
185181
} else if (m2X == 0 || m2Y == 0) {
186-
// If either variable has zero variance (all values the same), correlation is 0
182+
187183
columnBuilder.writeDouble(0.0);
188184
} else {
189185
columnBuilder.writeDouble(c2 / Math.sqrt(m2X * m2Y));
@@ -210,7 +206,7 @@ public void outputFinal(ColumnBuilder columnBuilder) {
210206

211207
@Override
212208
public void removeIntermediate(Column[] input) {
213-
// Optional: sliding window logic implementation if needed, otherwise throw exception
209+
214210
throw new UnsupportedOperationException("Remove not implemented for Correlation");
215211
}
216212

@@ -220,9 +216,7 @@ public void addStatistics(Statistics statistics) {
220216
}
221217

222218
@Override
223-
public void setFinal(Column finalResult) {
224-
// No-op for this accumulator typically
225-
}
219+
public void setFinal(Column finalResult) {}
226220

227221
@Override
228222
public void reset() {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/RegressionAccumulator.java

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,11 @@ public enum RegressionType {
3535
private final TSDataType[] seriesDataTypes;
3636
private final RegressionType regressionType;
3737

38-
// 状态变量 (不需要 m2Y)
3938
private long count;
4039
private double meanX;
4140
private double meanY;
42-
private double m2X; // Sum((x - meanX)^2)
43-
private double c2; // Sum((x - meanX) * (y - meanY))
41+
private double m2X;
42+
private double c2;
4443

4544
public RegressionAccumulator(TSDataType[] seriesDataTypes, RegressionType regressionType) {
4645
this.seriesDataTypes = seriesDataTypes;
@@ -49,8 +48,6 @@ public RegressionAccumulator(TSDataType[] seriesDataTypes, RegressionType regres
4948

5049
@Override
5150
public void addInput(Column[] columns, BitMap bitMap) {
52-
// Tree 模型: columns[0] 是 Time
53-
// REGR_SLOPE(y, x) -> columns[1] 是 y, columns[2] 是 x
5451

5552
int size = columns[1].getPositionCount();
5653
for (int i = 0; i < size; i++) {
@@ -61,8 +58,8 @@ public void addInput(Column[] columns, BitMap bitMap) {
6158
continue;
6259
}
6360

64-
double y = getDoubleValue(columns[1], i, seriesDataTypes[0]); // Arg1: Y (因变量)
65-
double x = getDoubleValue(columns[2], i, seriesDataTypes[1]); // Arg2: X (自变量)
61+
double y = getDoubleValue(columns[1], i, seriesDataTypes[0]);
62+
double x = getDoubleValue(columns[2], i, seriesDataTypes[1]);
6663

6764
update(x, y);
6865
}
@@ -92,7 +89,6 @@ private void update(double x, double y) {
9289
meanX += deltaX / newCount;
9390
meanY += deltaY / newCount;
9491

95-
// Welford Covariance & Variance
9692
c2 += deltaX * (y - meanY);
9793
m2X += deltaX * (x - meanX);
9894

@@ -133,7 +129,6 @@ private void merge(
133129
double deltaX = otherMeanX - meanX;
134130
double deltaY = otherMeanY - meanY;
135131

136-
// Merge Logic
137132
c2 += otherC2 + deltaX * deltaY * count * otherCount / newCount;
138133
m2X += otherM2X + deltaX * deltaX * count * otherCount / newCount;
139134

@@ -149,7 +144,6 @@ public void outputIntermediate(ColumnBuilder[] columnBuilders) {
149144
if (count == 0) {
150145
columnBuilders[0].appendNull();
151146
} else {
152-
// 序列化 5 个变量: long(8) + 4 * double(8) = 40 bytes
153147
byte[] bytes = new byte[40];
154148
ByteBuffer buffer = ByteBuffer.wrap(bytes);
155149
buffer.putLong(count);
@@ -168,7 +162,6 @@ public void outputFinal(ColumnBuilder columnBuilder) {
168162
return;
169163
}
170164

171-
// 如果 X 没有波动 (m2X=0), 斜率无法计算 (除以0), 返回 NULL
172165
if (m2X == 0) {
173166
columnBuilder.appendNull();
174167
return;
@@ -181,15 +174,13 @@ public void outputFinal(ColumnBuilder columnBuilder) {
181174
columnBuilder.writeDouble(slope);
182175
break;
183176
case REGR_INTERCEPT:
184-
// Intercept = MeanY - Slope * MeanX
185177
columnBuilder.writeDouble(meanY - slope * meanX);
186178
break;
187179
default:
188180
throw new UnsupportedOperationException("Unknown type: " + regressionType);
189181
}
190182
}
191183

192-
// 其他必须实现的接口方法
193184
@Override
194185
public void removeIntermediate(Column[] input) {
195186
throw new UnsupportedOperationException();

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/TableCentralMomentAccumulator.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@ public class TableCentralMomentAccumulator implements TableAccumulator {
3939
private final TSDataType seriesDataType;
4040
private final CentralMomentAccumulator.MomentType momentType;
4141

42-
// State
4342
private long count;
4443
private double mean;
4544
private double m2;
@@ -172,7 +171,7 @@ public void evaluateIntermediate(ColumnBuilder columnBuilder) {
172171
if (count == 0) {
173172
columnBuilder.appendNull();
174173
} else {
175-
// Serialize: long(8) + 4*double(32) = 40 bytes
174+
176175
ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES + Double.BYTES * 4);
177176
buffer.putLong(count);
178177
buffer.putDouble(mean);
@@ -199,7 +198,7 @@ public void evaluateFinal(ColumnBuilder columnBuilder) {
199198
double result = (count * m3) / ((count - 1) * (count - 2) * stdev * stdev * stdev);
200199
columnBuilder.writeDouble(result);
201200
}
202-
} else { // KURTOSIS
201+
} else {
203202
if (count < 4) {
204203
columnBuilder.appendNull();
205204
} else {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/TableCorrelationAccumulator.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,9 @@ public class TableCorrelationAccumulator implements TableAccumulator {
4242
private long count;
4343
private double meanX;
4444
private double meanY;
45-
private double m2X; // sum((x - meanX)^2)
46-
private double m2Y; // sum((y - meanY)^2)
47-
private double c2; // sum((x - meanX) * (y - meanY))
45+
private double m2X;
46+
private double m2Y;
47+
private double c2;
4848

4949
public TableCorrelationAccumulator(
5050
TSDataType xDataType,
@@ -118,7 +118,6 @@ private void update(double x, double y) {
118118
meanX += deltaX / newCount;
119119
meanY += deltaY / newCount;
120120

121-
// Welford's algorithm for covariance and variance
122121
c2 += deltaX * (y - meanY);
123122
m2X += deltaX * (x - meanX);
124123
m2Y += deltaY * (y - meanY);
@@ -180,7 +179,6 @@ private void merge(
180179
double deltaX = otherMeanX - meanX;
181180
double deltaY = otherMeanY - meanY;
182181

183-
// Merge formulas
184182
c2 += otherC2 + deltaX * deltaY * count * otherCount / newCount;
185183
m2X += otherM2X + deltaX * deltaX * count * otherCount / newCount;
186184
m2Y += otherM2Y + deltaY * deltaY * count * otherCount / newCount;

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/TableRegressionAccumulator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ public TableAccumulator copy() {
6666

6767
@Override
6868
public void addInput(Column[] arguments, AggregationMask mask) {
69-
// arguments[0] -> Y, arguments[1] -> X
69+
7070
int positionCount = mask.getSelectedPositionCount();
7171

7272
if (mask.isSelectAll()) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/GroupedCentralMomentAccumulator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -249,7 +249,7 @@ public void evaluateFinal(int groupId, ColumnBuilder columnBuilder) {
249249
double result = (count * m3) / ((count - 1) * (count - 2) * stdev * stdev * stdev);
250250
columnBuilder.writeDouble(result);
251251
}
252-
} else { // KURTOSIS
252+
} else {
253253
if (count < 4) {
254254
columnBuilder.appendNull();
255255
} else {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/GroupedCorrelationAccumulator.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,6 @@ private void update(int groupId, double x, double y) {
136136
meanXs.add(groupId, deltaX / newCount);
137137
meanYs.add(groupId, deltaY / newCount);
138138

139-
// Welford's algorithm for covariance and variance
140139
c2s.add(groupId, deltaX * (y - meanYs.get(groupId)));
141140
m2Xs.add(groupId, deltaX * (x - meanXs.get(groupId)));
142141
m2Ys.add(groupId, deltaY * (y - meanYs.get(groupId)));
@@ -194,7 +193,6 @@ private void merge(
194193
double deltaX = otherMeanX - meanXs.get(groupId);
195194
double deltaY = otherMeanY - meanYs.get(groupId);
196195

197-
// Merge formulas
198196
c2s.add(groupId, otherC2 + deltaX * deltaY * counts.get(groupId) * otherCount / newCount);
199197
m2Xs.add(groupId, otherM2X + deltaX * deltaX * counts.get(groupId) * otherCount / newCount);
200198
m2Ys.add(groupId, otherM2Y + deltaY * deltaY * counts.get(groupId) * otherCount / newCount);

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/GroupedRegressionAccumulator.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ public void setGroupCount(long groupCount) {
8383

8484
@Override
8585
public void addInput(int[] groupIds, Column[] arguments, AggregationMask mask) {
86-
// arguments[0] -> Y, arguments[1] -> X
86+
8787
int positionCount = mask.getSelectedPositionCount();
8888

8989
if (mask.isSelectAll()) {
@@ -135,7 +135,6 @@ private void update(int groupId, double x, double y) {
135135
meanXs.add(groupId, deltaX / newCount);
136136
meanYs.add(groupId, deltaY / newCount);
137137

138-
// Welford's algorithm for covariance and variance of X
139138
c2s.add(groupId, deltaX * (y - meanYs.get(groupId)));
140139
m2Xs.add(groupId, deltaX * (x - meanXs.get(groupId)));
141140

0 commit comments

Comments
 (0)