Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,17 @@ public static void buildPipeline(
for (JsonElement element : jsonArray) {
JsonElement importElement = element.getAsJsonObject().get("importName");
JsonElement pathElement = element.getAsJsonObject().get("graphPath");

if (isJsonNullOrEmpty(importElement) || isJsonNullOrEmpty(pathElement)) {
LOGGER.error("Invalid import input json: {}", element.toString());
if (isJsonNullOrEmpty(importElement)) {
LOGGER.error("Invalid import input json, missing importName: {}", element.toString());
continue;
}
String importName = importElement.getAsString();
String graphPath = pathElement.getAsString();
String graphPath = isJsonNullOrEmpty(pathElement) ? null : pathElement.getAsString();

if (graphPath == null) {
LOGGER.error("Invalid import input json, missing graphPath: {}", element.toString());
continue;
}

// Process the individual import.
processImport(pipeline, spannerClient, importName, graphPath, options.getSkipDelete());
Expand Down Expand Up @@ -141,12 +145,22 @@ private static void processImport(
"CreateEmptyEdgesWait-" + importName, Create.empty(TypeDescriptor.of(Void.class)));
}

// 2. Read and Split Graph:
// Read the graph data (TFRecord or MCF files) and split into schema and observation nodes.
PCollection<McfGraph> graph =
graphPath.contains("tfrecord")
? PipelineUtils.readMcfGraph(importName, graphPath, pipeline)
: PipelineUtils.readMcfFiles(importName, graphPath, pipeline);
PCollection<McfGraph> graph;
switch (PipelineUtils.resolveFormat(graphPath)) {
case TFRECORD:
graph = PipelineUtils.readMcfGraph(importName, graphPath, pipeline);
break;
case JSONLD:
graph = PipelineUtils.readJsonLdFiles(importName, graphPath, pipeline);
break;
case MCF:
graph = PipelineUtils.readMcfFiles(importName, graphPath, pipeline);
break;
default:
throw new IllegalArgumentException(
"Invalid import config: missing graphPath or template/csv paths");
}

PCollectionTuple graphNodes = PipelineUtils.splitGraph(importName, graph);
PCollection<McfGraph> observationNodes = graphNodes.get(PipelineUtils.OBSERVATION_NODES_TAG);
PCollection<McfGraph> schemaNodes = graphNodes.get(PipelineUtils.SCHEMA_NODES_TAG);
Expand Down Expand Up @@ -181,9 +195,11 @@ private static void processImport(
spannerClient.writeMutations(pipeline, "WriteNodesToSpanner-" + importName, nodeMutations);

// Write Edges (wait for Nodes write and Edges delete)
edgeMutations.apply(
"EdgesWaitOn-" + importName, Wait.on(List.of(writtenNodes.getOutput(), deleteEdgesWait)));
spannerClient.writeMutations(pipeline, "WriteEdgesToSpanner-" + importName, edgeMutations);
PCollection<Mutation> waitingEdges =
edgeMutations.apply(
"EdgesWaitOn-" + importName,
Wait.on(List.of(writtenNodes.getOutput(), deleteEdgesWait)));
spannerClient.writeMutations(pipeline, "WriteEdgesToSpanner-" + importName, waitingEdges);

// 4. Process Observation Nodes:
// Build an optimized graph from observation nodes and convert to Observation mutations.
Expand All @@ -193,9 +209,9 @@ private static void processImport(
GraphReader.graphToObservations(optimizedGraph, importName, spannerClient, obsCounter)
.apply("ExtractObsMutations-" + importName, Values.create());
// Write Observations (wait for Obs delete)
observationMutations.apply("ObsWaitOn-" + importName, Wait.on(deleteObsWait));
PCollection<Mutation> waitingObs =
observationMutations.apply("ObsWaitOn-" + importName, Wait.on(deleteObsWait));

spannerClient.writeMutations(
pipeline, "WriteObservationsToSpanner-" + importName, observationMutations);
spannerClient.writeMutations(pipeline, "WriteObservationsToSpanner-" + importName, waitingObs);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import com.google.protobuf.InvalidProtocolBufferException;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.Channels;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Comparator;
Expand All @@ -18,6 +20,7 @@
import java.util.stream.StreamSupport;
import java.util.zip.GZIPOutputStream;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.TFRecordIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
Expand All @@ -39,6 +42,7 @@
import org.datacommons.proto.Mcf.McfOptimizedGraph;
import org.datacommons.proto.Mcf.McfStatVarObsSeries;
import org.datacommons.util.GraphUtils;
import org.datacommons.util.parser.jsonld.JsonLdParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -162,6 +166,55 @@ public McfGraph apply(String input) {
return mcf;
}

// Input file formats supported by the ingestion pipeline.
public enum InputFormat {
  // JSON-LD documents; selected when the graph path contains ".jsonld".
  JSONLD,
  // TFRecord files of serialized McfGraph protos; selected when the path contains "tfrecord".
  TFRECORD,
  // Plain-text MCF files; the fallback when no other format marker is present in the path.
  MCF
}

/**
 * Resolves the input format based on the file path.
 *
 * <p>Matching is case-insensitive. Precedence: a path containing "tfrecord" resolves to
 * {@code TFRECORD}, then a path containing ".jsonld" resolves to {@code JSONLD}; anything
 * else falls back to {@code MCF}, preserving the pipeline's historical default behavior.
 *
 * @param graphPath file path or glob pattern of the input graph data; must not be null
 * @return the {@link InputFormat} inferred from the path
 * @throws IllegalArgumentException if {@code graphPath} is null
 */
public static InputFormat resolveFormat(String graphPath) {
  if (graphPath == null) {
    throw new IllegalArgumentException("graphPath cannot be null");
  }
  // Normalize case so e.g. "data.TFRecord" or "graph.JSONLD" are recognized too.
  String normalized = graphPath.toLowerCase(java.util.Locale.ROOT);
  if (normalized.contains("tfrecord")) {
    return InputFormat.TFRECORD;
  }
  if (normalized.contains(".jsonld")) {
    return InputFormat.JSONLD;
  }
  // Fallback to MCF as default behavior.
  return InputFormat.MCF;
}

/** Reads JSON-LD files and converts them to McfGraph protos. */
public static PCollection<McfGraph> readJsonLdFiles(String name, String files, Pipeline p) {
return p.apply("MatchJsonLdFiles-" + name, FileIO.match().filepattern(files))
.apply("ReadJsonLdFiles-" + name, FileIO.readMatches())
.apply(
"ParseJsonLd-" + name,
ParDo.of(
new DoFn<FileIO.ReadableFile, McfGraph>() {
@ProcessElement
public void processElement(
@Element FileIO.ReadableFile file, OutputReceiver<McfGraph> receiver) {
try (InputStream is = Channels.newInputStream(file.open())) {
McfGraph graph = JsonLdParser.parse(is);
Comment thread
gmechali marked this conversation as resolved.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Parsing the entire JSON-LD file into memory using JsonLdParser.parse(is) can lead to OutOfMemoryError for large input files. Consider implementing a streaming parser or processing the file in chunks if possible.

for (Map.Entry<String, org.datacommons.proto.Mcf.McfGraph.PropertyValues>
entry : graph.getNodesMap().entrySet()) {
McfGraph.Builder singleNodeGraph = McfGraph.newBuilder();
singleNodeGraph.putNodes(entry.getKey(), entry.getValue());
receiver.output(singleNodeGraph.build());
}
} catch (IOException e) {
throw new RuntimeException(
"Failed to parse JSON-LD file: " + file.toString(), e);
}
}
}));
}

public static PCollectionTuple splitGraph(String name, PCollection<McfGraph> graph) {
return graph.apply(
"SplitGraph-" + name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,59 @@ public void testCombineGraphNodes() {
Assert.assertEquals(PipelineResult.State.DONE, state);
}

@Test
public void testReadJsonLdFiles() throws java.io.IOException {
  options.setStableUniqueNames(PipelineOptions.CheckEnabled.OFF);

  // Write a minimal JSON-LD document containing a single node to a temp file.
  java.nio.file.Path tempFile = java.nio.file.Files.createTempFile("test", ".jsonld");
  try {
    String jsonLdContent =
        "{\n"
            + "  \"@context\": {\n"
            + "    \"schema\": \"https://schema.org/\",\n"
            + "    \"name\": \"https://schema.org/name\"\n"
            + "  },\n"
            + "  \"@graph\": [\n"
            + "    {\n"
            + "      \"@id\": \"dcid:TestNode\",\n"
            + "      \"@type\": \"schema:Thing\",\n"
            + "      \"name\": \"Test Node Name\"\n"
            + "    }\n"
            + "  ]\n"
            + "}";
    java.nio.file.Files.write(
        tempFile, jsonLdContent.getBytes(java.nio.charset.StandardCharsets.UTF_8));

    PCollection<McfGraph> result = PipelineUtils.readJsonLdFiles("test", tempFile.toString(), p);

    // Expected single-node graph: typeOf from @type, dcid from @id, name from the context.
    McfGraph.Builder expectedGraph = McfGraph.newBuilder();
    PropertyValues.Builder pv = PropertyValues.newBuilder();
    pv.putPvs(
        "typeOf",
        Values.newBuilder()
            .addTypedValues(
                TypedValue.newBuilder().setValue("Thing").setType(ValueType.RESOLVED_REF))
            .build());
    pv.putPvs(
        "dcid",
        Values.newBuilder()
            .addTypedValues(
                TypedValue.newBuilder().setValue("dcid:TestNode").setType(ValueType.TEXT))
            .build());
    pv.putPvs(
        "name",
        Values.newBuilder()
            .addTypedValues(
                TypedValue.newBuilder().setValue("Test Node Name").setType(ValueType.TEXT))
            .build());
    expectedGraph.putNodes("dcid:TestNode", pv.build());

    PAssert.that(result).containsInAnyOrder(expectedGraph.build());
    PipelineResult.State state = p.run().waitUntilFinish();
    Assert.assertEquals(PipelineResult.State.DONE, state);
  } finally {
    // Clean up even when an assertion fails, so failed runs don't leak temp files.
    java.nio.file.Files.deleteIfExists(tempFile);
  }
}

private McfGraph createGraph(Map<String, Map<String, List<String>>> nodeData) {
McfGraph.Builder graph = McfGraph.newBuilder();
for (Map.Entry<String, Map<String, List<String>>> nodeEntry : nodeData.entrySet()) {
Expand Down
26 changes: 19 additions & 7 deletions tool/src/main/java/org/datacommons/tool/Args.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
import org.apache.logging.log4j.util.Strings;
import org.datacommons.proto.Debug;
import org.datacommons.util.FileGroup;
import org.datacommons.util.JsonLdFileGroup;
import org.datacommons.util.LogWrapper;
import org.datacommons.util.McfFileGroup;

// Class representing the command line arguments to dc-import tool. Largely used as a struct.
class Args {
Expand Down Expand Up @@ -75,14 +77,24 @@ public Debug.CommandArgs toProto() {
if (!LogWrapper.TEST_MODE && fileGroup != null) {
List<String> allFiles = new ArrayList<>();

if (fileGroup.getMcfs() != null) {
for (File f : fileGroup.getMcfs()) {
allFiles.add(f.getPath());
if (fileGroup instanceof McfFileGroup) {
McfFileGroup mcfGroup = (McfFileGroup) fileGroup;
if (mcfGroup.getMcfs() != null) {
for (File f : mcfGroup.getMcfs()) {
allFiles.add(f.getPath());
}
}
}
if (fileGroup.getTmcfs() != null) {
for (File f : fileGroup.getTmcfs()) {
allFiles.add(f.getPath());
if (mcfGroup.getTmcfs() != null) {
for (File f : mcfGroup.getTmcfs()) {
allFiles.add(f.getPath());
}
}
} else if (fileGroup instanceof JsonLdFileGroup) {
JsonLdFileGroup jsonLdGroup = (JsonLdFileGroup) fileGroup;
if (jsonLdGroup.getJsonLds() != null) {
for (File f : jsonLdGroup.getJsonLds()) {
allFiles.add(f.getPath());
}
}
}
if (fileGroup.getCsvs() != null) {
Expand Down
5 changes: 4 additions & 1 deletion tool/src/main/java/org/datacommons/tool/Lint.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
import org.datacommons.util.FileGroup;
import picocli.CommandLine;

@CommandLine.Command(name = "lint", description = "Run various checks on input MCF/TMCF/CSV files")
@CommandLine.Command(
name = "lint",
description = "Run various checks on input MCF/TMCF/CSV/JSON-LD files")
class Lint implements Callable<Integer> {
private static final Logger logger = LogManager.getLogger(Lint.class);

Expand All @@ -32,6 +34,7 @@ class Lint implements Callable<Integer> {
description =
("List of input files. The file extensions are used to infer the format. "
+ "Valid extensions include .mcf for Instance MCF, .tmcf for Template MCF, "
+ ".jsonld for JSON-LD instances, "
+ ".csv for tabular text files delimited by comma (overridden with -d), and .tsv "
+ "for tab-delimited tabular files."))
private File[] files;
Expand Down
Loading