From 1a8df623220914d12d723787bf1407a57de6c6c4 Mon Sep 17 00:00:00 2001
From: Matthew Rocklin
Date: Fri, 1 Mar 2024 15:54:33 -0600
Subject: [PATCH] Avoid retries in multiple writes

---
 preprocess.py | 63 ++++++++++++++++++++++++++++++---------------------
 1 file changed, 37 insertions(+), 26 deletions(-)

diff --git a/preprocess.py b/preprocess.py
index 5dacb73..5dd36ae 100644
--- a/preprocess.py
+++ b/preprocess.py
@@ -4,6 +4,7 @@
 import logging
 import os
 from collections import defaultdict
+import operator
 
 import coiled
 import deltalake
@@ -15,18 +16,28 @@
 logger = logging.getLogger(__name__)
 
-LOCAL = True
+LOCAL = False
 if LOCAL:
     OUTDIR = Path(f"./data")
     STORAGE_OPTIONS = {}
     Cluster = LocalCluster
 else:
+    import boto3
     OUTDIR = Path(f"s3://openscapes-scratch/etl-github")
-    STORAGE_OPTIONS = {"AWS_REGION": "us-west-2", "AWS_S3_ALLOW_UNSAFE_RENAME": "true"}
+
+    session = boto3.session.Session()
+    credentials = session.get_credentials()
+
+    STORAGE_OPTIONS = dict(
+        AWS_SECRET_ACCESS_KEY=credentials.secret_key,
+        AWS_ACCESS_KEY_ID=credentials.access_key,
+        AWS_SESSION_TOKEN=credentials.token,
+        AWS_REGION="us-west-2",
+        AWS_S3_ALLOW_UNSAFE_RENAME="true",
+    )
     Cluster = functools.partial(
         coiled.Cluster,
-        name="etl-github",
-        n_workers=50,
+        name="etl-github-matt",
        region="us-west-2",
         shutdown_on_close=False,
     )
@@ -143,7 +154,7 @@ def process_file(filename: str) -> dict[str, pd.DataFrame]:
         out["Commits"] = list(toolz.concat(out["PushEvent"]))
         del out["PushEvent"]
 
-    out = toolz.valmap(pd.DataFrame, out)
+    out = {k: pd.DataFrame(v) for k, v in out.items()}
     out = {
         "comment": out["IssueCommentEvent"],
         "pr": out["PullRequestEvent"],
@@ -157,17 +168,16 @@
     return out
 
 
-def write_delta(tables: dict[str, pd.DataFrame]):
-    for table, df in tables.items():
-        outfile = OUTDIR / table
-        outfile.fs.makedirs(outfile.parent, exist_ok=True)
-        deltalake.write_deltalake(
-            outfile,
-            df,
-            mode="append",
-            storage_options=STORAGE_OPTIONS,
-            partition_by="date",
-        )
+def write_delta(df: pd.DataFrame, name: str):
+    outfile = OUTDIR / name
+    outfile.fs.makedirs(outfile.parent, exist_ok=True)
+    deltalake.write_deltalake(
+        outfile,
+        df,
+        mode="append",
+        storage_options=STORAGE_OPTIONS,
+        partition_by="date",
+    )
 
 def list_files(start, stop):
     dates = pd.date_range(start, stop - datetime.timedelta(days=1), freq="d")
@@ -188,21 +198,22 @@ def compact(table):
 
 if __name__ == "__main__":
     filenames = list_files(
-        start=datetime.date(2024, 1, 1),
-        stop=datetime.date(2024, 1, 5),
+        start=datetime.date(2023, 1, 1),
+        stop=datetime.date(2023, 1, 2),
     )
+    tables = ["comment", "pr", "commit", "create", "watch", "fork"]
 
     with Cluster() as cluster:
-        if not LOCAL:
-            cluster.send_private_envs({
-                "AWS_ACCESS_KEY_ID": os.environ["AWS_ACCESS_KEY_ID"],
-                "AWS_SECRET_ACCESS_KEY": os.environ["AWS_SECRET_ACCESS_KEY"],
-            })
         with cluster.get_client() as client:
             client.restart()
             futures = client.map(process_file, filenames)
-            futures = client.map(write_delta, futures, retries=10)
-            wait(futures)
-            tables = ["comment", "pr", "commit", "create", "watch", "fork"]
+            all_writes = []
+            for name in tables:
+                writes = client.map(operator.itemgetter(name), futures)
+                writes = client.map(write_delta, writes, name=name)
+                all_writes.extend(writes)
+            del futures
+            wait(all_writes)
+
             futures = client.map(compact, tables)
             wait(futures)
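
Review note: the heart of this patch is the write fan-out. Before, a single
write_delta task wrote all six tables for a file, with retries=10 to paper
over flaky writes; now each (file, table) pair is its own task, so a failed
write reruns alone. Below is a minimal runnable sketch of that pattern on a
local dask.distributed cluster; the toy process and write functions are
illustrative stand-ins for process_file and write_delta, not code from this
repository:

    import operator

    from dask.distributed import Client, wait

    def process(x):
        # Stand-in for process_file: one dict per input, one entry per table.
        return {"comment": x, "pr": x * 10}

    def write(part, name):
        # Stand-in for write_delta: persists a single table's piece.
        print(f"table={name} part={part}")

    if __name__ == "__main__":
        with Client() as client:  # local cluster for the sketch
            futures = client.map(process, range(4))

            all_writes = []
            for name in ["comment", "pr"]:
                # Select this table's piece from each per-file result, then
                # write those pieces. Each write is an independent task, so
                # a failure or retry touches one (file, table) pair only.
                parts = client.map(operator.itemgetter(name), futures)
                all_writes.extend(client.map(write, parts, name=name))

            wait(all_writes)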