diff --git a/import-automation/scripts/dc_manifest.proto b/import-automation/scripts/dc_manifest.proto
new file mode 100644
index 0000000000..08f02de59c
--- /dev/null
+++ b/import-automation/scripts/dc_manifest.proto
@@ -0,0 +1,369 @@
+syntax = "proto3";
+
+// protoc --proto_path=. --python_out=. dc_manifest.proto
+
+// IMPORTANT NOTE: Do not change the existing field names in
+// DataCommonsManifest::Import and ExternalTable messages, since these messages
+// are shared with the Template Generator UI application in github.
+//
+// NOTE(review): enum values below (SCHEMA, PLACE, ...) are unprefixed, which
+// violates proto style, but renaming them would break the checked-in generated
+// code and existing textprotos, so they are left as-is.
+
+package datacommons.proto;
+
+message ExternalTable {
+  // Name of the table.
+  string table_name = 1;
+
+  // Path to the template MCF (previously called the "schema mapping file").
+  // TODO(shanth): Rename to template_mcf_path.
+  string mapping_path = 2;
+
+  // Path to one or more data CSV file patterns.
+  // NOTE: All files must be present in the same directory if DownloadConfig is
+  // specified.
+  // TODO(shanth): Change to csv_paths after textprotos in CNS have been updated
+  repeated string csv_path = 3;
+
+  // Ordered list of column names that the schema mapping file will refer to.
+  //
+  // NOTE: This need not match the raw CSV column names.
+  repeated string column_names = 4;
+
+  // Delimiter for CSV.
+  // NOTE: expected to be a single character.
+  string field_delim = 5;
+
+  // Download info for TMCF/CSV files.
+  DownloadConfig download_config = 6;
+}
+
+// Arguments relevant for the resolution of an import.
+message ResolutionInfo {
+  // Set if we need any DCID generation or resolution.
+  bool uses_id_resolver = 1;
+
+  // List of ID properties this dataset depends on for resolution.
+  repeated string requires_ids = 2;
+
+  // List of ID properties this dataset provides for downstream resolution.
+  repeated string provides_ids = 3;
+
+  // Info about new geos introduced by this dataset.
+  message GeoInfo {
+    // List the ID properties that DCIDs are based on.
+    repeated string dcid_sources = 1;
+  }
+
+  // REQUIRED: Must be set if new geos are introduced.
+  GeoInfo new_geo = 4;
+
+  // When "uses_id_resolver" is true, this indicates whether the resolution was
+  // done against the KG, and if so which type.
+  enum KGType {
+    KG_NONE = 0;
+    KG_BQ = 1;
+    KG_MCF = 2;
+  }
+
+  KGType kg_for_resolution = 5;
+
+  // There are scenarios where additional MCF nodes are necessary purely for
+  // resolution. Examples include StatVar MCFs for table flow and geo nodes
+  // used to resolve by other IDs (like wikidataId).
+  repeated string resolution_helper_mcf_paths = 6;
+
+  // This must be present if |automated_mcf_generation_by| is set and |table| is
+  // empty in DataCommonsManifest::Import. One exception is for the "resolved
+  // MCF --> cache" mode in the controller, in that case, this field is not
+  // needed.
+  repeated string unresolved_mcf_urls = 7;
+}
+
+// Configuration used to download files into CNS as the first step.
+message DownloadConfig {
+  // The src directory is expected to contain CSV/TMCF/MCF files with the exact
+  // names as those in DataCommonsManifest::Import.
+  //
+  // If DownloadConfig is specified for a CSV/TMCF import (i.e., in
+  // ExternalTable.download_config), then the CSV file is required and the TMCF
+  // file is optional. If DownloadConfig is specified for a node MCF import
+  // (i.e., Import.mcf_download_config), then the source directory must contain
+  // all the files equivalent to "mcf_url".
+  //
+  // For example, suppose that the import specifies a TMCF path of
+  // /cns/path/to/file/data.tmcf, CSV path of /cns/path/to/file/data.csv and the
+  // "src_prefix_dir" is /bigstore/bucket/foo. Then /bigstore/bucket/foo/
+  // should have files named data.csv (required) and data.tmcf (optional).
+  //
+  // As an MCF example, suppose the mcf_url is /cns/path/to/files/*.mcf and
+  // there are three resolved MCF files (1.mcf, 2.mcf, 3.mcf) that match the
+  // pattern. If "src_prefix_dir" is /bigstore/bucket/foo, then it must contain
+  // unresolved MCF files with those same names.
+  oneof prefix {
+    // Source prefix directory.
+    string src_prefix_dir = 1;
+
+    // Path to a file containing the latest version directory name. If it is an
+    // absolute path, then it is used as the "src_prefix_dir". If not, then
+    // "src_prefix_dir" is constructed as:
+    // join(dirname(src_latest_version_file), <contents of that file>)
+    string src_latest_version_file = 2;
+  }
+
+  // A (source, destination) file path pair for copy operations.
+  message PathPair {
+    string src_file = 1;
+    string dst_file = 2;
+  }
+
+  // A single file operation performed as part of the download step.
+  message FileOp {
+    oneof op {
+      // Copy from src_file to dst_file.
+      PathPair cp = 1;
+
+      // Delete the given file.
+      string del = 2;
+    }
+  }
+
+  reserved 3, 4;
+}
+
+// Configuration used in selecting nodes for golden triples generation.
+message GoldenTripleSelection {
+  // For specified number of svObs nodes, pick one into golden triples.
+  int32 svobs_nodes_pick_one_in = 3;
+
+  reserved 1, 2;
+}
+
+// A manifest is a collection of imports each having a name, provenance URL,
+// provenance description, and list of CNS path patterns or external table info.
+message DataCommonsManifest {
+  // Categories of imports.
+  // LINT.IfChange
+  enum ImportCategory {
+    IMPORT_CATEGORY_UNKNOWN = 0;
+
+    // Import with Graph Schema nodes.
+    SCHEMA = 1;
+
+    // Import with place nodes.
+    PLACE = 2;
+
+    // Import with StatVar nodes having curated DCIDs.
+    CURATED_STATVAR = 3;
+
+    // Import with StatVar nodes having auto-generated DCIDs.
+    GENERATED_STATVAR = 4;
+
+    // Aggregated stats.
+    AGGREGATED_STATS = 7;
+
+    // Stats, which contain observations.
+    STATS = 8;
+
+    // Import with entity nodes that are not Schema/StatVars, observations or
+    // places.
+    ENTITY = 9;
+
+    // Imputed stats.
+    IMPUTED_STATS = 10;
+
+    // Intermediate stats that are used as inputs to derived graph computations,
+    // but are not built into cache.
+    // This could be either a direct import from source, or some kind of
+    // aggregation that doesn't get built into cache.
+    INTERMEDIATE_STATS = 11;
+
+    // Schema import from custom DC.
+    CUSTOM_SCHEMA = 12;
+
+    reserved 5, 6;
+  }
+  // LINT.ThenChange(//depot/google3/datacommons/import/mcf_vocab.h)
+
+  // Next ID: 32.
+  message Import {
+    // Unique name of this import.
+    string import_name = 1;
+    // URL of the data provider / provenance.
+    string provenance_url = 2;
+    // Human-readable description of the provenance.
+    string provenance_description = 6;
+    // Category of this import (see ImportCategory).
+    ImportCategory category = 20;
+
+    // The list of Import Groups that this import is part of. Must refer to a
+    // valid ImportGroup.name. If empty, then this import is part of all groups.
+    repeated string import_groups = 23;
+
+    // When input is node MCF, "mcf_url" is set.
+    repeated string mcf_url = 3;
+
+    // The URL of the optimized MCF in proto (TFRecord) format.
+    repeated string mcf_proto_url = 25;
+
+    // GCS path prefix for resolved MCFs.
+    string mcf_gcs_path_prefix = 31;
+
+    // In cases like Core Geo MCFs, the MCFs are versioned in piper but that
+    // version is not directly accessed by the binary (owing to memory
+    // limitations). Instead, we keep a copy of these files in CNS and provide
+    // that path in "mcf_url". Set this bool to true to mark that situation.
+    //
+    // If set, there exist tests (manifest_checker_test.cc) to ensure the MCFs
+    // are in sync and tooling (sync_replicated_mcfs.cc) to update the CNS copy.
+    bool is_mcf_replicated_from_piper = 18;
+
+    // When input comes from TMCF/CSV, this is set.
+    repeated ExternalTable table = 4;
+    // Email of the person responsible for this import.
+    string curator_email = 5;
+
+    // Information about resolution.
+    ResolutionInfo resolution_info = 9;
+
+    // Dates associated with stats. All dates are in ISO8601 format.
+    //
+    // NOTE: For datasets that are released more frequently than a month, please
+    // skip |end_date_in_kg|, |end_date_in_source| and |next_release_date|.
+    //
+    // Earliest value of observationDate in the KG.
+    string start_date_in_kg = 10;
+
+    // Latest value of observationDate in the KG.
+    string end_date_in_kg = 11;
+
+    // Latest value of observationDate that is available at the source.
+    // If end_date_in_kg < end_date_in_source, the dataset needs refresh.
+    string end_date_in_source = 21;
+
+    // Data Release Schedule
+    //
+    // Release frequency of the import source.
+    string release_frequency = 12;
+
+    // Next date when the data source will be released. If this date is in the
+    // past, the dataset needs refresh.
+    string next_release_date = 13;
+
+    // The URLs from where the data can be accessed/downloaded.
+    repeated string data_download_url = 15;
+
+    // As part of which import-group do we regenerate the MCFs?
+    //
+    // The branch controllers can convert table to MCF or resolve MCF if this
+    // field is set to the appropriate import group. Not setting this field
+    // disables any such work, instead the resolved MCF (in "mcf_url") is used
+    // as is.
+    string automated_mcf_generation_by = 24;
+
+    // Configuration for generating golden triples.
+    GoldenTripleSelection golden_triple_selection = 14;
+
+    // Whether unresolved MCFs are missing.
+    // These are all cases that the resolved MCFs are PopObs instead of SVObs.
+    // For SVObs, the Flume pipeline will generate unresolved MCFs from CSV+TMCF
+    // automatically.
+    bool is_unresolved_mcf_missing = 22;
+
+    // Indicates which MCF files to copy from GCS or other source before
+    // resolving.
+    // REQUIRES: this is an MCF-only import with table being empty.
+    DownloadConfig mcf_download_config = 17;
+
+    // DatasetInfo that this import corresponds to. Must refer to a valid
+    // DatasetInfo.name.
+    string dataset_name = 26;
+
+    // Metadata files that contain curated resolved MCF for the import. These
+    // MCFs provide a way for extending Provenance, Dataset and Source nodes
+    // with new properties, without requiring any manifest proto and pipeline
+    // changes.
+    repeated string metadata_mcf_url = 30;
+
+    reserved 7, 8, 16, 19, 27, 28, 29;
+  }
+
+  repeated Import import = 1;
+
+  // Import Group is a collection of imports that are built together into a
+  // single cache for serving.
+  message ImportGroup {
+    // Name of the import group.
+    string name = 1;
+
+    // Description.
+    string description = 2;
+    // True if this group is for a custom DC instance.
+    bool is_custom_dc = 5;
+
+    reserved 3, 4;
+  }
+
+  repeated ImportGroup import_groups = 2;
+
+  // Represents a dataset from which one or more imports come from. This is used
+  // to display on the Data Sources page
+  // (https://docs.datacommons.org/datasets/).
+  message DatasetInfo {
+    // Name of dataset
+    string name = 1;
+
+    // Url of dataset to link to
+    string url = 2;
+
+    // Markdown description of data source
+    oneof description {
+      // Inline markdown string containing description
+      string description_md = 3;
+
+      // Name of markdown file containing description, if used instead of [3]
+      string description_file = 4;
+    }
+
+    // Verticals that should include this data source
+    repeated string verticals = 5;
+  }
+
+  // Represents a source which contains zero or more datasets. For sources with
+  // zero distinct child datasets (i.e. the source directly corresponds to a
+  // dataset), the dataset details will be contained in the source. This is used
+  // to display on the Data Sources page
+  // (https://docs.datacommons.org/datasets/).
+  message DatasetSource {
+    // Name of source
+    string name = 1;
+
+    // Url of source to link to
+    string url = 2;
+
+    // Markdown description of source
+    string header_md = 3;
+
+    // Markdown description of any terms of service
+    string footer_md = 4;
+
+    // Datasets associated with this source
+    repeated DatasetInfo datasets = 5;
+  }
+
+  repeated DatasetSource dataset_source = 4;
+
+  // Paths to curated import MCF. These MCFs provide a way for extending
+  // Provenance, Dataset and Source nodes with new properties, without requiring
+  // any manifest proto and pipeline changes, as well as source-specific
+  // metadata about StatisticalVariables.
+  repeated string import_metadata_mcf = 5;
+
+  reserved 3;
+}
+
+// A snapshot of files in a single import. Each file has length and mtime, that
+// act as cheap alternatives to a full content checksum. This snapshot gets
+// programmatically generated and validated.
+message ImportSnapshot {
+  message FileStat {
+    string path = 1;
+    int64 length = 2;
+    double mtime_secs = 3;
+  }
+
+  string import_name = 1;
+  repeated FileStat stat = 2;
+}
\ No newline at end of file
diff --git a/import-automation/scripts/dc_manifest_pb2.py b/import-automation/scripts/dc_manifest_pb2.py
new file mode 100644
index 0000000000..a7d87f0af7
--- /dev/null
+++ b/import-automation/scripts/dc_manifest_pb2.py
@@ -0,0 +1,55 @@
+# -*- coding: utf-8 -*-
+# Generated by the protocol buffer compiler. DO NOT EDIT!
+# source: dc_manifest.proto +"""Generated protocol buffer code.""" +from google.protobuf.internal import builder as _builder +from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import symbol_database as _symbol_database +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x11\x64\x63_manifest.proto\x12\x11\x64\x61tacommons.proto\"\xb2\x01\n\rExternalTable\x12\x12\n\ntable_name\x18\x01 \x01(\t\x12\x14\n\x0cmapping_path\x18\x02 \x01(\t\x12\x10\n\x08\x63sv_path\x18\x03 \x03(\t\x12\x14\n\x0c\x63olumn_names\x18\x04 \x03(\t\x12\x13\n\x0b\x66ield_delim\x18\x05 \x01(\t\x12:\n\x0f\x64ownload_config\x18\x06 \x01(\x0b\x32!.datacommons.proto.DownloadConfig\"\xe8\x02\n\x0eResolutionInfo\x12\x18\n\x10uses_id_resolver\x18\x01 \x01(\x08\x12\x14\n\x0crequires_ids\x18\x02 \x03(\t\x12\x14\n\x0cprovides_ids\x18\x03 \x03(\t\x12:\n\x07new_geo\x18\x04 \x01(\x0b\x32).datacommons.proto.ResolutionInfo.GeoInfo\x12\x43\n\x11kg_for_resolution\x18\x05 \x01(\x0e\x32(.datacommons.proto.ResolutionInfo.KGType\x12#\n\x1bresolution_helper_mcf_paths\x18\x06 \x03(\t\x12\x1b\n\x13unresolved_mcf_urls\x18\x07 \x03(\t\x1a\x1f\n\x07GeoInfo\x12\x14\n\x0c\x64\x63id_sources\x18\x01 \x03(\t\",\n\x06KGType\x12\x0b\n\x07KG_NONE\x10\x00\x12\t\n\x05KG_BQ\x10\x01\x12\n\n\x06KG_MCF\x10\x02\"\xec\x01\n\x0e\x44ownloadConfig\x12\x18\n\x0esrc_prefix_dir\x18\x01 \x01(\tH\x00\x12!\n\x17src_latest_version_file\x18\x02 \x01(\tH\x00\x1a.\n\x08PathPair\x12\x10\n\x08src_file\x18\x01 \x01(\t\x12\x10\n\x08\x64st_file\x18\x02 \x01(\t\x1aW\n\x06\x46ileOp\x12\x38\n\x02\x63p\x18\x01 \x01(\x0b\x32*.datacommons.proto.DownloadConfig.PathPairH\x00\x12\r\n\x03\x64\x65l\x18\x02 \x01(\tH\x00\x42\x04\n\x02opB\x08\n\x06prefixJ\x04\x08\x03\x10\x04J\x04\x08\x04\x10\x05\"D\n\x15GoldenTripleSelection\x12\x1f\n\x17svobs_nodes_pick_one_in\x18\x03 
\x01(\x05J\x04\x08\x01\x10\x02J\x04\x08\x02\x10\x03\"\xdd\r\n\x13\x44\x61taCommonsManifest\x12=\n\x06import\x18\x01 \x03(\x0b\x32-.datacommons.proto.DataCommonsManifest.Import\x12I\n\rimport_groups\x18\x02 \x03(\x0b\x32\x32.datacommons.proto.DataCommonsManifest.ImportGroup\x12L\n\x0e\x64\x61taset_source\x18\x04 \x03(\x0b\x32\x34.datacommons.proto.DataCommonsManifest.DatasetSource\x12\x1b\n\x13import_metadata_mcf\x18\x05 \x03(\t\x1a\xf0\x06\n\x06Import\x12\x13\n\x0bimport_name\x18\x01 \x01(\t\x12\x16\n\x0eprovenance_url\x18\x02 \x01(\t\x12\x1e\n\x16provenance_description\x18\x06 \x01(\t\x12G\n\x08\x63\x61tegory\x18\x14 \x01(\x0e\x32\x35.datacommons.proto.DataCommonsManifest.ImportCategory\x12\x15\n\rimport_groups\x18\x17 \x03(\t\x12\x0f\n\x07mcf_url\x18\x03 \x03(\t\x12\x15\n\rmcf_proto_url\x18\x19 \x03(\t\x12\x1b\n\x13mcf_gcs_path_prefix\x18\x1f \x01(\t\x12$\n\x1cis_mcf_replicated_from_piper\x18\x12 \x01(\x08\x12/\n\x05table\x18\x04 \x03(\x0b\x32 .datacommons.proto.ExternalTable\x12\x15\n\rcurator_email\x18\x05 \x01(\t\x12:\n\x0fresolution_info\x18\t \x01(\x0b\x32!.datacommons.proto.ResolutionInfo\x12\x18\n\x10start_date_in_kg\x18\n \x01(\t\x12\x16\n\x0e\x65nd_date_in_kg\x18\x0b \x01(\t\x12\x1a\n\x12\x65nd_date_in_source\x18\x15 \x01(\t\x12\x19\n\x11release_frequency\x18\x0c \x01(\t\x12\x19\n\x11next_release_date\x18\r \x01(\t\x12\x19\n\x11\x64\x61ta_download_url\x18\x0f \x03(\t\x12#\n\x1b\x61utomated_mcf_generation_by\x18\x18 \x01(\t\x12I\n\x17golden_triple_selection\x18\x0e \x01(\x0b\x32(.datacommons.proto.GoldenTripleSelection\x12!\n\x19is_unresolved_mcf_missing\x18\x16 \x01(\x08\x12>\n\x13mcf_download_config\x18\x11 \x01(\x0b\x32!.datacommons.proto.DownloadConfig\x12\x14\n\x0c\x64\x61taset_name\x18\x1a \x01(\t\x12\x18\n\x10metadata_mcf_url\x18\x1e \x03(\tJ\x04\x08\x07\x10\x08J\x04\x08\x08\x10\tJ\x04\x08\x10\x10\x11J\x04\x08\x13\x10\x14J\x04\x08\x1b\x10\x1cJ\x04\x08\x1c\x10\x1dJ\x04\x08\x1d\x10\x1e\x1aR\n\x0bImportGroup\x12\x0c\n\x04name\x18\x01 
\x01(\t\x12\x13\n\x0b\x64\x65scription\x18\x02 \x01(\t\x12\x14\n\x0cis_custom_dc\x18\x05 \x01(\x08J\x04\x08\x03\x10\x04J\x04\x08\x04\x10\x05\x1a\x80\x01\n\x0b\x44\x61tasetInfo\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x0b\n\x03url\x18\x02 \x01(\t\x12\x18\n\x0e\x64\x65scription_md\x18\x03 \x01(\tH\x00\x12\x1a\n\x10\x64\x65scription_file\x18\x04 \x01(\tH\x00\x12\x11\n\tverticals\x18\x05 \x03(\tB\r\n\x0b\x64\x65scription\x1a\x96\x01\n\rDatasetSource\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x0b\n\x03url\x18\x02 \x01(\t\x12\x11\n\theader_md\x18\x03 \x01(\t\x12\x11\n\tfooter_md\x18\x04 \x01(\t\x12\x44\n\x08\x64\x61tasets\x18\x05 \x03(\x0b\x32\x32.datacommons.proto.DataCommonsManifest.DatasetInfo\"\xe7\x01\n\x0eImportCategory\x12\x1b\n\x17IMPORT_CATEGORY_UNKNOWN\x10\x00\x12\n\n\x06SCHEMA\x10\x01\x12\t\n\x05PLACE\x10\x02\x12\x13\n\x0f\x43URATED_STATVAR\x10\x03\x12\x15\n\x11GENERATED_STATVAR\x10\x04\x12\x14\n\x10\x41GGREGATED_STATS\x10\x07\x12\t\n\x05STATS\x10\x08\x12\n\n\x06\x45NTITY\x10\t\x12\x11\n\rIMPUTED_STATS\x10\n\x12\x16\n\x12INTERMEDIATE_STATS\x10\x0b\x12\x11\n\rCUSTOM_SCHEMA\x10\x0c\"\x04\x08\x05\x10\x05\"\x04\x08\x06\x10\x06J\x04\x08\x03\x10\x04\"\x9d\x01\n\x0eImportSnapshot\x12\x13\n\x0bimport_name\x18\x01 \x01(\t\x12\x38\n\x04stat\x18\x02 \x03(\x0b\x32*.datacommons.proto.ImportSnapshot.FileStat\x1a<\n\x08\x46ileStat\x12\x0c\n\x04path\x18\x01 \x01(\t\x12\x0e\n\x06length\x18\x02 \x01(\x03\x12\x12\n\nmtime_secs\x18\x03 \x01(\x01\x62\x06proto3') + +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals()) +_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'dc_manifest_pb2', globals()) +if _descriptor._USE_C_DESCRIPTORS == False: + + DESCRIPTOR._options = None + _EXTERNALTABLE._serialized_start=41 + _EXTERNALTABLE._serialized_end=219 + _RESOLUTIONINFO._serialized_start=222 + _RESOLUTIONINFO._serialized_end=582 + _RESOLUTIONINFO_GEOINFO._serialized_start=505 + _RESOLUTIONINFO_GEOINFO._serialized_end=536 + _RESOLUTIONINFO_KGTYPE._serialized_start=538 + 
_RESOLUTIONINFO_KGTYPE._serialized_end=582 + _DOWNLOADCONFIG._serialized_start=585 + _DOWNLOADCONFIG._serialized_end=821 + _DOWNLOADCONFIG_PATHPAIR._serialized_start=664 + _DOWNLOADCONFIG_PATHPAIR._serialized_end=710 + _DOWNLOADCONFIG_FILEOP._serialized_start=712 + _DOWNLOADCONFIG_FILEOP._serialized_end=799 + _GOLDENTRIPLESELECTION._serialized_start=823 + _GOLDENTRIPLESELECTION._serialized_end=891 + _DATACOMMONSMANIFEST._serialized_start=894 + _DATACOMMONSMANIFEST._serialized_end=2651 + _DATACOMMONSMANIFEST_IMPORT._serialized_start=1163 + _DATACOMMONSMANIFEST_IMPORT._serialized_end=2043 + _DATACOMMONSMANIFEST_IMPORTGROUP._serialized_start=2045 + _DATACOMMONSMANIFEST_IMPORTGROUP._serialized_end=2127 + _DATACOMMONSMANIFEST_DATASETINFO._serialized_start=2130 + _DATACOMMONSMANIFEST_DATASETINFO._serialized_end=2258 + _DATACOMMONSMANIFEST_DATASETSOURCE._serialized_start=2261 + _DATACOMMONSMANIFEST_DATASETSOURCE._serialized_end=2411 + _DATACOMMONSMANIFEST_IMPORTCATEGORY._serialized_start=2414 + _DATACOMMONSMANIFEST_IMPORTCATEGORY._serialized_end=2645 + _IMPORTSNAPSHOT._serialized_start=2654 + _IMPORTSNAPSHOT._serialized_end=2811 + _IMPORTSNAPSHOT_FILESTAT._serialized_start=2751 + _IMPORTSNAPSHOT_FILESTAT._serialized_end=2811 +# @@protoc_insertion_point(module_scope) diff --git a/import-automation/scripts/generate_provisional_nodes.py b/import-automation/scripts/generate_provisional_nodes.py new file mode 100644 index 0000000000..e0bc87f7de --- /dev/null +++ b/import-automation/scripts/generate_provisional_nodes.py @@ -0,0 +1,294 @@ +import os +import re +import time +import logging +import csv +import io +import sys + +from absl import app +from absl import flags +from google.cloud import spanner + +FLAGS = flags.FLAGS +flags.DEFINE_string("directory", os.getcwd(), + "Directory to scan (default: current working directory)") +flags.DEFINE_bool("no_spanner", False, "Skip Spanner check") +flags.DEFINE_string("spanner_project", "datcom-store", "Spanner project ID") 
+flags.DEFINE_string("spanner_instance", "dc-kg-test", "Spanner instance ID")
+flags.DEFINE_string("spanner_database", "dc_graph_2025_11_07",
+                    "Spanner database ID")
+
+logging.basicConfig(level=logging.INFO,
+                    format='%(asctime)s - %(levelname)s - %(message)s')
+
+# Spanner Configuration
+BATCH_SIZE = 1000  # Number of IDs to query at once
+
+# Increase CSV field size limit for large MCF values
+csv.field_size_limit(sys.maxsize)
+
+# Value prefixes that mark a token as a reference to another node.
+ENTITY_PREFIXES = ("dcid:", "dcs:", "schema:")
+
+
+def strip_prefix(s):
+    """Strips common prefixes (dcid:, dcs:, schema:) from a string."""
+    for prefix in ENTITY_PREFIXES:
+        if s.startswith(prefix):
+            return s[len(prefix):]
+    return s
+
+
+def is_quoted(s):
+    """Checks if a string is surrounded by double quotes."""
+    s = s.strip()
+    return s.startswith('"') and s.endswith('"')
+
+
+def strip_quotes(s):
+    """Removes surrounding double quotes from a string if present."""
+    s = s.strip()
+    if is_quoted(s):
+        return s[1:-1]
+    return s
+
+
+def check_spanner_nodes(node_ids, project, instance_id, database_id):
+    """Checks which of the given node_ids exist in the Spanner Node table.
+
+    Args:
+        node_ids: A collection of node IDs (strings) to check against Spanner.
+        project: Spanner project ID.
+        instance_id: Spanner instance ID.
+        database_id: Spanner database ID.
+
+    Returns:
+        A set containing the node IDs that were found in the Spanner database.
+        Errors are logged (not raised), so on failure this returns whatever
+        was found so far; callers then treat the rest as "not found".
+    """
+    existing_nodes = set()
+    node_ids_list = list(node_ids)
+
+    if not node_ids_list:
+        return existing_nodes
+
+    logging.info(
+        f"Checking {len(node_ids_list)} potential missing nodes in Spanner...")
+
+    try:
+        spanner_client = spanner.Client(project=project)
+        instance = spanner_client.instance(instance_id)
+        database = instance.database(database_id)
+
+        # Using a single snapshot for consistency across batches
+        with database.snapshot(multi_use=True) as snapshot:
+            total_batches = (len(node_ids_list) + BATCH_SIZE - 1) // BATCH_SIZE
+
+            for i in range(0, len(node_ids_list), BATCH_SIZE):
+                batch_num = (i // BATCH_SIZE) + 1
+                # Log only first, last, and every 10th batch to keep output terse.
+                if batch_num % 10 == 0 or batch_num == 1 or batch_num == total_batches:
+                    logging.info(
+                        f"Processing batch {batch_num}/{total_batches}...")
+
+                batch = node_ids_list[i:i + BATCH_SIZE]
+
+                try:
+                    result = snapshot.execute_sql(
+                        "SELECT subject_id FROM Node WHERE subject_id IN UNNEST(@ids)",
+                        params={"ids": batch},
+                        param_types={
+                            "ids":
+                                spanner.param_types.Array(
+                                    spanner.param_types.STRING)
+                        })
+
+                    # Consume the result fully
+                    for row in result:
+                        existing_nodes.add(row[0])
+
+                except Exception as e:
+                    logging.error(f"Error in batch {batch_num}: {e}")
+
+    except Exception as e:
+        logging.error(f"Failed to connect to Spanner or create snapshot: {e}")
+
+    return existing_nodes
+
+
+def generate_provisional_nodes(scan_dir,
+                               no_spanner=False,
+                               spanner_project=None,
+                               spanner_instance=None,
+                               spanner_database=None):
+    """
+    Scans a directory of MCF files to find undefined nodes referenced in
+    properties, and writes provisional definitions for them.
+
+    Args:
+        scan_dir: The local directory containing .mcf files to scan.
+        no_spanner: If True, skips checking Cloud Spanner for existing nodes.
+        spanner_project: Spanner project ID.
+        spanner_instance: Spanner instance ID.
+        spanner_database: Spanner database ID.
+
+    Returns:
+        The path to the generated provisional_nodes.mcf file.
+    """
+    start_time = time.time()
+    root_dir = os.path.abspath(scan_dir)
+    output_dir = root_dir
+
+    defined_nodes = set()
+    referenced_properties = set()
+    referenced_values = set()
+
+    # Regex to capture "Key: Value"
+    pair_re = re.compile(r"^(\w+):\s*(.*)$")
+
+    logging.info(f"Scanning directory: {root_dir}")
+
+    # Walk through the directory to process each .mcf file
+    for dirpath, _dirnames, filenames in os.walk(root_dir):
+        for filename in filenames:
+            # BUGFIX: exact suffix match. The previous substring test
+            # (".mcf" in filename) also matched files like "x.mcf.bak".
+            if not filename.endswith(".mcf"):
+                continue
+
+            # Skip the output file itself if it already exists from a previous run
+            if filename == "provisional_nodes.mcf":
+                continue
+
+            filepath = os.path.join(dirpath, filename)
+
+            with open(filepath, 'r', encoding='utf-8') as f:
+                current_node_id = None
+
+                for line in f:
+                    line = line.strip()
+                    if not line or line.startswith("//") or line.startswith(
+                            "#"):
+                        continue
+
+                    # Check for Node definition (e.g., "Node: dcid:City")
+                    if line.startswith("Node:"):
+                        # Flush the previous node before starting a new one.
+                        if current_node_id:
+                            defined_nodes.add(current_node_id)
+
+                        parts = line.split(":", 1)
+                        if len(parts) > 1:
+                            current_node_id = strip_prefix(
+                                strip_quotes(parts[1]))
+                        else:
+                            current_node_id = None
+                        continue
+
+                    # Check for Property: Value pairs
+                    match = pair_re.match(line)
+                    if match:
+                        key = match.group(1).strip()
+                        value_str = match.group(2).strip()
+
+                        # If explicitly defining dcid as a property, use that as the node ID
+                        if key == "dcid":
+                            current_node_id = strip_prefix(
+                                strip_quotes(value_str))
+                            continue
+
+                        # 1. The Key (Property) is a reference to a Property node (e.g., "containedInPlace")
+                        referenced_properties.add(strip_prefix(key))
+
+                        # 2. The Value: Only check for explicit prefixes indicating references (e.g., "dcid:geoId/06").
+                        # csv.reader handles quoted, comma-separated value lists.
+                        f_io = io.StringIO(value_str)
+                        reader = csv.reader(f_io, skipinitialspace=True)
+                        try:
+                            tokens = next(reader)
+                        except StopIteration:
+                            tokens = []
+
+                        for token in tokens:
+                            if not token:
+                                continue
+
+                            clean_token = strip_quotes(token)
+
+                            # Only strict prefixes are references
+                            if clean_token.startswith(ENTITY_PREFIXES):
+                                ref_id = strip_prefix(clean_token)
+                                referenced_values.add(ref_id)
+
+                # Add the last node of the file
+                if current_node_id:
+                    defined_nodes.add(current_node_id)
+
+    # Calculate initially missing nodes (referenced but not defined locally)
+    missing_props = referenced_properties - defined_nodes
+    missing_values = referenced_values - defined_nodes
+    all_missing_local = missing_props | missing_values
+
+    # Filter out empty strings if any
+    all_missing_local = {m for m in all_missing_local if m}
+
+    logging.info(f"Found {len(defined_nodes)} defined nodes.")
+    logging.info(f"Found {len(referenced_properties)} referenced properties.")
+    logging.info(f"Found {len(referenced_values)} referenced values.")
+    logging.info(f"Found {len(all_missing_local)} locally missing definitions.")
+
+    # Save locally missing nodes to a file (pre-Spanner check) for debugging/audit
+    local_missing_file_path = os.path.join(output_dir,
+                                           "local_missing_nodes.txt")
+    with open(local_missing_file_path, "w") as f:
+        for m in sorted(all_missing_local):
+            f.write(f"{m}\n")
+    logging.info(
+        f"Written locally missing nodes (pre-Spanner) to {local_missing_file_path}"
+    )
+
+    # Check Spanner for existence of these missing nodes
+    existing_in_spanner = set()
+    if not no_spanner:
+        existing_in_spanner = check_spanner_nodes(all_missing_local,
+                                                 spanner_project,
+                                                 spanner_instance,
+                                                 spanner_database)
+
+        if existing_in_spanner:
+            logging.info(
+                f"Found {len(existing_in_spanner)} nodes in Spanner (will not be emitted)."
+            )
+    else:
+        logging.info("Skipping Spanner check as requested.")
+
+    # Final missing set = missing locally AND missing in Spanner
+    final_missing = all_missing_local - existing_in_spanner
+
+    logging.info(f"Final missing count: {len(final_missing)}")
+
+    # Generate the provisional nodes MCF file
+    output_file_path = os.path.join(output_dir, "provisional_nodes.mcf")
+    with open(output_file_path, "w") as out:
+        for m in sorted(final_missing):
+            # Names seen in property position become Property nodes; all other
+            # references become generic ProvisionalNodes.
+            if m in missing_props:
+                node_type = "dcs:Property"
+            else:
+                node_type = "dcs:ProvisionalNode"
+
+            # We don't print to stdout, we write to file directly to be useful
+            node_def = f"Node: dcid:{m}\ntypeOf: {node_type}\nisProvisional: dcs:True\n\n"
+            out.write(node_def)
+
+    logging.info(f"Written missing nodes to {output_file_path}")
+
+    end_time = time.time()
+    logging.info(f"Total runtime: {end_time - start_time:.2f} seconds")
+    return output_file_path
+
+
+def main(_):
+    output_path = generate_provisional_nodes(FLAGS.directory, FLAGS.no_spanner,
+                                             FLAGS.spanner_project,
+                                             FLAGS.spanner_instance,
+                                             FLAGS.spanner_database)
+    logging.info(f"Generated provisional nodes at: {output_path}")
+
+
+if __name__ == "__main__":
+    app.run(main)
diff --git a/import-automation/scripts/manifest_to_mcf.py b/import-automation/scripts/manifest_to_mcf.py
new file mode 100644
index 0000000000..77295dbcfc
--- /dev/null
+++ b/import-automation/scripts/manifest_to_mcf.py
@@ -0,0 +1,171 @@
+import hashlib
+import os
+import sys
+from urllib.parse import urlparse
+from google.protobuf import text_format
+import dc_manifest_pb2
+
+def convert_to_mcf(import_proto, test_time=15700000):
+    """Converts one DataCommonsManifest.Import proto into a Provenance MCF node.
+
+    Args:
+        import_proto: A dc_manifest_pb2.DataCommonsManifest.Import message.
+        test_time: Fixed value emitted as importTime (keeps output stable).
+
+    Returns:
+        The MCF node text (no trailing newline).
+    """
+    import_name = import_proto.import_name
+    dcid = f"dc/base/{import_name}"
+
+    mcf_lines = []
+    mcf_lines.append(f"Node: dcid:{dcid}")
+
+    # curator
+    curator_email = import_proto.curator_email
+    if not curator_email or curator_email == "imports@datacommons.org":
+        mcf_lines.append("curator: dcid:dc/cjj7vp")
+    else:
+        # Simple hash as in Java code fallback.
+        # BUGFIX: built-in hash() of a str is randomized per process
+        # (PYTHONHASHSEED), so the dcid would change between runs. Use a
+        # stable digest instead, masked to 32 bits like before.
+        digest = int.from_bytes(
+            hashlib.sha256(curator_email.encode("utf-8")).digest()[:4], "big")
+        mcf_lines.append(f"curator: dcid:dc/curator_{digest}")
+
+    mcf_lines.append(f'dcid: "{dcid}"')
+
+    if import_proto.provenance_description:
+        mcf_lines.append(f'description: "{import_proto.provenance_description}"')
+
+    # entityResolutionType
+    res_type = "dcid:NoEntityResolution"
+    if import_proto.HasField("resolution_info"):
+        res_info = import_proto.resolution_info
+        if res_info.uses_id_resolver:
+            if res_info.kg_for_resolution == dc_manifest_pb2.ResolutionInfo.KG_MCF:
+                res_type = "dcid:IdBasedKgResolution"
+            else:
+                res_type = "dcid:IdResolutionWithoutKg"
+    mcf_lines.append(f"entityResolutionType: {res_type}")
+
+    # importTime
+    mcf_lines.append(f"importTime: {test_time}")
+
+    # isPartOf
+    dataset_name = import_proto.dataset_name
+    if dataset_name:
+        clean_dataset_name = dataset_name.replace(" ", "")
+        mcf_lines.append(f"isPartOf: dcid:dc/d/TestSource_{clean_dataset_name}")
+
+    mcf_lines.append(f'name: "{import_name}"')
+    mcf_lines.append(f"provenance: dcid:{dcid}")
+
+    # provenanceCategory
+    cat_map = {
+        dc_manifest_pb2.DataCommonsManifest.SCHEMA: "SchemaProvenance",
+        dc_manifest_pb2.DataCommonsManifest.PLACE: "PlaceProvenance",
+        dc_manifest_pb2.DataCommonsManifest.CURATED_STATVAR: "StatVarProvenance",
+        dc_manifest_pb2.DataCommonsManifest.GENERATED_STATVAR: "StatVarProvenance",
+        dc_manifest_pb2.DataCommonsManifest.STATS: "StatsProvenance",
+        dc_manifest_pb2.DataCommonsManifest.AGGREGATED_STATS: "StatsProvenance",
+        dc_manifest_pb2.DataCommonsManifest.IMPUTED_STATS: "StatsProvenance",
+        dc_manifest_pb2.DataCommonsManifest.INTERMEDIATE_STATS: "StatsProvenance",
+    }
+    category = import_proto.category
+    cat_dcid = cat_map.get(category, "UnknownProvenance")
+    mcf_lines.append(f"provenanceCategory: dcid:{cat_dcid}")
+
+    if import_proto.mcf_url:
+        mcf_lines.append(f'resolvedTextMcfUrl: "{import_proto.mcf_url[0]}"')
+
+    mcf_lines.append("source: dcid:dc/s/TestSource")
+    mcf_lines.append("typeOf: dcid:Provenance")
+
+    if import_proto.provenance_url:
+        mcf_lines.append(f'url: "{import_proto.provenance_url}"')
+
+    return "\n".join(mcf_lines)
+
+def convert_dataset_sources_to_mcf(dataset_sources):
+    """Converts DatasetSource protos into Source and Dataset MCF nodes.
+
+    Args:
+        dataset_sources: Iterable of DataCommonsManifest.DatasetSource protos.
+
+    Returns:
+        MCF text with one node per Source/Dataset, separated by blank lines.
+        Dataset nodes are emitted before their parent Source node.
+    """
+    out_nodes = []
+
+    for ds in dataset_sources:
+        source_id = ds.name.replace(" ", "")
+        s_node_id = f"dcid:dc/s/{source_id}"
+
+        s_props = {}
+        s_props['dcid'] = [f'"{s_node_id.split(":", 1)[1]}"']
+        s_props['name'] = [f'"{ds.name}"']
+
+        if ds.url:
+            s_props['url'] = [f'"{ds.url}"']
+            domain = urlparse(ds.url).netloc
+            if domain:
+                s_props['domain'] = [f'"{domain}"']
+
+        s_props['provenance'] = ['dcid:dc/base/GeneratedGraphs']
+        s_props['typeOf'] = ['dcid:Source']
+
+        d_nodes_out = []
+        for d in ds.datasets:
+            dataset_id = d.name.replace(" ", "")
+            d_node_id = f"dcid:dc/d/{source_id}_{dataset_id}"
+
+            d_props = {}
+            d_props['dcid'] = [f'"{d_node_id.split(":", 1)[1]}"']
+            d_props['isPartOf'] = [f"dcid:dc/s/{source_id}"]
+            d_props['name'] = [f'"{d.name}"']
+
+            if d.url:
+                d_props['url'] = [f'"{d.url}"']
+
+            d_props['provenance'] = ['dcid:dc/base/GeneratedGraphs']
+            d_props['typeOf'] = ['dcid:Dataset']
+
+            lines = [f"Node: {d_node_id}"]
+            # Properties are emitted in sorted order for stable output.
+            for prop in sorted(d_props.keys()):
+                for val in d_props[prop]:
+                    lines.append(f"{prop}: {val}")
+            d_nodes_out.append("\n".join(lines))
+
+        lines = [f"Node: {s_node_id}"]
+        for prop in sorted(s_props.keys()):
+            for val in s_props[prop]:
+                lines.append(f"{prop}: {val}")
+
+        out_nodes.extend(d_nodes_out)
+        out_nodes.append("\n".join(lines))
+
+    return "\n\n".join(out_nodes)
+
+def process_text(input_text):
+    """Parses a DataCommonsManifest textproto and converts it to MCF.
+
+    Args:
+        input_text: Textproto content of a DataCommonsManifest.
+
+    Returns:
+        MCF text for all dataset_sources and imports in the manifest.
+
+    Raises:
+        ValueError: If the text cannot be parsed, or the parsed manifest has
+            neither imports nor dataset_sources.
+    """
+    input_text = input_text.strip()
+
+    # Try parsing as DataCommonsManifest. BUGFIX: only the parse step is
+    # wrapped, so a "nothing to convert" error is no longer misreported as
+    # "Failed to parse".
+    manifest = dc_manifest_pb2.DataCommonsManifest()
+    try:
+        text_format.Merge(input_text, manifest)
+    except Exception as e:
+        raise ValueError(f"Failed to parse input as DataCommonsManifest: {e}")
+
+    # Use getattr because 'import' is a Python keyword and cannot be accessed via dot notation.
+    imports = getattr(manifest, 'import', [])
+    dataset_sources = getattr(manifest, 'dataset_source', [])
+
+    if not imports and not dataset_sources:
+        raise ValueError("No imports or dataset_sources found in the parsed manifest.")
+
+    out_mcf = []
+    if dataset_sources:
+        out_mcf.append(convert_dataset_sources_to_mcf(dataset_sources))
+    for imp in imports:
+        out_mcf.append(convert_to_mcf(imp))
+
+    return "\n\n".join(out_mcf)
+
+if __name__ == "__main__":
+    # BUGFIX: fail with a usage message instead of an IndexError.
+    if len(sys.argv) != 2:
+        sys.exit(f"Usage: {sys.argv[0]} <directory>")
+    path = sys.argv[1]
+
+    for root, _, files in os.walk(path):
+        for file in files:
+            if file.endswith(".textproto"):
+                textproto_path = os.path.join(root, file)
+                # BUGFIX: swap only the trailing extension; str.replace could
+                # also rewrite ".textproto" occurring earlier in the path.
+                mcf_path = textproto_path[:-len(".textproto")] + ".mcf"
+
+                with open(textproto_path, 'r') as f:
+                    input_text = f.read()
+
+                try:
+                    out_mcf = process_text(input_text)
+                    with open(mcf_path, 'w') as f:
+                        f.write(out_mcf + "\n")
+                    print(f"Processed {textproto_path} -> {mcf_path}")
+                except Exception as e:
+                    print(f"Error processing {textproto_path}: {e}")
diff --git a/import-automation/scripts/retry_failed_ingestion.py b/import-automation/scripts/retry_failed_ingestion.py
new file mode 100644
index 0000000000..308ad0b56c
--- /dev/null
+++ b/import-automation/scripts/retry_failed_ingestion.py
@@ -0,0 +1,299 @@
+#!/usr/bin/env python3
+# Copyright 2026 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Script to automate the retry process for failed Dataflow ingestion jobs. + +This script identifies failed imports within a specific Dataflow job, +reverts the failed imports in the Spanner database to their last known +good version, resets any 'PENDING' imports back to 'STAGING', and +optionally retriggers the Spanner ingestion workflow. +""" + +import json +import sys + +from absl import app +from absl import flags +from absl import logging +from google.cloud import spanner +from googleapiclient.discovery import build +from googleapiclient.errors import HttpError + +FLAGS = flags.FLAGS + +flags.DEFINE_string("project_id", "datcom-import-automation-prod", + "GCP Project ID") +flags.DEFINE_string("location", "us-central1", "GCP Location") +flags.DEFINE_string("job_id", None, "Failed Dataflow Job ID", required=True) +flags.DEFINE_string("spanner_project", "datcom-store", "Spanner Project ID") +flags.DEFINE_string("spanner_instance", "dc-kg-test", "Spanner Instance ID") +flags.DEFINE_string("spanner_database", "dc_graph_import", + "Spanner Database ID") +flags.DEFINE_string("workflow_name", "spanner-ingestion-workflow", + "Workflow name") + + +def get_failed_imports(dataflow, project_id, location, job_id): + """Identifies failed import names from Dataflow job messages and parameters. + + Args: + dataflow: An authenticated Google Cloud Dataflow API client. + project_id: The GCP Project ID where the Dataflow job ran. + location: The GCP location of the Dataflow job (e.g., 'us-central1'). + job_id: The unique ID of the failed Dataflow job. + + Returns: + A list of strings representing the names of the imports that failed + during the Dataflow job. + """ + failed_imports = set() + try: + logging.info(f"Fetching job details for {job_id}...") + job = dataflow.projects().locations().jobs().get( + projectId=project_id, + location=location, + jobId=job_id, + view='JOB_VIEW_ALL').execute() + + # Get all imports involved in this job from displayData. 
+ all_import_names = set() + import_list_json = None + for item in job.get('pipelineDescription', {}).get('displayData', []): + if item.get('key') == 'importList': + import_list_json = item.get('strValue') + break + + if import_list_json: + try: + all_imports_data = json.loads(import_list_json) + all_import_names = { + i['importName'].split(':')[-1] for i in all_imports_data + } + logging.info( + f"Imports involved in this job: {all_import_names}") + except Exception as e: + logging.error(f"Error parsing importList parameter: {e}") + + logging.info(f"Fetching error messages for Dataflow job {job_id}...") + messages = dataflow.projects().locations().jobs().messages().list( + projectId=project_id, + location=location, + jobId=job_id, + minimumImportance='JOB_MESSAGE_ERROR').execute() + + for msg in messages.get('jobMessages', []): + text = msg.get('messageText', '') + # If we find a valid import name in the error message, add it. + for name in all_import_names: + if name in text: + failed_imports.add(name) + + # If the job failed globally and we haven't identified specific imports, + # or if we want to be safe, include all imports if the state is failed. + if job.get('currentState') in [ + 'JOB_STATE_FAILED', 'JOB_STATE_CANCELLED' + ]: + if not failed_imports: + logging.warning( + "Job failed globally. Reverting all involved imports.") + failed_imports.update(all_import_names) + else: + logging.info( + f"Job failed. Identified specific failed imports: {failed_imports}" + ) + + except HttpError as e: + logging.error(f"Error querying Dataflow API: {e}") + + return list(failed_imports) + + +def revert_import(database, import_name, job_id): + """Reverts an import to its previous version in Spanner and sets it to STAGING. + + This function updates the ImportStatus and ImportVersionHistory tables in + Spanner to effectively rollback an import that failed during the Dataflow job. + + Args: + database: An initialized Spanner Database object. 
+ import_name: The name of the import to revert (e.g., 'foo:bar:import'). + job_id: The ID of the failed Dataflow job, used for commenting. + + Returns: + True if the revert was successful, 'ALREADY_REVERTED' if the import + was recently reverted, or False if no version history was found or an + error occurred. + """ + short_name = import_name.split(':')[-1] + + def _revert_txn(transaction): + # 1. Fetch the most recent version and comment from ImportVersionHistory. + sql = """ + SELECT Version, Comment FROM ImportVersionHistory + WHERE ImportName = @importName + ORDER BY UpdateTimestamp DESC + LIMIT 1 + """ + results = transaction.execute_sql( + sql, + params={'importName': short_name}, + param_types={'importName': spanner.param_types.STRING}) + rows = list(results) + + if not rows: + logging.warning( + f"No version history found for '{short_name}'. Cannot revert.") + return False + + previous_version = rows[0][0] + previous_comment = rows[0][1] if len(rows[0]) > 1 else "" + + if previous_comment and "Reverted due to Dataflow failure" in previous_comment: + logging.error( + f"Import '{short_name}' was already reverted recently. Aborting to prevent infinite loop." + ) + return "ALREADY_REVERTED" + + logging.info( + f"Reverting '{short_name}' to last known good version: {previous_version}" + ) + + # 2. Update ImportStatus to point to the previous version and set state to STAGING. + param_types = { + 'version': spanner.param_types.STRING, + 'importName': spanner.param_types.STRING + } + + transaction.execute_update(""" + UPDATE ImportStatus + SET LatestVersion = @version, State = 'STAGING', StatusUpdateTimestamp = PENDING_COMMIT_TIMESTAMP() + WHERE ImportName = @importName + """, + params={ + 'version': previous_version, + 'importName': short_name + }, + param_types=param_types) + + # 3. Add an entry to ImportVersionHistory to record the revert action. 
+ transaction.execute_update( + """ + INSERT INTO ImportVersionHistory (ImportName, Version, UpdateTimestamp, Comment) + VALUES (@importName, @version, PENDING_COMMIT_TIMESTAMP(), @comment) + """, + params={ + 'importName': short_name, + 'version': previous_version, + 'comment': f"Reverted due to Dataflow failure ({job_id})" + }, + param_types={ + 'importName': spanner.param_types.STRING, + 'version': spanner.param_types.STRING, + 'comment': spanner.param_types.STRING + }) + return True + + try: + return database.run_in_transaction(_revert_txn) + except Exception as e: + logging.error(f"Transaction failed for '{short_name}': {e}") + return False + + +def reset_pending_imports(database): + """Resets all imports with state 'PENDING' back to 'STAGING'. + + This ensures that any imports that were stuck in a PENDING state due to + the failure are put back into the queue for processing. + + Args: + database: An initialized Spanner Database object. + """ + logging.info("Resetting all PENDING imports to STAGING...") + + def _reset_txn(transaction): + return transaction.execute_update(""" + UPDATE ImportStatus + SET State = 'STAGING', StatusUpdateTimestamp = PENDING_COMMIT_TIMESTAMP() + WHERE State = 'PENDING' + """) + + try: + count = database.run_in_transaction(_reset_txn) + logging.info(f"Successfully reset {count} PENDING imports to STAGING.") + except Exception as e: + logging.error(f"Failed to reset PENDING imports: {e}") + + +def retrigger_workflow(project_id, location, workflow_name): + """Retriggers the Spanner ingestion workflow with a retry flag. + + Args: + project_id: The GCP Project ID where the workflow is hosted. + location: The GCP location of the workflow (e.g., 'us-central1'). + workflow_name: The name of the workflow to execute. 
+ """ + logging.info(f"Retriggering workflow '{workflow_name}'...") + try: + service = build('workflowexecutions', 'v1', cache_discovery=False) + parent = f"projects/{project_id}/locations/{location}/workflows/{workflow_name}" + # Pass is_retry=True as a workflow argument. + execution = service.projects().locations().workflows().executions( + ).create(parent=parent, + body={ + "argument": json.dumps({"is_retry": True}) + }).execute() + logging.info( + f"Retriggered successfully. New execution: {execution.get('name')}") + except Exception as e: + logging.error(f"Failed to retrigger workflow: {e}") + + +def main(argv): + """Main entry point for the script. + + Args: + argv: Command-line arguments passed to the script. + """ + dataflow = build('dataflow', 'v1b3', cache_discovery=False) + failed_imports = get_failed_imports(dataflow, FLAGS.project_id, + FLAGS.location, FLAGS.job_id) + + if not failed_imports: + logging.error("No failed imports identified.") + sys.exit(1) + + logging.info(f"Processing failed imports: {failed_imports}") + spanner_client = spanner.Client(project=FLAGS.spanner_project) + instance = spanner_client.instance(FLAGS.spanner_instance) + database = instance.database(FLAGS.spanner_database) + + can_retrigger = True + for imp in failed_imports: + status = revert_import(database, imp, FLAGS.job_id) + if status == "ALREADY_REVERTED": + can_retrigger = False + + if can_retrigger: + reset_pending_imports(database) + retrigger_workflow(FLAGS.project_id, FLAGS.location, + FLAGS.workflow_name) + else: + logging.warning( + "Skipping re-trigger because at least one import was already reverted." + ) + + +if __name__ == "__main__": + app.run(main)