Skip to content

Commit f93511b

Browse files
authored
Differ updates (datacommonsorg#1513)
* Support for entity nodes
* Use flex template for dataflow job
* Remove local dataflow mode
* Remove series analysis
* Other fixes and clean up
1 parent 15cb39d commit f93511b

28 files changed

Lines changed: 971 additions & 460 deletions

import-automation/executor/app/configs.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,8 +117,6 @@ class ExecutorConfig:
117117
local_repo_dir: str = '/data'
118118
# Location of the import tool jar.
119119
import_tool_path: str = '/import-tool.jar'
120-
# Location of the differ tool jar.
121-
differ_tool_path: str = '/differ-tool.jar'
122120
# Cloud workflow id.
123121
cloud_workflow_id: str = 'import-automation-workflow'
124122
# Maximum time a user script can run for in seconds.

import-automation/executor/app/executor/import_executor.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -452,7 +452,7 @@ def _invoke_import_validation(self, repo_dir: str, relative_import_dir: str,
452452
validation_output_file = os.path.join(validation_output_path,
453453
'validation_output.csv')
454454
differ_output = os.path.join(validation_output_path,
455-
'point_analysis_summary.csv')
455+
'obs_diff_summary.csv')
456456

457457
# Invoke differ and validation scripts.
458458
differ_output_file = ''
@@ -462,11 +462,10 @@ def _invoke_import_validation(self, repo_dir: str, relative_import_dir: str,
462462
differ = ImportDiffer(current_data=current_data_path,
463463
previous_data=previous_data_path,
464464
output_location=validation_output_path,
465-
differ_tool='',
466465
project_id=self.config.gcp_project_id,
467466
job_name=differ_job_name,
468467
file_format='mcf',
469-
runner_mode='native')
468+
runner_mode='local')
470469
differ.run_differ()
471470
differ_output_file = differ_output
472471
else:

import-automation/executor/config_override_test.json

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
"user_script_timeout": 3600,
77
"disable_email_notifications": true,
88
"import_tool_path" : "/tmp/import-tool/import-tool.jar",
9-
"differ_tool_path" : "/tmp/import-tool/differ-tool.jar",
109
"gcp_project_id": "datcom-ci",
1110
"gcs_volume_mount_dir": "/tmp",
1211
"storage_prod_bucket_name": "datcom-import-test"

import-automation/executor/run_import.sh

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -279,8 +279,6 @@ function run_import_executor {
279279
mkdir -p $TMP_DIR/import-tool
280280
run_cmd wget "https://storage.googleapis.com/datacommons_public/import_tools/import-tool.jar" \
281281
-O $TMP_DIR/import-tool/import-tool.jar
282-
run_cmd wget "https://storage.googleapis.com/datacommons_public/import_tools/differ-tool.jar" \
283-
-O $TMP_DIR/import-tool/differ-tool.jar
284282
fi
285283

286284
run_cmd $SCRIPT_DIR/run_local_executor.sh \

tools/import_differ/README.md

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
11
# Import Differ
22

3-
This utility generates a diff (point and series analysis) of two versions of the same dataset for import analysis.
3+
This utility generates a diff of two versions of a dataset for import analysis.
44

55
**Usage**
66

77
***Prerequisites***
88
- Python/Pandas is installed for local runner mode.
9-
- Java is installed for local runner mode.
109
- gcloud ADC is configured for cloud runner mode.
1110

1211
```
@@ -18,7 +17,7 @@ python import_differ.py --current_data=<path> --previous_data=<path> --output_lo
1817
- previous\_data: Path to the previous data (wildcard on local/GCS supported).
1918
- output\_location: Path to the output data folder (local/GCS).
2019
- file\_format: Format of the input data (mcf,tfrecord).
21-
- runner\_mode: Runner mode: native (Python) / local (Dataflow in local mode) / cloud (Dataflow in Cloud).
20+
- runner\_mode: Runner mode: local (Python) / cloud (Dataflow in Cloud).
2221
- project\_id: GCP project Id for the dataflow job.
2322
- job\_name: Name of the differ dataflow job.
2423

@@ -35,7 +34,8 @@ Summary output generated is of the form below showing counts of differences for
3534
|3|dcid:var4|0|2|0|
3635

3736
Detailed diff output is written to files for further analysis. Sample result files can be found under folder 'test/results'.
38-
- point\_analysis\_summary.csv: diff summry for point analysis
39-
- point\_analysis\_results.csv: detailed results for point analysis
40-
- series\_analysis\_summary.csv: diff summry for series analysis
41-
- series\_analysis\_results.csv: detailed results for series analysis
37+
- obs\_diff\_summary.csv: diff summary for observation analysis
38+
- obs\_diff\_samples.csv: sample diff for observation analysis
39+
- obs\_diff\_log.csv: diff log for observations
40+
- schema\_diff\_summary.csv: diff summary for schema analysis
41+
- schema\_diff\_log.csv: diff log for schema nodes

tools/import_differ/differ_utils.py

Lines changed: 41 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,13 @@
33
import os
44
import pandas as pd
55
import re
6-
import shutil
76

87
from absl import logging
98
from google.cloud import storage
10-
from googleapiclient.discovery import build
119

1210

13-
def load_mcf_file(file: str) -> pd.DataFrame:
14-
""" Reads an MCF text file and returns it as a dataframe."""
11+
def load_mcf_file(file: str):
12+
""" Reads an MCF text file and returns mcf nodes."""
1513
mcf_file = open(file, 'r', encoding='utf-8')
1614
mcf_contents = mcf_file.read()
1715
mcf_file.close()
@@ -27,25 +25,22 @@ def load_mcf_file(file: str) -> pd.DataFrame:
2725
if parsed_line is not None:
2826
current_mcf_node[parsed_line.group(1)] = parsed_line.group(2)
2927
if current_mcf_node:
30-
if current_mcf_node['typeOf'] == 'dcid:StatVarObservation':
31-
mcf_nodes.append(current_mcf_node)
32-
else:
33-
logging.warning(
34-
f'Ignoring node of type:{current_mcf_node["typeOf"]}')
35-
df = pd.DataFrame(mcf_nodes)
36-
return df
28+
mcf_nodes.append(current_mcf_node)
29+
30+
logging.info(f'Loaded {len(mcf_nodes)} nodes from file {file}')
31+
return mcf_nodes
3732

3833

3934
def load_mcf_files(path: str) -> pd.DataFrame:
4035
""" Loads all sharded mcf files in the given directory and
41-
returns a single combined dataframe."""
42-
df_list = []
36+
returns a combined MCF node list."""
37+
node_list = []
4338
filenames = glob.glob(path)
39+
logging.info(f'Loading {len(filenames)} files from path {path}')
4440
for filename in filenames:
45-
df = load_mcf_file(filename)
46-
df_list.append(df)
47-
result = pd.concat(df_list, ignore_index=True)
48-
return result
41+
nodes = load_mcf_file(filename)
42+
node_list.extend(nodes)
43+
return node_list
4944

5045

5146
def load_csv_data(path: str, tmp_dir: str) -> pd.DataFrame:
@@ -66,72 +61,33 @@ def load_csv_data(path: str, tmp_dir: str) -> pd.DataFrame:
6661

6762
def write_csv_data(df: pd.DataFrame, dest: str, file: str, tmp_dir: str):
6863
""" Writes a dataframe to a CSV file with the given path."""
69-
tmp_file = os.path.join(tmp_dir, file)
70-
with open(tmp_file, mode='w', encoding='utf-8') as out_file:
71-
df.to_csv(out_file, index=False, mode='w', header=True)
72-
upload_output_data(tmp_file, dest)
73-
74-
75-
def launch_dataflow_job(project: str, job: str, current_data: str,
76-
previous_data: str, file_format: str,
77-
output_location: str) -> str:
78-
parameters = {
79-
'currentData': current_data,
80-
'previousData': previous_data,
81-
'outputLocation': output_location + '/diff',
82-
}
83-
if file_format == 'mcf':
84-
logging.info('Using mcf file format')
85-
template = 'gs://datcom-dataflow/templates/differ-mcf'
64+
if dest.startswith('gs://'):
65+
path = os.path.join(tmp_dir, file)
8666
else:
87-
logging.info('Using tfrecord file format')
88-
template = 'gs://datcom-dataflow/templates/differ-tfr'
89-
parameters['useOptimizedGraphFormat'] = 'true'
90-
91-
dataflow = build("dataflow", "v1b3")
92-
request = (dataflow.projects().templates().launch(
93-
projectId=project,
94-
gcsPath=template,
95-
body={
96-
"jobName": job,
97-
"parameters": parameters,
98-
},
99-
))
100-
response = request.execute()
101-
job_id = response['job']['id']
102-
return f'https://pantheon.corp.google.com/dataflow/jobs/{job_id}?project={project}'
103-
104-
105-
def get_job_status(project: str, job: str) -> str:
106-
dataflow = build("dataflow", "v1b3")
107-
request = (dataflow.projects().jobs().list(projectId=project, name=job))
108-
response = request.execute()
109-
return response['jobs'][0]['currentState']
67+
path = os.path.join(dest, file)
68+
with open(path, mode='w', encoding='utf-8') as out_file:
69+
df.to_csv(out_file, index=False, mode='w', header=True)
70+
if dest.startswith('gs://'):
71+
upload_output_data(path, dest)
11072

11173

11274
def upload_output_data(src: str, dest: str):
113-
if dest.startswith('gs://'):
114-
client = storage.Client()
115-
bucket_name = dest.split('/')[2]
116-
bucket = client.get_bucket(bucket_name)
117-
for filepath in glob.iglob(src):
118-
filename = os.path.basename(filepath)
119-
logging.info('Uploading %s to %s', filename, dest)
120-
blobname = dest[len('gs://' + bucket_name + '/'):] + '/' + filename
121-
blob = bucket.blob(blobname)
122-
blob.upload_from_filename(filepath)
123-
else:
124-
os.makedirs(dest, exist_ok=True)
125-
for filepath in glob.iglob(src):
126-
shutil.copyfile(filepath,
127-
os.path.join(dest, os.path.basename(filepath)))
75+
client = storage.Client()
76+
bucket_name = dest.split('/')[2]
77+
bucket = client.get_bucket(bucket_name)
78+
for filepath in glob.iglob(src):
79+
filename = os.path.basename(filepath)
80+
logging.info('Uploading %s to %s', filename, dest)
81+
blobname = dest[len('gs://' + bucket_name + '/'):] + '/' + filename
82+
blob = bucket.blob(blobname)
83+
blob.upload_from_filename(filepath)
12884

12985

130-
def get_gcs_data(uri: str, tmp_dir: str) -> str:
86+
def get_gcs_data(uri: str, dest_dir: str) -> str:
13187
""" Downloads files from GCS and copies them to local.
13288
Args:
13389
uri: single file path or wildcard format
134-
tmp_dir: destination folder
90+
dest_dir: destination folder
13591
Returns:
13692
path to the output file/folder
13793
"""
@@ -141,20 +97,23 @@ def get_gcs_data(uri: str, tmp_dir: str) -> str:
14197
dirname = os.path.dirname(file_pat)
14298
for blob in bucket.list_blobs(prefix=dirname):
14399
if fnmatch.fnmatch(blob.name, file_pat):
144-
path = blob.name.replace('/', '_')
145-
blob.download_to_filename(os.path.join(tmp_dir, path))
146-
return os.path.join(tmp_dir, file_pat.replace('/', '_'))
100+
dest_file = os.path.join(dest_dir, blob.name)
101+
os.makedirs(os.path.dirname(dest_file), exist_ok=True)
102+
blob.download_to_filename(dest_file)
103+
return os.path.join(dest_dir, file_pat)
147104

148105

149-
def load_data(path: str, tmp_dir: str) -> pd.DataFrame:
150-
""" Loads data from the given path and returns as a dataframe.
106+
def load_data(path: str, tmp_dir: str) -> list:
107+
""" Loads data from the given path and returns dataframe.
151108
Args:
152109
path: local or gcs path (single file or wildcard format)
153-
tmp_dir: destination folder
110+
tmp_dir: temporary folder
154111
Returns:
155-
dataframe with the input data
112+
combined list of mcf nodes
156113
"""
157114
if path.startswith('gs://'):
158115
os.makedirs(tmp_dir, exist_ok=True)
159116
path = get_gcs_data(path, tmp_dir)
160-
return load_mcf_files(path)
117+
118+
mcf_nodes = load_mcf_files(path)
119+
return mcf_nodes
1.19 MB
Binary file not shown.

0 commit comments

Comments (0)