1- from pyspark .sql import SparkSession
2- from google .cloud import firestore
3- from google .cloud import bigquery
1+ """This module processes Firestore documents from BigQuery using Spark."""
2+
43import json
5- import os
4+ from google .cloud import bigquery , firestore # type: ignore
5+ from pyspark .sql import SparkSession # type: ignore
6+
67
78class FirestoreBatch :
9+ """Handles Firestore data batching from BigQuery using Spark."""
10+
811 def __init__ (self ):
12+ """Initialize FirestoreBatch with default settings."""
913 self .firestore = firestore .Client ()
1014 self .bigquery = bigquery .Client ()
1115 self .batch_size = 500
1216 self .max_concurrent_batches = 200
1317 self .current_batch = []
1418 self .batch_promises = []
15- self .spark = SparkSession .builder .appName ("FirestoreBatchProcessor" ).getOrCreate ()
19+ self .spark = SparkSession .builder .appName (
20+ "FirestoreBatchProcessor"
21+ ).getOrCreate ()
22+ self .config = {
23+ "date" : "" ,
24+ "collection_name" : "" ,
25+ "collection_type" : ""
26+ }
1627
1728 def queue_batch (self , operation ):
29+ """Queue a batch commit operation for Firestore."""
1830 batch = self .firestore .batch ()
1931
2032 for doc in self .current_batch :
2133 if operation == "delete" :
2234 batch .delete (doc .reference )
2335 elif operation == "set" :
24- doc_ref = self .firestore .collection (self .collection_name ).document ()
36+ doc_ref = self .firestore .collection (self .config [ " collection_name" ] ).document ()
2537 batch .set (doc_ref , doc )
2638 else :
2739 raise ValueError ("Invalid operation" )
@@ -30,7 +42,11 @@ def queue_batch(self, operation):
3042 self .current_batch = []
3143
3244 def commit_batches (self ):
33- print (f"Committing { len (self .batch_promises )} batches to { self .collection_name } " )
45+ """Commit all queued batch promises."""
46+ print (
47+ f"Committing { len (self .batch_promises )} "
48+ f"batches to { self .config ['collection_name' ]} "
49+ )
3450 for batch_promise in self .batch_promises :
3551 try :
3652 batch_promise
@@ -40,30 +56,39 @@ def commit_batches(self):
4056 self .batch_promises = []
4157
4258 def final_flush (self , operation ):
59+ """Flush any pending batch operations."""
4360 if self .current_batch :
4461 self .queue_batch (operation )
4562 if self .batch_promises :
4663 self .commit_batches ()
4764
4865 def batch_delete (self ):
66+ """Delete Firestore documents in batches."""
4967 print ("Starting batch deletion..." )
5068 start_time = self .spark .sparkContext .startTime
5169 self .current_batch = []
5270 self .batch_promises = []
5371 total_docs_deleted = 0
5472
55- collection_ref = self .firestore .collection (self .collection_name )
56- if self .collection_type == "report" :
57- print (f"Deleting documents from { self .collection_name } for date { self .date } " )
58- query = collection_ref .where ("date" , "==" , self .date )
59- elif self .collection_type == "dict" :
60- print (f"Deleting documents from { self .collection_name } " )
73+ collection_ref = self .firestore .collection (self .config ["collection_name" ])
74+ if self .config ["collection_type" ] == "report" :
75+ print (
76+ f"Deleting documents from { self .config ['collection_name' ]} "
77+ f"for date { self .config ['date' ]} "
78+ )
79+ query = collection_ref .where ("date" , "==" , self .config ["date" ])
80+ elif self .config ["collection_type" ] == "dict" :
81+ print (
82+ f"Deleting documents from { self .config ['collection_name' ]} "
83+ )
6184 query = collection_ref
6285 else :
6386 raise ValueError ("Invalid collection type" )
6487
6588 while True :
66- docs = list (query .limit (self .batch_size * self .max_concurrent_batches ).stream ())
89+ docs = list (
90+ query .limit (self .batch_size * self .max_concurrent_batches ).stream ()
91+ )
6792 if not docs :
6893 break
6994
@@ -77,9 +102,14 @@ def batch_delete(self):
77102
78103 self .final_flush ("delete" )
79104 duration = (self .spark .sparkContext .startTime - start_time ) / 1000
80- print (f"Deletion complete. Total docs deleted: { total_docs_deleted } . Time: { duration } seconds" )
105+ print (
106+ f"Deletion complete. "
107+ f"Total docs deleted: { total_docs_deleted } . "
108+ f"Time: { duration } seconds"
109+ )
81110
82111 def stream_from_bigquery (self , query ):
112+ """Stream data from BigQuery to Firestore."""
83113 print ("Starting BigQuery to Firestore transfer..." )
84114 start_time = self .spark .sparkContext .startTime
85115 total_rows_processed = 0
@@ -96,19 +126,30 @@ def stream_from_bigquery(self, query):
96126
97127 self .final_flush ("set" )
98128 duration = (self .spark .sparkContext .startTime - start_time ) / 1000
99- print (f"Transfer to { self .collection_name } complete. Total rows processed: { total_rows_processed } . Time: { duration } seconds" )
129+ print (
130+ f"Transfer to { self .config ['collection_name' ]} "
131+ f"complete. "
132+ f"Total rows processed: "
133+ f"{ total_rows_processed } . "
134+ f"Time: { duration } "
135+ f"seconds"
136+ )
100137
101138 def export (self ):
102- export_config = json .loads ('{"name": "technologies", "type": "dict", "environment": "dev"}' )
139+ """Export data from BigQuery to Firestore."""
140+ export_config = json .loads (
141+ '{"name": "technologies", "type": "dict", "environment": "dev"}'
142+ )
103143 query = str (json .loads ("SELECT * FROM report.tech_report_technologies" ))
104144
105- self .date = getattr (export_config , "date" , "" )
106- self .collection_name = export_config ["name" ]
107- self .collection_type = export_config ["type" ]
145+ self .config [ " date" ] = getattr (export_config , "date" , "" )
146+ self .config [ " collection_name" ] = export_config ["name" ]
147+ self .config [ " collection_type" ] = export_config ["type" ]
108148
109149 self .batch_delete ()
110150 self .stream_from_bigquery (query )
111151
152+
if __name__ == "__main__":
    # Script entry point: build the processor and run the full export.
    FirestoreBatch().export()
0 commit comments