
Commit 9d1b07e

Set Blob
1 parent abd9ec2 commit 9d1b07e

3 files changed: 177 additions & 0 deletions


src/core/parquet_transformer.c

Lines changed: 124 additions & 0 deletions
@@ -48,6 +48,26 @@ static struct eb_transformer* parquet_transformer_clone(const struct eb_transfor
 static eb_status_t read_metadata_from_meta_file(const char* hash_str, char** source, char** model, char** timestamp);
 static void extract_schema_metadata(GArrowSchema* schema, char** hash, char** source, char** model, char** timestamp, char** dimensions);
 
+/* Thread local storage for document text */
+static __thread char* tls_document_text = NULL;
+
+/**
+ * Set document text for the next parquet transform operation
+ * Text will be stored in the 'blob' column and cleared after transform
+ *
+ * @param text Document text to store in blob column (NULL to clear)
+ */
+void eb_parquet_set_document_text(const char* text) {
+    if (tls_document_text) {
+        free(tls_document_text);
+        tls_document_text = NULL;
+    }
+
+    if (text) {
+        tls_document_text = strdup(text);
+    }
+}
+
 /* Configuration structure for the Parquet transformer */
 typedef struct {
     int compression_level;
@@ -453,11 +473,15 @@ static eb_status_t parquet_transform(struct eb_transformer* transformer,
     /* Create metadata field (string type) */
     GArrowField* metadata_field = garrow_field_new("metadata", GARROW_DATA_TYPE(garrow_string_data_type_new()));
 
+    /* Create blob field (string type) */
+    GArrowField* blob_field = garrow_field_new("blob", GARROW_DATA_TYPE(garrow_string_data_type_new()));
+
     /* Create schema with fields */
     GList* fields = NULL;
     fields = g_list_append(fields, id_field);
     fields = g_list_append(fields, values_field);
     fields = g_list_append(fields, metadata_field);
+    fields = g_list_append(fields, blob_field);
 
     GArrowSchema* schema = garrow_schema_new(fields);
     g_list_free(fields);
@@ -557,6 +581,7 @@ static eb_status_t parquet_transform(struct eb_transformer* transformer,
         g_object_unref(values_field);
         g_object_unref(id_field);
         g_object_unref(metadata_field);
+        g_object_unref(blob_field);
         g_object_unref(schema);
         if (metadata_json) free(metadata_json);
         if (need_to_free_decompressed) free(decompressed_data);
@@ -573,6 +598,7 @@ static eb_status_t parquet_transform(struct eb_transformer* transformer,
     g_object_unref(values_field);
     g_object_unref(id_field);
     g_object_unref(metadata_field);
+    g_object_unref(blob_field);
 
     /* Create arrays for each column */
     /* ID array (single string - the hash) */
@@ -677,11 +703,67 @@ static eb_status_t parquet_transform(struct eb_transformer* transformer,
     /* Free metadata JSON string */
     if (metadata_json) free(metadata_json);
 
+    /* Create blob array (single string) */
+    GArrowStringArrayBuilder* blob_builder = garrow_string_array_builder_new();
+
+    /* Create blob JSON */
+    char* blob_json = NULL;
+    if (tls_document_text) {
+        /* Create JSON with document text */
+        size_t blob_size = strlen(tls_document_text) + 50; /* Extra space for JSON format */
+        blob_json = (char*)malloc(blob_size);
+        if (blob_json) {
+            snprintf(blob_json, blob_size, "{\"text\":\"%s\"}", tls_document_text);
+        }
+
+        /* Clear TLS after use */
+        free(tls_document_text);
+        tls_document_text = NULL;
+    } else {
+        /* No document text available */
+        blob_json = strdup("{}");
+    }
+
+    if (!blob_json) {
+        blob_json = strdup("{}");
+    }
+
+    garrow_string_array_builder_append_string(blob_builder, blob_json, &error);
+    if (error) {
+        DEBUG_ERROR("Failed to append blob value: %s", error->message);
+        g_object_unref(blob_builder);
+        free(blob_json);
+        g_object_unref(id_array);
+        g_object_unref(values_array);
+        g_object_unref(metadata_array);
+        g_object_unref(schema);
+        g_error_free(error);
+        if (need_to_free_decompressed) free(decompressed_data);
+        return EB_ERROR_IO;
+    }
+
+    GArrowArray* blob_array = GARROW_ARRAY(garrow_array_builder_finish(
+        GARROW_ARRAY_BUILDER(blob_builder), &error));
+    g_object_unref(blob_builder);
+    free(blob_json);
+
+    if (error) {
+        DEBUG_ERROR("Failed to finish blob array: %s", error->message);
+        g_object_unref(id_array);
+        g_object_unref(values_array);
+        g_object_unref(metadata_array);
+        g_object_unref(schema);
+        g_error_free(error);
+        if (need_to_free_decompressed) free(decompressed_data);
+        return EB_ERROR_IO;
+    }
+
     /* Create record batch from arrays */
     GList* arrays = NULL;
     arrays = g_list_append(arrays, id_array);
     arrays = g_list_append(arrays, values_array);
     arrays = g_list_append(arrays, metadata_array);
+    arrays = g_list_append(arrays, blob_array);
 
     GArrowRecordBatch* record_batch = garrow_record_batch_new(schema, 1, arrays, &error);
     if (error) {
@@ -1204,6 +1286,48 @@ static eb_status_t parquet_inverse_transform(struct eb_transformer* transformer,
         DEBUG_INFO("Extracted metadata - model: %s", model_str);
     }
 
+    /* Extract blob if present */
+    gint blob_col_index = garrow_table_get_n_columns(table) > 3 ? 3 : -1;
+    if (blob_col_index >= 0) {
+        DEBUG_INFO("Found blob column, extracting values");
+        GArrowChunkedArray *blob_chunked = garrow_table_get_column_data(table, blob_col_index);
+        if (blob_chunked && garrow_chunked_array_get_n_chunks(blob_chunked) > 0) {
+            GArrowArray *blob_array = garrow_chunked_array_get_chunk(blob_chunked, 0);
+            if (blob_array) {
+                GArrowStringArray *blob_string_array = GARROW_STRING_ARRAY(blob_array);
+                if (blob_string_array) {
+                    const gchar *blob_json = garrow_string_array_get_string(blob_string_array, 0);
+                    if (blob_json) {
+                        DEBUG_INFO("Blob JSON: %s", blob_json);
+
+                        /* Parse JSON to extract text if needed */
+                        const char* text_start = strstr(blob_json, "\"text\":\"");
+                        if (text_start) {
+                            text_start += 8; /* Skip "text":" prefix */
+                            const char* text_end = strchr(text_start, '\"');
+                            if (text_end) {
+                                size_t text_len = text_end - text_start;
+
+                                /* Store the document text for potential future use */
+                                char* extracted_text = (char*)malloc(text_len + 1);
+                                if (extracted_text) {
+                                    strncpy(extracted_text, text_start, text_len);
+                                    extracted_text[text_len] = '\0';
+
+                                    /* Set as TLS for next operation if needed */
+                                    eb_parquet_set_document_text(extracted_text);
+                                    free(extracted_text);
+                                }
+                            }
+                        }
+                    }
+                }
+                g_object_unref(blob_array);
+            }
+            g_object_unref(blob_chunked);
+        }
+    }
+
     /* Free all objects in reverse order of creation, with proper NULL checks */
     if (data_type && G_IS_OBJECT(data_type)) g_object_unref(data_type);
     if (field && G_IS_OBJECT(field)) g_object_unref(field);
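Taken together, the parquet_transformer.c changes add a fourth string column, blob, holding one JSON value per file, and teach the inverse transform to read it back. As a rough sketch of the cell contents implied by the code above (values illustrative; the text is interpolated verbatim by the snprintf call, so this assumes it contains no unescaped double quotes):

blob cell when text was set:   {"text":"full document text ..."}
blob cell when no text was set: {}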

src/core/parquet_transformer.h

Lines changed: 8 additions & 0 deletions
@@ -49,4 +49,12 @@ struct eb_transformer *eb_parquet_transformer_create(int compression_level);
  */
 eb_status_t eb_register_parquet_transformer(void);
 
+/**
+ * Set document text for the next parquet transform operation
+ * Text will be stored in the 'blob' column and cleared after transform
+ *
+ * @param text Document text to store in blob column (NULL to clear)
+ */
+void eb_parquet_set_document_text(const char* text);
+
 #endif /* EB_PARQUET_TRANSFORMER_H */
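A minimal caller-side sketch of the intended pattern for this new API, assuming the caller drives the transform through the existing eb_transformer interface (that call is left as a comment, since its exact signature is outside this diff); the include path, function name, and file handling here are illustrative only:

#include <stdio.h>
#include <stdlib.h>
#include "parquet_transformer.h"   /* assumed include path */

/* Read a document and stash its text for the next parquet transform on
 * this thread; the transformer copies the text, writes it into the
 * 'blob' column, and clears its thread-local slot afterwards. */
static void prepare_blob_text(const char *path)
{
    FILE *f = fopen(path, "r");
    if (!f) return;

    fseek(f, 0, SEEK_END);
    long n = ftell(f);
    fseek(f, 0, SEEK_SET);

    char *text = malloc((size_t)n + 1);
    if (text) {
        size_t got = fread(text, 1, (size_t)n, f);
        text[got] = '\0';
        eb_parquet_set_document_text(text);  /* transformer keeps its own copy */
        free(text);
    }
    fclose(f);

    /* ... run the parquet transform via the usual transformer entry point ... */
    /* eb_parquet_set_document_text(NULL) would discard a pending text instead. */
}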

src/core/transport_s3.c

Lines changed: 45 additions & 0 deletions
@@ -404,6 +404,7 @@ static int s3_send_data(eb_transport_t *transport, const void *data, size_t size
     if (meta_file) {
         time_t file_timestamp = 0;
         char provider[128] = {0};
+        char source_file_path[PATH_MAX] = {0}; // Store the source file path
 
         char meta_line[1024];
         while (fgets(meta_line, sizeof(meta_line), meta_file)) {
@@ -423,10 +424,54 @@ static int s3_send_data(eb_transport_t *transport, const void *data, size_t size
                 file_timestamp = atol(meta_line + 10);
                 DEBUG_INFO("Found timestamp in metadata: %ld", (long)file_timestamp);
             }
+
+            /* Look for source file field */
+            if (strncmp(meta_line, "source_file=", 12) == 0) {
+                strncpy(source_file_path, meta_line + 12, sizeof(source_file_path) - 1);
+
+                /* Remove newline if present */
+                char *newline = strchr(source_file_path, '\n');
+                if (newline) *newline = '\0';
+
+                DEBUG_INFO("Found source file in metadata: %s", source_file_path);
+            }
         }
 
         fclose(meta_file);
 
+        /* Load source document text for blob field if source file exists */
+        if (source_file_path[0] != '\0') {
+            FILE *source_file = fopen(source_file_path, "r");
+            if (source_file) {
+                DEBUG_INFO("Reading document text from source: %s", source_file_path);
+
+                /* Determine file size */
+                fseek(source_file, 0, SEEK_END);
+                long source_size = ftell(source_file);
+                fseek(source_file, 0, SEEK_SET);
+
+                /* Allocate buffer for document text */
+                char *document_text = (char*)malloc(source_size + 1);
+                if (document_text) {
+                    size_t bytes_read = fread(document_text, 1, source_size, source_file);
+                    document_text[bytes_read] = '\0';
+
+                    /* Set document text for Parquet transformer */
+                    extern void eb_parquet_set_document_text(const char* text);
+                    DEBUG_INFO("Setting document text for blob field (%zu bytes)", bytes_read);
+                    eb_parquet_set_document_text(document_text);
+
+                    free(document_text);
+                } else {
+                    DEBUG_ERROR("Failed to allocate memory for document text");
+                }
+
+                fclose(source_file);
+            } else {
+                DEBUG_WARN("Could not open source file: %s", source_file_path);
+            }
+        }
+
         /* Use the metadata information */
         if (file_timestamp > 0) {
             now = file_timestamp;
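For reference, the fields this loop consumes live in the metadata file opened above as meta_file, written as plain key=value lines. A hypothetical example matching the prefixes parsed by the code (the values themselves are made up):

timestamp=1718035200
source_file=/data/docs/report.txt

Only source_file handling is new in this commit: when the line is present and the file is readable, its contents are passed to eb_parquet_set_document_text() so the subsequent parquet transform can populate the blob column.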
