@@ -82,16 +82,35 @@ public static void buildPipeline(
for (JsonElement element : jsonArray) {
JsonElement importElement = element.getAsJsonObject().get("importName");
JsonElement pathElement = element.getAsJsonObject().get("graphPath");
JsonElement templateElement = element.getAsJsonObject().get("templatePath");
JsonElement csvElement = element.getAsJsonObject().get("csvPath");

-if (isJsonNullOrEmpty(importElement) || isJsonNullOrEmpty(pathElement)) {
-LOGGER.error("Invalid import input json: {}", element.toString());
if (isJsonNullOrEmpty(importElement)) {
LOGGER.error("Invalid import input json, missing importName: {}", element.toString());
continue;
}
String importName = importElement.getAsString();
-String graphPath = pathElement.getAsString();
String graphPath = isJsonNullOrEmpty(pathElement) ? null : pathElement.getAsString();
String templatePath =
isJsonNullOrEmpty(templateElement) ? null : templateElement.getAsString();
String csvPath = isJsonNullOrEmpty(csvElement) ? null : csvElement.getAsString();

if (graphPath == null && (templatePath == null || csvPath == null)) {
LOGGER.error(
"Invalid import input json, missing graphPath or template/csv paths: {}",
element.toString());
continue;
}

// Process the individual import.
-processImport(pipeline, spannerClient, importName, graphPath, options.getSkipDelete());
processImport(
pipeline,
spannerClient,
importName,
graphPath,
templatePath,
csvPath,
options.getSkipDelete());
}
}
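
For context, a minimal sketch of the JSON import config this loop consumes. The field names (importName, graphPath, templatePath, csvPath) come from the parsing code above; the import names and paths here are hypothetical:

[
{"importName": "example_tfrecord", "graphPath": "gs://my-bucket/graph.tfrecord"},
{"importName": "example_jsonld", "graphPath": "gs://my-bucket/nodes.jsonld"},
{"importName": "example_template", "templatePath": "gs://my-bucket/import.tmpl.jsonld", "csvPath": "gs://my-bucket/data.csv"}
]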

@@ -109,8 +128,15 @@ private static void processImport(
SpannerClient spannerClient,
String importName,
String graphPath,
String templatePath,
String csvPath,
boolean skipDelete) {
LOGGER.info("Import: {} Graph path: {}", importName, graphPath);
LOGGER.info(
"Import: {} Graph path: {} Template: {} CSV: {}",
importName,
graphPath,
templatePath,
csvPath);

String provenance = "dc/base/" + importName;

@@ -138,10 +164,20 @@ private static void processImport(

// 2. Read and Split Graph:
// Read the graph data (TFRecord, MCF, or JSON-LD files) and split into schema and observation nodes.
-PCollection<McfGraph> graph =
-graphPath.contains("tfrecord")
-? PipelineUtils.readMcfGraph(importName, graphPath, pipeline)
-: PipelineUtils.readMcfFiles(importName, graphPath, pipeline);
PCollection<McfGraph> graph;
if (graphPath != null && graphPath.contains("tfrecord")) {
graph = PipelineUtils.readMcfGraph(importName, graphPath, pipeline);
} else if (graphPath != null && graphPath.contains(".jsonld")) {
graph = PipelineUtils.readJsonLdFiles(importName, graphPath, pipeline);
} else if (templatePath != null && csvPath != null) {
graph = PipelineUtils.readJsonLdTemplateFiles(importName, templatePath, csvPath, pipeline);
} else if (graphPath != null) {
graph = PipelineUtils.readMcfFiles(importName, graphPath, pipeline);
} else {
throw new IllegalArgumentException(
"Invalid import config: missing graphPath or template/csv paths");
}
PCollectionTuple graphNodes = PipelineUtils.splitGraph(importName, graph);
PCollection<McfGraph> observationNodes = graphNodes.get(PipelineUtils.OBSERVATION_NODES_TAG);
PCollection<McfGraph> schemaNodes = graphNodes.get(PipelineUtils.SCHEMA_NODES_TAG);
Expand Down Expand Up @@ -176,9 +212,11 @@ private static void processImport(
spannerClient.writeMutations(pipeline, "WriteNodesToSpanner-" + importName, nodeMutations);

// Write Edges (wait for Nodes write and Edges delete)
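// Note: Wait.on returns a new PCollection; that returned collection is the one that
// must be written below, otherwise the wait has no effect.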
-edgeMutations.apply(
-"EdgesWaitOn-" + importName, Wait.on(List.of(writtenNodes.getOutput(), deleteEdgesWait)));
-spannerClient.writeMutations(pipeline, "WriteEdgesToSpanner-" + importName, edgeMutations);
PCollection<Mutation> waitingEdges =
edgeMutations.apply(
"EdgesWaitOn-" + importName,
Wait.on(List.of(writtenNodes.getOutput(), deleteEdgesWait)));
spannerClient.writeMutations(pipeline, "WriteEdgesToSpanner-" + importName, waitingEdges);

// 4. Process Observation Nodes:
// Build an optimized graph from observation nodes and convert to Observation mutations.
@@ -188,9 +226,9 @@ private static void processImport(
GraphReader.graphToObservations(optimizedGraph, importName, spannerClient, obsCounter)
.apply("ExtractObsMutations-" + importName, Values.create());
// Write Observations (wait for Obs delete)
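// As with the edge mutations above, write the collection returned by Wait.on.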
observationMutations.apply("ObsWaitOn-" + importName, Wait.on(deleteObsWait));
PCollection<Mutation> waitingObs =
observationMutations.apply("ObsWaitOn-" + importName, Wait.on(deleteObsWait));

spannerClient.writeMutations(
pipeline, "WriteObservationsToSpanner-" + importName, observationMutations);
spannerClient.writeMutations(pipeline, "WriteObservationsToSpanner-" + importName, waitingObs);
Comment thread
gmechali marked this conversation as resolved.
}
}
PipelineUtils.java
@@ -6,6 +6,10 @@
import com.google.protobuf.InvalidProtocolBufferException;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.nio.channels.Channels;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Comparator;
@@ -18,10 +22,14 @@
import java.util.stream.StreamSupport;
import java.util.zip.GZIPOutputStream;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.TFRecordIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.MapElements;
@@ -39,6 +47,8 @@
import org.datacommons.proto.Mcf.McfOptimizedGraph;
import org.datacommons.proto.Mcf.McfStatVarObsSeries;
import org.datacommons.util.GraphUtils;
import org.datacommons.util.jsonld.JsonLdParser;
import org.datacommons.util.jsonld.JsonLdTemplateParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -162,6 +172,69 @@ public McfGraph apply(String input) {
return mcf;
}

/** Reads JSON-LD files and converts them to McfGraph protos. */
public static PCollection<McfGraph> readJsonLdFiles(String name, String files, Pipeline p) {
return p.apply("MatchJsonLdFiles-" + name, FileIO.match().filepattern(files))
.apply("ReadJsonLdFiles-" + name, FileIO.readMatches())
.apply(
"ParseJsonLd-" + name,
ParDo.of(
new DoFn<FileIO.ReadableFile, McfGraph>() {
@ProcessElement
public void processElement(
@Element FileIO.ReadableFile file, OutputReceiver<McfGraph> receiver) {
try (InputStream is = Channels.newInputStream(file.open())) {
McfGraph graph = JsonLdParser.parse(is);
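// Emit each parsed node as its own single-node graph so downstream
// transforms can redistribute work across workers.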
for (Map.Entry<String, org.datacommons.proto.Mcf.McfGraph.PropertyValues>
entry : graph.getNodesMap().entrySet()) {
McfGraph.Builder singleNodeGraph = McfGraph.newBuilder();
singleNodeGraph.putNodes(entry.getKey(), entry.getValue());
receiver.output(singleNodeGraph.build());
}
} catch (IOException e) {
throw new RuntimeException(
"Failed to parse JSON-LD file: " + file.toString(), e);
}
}
}));
}

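/**
 * Reads a JSON-LD template plus a CSV file, expanding the template once per CSV row and
 * emitting one McfGraph per row. The template/CSV pair is created as a single element,
 * so a single worker parses the entire CSV sequentially.
 */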
public static PCollection<McfGraph> readJsonLdTemplateFiles(
String name, String templatePath, String csvPath, Pipeline p) {
return p.apply("CreateTemplateCsvKV-" + name, Create.of(KV.of(templatePath, csvPath)))
.apply(
"ProcessTemplateCsv-" + name,
ParDo.of(
new DoFn<KV<String, String>, McfGraph>() {
@ProcessElement
public void processElement(
@Element KV<String, String> element, OutputReceiver<McfGraph> receiver) {
String tPath = element.getKey();
String cPath = element.getValue();
ResourceId tRes = FileSystems.matchNewResource(tPath, false);
ResourceId cRes = FileSystems.matchNewResource(cPath, false);
// Open the template and CSV; try-with-resources closes both streams.
try (InputStream tStream = Channels.newInputStream(FileSystems.open(tRes));
InputStream cStream = Channels.newInputStream(FileSystems.open(cRes));
Reader cReader = new InputStreamReader(cStream, StandardCharsets.UTF_8)) {
JsonLdTemplateParser parser = new JsonLdTemplateParser(tStream);
parser.initCsv(cReader);

// Emit one McfGraph per CSV row until the parser is exhausted.
McfGraph graph;
while ((graph = parser.parseNextRow()) != null) {
receiver.output(graph);
}
} catch (IOException e) {
throw new RuntimeException(
"Failed to parse JSON-LD template/CSV: " + tPath + " / " + cPath, e);
}
}
}));
}

public static PCollectionTuple splitGraph(String name, PCollection<McfGraph> graph) {
return graph.apply(
"SplitGraph-" + name,
PipelineUtilsTest.java
@@ -180,6 +180,123 @@ public void testCombineGraphNodes() {
Assert.assertEquals(PipelineResult.State.DONE, state);
}

@Test
public void testReadJsonLdFiles() throws java.io.IOException {
options.setStableUniqueNames(PipelineOptions.CheckEnabled.OFF);

java.nio.file.Path tempFile = java.nio.file.Files.createTempFile("test", ".jsonld");
String jsonLdContent =
"[\n"
+ " {\n"
+ " \"@id\": \"dcid:TestNode\",\n"
+ " \"@type\": \"schema:Thing\",\n"
+ " \"name\": \"Test Node Name\"\n"
+ " }\n"
+ "]";
java.nio.file.Files.write(
tempFile, jsonLdContent.getBytes(java.nio.charset.StandardCharsets.UTF_8));

PCollection<McfGraph> result = PipelineUtils.readJsonLdFiles("test", tempFile.toString(), p);

McfGraph.Builder expectedGraph = McfGraph.newBuilder();
PropertyValues.Builder pv = PropertyValues.newBuilder();
pv.putPvs(
"typeOf",
Values.newBuilder()
.addTypedValues(
TypedValue.newBuilder().setValue("schema:Thing").setType(ValueType.RESOLVED_REF))
.build());
pv.putPvs(
"name",
Values.newBuilder()
.addTypedValues(
TypedValue.newBuilder().setValue("Test Node Name").setType(ValueType.TEXT))
.build());
expectedGraph.putNodes("dcid:TestNode", pv.build());

PAssert.that(result).containsInAnyOrder(expectedGraph.build());
PipelineResult.State state = p.run().waitUntilFinish();
Assert.assertEquals(PipelineResult.State.DONE, state);

java.nio.file.Files.delete(tempFile);
}

@Test
public void testReadJsonLdTemplateFiles() throws java.io.IOException {
options.setStableUniqueNames(PipelineOptions.CheckEnabled.OFF);

// Create temp template file
java.nio.file.Path tempTmplFile = java.nio.file.Files.createTempFile("test", ".tmpl.jsonld");
String tmplContent =
"{\n"
+ " \"@context\": { \"schema\": \"http://schema.org/\" },\n"
+ " \"@graph\": [\n"
+ " {\n"
+ " \"@id\": \"l:Sample\",\n"
+ " \"@type\": \"StatVarObservation\",\n"
+ " \"value\": {\n"
+ " \"@type\": \"http://datacommons.org/mapping/ColumnSource\",\n"
+ " \"http://datacommons.org/mapping/columnHeader\": [{ \"@value\": \"Val\" }]\n"
+ " }\n"
+ " }\n"
+ " ]\n"
+ "}";
java.nio.file.Files.write(
tempTmplFile, tmplContent.getBytes(java.nio.charset.StandardCharsets.UTF_8));

// Create temp CSV file
java.nio.file.Path tempCsvFile = java.nio.file.Files.createTempFile("test", ".csv");
String csvContent = "Val\n100\n200";
java.nio.file.Files.write(
tempCsvFile, csvContent.getBytes(java.nio.charset.StandardCharsets.UTF_8));

PCollection<McfGraph> result =
PipelineUtils.readJsonLdTemplateFiles(
"test", tempTmplFile.toString(), tempCsvFile.toString(), p);

McfGraph.Builder expectedGraph1 = McfGraph.newBuilder();
PropertyValues.Builder pv1 = PropertyValues.newBuilder();
pv1.putPvs(
"typeOf",
Values.newBuilder()
.addTypedValues(
TypedValue.newBuilder()
.setValue("StatVarObservation")
.setType(ValueType.RESOLVED_REF))
.build());
pv1.putPvs(
"value",
Values.newBuilder()
.addTypedValues(TypedValue.newBuilder().setValue("100").setType(ValueType.TEXT))
.build());
expectedGraph1.putNodes("l:Sample_0", pv1.build());

McfGraph.Builder expectedGraph2 = McfGraph.newBuilder();
PropertyValues.Builder pv2 = PropertyValues.newBuilder();
pv2.putPvs(
"typeOf",
Values.newBuilder()
.addTypedValues(
TypedValue.newBuilder()
.setValue("StatVarObservation")
.setType(ValueType.RESOLVED_REF))
.build());
pv2.putPvs(
"value",
Values.newBuilder()
.addTypedValues(TypedValue.newBuilder().setValue("200").setType(ValueType.TEXT))
.build());
expectedGraph2.putNodes("l:Sample_1", pv2.build());

PAssert.that(result).containsInAnyOrder(expectedGraph1.build(), expectedGraph2.build());

PipelineResult.State state = p.run().waitUntilFinish();
Assert.assertEquals(PipelineResult.State.DONE, state);

java.nio.file.Files.delete(tempTmplFile);
java.nio.file.Files.delete(tempCsvFile);
}

private McfGraph createGraph(Map<String, Map<String, List<String>>> nodeData) {
McfGraph.Builder graph = McfGraph.newBuilder();
for (Map.Entry<String, Map<String, List<String>>> nodeEntry : nodeData.entrySet()) {
tool/src/main/java/org/datacommons/tool/Args.java (23 additions, 7 deletions)
@@ -75,14 +75,30 @@ public Debug.CommandArgs toProto() {
if (!LogWrapper.TEST_MODE && fileGroup != null) {
List<String> allFiles = new ArrayList<>();

-if (fileGroup.getMcfs() != null) {
-for (File f : fileGroup.getMcfs()) {
-allFiles.add(f.getPath());
-}
-}
-if (fileGroup.getTmcfs() != null) {
-for (File f : fileGroup.getTmcfs()) {
-allFiles.add(f.getPath());
-}
-}
if (fileGroup instanceof org.datacommons.util.McfFileGroup) {
org.datacommons.util.McfFileGroup mcfGroup = (org.datacommons.util.McfFileGroup) fileGroup;
if (mcfGroup.getMcfs() != null) {
for (File f : mcfGroup.getMcfs()) {
allFiles.add(f.getPath());
}
}
if (mcfGroup.getTmcfs() != null) {
for (File f : mcfGroup.getTmcfs()) {
allFiles.add(f.getPath());
}
}
} else if (fileGroup instanceof org.datacommons.util.JsonLdFileGroup) {
org.datacommons.util.JsonLdFileGroup jsonLdGroup =
(org.datacommons.util.JsonLdFileGroup) fileGroup;
if (jsonLdGroup.getJsonLds() != null) {
for (File f : jsonLdGroup.getJsonLds()) {
allFiles.add(f.getPath());
}
}
if (jsonLdGroup.getTmplJsonLds() != null) {
for (File f : jsonLdGroup.getTmplJsonLds()) {
allFiles.add(f.getPath());
}
}
}
if (fileGroup.getCsvs() != null) {
tool/src/main/java/org/datacommons/tool/Lint.java (4 additions, 1 deletion)
@@ -23,7 +23,9 @@
import org.datacommons.util.FileGroup;
import picocli.CommandLine;

@CommandLine.Command(name = "lint", description = "Run various checks on input MCF/TMCF/CSV files")
@CommandLine.Command(
name = "lint",
description = "Run various checks on input MCF/TMCF/CSV/JSON-LD files")
class Lint implements Callable<Integer> {
private static final Logger logger = LogManager.getLogger(Lint.class);

@@ -32,6 +34,7 @@ class Lint implements Callable<Integer> {
description =
("List of input files. The file extensions are used to infer the format. "
+ "Valid extensions include .mcf for Instance MCF, .tmcf for Template MCF, "
+ ".jsonld for JSON-LD instances, .tmpl.jsonld for JSON-LD Templates, "
+ ".csv for tabular text files delimited by comma (overridden with -d), and .tsv "
+ "for tab-delimited tabular files."))
private File[] files;
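// Hypothetical invocation (jar name assumed) mixing the newly supported inputs:
//   java -jar dc-import.jar lint geos.mcf stats.tmpl.jsonld stats.csv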