venv/lib/python2.7/site-packages/boto/dynamodb2/table.py @ 0:d67268158946 (draft)

planemo upload commit a3f181f5f126803c654b3a66dd4e83a48f7e203b
author: bcclaywell
date: Mon, 12 Oct 2015 17:43:33 -0400
1 import boto
2 from boto.dynamodb2 import exceptions
3 from boto.dynamodb2.fields import (HashKey, RangeKey,
4 AllIndex, KeysOnlyIndex, IncludeIndex,
5 GlobalAllIndex, GlobalKeysOnlyIndex,
6 GlobalIncludeIndex)
7 from boto.dynamodb2.items import Item
8 from boto.dynamodb2.layer1 import DynamoDBConnection
9 from boto.dynamodb2.results import ResultSet, BatchGetResultSet
10 from boto.dynamodb2.types import (NonBooleanDynamizer, Dynamizer, FILTER_OPERATORS,
11 QUERY_OPERATORS, STRING)
12 from boto.exception import JSONResponseError
13
14
15 class Table(object):
16 """
17 Interacts with & models the behavior of a DynamoDB table.
18
19 The ``Table`` object represents a set (or rough categorization) of
20 records within DynamoDB. The important part is that all records within
21 the table, while largely schema-free, share the same key schema & are
22 essentially namespaced for use in your application. For example, you
23 might have a ``users`` table or a ``forums`` table.
24 """
25 max_batch_get = 100
26
27 _PROJECTION_TYPE_TO_INDEX = dict(
28 global_indexes=dict(
29 ALL=GlobalAllIndex,
30 KEYS_ONLY=GlobalKeysOnlyIndex,
31 INCLUDE=GlobalIncludeIndex,
32 ), local_indexes=dict(
33 ALL=AllIndex,
34 KEYS_ONLY=KeysOnlyIndex,
35 INCLUDE=IncludeIndex,
36 )
37 )
38
39 def __init__(self, table_name, schema=None, throughput=None, indexes=None,
40 global_indexes=None, connection=None):
41 """
42 Sets up a new in-memory ``Table``.
43
44 This is useful if the table already exists within DynamoDB & you simply
45 want to use it for additional interactions. The only required parameter
46 is the ``table_name``. However, under the hood, the object will lazily
47 call ``describe_table`` to determine the schema/indexes/throughput. You
48 can avoid this extra call by passing in ``schema`` & ``indexes``.
49
50 **IMPORTANT** - If you're creating a new ``Table`` for the first time,
51 you should use the ``Table.create`` method instead, as it will
52 persist the table structure to DynamoDB.
53
54 Requires a ``table_name`` parameter, which should be a simple string
55 of the name of the table.
56
57 Optionally accepts a ``schema`` parameter, which should be a list of
58 ``BaseSchemaField`` subclasses representing the desired schema.
59
60 Optionally accepts a ``throughput`` parameter, which should be a
61 dictionary. If provided, it should specify a ``read`` & ``write`` key,
62 both of which should have an integer value associated with them.
63
64 Optionally accepts an ``indexes`` parameter, which should be a list of
65 ``BaseIndexField`` subclasses representing the desired indexes.
66
67 Optionally accepts a ``global_indexes`` parameter, which should be a
68 list of ``GlobalBaseIndexField`` subclasses representing the desired
69 indexes.
70
71 Optionally accepts a ``connection`` parameter, which should be a
72 ``DynamoDBConnection`` instance (or subclass). This is primarily useful
73 for specifying alternate connection parameters.
74
75 Example::
76
77 # The simple, it-already-exists case.
78 >>> users = Table('users')
79
80 # The full, minimum-extra-calls case.
81 >>> from boto import dynamodb2
82 >>> users = Table('users', schema=[
83 ... HashKey('username'),
84 ... RangeKey('date_joined', data_type=NUMBER)
85 ... ], throughput={
86 ... 'read': 20,
87 ... 'write': 10,
88 ... }, indexes=[
89 ... KeysOnlyIndex('MostRecentlyJoined', parts=[
90 ... HashKey('username'),
91 ... RangeKey('date_joined')
92 ... ]),
93 ... ], global_indexes=[
94 ... GlobalAllIndex('UsersByZipcode', parts=[
95 ... HashKey('zipcode'),
96 ... RangeKey('username'),
97 ... ],
98 ... throughput={
99 ... 'read': 10,
100 ... 'write': 10,
101 ... }),
102 ... ], connection=dynamodb2.connect_to_region('us-west-2',
103 ... aws_access_key_id='key',
104 ... aws_secret_access_key='key',
105 ... ))
106
107 """
108 self.table_name = table_name
109 self.connection = connection
110 self.throughput = {
111 'read': 5,
112 'write': 5,
113 }
114 self.schema = schema
115 self.indexes = indexes
116 self.global_indexes = global_indexes
117
118 if self.connection is None:
119 self.connection = DynamoDBConnection()
120
121 if throughput is not None:
122 self.throughput = throughput
123
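# Default to the backwards-compatible dynamizer, which encodes Python
# booleans as numbers; call ``use_boolean`` to opt into native BOOLs.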
124 self._dynamizer = NonBooleanDynamizer()
125
126 def use_boolean(self):
"""
Tells this table to use the full ``Dynamizer``, so Python booleans
round-trip as DynamoDB's native ``BOOL`` type rather than as numbers
(the default, backwards-compatible ``NonBooleanDynamizer`` behavior).
"""
127 self._dynamizer = Dynamizer()
128
129 @classmethod
130 def create(cls, table_name, schema, throughput=None, indexes=None,
131 global_indexes=None, connection=None):
132 """
133 Creates a new table in DynamoDB & returns an in-memory ``Table`` object.
134
135 This will setup a brand new table within DynamoDB. The ``table_name``
136 must be unique for your AWS account. The ``schema`` is also required
137 to define the key structure of the table.
138
139 **IMPORTANT** - You should consider the usage pattern of your table
140 up-front, as the schema can **NOT** be modified once the table is
141 created; revising it later requires creating a new table & migrating
142 the data.
143
144 **IMPORTANT** - If the table already exists in DynamoDB, additional
145 calls to this method will result in an error. If you just need
146 a ``Table`` object to interact with the existing table, you should
147 just initialize a new ``Table`` object, which requires only the
148 ``table_name``.
149
150 Requires a ``table_name`` parameter, which should be a simple string
151 of the name of the table.
152
153 Requires a ``schema`` parameter, which should be a list of
154 ``BaseSchemaField`` subclasses representing the desired schema.
155
156 Optionally accepts a ``throughput`` parameter, which should be a
157 dictionary. If provided, it should specify a ``read`` & ``write`` key,
158 both of which should have an integer value associated with them.
159
160 Optionally accepts an ``indexes`` parameter, which should be a list of
161 ``BaseIndexField`` subclasses representing the desired indexes.
162
163 Optionally accepts a ``global_indexes`` parameter, which should be a
164 list of ``GlobalBaseIndexField`` subclasses representing the desired
165 indexes.
166
167 Optionally accepts a ``connection`` parameter, which should be a
168 ``DynamoDBConnection`` instance (or subclass). This is primarily useful
169 for specifying alternate connection parameters.
170
171 Example::
172
173 >>> users = Table.create('users', schema=[
174 ... HashKey('username'),
175 ... RangeKey('date_joined', data_type=NUMBER)
176 ... ], throughput={
177 ... 'read': 20,
178 ... 'write': 10,
179 ... }, indexes=[
180 ... KeysOnlyIndex('MostRecentlyJoined', parts=[
181 ... RangeKey('date_joined')
182 ... ]),
... ], global_indexes=[
183 ... GlobalAllIndex('UsersByZipcode', parts=[
184 ... HashKey('zipcode'),
185 ... RangeKey('username'),
186 ... ],
187 ... throughput={
188 ... 'read': 10,
189 ... 'write':10,
190 ... }),
191 ... ])
192
193 """
194 table = cls(table_name=table_name, connection=connection)
195 table.schema = schema
196
197 if throughput is not None:
198 table.throughput = throughput
199
200 if indexes is not None:
201 table.indexes = indexes
202
203 if global_indexes is not None:
204 table.global_indexes = global_indexes
205
206 # Prep the schema.
207 raw_schema = []
208 attr_defs = []
209 seen_attrs = set()
210
211 for field in table.schema:
212 raw_schema.append(field.schema())
213 # Build the attributes off what we know.
214 seen_attrs.add(field.name)
215 attr_defs.append(field.definition())
216
217 raw_throughput = {
218 'ReadCapacityUnits': int(table.throughput['read']),
219 'WriteCapacityUnits': int(table.throughput['write']),
220 }
221 kwargs = {}
222
223 kwarg_map = {
224 'indexes': 'local_secondary_indexes',
225 'global_indexes': 'global_secondary_indexes',
226 }
227 for index_attr in ('indexes', 'global_indexes'):
228 table_indexes = getattr(table, index_attr)
229 if table_indexes:
230 raw_indexes = []
231 for index_field in table_indexes:
232 raw_indexes.append(index_field.schema())
233 # Make sure all attributes specified in the indexes are
234 # added to the definition
235 for field in index_field.parts:
236 if field.name not in seen_attrs:
237 seen_attrs.add(field.name)
238 attr_defs.append(field.definition())
239
240 kwargs[kwarg_map[index_attr]] = raw_indexes
241
242 table.connection.create_table(
243 table_name=table.table_name,
244 attribute_definitions=attr_defs,
245 key_schema=raw_schema,
246 provisioned_throughput=raw_throughput,
247 **kwargs
248 )
249 return table
250
251 def _introspect_schema(self, raw_schema, raw_attributes=None):
252 """
253 Given a raw schema structure back from a DynamoDB response, parse
254 out & build the high-level Python objects that represent them.
255 """
256 schema = []
257 sane_attributes = {}
258
259 if raw_attributes:
260 for field in raw_attributes:
261 sane_attributes[field['AttributeName']] = field['AttributeType']
262
263 for field in raw_schema:
264 data_type = sane_attributes.get(field['AttributeName'], STRING)
265
266 if field['KeyType'] == 'HASH':
267 schema.append(
268 HashKey(field['AttributeName'], data_type=data_type)
269 )
270 elif field['KeyType'] == 'RANGE':
271 schema.append(
272 RangeKey(field['AttributeName'], data_type=data_type)
273 )
274 else:
275 raise exceptions.UnknownSchemaFieldError(
276 "%s was seen, but is unknown. Please report this at "
277 "https://github.com/boto/boto/issues." % field['KeyType']
278 )
279
280 return schema
281
282 def _introspect_all_indexes(self, raw_indexes, map_indexes_projection):
283 """
284 Given a raw index/global index structure back from a DynamoDB response,
285 parse out & build the high-level Python objects that represent them.
286 """
287 indexes = []
288
289 for field in raw_indexes:
290 index_klass = map_indexes_projection.get('ALL')
291 kwargs = {
292 'parts': []
293 }
294
295 if field['Projection']['ProjectionType'] == 'ALL':
296 index_klass = map_indexes_projection.get('ALL')
297 elif field['Projection']['ProjectionType'] == 'KEYS_ONLY':
298 index_klass = map_indexes_projection.get('KEYS_ONLY')
299 elif field['Projection']['ProjectionType'] == 'INCLUDE':
300 index_klass = map_indexes_projection.get('INCLUDE')
301 kwargs['includes'] = field['Projection']['NonKeyAttributes']
302 else:
303 raise exceptions.UnknownIndexFieldError(
304 "%s was seen, but is unknown. Please report this at "
305 "https://github.com/boto/boto/issues." % \
306 field['Projection']['ProjectionType']
307 )
308
309 name = field['IndexName']
310 kwargs['parts'] = self._introspect_schema(field['KeySchema'], None)
311 indexes.append(index_klass(name, **kwargs))
312
313 return indexes
314
315 def _introspect_indexes(self, raw_indexes):
316 """
317 Given a raw index structure back from a DynamoDB response, parse
318 out & build the high-level Python objects that represent them.
319 """
320 return self._introspect_all_indexes(
321 raw_indexes, self._PROJECTION_TYPE_TO_INDEX.get('local_indexes'))
322
323 def _introspect_global_indexes(self, raw_global_indexes):
324 """
325 Given a raw global index structure back from a DynamoDB response, parse
326 out & build the high-level Python objects that represent them.
327 """
328 return self._introspect_all_indexes(
329 raw_global_indexes,
330 self._PROJECTION_TYPE_TO_INDEX.get('global_indexes'))
331
332 def describe(self):
333 """
334 Describes the current structure of the table in DynamoDB.
335
336 This information will be used to update the ``schema``, ``indexes``,
337 ``global_indexes`` and ``throughput`` information on the ``Table``. Some
338 calls, such as those involving creating keys or querying, will require
339 this information to be populated.
340
341 It also returns the full raw data structure from DynamoDB, in the
342 event you'd like to parse out additional information (such as the
343 ``ItemCount`` or usage information).
344
345 Example::
346
347 >>> users.describe()
348 {
349 # Lots of keys here...
350 }
351 >>> len(users.schema)
352 2
353
354 """
355 result = self.connection.describe_table(self.table_name)
356
357 # Blindly update throughput, since what's on DynamoDB's end is likely
358 # more correct.
359 raw_throughput = result['Table']['ProvisionedThroughput']
360 self.throughput['read'] = int(raw_throughput['ReadCapacityUnits'])
361 self.throughput['write'] = int(raw_throughput['WriteCapacityUnits'])
362
363 if not self.schema:
364 # Since we have the data, build the schema.
365 raw_schema = result['Table'].get('KeySchema', [])
366 raw_attributes = result['Table'].get('AttributeDefinitions', [])
367 self.schema = self._introspect_schema(raw_schema, raw_attributes)
368
369 if not self.indexes:
370 # Build the index information as well.
371 raw_indexes = result['Table'].get('LocalSecondaryIndexes', [])
372 self.indexes = self._introspect_indexes(raw_indexes)
373
374 # Build the global index information as well.
375 raw_global_indexes = result['Table'].get('GlobalSecondaryIndexes', [])
376 self.global_indexes = self._introspect_global_indexes(raw_global_indexes)
377
378 # This is leaky.
379 return result
380
381 def update(self, throughput=None, global_indexes=None):
382 """
383 Updates table attributes and global indexes in DynamoDB.
384
385 Optionally accepts a ``throughput`` parameter, which should be a
386 dictionary. If provided, it should specify a ``read`` & ``write`` key,
387 both of which should have an integer value associated with them.
388
389 Optionally accepts a ``global_indexes`` parameter, which should be a
390 dictionary mapping each index name to a dict containing a ``read`` &
391 ``write`` key, both of which should have an integer value associated
392 with them. If you are writing new code, please use
393 ``Table.update_global_secondary_index``.
394
395 Returns ``True`` on success.
396
397 Example::
398
399 # For a read-heavier application...
400 >>> users.update(throughput={
401 ... 'read': 20,
402 ... 'write': 10,
403 ... })
404 True
405
406 # To also update the global index(es) throughput.
407 >>> users.update(throughput={
408 ... 'read': 20,
409 ... 'write': 10,
410 ... },
411 ... global_indexes={
412 ... 'TheIndexNameHere': {
413 ... 'read': 15,
414 ... 'write': 5,
415 ... }
416 ... })
417 True
418 """
419
420 data = None
421
422 if throughput:
423 self.throughput = throughput
424 data = {
425 'ReadCapacityUnits': int(self.throughput['read']),
426 'WriteCapacityUnits': int(self.throughput['write']),
427 }
428
429 gsi_data = None
430
431 if global_indexes:
432 gsi_data = []
433
434 for gsi_name, gsi_throughput in global_indexes.items():
435 gsi_data.append({
436 "Update": {
437 "IndexName": gsi_name,
438 "ProvisionedThroughput": {
439 "ReadCapacityUnits": int(gsi_throughput['read']),
440 "WriteCapacityUnits": int(gsi_throughput['write']),
441 },
442 },
443 })
444
445 if throughput or global_indexes:
446 self.connection.update_table(
447 self.table_name,
448 provisioned_throughput=data,
449 global_secondary_index_updates=gsi_data,
450 )
451
452 return True
453 else:
454 msg = 'You need to provide either the throughput or the ' \
455 'global_indexes to update method'
456 boto.log.error(msg)
457
458 return False
459
460 def create_global_secondary_index(self, global_index):
461 """
462 Creates a global index in DynamoDB after the table has been created.
463
464 Requires a ``global_index`` parameter, which should be a
465 ``GlobalBaseIndexField`` subclass representing the desired index.
466
467 To update ``global_indexes`` information on the ``Table``, you'll need
468 to call ``Table.describe``.
469
470 Returns ``True`` on success.
471
472 Example::
473
474 # To create a global index
475 >>> users.create_global_secondary_index(
476 ... global_index=GlobalAllIndex(
477 ... 'TheIndexNameHere', parts=[
478 ... HashKey('requiredHashkey', data_type=STRING),
479 ... RangeKey('optionalRangeKey', data_type=STRING)
480 ... ],
481 ... throughput={
482 ... 'read': 2,
483 ... 'write': 1,
484 ... })
485 ... )
486 True
487
488 """
489
490 if global_index:
491 gsi_data = []
492 gsi_data_attr_def = []
493
494 gsi_data.append({
495 "Create": global_index.schema()
496 })
497
498 for attr_def in global_index.parts:
499 gsi_data_attr_def.append(attr_def.definition())
500
501 self.connection.update_table(
502 self.table_name,
503 global_secondary_index_updates=gsi_data,
504 attribute_definitions=gsi_data_attr_def
505 )
506
507 return True
508 else:
509 msg = 'You need to provide the global_index to ' \
510 'create_global_secondary_index method'
511 boto.log.error(msg)
512
513 return False
514
515 def delete_global_secondary_index(self, global_index_name):
516 """
517 Deletes a global index in DynamoDB after the table has been created.
518
519 Requires a ``global_index_name`` parameter, which should be a simple
520 string of the name of the global secondary index.
521
522 To update ``global_indexes`` information on the ``Table``, you'll need
523 to call ``Table.describe``.
524
525 Returns ``True`` on success.
526
527 Example::
528
529 # To delete a global index
530 >>> users.delete_global_secondary_index('TheIndexNameHere')
531 True
532
533 """
534
535 if global_index_name:
536 gsi_data = [
537 {
538 "Delete": {
539 "IndexName": global_index_name
540 }
541 }
542 ]
543
544 self.connection.update_table(
545 self.table_name,
546 global_secondary_index_updates=gsi_data,
547 )
548
549 return True
550 else:
551 msg = 'You need to provide the global index name to ' \
552 'delete_global_secondary_index method'
553 boto.log.error(msg)
554
555 return False
556
557 def update_global_secondary_index(self, global_indexes):
558 """
559 Updates one or more global indexes in DynamoDB after the table has
560 been created.
561
562 Requires a ``global_indexes`` parameter, which should be a dictionary
563 mapping each index name to a dict containing a ``read`` & ``write``
564 key, both of which should have an integer value associated with them.
565
566 To update ``global_indexes`` information on the ``Table``, you'll need
567 to call ``Table.describe``.
568
569 Returns ``True`` on success.
570
571 Example::
572
573 # To update a global index
574 >>> users.update_global_secondary_index(global_indexes={
575 ... 'TheIndexNameHere': {
576 ... 'read': 15,
577 ... 'write': 5,
578 ... }
579 ... })
580 True
581
582 """
583
584 if global_indexes:
585 gsi_data = []
586
587 for gsi_name, gsi_throughput in global_indexes.items():
588 gsi_data.append({
589 "Update": {
590 "IndexName": gsi_name,
591 "ProvisionedThroughput": {
592 "ReadCapacityUnits": int(gsi_throughput['read']),
593 "WriteCapacityUnits": int(gsi_throughput['write']),
594 },
595 },
596 })
597
598 self.connection.update_table(
599 self.table_name,
600 global_secondary_index_updates=gsi_data,
601 )
602 return True
603 else:
604 msg = 'You need to provide the global indexes to ' \
605 'update_global_secondary_index method'
606 boto.log.error(msg)
607
608 return False
609
610 def delete(self):
611 """
612 Deletes a table in DynamoDB.
613
614 **IMPORTANT** - Be careful when using this method; there is no undo.
615
616 Returns ``True`` on success.
617
618 Example::
619
620 >>> users.delete()
621 True
622
623 """
624 self.connection.delete_table(self.table_name)
625 return True
626
627 def _encode_keys(self, keys):
628 """
629 Given a flat Python dictionary of keys/values, converts it into the
630 nested dictionary DynamoDB expects.
631
632 Converts::
633
634 {
635 'username': 'john',
636 'tags': [1, 2, 5],
637 }
638
639 ...to...::
640
641 {
642 'username': {'S': 'john'},
643 'tags': {'NS': ['1', '2', '5']},
644 }
645
646 """
647 raw_key = {}
648
649 for key, value in keys.items():
650 raw_key[key] = self._dynamizer.encode(value)
651
652 return raw_key
653
654 def get_item(self, consistent=False, attributes=None, **kwargs):
655 """
656 Fetches an item (record) from a table in DynamoDB.
657
658 To specify the key of the item you'd like to get, you can specify the
659 key attributes as kwargs.
660
661 Optionally accepts a ``consistent`` parameter, which should be a
662 boolean. If you provide ``True``, it will perform
663 a consistent (but more expensive) read from DynamoDB.
664 (Default: ``False``)
665
666 Optionally accepts an ``attributes`` parameter, which should be a
667 list of fieldnames to fetch. (Default: ``None``, which means all fields
668 should be fetched)
669
670 Returns an ``Item`` instance containing all the data for that record.
671
672 Raises an ``ItemNotFound`` exception if the item is not found.
673
674 Example::
675
676 # A simple hash key.
677 >>> john = users.get_item(username='johndoe')
678 >>> john['first_name']
679 'John'
680
681 # A complex hash+range key.
682 >>> john = users.get_item(username='johndoe', last_name='Doe')
683 >>> john['first_name']
684 'John'
685
686 # A consistent read (assuming the data might have just changed).
687 >>> john = users.get_item(username='johndoe', consistent=True)
688 >>> john['first_name']
689 'Johann'
690
691 # With a key that is an invalid variable name in Python.
692 # Also, assumes a different schema than previous examples.
693 >>> john = users.get_item(**{
694 ... 'date-joined': 127549192,
695 ... })
696 >>> john['first_name']
697 'John'
698
699 """
700 raw_key = self._encode_keys(kwargs)
701 item_data = self.connection.get_item(
702 self.table_name,
703 raw_key,
704 attributes_to_get=attributes,
705 consistent_read=consistent
706 )
707 if 'Item' not in item_data:
708 raise exceptions.ItemNotFound("Item %s couldn't be found." % kwargs)
709 item = Item(self)
710 item.load(item_data)
711 return item
712
713 def has_item(self, **kwargs):
714 """
715 Return whether an item (record) exists within a table in DynamoDB.
716
717 To specify the key of the item you'd like to get, you can specify the
718 key attributes as kwargs.
719
720 Optionally accepts a ``consistent`` parameter, which should be a
721 boolean. If you provide ``True``, it will perform
722 a consistent (but more expensive) read from DynamoDB.
723 (Default: ``False``)
724
725 Optionally accepts an ``attributes`` parameter, which should be a
726 list of fieldnames to fetch. (Default: ``None``, which means all fields
727 should be fetched)
728
729 Returns ``True`` if an ``Item`` is present, ``False`` if not.
730
731 Example::
732
733 # Simple, just hash-key schema.
734 >>> users.has_item(username='johndoe')
735 True
736
737 # Complex schema, item not present.
738 >>> users.has_item(
739 ... username='johndoe',
740 ... date_joined='2014-01-07'
741 ... )
742 False
743
744 """
745 try:
746 self.get_item(**kwargs)
747 except (JSONResponseError, exceptions.ItemNotFound):
748 return False
749
750 return True
751
752 def lookup(self, *args, **kwargs):
753 """
754 Look up an entry in DynamoDB. This is mostly backwards compatible
755 with boto.dynamodb. Unlike get_item, it takes hash_key and range_key first,
756 although you may still specify keyword arguments instead.
757
758 Also unlike ``get_item``, if the returned item has no keys
759 (i.e., it does not exist in DynamoDB), ``None`` is returned
760 instead of an empty key object.
761
762 Example::

763 >>> user = users.lookup(username)
764 >>> user = users.lookup(username, consistent=True)
765 >>> app = apps.lookup('my_customer_id', 'my_app_id')
766
767 """
768 if not self.schema:
769 self.describe()
770 for x, arg in enumerate(args):
771 kwargs[self.schema[x].name] = arg
772 ret = self.get_item(**kwargs)
773 if not ret.keys():
774 return None
775 return ret
776
777 def new_item(self, *args):
778 """
779 Returns a new, blank item.
780
781 This is mostly for consistency with boto.dynamodb.
782 """
783 if not self.schema:
784 self.describe()
785 data = {}
786 for x, arg in enumerate(args):
787 data[self.schema[x].name] = arg
788 return Item(self, data=data)
789
790 def put_item(self, data, overwrite=False):
791 """
792 Saves an entire item to DynamoDB.
793
794 By default, if any part of the ``Item``'s original data doesn't match
795 what's currently in DynamoDB, this request will fail. This prevents
796 other processes from updating the data in between when you read the
797 item & when your request to update the item's data is processed, which
798 would typically result in some data loss.
799
800 Requires a ``data`` parameter, which should be a dictionary of the data
801 you'd like to store in DynamoDB.
802
803 Optionally accepts an ``overwrite`` parameter, which should be a
804 boolean. If you provide ``True``, this will tell DynamoDB to blindly
805 overwrite whatever data is present, if any.
806
807 Returns ``True`` on success.
808
809 Example::
810
811 >>> users.put_item(data={
812 ... 'username': 'jane',
813 ... 'first_name': 'Jane',
814 ... 'last_name': 'Doe',
815 ... 'date_joined': 126478915,
816 ... })
817 True
818
819 """
820 item = Item(self, data=data)
821 return item.save(overwrite=overwrite)
822
823 def _put_item(self, item_data, expects=None):
824 """
825 The internal variant of ``put_item`` (full data). This is used by the
826 ``Item`` objects, since that operation is represented at the
827 table-level by the API, but conceptually maps better to telling an
828 individual ``Item`` to save itself.
829 """
830 kwargs = {}
831
832 if expects is not None:
833 kwargs['expected'] = expects
834
835 self.connection.put_item(self.table_name, item_data, **kwargs)
836 return True
837
838 def _update_item(self, key, item_data, expects=None):
839 """
840 The internal variant of ``put_item`` (partial data). This is used by the
841 ``Item`` objects, since that operation is represented at the
842 table-level by the API, but conceptually maps better to telling an
843 individual ``Item`` to save itself.
844 """
845 raw_key = self._encode_keys(key)
846 kwargs = {}
847
848 if expects is not None:
849 kwargs['expected'] = expects
850
851 self.connection.update_item(self.table_name, raw_key, item_data, **kwargs)
852 return True
853
854 def delete_item(self, expected=None, conditional_operator=None, **kwargs):
855 """
856 Deletes a single item. You can perform a conditional delete operation
857 that deletes the item if it exists, or if it has an expected attribute
858 value.
859
860 Conditional deletes are useful for only deleting items if specific
861 conditions are met. If those conditions are met, DynamoDB performs
862 the delete. Otherwise, the item is not deleted.
863
864 To specify the expected attribute values of the item, you can pass a
865 dictionary of conditions to ``expected``. Each condition should follow
866 the pattern ``<attributename>__<comparison_operator>=<value_to_expect>``.
867
868 **IMPORTANT** - Be careful when using this method; there is no undo.
869
870 To specify the key of the item you'd like to get, you can specify the
871 key attributes as kwargs.
872
873 Optionally accepts an ``expected`` parameter which is a dictionary of
874 expected attribute value conditions.
875
876 Optionally accepts a ``conditional_operator`` which applies to the
877 expected attribute value conditions:
878
879 + `AND` - If all of the conditions evaluate to true (default)
880 + `OR` - True if at least one condition evaluates to true
881
882 Returns ``True`` on success, ``False`` on failed conditional delete.
883
884 Example::
885
886 # A simple hash key.
887 >>> users.delete_item(username='johndoe')
888 True
889
890 # A complex hash+range key.
891 >>> users.delete_item(username='jane', last_name='Doe')
892 True
893
894 # With a key that is an invalid variable name in Python.
895 # Also, assumes a different schema than previous examples.
896 >>> users.delete_item(**{
897 ... 'date-joined': 127549192,
898 ... })
899 True
900
901 # Conditional delete
902 >>> users.delete_item(username='johndoe',
903 ... expected={'balance__eq': 0})
904 True
905 """
906 expected = self._build_filters(expected, using=FILTER_OPERATORS)
907 raw_key = self._encode_keys(kwargs)
908
909 try:
910 self.connection.delete_item(self.table_name, raw_key,
911 expected=expected,
912 conditional_operator=conditional_operator)
913 except exceptions.ConditionalCheckFailedException:
914 return False
915
916 return True
917
918 def get_key_fields(self):
919 """
920 Returns the fields necessary to make a key for a table.
921
922 If the ``Table`` does not already have a populated ``schema``,
923 this will request it via a ``Table.describe`` call.
924
925 Returns a list of fieldnames (strings).
926
927 Example::
928
929 # A simple hash key.
930 >>> users.get_key_fields()
931 ['username']
932
933 # A complex hash+range key.
934 >>> users.get_key_fields()
935 ['username', 'last_name']
936
937 """
938 if not self.schema:
939 # We don't know the structure of the table. Get a description to
940 # populate the schema.
941 self.describe()
942
943 return [field.name for field in self.schema]
944
945 def batch_write(self):
946 """
947 Allows the batching of writes to DynamoDB.
948
949 Since each write/delete call to DynamoDB has a cost associated with it,
950 when loading lots of data, it makes sense to batch them, creating as
951 few calls as possible.
952
953 This returns a context manager that will transparently handle creating
954 these batches. The object you get back lightly resembles a ``Table``
955 object, sharing just the ``put_item`` & ``delete_item`` methods
956 (which are all that DynamoDB can batch in terms of writing data).
957
958 DynamoDB's maximum batch size is 25 items per request. If you attempt
959 to put/delete more than that, the context manager will batch as many
960 as it can up to that number, then flush them to DynamoDB & continue
961 batching as more calls come in.
962
963 Example::
964
965 # Assuming a table with one record...
966 >>> with users.batch_write() as batch:
967 ... batch.put_item(data={
968 ... 'username': 'johndoe',
969 ... 'first_name': 'John',
970 ... 'last_name': 'Doe',
971 ... 'owner': 1,
972 ... })
973 ... # Nothing across the wire yet.
974 ... batch.delete_item(username='bob')
975 ... # Still no requests sent.
976 ... batch.put_item(data={
977 ... 'username': 'jane',
978 ... 'first_name': 'Jane',
979 ... 'last_name': 'Doe',
980 ... 'date_joined': 127436192,
981 ... })
982 ... # Nothing yet, but once we leave the context, the
983 ... # put/deletes will be sent.
984
985 """
986 # PHENOMENAL COSMIC DOCS!!! itty-bitty code.
987 return BatchTable(self)
988
989 def _build_filters(self, filter_kwargs, using=QUERY_OPERATORS):
990 """
991 An internal method for taking query/scan-style ``**kwargs`` & turning
992 them into the raw structure DynamoDB expects for filtering.
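
For example, a (hypothetical) filter kwarg of
``{'username__eq': 'john'}`` is turned into::

    {
        'username': {
            'AttributeValueList': [{'S': 'john'}],
            'ComparisonOperator': 'EQ',
        },
    }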
993 """
994 if filter_kwargs is None:
995 return
996
997 filters = {}
998
999 for field_and_op, value in filter_kwargs.items():
1000 field_bits = field_and_op.split('__')
1001 fieldname = '__'.join(field_bits[:-1])
1002
1003 try:
1004 op = using[field_bits[-1]]
1005 except KeyError:
1006 raise exceptions.UnknownFilterTypeError(
1007 "Operator '%s' from '%s' is not recognized." % (
1008 field_bits[-1],
1009 field_and_op
1010 )
1011 )
1012
1013 lookup = {
1014 'AttributeValueList': [],
1015 'ComparisonOperator': op,
1016 }
1017
1018 # Special-case the ``NULL/NOT_NULL`` case.
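# For example, a (hypothetical) ``suffix__null=True`` kwarg matches
# items where ``suffix`` is absent, while ``suffix__null=False``
# matches items where it is present.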
1019 if field_bits[-1] == 'null':
1020 del lookup['AttributeValueList']
1021
1022 if value is False:
1023 lookup['ComparisonOperator'] = 'NOT_NULL'
1024 else:
1025 lookup['ComparisonOperator'] = 'NULL'
1026 # Special-case the ``BETWEEN`` case.
1027 elif field_bits[-1] == 'between':
1028 if isinstance(value, (list, tuple)) and len(value) == 2:
1029 lookup['AttributeValueList'].append(
1030 self._dynamizer.encode(value[0])
1031 )
1032 lookup['AttributeValueList'].append(
1033 self._dynamizer.encode(value[1])
1034 )
1035 # Special-case the ``IN`` case
1036 elif field_bits[-1] == 'in':
1037 for val in value:
1038 lookup['AttributeValueList'].append(self._dynamizer.encode(val))
1039 else:
1040 # Fix up the value for encoding, because it was built to only work
1041 # with ``set``s.
1042 if isinstance(value, (list, tuple)):
1043 value = set(value)
1044 lookup['AttributeValueList'].append(
1045 self._dynamizer.encode(value)
1046 )
1047
1048 # Finally, insert it into the filters.
1049 filters[fieldname] = lookup
1050
1051 return filters
1052
1053 def query(self, limit=None, index=None, reverse=False, consistent=False,
1054 attributes=None, max_page_size=None, **filter_kwargs):
1055 """
1056 **WARNING:** This method is provided **strictly** for
1057 backward-compatibility. It returns results in an incorrect order.
1058
1059 If you are writing new code, please use ``Table.query_2``.
1060 """
1061 reverse = not reverse
1062 return self.query_2(limit=limit, index=index, reverse=reverse,
1063 consistent=consistent, attributes=attributes,
1064 max_page_size=max_page_size, **filter_kwargs)
1065
1066 def query_2(self, limit=None, index=None, reverse=False,
1067 consistent=False, attributes=None, max_page_size=None,
1068 query_filter=None, conditional_operator=None,
1069 **filter_kwargs):
1070 """
1071 Queries for a set of matching items in a DynamoDB table.
1072
1073 Queries can be performed against a hash key, a hash+range key or
1074 against any data stored in your local secondary indexes. Query filters
1075 can be used to filter on arbitrary fields.
1076
1077 **Note** - You cannot query against arbitrary fields within the data
1078 stored in DynamoDB unless you specify ``query_filter`` values.
1079
1080 To specify the filters of the items you'd like to get, you can specify
1081 the filters as kwargs. Each filter kwarg should follow the pattern
1082 ``<fieldname>__<filter_operation>=<value_to_look_for>``. Query filters
1083 are specified in the same way.
1084
1085 Optionally accepts a ``limit`` parameter, which should be an integer
1086 count of the total number of items to return. (Default: ``None`` -
1087 all results)
1088
1089 Optionally accepts an ``index`` parameter, which should be a string of
1090 the name of the local secondary index you want to query against.
1091 (Default: ``None``)
1092
1093 Optionally accepts a ``reverse`` parameter, which will present the
1094 results in reverse order. (Default: ``False`` - normal order)
1095
1096 Optionally accepts a ``consistent`` parameter, which should be a
1097 boolean. If you provide ``True``, it will force a consistent read of
1098 the data (more expensive). (Default: ``False`` - use eventually
1099 consistent reads)
1100
1101 Optionally accepts an ``attributes`` parameter, which should be a
1102 tuple. If you provide any attributes, only these will be fetched
1103 from DynamoDB. This uses the ``AttributesToGet`` API parameter &
1104 sets ``Select`` to ``SPECIFIC_ATTRIBUTES``.
1105
1106 Optionally accepts a ``max_page_size`` parameter, which should be an
1107 integer count of the maximum number of items to retrieve
1108 **per-request**. This is useful in making faster requests &
1109 preventing the query from drowning out other queries. (Default: ``None`` -
1110 fetch as many as DynamoDB will return)
1111
1112 Optionally accepts a ``query_filter`` which is a dictionary of filter
1113 conditions against any arbitrary field in the returned data.
1114
1115 Optionally accepts a ``conditional_operator`` which applies to the
1116 query filter conditions:
1117
1118 + `AND` - True if all filter conditions evaluate to true (default)
1119 + `OR` - True if at least one filter condition evaluates to true
1120
1121 Returns a ``ResultSet``, which transparently handles the pagination of
1122 results you get back.
1123
1124 Example::
1125
1126 # Look for last names equal to "Doe".
1127 >>> results = users.query_2(last_name__eq='Doe')
1128 >>> for res in results:
1129 ... print res['first_name']
1130 'John'
1131 'Jane'
1132
1133 # Look for last names beginning with "D", in reverse order, limit 3.
1134 >>> results = users.query_2(
1135 ... last_name__beginswith='D',
1136 ... reverse=True,
1137 ... limit=3
1138 ... )
1139 >>> for res in results:
1140 ... print res['first_name']
1141 'Alice'
1142 'Jane'
1143 'John'
1144
1145 # Use an LSI & a consistent read.
1146 >>> results = users.query_2(
1147 ... date_joined__gte=1236451000,
1148 ... owner__eq=1,
1149 ... index='DateJoinedIndex',
1150 ... consistent=True
1151 ... )
1152 >>> for res in results:
1153 ... print res['first_name']
1154 'Alice'
1155 'Bob'
1156 'John'
1157 'Fred'
1158
1159 # Filter by non-indexed field(s)
1160 >>> results = users.query_2(
1161 ... last_name__eq='Doe',
1162 ... reverse=True,
1163 ... query_filter={
1164 ... 'first_name__beginswith': 'A'
1165 ... }
1166 ... )
1167 >>> for res in results:
1168 ... print res['first_name'] + ' ' + res['last_name']
1169 'Alice Doe'
1170
1171 """
1172 if self.schema:
1173 if len(self.schema) == 1:
1174 if len(filter_kwargs) <= 1:
1175 if not self.global_indexes or not len(self.global_indexes):
1176 # If the schema has only one field, there's <= 1 filter
1177 # param & there are no Global Secondary Indexes: this is
1178 # user error. Bail early.
1179 raise exceptions.QueryError(
1180 "You must specify more than one key to filter on."
1181 )
1182
1183 if attributes is not None:
1184 select = 'SPECIFIC_ATTRIBUTES'
1185 else:
1186 select = None
1187
1188 results = ResultSet(
1189 max_page_size=max_page_size
1190 )
1191 kwargs = filter_kwargs.copy()
1192 kwargs.update({
1193 'limit': limit,
1194 'index': index,
1195 'reverse': reverse,
1196 'consistent': consistent,
1197 'select': select,
1198 'attributes_to_get': attributes,
1199 'query_filter': query_filter,
1200 'conditional_operator': conditional_operator,
1201 })
1202 results.to_call(self._query, **kwargs)
1203 return results
1204
1205 def query_count(self, index=None, consistent=False, conditional_operator=None,
1206 query_filter=None, scan_index_forward=True, limit=None,
1207 exclusive_start_key=None, **filter_kwargs):
1208 """
1209 Queries the exact count of matching items in a DynamoDB table.
1210
1211 Queries can be performed against a hash key, a hash+range key or
1212 against any data stored in your local secondary indexes. Query filters
1213 can be used to filter on arbitrary fields.
1214
1215 To specify the filters of the items you'd like to get, you can specify
1216 the filters as kwargs. Each filter kwarg should follow the pattern
1217 ``<fieldname>__<filter_operation>=<value_to_look_for>``. Query filters
1218 are specified in the same way.
1219
1220 Optionally accepts an ``index`` parameter, which should be a string of
1221 the name of the local secondary index you want to query against.
1222 (Default: ``None``)
1223
1224 Optionally accepts a ``consistent`` parameter, which should be a
1225 boolean. If you provide ``True``, it will force a consistent read of
1226 the data (more expensive). (Default: ``False`` - use eventually
1227 consistent reads)
1228
1229 Optionally accepts a ``query_filter`` which is a dictionary of filter
1230 conditions against any arbitrary field in the returned data.
1231
1232 Optionally accepts a ``conditional_operator`` which applies to the
1233 query filter conditions:
1234
1235 + `AND` - True if all filter conditions evaluate to true (default)
1236 + `OR` - True if at least one filter condition evaluates to true
1237
1238 Optionally accepts an ``exclusive_start_key`` which is used to get
1239 the remaining items when a query cannot return the complete count.
1240
1241 Returns an integer which represents the exact number of matched
1242 items.
1243
1244 :type scan_index_forward: boolean
1245 :param scan_index_forward: Specifies ascending (true) or descending
1246 (false) traversal of the index. DynamoDB returns results reflecting
1247 the requested order determined by the range key. If the data type
1248 is Number, the results are returned in numeric order. For String,
1249 the results are returned in order of ASCII character code values.
1250 For Binary, DynamoDB treats each byte of the binary data as
1251 unsigned when it compares binary values.
1252
1253 If ScanIndexForward is not specified, the results are returned in
1254 ascending order.
1255
1256 :type limit: integer
1257 :param limit: The maximum number of items to evaluate (not necessarily
1258 the number of matching items).
1259
1260 Example::
1261
1262 # Look for last names equal to "Doe".
1263 >>> users.query_count(last_name__eq='Doe')
1264 5
1265
1266 # Use an LSI & a consistent read.
1267 >>> users.query_count(
1268 ... date_joined__gte=1236451000,
1269 ... owner__eq=1,
1270 ... index='DateJoinedIndex',
1271 ... consistent=True
1272 ... )
1273 2
1274
1275 """
1276 key_conditions = self._build_filters(
1277 filter_kwargs,
1278 using=QUERY_OPERATORS
1279 )
1280
1281 built_query_filter = self._build_filters(
1282 query_filter,
1283 using=FILTER_OPERATORS
1284 )
1285
1286 count_buffer = 0
1287 last_evaluated_key = exclusive_start_key
1288
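# Even a COUNT query is paginated (each request examines at most 1 MB
# of data), so keep issuing requests, feeding ``LastEvaluatedKey`` back
# in as ``exclusive_start_key``, until no more pages remain.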
1289 while True:
1290 raw_results = self.connection.query(
1291 self.table_name,
1292 index_name=index,
1293 consistent_read=consistent,
1294 select='COUNT',
1295 key_conditions=key_conditions,
1296 query_filter=built_query_filter,
1297 conditional_operator=conditional_operator,
1298 limit=limit,
1299 scan_index_forward=scan_index_forward,
1300 exclusive_start_key=last_evaluated_key
1301 )
1302
1303 count_buffer += int(raw_results.get('Count', 0))
1304 last_evaluated_key = raw_results.get('LastEvaluatedKey')
1305 if not last_evaluated_key or count_buffer < 1:
1306 break
1307
1308 return count_buffer
1309
1310 def _query(self, limit=None, index=None, reverse=False, consistent=False,
1311 exclusive_start_key=None, select=None, attributes_to_get=None,
1312 query_filter=None, conditional_operator=None, **filter_kwargs):
1313 """
1314 The internal method that performs the actual queries. Used extensively
1315 by ``ResultSet`` to perform each (paginated) request.
1316 """
1317 kwargs = {
1318 'limit': limit,
1319 'index_name': index,
1320 'consistent_read': consistent,
1321 'select': select,
1322 'attributes_to_get': attributes_to_get,
1323 'conditional_operator': conditional_operator,
1324 }
1325
1326 if reverse:
1327 kwargs['scan_index_forward'] = False
1328
1329 if exclusive_start_key:
1330 kwargs['exclusive_start_key'] = {}
1331
1332 for key, value in exclusive_start_key.items():
1333 kwargs['exclusive_start_key'][key] = \
1334 self._dynamizer.encode(value)
1335
1336 # Convert the filters into something we can actually use.
1337 kwargs['key_conditions'] = self._build_filters(
1338 filter_kwargs,
1339 using=QUERY_OPERATORS
1340 )
1341
1342 kwargs['query_filter'] = self._build_filters(
1343 query_filter,
1344 using=FILTER_OPERATORS
1345 )
1346
1347 raw_results = self.connection.query(
1348 self.table_name,
1349 **kwargs
1350 )
1351 results = []
1352 last_key = None
1353
1354 for raw_item in raw_results.get('Items', []):
1355 item = Item(self)
1356 item.load({
1357 'Item': raw_item,
1358 })
1359 results.append(item)
1360
1361 if raw_results.get('LastEvaluatedKey', None):
1362 last_key = {}
1363
1364 for key, value in raw_results['LastEvaluatedKey'].items():
1365 last_key[key] = self._dynamizer.decode(value)
1366
1367 return {
1368 'results': results,
1369 'last_key': last_key,
1370 }
1371
1372 def scan(self, limit=None, segment=None, total_segments=None,
1373 max_page_size=None, attributes=None, conditional_operator=None,
1374 **filter_kwargs):
1375 """
1376 Scans across all items within a DynamoDB table.
1377
1378 Scans read every item in the table. You can
1379 additionally filter the results after the table has been read but
1380 before the response is returned by using scan filters.
1381
1382 To specify the filters of the items you'd like to get, you can specify
1383 the filters as kwargs. Each filter kwarg should follow the pattern
1384 ``<fieldname>__<filter_operation>=<value_to_look_for>``.
1385
1386 Optionally accepts a ``limit`` parameter, which should be an integer
1387 count of the total number of items to return. (Default: ``None`` -
1388 all results)
1389
1390 Optionally accepts a ``segment`` parameter, which should be an integer
1391 of the segment to retrieve on. Please see the documentation about
1392 Parallel Scans. (Default: ``None`` - no segments)
1393
1394 Optionally accepts a ``total_segments`` parameter, which should be an
1395 integer count of number of segments to divide the table into.
1396 Please see the documentation about Parallel Scans. (Default: ``None`` -
1397 no segments)
1398
1399 Optionally accepts a ``max_page_size`` parameter, which should be an
1400 integer count of the maximum number of items to retrieve
1401 **per-request**. This is useful in making faster requests &
1402 preventing the scan from drowning out other queries. (Default: ``None`` -
1403 fetch as many as DynamoDB will return)
1404
1405 Optionally accepts an ``attributes`` parameter, which should be a
1406 tuple. If you provide any attributes, only these will be fetched
1407 from DynamoDB. This uses the ``AttributesToGet`` API parameter &
1408 sets ``Select`` to ``SPECIFIC_ATTRIBUTES``.
1409
1410 Returns a ``ResultSet``, which transparently handles the pagination of
1411 results you get back.
1412
1413 Example::
1414
1415 # All results.
1416 >>> everything = users.scan()
1417
1418 # Look for last names beginning with "D".
1419 >>> results = users.scan(last_name__beginswith='D')
1420 >>> for res in results:
1421 ... print res['first_name']
1422 'Alice'
1423 'John'
1424 'Jane'
1425
1426 # Use an ``IN`` filter & limit.
1427 >>> results = users.scan(
1428 ... age__in=[25, 26, 27, 28, 29],
1429 ... limit=1
1430 ... )
1431 >>> for res in results:
1432 ... print res['first_name']
1433 'Alice'
1434
1435 """
1436 results = ResultSet(
1437 max_page_size=max_page_size
1438 )
1439 kwargs = filter_kwargs.copy()
1440 kwargs.update({
1441 'limit': limit,
1442 'segment': segment,
1443 'total_segments': total_segments,
1444 'attributes': attributes,
1445 'conditional_operator': conditional_operator,
1446 })
1447 results.to_call(self._scan, **kwargs)
1448 return results
1449
1450 def _scan(self, limit=None, exclusive_start_key=None, segment=None,
1451 total_segments=None, attributes=None, conditional_operator=None,
1452 **filter_kwargs):
1453 """
1454 The internal method that performs the actual scan. Used extensively
1455 by ``ResultSet`` to perform each (paginated) request.
1456 """
1457 kwargs = {
1458 'limit': limit,
1459 'segment': segment,
1460 'total_segments': total_segments,
1461 'attributes_to_get': attributes,
1462 'conditional_operator': conditional_operator,
1463 }
1464
1465 if exclusive_start_key:
1466 kwargs['exclusive_start_key'] = {}
1467
1468 for key, value in exclusive_start_key.items():
1469 kwargs['exclusive_start_key'][key] = \
1470 self._dynamizer.encode(value)
1471
1472 # Convert the filters into something we can actually use.
1473 kwargs['scan_filter'] = self._build_filters(
1474 filter_kwargs,
1475 using=FILTER_OPERATORS
1476 )
1477
1478 raw_results = self.connection.scan(
1479 self.table_name,
1480 **kwargs
1481 )
1482 results = []
1483 last_key = None
1484
1485 for raw_item in raw_results.get('Items', []):
1486 item = Item(self)
1487 item.load({
1488 'Item': raw_item,
1489 })
1490 results.append(item)
1491
1492 if raw_results.get('LastEvaluatedKey', None):
1493 last_key = {}
1494
1495 for key, value in raw_results['LastEvaluatedKey'].items():
1496 last_key[key] = self._dynamizer.decode(value)
1497
1498 return {
1499 'results': results,
1500 'last_key': last_key,
1501 }
1502
1503 def batch_get(self, keys, consistent=False, attributes=None):
1504 """
1505 Fetches many specific items in batch from a table.
1506
1507 Requires a ``keys`` parameter, which should be a list of dictionaries.
1508 Each dictionary should consist of the key values to fetch.
1509
1510 Optionally accepts a ``consistent`` parameter, which should be a
1511 boolean. If you provide ``True``, a strongly consistent read will be
1512 used. (Default: False)
1513
1514 Optionally accepts an ``attributes`` parameter, which should be a
1515 tuple. If you provide any attributes only these will be fetched
1516 from DynamoDB.
1517
1518 Returns a ``ResultSet``, which transparently handles the pagination of
1519 results you get back.
1520
1521 Example::
1522
1523 >>> results = users.batch_get(keys=[
1524 ... {
1525 ... 'username': 'johndoe',
1526 ... },
1527 ... {
1528 ... 'username': 'jane',
1529 ... },
1530 ... {
1531 ... 'username': 'fred',
1532 ... },
1533 ... ])
1534 >>> for res in results:
1535 ... print res['first_name']
1536 'John'
1537 'Jane'
1538 'Fred'
1539
1540 """
1541 # We pass the keys to the constructor instead, so it can maintain its
1542 # own internal state as to what keys have been processed.
1543 results = BatchGetResultSet(keys=keys, max_batch_get=self.max_batch_get)
1544 results.to_call(self._batch_get, consistent=consistent, attributes=attributes)
1545 return results
1546
1547 def _batch_get(self, keys, consistent=False, attributes=None):
1548 """
1549 The internal method that performs the actual batch get. Used extensively
1550 by ``BatchGetResultSet`` to perform each (paginated) request.
1551 """
1552 items = {
1553 self.table_name: {
1554 'Keys': [],
1555 },
1556 }
1557
1558 if consistent:
1559 items[self.table_name]['ConsistentRead'] = True
1560
1561 if attributes is not None:
1562 items[self.table_name]['AttributesToGet'] = attributes
1563
1564 for key_data in keys:
1565 raw_key = {}
1566
1567 for key, value in key_data.items():
1568 raw_key[key] = self._dynamizer.encode(value)
1569
1570 items[self.table_name]['Keys'].append(raw_key)
1571
1572 raw_results = self.connection.batch_get_item(request_items=items)
1573 results = []
1574 unprocessed_keys = []
1575
1576 for raw_item in raw_results['Responses'].get(self.table_name, []):
1577 item = Item(self)
1578 item.load({
1579 'Item': raw_item,
1580 })
1581 results.append(item)
1582
# ``UnprocessedKeys`` comes back keyed by table name, so drill down to
# this table's entry before reading its ``Keys`` list.
1583 raw_unprocessed = raw_results.get('UnprocessedKeys', {}).get(self.table_name, {})
1584
1585 for raw_key in raw_unprocessed.get('Keys', []):
1586 py_key = {}
1587
1588 for key, value in raw_key.items():
1589 py_key[key] = self._dynamizer.decode(value)
1590
1591 unprocessed_keys.append(py_key)
1592
1593 return {
1594 'results': results,
1595 # NEVER return a ``last_key``. Just in case any part of
1596 # ``ResultSet`` peeks through, since much of the
1597 # original underlying implementation is based on this key.
1598 'last_key': None,
1599 'unprocessed_keys': unprocessed_keys,
1600 }
1601
1602 def count(self):
1603 """
1604 Returns a (very) eventually consistent count of the number of items
1605 in a table.
1606
1607 Lag time is about 6 hours, so don't expect a high degree of accuracy.
1608
1609 Example::
1610
1611 >>> users.count()
1612 6
1613
1614 """
1615 info = self.describe()
1616 return info['Table'].get('ItemCount', 0)
1617
1618
1619 class BatchTable(object):
1620 """
1621 Used by ``Table`` as the context manager for batch writes.
1622
1623 You likely don't want to try to use this object directly.
1624 """
1625 def __init__(self, table):
1626 self.table = table
1627 self._to_put = []
1628 self._to_delete = []
1629 self._unprocessed = []
1630
1631 def __enter__(self):
1632 return self
1633
1634 def __exit__(self, type, value, traceback):
1635 if self._to_put or self._to_delete:
1636 # Flush anything that's left.
1637 self.flush()
1638
1639 if self._unprocessed:
1640 # Finally, handle anything that wasn't processed.
1641 self.resend_unprocessed()
1642
1643 def put_item(self, data, overwrite=False):
1644 self._to_put.append(data)
1645
1646 if self.should_flush():
1647 self.flush()
1648
1649 def delete_item(self, **kwargs):
1650 self._to_delete.append(kwargs)
1651
1652 if self.should_flush():
1653 self.flush()
1654
1655 def should_flush(self):
1656 if len(self._to_put) + len(self._to_delete) == 25:
1657 return True
1658
1659 return False
1660
1661 def flush(self):
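# Build the ``RequestItems`` payload for ``BatchWriteItem``; roughly
# (a sketch, with a hypothetical table name):
#
#     {'users': [{'PutRequest': {'Item': {...}}},
#                {'DeleteRequest': {'Key': {...}}}]}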
1662 batch_data = {
1663 self.table.table_name: [
1664 # We'll insert data here shortly.
1665 ],
1666 }
1667
1668 for put in self._to_put:
1669 item = Item(self.table, data=put)
1670 batch_data[self.table.table_name].append({
1671 'PutRequest': {
1672 'Item': item.prepare_full(),
1673 }
1674 })
1675
1676 for delete in self._to_delete:
1677 batch_data[self.table.table_name].append({
1678 'DeleteRequest': {
1679 'Key': self.table._encode_keys(delete),
1680 }
1681 })
1682
1683 resp = self.table.connection.batch_write_item(batch_data)
1684 self.handle_unprocessed(resp)
1685
1686 self._to_put = []
1687 self._to_delete = []
1688 return True
1689
1690 def handle_unprocessed(self, resp):
1691 if len(resp.get('UnprocessedItems', [])):
1692 table_name = self.table.table_name
1693 unprocessed = resp['UnprocessedItems'].get(table_name, [])
1694
1695 # Some items have not been processed. Stow them for now &
1696 # re-attempt processing on ``__exit__``.
1697 msg = "%s items were unprocessed. Storing for later."
1698 boto.log.info(msg % len(unprocessed))
1699 self._unprocessed.extend(unprocessed)
1700
1701 def resend_unprocessed(self):
1702 # If there are unprocessed records (for instance, the user was over
1703 # their throughput limitations), iterate over them & send until they're
1704 # all there.
1705 boto.log.info(
1706 "Re-sending %s unprocessed items." % len(self._unprocessed)
1707 )
1708
1709 while len(self._unprocessed):
1710 # Again, do 25 at a time.
1711 to_resend = self._unprocessed[:25]
1712 # Remove them from the list.
1713 self._unprocessed = self._unprocessed[25:]
1714 batch_data = {
1715 self.table.table_name: to_resend
1716 }
1717 boto.log.info("Sending %s items" % len(to_resend))
1718 resp = self.table.connection.batch_write_item(batch_data)
1719 self.handle_unprocessed(resp)
1720 boto.log.info(
1721 "%s unprocessed items left" % len(self._unprocessed)
1722 )