comparison venv/lib/python2.7/site-packages/boto/sqs/queue.py @ 0:d67268158946 draft

planemo upload commit a3f181f5f126803c654b3a66dd4e83a48f7e203b
author bcclaywell
date Mon, 12 Oct 2015 17:43:33 -0400
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:d67268158946
1 # Copyright (c) 2006-2009 Mitch Garnaat http://garnaat.org/
2 #
3 # Permission is hereby granted, free of charge, to any person obtaining a
4 # copy of this software and associated documentation files (the
5 # "Software"), to deal in the Software without restriction, including
6 # without limitation the rights to use, copy, modify, merge, publish, dis-
7 # tribute, sublicense, and/or sell copies of the Software, and to permit
8 # persons to whom the Software is furnished to do so, subject to the fol-
9 # lowing conditions:
10 #
11 # The above copyright notice and this permission notice shall be included
12 # in all copies or substantial portions of the Software.
13 #
14 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
15 # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
16 # ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
17 # SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
18 # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
20 # IN THE SOFTWARE.
21
22 """
23 Represents an SQS Queue
24 """
25 from boto.compat import urllib
26 from boto.sqs.message import Message
27
28
29 class Queue(object):
30
31 def __init__(self, connection=None, url=None, message_class=Message):
32 self.connection = connection
33 self.url = url
34 self.message_class = message_class
35 self.visibility_timeout = None
36
37 def __repr__(self):
38 return 'Queue(%s)' % self.url
39
40 def _id(self):
41 if self.url:
42 val = urllib.parse.urlparse(self.url)[2]
43 else:
44 val = self.url
45 return val
46 id = property(_id)
47
48 def _name(self):
49 if self.url:
50 val = urllib.parse.urlparse(self.url)[2].split('/')[2]
51 else:
52 val = self.url
53 return val
54 name = property(_name)
55
56 def _arn(self):
57 parts = self.id.split('/')
58 return 'arn:aws:sqs:%s:%s:%s' % (
59 self.connection.region.name, parts[1], parts[2])
60 arn = property(_arn)
61
62 def startElement(self, name, attrs, connection):
63 return None
64
65 def endElement(self, name, value, connection):
66 if name == 'QueueUrl':
67 self.url = value
68 elif name == 'VisibilityTimeout':
69 self.visibility_timeout = int(value)
70 else:
71 setattr(self, name, value)
72
73 def set_message_class(self, message_class):
74 """
75 Set the message class that should be used when instantiating
76 messages read from the queue. By default, the class
77 :class:`boto.sqs.message.Message` is used but this can be overriden
78 with any class that behaves like a message.
79
80 :type message_class: Message-like class
81 :param message_class: The new Message class
82 """
83 self.message_class = message_class
84
85 def get_attributes(self, attributes='All'):
86 """
87 Retrieves attributes about this queue object and returns
88 them in an Attribute instance (subclass of a Dictionary).
89
90 :type attributes: string
91 :param attributes: String containing one of:
92 ApproximateNumberOfMessages,
93 ApproximateNumberOfMessagesNotVisible,
94 VisibilityTimeout,
95 CreatedTimestamp,
96 LastModifiedTimestamp,
97 Policy
98 ReceiveMessageWaitTimeSeconds
99 :rtype: Attribute object
100 :return: An Attribute object which is a mapping type holding the
101 requested name/value pairs
102 """
103 return self.connection.get_queue_attributes(self, attributes)
104
105 def set_attribute(self, attribute, value):
106 """
107 Set a new value for an attribute of the Queue.
108
109 :type attribute: String
110 :param attribute: The name of the attribute you want to set. The
111 only valid value at this time is: VisibilityTimeout
112 :type value: int
113 :param value: The new value for the attribute.
114 For VisibilityTimeout the value must be an
115 integer number of seconds from 0 to 86400.
116
117 :rtype: bool
118 :return: True if successful, otherwise False.
119 """
120 return self.connection.set_queue_attribute(self, attribute, value)
121
122 def get_timeout(self):
123 """
124 Get the visibility timeout for the queue.
125
126 :rtype: int
127 :return: The number of seconds as an integer.
128 """
129 a = self.get_attributes('VisibilityTimeout')
130 return int(a['VisibilityTimeout'])
131
132 def set_timeout(self, visibility_timeout):
133 """
134 Set the visibility timeout for the queue.
135
136 :type visibility_timeout: int
137 :param visibility_timeout: The desired timeout in seconds
138 """
139 retval = self.set_attribute('VisibilityTimeout', visibility_timeout)
140 if retval:
141 self.visibility_timeout = visibility_timeout
142 return retval
143
144 def add_permission(self, label, aws_account_id, action_name):
145 """
146 Add a permission to a queue.
147
148 :type label: str or unicode
149 :param label: A unique identification of the permission you are setting.
150 Maximum of 80 characters ``[0-9a-zA-Z_-]``
151 Example, AliceSendMessage
152
153 :type aws_account_id: str or unicode
154 :param principal_id: The AWS account number of the principal who
155 will be given permission. The principal must have an AWS account,
156 but does not need to be signed up for Amazon SQS. For information
157 about locating the AWS account identification.
158
159 :type action_name: str or unicode
160 :param action_name: The action. Valid choices are:
161 SendMessage|ReceiveMessage|DeleteMessage|
162 ChangeMessageVisibility|GetQueueAttributes|*
163
164 :rtype: bool
165 :return: True if successful, False otherwise.
166
167 """
168 return self.connection.add_permission(self, label, aws_account_id,
169 action_name)
170
171 def remove_permission(self, label):
172 """
173 Remove a permission from a queue.
174
175 :type label: str or unicode
176 :param label: The unique label associated with the permission
177 being removed.
178
179 :rtype: bool
180 :return: True if successful, False otherwise.
181 """
182 return self.connection.remove_permission(self, label)
183
184 def read(self, visibility_timeout=None, wait_time_seconds=None,
185 message_attributes=None):
186 """
187 Read a single message from the queue.
188
189 :type visibility_timeout: int
190 :param visibility_timeout: The timeout for this message in seconds
191
192 :type wait_time_seconds: int
193 :param wait_time_seconds: The duration (in seconds) for which the call
194 will wait for a message to arrive in the queue before returning.
195 If a message is available, the call will return sooner than
196 wait_time_seconds.
197
198 :type message_attributes: list
199 :param message_attributes: The name(s) of additional message
200 attributes to return. The default is to return no additional
201 message attributes. Use ``['All']`` or ``['.*']`` to return all.
202
203 :rtype: :class:`boto.sqs.message.Message`
204 :return: A single message or None if queue is empty
205 """
206 rs = self.get_messages(1, visibility_timeout,
207 wait_time_seconds=wait_time_seconds,
208 message_attributes=message_attributes)
209 if len(rs) == 1:
210 return rs[0]
211 else:
212 return None
213
214 def write(self, message, delay_seconds=None):
215 """
216 Add a single message to the queue.
217
218 :type message: Message
219 :param message: The message to be written to the queue
220
221 :rtype: :class:`boto.sqs.message.Message`
222 :return: The :class:`boto.sqs.message.Message` object that was written.
223 """
224 new_msg = self.connection.send_message(self,
225 message.get_body_encoded(), delay_seconds=delay_seconds,
226 message_attributes=message.message_attributes)
227 message.id = new_msg.id
228 message.md5 = new_msg.md5
229 return message
230
231 def write_batch(self, messages):
232 """
233 Delivers up to 10 messages in a single request.
234
235 :type messages: List of lists.
236 :param messages: A list of lists or tuples. Each inner
237 tuple represents a single message to be written
238 and consists of and ID (string) that must be unique
239 within the list of messages, the message body itself
240 which can be a maximum of 64K in length, an
241 integer which represents the delay time (in seconds)
242 for the message (0-900) before the message will
243 be delivered to the queue, and an optional dict of
244 message attributes like those passed to ``send_message``
245 in the connection class.
246 """
247 return self.connection.send_message_batch(self, messages)
248
249 def new_message(self, body='', **kwargs):
250 """
251 Create new message of appropriate class.
252
253 :type body: message body
254 :param body: The body of the newly created message (optional).
255
256 :rtype: :class:`boto.sqs.message.Message`
257 :return: A new Message object
258 """
259 m = self.message_class(self, body, **kwargs)
260 m.queue = self
261 return m
262
263 # get a variable number of messages, returns a list of messages
264 def get_messages(self, num_messages=1, visibility_timeout=None,
265 attributes=None, wait_time_seconds=None,
266 message_attributes=None):
267 """
268 Get a variable number of messages.
269
270 :type num_messages: int
271 :param num_messages: The maximum number of messages to read from
272 the queue.
273
274 :type visibility_timeout: int
275 :param visibility_timeout: The VisibilityTimeout for the messages read.
276
277 :type attributes: str
278 :param attributes: The name of additional attribute to return
279 with response or All if you want all attributes. The
280 default is to return no additional attributes. Valid
281 values: All SenderId SentTimestamp ApproximateReceiveCount
282 ApproximateFirstReceiveTimestamp
283
284 :type wait_time_seconds: int
285 :param wait_time_seconds: The duration (in seconds) for which the call
286 will wait for a message to arrive in the queue before returning.
287 If a message is available, the call will return sooner than
288 wait_time_seconds.
289
290 :type message_attributes: list
291 :param message_attributes: The name(s) of additional message
292 attributes to return. The default is to return no additional
293 message attributes. Use ``['All']`` or ``['.*']`` to return all.
294
295 :rtype: list
296 :return: A list of :class:`boto.sqs.message.Message` objects.
297 """
298 return self.connection.receive_message(
299 self, number_messages=num_messages,
300 visibility_timeout=visibility_timeout, attributes=attributes,
301 wait_time_seconds=wait_time_seconds,
302 message_attributes=message_attributes)
303
304 def delete_message(self, message):
305 """
306 Delete a message from the queue.
307
308 :type message: :class:`boto.sqs.message.Message`
309 :param message: The :class:`boto.sqs.message.Message` object to delete.
310
311 :rtype: bool
312 :return: True if successful, False otherwise
313 """
314 return self.connection.delete_message(self, message)
315
316 def delete_message_batch(self, messages):
317 """
318 Deletes a list of messages in a single request.
319
320 :type messages: List of :class:`boto.sqs.message.Message` objects.
321 :param messages: A list of message objects.
322 """
323 return self.connection.delete_message_batch(self, messages)
324
325 def change_message_visibility_batch(self, messages):
326 """
327 A batch version of change_message_visibility that can act
328 on up to 10 messages at a time.
329
330 :type messages: List of tuples.
331 :param messages: A list of tuples where each tuple consists
332 of a :class:`boto.sqs.message.Message` object and an integer
333 that represents the new visibility timeout for that message.
334 """
335 return self.connection.change_message_visibility_batch(self, messages)
336
337 def delete(self):
338 """
339 Delete the queue.
340 """
341 return self.connection.delete_queue(self)
342
343 def purge(self):
344 """
345 Purge all messages in the queue.
346 """
347 return self.connection.purge_queue(self)
348
349 def clear(self, page_size=10, vtimeout=10):
350 """Deprecated utility function to remove all messages from a queue"""
351 return self.purge()
352
353 def count(self, page_size=10, vtimeout=10):
354 """
355 Utility function to count the number of messages in a queue.
356 Note: This function now calls GetQueueAttributes to obtain
357 an 'approximate' count of the number of messages in a queue.
358 """
359 a = self.get_attributes('ApproximateNumberOfMessages')
360 return int(a['ApproximateNumberOfMessages'])
361
362 def count_slow(self, page_size=10, vtimeout=10):
363 """
364 Deprecated. This is the old 'count' method that actually counts
365 the messages by reading them all. This gives an accurate count but
366 is very slow for queues with non-trivial number of messasges.
367 Instead, use get_attributes('ApproximateNumberOfMessages') to take
368 advantage of the new SQS capability. This is retained only for
369 the unit tests.
370 """
371 n = 0
372 l = self.get_messages(page_size, vtimeout)
373 while l:
374 for m in l:
375 n += 1
376 l = self.get_messages(page_size, vtimeout)
377 return n
378
379 def dump(self, file_name, page_size=10, vtimeout=10, sep='\n'):
380 """Utility function to dump the messages in a queue to a file
381 NOTE: Page size must be < 10 else SQS errors"""
382 fp = open(file_name, 'wb')
383 n = 0
384 l = self.get_messages(page_size, vtimeout)
385 while l:
386 for m in l:
387 fp.write(m.get_body())
388 if sep:
389 fp.write(sep)
390 n += 1
391 l = self.get_messages(page_size, vtimeout)
392 fp.close()
393 return n
394
395 def save_to_file(self, fp, sep='\n'):
396 """
397 Read all messages from the queue and persist them to file-like object.
398 Messages are written to the file and the 'sep' string is written
399 in between messages. Messages are deleted from the queue after
400 being written to the file.
401 Returns the number of messages saved.
402 """
403 n = 0
404 m = self.read()
405 while m:
406 n += 1
407 fp.write(m.get_body())
408 if sep:
409 fp.write(sep)
410 self.delete_message(m)
411 m = self.read()
412 return n
413
414 def save_to_filename(self, file_name, sep='\n'):
415 """
416 Read all messages from the queue and persist them to local file.
417 Messages are written to the file and the 'sep' string is written
418 in between messages. Messages are deleted from the queue after
419 being written to the file.
420 Returns the number of messages saved.
421 """
422 fp = open(file_name, 'wb')
423 n = self.save_to_file(fp, sep)
424 fp.close()
425 return n
426
427 # for backwards compatibility
428 save = save_to_filename
429
430 def save_to_s3(self, bucket):
431 """
432 Read all messages from the queue and persist them to S3.
433 Messages are stored in the S3 bucket using a naming scheme of::
434
435 <queue_id>/<message_id>
436
437 Messages are deleted from the queue after being saved to S3.
438 Returns the number of messages saved.
439 """
440 n = 0
441 m = self.read()
442 while m:
443 n += 1
444 key = bucket.new_key('%s/%s' % (self.id, m.id))
445 key.set_contents_from_string(m.get_body())
446 self.delete_message(m)
447 m = self.read()
448 return n
449
450 def load_from_s3(self, bucket, prefix=None):
451 """
452 Load messages previously saved to S3.
453 """
454 n = 0
455 if prefix:
456 prefix = '%s/' % prefix
457 else:
458 prefix = '%s/' % self.id[1:]
459 rs = bucket.list(prefix=prefix)
460 for key in rs:
461 n += 1
462 m = self.new_message(key.get_contents_as_string())
463 self.write(m)
464 return n
465
466 def load_from_file(self, fp, sep='\n'):
467 """Utility function to load messages from a file-like object to a queue"""
468 n = 0
469 body = ''
470 l = fp.readline()
471 while l:
472 if l == sep:
473 m = Message(self, body)
474 self.write(m)
475 n += 1
476 print('writing message %d' % n)
477 body = ''
478 else:
479 body = body + l
480 l = fp.readline()
481 return n
482
483 def load_from_filename(self, file_name, sep='\n'):
484 """Utility function to load messages from a local filename to a queue"""
485 fp = open(file_name, 'rb')
486 n = self.load_from_file(fp, sep)
487 fp.close()
488 return n
489
490 # for backward compatibility
491 load = load_from_filename
492