# venv/lib/python2.7/site-packages/boto/dynamodb2/table.py @ 0:d67268158946
import boto
from boto.dynamodb2 import exceptions
from boto.dynamodb2.fields import (HashKey, RangeKey,
                                   AllIndex, KeysOnlyIndex, IncludeIndex,
                                   GlobalAllIndex, GlobalKeysOnlyIndex,
                                   GlobalIncludeIndex)
from boto.dynamodb2.items import Item
from boto.dynamodb2.layer1 import DynamoDBConnection
from boto.dynamodb2.results import ResultSet, BatchGetResultSet
from boto.dynamodb2.types import (NonBooleanDynamizer, Dynamizer, FILTER_OPERATORS,
                                  QUERY_OPERATORS, STRING)
from boto.exception import JSONResponseError


class Table(object):
    """
    Interacts & models the behavior of a DynamoDB table.

    The ``Table`` object represents a set (or rough categorization) of
    records within DynamoDB. The important part is that all records within the
    table, while largely-schema-free, share the same schema & are essentially
    namespaced for use in your application. For example, you might have a
    ``users`` table or a ``forums`` table.
    """
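    # DynamoDB caps ``BatchGetItem`` at 100 keys per request.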
    max_batch_get = 100

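    # Maps a projection type reported by DynamoDB to the index class used to
    # represent it when introspecting an existing table.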
    _PROJECTION_TYPE_TO_INDEX = dict(
        global_indexes=dict(
            ALL=GlobalAllIndex,
            KEYS_ONLY=GlobalKeysOnlyIndex,
            INCLUDE=GlobalIncludeIndex,
        ), local_indexes=dict(
            ALL=AllIndex,
            KEYS_ONLY=KeysOnlyIndex,
            INCLUDE=IncludeIndex,
        )
    )

    def __init__(self, table_name, schema=None, throughput=None, indexes=None,
                 global_indexes=None, connection=None):
        """
        Sets up a new in-memory ``Table``.

        This is useful if the table already exists within DynamoDB & you simply
        want to use it for additional interactions. The only required parameter
        is the ``table_name``. However, under the hood, the object will call
        ``describe_table`` to determine the schema/indexes/throughput. You
        can avoid this extra call by passing in ``schema`` & ``indexes``.

        **IMPORTANT** - If you're creating a new ``Table`` for the first time,
        you should use the ``Table.create`` method instead, as it will
        persist the table structure to DynamoDB.

        Requires a ``table_name`` parameter, which should be a simple string
        of the name of the table.

        Optionally accepts a ``schema`` parameter, which should be a list of
        ``BaseSchemaField`` subclasses representing the desired schema.

        Optionally accepts a ``throughput`` parameter, which should be a
        dictionary. If provided, it should specify a ``read`` & ``write`` key,
        both of which should have an integer value associated with them.

        Optionally accepts an ``indexes`` parameter, which should be a list of
        ``BaseIndexField`` subclasses representing the desired indexes.

        Optionally accepts a ``global_indexes`` parameter, which should be a
        list of ``GlobalBaseIndexField`` subclasses representing the desired
        indexes.

        Optionally accepts a ``connection`` parameter, which should be a
        ``DynamoDBConnection`` instance (or subclass). This is primarily useful
        for specifying alternate connection parameters.

        Example::

            # The simple, it-already-exists case.
            >>> users = Table('users')

            # The full, minimum-extra-calls case.
            >>> from boto import dynamodb2
            >>> users = Table('users', schema=[
            ...     HashKey('username'),
            ...     RangeKey('date_joined', data_type=NUMBER)
            ... ], throughput={
            ...     'read': 20,
            ...     'write': 10,
            ... }, indexes=[
            ...     KeysOnlyIndex('MostRecentlyJoined', parts=[
            ...         HashKey('username'),
            ...         RangeKey('date_joined')
            ...     ]),
            ... ], global_indexes=[
            ...     GlobalAllIndex('UsersByZipcode', parts=[
            ...         HashKey('zipcode'),
            ...         RangeKey('username'),
            ...     ],
            ...     throughput={
            ...         'read': 10,
            ...         'write': 10,
            ...     }),
            ... ], connection=dynamodb2.connect_to_region('us-west-2',
            ...     aws_access_key_id='key',
            ...     aws_secret_access_key='key',
            ... ))

        """
        self.table_name = table_name
        self.connection = connection
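        # Default provisioned throughput, used unless ``throughput`` is given.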
        self.throughput = {
            'read': 5,
            'write': 5,
        }
        self.schema = schema
        self.indexes = indexes
        self.global_indexes = global_indexes

        if self.connection is None:
            self.connection = DynamoDBConnection()

        if throughput is not None:
            self.throughput = throughput

        self._dynamizer = NonBooleanDynamizer()

    def use_boolean(self):
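        """
        Switches the internal ``Dynamizer`` to one that supports DynamoDB's
        native boolean type, rather than encoding booleans as numbers.
        """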
        self._dynamizer = Dynamizer()

    @classmethod
    def create(cls, table_name, schema, throughput=None, indexes=None,
               global_indexes=None, connection=None):
        """
        Creates a new table in DynamoDB & returns an in-memory ``Table`` object.

        This will set up a brand new table within DynamoDB. The ``table_name``
        must be unique for your AWS account. The ``schema`` is also required
        to define the key structure of the table.

        **IMPORTANT** - You should consider the usage pattern of your table
        up-front, as the schema can **NOT** be modified once the table is
        created, requiring the creation of a new table & migrating the data
        should you wish to revise it.

        **IMPORTANT** - If the table already exists in DynamoDB, additional
        calls to this method will result in an error. If you just need
        a ``Table`` object to interact with the existing table, you should
        just initialize a new ``Table`` object, which requires only the
        ``table_name``.

        Requires a ``table_name`` parameter, which should be a simple string
        of the name of the table.

        Requires a ``schema`` parameter, which should be a list of
        ``BaseSchemaField`` subclasses representing the desired schema.

        Optionally accepts a ``throughput`` parameter, which should be a
        dictionary. If provided, it should specify a ``read`` & ``write`` key,
        both of which should have an integer value associated with them.

        Optionally accepts an ``indexes`` parameter, which should be a list of
        ``BaseIndexField`` subclasses representing the desired indexes.

        Optionally accepts a ``global_indexes`` parameter, which should be a
        list of ``GlobalBaseIndexField`` subclasses representing the desired
        indexes.

        Optionally accepts a ``connection`` parameter, which should be a
        ``DynamoDBConnection`` instance (or subclass). This is primarily useful
        for specifying alternate connection parameters.

        Example::

            >>> users = Table.create('users', schema=[
            ...     HashKey('username'),
            ...     RangeKey('date_joined', data_type=NUMBER)
            ... ], throughput={
            ...     'read': 20,
            ...     'write': 10,
            ... }, indexes=[
            ...     KeysOnlyIndex('MostRecentlyJoined', parts=[
            ...         RangeKey('date_joined')
            ...     ]),
            ... ], global_indexes=[
            ...     GlobalAllIndex('UsersByZipcode', parts=[
            ...         HashKey('zipcode'),
            ...         RangeKey('username'),
            ...     ],
            ...     throughput={
            ...         'read': 10,
            ...         'write': 10,
            ...     }),
            ... ])

        """
        table = cls(table_name=table_name, connection=connection)
        table.schema = schema

        if throughput is not None:
            table.throughput = throughput

        if indexes is not None:
            table.indexes = indexes

        if global_indexes is not None:
            table.global_indexes = global_indexes

        # Prep the schema.
        raw_schema = []
        attr_defs = []
        seen_attrs = set()

        for field in table.schema:
            raw_schema.append(field.schema())
            # Build the attributes off what we know.
            seen_attrs.add(field.name)
            attr_defs.append(field.definition())

        raw_throughput = {
            'ReadCapacityUnits': int(table.throughput['read']),
            'WriteCapacityUnits': int(table.throughput['write']),
        }
        kwargs = {}

        kwarg_map = {
            'indexes': 'local_secondary_indexes',
            'global_indexes': 'global_secondary_indexes',
        }
        for index_attr in ('indexes', 'global_indexes'):
            table_indexes = getattr(table, index_attr)
            if table_indexes:
                raw_indexes = []
                for index_field in table_indexes:
                    raw_indexes.append(index_field.schema())
                    # Make sure all attributes specified in the indexes are
                    # added to the definition.
                    for field in index_field.parts:
                        if field.name not in seen_attrs:
                            seen_attrs.add(field.name)
                            attr_defs.append(field.definition())

                kwargs[kwarg_map[index_attr]] = raw_indexes

        table.connection.create_table(
            table_name=table.table_name,
            attribute_definitions=attr_defs,
            key_schema=raw_schema,
            provisioned_throughput=raw_throughput,
            **kwargs
        )
        return table

    def _introspect_schema(self, raw_schema, raw_attributes=None):
        """
        Given a raw schema structure back from a DynamoDB response, parse
        out & build the high-level Python objects that represent them.
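
        For example (a sketch of the shapes involved), a raw key schema of::

            [{'AttributeName': 'username', 'KeyType': 'HASH'}]

        ...is turned into::

            [HashKey('username', data_type=STRING)]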
        """
        schema = []
        sane_attributes = {}

        if raw_attributes:
            for field in raw_attributes:
                sane_attributes[field['AttributeName']] = field['AttributeType']

        for field in raw_schema:
            data_type = sane_attributes.get(field['AttributeName'], STRING)

            if field['KeyType'] == 'HASH':
                schema.append(
                    HashKey(field['AttributeName'], data_type=data_type)
                )
            elif field['KeyType'] == 'RANGE':
                schema.append(
                    RangeKey(field['AttributeName'], data_type=data_type)
                )
            else:
                raise exceptions.UnknownSchemaFieldError(
                    "%s was seen, but is unknown. Please report this at "
                    "https://github.com/boto/boto/issues." % field['KeyType']
                )

        return schema

    def _introspect_all_indexes(self, raw_indexes, map_indexes_projection):
        """
        Given a raw index/global index structure back from a DynamoDB response,
        parse out & build the high-level Python objects that represent them.
        """
        indexes = []

        for field in raw_indexes:
            index_klass = map_indexes_projection.get('ALL')
            kwargs = {
                'parts': []
            }

            if field['Projection']['ProjectionType'] == 'ALL':
                index_klass = map_indexes_projection.get('ALL')
            elif field['Projection']['ProjectionType'] == 'KEYS_ONLY':
                index_klass = map_indexes_projection.get('KEYS_ONLY')
            elif field['Projection']['ProjectionType'] == 'INCLUDE':
                index_klass = map_indexes_projection.get('INCLUDE')
                kwargs['includes'] = field['Projection']['NonKeyAttributes']
            else:
                raise exceptions.UnknownIndexFieldError(
                    "%s was seen, but is unknown. Please report this at "
                    "https://github.com/boto/boto/issues." %
                    field['Projection']['ProjectionType']
                )

            name = field['IndexName']
            kwargs['parts'] = self._introspect_schema(field['KeySchema'], None)
            indexes.append(index_klass(name, **kwargs))

        return indexes

    def _introspect_indexes(self, raw_indexes):
        """
        Given a raw index structure back from a DynamoDB response, parse
        out & build the high-level Python objects that represent them.
        """
        return self._introspect_all_indexes(
            raw_indexes, self._PROJECTION_TYPE_TO_INDEX.get('local_indexes'))

    def _introspect_global_indexes(self, raw_global_indexes):
        """
        Given a raw global index structure back from a DynamoDB response, parse
        out & build the high-level Python objects that represent them.
        """
        return self._introspect_all_indexes(
            raw_global_indexes,
            self._PROJECTION_TYPE_TO_INDEX.get('global_indexes'))

    def describe(self):
        """
        Describes the current structure of the table in DynamoDB.

        This information will be used to update the ``schema``, ``indexes``,
        ``global_indexes`` and ``throughput`` information on the ``Table``. Some
        calls, such as those involving creating keys or querying, will require
        this information to be populated.

        It also returns the full raw data structure from DynamoDB, in the
        event you'd like to parse out additional information (such as the
        ``ItemCount`` or usage information).

        Example::

            >>> users.describe()
            {
                # Lots of keys here...
            }
            >>> len(users.schema)
            2

        """
        result = self.connection.describe_table(self.table_name)

        # Blindly update throughput, since what's on DynamoDB's end is likely
        # more correct.
        raw_throughput = result['Table']['ProvisionedThroughput']
        self.throughput['read'] = int(raw_throughput['ReadCapacityUnits'])
        self.throughput['write'] = int(raw_throughput['WriteCapacityUnits'])

        if not self.schema:
            # Since we have the data, build the schema.
            raw_schema = result['Table'].get('KeySchema', [])
            raw_attributes = result['Table'].get('AttributeDefinitions', [])
            self.schema = self._introspect_schema(raw_schema, raw_attributes)

        if not self.indexes:
            # Build the index information as well.
            raw_indexes = result['Table'].get('LocalSecondaryIndexes', [])
            self.indexes = self._introspect_indexes(raw_indexes)

        # Build the global index information as well.
        raw_global_indexes = result['Table'].get('GlobalSecondaryIndexes', [])
        self.global_indexes = self._introspect_global_indexes(raw_global_indexes)

        # This is leaky.
        return result

    def update(self, throughput=None, global_indexes=None):
        """
        Updates table attributes and global indexes in DynamoDB.

        Optionally accepts a ``throughput`` parameter, which should be a
        dictionary. If provided, it should specify a ``read`` & ``write`` key,
        both of which should have an integer value associated with them.

        Optionally accepts a ``global_indexes`` parameter, which should be a
        dictionary. If provided, it should map each index name to a dict
        containing a ``read`` & ``write`` key, both of which should have an
        integer value associated with them. If you are writing new code,
        please use ``Table.update_global_secondary_index``.

        Returns ``True`` on success.

        Example::

            # For a read-heavier application...
            >>> users.update(throughput={
            ...     'read': 20,
            ...     'write': 10,
            ... })
            True

            # To also update the global index(es) throughput.
            >>> users.update(throughput={
            ...     'read': 20,
            ...     'write': 10,
            ... },
            ... global_indexes={
            ...     'TheIndexNameHere': {
            ...         'read': 15,
            ...         'write': 5,
            ...     }
            ... })
            True
        """

        data = None

        if throughput:
            self.throughput = throughput
            data = {
                'ReadCapacityUnits': int(self.throughput['read']),
                'WriteCapacityUnits': int(self.throughput['write']),
            }

        gsi_data = None

        if global_indexes:
            gsi_data = []

            for gsi_name, gsi_throughput in global_indexes.items():
                gsi_data.append({
                    "Update": {
                        "IndexName": gsi_name,
                        "ProvisionedThroughput": {
                            "ReadCapacityUnits": int(gsi_throughput['read']),
                            "WriteCapacityUnits": int(gsi_throughput['write']),
                        },
                    },
                })

        if throughput or global_indexes:
            self.connection.update_table(
                self.table_name,
                provisioned_throughput=data,
                global_secondary_index_updates=gsi_data,
            )

            return True
        else:
            msg = 'You need to provide either the throughput or the ' \
                  'global_indexes to the update method'
            boto.log.error(msg)

            return False

    def create_global_secondary_index(self, global_index):
        """
        Creates a global index in DynamoDB after the table has been created.

        Requires a ``global_index`` parameter, which should be a
        ``GlobalBaseIndexField`` subclass representing the desired index.

        To update ``global_indexes`` information on the ``Table``, you'll need
        to call ``Table.describe``.

        Returns ``True`` on success.

        Example::

            # To create a global index
            >>> users.create_global_secondary_index(
            ...     global_index=GlobalAllIndex(
            ...         'TheIndexNameHere', parts=[
            ...             HashKey('requiredHashkey', data_type=STRING),
            ...             RangeKey('optionalRangeKey', data_type=STRING)
            ...         ],
            ...         throughput={
            ...             'read': 2,
            ...             'write': 1,
            ...         })
            ... )
            True

        """

        if global_index:
            gsi_data = []
            gsi_data_attr_def = []

            gsi_data.append({
                "Create": global_index.schema()
            })

            for attr_def in global_index.parts:
                gsi_data_attr_def.append(attr_def.definition())

            self.connection.update_table(
                self.table_name,
                global_secondary_index_updates=gsi_data,
                attribute_definitions=gsi_data_attr_def
            )

            return True
        else:
            msg = 'You need to provide the global_index to the ' \
                  'create_global_secondary_index method'
            boto.log.error(msg)

            return False

    def delete_global_secondary_index(self, global_index_name):
        """
        Deletes a global index in DynamoDB after the table has been created.

        Requires a ``global_index_name`` parameter, which should be a simple
        string of the name of the global secondary index.

        To update ``global_indexes`` information on the ``Table``, you'll need
        to call ``Table.describe``.

        Returns ``True`` on success.

        Example::

            # To delete a global index
            >>> users.delete_global_secondary_index('TheIndexNameHere')
            True

        """

        if global_index_name:
            gsi_data = [
                {
                    "Delete": {
                        "IndexName": global_index_name
                    }
                }
            ]

            self.connection.update_table(
                self.table_name,
                global_secondary_index_updates=gsi_data,
            )

            return True
        else:
            msg = 'You need to provide the global index name to the ' \
                  'delete_global_secondary_index method'
            boto.log.error(msg)

            return False

    def update_global_secondary_index(self, global_indexes):
        """
        Updates the throughput of one or more global indexes in DynamoDB
        after the table has been created.

        Requires a ``global_indexes`` parameter, which should be a dictionary
        mapping each index name to a dict containing a ``read`` & ``write``
        key, both of which should have an integer value associated with them.

        To update ``global_indexes`` information on the ``Table``, you'll need
        to call ``Table.describe``.

        Returns ``True`` on success.

        Example::

            # To update a global index
            >>> users.update_global_secondary_index(global_indexes={
            ...     'TheIndexNameHere': {
            ...         'read': 15,
            ...         'write': 5,
            ...     }
            ... })
            True

        """

        if global_indexes:
            gsi_data = []

            for gsi_name, gsi_throughput in global_indexes.items():
                gsi_data.append({
                    "Update": {
                        "IndexName": gsi_name,
                        "ProvisionedThroughput": {
                            "ReadCapacityUnits": int(gsi_throughput['read']),
                            "WriteCapacityUnits": int(gsi_throughput['write']),
                        },
                    },
                })

            self.connection.update_table(
                self.table_name,
                global_secondary_index_updates=gsi_data,
            )
            return True
        else:
            msg = 'You need to provide the global indexes to the ' \
                  'update_global_secondary_index method'
            boto.log.error(msg)

            return False

    def delete(self):
        """
        Deletes a table in DynamoDB.

        **IMPORTANT** - Be careful when using this method, there is no undo.

        Returns ``True`` on success.

        Example::

            >>> users.delete()
            True

        """
        self.connection.delete_table(self.table_name)
        return True

    def _encode_keys(self, keys):
        """
        Given a flat Python dictionary of keys/values, converts it into the
        nested dictionary DynamoDB expects.

        Converts::

            {
                'username': 'john',
                'tags': [1, 2, 5],
            }

        ...to...::

            {
                'username': {'S': 'john'},
                'tags': {'NS': ['1', '2', '5']},
            }

        """
        raw_key = {}

        for key, value in keys.items():
            raw_key[key] = self._dynamizer.encode(value)

        return raw_key

    def get_item(self, consistent=False, attributes=None, **kwargs):
        """
        Fetches an item (record) from a table in DynamoDB.

        To specify the key of the item you'd like to get, you can specify the
        key attributes as kwargs.

        Optionally accepts a ``consistent`` parameter, which should be a
        boolean. If you provide ``True``, it will perform
        a consistent (but more expensive) read from DynamoDB.
        (Default: ``False``)

        Optionally accepts an ``attributes`` parameter, which should be a
        list of fieldnames to fetch. (Default: ``None``, which means all fields
        should be fetched)

        Returns an ``Item`` instance containing all the data for that record.

        Raises an ``ItemNotFound`` exception if the item is not found.

        Example::

            # A simple hash key.
            >>> john = users.get_item(username='johndoe')
            >>> john['first_name']
            'John'

            # A complex hash+range key.
            >>> john = users.get_item(username='johndoe', last_name='Doe')
            >>> john['first_name']
            'John'

            # A consistent read (assuming the data might have just changed).
            >>> john = users.get_item(username='johndoe', consistent=True)
            >>> john['first_name']
            'Johann'

            # With a key that is an invalid variable name in Python.
            # Also, assumes a different schema than previous examples.
            >>> john = users.get_item(**{
            ...     'date-joined': 127549192,
            ... })
            >>> john['first_name']
            'John'

        """
        raw_key = self._encode_keys(kwargs)
        item_data = self.connection.get_item(
            self.table_name,
            raw_key,
            attributes_to_get=attributes,
            consistent_read=consistent
        )
        if 'Item' not in item_data:
            raise exceptions.ItemNotFound("Item %s couldn't be found." % kwargs)
        item = Item(self)
        item.load(item_data)
        return item

    def has_item(self, **kwargs):
        """
        Return whether an item (record) exists within a table in DynamoDB.

        To specify the key of the item you'd like to get, you can specify the
        key attributes as kwargs.

        Optionally accepts a ``consistent`` parameter, which should be a
        boolean. If you provide ``True``, it will perform
        a consistent (but more expensive) read from DynamoDB.
        (Default: ``False``)

        Optionally accepts an ``attributes`` parameter, which should be a
        list of fieldnames to fetch. (Default: ``None``, which means all fields
        should be fetched)

        Returns ``True`` if an ``Item`` is present, ``False`` if not.

        Example::

            # Simple, just hash-key schema.
            >>> users.has_item(username='johndoe')
            True

            # Complex schema, item not present.
            >>> users.has_item(
            ...     username='johndoe',
            ...     date_joined='2014-01-07'
            ... )
            False

        """
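        # Treat a missing item (or an error response from DynamoDB) as
        # "not present".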
        try:
            self.get_item(**kwargs)
        except (JSONResponseError, exceptions.ItemNotFound):
            return False

        return True

    def lookup(self, *args, **kwargs):
        """
        Look up an entry in DynamoDB. This is mostly backwards compatible
        with boto.dynamodb. Unlike ``get_item``, it takes the hash key and
        range key first, although you may still specify keyword arguments
        instead.

        Also unlike the ``get_item`` command, if the returned item has no keys
        (i.e., it does not exist in DynamoDB), a ``None`` result is returned,
        instead of an empty key object.

        Example::

            >>> user = users.lookup(username)
            >>> user = users.lookup(username, consistent=True)
            >>> app = apps.lookup('my_customer_id', 'my_app_id')

        """
        if not self.schema:
            self.describe()
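        # Map positional arguments onto the key fields of the schema, in order.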
        for x, arg in enumerate(args):
            kwargs[self.schema[x].name] = arg
        ret = self.get_item(**kwargs)
        if not ret.keys():
            return None
        return ret

    def new_item(self, *args):
        """
        Returns a new, blank ``Item``. Any positional arguments are mapped
        onto the key fields of the schema, in order.

        This is mostly for consistency with boto.dynamodb.
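
        Example (a sketch; assumes a table whose schema is
        ``HashKey('username')`` & ``RangeKey('date_joined')``)::

            >>> item = users.new_item('johndoe', 127549192)
            >>> item['first_name'] = 'John'
            >>> item.save()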
        """
        if not self.schema:
            self.describe()
        data = {}
        for x, arg in enumerate(args):
            data[self.schema[x].name] = arg
        return Item(self, data=data)

    def put_item(self, data, overwrite=False):
        """
        Saves an entire item to DynamoDB.

        By default, if any part of the ``Item``'s original data doesn't match
        what's currently in DynamoDB, this request will fail. This prevents
        other processes from updating the data in between when you read the
        item & when your request to update the item's data is processed, which
        would typically result in some data loss.

        Requires a ``data`` parameter, which should be a dictionary of the data
        you'd like to store in DynamoDB.

        Optionally accepts an ``overwrite`` parameter, which should be a
        boolean. If you provide ``True``, this will tell DynamoDB to blindly
        overwrite whatever data is present, if any.

        Returns ``True`` on success.

        Example::

            >>> users.put_item(data={
            ...     'username': 'jane',
            ...     'first_name': 'Jane',
            ...     'last_name': 'Doe',
            ...     'date_joined': 126478915,
            ... })
            True

        """
        item = Item(self, data=data)
        return item.save(overwrite=overwrite)

    def _put_item(self, item_data, expects=None):
        """
        The internal variant of ``put_item`` (full data). This is used by the
        ``Item`` objects, since that operation is represented at the
        table-level by the API, but conceptually maps better to telling an
        individual ``Item`` to save itself.
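
        A sketch of the ``item_data`` shape this expects (already encoded
        by the ``Dynamizer``)::

            {
                'username': {'S': 'jane'},
                'date_joined': {'N': '126478915'},
            }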
        """
        kwargs = {}

        if expects is not None:
            kwargs['expected'] = expects

        self.connection.put_item(self.table_name, item_data, **kwargs)
        return True

    def _update_item(self, key, item_data, expects=None):
        """
        The internal variant of ``put_item`` (partial data). This is used by the
        ``Item`` objects, since that operation is represented at the
        table-level by the API, but conceptually maps better to telling an
        individual ``Item`` to save itself.
        """
        raw_key = self._encode_keys(key)
        kwargs = {}

        if expects is not None:
            kwargs['expected'] = expects

        self.connection.update_item(self.table_name, raw_key, item_data, **kwargs)
        return True

    def delete_item(self, expected=None, conditional_operator=None, **kwargs):
        """
        Deletes a single item. You can perform a conditional delete operation
        that deletes the item if it exists, or if it has an expected attribute
        value.

        Conditional deletes are useful for only deleting items if specific
        conditions are met. If those conditions are met, DynamoDB performs
        the delete. Otherwise, the item is not deleted.

        To specify the expected attribute values of the item, you can pass a
        dictionary of conditions to ``expected``. Each condition should follow
        the pattern ``<attributename>__<comparison_operator>=<value_to_expect>``.

        **IMPORTANT** - Be careful when using this method, there is no undo.

        To specify the key of the item you'd like to delete, you can specify
        the key attributes as kwargs.

        Optionally accepts an ``expected`` parameter which is a dictionary of
        expected attribute value conditions.

        Optionally accepts a ``conditional_operator`` which applies to the
        expected attribute value conditions:

        + `AND` - If all of the conditions evaluate to true (default)
        + `OR` - True if at least one condition evaluates to true

        Returns ``True`` on success, ``False`` on failed conditional delete.

        Example::

            # A simple hash key.
            >>> users.delete_item(username='johndoe')
            True

            # A complex hash+range key.
            >>> users.delete_item(username='jane', last_name='Doe')
            True

            # With a key that is an invalid variable name in Python.
            # Also, assumes a different schema than previous examples.
            >>> users.delete_item(**{
            ...     'date-joined': 127549192,
            ... })
            True

            # Conditional delete
            >>> users.delete_item(username='johndoe',
            ...                   expected={'balance__eq': 0})
            True
        """
        expected = self._build_filters(expected, using=FILTER_OPERATORS)
        raw_key = self._encode_keys(kwargs)

        try:
            self.connection.delete_item(self.table_name, raw_key,
                                        expected=expected,
                                        conditional_operator=conditional_operator)
        except exceptions.ConditionalCheckFailedException:
            return False

        return True

    def get_key_fields(self):
        """
        Returns the fields necessary to make a key for a table.

        If the ``Table`` does not already have a populated ``schema``,
        this will request it via a ``Table.describe`` call.

        Returns a list of fieldnames (strings).

        Example::

            # A simple hash key.
            >>> users.get_key_fields()
            ['username']

            # A complex hash+range key.
            >>> users.get_key_fields()
            ['username', 'last_name']

        """
        if not self.schema:
            # We don't know the structure of the table. Get a description to
            # populate the schema.
            self.describe()

        return [field.name for field in self.schema]

    def batch_write(self):
        """
        Allows the batching of writes to DynamoDB.

        Since each write/delete call to DynamoDB has a cost associated with it,
        when loading lots of data, it makes sense to batch them, creating as
        few calls as possible.

        This returns a context manager that will transparently handle creating
        these batches. The object you get back lightly resembles a ``Table``
        object, sharing just the ``put_item`` & ``delete_item`` methods
        (which are all that DynamoDB can batch in terms of writing data).

        DynamoDB's maximum batch size is 25 items per request. If you attempt
        to put/delete more than that, the context manager will batch as many
        as it can up to that number, then flush them to DynamoDB & continue
        batching as more calls come in.

        Example::

            # Assuming a table with one record...
            >>> with users.batch_write() as batch:
            ...     batch.put_item(data={
            ...         'username': 'johndoe',
            ...         'first_name': 'John',
            ...         'last_name': 'Doe',
            ...         'owner': 1,
            ...     })
            ...     # Nothing across the wire yet.
            ...     batch.delete_item(username='bob')
            ...     # Still no requests sent.
            ...     batch.put_item(data={
            ...         'username': 'jane',
            ...         'first_name': 'Jane',
            ...         'last_name': 'Doe',
            ...         'date_joined': 127436192,
            ...     })
            ...     # Nothing yet, but once we leave the context, the
            ...     # put/deletes will be sent.

        """
        # PHENOMENAL COSMIC DOCS!!! itty-bitty code.
        return BatchTable(self)

    def _build_filters(self, filter_kwargs, using=QUERY_OPERATORS):
        """
        An internal method for taking query/scan-style ``**kwargs`` & turning
        them into the raw structure DynamoDB expects for filtering.
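
        For example (a sketch of the encoding), ``{'username__eq': 'john'}``
        with ``using=QUERY_OPERATORS`` becomes roughly::

            {
                'username': {
                    'AttributeValueList': [{'S': 'john'}],
                    'ComparisonOperator': 'EQ',
                }
            }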
        """
        if filter_kwargs is None:
            return

        filters = {}

        for field_and_op, value in filter_kwargs.items():
            field_bits = field_and_op.split('__')
            fieldname = '__'.join(field_bits[:-1])

            try:
                op = using[field_bits[-1]]
            except KeyError:
                raise exceptions.UnknownFilterTypeError(
                    "Operator '%s' from '%s' is not recognized." % (
                        field_bits[-1],
                        field_and_op
                    )
                )

            lookup = {
                'AttributeValueList': [],
                'ComparisonOperator': op,
            }

            # Special-case the ``NULL/NOT_NULL`` case.
            if field_bits[-1] == 'null':
                del lookup['AttributeValueList']

                if value is False:
                    lookup['ComparisonOperator'] = 'NOT_NULL'
                else:
                    lookup['ComparisonOperator'] = 'NULL'
            # Special-case the ``BETWEEN`` case.
            elif field_bits[-1] == 'between':
                if len(value) == 2 and isinstance(value, (list, tuple)):
                    lookup['AttributeValueList'].append(
                        self._dynamizer.encode(value[0])
                    )
                    lookup['AttributeValueList'].append(
                        self._dynamizer.encode(value[1])
                    )
            # Special-case the ``IN`` case.
            elif field_bits[-1] == 'in':
                for val in value:
                    lookup['AttributeValueList'].append(self._dynamizer.encode(val))
            else:
                # Fix up the value for encoding, because it was built to only work
                # with ``set``s.
                if isinstance(value, (list, tuple)):
                    value = set(value)
                lookup['AttributeValueList'].append(
                    self._dynamizer.encode(value)
                )

            # Finally, insert it into the filters.
            filters[fieldname] = lookup

        return filters

    def query(self, limit=None, index=None, reverse=False, consistent=False,
              attributes=None, max_page_size=None, **filter_kwargs):
        """
        **WARNING:** This method is provided **strictly** for
        backward-compatibility. It returns results in an incorrect order.

        If you are writing new code, please use ``Table.query_2``.
        """
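        # Preserve the old (incorrect) ordering by flipping the ``reverse``
        # flag before delegating to ``query_2``.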
        reverse = not reverse
        return self.query_2(limit=limit, index=index, reverse=reverse,
                            consistent=consistent, attributes=attributes,
                            max_page_size=max_page_size, **filter_kwargs)

    def query_2(self, limit=None, index=None, reverse=False,
                consistent=False, attributes=None, max_page_size=None,
                query_filter=None, conditional_operator=None,
                **filter_kwargs):
        """
        Queries for a set of matching items in a DynamoDB table.

        Queries can be performed against a hash key, a hash+range key or
        against any data stored in your local secondary indexes. Query filters
        can be used to filter on arbitrary fields.

        **Note** - You can not query against arbitrary fields within the data
        stored in DynamoDB unless you specify ``query_filter`` values.

        To specify the filters of the items you'd like to get, you can specify
        the filters as kwargs. Each filter kwarg should follow the pattern
        ``<fieldname>__<filter_operation>=<value_to_look_for>``. Query filters
        are specified in the same way.

        Optionally accepts a ``limit`` parameter, which should be an integer
        count of the total number of items to return. (Default: ``None`` -
        all results)

        Optionally accepts an ``index`` parameter, which should be a string of
        name of the local secondary index you want to query against.
        (Default: ``None``)

        Optionally accepts a ``reverse`` parameter, which will present the
        results in reverse order. (Default: ``False`` - normal order)

        Optionally accepts a ``consistent`` parameter, which should be a
        boolean. If you provide ``True``, it will force a consistent read of
        the data (more expensive). (Default: ``False`` - use eventually
        consistent reads)

        Optionally accepts an ``attributes`` parameter, which should be a
        tuple. If you provide any attributes, only these will be fetched
        from DynamoDB. This uses the ``AttributesToGet`` API parameter & sets
        ``Select`` to ``SPECIFIC_ATTRIBUTES``.

        Optionally accepts a ``max_page_size`` parameter, which should be an
        integer count of the maximum number of items to retrieve
        **per-request**. This is useful in making faster requests & preventing
        the query from drowning out other requests. (Default: ``None`` -
        fetch as many as DynamoDB will return)

        Optionally accepts a ``query_filter`` which is a dictionary of filter
        conditions against any arbitrary field in the returned data.

        Optionally accepts a ``conditional_operator`` which applies to the
        query filter conditions:

        + `AND` - True if all filter conditions evaluate to true (default)
        + `OR` - True if at least one filter condition evaluates to true

        Returns a ``ResultSet``, which transparently handles the pagination of
        results you get back.

        Example::

            # Look for last names equal to "Doe".
            >>> results = users.query_2(last_name__eq='Doe')
            >>> for res in results:
            ...     print res['first_name']
            'John'
            'Jane'

            # Look for last names beginning with "D", in reverse order, limit 3.
            >>> results = users.query_2(
            ...     last_name__beginswith='D',
            ...     reverse=True,
            ...     limit=3
            ... )
            >>> for res in results:
            ...     print res['first_name']
            'Alice'
            'Jane'
            'John'

            # Use an LSI & a consistent read.
            >>> results = users.query_2(
            ...     date_joined__gte=1236451000,
            ...     owner__eq=1,
            ...     index='DateJoinedIndex',
            ...     consistent=True
            ... )
            >>> for res in results:
            ...     print res['first_name']
            'Alice'
            'Bob'
            'John'
            'Fred'

            # Filter by non-indexed field(s)
            >>> results = users.query_2(
            ...     last_name__eq='Doe',
            ...     reverse=True,
            ...     query_filter={
            ...         'first_name__beginswith': 'A'
            ...     }
            ... )
            >>> for res in results:
            ...     print res['first_name'] + ' ' + res['last_name']
            'Alice Doe'

        """
        if self.schema:
            if len(self.schema) == 1:
                if len(filter_kwargs) <= 1:
                    if not self.global_indexes or not len(self.global_indexes):
                        # If the schema only has one field, there's <= 1 filter
                        # param & no Global Secondary Indexes, this is user
                        # error. Bail early.
                        raise exceptions.QueryError(
                            "You must specify more than one key to filter on."
                        )

        if attributes is not None:
            select = 'SPECIFIC_ATTRIBUTES'
        else:
            select = None

        results = ResultSet(
            max_page_size=max_page_size
        )
        kwargs = filter_kwargs.copy()
        kwargs.update({
            'limit': limit,
            'index': index,
            'reverse': reverse,
            'consistent': consistent,
            'select': select,
            'attributes_to_get': attributes,
            'query_filter': query_filter,
            'conditional_operator': conditional_operator,
        })
        results.to_call(self._query, **kwargs)
        return results

    def query_count(self, index=None, consistent=False, conditional_operator=None,
                    query_filter=None, scan_index_forward=True, limit=None,
                    exclusive_start_key=None, **filter_kwargs):
        """
        Queries the exact count of matching items in a DynamoDB table.

        Queries can be performed against a hash key, a hash+range key or
        against any data stored in your local secondary indexes. Query filters
        can be used to filter on arbitrary fields.

        To specify the filters of the items you'd like to get, you can specify
        the filters as kwargs. Each filter kwarg should follow the pattern
        ``<fieldname>__<filter_operation>=<value_to_look_for>``. Query filters
        are specified in the same way.

        Optionally accepts an ``index`` parameter, which should be a string of
        name of the local secondary index you want to query against.
        (Default: ``None``)

        Optionally accepts a ``consistent`` parameter, which should be a
        boolean. If you provide ``True``, it will force a consistent read of
        the data (more expensive). (Default: ``False`` - use eventually
        consistent reads)

        Optionally accepts a ``query_filter`` which is a dictionary of filter
        conditions against any arbitrary field in the returned data.

        Optionally accepts a ``conditional_operator`` which applies to the
        query filter conditions:

        + `AND` - True if all filter conditions evaluate to true (default)
        + `OR` - True if at least one filter condition evaluates to true

        Optionally accepts an ``exclusive_start_key`` which is used to get
        the remaining items when a query cannot return the complete count.

        Returns an integer which represents the exact count of matched
        items.

        :type scan_index_forward: boolean
        :param scan_index_forward: Specifies ascending (true) or descending
            (false) traversal of the index. DynamoDB returns results reflecting
            the requested order determined by the range key. If the data type
            is Number, the results are returned in numeric order. For String,
            the results are returned in order of ASCII character code values.
            For Binary, DynamoDB treats each byte of the binary data as
            unsigned when it compares binary values.

            If ScanIndexForward is not specified, the results are returned in
            ascending order.

        :type limit: integer
        :param limit: The maximum number of items to evaluate (not necessarily
            the number of matching items).

        Example::

            # Look for last names equal to "Doe".
            >>> users.query_count(last_name__eq='Doe')
            5

            # Use an LSI & a consistent read.
            >>> users.query_count(
            ...     date_joined__gte=1236451000,
            ...     owner__eq=1,
            ...     index='DateJoinedIndex',
            ...     consistent=True
            ... )
            2

        """
        key_conditions = self._build_filters(
            filter_kwargs,
            using=QUERY_OPERATORS
        )

        built_query_filter = self._build_filters(
            query_filter,
            using=FILTER_OPERATORS
        )

        count_buffer = 0
        last_evaluated_key = exclusive_start_key

        while True:
            raw_results = self.connection.query(
                self.table_name,
                index_name=index,
                consistent_read=consistent,
                select='COUNT',
                key_conditions=key_conditions,
                query_filter=built_query_filter,
                conditional_operator=conditional_operator,
                limit=limit,
                scan_index_forward=scan_index_forward,
                exclusive_start_key=last_evaluated_key
            )

            count_buffer += int(raw_results.get('Count', 0))
            last_evaluated_key = raw_results.get('LastEvaluatedKey')
            if not last_evaluated_key or count_buffer < 1:
                break

        return count_buffer

    def _query(self, limit=None, index=None, reverse=False, consistent=False,
               exclusive_start_key=None, select=None, attributes_to_get=None,
               query_filter=None, conditional_operator=None, **filter_kwargs):
        """
        The internal method that performs the actual queries. Used extensively
        by ``ResultSet`` to perform each (paginated) request.
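
        Returns a dict in the form (a sketch)::

            {
                'results': [...],   # a list of ``Item`` objects
                'last_key': {...},  # decoded ``LastEvaluatedKey``, or ``None``
            }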
        """
        kwargs = {
            'limit': limit,
            'index_name': index,
            'consistent_read': consistent,
            'select': select,
            'attributes_to_get': attributes_to_get,
            'conditional_operator': conditional_operator,
        }

        if reverse:
            kwargs['scan_index_forward'] = False

        if exclusive_start_key:
            kwargs['exclusive_start_key'] = {}

            for key, value in exclusive_start_key.items():
                kwargs['exclusive_start_key'][key] = \
                    self._dynamizer.encode(value)

        # Convert the filters into something we can actually use.
        kwargs['key_conditions'] = self._build_filters(
            filter_kwargs,
            using=QUERY_OPERATORS
        )

        kwargs['query_filter'] = self._build_filters(
            query_filter,
            using=FILTER_OPERATORS
        )

        raw_results = self.connection.query(
            self.table_name,
            **kwargs
        )
        results = []
        last_key = None

        for raw_item in raw_results.get('Items', []):
            item = Item(self)
            item.load({
                'Item': raw_item,
            })
            results.append(item)

        if raw_results.get('LastEvaluatedKey', None):
            last_key = {}

            for key, value in raw_results['LastEvaluatedKey'].items():
                last_key[key] = self._dynamizer.decode(value)

        return {
            'results': results,
            'last_key': last_key,
        }

    def scan(self, limit=None, segment=None, total_segments=None,
             max_page_size=None, attributes=None, conditional_operator=None,
             **filter_kwargs):
        """
        Scans across all items within a DynamoDB table.

        Scans can be performed against a hash key or a hash+range key. You can
        additionally filter the results after the table has been read but
        before the response is returned by using query filters.

        To specify the filters of the items you'd like to get, you can specify
        the filters as kwargs. Each filter kwarg should follow the pattern
        ``<fieldname>__<filter_operation>=<value_to_look_for>``.

        Optionally accepts a ``limit`` parameter, which should be an integer
        count of the total number of items to return. (Default: ``None`` -
        all results)

        Optionally accepts a ``segment`` parameter, which should be an integer
        of the segment to retrieve on. Please see the documentation about
        Parallel Scans. (Default: ``None`` - no segments)

        Optionally accepts a ``total_segments`` parameter, which should be an
        integer count of number of segments to divide the table into.
        Please see the documentation about Parallel Scans. (Default: ``None`` -
        no segments)

        Optionally accepts a ``max_page_size`` parameter, which should be an
        integer count of the maximum number of items to retrieve
        **per-request**. This is useful in making faster requests & preventing
        the scan from drowning out other queries. (Default: ``None`` -
        fetch as many as DynamoDB will return)

        Optionally accepts an ``attributes`` parameter, which should be a
        tuple. If you provide any attributes, only these will be fetched
        from DynamoDB. This uses the ``AttributesToGet`` API parameter & sets
        ``Select`` to ``SPECIFIC_ATTRIBUTES``.

        Returns a ``ResultSet``, which transparently handles the pagination of
        results you get back.

        Example::

            # All results.
            >>> everything = users.scan()

            # Look for last names beginning with "D".
            >>> results = users.scan(last_name__beginswith='D')
            >>> for res in results:
            ...     print res['first_name']
            'Alice'
            'John'
            'Jane'

            # Use an ``IN`` filter & limit.
            >>> results = users.scan(
            ...     age__in=[25, 26, 27, 28, 29],
            ...     limit=1
            ... )
            >>> for res in results:
            ...     print res['first_name']
            'Alice'

        """
        results = ResultSet(
            max_page_size=max_page_size
        )
        kwargs = filter_kwargs.copy()
        kwargs.update({
            'limit': limit,
            'segment': segment,
            'total_segments': total_segments,
            'attributes': attributes,
            'conditional_operator': conditional_operator,
        })
        results.to_call(self._scan, **kwargs)
        return results

    def _scan(self, limit=None, exclusive_start_key=None, segment=None,
              total_segments=None, attributes=None, conditional_operator=None,
              **filter_kwargs):
        """
        The internal method that performs the actual scan. Used extensively
        by ``ResultSet`` to perform each (paginated) request.
        """
        kwargs = {
            'limit': limit,
            'segment': segment,
            'total_segments': total_segments,
            'attributes_to_get': attributes,
            'conditional_operator': conditional_operator,
        }

        if exclusive_start_key:
            kwargs['exclusive_start_key'] = {}

            for key, value in exclusive_start_key.items():
                kwargs['exclusive_start_key'][key] = \
                    self._dynamizer.encode(value)

        # Convert the filters into something we can actually use.
        kwargs['scan_filter'] = self._build_filters(
            filter_kwargs,
            using=FILTER_OPERATORS
        )

        raw_results = self.connection.scan(
            self.table_name,
            **kwargs
        )
        results = []
        last_key = None

        for raw_item in raw_results.get('Items', []):
            item = Item(self)
            item.load({
                'Item': raw_item,
            })
            results.append(item)

        if raw_results.get('LastEvaluatedKey', None):
            last_key = {}

            for key, value in raw_results['LastEvaluatedKey'].items():
                last_key[key] = self._dynamizer.decode(value)

        return {
            'results': results,
            'last_key': last_key,
        }

    def batch_get(self, keys, consistent=False, attributes=None):
        """
        Fetches many specific items in batch from a table.

        Requires a ``keys`` parameter, which should be a list of dictionaries.
        Each dictionary should consist of the key values for the item to fetch.

        Optionally accepts a ``consistent`` parameter, which should be a
        boolean. If you provide ``True``, a strongly consistent read will be
        used. (Default: False)

        Optionally accepts an ``attributes`` parameter, which should be a
        tuple. If you provide any attributes, only these will be fetched
        from DynamoDB.

        Returns a ``ResultSet``, which transparently handles the pagination of
        results you get back.

        Example::

            >>> results = users.batch_get(keys=[
            ...     {
            ...         'username': 'johndoe',
            ...     },
            ...     {
            ...         'username': 'jane',
            ...     },
            ...     {
            ...         'username': 'fred',
            ...     },
            ... ])
            >>> for res in results:
            ...     print res['first_name']
            'John'
            'Jane'
            'Fred'

        """
        # We pass the keys to the constructor instead, so it can maintain its
        # own internal state as to what keys have been processed.
        results = BatchGetResultSet(keys=keys, max_batch_get=self.max_batch_get)
        results.to_call(self._batch_get, consistent=consistent, attributes=attributes)
        return results

    def _batch_get(self, keys, consistent=False, attributes=None):
        """
        The internal method that performs the actual batch get. Used extensively
        by ``BatchGetResultSet`` to perform each (paginated) request.
        """
        items = {
            self.table_name: {
                'Keys': [],
            },
        }

        if consistent:
            items[self.table_name]['ConsistentRead'] = True

        if attributes is not None:
            items[self.table_name]['AttributesToGet'] = attributes

        for key_data in keys:
            raw_key = {}

            for key, value in key_data.items():
                raw_key[key] = self._dynamizer.encode(value)

            items[self.table_name]['Keys'].append(raw_key)

        raw_results = self.connection.batch_get_item(request_items=items)
        results = []
        unprocessed_keys = []

        for raw_item in raw_results['Responses'].get(self.table_name, []):
            item = Item(self)
            item.load({
                'Item': raw_item,
            })
            results.append(item)

        raw_unprocessed = raw_results.get('UnprocessedKeys', {})

        for raw_key in raw_unprocessed.get('Keys', []):
            py_key = {}

            for key, value in raw_key.items():
                py_key[key] = self._dynamizer.decode(value)

            unprocessed_keys.append(py_key)

        return {
            'results': results,
            # NEVER return a ``last_key``. Just in case any part of
            # ``ResultSet`` peeks through, since much of the
            # original underlying implementation is based on this key.
            'last_key': None,
            'unprocessed_keys': unprocessed_keys,
        }

    def count(self):
        """
        Returns a (very) eventually consistent count of the number of items
        in a table.

        Lag time is about 6 hours, so don't expect a high degree of accuracy.

        Example::

            >>> users.count()
            6

        """
        info = self.describe()
        return info['Table'].get('ItemCount', 0)


class BatchTable(object):
    """
    Used by ``Table`` as the context manager for batch writes.

    You likely don't want to try to use this object directly.
    """
    def __init__(self, table):
        self.table = table
        self._to_put = []
        self._to_delete = []
        self._unprocessed = []

    def __enter__(self):
        return self

    def __exit__(self, type, value, traceback):
        if self._to_put or self._to_delete:
            # Flush anything that's left.
            self.flush()

        if self._unprocessed:
            # Finally, handle anything that wasn't processed.
            self.resend_unprocessed()

    def put_item(self, data, overwrite=False):
        self._to_put.append(data)

        if self.should_flush():
            self.flush()

    def delete_item(self, **kwargs):
        self._to_delete.append(kwargs)

        if self.should_flush():
            self.flush()

    def should_flush(self):
        if len(self._to_put) + len(self._to_delete) == 25:
            return True

        return False

    def flush(self):
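        """
        Sends all pending put & delete requests to DynamoDB in a single
        ``BatchWriteItem`` call, then clears the local buffers.
        """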
        batch_data = {
            self.table.table_name: [
                # We'll insert data here shortly.
            ],
        }

        for put in self._to_put:
            item = Item(self.table, data=put)
            batch_data[self.table.table_name].append({
                'PutRequest': {
                    'Item': item.prepare_full(),
                }
            })

        for delete in self._to_delete:
            batch_data[self.table.table_name].append({
                'DeleteRequest': {
                    'Key': self.table._encode_keys(delete),
                }
            })

        resp = self.table.connection.batch_write_item(batch_data)
        self.handle_unprocessed(resp)

        self._to_put = []
        self._to_delete = []
        return True

    def handle_unprocessed(self, resp):
        if len(resp.get('UnprocessedItems', [])):
            table_name = self.table.table_name
            unprocessed = resp['UnprocessedItems'].get(table_name, [])

            # Some items have not been processed. Stow them for now &
            # re-attempt processing on ``__exit__``.
            msg = "%s items were unprocessed. Storing for later."
            boto.log.info(msg % len(unprocessed))
            self._unprocessed.extend(unprocessed)

    def resend_unprocessed(self):
        # If there are unprocessed records (for instance, the user was over
        # their throughput limitations), iterate over them & send until they're
        # all there.
        boto.log.info(
            "Re-sending %s unprocessed items." % len(self._unprocessed)
        )

        while len(self._unprocessed):
            # Again, do 25 at a time.
            to_resend = self._unprocessed[:25]
            # Remove them from the list.
            self._unprocessed = self._unprocessed[25:]
            batch_data = {
                self.table.table_name: to_resend
            }
            boto.log.info("Sending %s items" % len(to_resend))
            resp = self.table.connection.batch_write_item(batch_data)
            self.handle_unprocessed(resp)
            boto.log.info(
                "%s unprocessed items left" % len(self._unprocessed)
            )