Skip to content

Commit fe1e2f9

Browse files
committed
Populate provenance nodes for spanner ingestion
1 parent 9b8e6f4 commit fe1e2f9

3 files changed

Lines changed: 56 additions & 29 deletions

File tree

pipeline/ingestion/src/main/java/org/datacommons/ingestion/pipeline/ImportGroupPipeline.java

Lines changed: 50 additions & 27 deletions
Original file line number | Diff line number | Diff line change
@@ -27,6 +27,7 @@
2727

2828
public class ImportGroupPipeline {
2929
private static final Logger LOGGER = LoggerFactory.getLogger(ImportGroupPipeline.class);
30+
private static final String IMPORT_METADATA_FILE = "import_metadata_mcf.mcf";
3031
private static final Counter nodeInvalidTypeCounter =
3132
Metrics.counter(ImportGroupPipeline.class, "mcf_nodes_without_type");
3233
private static final Counter nodeCounter =
@@ -72,45 +73,67 @@ public static void main(String[] args) {
7273

7374
for (JsonElement element : jsonArray) {
7475
JsonElement importElement = element.getAsJsonObject().get("importName");
75-
JsonElement path = element.getAsJsonObject().get("latestVersion");
76+
JsonElement versionElement = element.getAsJsonObject().get("latestVersion");
77+
JsonElement pathElement = element.getAsJsonObject().get("graphDataPaths");
7678
if (importElement == null
77-
|| path == null
79+
|| versionElement == null
80+
|| pathElement == null
7881
|| importElement.getAsString().isEmpty()
79-
|| path.getAsString().isEmpty()) {
82+
|| versionElement.getAsString().isEmpty()) {
8083
LOGGER.error("Invalid import input json: {}", element.toString());
8184
continue;
8285
}
8386
String importName = importElement.getAsString();
87+
String latestVersion = versionElement.getAsString();
88+
LOGGER.info("Import: {} Latest version: {}", importName, latestVersion);
89+
90+
// Populate provenance node/edges.
8491
String provenance = "dc/base/" + importName;
85-
LOGGER.info("Import {} graph path {}", importName, path.getAsString());
92+
PCollection<McfGraph> provenanceMcf =
93+
PipelineUtils.readMcfFiles(latestVersion + IMPORT_METADATA_FILE, pipeline);
94+
PCollection<Mutation> provenanceNode =
95+
GraphReader.graphToNodes(
96+
provenanceMcf, spannerClient, nodeCounter, nodeInvalidTypeCounter)
97+
.apply("ExtractProvenanceNode", Values.create());
98+
PCollection<Mutation> provenanceEdges =
99+
GraphReader.graphToEdges(provenanceMcf, provenance, spannerClient, edgeCounter)
100+
.apply("ExtractProvenanceEdges", Values.create());
101+
nodeMutationList.add(provenanceNode);
102+
edgeMutationList.add(provenanceEdges);
86103

87104
PCollection<Mutation> deleteMutations =
88105
GraphReader.getDeleteMutations(importName, provenance, pipeline, spannerClient);
89106
deleteMutationList.add(deleteMutations);
90107
// Read schema mcf files and combine MCF nodes, and convert to spanner mutations (Node/Edge).
91-
PCollection<McfGraph> nodes = PipelineUtils.readMcfFiles(path.getAsString(), pipeline);
92-
PCollectionTuple graphNodes = PipelineUtils.splitGraph(nodes);
93-
PCollection<McfGraph> observationNodes = graphNodes.get(PipelineUtils.OBSERVATION_NODES_TAG);
94-
PCollection<McfGraph> schemaNodes = graphNodes.get(PipelineUtils.SCHEMA_NODES_TAG);
95-
PCollection<McfGraph> combinedGraph = PipelineUtils.combineGraphNodes(schemaNodes);
96-
PCollection<Mutation> nodeMutations =
97-
GraphReader.graphToNodes(
98-
combinedGraph, spannerClient, nodeCounter, nodeInvalidTypeCounter)
99-
.apply("ExtractNodeMutations", Values.create());
100-
PCollection<Mutation> edgeMutations =
101-
GraphReader.graphToEdges(combinedGraph, provenance, spannerClient, edgeCounter)
102-
.apply("ExtractEdgeMutations", Values.create());
103-
nodeMutationList.add(nodeMutations);
104-
edgeMutationList.add(edgeMutations);
105-
106-
// Read observation mcf files, build optimized graph, and convert to spanner mutations
107-
// (Observation).
108-
PCollection<McfOptimizedGraph> optimizedGraph =
109-
PipelineUtils.buildOptimizedMcfGraph(observationNodes);
110-
PCollection<Mutation> observationMutations =
111-
GraphReader.graphToObservations(optimizedGraph, importName, spannerClient, obsCounter)
112-
.apply("ExtractObsMutations", Values.create());
113-
obsMutationList.add(observationMutations);
108+
for (JsonElement path : pathElement.getAsJsonArray()) {
109+
String graphPath = latestVersion + path.getAsString();
110+
PCollection<McfGraph> nodes =
111+
path.getAsString().contains("tfrecord")
112+
? PipelineUtils.readMcfGraph(graphPath, pipeline)
113+
: PipelineUtils.readMcfFiles(graphPath, pipeline);
114+
PCollectionTuple graphNodes = PipelineUtils.splitGraph(nodes);
115+
PCollection<McfGraph> observationNodes =
116+
graphNodes.get(PipelineUtils.OBSERVATION_NODES_TAG);
117+
PCollection<McfGraph> schemaNodes = graphNodes.get(PipelineUtils.SCHEMA_NODES_TAG);
118+
PCollection<McfGraph> combinedGraph = PipelineUtils.combineGraphNodes(schemaNodes);
119+
PCollection<Mutation> nodeMutations =
120+
GraphReader.graphToNodes(
121+
combinedGraph, spannerClient, nodeCounter, nodeInvalidTypeCounter)
122+
.apply("ExtractNodeMutations", Values.create());
123+
PCollection<Mutation> edgeMutations =
124+
GraphReader.graphToEdges(combinedGraph, provenance, spannerClient, edgeCounter)
125+
.apply("ExtractEdgeMutations", Values.create());
126+
nodeMutationList.add(nodeMutations);
127+
edgeMutationList.add(edgeMutations);
128+
// Read observation mcf files, build optimized graph, and convert to spanner mutations
129+
// (Observation).
130+
PCollection<McfOptimizedGraph> optimizedGraph =
131+
PipelineUtils.buildOptimizedMcfGraph(observationNodes);
132+
PCollection<Mutation> observationMutations =
133+
GraphReader.graphToObservations(optimizedGraph, importName, spannerClient, obsCounter)
134+
.apply("ExtractObsMutations", Values.create());
135+
obsMutationList.add(observationMutations);
136+
}
114137
}
115138
PCollection<Mutation> deleteMutations =
116139
PCollectionList.of(deleteMutationList)

pipeline/ingestion/src/main/resources/spanner_schema.sql

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -36,6 +36,7 @@ CREATE TABLE Observation (
3636
CREATE TABLE ImportStatus (
3737
ImportName STRING(MAX) NOT NULL,
3838
LatestVersion STRING(MAX),
39+
GraphDataPaths ARRAY<STRING(MAX)>,
3940
State STRING(1024) NOT NULL,
4041
JobId STRING(1024),
4142
WorkflowId STRING(1024),

pipeline/ingestion/template.sh

Lines changed: 5 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -16,8 +16,11 @@ elif [ "$OPERATION" == "run" ]; then
1616
echo "Running Dataflow Flex Template..."
1717
gcloud dataflow flex-template run "ingestion-job" \
1818
--template-file-gcs-location "gs://datcom-templates/templates/flex/ingestion.json" \
19-
--parameters ^~^importList='[{"importName":"scripts/us_fed/treasury_constant_maturity_rates:USFed_ConstantMaturityRates","latestVersion":"datcom-prod-imports/scripts/us_fed/treasury_constant_maturity_rates/USFed_ConstantMaturityRates/2025_09_09T20_18_16_090752_07_00/*/validation"}]' \
20-
--project=datcom-store \
19+
--parameters ^~^importList='[{"importName":"USFed_ConstantMaturityRates","latestVersion":"gs://datcom-prod-imports/scripts/us_fed/treasury_constant_maturity_rates/USFed_ConstantMaturityRates/2026_01_06T19_18_04_771937_08_00", "graphDataPaths":["*/*/*.mcf"]}]' \
20+
--parameters spannerDatabaseId="dc_graph_import" \
21+
--project=datcom-import-automation-prod \
22+
--num-workers=1 \
23+
--worker-machine-type=n2-highmem-8 \
2124
--region "us-central1"
2225
else
2326
echo "Usage: $0 [deploy|run]"

0 commit comments

Comments (0)