-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsqs.py
130 lines (108 loc) · 4.25 KB
/
sqs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
from boto.sqs.connection import SQSConnection
from boto.sqs import regions
def make_connection(region_name, aws_access_key_id, aws_secret_access_key):
'''Make an SQSconnection to an AWS account. Pass in region, AWS access
key id, and AWS secret access key'''
for reg in regions():
if reg.name == region_name:
region = reg
break
return SQSConnection(aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
region=region)
def get_queue(conn, queue_name):
'''Create a queue with the given name, or get an existing queue with that
name from the AWS connection.'''
return conn.get_queue(queue_name)
def get_message(queue, num_messages=1, visibility_timeout=300,
wait_time_seconds=20):
'''Get a message from the given queue. Default visibility timeout is
5 minutes, message wait time is 20 seconds, number of messages is 1.'''
return queue.get_messages(visibility_timeout=visibility_timeout,
wait_time_seconds=wait_time_seconds,
message_attributes=['All'])
def get_attributes(message):
'''Return a dictionary of the message attributes.'''
return {key: value['string_value']
for key, value in message[0].message_attributes.iteritems()}
def delete_message_from_handle(conn, queue, message):
'''Delete a message from the given queue.'''
return conn.delete_message_from_handle(queue, message.receipt_handle)
def queue_size(queue):
'''Get the approximate number of messages in the given queue.'''
return queue.count()
def build_job_message(**kwargs):
'''Build a meesage to add to the jobs queue.'''
job_message = {'body': 'job'}
job_message['attributes'] = {
'job_id': {
'data_type': 'Number',
'string_value': kwargs['job_id']
},
'email': {
'data_type': 'String',
'string_value': kwargs['email']
},
'scene_id': {
'data_type': 'String',
'string_value': kwargs['scene_id']
},
'band_1': {
'data_type': 'Number',
'string_value': kwargs['band_1']
},
'band_2': {
'data_type': 'Number',
'string_value': kwargs['band_2']
},
'band_3': {
'data_type': 'Number',
'string_value': kwargs['band_3']
}
}
return job_message
def build_result_message(**kwargs):
'''Build a message to add to the results queue.'''
result_message = {'body': 'result'}
result_message['attributes'] = {
'job_id': {
'data_type': 'Number',
'string_value': kwargs['job_id']
},
'email': {
'data_type': 'String',
'string_value': kwargs['email']
},
'link': {
'data_type': 'String',
'string_value': kwargs['link']
},
'scene_id': {
'data_type': 'String',
'string_value': kwargs['scene_id']
}
}
return result_message
def send_message(conn, queue, message_content, message_attributes=None):
'''Write a message to the given queue.'''
return conn.send_message(queue=queue,
message_content=message_content,
message_attributes=message_attributes)
if __name__ == '__main__':
import os
AWS_ACCESS_KEY_ID = os.environ['AWS_ACCESS_KEY_ID']
AWS_SECRET_ACCESS_KEY = os.environ['AWS_SECRET_ACCESS_KEY']
LANDSAT_JOBS_QUEUE = 'landsat_jobs_queue'
REGION = 'us-west-2'
# import pdb; pdb.set_trace()
conn = make_connection(REGION, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)
queue = get_queue(conn, LANDSAT_JOBS_QUEUE)
message = build_job_message(job_id=1, email='[email protected]', scene_id='LC80470272015005LGN00',
band_1=4, band_2=3, band_3=2)
send_message(conn, queue, message['body'], message['attributes'])
print(queue_size(queue))
message = get_message(queue)
attrs = get_attributes(message)
print(attrs)
print(delete_message_from_handle(conn, queue, message[0]))
print(queue_size(queue))