11"""This module processes Firestore documents from BigQuery using Spark."""
22
33import json
4+
45from google .cloud import bigquery , firestore # type: ignore
56from pyspark .sql import SparkSession # type: ignore
67
@@ -19,11 +20,7 @@ def __init__(self):
1920 self .spark = SparkSession .builder .appName (
2021 "FirestoreBatchProcessor"
2122 ).getOrCreate ()
22- self .config = {
23- "date" : "" ,
24- "collection_name" : "" ,
25- "collection_type" : ""
26- }
23+ self .config = {"date" : "" , "collection_name" : "" , "collection_type" : "" }
2724
2825 def queue_batch (self , operation ):
2926 """Queue a batch commit operation for Firestore."""
@@ -33,7 +30,9 @@ def queue_batch(self, operation):
3330 if operation == "delete" :
3431 batch .delete (doc .reference )
3532 elif operation == "set" :
36- doc_ref = self .firestore .collection (self .config ["collection_name" ]).document ()
33+ doc_ref = self .firestore .collection (
34+ self .config ["collection_name" ]
35+ ).document ()
3736 batch .set (doc_ref , doc )
3837 else :
3938 raise ValueError ("Invalid operation" )
@@ -78,9 +77,7 @@ def batch_delete(self):
7877 )
7978 query = collection_ref .where ("date" , "==" , self .config ["date" ])
8079 elif self .config ["collection_type" ] == "dict" :
81- print (
82- f"Deleting documents from { self .config ['collection_name' ]} "
83- )
80+ print (f"Deleting documents from { self .config ['collection_name' ]} " )
8481 query = collection_ref
8582 else :
8683 raise ValueError ("Invalid collection type" )
0 commit comments