Skip to content

Commit d0cc3e5

Browse files
committed
add auto deadletter queue, dashboard
1 parent 9815cf8 commit d0cc3e5

1 file changed

Lines changed: 181 additions & 16 deletions

File tree

run.py

Lines changed: 181 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
from __future__ import print_function
21
import os, sys
32
import boto3
43
import datetime
@@ -9,7 +8,15 @@
98
from email.mime.multipart import MIMEMultipart
109
from email.mime.text import MIMEText
1110

11+
# Feature flags: create a CloudWatch dashboard for the run, and (optionally)
# delete it again during the monitor's cleanup phase.
CREATE_DASHBOARD = False
CLEAN_DASHBOARD = False

from config import *

# Back compatibility with old config requirements: older configs supplied the
# dead-letter queue as a full ARN (arn:aws:sqs:region:account:name); keep only
# the bare queue name after the last ':'.
if ':' in SQS_DEAD_LETTER_QUEUE:
    SQS_DEAD_LETTER_QUEUE = SQS_DEAD_LETTER_QUEUE.rsplit(':',1)[1]

# Seconds to wait after creating AWS resources before using them.
WAIT_TIME = 60
# Seconds between polling passes in the monitor loop.
MONITOR_TIME = 60
1522

@@ -46,15 +53,6 @@
4653
]
4754
}
4855

49-
SQS_DEFINITION = {
50-
"DelaySeconds": "0",
51-
"MaximumMessageSize": "262144",
52-
"MessageRetentionPeriod": "1209600",
53-
"ReceiveMessageWaitTimeSeconds": "0",
54-
"RedrivePolicy": "{\"deadLetterTargetArn\":\"" + SQS_DEAD_LETTER_QUEUE + "\",\"maxReceiveCount\":\"10\"}",
55-
"VisibilityTimeout": str(SQS_MESSAGE_VISIBILITY)
56-
}
57-
5856

5957
#################################
6058
# AUXILIARY FUNCTIONS
@@ -147,18 +145,39 @@ def create_or_update_ecs_service(ecs, ECS_SERVICE_NAME, ECS_TASK_NAME):
147145
ecs.create_service(cluster=ECS_CLUSTER, serviceName=ECS_SERVICE_NAME, taskDefinition=ECS_TASK_NAME, desiredCount=0)
148146
print('Service created')
149147

150-
def get_queue_url(sqs, queue_name):
    """Return the URL of the SQS queue named exactly *queue_name*, or None.

    :param sqs: boto3 SQS client (anything exposing ``list_queues``).
    :param queue_name: exact queue name to look up.
    :return: the queue URL string, or None if no such queue exists.
    """
    # Filter server-side: an unfiltered list_queues() returns at most 1,000
    # queues, so in accounts with many queues the target could be missed.
    result = sqs.list_queues(QueueNamePrefix=queue_name)
    for url in result.get('QueueUrls', []):
        # Prefix match is not enough ("job" also matches "jobDeadLetter");
        # require the URL's last path component to equal the name exactly.
        if url.split('/')[-1] == queue_name:
            return url
    return None
157156

158157
def get_or_create_queue(sqs):
159-
u = get_queue_url(sqs)
160-
if u is None:
158+
queue_url = get_queue_url(sqs, SQS_QUEUE_NAME)
159+
dead_url = get_queue_url(sqs, SQS_DEAD_LETTER_QUEUE)
160+
if dead_url is None:
161+
print("Creating DeadLetter queue")
162+
sqs.create_queue(QueueName=SQS_DEAD_LETTER_QUEUE)
163+
time.sleep(WAIT_TIME)
164+
dead_url = get_queue_url(sqs, SQS_DEAD_LETTER_QUEUE)
165+
else:
166+
print (f'DeadLetter queue {SQS_DEAD_LETTER_QUEUE} already exists.')
167+
if queue_url is None:
161168
print('Creating queue')
169+
response = sqs.get_queue_attributes(QueueUrl=dead_url, AttributeNames=["QueueArn"])
170+
dead_arn = response["Attributes"]["QueueArn"]
171+
SQS_DEFINITION = {
172+
"DelaySeconds": "0",
173+
"MaximumMessageSize": "262144",
174+
"MessageRetentionPeriod": "1209600",
175+
"ReceiveMessageWaitTimeSeconds": "0",
176+
"RedrivePolicy": '{"deadLetterTargetArn":"'
177+
+ dead_arn
178+
+ '","maxReceiveCount":"10"}',
179+
"VisibilityTimeout": str(SQS_MESSAGE_VISIBILITY),
180+
}
162181
sqs.create_queue(QueueName=SQS_QUEUE_NAME, Attributes=SQS_DEFINITION)
163182
time.sleep(WAIT_TIME)
164183
else:
@@ -273,6 +292,144 @@ def export_logs(logs, loggroupId, starttime, bucketId):
273292
break
274293
time.sleep(30)
275294

