This repository was archived by the owner on Jan 22, 2026. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 33
Expand file tree
/
Copy pathimage_processor.py
More file actions
135 lines (114 loc) · 5.79 KB
/
image_processor.py
File metadata and controls
135 lines (114 loc) · 5.79 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
import os
import random
import time
from image_editor import ImageEditor
import boto3
BW_FOLDER = "bw-images/"
BRIGHTEN_FOLDER = "brighten-images/"
def delete_file(file_path):
try:
print("Removing file from " + file_path)
os.remove(file_path)
print("Successfully removed file from " + file_path)
except Exception:
print("Failed to remove file from" + file_path)
class ImageProcessor:
def __init__(self, sqs_queue_url, s3_bucket_name):
self.sqs_client = boto3.client('sqs')
self.s3_client = boto3.client('s3')
self.sqs_queue_url = sqs_queue_url
self.s3_bucket_name = s3_bucket_name
self.bw_image_processor = self.BWImageProcessor(self.s3_client, self.sqs_queue_url, self.s3_bucket_name)
self.brighten_image_processor = self.BrightenImageProcessor(self.s3_client, self.sqs_queue_url,
self.s3_bucket_name)
def _extract_tasks(self):
try:
print("Extracting tasks from sqs queue - " + self.sqs_queue_url)
response = self.sqs_client.receive_message(QueueUrl=self.sqs_queue_url, MaxNumberOfMessages=1)
if "Messages" not in response:
print("No messages exists in SQS queue at the moment, retry later.")
return []
messages = list(map(lambda x: x["Body"], response["Messages"]))
print("Extracted tasks from sqs queue successfully")
return messages
except Exception:
print("Failed to extract task from sqs queue - " + str(self.sqs_queue_url))
raise
@staticmethod
def _get_name_from_key(key):
return key.split("/")[-1]
def _download_image(self, image_key, file_path):
try:
print("Downloading " + image_key + " to " + file_path)
self.s3_client.download_file(Bucket=self.s3_bucket_name, Key=image_key, Filename=file_path)
print("Downloaded " + image_key + " to " + file_path + " successfully")
except Exception:
print("Failed to download image " + image_key + " to " + file_path)
raise
class BWImageProcessor:
def __init__(self, s3_client, sqs_queue_url, s3_bucket_name):
self.s3_client = s3_client
self.sqs_queue_url = sqs_queue_url
self.s3_bucket_name = s3_bucket_name
def _upload_file(self, filename, bucket, key):
try:
print("Uploading file " + filename + " into " + bucket + " with key: " + key)
self.s3_client.upload_file(filename, bucket, key)
print("Uploaded file " + filename + " into " + bucket + " with key: " + key + " successfully")
except Exception:
print("Failed to upload file " + filename + " into " + bucket + " with key: " + key)
def monochrome_and_upload(self, source_image):
image_name = source_image.split(".")[-2]
target_file_path = source_image + "-monochrome.png"
try:
ImageEditor.monochrome(source_image, target_file_path)
self._upload_file(target_file_path, self.s3_bucket_name,
BW_FOLDER + image_name + "-monochrome-" + str(
int(round(time.time() * 1000))) + ".png")
except Exception:
raise
finally:
delete_file(target_file_path)
class BrightenImageProcessor:
def __init__(self, s3_client, sqs_queue_url, s3_bucket_name):
self.s3_client = s3_client
self.sqs_queue_url = sqs_queue_url
self.s3_bucket_name = s3_bucket_name
def _upload_file(self, filename, bucket, key):
try:
print("Uploading file " + filename + " into " + bucket + " with key: " + key)
self.s3_client.upload_file(filename, bucket, key)
print("Uploaded file " + filename + " into " + bucket + " with key: " + key + " successfully")
except Exception:
print("Failed to upload file " + filename + " into " + bucket + " with key: " + key)
def brighten_and_upload(self, source_image):
image_name = source_image.split(".")[-2]
target_file_path = source_image + "-bright.png"
try:
ImageEditor.brighten_image(source_image, target_file_path)
self._upload_file(target_file_path, self.s3_bucket_name,
BW_FOLDER + image_name + "-bright-" + str(int(round(time.time() * 1000))) + ".png")
except Exception:
raise
finally:
delete_file(target_file_path)
def run(self):
try:
messages = self._extract_tasks()
if len(messages) == 0:
return
for image_key in messages:
image_name = self._get_name_from_key(image_key)
print("Image name: " + image_name)
image_name_without_file_suffix = image_name.split(".")[-2]
image_file_suffix = image_name.split(".")[-1]
temp_image_path = image_name_without_file_suffix + "-" + str(random.randrange(100000))
self._download_image(image_key, temp_image_path + "." + image_file_suffix)
self.bw_image_processor.monochrome_and_upload(temp_image_path + "." + image_file_suffix)
self.brighten_image_processor.brighten_and_upload(temp_image_path + "." + image_file_suffix)
delete_file(temp_image_path + "." + image_file_suffix)
except Exception as e:
print("Failed to process message from SQS queue...")
print(e)