Commit bd1cf17

lint
1 parent a48ca80 commit bd1cf17

6 files changed: 130 additions & 13 deletions

infra/bigquery_export_spark/Dockerfile

Lines changed: 5 additions & 3 deletions
@@ -7,7 +7,9 @@ FROM debian:12-slim
 ENV DEBIAN_FRONTEND=noninteractive
 
 # Install utilities required by Spark scripts.
-RUN apt-get update && apt-get install -y procps tini libjemalloc2
+RUN apt-get update && apt-get install -y procps=\* tini=\* libjemalloc2=\* \
+    && apt-get clean \
+    && rm -rf /var/lib/apt/lists/*
 
 # Enable jemalloc2 as default memory allocator
 ENV LD_PRELOAD=/usr/lib/x86_64-linux-gnu/libjemalloc.so.2
@@ -23,10 +25,10 @@ RUN bash Miniforge3-Linux-x86_64.sh -b -p /opt/miniforge3 \
     && ${CONDA_HOME}/bin/conda config --system --set channel_priority strict
 
 WORKDIR /app
-COPY . .
+COPY requirements.txt .
 
 # Install pip packages.
-RUN ${PYSPARK_PYTHON} -m pip install --no-cache-dir -r app/requirements.txt
+RUN ${PYSPARK_PYTHON} -m pip install --no-cache-dir -r requirements.txt
 
 # Create the 'spark' group/user.
 # The GID and UID must be 1099. Home directory is required.
Lines changed: 1 addition & 0 deletions
@@ -1,3 +1,4 @@
 google-cloud-bigquery==3.23
 google-cloud-storage==2.16
 google-cloud-firestore==2.20.1
+# pyspark==3.5.5
Lines changed: 114 additions & 0 deletions
@@ -0,0 +1,114 @@
+from pyspark.sql import SparkSession
+from google.cloud import firestore
+from google.cloud import bigquery
+import json
+import time
+
+class FirestoreBatch:
+    def __init__(self):
+        self.firestore = firestore.Client()
+        self.bigquery = bigquery.Client()
+        self.batch_size = 500  # Firestore allows at most 500 writes per batch.
+        self.max_concurrent_batches = 200
+        self.current_batch = []
+        self.pending_batches = []
+        self.spark = SparkSession.builder.appName("FirestoreBatchProcessor").getOrCreate()
+
+    def queue_batch(self, operation):
+        batch = self.firestore.batch()
+
+        for doc in self.current_batch:
+            if operation == "delete":
+                batch.delete(doc.reference)
+            elif operation == "set":
+                doc_ref = self.firestore.collection(self.collection_name).document()
+                batch.set(doc_ref, doc)
+            else:
+                raise ValueError("Invalid operation")
+
+        self.pending_batches.append(batch)  # Commit is deferred to commit_batches().
+        self.current_batch = []
+
+    def commit_batches(self):
+        print(f"Committing {len(self.pending_batches)} batches to {self.collection_name}")
+        for batch in self.pending_batches:
+            try:
+                batch.commit()  # The Python client commits synchronously.
+            except Exception as e:
+                print(f"Error committing batch: {e}")
+                raise
+        self.pending_batches = []
+
+    def final_flush(self, operation):
+        if self.current_batch:
+            self.queue_batch(operation)
+        if self.pending_batches:
+            self.commit_batches()
+
+    def batch_delete(self):
+        print("Starting batch deletion...")
+        start_time = time.time()
+        self.current_batch = []
+        self.pending_batches = []
+        total_docs_deleted = 0
+
+        collection_ref = self.firestore.collection(self.collection_name)
+        if self.collection_type == "report":
+            print(f"Deleting documents from {self.collection_name} for date {self.date}")
+            query = collection_ref.where("date", "==", self.date)
+        elif self.collection_type == "dict":
+            print(f"Deleting documents from {self.collection_name}")
+            query = collection_ref
+        else:
+            raise ValueError("Invalid collection type")
+
+        while True:
+            docs = list(query.limit(self.batch_size * self.max_concurrent_batches).stream())
+            if not docs:  # Deleted docs drop out of the query, so re-running it pages through.
+                break
+
+            for doc in docs:
+                self.current_batch.append(doc)
+                if len(self.current_batch) >= self.batch_size:
+                    self.queue_batch("delete")
+                if len(self.pending_batches) >= self.max_concurrent_batches:
+                    self.commit_batches()
+                total_docs_deleted += 1
+
+        self.final_flush("delete")
+        duration = time.time() - start_time
+        print(f"Deletion complete. Total docs deleted: {total_docs_deleted}. Time: {duration} seconds")
+
+    def stream_from_bigquery(self, query):
+        print("Starting BigQuery to Firestore transfer...")
+        start_time = time.time()
+        total_rows_processed = 0
+
+        df = self.spark.read.format("bigquery").option("query", query).load()
+
+        for row in df.collect():
+            self.current_batch.append(row.asDict())
+            if len(self.current_batch) >= self.batch_size:
+                self.queue_batch("set")
+            if len(self.pending_batches) >= self.max_concurrent_batches:
+                self.commit_batches()
+            total_rows_processed += 1
+
+        self.final_flush("set")
+        duration = time.time() - start_time
+        print(f"Transfer to {self.collection_name} complete. Total rows processed: {total_rows_processed}. Time: {duration} seconds")
+
+    def export(self):
+        export_config = json.loads('{"name": "technologies", "type": "dict", "environment": "dev"}')
+        query = "SELECT * FROM report.tech_report_technologies"
+
+        self.date = export_config.get("date", "")
+        self.collection_name = export_config["name"]
+        self.collection_type = export_config["type"]
+
+        self.batch_delete()
+        self.stream_from_bigquery(query)
+
+if __name__ == "__main__":
+    processor = FirestoreBatch()
+    processor.export()
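
A note on stream_from_bigquery above: when SQL is passed to the Spark BigQuery connector via the query option, the connector materializes the result into a temporary table first, which generally requires viewsEnabled to be true and a materialization dataset to be configured. A minimal sketch of that configuration, assuming spark is the active SparkSession and a pre-created scratch dataset named spark_tmp (the dataset name is hypothetical):

# Sketch only, not part of the commit. "spark_tmp" is a hypothetical
# scratch dataset that must already exist in the project.
spark.conf.set("viewsEnabled", "true")
spark.conf.set("materializationDataset", "spark_tmp")

df = (
    spark.read.format("bigquery")
    .option("query", "SELECT * FROM report.tech_report_technologies")
    .load()
)

Note also that df.collect() pulls the full result set to the driver, so this path assumes report-sized outputs; for larger tables, df.foreachPartition would keep the Firestore writes distributed across executors.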

infra/tf/dataform_export/main.tf

Lines changed: 4 additions & 4 deletions
@@ -49,11 +49,11 @@ resource "google_cloudfunctions2_function" "dataform_export" {
 }
 
 resource "google_bigquery_routine" "run_export_job" {
-  dataset_id   = "reports"
-  routine_id   = "run_export_job"
-  routine_type = "SCALAR_FUNCTION"
+  dataset_id      = "reports"
+  routine_id      = "run_export_job"
+  routine_type    = "SCALAR_FUNCTION"
   definition_body = ""
-  description  = <<EOT
+  description     = <<EOT
 Export data from Google BigQuery.
 Example payload JSON: {"dataform_trigger": "tech_report_complete", "date": "2025-01-01", "name": "adoption", "type": "report"}
 EOT
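
Since definition_body is empty, this routine only does something once it is wired up as a remote function backed by the Cloud Function above. For illustration, a hedged sketch of calling it from Python with google-cloud-bigquery, assuming the function is deployed; the payload mirrors the example in the description, and the project ID "httparchive" is inferred from the service account in main.tf:

# Illustrative caller only, not part of the commit.
from google.cloud import bigquery

client = bigquery.Client(project="httparchive")  # project ID assumed
payload = '{"dataform_trigger": "tech_report_complete", "date": "2025-01-01", "name": "adoption", "type": "report"}'
job = client.query(
    "SELECT reports.run_export_job(@payload)",
    job_config=bigquery.QueryJobConfig(
        query_parameters=[bigquery.ScalarQueryParameter("payload", "STRING", payload)]
    ),
)
print(list(job.result()))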

infra/tf/main.tf

Lines changed: 5 additions & 5 deletions
@@ -27,11 +27,11 @@ provider "google" {
 module "dataform_export" {
   source = "./dataform_export"
 
-  project_number    = local.project_number
-  region            = local.region
-  function_identity = "cloud-function@httparchive.iam.gserviceaccount.com"
-  function_name     = "dataform-export"
-  remote_functions_connection = google_bigquery_connection.remote-functions.id
+  project_number              = local.project_number
+  region                      = local.region
+  function_identity           = "cloud-function@httparchive.iam.gserviceaccount.com"
+  function_name               = "dataform-export"
+  remote_functions_connection = google_bigquery_connection.remote-functions.id
 }
 
 module "dataform_trigger" {

package.json

Lines changed: 1 addition & 1 deletion
@@ -5,7 +5,7 @@
     "@dataform/core": "3.0.14"
   },
   "scripts": {
-    "format": "npx standard --fix; npx markdownlint --ignore-path .gitignore --config package.json --configPointer /markdownlint . --fix; terraform -chdir=infra/tf fmt",
+    "format": "npx standard --fix; npx markdownlint --ignore-path .gitignore --config package.json --configPointer /markdownlint . --fix; terraform -chdir=infra/tf fmt -recursive",
     "lint": "npx standard; npx markdownlint --ignore-path .gitignore --config package.json --configPointer /markdownlint .; dataform compile"
   },
   "standard": {
