This repository was archived by the owner on Jan 22, 2026. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 33
Expand file tree
/
Copy pathtask_publisher.py
More file actions
63 lines (53 loc) · 2.45 KB
/
task_publisher.py
File metadata and controls
63 lines (53 loc) · 2.45 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
import os
import boto3
import random
SAMPLE_IMAGES_FOLDER = "input-images/"
EXAMPLE_IMAGE_LOCAL_PATH = os.path.join(
os.path.dirname(os.path.abspath(__file__)), "..", "resources", "example-image.png"
)
class TaskPublisher:
def __init__(self, sqs_queue_url, s3_bucket_name):
self.s3_client = boto3.client('s3')
self.sqs_client = boto3.client('sqs')
self.sqs_queue_url = sqs_queue_url
self.s3_bucket_name = s3_bucket_name
def _list_image_on_s3(self):
try:
print("Listing image in " + self.s3_bucket_name + " under " + SAMPLE_IMAGES_FOLDER)
response = self.s3_client.list_objects_v2(Bucket=self.s3_bucket_name, Prefix=SAMPLE_IMAGES_FOLDER)
objects_in_s3 = list(map(lambda x: x["Key"], response["Contents"]))
print("Listed image in " + self.s3_bucket_name + " under " + SAMPLE_IMAGES_FOLDER + " successfully.")
return list(filter(lambda x: x != SAMPLE_IMAGES_FOLDER, objects_in_s3))
except Exception:
print("Failed to list images in " + self.s3_bucket_name + " under " + SAMPLE_IMAGES_FOLDER)
return []
def _upload_images_onto_s3(self):
try:
print("Uploading example image onto S3")
self.s3_client.upload_file(Filename=EXAMPLE_IMAGE_LOCAL_PATH, Bucket=self.s3_bucket_name,
Key=SAMPLE_IMAGES_FOLDER + "example-image.png")
print("Successfully uploaded example image onto S3")
except Exception:
print("Failed to upload example image onto S3")
raise
def _send_sqs_message(self, message):
try:
self.sqs_client.send_message(
QueueUrl=self.sqs_queue_url,
MessageBody=message
)
print("Sent task to SQS.")
except Exception:
print("Failed to send message onto sqs queue")
def publish_image_transform_task(self, num_of_tasks=10):
images = self._list_image_on_s3()
if len(images) == 0:
print("No images in bucket. Uploading example image...")
self._upload_images_onto_s3()
return
print("Start publishing task onto sqs...")
for i in range(num_of_tasks):
lucky_number = random.randint(0, len(images)-1)
self._send_sqs_message(str(images[lucky_number]))