295+
def create_dashboard(requestInfo):
    """Create (or replace) the CloudWatch dashboard named APP_NAME for this run.

    Widgets: SQS message throughput and backlog for the f'{APP_NAME}Queue'
    queue, ECS cluster memory utilization, spot fleet fulfilled vs. target
    capacity, and three Logs Insights queries (distinct 'start run' lines,
    all 'start run' lines, and 'Error' lines) against the APP_NAME log group.

    :param requestInfo: dict carrying 'SpotFleetRequestId' for the fleet
        whose capacity metrics are charted.
    """
    cloudwatch = boto3.client("cloudwatch")
    # Change 'start run' to whatever your run command is
    DashboardMessage = {
        "widgets": [
            {
                "height": 6,
                "width": 6,
                "y": 0,
                "x": 18,
                "type": "metric",
                "properties": {
                    "metrics": [
                        [ "AWS/SQS", "NumberOfMessagesReceived", "QueueName", f'{APP_NAME}Queue' ],
                        [ ".", "NumberOfMessagesDeleted", ".", "." ],
                    ],
                    "view": "timeSeries",
                    "stacked": False,
                    "region": AWS_REGION,
                    "period": 300,
                    "stat": "Average"
                }
            },
            {
                "height": 6,
                "width": 6,
                "y": 0,
                "x": 6,
                "type": "metric",
                "properties": {
                    "view": "timeSeries",
                    "stacked": False,
                    "metrics": [
                        [ "AWS/ECS", "MemoryUtilization", "ClusterName", ECS_CLUSTER ]
                    ],
                    "region": AWS_REGION,
                    "period": 300,
                    "yAxis": {
                        "left": {
                            "min": 0
                        }
                    }
                }
            },
            {
                "height": 6,
                "width": 6,
                "y": 0,
                "x": 12,
                "type": "metric",
                "properties": {
                    "metrics": [
                        [ "AWS/SQS", "ApproximateNumberOfMessagesVisible", "QueueName", f"{APP_NAME}Queue" ],
                        [ ".", "ApproximateNumberOfMessagesNotVisible", ".", "."],
                    ],
                    "view": "timeSeries",
                    "stacked": True,
                    "region": AWS_REGION,
                    "period": 300,
                    "stat": "Average"
                }
            },
            {
                "height": 6,
                "width": 12,
                "y": 6,
                "x": 12,
                "type": "log",
                "properties": {
                    "query": f"SOURCE {APP_NAME} | fields @message| filter @message like 'start run'| stats count_distinct(@message)",
                    "region": AWS_REGION,
                    "stacked": False,
                    "title": "Distinct Logs",
                    "view": "table"
                }
            },
            {
                "height": 6,
                "width": 12,
                "y": 6,
                "x": 0,
                "type": "log",
                "properties": {
                    "query": f"SOURCE {APP_NAME} | fields @message| filter @message like 'start run'| stats count(@message)",
                    "region": AWS_REGION,
                    "stacked": False,
                    "title": "All Logs",
                    "view": "table"
                }
            },
            {
                "height": 6,
                "width": 24,
                "y": 12,
                "x": 0,
                "type": "log",
                "properties": {
                    "query": f"SOURCE {APP_NAME} | fields @message | filter @message like \"Error\"\n\n | display @message",
                    "region": AWS_REGION,
                    "stacked": False,
                    "title": "Errors",
                    "view": "table"
                }
            },
            {
                "height": 6,
                "width": 6,
                "y": 0,
                "x": 0,
                "type": "metric",
                "properties": {
                    "metrics": [
                        [ "AWS/EC2Spot", "FulfilledCapacity", "FleetRequestId", requestInfo["SpotFleetRequestId"]],
                        [ ".", "TargetCapacity", ".", "."],
                    ],
                    "view": "timeSeries",
                    "stacked": False,
                    "region": AWS_REGION,
                    "period": 300,
                    "stat": "Average"
                }
            }
        ]
    }
    DashboardMessage_json = json.dumps(DashboardMessage, indent=4)
    response = cloudwatch.put_dashboard(DashboardName=APP_NAME, DashboardBody=DashboardMessage_json)
    # put_dashboard reports problems as a list of validation messages rather
    # than an exception; use .get() so an absent key cannot raise KeyError.
    if response.get('DashboardValidationMessages'):
        print('Likely error in Dashboard creation')
        print(response['DashboardValidationMessages'])
425+
426+
def clean_dashboard(monitorapp):
    """Delete every CloudWatch dashboard whose name contains *monitorapp*.

    :param monitorapp: substring identifying this run's dashboards
        (create_dashboard names them after APP_NAME).
    """
    cloudwatch = boto3.client("cloudwatch")
    # list_dashboards is paginated: a single call returns only the first page,
    # so walk every page or dashboards beyond it are silently left behind.
    paginator = cloudwatch.get_paginator("list_dashboards")
    for page in paginator.paginate():
        doomed = [entry["DashboardName"]
                  for entry in page["DashboardEntries"]
                  if monitorapp in entry["DashboardName"]]
        if doomed:
            # delete_dashboards accepts a batch of names (up to 100 per call).
            cloudwatch.delete_dashboards(DashboardNames=doomed)
432+
276433
#################################
277434
# CLASS TO HANDLE SQS QUEUE
278435
#################################
@@ -444,6 +601,10 @@ def startCluster():
444601

445602
print('Spot fleet successfully created. Your job should start in a few minutes.')
446603

604+
if CREATE_DASHBOARD:
605+
print ("Creating CloudWatch dashboard for run metrics")
606+
create_dashboard(requestInfo)
607+
447608
#################################
448609
# SERVICE 4: MONITOR JOB
449610
#################################
@@ -547,6 +708,10 @@ def monitor(cheapest=False):
547708
deregistertask(ECS_TASK_NAME,ecs)
548709
print("Removing cluster if it's not the default and not otherwise in use")
549710
removeClusterIfUnused(monitorcluster, ecs)
711+
712+
# Remove Cloudwatch dashboard if created and cleanup desired
713+
if CREATE_DASHBOARD and CLEAN_DASHBOARD:
714+
clean_dashboard(monitorapp)
550715

551716
#Step 6: Export the logs to S3
552717
logs=boto3.client('logs')

0 commit comments

Comments
 (0)