Skip to content

Commit ab9d7d8

Browse files
authored
fix pubsub auth valueerror bug (#35)
1 parent 6f5a6cb commit ab9d7d8

2 files changed

Lines changed: 27 additions & 8 deletions

File tree

setup.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -83,12 +83,12 @@ def read(*names, **kwargs):
8383
"rasterio~=1.2.10",
8484
"zarr~=2.10.2",
8585
"tqdm~=4.62.3",
86-
"google-api-python-client~=2.29.0",
87-
"google-cloud-storage~=1.42.3",
88-
"google-cloud-functions~=1.3.1",
89-
"google-cloud-pubsub~=2.8.0",
90-
"google-cloud-bigquery~=2.30.1",
91-
"hydra-core~=1.1.1",
86+
"google-api-python-client>=2.29.0",
87+
"google-cloud-storage>=1.42.3",
88+
"google-cloud-functions>=1.3.1",
89+
"google-cloud-pubsub>=2.8.0",
90+
"google-cloud-bigquery>=2.30.1",
91+
"hydra-core>=1.1.1",
9292
"gcsfs~=2021.11.0",
9393
"pystac[validation]~=1.1.0",
9494
"sentinelhub~=3.4.1",

src/satextractor/deployer/gcp_deployer.py

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
import concurrent
22
import json
33

4+
from google.api_core import retry
5+
from google.auth import jwt
46
from google.cloud import pubsub_v1
57
from loguru import logger
68
from satextractor.models.constellation_info import BAND_INFO
@@ -18,7 +20,18 @@ def deploy_tasks(
1820

1921
logger.info(f"Deploying {len(extraction_tasks)} tasks with job_id: {job_id}")
2022

21-
publisher = pubsub_v1.PublisherClient.from_service_account_json(credentials)
23+
credentials_json = json.load(open(credentials, "r"))
24+
25+
audience = "https://pubsub.googleapis.com/google.pubsub.v1.Publisher"
26+
credentials_ob = jwt.Credentials.from_service_account_info(
27+
credentials_json,
28+
audience=audience,
29+
)
30+
31+
publisher = pubsub_v1.PublisherClient(credentials=credentials_ob)
32+
33+
short_retry = retry.Retry(deadline=60)
34+
2235
publish_futures = []
2336
for i, task in tqdm(enumerate(extraction_tasks)):
2437
extraction_task_data = task.serialize()
@@ -31,9 +44,15 @@ def deploy_tasks(
3144
)
3245
data = json.dumps(data, default=str)
3346

34-
publish_future = publisher.publish(topic, data.encode("utf-8"))
47+
publish_future = publisher.publish(
48+
topic,
49+
data.encode("utf-8"),
50+
retry=short_retry,
51+
)
3552
publish_futures.append(publish_future)
3653

54+
logger.info(f"Generated {len(publish_futures)} futures.")
55+
3756
# Wait for all the publish futures to resolve before exiting.
3857
concurrent.futures.wait(
3958
publish_futures,

0 commit comments

Comments
 (0)