-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlambda_function.py
63 lines (53 loc) · 1.83 KB
/
lambda_function.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
import boto3
import json
import os
import sharedutils
import time
def lambda_handler(event, context):
# Environment variables
database_name = os.getenv('DatabaseName')
table_name = os.getenv('TableName')
fixity_output_bucket_name = os.getenv('FixityOutputBucket')
query_result_bucket_name = os.getenv('ResultBucket')
dayPeriod = int(os.getenv('DayPeriod'))
s3_output = f's3://{query_result_bucket_name}/results/'
startDate = sharedutils.getDateFromDay(dayPeriod)
endDate = sharedutils.getDateFromDay(0)
# Remove the limit after fanout function is inplace.
query = """
SELECT bucket, key
FROM %s
WHERE key NOT IN
(SELECT key
FROM %s
WHERE timestamp
BETWEEN CAST('%s' AS DATE)
AND CAST('%s' AS DATE) )
""" % (table_name, table_name, startDate, endDate)
queryResponse = sharedutils.execute_query(
query=query,
database=database_name,
s3_output=s3_output)
results = sharedutils.get_query_result(queryResponse["QueryExecutionId"])
outputBucket = fixity_output_bucket_name
taskResponse = ""
sqs = boto3.client('sqs')
fixityQueueUrl = os.getenv('FixityQueueURL')
if len(results) == 0:
taskResponse = "Query Athena is failed"
else:
for x in range(1, len(results["Rows"])):
task_json = sharedutils.create_steps_task_json(
results["Rows"][x]["Data"],
outputBucket)
print("Task:" + task_json)
msg = task_json
sqsresponse = sqs.send_message(QueueUrl=fixityQueueUrl, MessageBody=msg)
print(sqsresponse.get('MessageId'))
print(sqsresponse.get('MD5OfMessageBody'))
return {
"statusCode": 200,
"body": json.dumps({
"message": taskResponse,
}),
}