diff --git a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/greptime/GreptimeDbDataStorage.java b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/greptime/GreptimeDbDataStorage.java index a3130736bc7..387698ea1d5 100644 --- a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/greptime/GreptimeDbDataStorage.java +++ b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/greptime/GreptimeDbDataStorage.java @@ -95,6 +95,7 @@ public class GreptimeDbDataStorage extends AbstractHistoryDataStorage { private static final String LOG_TABLE_NAME = WarehouseConstants.LOG_TABLE_NAME; private static final String LABEL_KEY_START_TIME = "start"; private static final String LABEL_KEY_END_TIME = "end"; + private static final String LABEL_KEY_TS = "ts"; private static final int LOG_BATCH_SIZE = 500; private GreptimeDB greptimeDb; @@ -151,6 +152,8 @@ public void saveData(CollectRep.MetricsData metricsData) { tableSchemaBuilder.addTag("instance", DataType.String) .addTimestamp("ts", DataType.TimestampMillisecond); List fields = metricsData.getFields(); + Map customLabels = metricsData.getLabels(); + List fieldNames = fields.stream().map(CollectRep.Field::getName).collect(Collectors.toList()); fields.forEach(field -> { if (field.getLabel()) { tableSchemaBuilder.addTag(field.getName(), DataType.String); @@ -162,9 +165,19 @@ public void saveData(CollectRep.MetricsData metricsData) { } } }); + List labelKeys = new LinkedList<>(); + if (!Objects.isNull(customLabels) && !customLabels.isEmpty()) { + for (Map.Entry label : customLabels.entrySet()) { + String key = label.getKey(); + if (!LABEL_KEY_INSTANCE.equals(key) && !LABEL_KEY_TS.equals(key) && !fieldNames.contains(key)) { + tableSchemaBuilder.addTag(key, DataType.String); + labelKeys.add(key); + } + } + } Table table = Table.from(tableSchemaBuilder.build()); long now = System.currentTimeMillis(); - Object[] values = new Object[2 + fields.size()]; + Object[] values = new Object[2 + fields.size() + labelKeys.size()]; values[0] = instance; values[1] = now; RowWrapper rowWrapper = metricsData.readRow(); @@ -194,6 +207,10 @@ public void saveData(CollectRep.MetricsData metricsData) { } }); + for (int i = 0; i < labelKeys.size(); i++) { + values[2 + fields.size() + i] = customLabels.get(labelKeys.get(i)); + } + table.addRow(values); } diff --git a/hertzbeat-warehouse/src/test/java/org/apache/hertzbeat/warehouse/store/history/tsdb/greptime/GreptimeDbDataStorageTest.java b/hertzbeat-warehouse/src/test/java/org/apache/hertzbeat/warehouse/store/history/tsdb/greptime/GreptimeDbDataStorageTest.java index c28130c493c..3de76446b6c 100644 --- a/hertzbeat-warehouse/src/test/java/org/apache/hertzbeat/warehouse/store/history/tsdb/greptime/GreptimeDbDataStorageTest.java +++ b/hertzbeat-warehouse/src/test/java/org/apache/hertzbeat/warehouse/store/history/tsdb/greptime/GreptimeDbDataStorageTest.java @@ -38,12 +38,17 @@ import io.greptime.models.Result; import io.greptime.models.Table; import io.greptime.models.WriteOk; +import io.greptime.v1.Common; +import io.greptime.v1.RowData; +import java.lang.reflect.Field; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; import org.apache.hertzbeat.common.constants.CommonConstants; import org.apache.hertzbeat.common.entity.arrow.ArrowCell; import org.apache.hertzbeat.common.entity.arrow.RowWrapper; @@ -150,6 +155,55 @@ void testSaveData() { } } + @Test + void testSaveDataWithCustomLabels() throws Exception { + try (MockedStatic mockedStatic = mockStatic(GreptimeDB.class)) { + mockedStatic.when(() -> GreptimeDB.create(any())).thenReturn(greptimeDb); + + Result mockResult = mock(Result.class); + when(mockResult.isOk()).thenReturn(true); + CompletableFuture> mockFuture = CompletableFuture.completedFuture(mockResult); + when(greptimeDb.write(any(Table.class))).thenReturn(mockFuture); + + greptimeDbDataStorage = new GreptimeDbDataStorage(greptimeProperties, restTemplate, greptimeSqlQueryExecutor); + + CollectRep.MetricsData metricsData = createMockMetricsData(true); + Map customLabels = new LinkedHashMap<>(); + customLabels.put("instance", "bad_instance"); + customLabels.put("ts", "bad_ts"); + customLabels.put("usage", "bad_usage"); + customLabels.put("env", "prod"); + when(metricsData.getLabels()).thenReturn(customLabels); + when(metricsData.getInstance()).thenReturn("server1"); + + ArgumentCaptor tableCaptor = ArgumentCaptor.forClass(Table.class); + greptimeDbDataStorage.saveData(metricsData); + + verify(greptimeDb).write(tableCaptor.capture()); + Table capturedTable = tableCaptor.getValue(); + + List columnSchemas = getColumnSchemas(capturedTable); + List columnNames = columnSchemas.stream() + .map(RowData.ColumnSchema::getColumnName) + .collect(Collectors.toList()); + assertEquals(5, columnNames.size()); + assertEquals(2, Collections.frequency(columnNames, "instance")); + assertEquals(1, Collections.frequency(columnNames, "ts")); + assertEquals(1, Collections.frequency(columnNames, "usage")); + + List envColumnSchemas = columnSchemas.stream() + .filter(columnSchema -> "env".equals(columnSchema.getColumnName())) + .collect(Collectors.toList()); + assertEquals(1, envColumnSchemas.size()); + assertEquals(Common.SemanticType.TAG, envColumnSchemas.get(0).getSemanticType()); + + List rows = getRows(capturedTable); + assertEquals(1, rows.size()); + RowData.Row row = rows.get(0); + assertEquals("prod", row.getValuesList().get(4).getStringValue()); + } + } + @Test void testGetHistoryMetricData() { greptimeDbDataStorage = new GreptimeDbDataStorage(greptimeProperties, restTemplate, greptimeSqlQueryExecutor); @@ -387,4 +441,20 @@ private List> createMockLogRows() { return List.of(row); } + + private List getColumnSchemas(Table table) throws Exception { + Field columnSchemasField = table.getClass().getDeclaredField("columnSchemas"); + columnSchemasField.setAccessible(true); + @SuppressWarnings("unchecked") + List columnSchemas = (List) columnSchemasField.get(table); + return columnSchemas; + } + + private List getRows(Table table) throws Exception { + Field rowsField = table.getClass().getDeclaredField("rows"); + rowsField.setAccessible(true); + @SuppressWarnings("unchecked") + List rows = (List) rowsField.get(table); + return rows; + } }