
Commit ffce1c3

ready to be tested on complete dataset for first time
1 parent bdff1a9 commit ffce1c3

1 file changed

Lines changed: 152 additions & 0 deletions

File tree

analysis/ann_groups_to_bq_table.py
@@ -0,0 +1,152 @@
"""
Script to query IDC for ANN objects referencing pathology whole-slide images,
download and parse them in parallel, extract annotation group info, and write
the results to BigQuery.

Authentication:
This script uses Google Application Default Credentials (ADC). The recommended
way to authenticate is to run:

    gcloud auth application-default login

in your terminal before running this script. Ensure your account has access to
the relevant BigQuery and Cloud Storage resources.
"""
import concurrent.futures
import json
import time
from typing import List, Dict

from google.cloud import bigquery
from google.cloud import storage
from google.api_core.exceptions import NotFound
import highdicom as hd

# CONFIGURATION
PROJECT_ID = 'idc-pathomics-000'

# Find all ANN (Microscopy Bulk Simple Annotations) instances in the target
# collection via the public IDC index table.
QUERY_TO_IDC = '''
SELECT
    dcm_all.SOPInstanceUID,
    dcm_all.SeriesInstanceUID,
    dcm_all.gcs_url
FROM
    `bigquery-public-data.idc_current.dicom_all` AS dcm_all
WHERE Modality = 'ANN' AND collection_id LIKE '%bonemarrowwsi%'
'''
MAX_WORKERS = 8
BQ_DATASET = 'idc_pathology'
BQ_TABLE = 'annotation_groups'

# One output row per annotation group; the RECORD fields hold the coded
# concepts describing what each group annotates.
BQ_TABLE_SCHEMA = [
    bigquery.SchemaField('SOPInstanceUID', 'STRING'),
    bigquery.SchemaField('SeriesInstanceUID', 'STRING'),
    bigquery.SchemaField('annotated_property_category', 'RECORD', fields=[
        bigquery.SchemaField('CodeValue', 'STRING'),
        bigquery.SchemaField('CodeMeaning', 'STRING'),
        bigquery.SchemaField('CodingSchemeDesignator', 'STRING'),
    ]),
    bigquery.SchemaField('annotated_property_type', 'RECORD', fields=[
        bigquery.SchemaField('CodeValue', 'STRING'),
        bigquery.SchemaField('CodeMeaning', 'STRING'),
        bigquery.SchemaField('CodingSchemeDesignator', 'STRING'),
    ]),
    bigquery.SchemaField('num_annotations', 'INTEGER'),
    bigquery.SchemaField('annotated_SeriesInstanceUID', 'STRING'),
]
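# For reference, a row conforming to this schema would look roughly like the
# following (hypothetical placeholder values, not real IDC data):
#
# {
#     'SOPInstanceUID': '1.2.826.0.1.XXXX',
#     'SeriesInstanceUID': '1.2.826.0.1.YYYY',
#     'annotated_property_category': {
#         'CodeValue': '91723000',
#         'CodeMeaning': 'Anatomical Structure',
#         'CodingSchemeDesignator': 'SCT',
#     },
#     'annotated_property_type': {...},
#     'num_annotations': 1000,
#     'annotated_SeriesInstanceUID': '1.2.826.0.1.ZZZZ',
# }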

def query_ann_files(client: bigquery.Client) -> List[Dict]:
    """Run the IDC query and return one dict per matching ANN instance."""
    query_job = client.query(QUERY_TO_IDC)
    return [dict(row) for row in query_job]
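# Each row returned above contains exactly the three selected columns, e.g.
# (illustrative values only):
# {'SOPInstanceUID': '1.2...', 'SeriesInstanceUID': '1.2...',
#  'gcs_url': 'gs://bucket/path/to/instance.dcm'}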

def process_ann_files(rows: List[Dict]) -> List[Dict]:
    """Download and parse all ANN objects in parallel, one thread per file."""
    storage_client = storage.Client(project=PROJECT_ID)
    results = []
    # Threads (rather than processes) are a good fit here: the work is
    # dominated by I/O (downloading blobs from GCS).
    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        future_to_row = {
            executor.submit(
                parse_ann_blob,
                storage_client,
                row['gcs_url'],
                row['SOPInstanceUID'],
                row['SeriesInstanceUID'],
            ): row for row in rows
        }
        for future in concurrent.futures.as_completed(future_to_row):
            row = future_to_row[future]
            try:
                ann_results = future.result()
                results.extend(ann_results)
            except Exception as e:
                print(f'Error processing {row["gcs_url"]}: {e}')
    return results

def parse_ann_blob(
    storage_client: storage.Client,
    gcs_url: str,
    sop_instance_uid: str,
    series_instance_uid: str,
) -> List[Dict]:
    """Stream a single ANN object from GCS and extract its annotation groups."""
    # Parse bucket and blob name from a GCS URL of the form gs://bucket/path
    parts = gcs_url[5:].split('/', 1)
    bucket_name, blob_name = parts[0], parts[1]
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(blob_name)
    results = []
    with blob.open('rb') as file_obj:
        ann = hd.ann.annread(file_obj)
        for ann_group in ann.get_annotation_groups():
            results.append({
                'SOPInstanceUID': sop_instance_uid,
                'SeriesInstanceUID': series_instance_uid,
                'annotated_property_category': coded_concept_to_dict(
                    ann_group.annotated_property_category),
                'annotated_property_type': coded_concept_to_dict(
                    ann_group.annotated_property_type),
                'num_annotations': ann_group.number_of_annotations,
                'annotated_SeriesInstanceUID':
                    ann.ReferencedSeriesSequence[0].SeriesInstanceUID,
            })
    return results

def coded_concept_to_dict(cc):
    """Convert a coded concept to a plain dict, or None if absent."""
    return {
        'CodeValue': cc.value,
        'CodeMeaning': cc.meaning,
        'CodingSchemeDesignator': cc.scheme_designator,
    } if cc is not None else None

def main():
    bq_client = bigquery.Client(project=PROJECT_ID)

    # Query ANN files
    rows = query_ann_files(bq_client)
    print(f'Found {len(rows)} ANN files to process.')

    # Process ANN files in parallel
    results = process_ann_files(rows)
    print(f'Extracted annotation group info from {len(results)} groups.')

    if not results:
        print('No results to write.')
        return

    # Write to BigQuery: always delete and recreate the table before inserting
    table_ref = bq_client.dataset(BQ_DATASET).table(BQ_TABLE)
    try:
        bq_client.delete_table(table_ref, not_found_ok=True)
        print(f'Table {BQ_TABLE} deleted.')
    except Exception as e:
        print(f'Error deleting table: {e}')
    table = bigquery.Table(table_ref, schema=BQ_TABLE_SCHEMA)
    bq_client.create_table(table)
    print(f'Table {BQ_TABLE} created.')

    # Retry loop to ensure the new table is visible before inserting
    found = False
    for _ in range(10):
        try:
            bq_client.get_table(table_ref)
            found = True
            break
        except NotFound:
            time.sleep(2)
    if not found:
        print(f'Error: Table {BQ_TABLE} not found after creation. '
              'Saving results locally as ann_groups_results.json.')
        with open('ann_groups_results.json', 'w') as f:
            json.dump(results, f, indent=2)
        return

    errors = bq_client.insert_rows_json(table_ref, results)
    if errors:
        print(f'BigQuery insert errors: {errors}')
    else:
        print('Results written to BigQuery.')


if __name__ == '__main__':
    main()
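Once the script has run, the output can be sanity-checked directly. A minimal sketch, assuming the table was created under the project, dataset, and table names configured above (idc-pathomics-000.idc_pathology.annotation_groups):

from google.cloud import bigquery

client = bigquery.Client(project='idc-pathomics-000')
query = '''
SELECT
    annotated_property_type.CodeMeaning AS property_type,
    COUNT(*) AS num_groups,
    SUM(num_annotations) AS total_annotations
FROM `idc-pathomics-000.idc_pathology.annotation_groups`
GROUP BY property_type
ORDER BY total_annotations DESC
'''
# Print a per-property-type summary of the extracted annotation groups.
for row in client.query(query):
    print(row['property_type'], row['num_groups'], row['total_annotations'])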
