"""This module processes Firestore documents from BigQuery using Spark."""

import json
import os

from google.cloud import firestore  # type: ignore
from pyspark.sql import SparkSession  # type: ignore


# GCP project that owns the target Firestore databases.
PROJECT = "httparchive"


# pylint: disable=too-many-instance-attributes
914class FirestoreBatch :
1015 """Handles Firestore data batching from BigQuery using Spark."""
1116
12- def __init__ (self ):
17+ def __init__ (self , export_config ):
1318 """Initialize FirestoreBatch with default settings."""
14- self .firestore = firestore .Client ()
15- self .bigquery = bigquery .Client ()
19+ self .config = {
20+ "collection_name" : export_config ["name" ],
21+ "date" : getattr (export_config , "date" , "" ),
22+ "collection_type" : export_config ["type" ],
23+ }
24+ self .firestore = firestore .Client (
25+ project = PROJECT , database = export_config ["database" ]
26+ )
1627 self .batch_size = 500
1728 self .max_concurrent_batches = 200
1829 self .current_batch = []
1930 self .batch_promises = []
2031 self .spark = SparkSession .builder .appName (
2132 "FirestoreBatchProcessor"
2233 ).getOrCreate ()
23- self .config = {"date" : "" , "collection_name" : "" , "collection_type" : "" }
2434
2535 def queue_batch (self , operation ):
2636 """Queue a batch commit operation for Firestore."""
@@ -36,7 +46,6 @@ def queue_batch(self, operation):
3646 batch .set (doc_ref , doc )
3747 else :
3848 raise ValueError ("Invalid operation" )
39-
4049 self .batch_promises .append (batch .commit ())
4150 self .current_batch = []
4251
@@ -75,16 +84,17 @@ def batch_delete(self):
7584 f"Deleting documents from { self .config ['collection_name' ]} "
7685 f"for date { self .config ['date' ]} "
7786 )
78- query = collection_ref .where ("date" , "==" , self .config ["date" ])
87+ collection_query = collection_ref .where ("date" , "==" , self .config ["date" ])
7988 elif self .config ["collection_type" ] == "dict" :
8089 print (f"Deleting documents from { self .config ['collection_name' ]} " )
81- query = collection_ref
90+ collection_query = collection_ref
8291 else :
8392 raise ValueError ("Invalid collection type" )
84-
8593 while True :
8694 docs = list (
87- query .limit (self .batch_size * self .max_concurrent_batches ).stream ()
95+ collection_query .limit (
96+ self .batch_size * self .max_concurrent_batches
97+ ).stream ()
8898 )
8999 if not docs :
90100 break
@@ -105,13 +115,13 @@ def batch_delete(self):
105115 f"Time: { duration } seconds"
106116 )
107117
108- def stream_from_bigquery (self , query ):
118+ def stream_from_bigquery (self , query_str ):
109119 """Stream data from BigQuery to Firestore."""
110120 print ("Starting BigQuery to Firestore transfer..." )
111121 start_time = self .spark .sparkContext .startTime
112122 total_rows_processed = 0
113123
114- df = self .spark .read .format ("bigquery" ).option ("query" , query ).load ()
124+ df = self .spark .read .format ("bigquery" ).option ("query" , query_str ).load ()
115125
116126 for row in df .collect ():
117127 self .current_batch .append (row .asDict ())
@@ -132,21 +142,19 @@ def stream_from_bigquery(self, query):
132142 f"seconds"
133143 )
134144
135- def export (self ):
145+ def export (self , query_str ):
136146 """Export data from BigQuery to Firestore."""
137- export_config = json .loads (
138- '{"name": "technologies", "type": "dict", "environment": "dev"}'
139- )
140- query = str (json .loads ("SELECT * FROM report.tech_report_technologies" ))
141-
142- self .config ["date" ] = getattr (export_config , "date" , "" )
143- self .config ["collection_name" ] = export_config ["name" ]
144- self .config ["collection_type" ] = export_config ["type" ]
145147
146148 self .batch_delete ()
147- self .stream_from_bigquery (query )
149+ self .stream_from_bigquery (query_str )
148150
149151
if __name__ == "__main__":
    # Both parameters are injected by the BigQuery stored-procedure runner as
    # JSON-encoded environment variables, e.g.
    #   BIGQUERY_PROC_PARAM.export_config = '{"name": "...", "type": "...", ...}'
    #   BIGQUERY_PROC_PARAM.query         = '"SELECT ..."' (a JSON string)
    config_data = json.loads(os.environ["BIGQUERY_PROC_PARAM.export_config"])
    QUERY_STR = str(json.loads(os.environ["BIGQUERY_PROC_PARAM.query"]))

    processor = FirestoreBatch(config_data)
    processor.export(QUERY_STR)