Skip to content

Commit 8563406

Browse files
committed
Populate provenance nodes for spanner ingestion
1 parent 9b8e6f4 commit 8563406

6 files changed

Lines changed: 95 additions & 43 deletions

File tree

pipeline/ingestion/src/main/java/org/datacommons/ingestion/data/GraphReader.java

Lines changed: 45 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,19 +2,26 @@
22

33
import com.google.cloud.ByteArray;
44
import com.google.cloud.spanner.Mutation;
5+
import com.google.cloud.storage.Blob;
6+
import com.google.cloud.storage.BlobId;
7+
import com.google.cloud.storage.Storage;
8+
import com.google.cloud.storage.StorageOptions;
59
import java.io.Serializable;
10+
import java.nio.charset.StandardCharsets;
611
import java.util.ArrayList;
712
import java.util.Collections;
813
import java.util.List;
914
import java.util.Map;
1015
import org.apache.beam.sdk.Pipeline;
1116
import org.apache.beam.sdk.metrics.Counter;
17+
import org.apache.beam.sdk.transforms.Create;
1218
import org.apache.beam.sdk.transforms.DoFn;
1319
import org.apache.beam.sdk.transforms.Flatten;
1420
import org.apache.beam.sdk.transforms.ParDo;
1521
import org.apache.beam.sdk.values.KV;
1622
import org.apache.beam.sdk.values.PCollection;
1723
import org.apache.beam.sdk.values.PCollectionList;
24+
import org.apache.beam.sdk.values.TypeDescriptor;
1825
import org.datacommons.Storage.Observations;
1926
import org.datacommons.ingestion.spanner.SpannerClient;
2027
import org.datacommons.pipeline.util.PipelineUtils;
@@ -25,6 +32,7 @@
2532
import org.datacommons.proto.Mcf.McfStatVarObsSeries.StatVarObs;
2633
import org.datacommons.proto.Mcf.ValueType;
2734
import org.datacommons.util.GraphUtils;
35+
import org.datacommons.util.McfUtil;
2836
import org.slf4j.Logger;
2937
import org.slf4j.LoggerFactory;
3038

@@ -42,8 +50,8 @@ public static List<Node> graphToNodes(McfGraph graph, Counter mcfNodesWithoutTyp
4250
// Generate corresponding node
4351
Map<String, McfGraph.Values> pv = pvs.getPvsMap();
4452
Node.Builder node = Node.builder();
45-
node.subjectId(nodeEntry.getKey());
46-
node.value(nodeEntry.getKey());
53+
node.subjectId(McfUtil.stripNamespace(nodeEntry.getKey()));
54+
node.value(McfUtil.stripNamespace(nodeEntry.getKey()));
4755
node.name(GraphUtils.getPropertyValue(pv, "name"));
4856
List<String> types = GraphUtils.getPropertyValues(pv, "typeOf");
4957
if (types.isEmpty()) {
@@ -74,14 +82,42 @@ public static List<Node> graphToNodes(McfGraph graph, Counter mcfNodesWithoutTyp
7482
return nodes;
7583
}
7684

85+
public static PCollection<McfGraph> getProvenance(
86+
String bucketName,
87+
String importName,
88+
String provenanceFile,
89+
String meatadataFile,
90+
Pipeline p) {
91+
LOGGER.info("Reading provenance mcf from {} {} {}", bucketName, provenanceFile, meatadataFile);
92+
Storage storage = StorageOptions.getDefaultInstance().getService();
93+
Blob blob = storage.get(BlobId.of(bucketName, meatadataFile));
94+
List<McfGraph> mcfList = new ArrayList<>();
95+
if (blob != null && blob.exists()) {
96+
String s = new String(blob.getContent(), StandardCharsets.UTF_8);
97+
mcfList.add(GraphUtils.convertToGraph(s));
98+
}
99+
blob = storage.get(BlobId.of(bucketName, provenanceFile));
100+
if (blob != null && blob.exists()) {
101+
String s = new String(blob.getContent(), StandardCharsets.UTF_8);
102+
mcfList.add(GraphUtils.convertToGraph(s));
103+
}
104+
if (mcfList.isEmpty()) {
105+
String defaultProvenance =
106+
"Node: dcid:dc/base/" + importName + "\n" + "typeOf: dcid:Provenance\n";
107+
mcfList.add(GraphUtils.convertToGraph(defaultProvenance));
108+
}
109+
return p.apply(Create.of(mcfList).withType(TypeDescriptor.of(McfGraph.class)));
110+
}
111+
77112
public static List<Edge> graphToEdges(McfGraph graph, String provenance) {
78113
List<Edge> edges = new ArrayList<>();
79114
for (Map.Entry<String, PropertyValues> nodeEntry : graph.getNodesMap().entrySet()) {
80115
PropertyValues pvs = nodeEntry.getValue();
81116
if (!GraphUtils.isObservation(pvs)) {
82117
Map<String, McfGraph.Values> pv = pvs.getPvsMap();
83118
// String provenance = GraphUtils.getPropertyValue(pv, "provenance");
84-
String subjectId = nodeEntry.getKey(); // Use the map key as the subjectId
119+
String subjectId =
120+
McfUtil.stripNamespace(nodeEntry.getKey()); // Use the map key as the subjectId
85121
for (Map.Entry<String, McfGraph.Values> entry : pv.entrySet()) { // Iterate over properties
86122
for (TypedValue val : entry.getValue().getTypedValuesList()) {
87123
Edge.Builder edge = Edge.builder();
@@ -177,6 +213,9 @@ public static PCollection<KV<String, Mutation>> graphToNodes(
177213
public void processElement(
178214
@Element McfGraph element, OutputReceiver<KV<String, Mutation>> receiver) {
179215
List<Node> nodes = graphToNodes(element, mcfNodesWithoutTypeCounter);
216+
// for (Node node : nodes) {
217+
// LOGGER.info("Processing node: {}", node);
218+
// }
180219
List<KV<String, Mutation>> mutations =
181220
spannerClient.toGraphKVMutations(nodes, Collections.emptyList());
182221
mutations.stream()
@@ -202,6 +241,9 @@ public static PCollection<KV<String, Mutation>> graphToEdges(
202241
public void processElement(
203242
@Element McfGraph element, OutputReceiver<KV<String, Mutation>> receiver) {
204243
List<Edge> edges = graphToEdges(element, provenance);
244+
// for (Edge edge : edges) {
245+
// LOGGER.info("Processing Edge: {}", edge);
246+
// }
205247
List<KV<String, Mutation>> mutations =
206248
spannerClient.toGraphKVMutations(Collections.emptyList(), edges);
207249
mutations.stream()

pipeline/ingestion/src/main/java/org/datacommons/ingestion/pipeline/ImportGroupPipeline.java

Lines changed: 35 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727

2828
public class ImportGroupPipeline {
2929
private static final Logger LOGGER = LoggerFactory.getLogger(ImportGroupPipeline.class);
30+
private static final String IMPORT_METADATA_FILE = "import_metadata_mcf.mcf";
3031
private static final Counter nodeInvalidTypeCounter =
3132
Metrics.counter(ImportGroupPipeline.class, "mcf_nodes_without_type");
3233
private static final Counter nodeCounter =
@@ -36,6 +37,10 @@ public class ImportGroupPipeline {
3637
private static final Counter obsCounter =
3738
Metrics.counter(ImportGroupPipeline.class, "graph_observation_count");
3839

40+
private static boolean isJsonNullOrEmpty(JsonElement element) {
41+
return element == null || element.getAsString().isEmpty();
42+
}
43+
3944
public static void main(String[] args) {
4045
IngestionPipelineOptions options =
4146
PipelineOptionsFactory.fromArgs(args).withValidation().as(IngestionPipelineOptions.class);
@@ -72,27 +77,49 @@ public static void main(String[] args) {
7277

7378
for (JsonElement element : jsonArray) {
7479
JsonElement importElement = element.getAsJsonObject().get("importName");
75-
JsonElement path = element.getAsJsonObject().get("latestVersion");
76-
if (importElement == null
77-
|| path == null
78-
|| importElement.getAsString().isEmpty()
79-
|| path.getAsString().isEmpty()) {
80+
JsonElement versionElement = element.getAsJsonObject().get("latestVersion");
81+
JsonElement pathElement = element.getAsJsonObject().get("graphPath");
82+
83+
if (isJsonNullOrEmpty(importElement)
84+
|| isJsonNullOrEmpty(pathElement)
85+
|| isJsonNullOrEmpty(versionElement)) {
8086
LOGGER.error("Invalid import input json: {}", element.toString());
8187
continue;
8288
}
8389
String importName = importElement.getAsString();
90+
String latestVersion = versionElement.getAsString();
91+
LOGGER.info("Import: {} Latest version: {}", importName, latestVersion);
92+
93+
// Populate provenance node/edges.
8494
String provenance = "dc/base/" + importName;
85-
LOGGER.info("Import {} graph path {}", importName, path.getAsString());
95+
PCollection<McfGraph> provenanceMcf =
96+
GraphReader.getProvenance(
97+
options.getStorageBucketId(),
98+
importName,
99+
latestVersion.substring(latestVersion.indexOf("/", 5) + 1)
100+
+ "/"
101+
+ IMPORT_METADATA_FILE,
102+
"provenance/" + importName + ".mcf",
103+
pipeline);
86104

87105
PCollection<Mutation> deleteMutations =
88106
GraphReader.getDeleteMutations(importName, provenance, pipeline, spannerClient);
89107
deleteMutationList.add(deleteMutations);
90108
// Read schema mcf files and combine MCF nodes, and convert to spanner mutations (Node/Edge).
91-
PCollection<McfGraph> nodes = PipelineUtils.readMcfFiles(path.getAsString(), pipeline);
109+
String graphPath = latestVersion + pathElement.getAsString();
110+
PCollection<McfGraph> nodes =
111+
pathElement.getAsString().contains("tfrecord")
112+
? PipelineUtils.readMcfGraph(graphPath, pipeline)
113+
: PipelineUtils.readMcfFiles(graphPath, pipeline);
92114
PCollectionTuple graphNodes = PipelineUtils.splitGraph(nodes);
93115
PCollection<McfGraph> observationNodes = graphNodes.get(PipelineUtils.OBSERVATION_NODES_TAG);
94116
PCollection<McfGraph> schemaNodes = graphNodes.get(PipelineUtils.SCHEMA_NODES_TAG);
95-
PCollection<McfGraph> combinedGraph = PipelineUtils.combineGraphNodes(schemaNodes);
117+
PCollection<McfGraph> schemaMcf =
118+
PCollectionList.of(schemaNodes)
119+
.and(provenanceMcf)
120+
.apply("FlattenSchema", Flatten.pCollections());
121+
122+
PCollection<McfGraph> combinedGraph = PipelineUtils.combineGraphNodes(schemaMcf);
96123
PCollection<Mutation> nodeMutations =
97124
GraphReader.graphToNodes(
98125
combinedGraph, spannerClient, nodeCounter, nodeInvalidTypeCounter)
@@ -102,7 +129,6 @@ public static void main(String[] args) {
102129
.apply("ExtractEdgeMutations", Values.create());
103130
nodeMutationList.add(nodeMutations);
104131
edgeMutationList.add(edgeMutations);
105-
106132
// Read observation mcf files, build optimized graph, and convert to spanner mutations
107133
// (Observation).
108134
PCollection<McfOptimizedGraph> optimizedGraph =

pipeline/ingestion/src/main/resources/spanner_schema.sql

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ CREATE TABLE Observation (
3636
CREATE TABLE ImportStatus (
3737
ImportName STRING(MAX) NOT NULL,
3838
LatestVersion STRING(MAX),
39+
GraphPath STRING(MAX),
3940
State STRING(1024) NOT NULL,
4041
JobId STRING(1024),
4142
WorkflowId STRING(1024),

pipeline/ingestion/template.sh

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,11 @@ elif [ "$OPERATION" == "run" ]; then
1616
echo "Running Dataflow Flex Template..."
1717
gcloud dataflow flex-template run "ingestion-job" \
1818
--template-file-gcs-location "gs://datcom-templates/templates/flex/ingestion.json" \
19-
--parameters ^~^importList='[{"importName":"scripts/us_fed/treasury_constant_maturity_rates:USFed_ConstantMaturityRates","latestVersion":"datcom-prod-imports/scripts/us_fed/treasury_constant_maturity_rates/USFed_ConstantMaturityRates/2025_09_09T20_18_16_090752_07_00/*/validation"}]' \
20-
--project=datcom-store \
19+
--parameters ^~^importList='[{"importName":"USFed_ConstantMaturityRates","latestVersion":"gs://datcom-prod-imports/scripts/us_fed/treasury_constant_maturity_rates/USFed_ConstantMaturityRates/2026_01_06T19_18_04_771937_08_00", "graphDataPaths":["*/*/*.mcf"]}]' \
20+
--parameters spannerDatabaseId="dc_graph_import" \
21+
--project=datcom-import-automation-prod \
22+
--num-workers=1 \
23+
--worker-machine-type=n2-highmem-8 \
2124
--region "us-central1"
2225
else
2326
echo "Usage: $0 [deploy|run]"

util/src/main/java/org/datacommons/util/GraphUtils.java

Lines changed: 4 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import java.util.Scanner;
1010
import java.util.Set;
1111
import java.util.stream.Collectors;
12+
import org.datacommons.proto.Debug;
1213
import org.datacommons.proto.Mcf.McfGraph;
1314
import org.datacommons.proto.Mcf.McfGraph.PropertyValues;
1415
import org.datacommons.proto.Mcf.McfGraph.TypedValue;
@@ -35,6 +36,8 @@ public enum Property {
3536
}
3637

3738
public static final String STAT_VAR_OB = "StatVarObservation";
39+
static Debug.Log.Builder log = Debug.Log.newBuilder();
40+
static LogWrapper logCtx = new LogWrapper(log);
3841

3942
private static final String REQ_SV_OBS_PROPS[] = {
4043
/** Required properties for a StatVarObservation. */
@@ -301,28 +304,7 @@ public static McfStatVarObsSeries convertMcfGraphToMcfStatVarObsSeries(
301304
* @return An McfGraph proto representing the input MCF string.
302305
*/
303306
public static McfGraph convertToGraph(String input) {
304-
McfGraph.Builder g = McfGraph.newBuilder();
305-
g.setType(McfType.INSTANCE_MCF);
306-
String node_id = "";
307-
McfGraph.PropertyValues.Builder base_node = McfGraph.PropertyValues.newBuilder();
308-
String[] lines = input.split("\n");
309-
for (String line : lines) {
310-
line = line.trim();
311-
if (line.isEmpty() || line.startsWith("//") || line.startsWith("#")) {
312-
continue;
313-
}
314-
int colon = line.indexOf(":");
315-
String lhs = line.substring(0, colon).trim();
316-
String rhs = line.substring(colon + 1).trim();
317-
if (lhs.equals(Property.dcid.name()) || lhs.equals("Node")) {
318-
node_id = rhs;
319-
}
320-
setPropVal(lhs, ValueType.TEXT, rhs, base_node);
321-
}
322-
if (!node_id.isEmpty()) {
323-
g.putNodes(node_id, base_node.build());
324-
}
325-
return g.build();
307+
return McfParser.parseInstanceMcfString(input, true, logCtx);
326308
}
327309

328310
/**

util/src/main/java/org/datacommons/util/McfParser.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public static McfParser init(
5858

5959
// Parse a string with instance nodes in MCF format into the McfGraph proto.
6060
public static McfGraph parseInstanceMcfString(
61-
String mcfString, boolean isResolved, LogWrapper logCtx) throws IOException {
61+
String mcfString, boolean isResolved, LogWrapper logCtx) {
6262
return parseMcfString(mcfString, Mcf.McfType.INSTANCE_MCF, isResolved, logCtx);
6363
}
6464

@@ -69,8 +69,7 @@ public static McfGraph parseInstanceMcfFile(
6969
}
7070

7171
// Parse a string with template nodes in MCF format into the McfGraph proto.
72-
public static McfGraph parseTemplateMcfString(String mcfString, LogWrapper logCtx)
73-
throws IOException {
72+
public static McfGraph parseTemplateMcfString(String mcfString, LogWrapper logCtx) {
7473
return parseMcfString(mcfString, Mcf.McfType.TEMPLATE_MCF, false, logCtx);
7574
}
7675

@@ -80,7 +79,7 @@ public static McfGraph parseTemplateMcfFile(String fileName, LogWrapper logCtx)
8079
return parseMcfFile(fileName, Mcf.McfType.TEMPLATE_MCF, false, logCtx);
8180
}
8281

83-
public McfGraph parseNextNode() throws IOException {
82+
public McfGraph parseNextNode() {
8483
if (finished) return null;
8584
while (lines.hasNext()) {
8685
String line = lines.next();
@@ -249,8 +248,7 @@ private McfGraph finish() throws AssertionError {
249248
}
250249

251250
private static McfGraph parseMcfString(
252-
String mcfString, Mcf.McfType type, boolean isResolved, LogWrapper logCtx)
253-
throws IOException {
251+
String mcfString, Mcf.McfType type, boolean isResolved, LogWrapper logCtx) {
254252
McfParser parser = McfParser.init(type, isResolved);
255253
parser.fileName = IN_MEMORY_FILE_NAME;
256254
parser.logCtx = logCtx;
@@ -264,7 +262,7 @@ private static McfGraph parseMcfFile(
264262
return parser.parseLines();
265263
}
266264

267-
private McfGraph parseLines() throws IOException {
265+
private McfGraph parseLines() {
268266
McfGraph g;
269267
ArrayList<McfGraph> graphs = new ArrayList<>();
270268
while ((g = parseNextNode()) != null) {

0 commit comments

Comments
 (0)