# venv/lib/python2.7/site-packages/boto/kinesis/layer1.py
# (bcclaywell/argo_navis, changeset 0:d67268158946)
# Copyright (c) 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved
#
# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish, dis-
# tribute, sublicense, and/or sell copies of the Software, and to permit
# persons to whom the Software is furnished to do so, subject to the fol-
# lowing conditions:
#
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
#

import base64
import boto

from boto.connection import AWSQueryConnection
from boto.regioninfo import RegionInfo
from boto.exception import JSONResponseError
from boto.kinesis import exceptions
from boto.compat import json
from boto.compat import six


class KinesisConnection(AWSQueryConnection):
    """
    Amazon Kinesis Service API Reference
    Amazon Kinesis is a managed service that scales elastically for
    real time processing of streaming big data.
    """
    APIVersion = "2013-12-02"
    DefaultRegionName = "us-east-1"
    DefaultRegionEndpoint = "kinesis.us-east-1.amazonaws.com"
    ServiceName = "Kinesis"
    TargetPrefix = "Kinesis_20131202"
    ResponseError = JSONResponseError

    _faults = {
        "ProvisionedThroughputExceededException": exceptions.ProvisionedThroughputExceededException,
        "LimitExceededException": exceptions.LimitExceededException,
        "ExpiredIteratorException": exceptions.ExpiredIteratorException,
        "ResourceInUseException": exceptions.ResourceInUseException,
        "ResourceNotFoundException": exceptions.ResourceNotFoundException,
        "InvalidArgumentException": exceptions.InvalidArgumentException,
        "SubscriptionRequiredException": exceptions.SubscriptionRequiredException
    }

    def __init__(self, **kwargs):
        region = kwargs.pop('region', None)
        if not region:
            region = RegionInfo(self, self.DefaultRegionName,
                                self.DefaultRegionEndpoint)
        if 'host' not in kwargs:
            kwargs['host'] = region.endpoint
        super(KinesisConnection, self).__init__(**kwargs)
        self.region = region

    def _required_auth_capability(self):
        return ['hmac-v4']

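    # Usage sketch (illustrative, not part of the boto API): a minimal way to
    # obtain a connection. Assumes credentials are available in the usual boto
    # locations; the region name and endpoint are assumptions for illustration.
    #
    #     from boto.kinesis.layer1 import KinesisConnection
    #     from boto.regioninfo import RegionInfo
    #
    #     conn = KinesisConnection()  # defaults to us-east-1
    #     # or target another region explicitly:
    #     region = RegionInfo(name='us-west-2',
    #                         endpoint='kinesis.us-west-2.amazonaws.com')
    #     conn = KinesisConnection(region=region)
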
    def add_tags_to_stream(self, stream_name, tags):
        """
        Adds or updates tags for the specified Amazon Kinesis stream.
        Each stream can have up to 10 tags.

        If tags have already been assigned to the stream,
        `AddTagsToStream` overwrites any existing tags that correspond
        to the specified tag keys.

        :type stream_name: string
        :param stream_name: The name of the stream.

        :type tags: map
        :param tags: The set of key-value pairs to use to create the tags.

        """
        params = {'StreamName': stream_name, 'Tags': tags, }
        return self.make_request(action='AddTagsToStream',
                                 body=json.dumps(params))

    def create_stream(self, stream_name, shard_count):
        """
        Creates an Amazon Kinesis stream. A stream captures and
        transports data records that are continuously emitted from
        different data sources or producers. Scale-out within an
        Amazon Kinesis stream is explicitly supported by means of
        shards, which are uniquely identified groups of data records
        in an Amazon Kinesis stream.

        You specify and control the number of shards that a stream is
        composed of. Each open shard can support up to 5 read
        transactions per second, up to a maximum total of 2 MB of data
        read per second. Each shard can support up to 1000 records
        written per second, up to a maximum total of 1 MB data written
        per second. You can add shards to a stream if the amount of
        data input increases and you can remove shards if the amount
        of data input decreases.

        The stream name identifies the stream. The name is scoped to
        the AWS account used by the application. It is also scoped by
        region. That is, two streams in two different accounts can
        have the same name, and two streams in the same account, but
        in two different regions, can have the same name.

        `CreateStream` is an asynchronous operation. Upon receiving a
        `CreateStream` request, Amazon Kinesis immediately returns and
        sets the stream status to `CREATING`. After the stream is
        created, Amazon Kinesis sets the stream status to `ACTIVE`.
        You should perform read and write operations only on an
        `ACTIVE` stream.

        You receive a `LimitExceededException` when making a
        `CreateStream` request if you try to do one of the following:

        + Have more than five streams in the `CREATING` state at any
          point in time.
        + Create more shards than are authorized for your account.

        The default limit for an AWS account is 10 shards per stream.
        If you need to create a stream with more than 10 shards,
        `contact AWS Support`_ to increase the limit on your account.

        You can use `DescribeStream` to check the stream status, which
        is returned in `StreamStatus`.

        `CreateStream` has a limit of 5 transactions per second per
        account.

        :type stream_name: string
        :param stream_name: A name to identify the stream. The stream name is
            scoped to the AWS account used by the application that creates the
            stream. It is also scoped by region. That is, two streams in two
            different AWS accounts can have the same name, and two streams in
            the same AWS account, but in two different regions, can have the
            same name.

        :type shard_count: integer
        :param shard_count: The number of shards that the stream will use. The
            throughput of the stream is a function of the number of shards;
            more shards are required for greater provisioned throughput.
            **Note:** The default limit for an AWS account is 10 shards per
            stream. If you need to create a stream with more than 10 shards,
            `contact AWS Support`_ to increase the limit on your account.

        """
        params = {
            'StreamName': stream_name,
            'ShardCount': shard_count,
        }
        return self.make_request(action='CreateStream',
                                 body=json.dumps(params))

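    # Usage sketch (illustrative, not part of the boto API): CreateStream is
    # asynchronous, so the docstring above suggests polling DescribeStream
    # until StreamStatus becomes ACTIVE. ``conn`` and the stream name are
    # assumptions for illustration.
    #
    #     import time
    #
    #     conn.create_stream('my-stream', shard_count=2)
    #     while True:
    #         status = conn.describe_stream('my-stream')[
    #             'StreamDescription']['StreamStatus']
    #         if status == 'ACTIVE':
    #             break
    #         time.sleep(10)
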
    def delete_stream(self, stream_name):
        """
        Deletes a stream and all its shards and data. You must shut
        down any applications that are operating on the stream before
        you delete the stream. If an application attempts to operate
        on a deleted stream, it will receive the exception
        `ResourceNotFoundException`.

        If the stream is in the `ACTIVE` state, you can delete it.
        After a `DeleteStream` request, the specified stream is in the
        `DELETING` state until Amazon Kinesis completes the deletion.

        **Note:** Amazon Kinesis might continue to accept data read
        and write operations, such as PutRecord, PutRecords, and
        GetRecords, on a stream in the `DELETING` state until the
        stream deletion is complete.

        When you delete a stream, any shards in that stream are also
        deleted, and any tags are dissociated from the stream.

        You can use the DescribeStream operation to check the state of
        the stream, which is returned in `StreamStatus`.

        `DeleteStream` has a limit of 5 transactions per second per
        account.

        :type stream_name: string
        :param stream_name: The name of the stream to delete.

        """
        params = {'StreamName': stream_name, }
        return self.make_request(action='DeleteStream',
                                 body=json.dumps(params))

    def describe_stream(self, stream_name, limit=None,
                        exclusive_start_shard_id=None):
        """
        Describes the specified stream.

        The information about the stream includes its current status,
        its Amazon Resource Name (ARN), and an array of shard objects.
        For each shard object, there is information about the hash key
        and sequence number ranges that the shard spans, and the IDs
        of any earlier shards that played a role in creating the
        shard. A sequence number is the identifier associated with
        every record ingested in the Amazon Kinesis stream. The
        sequence number is assigned when a record is put into the
        stream.

        You can limit the number of returned shards using the `Limit`
        parameter. The number of shards in a stream may be too large
        to return from a single call to `DescribeStream`. You can
        detect this by using the `HasMoreShards` flag in the returned
        output. `HasMoreShards` is set to `True` when there is more
        data available.

        `DescribeStream` is a paginated operation. If there are more
        shards available, you can request them using the shard ID of
        the last shard returned. Specify this ID in the
        `ExclusiveStartShardId` parameter in a subsequent request to
        `DescribeStream`.

        `DescribeStream` has a limit of 10 transactions per second per
        account.

        :type stream_name: string
        :param stream_name: The name of the stream to describe.

        :type limit: integer
        :param limit: The maximum number of shards to return.

        :type exclusive_start_shard_id: string
        :param exclusive_start_shard_id: The shard ID of the shard to start
            with.

        """
        params = {'StreamName': stream_name, }
        if limit is not None:
            params['Limit'] = limit
        if exclusive_start_shard_id is not None:
            params['ExclusiveStartShardId'] = exclusive_start_shard_id
        return self.make_request(action='DescribeStream',
                                 body=json.dumps(params))

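    # Usage sketch (illustrative, not part of the boto API): paging through
    # all shards with HasMoreShards / ExclusiveStartShardId, as the docstring
    # above describes. ``conn`` and the stream name are assumptions.
    #
    #     shards = []
    #     last_shard_id = None
    #     while True:
    #         desc = conn.describe_stream(
    #             'my-stream', exclusive_start_shard_id=last_shard_id)
    #         shards.extend(desc['StreamDescription']['Shards'])
    #         if not desc['StreamDescription']['HasMoreShards']:
    #             break
    #         last_shard_id = shards[-1]['ShardId']
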
    def get_records(self, shard_iterator, limit=None, b64_decode=True):
        """
        Gets data records from a shard.

        Specify a shard iterator using the `ShardIterator` parameter.
        The shard iterator specifies the position in the shard from
        which you want to start reading data records sequentially. If
        there are no records available in the portion of the shard
        that the iterator points to, `GetRecords` returns an empty
        list. Note that it might take multiple calls to get to a
        portion of the shard that contains records.

        You can scale by provisioning multiple shards. Your
        application should have one thread per shard, each reading
        continuously from its stream. To read from a stream
        continually, call `GetRecords` in a loop. Use GetShardIterator
        to get the shard iterator to specify in the first `GetRecords`
        call. `GetRecords` returns a new shard iterator in
        `NextShardIterator`. Specify the shard iterator returned in
        `NextShardIterator` in subsequent calls to `GetRecords`. Note
        that if the shard has been closed, the shard iterator can't
        return more data and `GetRecords` returns `null` in
        `NextShardIterator`. You can terminate the loop when the shard
        is closed, or when the shard iterator reaches the record with
        the sequence number or other attribute that marks it as the
        last record to process.

        Each data record can be up to 50 KB in size, and each shard
        can read up to 2 MB per second. You can ensure that your calls
        don't exceed the maximum supported size or throughput by using
        the `Limit` parameter to specify the maximum number of records
        that `GetRecords` can return. Consider your average record
        size when determining this limit. For example, if your average
        record size is 40 KB, you can limit the data returned to about
        1 MB per call by specifying 25 as the limit.

        The size of the data returned by `GetRecords` will vary
        depending on the utilization of the shard. The maximum size of
        data that `GetRecords` can return is 10 MB. If a call returns
        10 MB of data, subsequent calls made within the next 5 seconds
        throw `ProvisionedThroughputExceededException`. If there is
        insufficient provisioned throughput on the shard, subsequent
        calls made within the next 1 second throw
        `ProvisionedThroughputExceededException`. Note that
        `GetRecords` won't return any data when it throws an
        exception. For this reason, we recommend that you wait one
        second between calls to `GetRecords`; however, it's possible
        that the application will get exceptions for longer than 1
        second.

        To detect whether the application is falling behind in
        processing, add a timestamp to your records and note how long
        it takes to process them. You can also monitor how much data
        is in a stream using the CloudWatch metrics for write
        operations (`PutRecord` and `PutRecords`). For more
        information, see `Monitoring Amazon Kinesis with Amazon
        CloudWatch`_ in the Amazon Kinesis Developer Guide.

        :type shard_iterator: string
        :param shard_iterator: The position in the shard from which you want to
            start sequentially reading data records. A shard iterator specifies
            this position using the sequence number of a data record in the
            shard.

        :type limit: integer
        :param limit: The maximum number of records to return. Specify a value
            of up to 10,000. If you specify a value that is greater than
            10,000, `GetRecords` throws `InvalidArgumentException`.

        :type b64_decode: boolean
        :param b64_decode: Decode the Base64-encoded ``Data`` field of records.

        """
        params = {'ShardIterator': shard_iterator, }
        if limit is not None:
            params['Limit'] = limit

        response = self.make_request(action='GetRecords',
                                     body=json.dumps(params))

        # Base64 decode the data
        if b64_decode:
            for record in response.get('Records', []):
                record['Data'] = base64.b64decode(
                    record['Data'].encode('utf-8')).decode('utf-8')

        return response

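    # Usage sketch (illustrative, not part of the boto API): the GetRecords
    # loop described above, with the recommended ~1 second pause between
    # calls. ``conn``, the stream name, the shard ID, and ``process()`` are
    # assumptions for illustration.
    #
    #     import time
    #
    #     it = conn.get_shard_iterator('my-stream', 'shardId-000000000000',
    #                                  'TRIM_HORIZON')['ShardIterator']
    #     while it is not None:
    #         result = conn.get_records(it, limit=100)
    #         for record in result['Records']:
    #             process(record['Data'])   # process() is a placeholder
    #         it = result.get('NextShardIterator')
    #         time.sleep(1)
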
    def get_shard_iterator(self, stream_name, shard_id, shard_iterator_type,
                           starting_sequence_number=None):
        """
        Gets a shard iterator. A shard iterator expires five minutes
        after it is returned to the requester.

        A shard iterator specifies the position in the shard from
        which to start reading data records sequentially. A shard
        iterator specifies this position using the sequence number of
        a data record in a shard. A sequence number is the identifier
        associated with every record ingested in the Amazon Kinesis
        stream. The sequence number is assigned when a record is put
        into the stream.

        You must specify the shard iterator type. For example, you can
        set the `ShardIteratorType` parameter to read exactly from the
        position denoted by a specific sequence number by using the
        `AT_SEQUENCE_NUMBER` shard iterator type, or right after the
        sequence number by using the `AFTER_SEQUENCE_NUMBER` shard
        iterator type, using sequence numbers returned by earlier
        calls to PutRecord, PutRecords, GetRecords, or DescribeStream.
        You can specify the shard iterator type `TRIM_HORIZON` in the
        request to cause `ShardIterator` to point to the last
        untrimmed record in the shard in the system, which is the
        oldest data record in the shard. Or you can point to just
        after the most recent record in the shard, by using the shard
        iterator type `LATEST`, so that you always read the most
        recent data in the shard.

        When you repeatedly read from an Amazon Kinesis stream, use a
        GetShardIterator request to get the first shard iterator to
        use in your first `GetRecords` request and then use the shard
        iterator returned by the `GetRecords` request in
        `NextShardIterator` for subsequent reads. A new shard iterator
        is returned by every `GetRecords` request in
        `NextShardIterator`, which you use in the `ShardIterator`
        parameter of the next `GetRecords` request.

        If a `GetShardIterator` request is made too often, you receive
        a `ProvisionedThroughputExceededException`. For more
        information about throughput limits, see GetRecords.

        If the shard is closed, the iterator can't return more data,
        and `GetShardIterator` returns `null` for its `ShardIterator`.
        A shard can be closed using SplitShard or MergeShards.

        `GetShardIterator` has a limit of 5 transactions per second
        per account per open shard.

        :type stream_name: string
        :param stream_name: The name of the stream.

        :type shard_id: string
        :param shard_id: The shard ID of the shard to get the iterator for.

        :type shard_iterator_type: string
        :param shard_iterator_type:
            Determines how the shard iterator is used to start reading data
            records from the shard.

            The following are the valid shard iterator types:

            + AT_SEQUENCE_NUMBER - Start reading exactly from the position
              denoted by a specific sequence number.
            + AFTER_SEQUENCE_NUMBER - Start reading right after the position
              denoted by a specific sequence number.
            + TRIM_HORIZON - Start reading at the last untrimmed record in the
              shard in the system, which is the oldest data record in the shard.
            + LATEST - Start reading just after the most recent record in the
              shard, so that you always read the most recent data in the shard.

        :type starting_sequence_number: string
        :param starting_sequence_number: The sequence number of the data record
            in the shard from which to start reading.

        """
        params = {
            'StreamName': stream_name,
            'ShardId': shard_id,
            'ShardIteratorType': shard_iterator_type,
        }
        if starting_sequence_number is not None:
            params['StartingSequenceNumber'] = starting_sequence_number
        return self.make_request(action='GetShardIterator',
                                 body=json.dumps(params))

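    # Usage sketch (illustrative, not part of the boto API): resuming from a
    # checkpointed sequence number versus tailing only new data, using the
    # iterator types listed above. ``conn``, the identifiers, and
    # ``checkpointed_seq`` are assumptions for illustration.
    #
    #     # resume right after the last record already processed
    #     it = conn.get_shard_iterator(
    #         'my-stream', 'shardId-000000000000', 'AFTER_SEQUENCE_NUMBER',
    #         starting_sequence_number=checkpointed_seq)['ShardIterator']
    #
    #     # or only read records written after this call
    #     it = conn.get_shard_iterator(
    #         'my-stream', 'shardId-000000000000', 'LATEST')['ShardIterator']
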
    def list_streams(self, limit=None, exclusive_start_stream_name=None):
        """
        Lists your streams.

        The number of streams may be too large to return from a single
        call to `ListStreams`. You can limit the number of returned
        streams using the `Limit` parameter. If you do not specify a
        value for the `Limit` parameter, Amazon Kinesis uses the
        default limit, which is currently 10.

        You can detect if there are more streams available to list by
        using the `HasMoreStreams` flag from the returned output. If
        there are more streams available, you can request more streams
        by using the name of the last stream returned by the
        `ListStreams` request in the `ExclusiveStartStreamName`
        parameter in a subsequent request to `ListStreams`. The group
        of stream names returned by the subsequent request is then
        added to the list. You can continue this process until all the
        stream names have been collected in the list.

        `ListStreams` has a limit of 5 transactions per second per
        account.

        :type limit: integer
        :param limit: The maximum number of streams to list.

        :type exclusive_start_stream_name: string
        :param exclusive_start_stream_name: The name of the stream to start the
            list with.

        """
        params = {}
        if limit is not None:
            params['Limit'] = limit
        if exclusive_start_stream_name is not None:
            params['ExclusiveStartStreamName'] = exclusive_start_stream_name
        return self.make_request(action='ListStreams',
                                 body=json.dumps(params))

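    # Usage sketch (illustrative, not part of the boto API): collecting every
    # stream name with HasMoreStreams / ExclusiveStartStreamName, as the
    # docstring above describes. ``conn`` is an assumption for illustration.
    #
    #     names = []
    #     last_name = None
    #     while True:
    #         page = conn.list_streams(exclusive_start_stream_name=last_name)
    #         names.extend(page['StreamNames'])
    #         if not page['HasMoreStreams']:
    #             break
    #         last_name = names[-1]
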
    def list_tags_for_stream(self, stream_name, exclusive_start_tag_key=None,
                             limit=None):
        """
        Lists the tags for the specified Amazon Kinesis stream.

        :type stream_name: string
        :param stream_name: The name of the stream.

        :type exclusive_start_tag_key: string
        :param exclusive_start_tag_key: The key to use as the starting point
            for the list of tags. If this parameter is set, `ListTagsForStream`
            gets all tags that occur after `ExclusiveStartTagKey`.

        :type limit: integer
        :param limit: The number of tags to return. If this number is less than
            the total number of tags associated with the stream, `HasMoreTags`
            is set to `True`. To list additional tags, set
            `ExclusiveStartTagKey` to the last key in the response.

        """
        params = {'StreamName': stream_name, }
        if exclusive_start_tag_key is not None:
            params['ExclusiveStartTagKey'] = exclusive_start_tag_key
        if limit is not None:
            params['Limit'] = limit
        return self.make_request(action='ListTagsForStream',
                                 body=json.dumps(params))

    def merge_shards(self, stream_name, shard_to_merge,
                     adjacent_shard_to_merge):
        """
        Merges two adjacent shards in a stream and combines them into
        a single shard to reduce the stream's capacity to ingest and
        transport data. Two shards are considered adjacent if the
        union of the hash key ranges for the two shards form a
        contiguous set with no gaps. For example, if you have two
        shards, one with a hash key range of 276...381 and the other
        with a hash key range of 382...454, then you could merge these
        two shards into a single shard that would have a hash key
        range of 276...454. After the merge, the single child shard
        receives data for all hash key values covered by the two
        parent shards.

        `MergeShards` is called when there is a need to reduce the
        overall capacity of a stream because of excess capacity that
        is not being used. You must specify the shard to be merged and
        the adjacent shard for a stream. For more information about
        merging shards, see `Merge Two Shards`_ in the Amazon Kinesis
        Developer Guide.

        If the stream is in the `ACTIVE` state, you can call
        `MergeShards`. If a stream is in the `CREATING`, `UPDATING`,
        or `DELETING` state, `MergeShards` returns a
        `ResourceInUseException`. If the specified stream does not
        exist, `MergeShards` returns a `ResourceNotFoundException`.

        You can use DescribeStream to check the state of the stream,
        which is returned in `StreamStatus`.

        `MergeShards` is an asynchronous operation. Upon receiving a
        `MergeShards` request, Amazon Kinesis immediately returns a
        response and sets the `StreamStatus` to `UPDATING`. After the
        operation is completed, Amazon Kinesis sets the `StreamStatus`
        to `ACTIVE`. Read and write operations continue to work while
        the stream is in the `UPDATING` state.

        You use DescribeStream to determine the shard IDs that are
        specified in the `MergeShards` request.

        If you try to operate on too many streams in parallel using
        CreateStream, DeleteStream, `MergeShards` or SplitShard, you
        will receive a `LimitExceededException`.

        `MergeShards` has a limit of 5 transactions per second per
        account.

        :type stream_name: string
        :param stream_name: The name of the stream for the merge.

        :type shard_to_merge: string
        :param shard_to_merge: The shard ID of the shard to combine with the
            adjacent shard for the merge.

        :type adjacent_shard_to_merge: string
        :param adjacent_shard_to_merge: The shard ID of the adjacent shard for
            the merge.

        """
        params = {
            'StreamName': stream_name,
            'ShardToMerge': shard_to_merge,
            'AdjacentShardToMerge': adjacent_shard_to_merge,
        }
        return self.make_request(action='MergeShards',
                                 body=json.dumps(params))

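    # Usage sketch (illustrative, not part of the boto API): merging the first
    # two shards reported by DescribeStream, as described above. Assumes
    # ``conn``, the stream name, and that those two shards are actually
    # adjacent (contiguous hash key ranges); all are assumptions.
    #
    #     shards = conn.describe_stream('my-stream')[
    #         'StreamDescription']['Shards']
    #     conn.merge_shards('my-stream',
    #                       shard_to_merge=shards[0]['ShardId'],
    #                       adjacent_shard_to_merge=shards[1]['ShardId'])
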
    def put_record(self, stream_name, data, partition_key,
                   explicit_hash_key=None,
                   sequence_number_for_ordering=None,
                   exclusive_minimum_sequence_number=None,
                   b64_encode=True):
        """
        This operation puts a data record into an Amazon Kinesis
        stream from a producer. This operation must be called to send
        data from the producer into the Amazon Kinesis stream for
        real-time ingestion and subsequent processing. The `PutRecord`
        operation requires the name of the stream that captures,
        stores, and transports the data; a partition key; and the data
        blob itself. The data blob could be a segment from a log file,
        geographic/location data, website clickstream data, or any
        other data type.

        The partition key is used to distribute data across shards.
        Amazon Kinesis segregates the data records that belong to a
        data stream into multiple shards, using the partition key
        associated with each data record to determine which shard a
        given data record belongs to.

        Partition keys are Unicode strings, with a maximum length
        limit of 256 bytes. An MD5 hash function is used to map
        partition keys to 128-bit integer values and to map associated
        data records to shards using the hash key ranges of the
        shards. You can override hashing the partition key to
        determine the shard by explicitly specifying a hash value
        using the `ExplicitHashKey` parameter. For more information,
        see the `Amazon Kinesis Developer Guide`_.

        `PutRecord` returns the shard ID of where the data record was
        placed and the sequence number that was assigned to the data
        record.

        Sequence numbers generally increase over time. To guarantee
        strictly increasing ordering, use the
        `SequenceNumberForOrdering` parameter. For more information,
        see the `Amazon Kinesis Developer Guide`_.

        If a `PutRecord` request cannot be processed because of
        insufficient provisioned throughput on the shard involved in
        the request, `PutRecord` throws
        `ProvisionedThroughputExceededException`.

        Data records are accessible for only 24 hours from the time
        that they are added to an Amazon Kinesis stream.

        :type stream_name: string
        :param stream_name: The name of the stream to put the data record into.

        :type data: blob
        :param data: The data blob to put into the record, which is
            Base64-encoded when the blob is serialized.
            The maximum size of the data blob (the payload after
            Base64-decoding) is 50 kilobytes (KB).
            Set `b64_encode` to disable automatic Base64 encoding.

        :type partition_key: string
        :param partition_key: Determines which shard in the stream the data
            record is assigned to. Partition keys are Unicode strings with a
            maximum length limit of 256 bytes. Amazon Kinesis uses the
            partition key as input to a hash function that maps the partition
            key and associated data to a specific shard. Specifically, an MD5
            hash function is used to map partition keys to 128-bit integer
            values and to map associated data records to shards. As a result of
            this hashing mechanism, all data records with the same partition
            key will map to the same shard within the stream.

        :type explicit_hash_key: string
        :param explicit_hash_key: The hash value used to explicitly determine
            the shard the data record is assigned to by overriding the
            partition key hash.

        :type sequence_number_for_ordering: string
        :param sequence_number_for_ordering: Guarantees strictly increasing
            sequence numbers, for puts from the same client and to the same
            partition key. Usage: set the `SequenceNumberForOrdering` of record
            n to the sequence number of record n-1 (as returned in the
            PutRecordResult when putting record n-1). If this parameter is not
            set, records will be coarsely ordered based on arrival time.

        :type b64_encode: boolean
        :param b64_encode: Whether to Base64 encode `data`. Can be set to
            ``False`` if `data` is already encoded to prevent double encoding.

        """
        params = {
            'StreamName': stream_name,
            'Data': data,
            'PartitionKey': partition_key,
        }
        if explicit_hash_key is not None:
            params['ExplicitHashKey'] = explicit_hash_key
        if sequence_number_for_ordering is not None:
            params['SequenceNumberForOrdering'] = sequence_number_for_ordering
        if b64_encode:
            if not isinstance(params['Data'], six.binary_type):
                params['Data'] = params['Data'].encode('utf-8')
            params['Data'] = base64.b64encode(params['Data']).decode('utf-8')
        return self.make_request(action='PutRecord',
                                 body=json.dumps(params))

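    # Usage sketch (illustrative, not part of the boto API): chaining
    # SequenceNumberForOrdering across puts to the same partition key, as
    # described above. ``conn``, the stream name, and the partition key are
    # assumptions for illustration.
    #
    #     seq = None
    #     for payload in ('first', 'second', 'third'):
    #         result = conn.put_record('my-stream', payload, 'sensor-42',
    #                                  sequence_number_for_ordering=seq)
    #         seq = result['SequenceNumber']
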
    def put_records(self, records, stream_name, b64_encode=True):
        """
        Puts (writes) multiple data records from a producer into an
        Amazon Kinesis stream in a single call (also referred to as a
        `PutRecords` request). Use this operation to send data from a
        data producer into the Amazon Kinesis stream for real-time
        ingestion and processing. Each shard can support up to 1000
        records written per second, up to a maximum total of 1 MB data
        written per second.

        You must specify the name of the stream that captures, stores,
        and transports the data; and an array of request `Records`,
        with each record in the array requiring a partition key and
        data blob.

        The data blob can be any type of data; for example, a segment
        from a log file, geographic/location data, website clickstream
        data, and so on.

        The partition key is used by Amazon Kinesis as input to a hash
        function that maps the partition key and associated data to a
        specific shard. An MD5 hash function is used to map partition
        keys to 128-bit integer values and to map associated data
        records to shards. As a result of this hashing mechanism, all
        data records with the same partition key map to the same shard
        within the stream. For more information, see `Partition Key`_
        in the Amazon Kinesis Developer Guide.

        Each record in the `Records` array may include an optional
        parameter, `ExplicitHashKey`, which overrides the partition
        key to shard mapping. This parameter allows a data producer to
        determine explicitly the shard where the record is stored. For
        more information, see `Adding Multiple Records with
        PutRecords`_ in the Amazon Kinesis Developer Guide.

        The `PutRecords` response includes an array of response
        `Records`. Each record in the response array directly
        correlates with a record in the request array using natural
        ordering, from the top to the bottom of the request and
        response. The response `Records` array always includes the
        same number of records as the request array.

        The response `Records` array includes both successfully and
        unsuccessfully processed records. Amazon Kinesis attempts to
        process all records in each `PutRecords` request. A single
        record failure does not stop the processing of subsequent
        records.

        A successfully-processed record includes `ShardId` and
        `SequenceNumber` values. The `ShardId` parameter identifies
        the shard in the stream where the record is stored. The
        `SequenceNumber` parameter is an identifier assigned to the
        put record, unique to all records in the stream.

        An unsuccessfully-processed record includes `ErrorCode` and
        `ErrorMessage` values. `ErrorCode` reflects the type of error
        and can be one of the following values:
        `ProvisionedThroughputExceededException` or `InternalFailure`.
        `ErrorMessage` provides more detailed information about the
        `ProvisionedThroughputExceededException` exception including
        the account ID, stream name, and shard ID of the record that
        was throttled.

        Data records are accessible for only 24 hours from the time
        that they are added to an Amazon Kinesis stream.

        :type records: list
        :param records: The records associated with the request.

        :type stream_name: string
        :param stream_name: The stream name associated with the request.

        :type b64_encode: boolean
        :param b64_encode: Whether to Base64 encode `data`. Can be set to
            ``False`` if `data` is already encoded to prevent double encoding.

        """
        params = {'Records': records, 'StreamName': stream_name, }
        if b64_encode:
            for i in range(len(params['Records'])):
                data = params['Records'][i]['Data']
                if not isinstance(data, six.binary_type):
                    data = data.encode('utf-8')
                params['Records'][i]['Data'] = base64.b64encode(
                    data).decode('utf-8')
        return self.make_request(action='PutRecords',
                                 body=json.dumps(params))

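    # Usage sketch (illustrative, not part of the boto API): a batched put
    # followed by a per-record error check, matching the response layout
    # described above. ``conn`` and the stream name are assumptions.
    #
    #     batch = [{'Data': 'payload-%d' % i, 'PartitionKey': 'key-%d' % i}
    #              for i in range(10)]
    #     result = conn.put_records(batch, 'my-stream')
    #     failed = [(req, resp)
    #               for req, resp in zip(batch, result['Records'])
    #               if 'ErrorCode' in resp]
    #     # records in `failed` can be retried in a later put_records call
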
    def remove_tags_from_stream(self, stream_name, tag_keys):
        """
        Deletes tags from the specified Amazon Kinesis stream.

        If you specify a tag that does not exist, it is ignored.

        :type stream_name: string
        :param stream_name: The name of the stream.

        :type tag_keys: list
        :param tag_keys: A list of tag keys. Each corresponding tag is removed
            from the stream.

        """
        params = {'StreamName': stream_name, 'TagKeys': tag_keys, }
        return self.make_request(action='RemoveTagsFromStream',
                                 body=json.dumps(params))

    def split_shard(self, stream_name, shard_to_split, new_starting_hash_key):
        """
        Splits a shard into two new shards in the stream, to increase
        the stream's capacity to ingest and transport data.
        `SplitShard` is called when there is a need to increase the
        overall capacity of a stream because of an expected increase
        in the volume of data records being ingested.

        You can also use `SplitShard` when a shard appears to be
        approaching its maximum utilization, for example, when the set
        of producers sending data into the specific shard are suddenly
        sending more than previously anticipated. You can also call
        `SplitShard` to increase stream capacity, so that more Amazon
        Kinesis applications can simultaneously read data from the
        stream for real-time processing.

        You must specify the shard to be split and the new hash key,
        which is the position in the shard where the shard gets split
        in two. In many cases, the new hash key might simply be the
        average of the beginning and ending hash key, but it can be
        any hash key value in the range being mapped into the shard.
        For more information about splitting shards, see `Split a
        Shard`_ in the Amazon Kinesis Developer Guide.

        You can use DescribeStream to determine the shard ID and hash
        key values for the `ShardToSplit` and `NewStartingHashKey`
        parameters that are specified in the `SplitShard` request.

        `SplitShard` is an asynchronous operation. Upon receiving a
        `SplitShard` request, Amazon Kinesis immediately returns a
        response and sets the stream status to `UPDATING`. After the
        operation is completed, Amazon Kinesis sets the stream status
        to `ACTIVE`. Read and write operations continue to work while
        the stream is in the `UPDATING` state.

        You can use `DescribeStream` to check the status of the
        stream, which is returned in `StreamStatus`. If the stream is
        in the `ACTIVE` state, you can call `SplitShard`. If a stream
        is in `CREATING` or `UPDATING` or `DELETING` states,
        `DescribeStream` returns a `ResourceInUseException`.

        If the specified stream does not exist, `DescribeStream`
        returns a `ResourceNotFoundException`. If you try to create
        more shards than are authorized for your account, you receive
        a `LimitExceededException`.

        The default limit for an AWS account is 10 shards per stream.
        If you need to create a stream with more than 10 shards,
        `contact AWS Support`_ to increase the limit on your account.

        If you try to operate on too many streams in parallel using
        CreateStream, DeleteStream, MergeShards or SplitShard, you
        receive a `LimitExceededException`.

        `SplitShard` has a limit of 5 transactions per second per
        account.

        :type stream_name: string
        :param stream_name: The name of the stream for the shard split.

        :type shard_to_split: string
        :param shard_to_split: The shard ID of the shard to split.

        :type new_starting_hash_key: string
        :param new_starting_hash_key: A hash key value for the starting hash
            key of one of the child shards created by the split. The hash key
            range for a given shard constitutes a set of ordered contiguous
            positive integers. The value for `NewStartingHashKey` must be in
            the range of hash keys being mapped into the shard. The
            `NewStartingHashKey` hash key value and all higher hash key values
            in the hash key range are distributed to one of the child shards.
            All the lower hash key values in the range are distributed to the
            other child shard.

        """
        params = {
            'StreamName': stream_name,
            'ShardToSplit': shard_to_split,
            'NewStartingHashKey': new_starting_hash_key,
        }
        return self.make_request(action='SplitShard',
                                 body=json.dumps(params))

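    # Usage sketch (illustrative, not part of the boto API): splitting a shard
    # at the midpoint of its hash key range, the simple choice suggested
    # above. ``conn`` and the stream name are assumptions for illustration.
    #
    #     shard = conn.describe_stream('my-stream')[
    #         'StreamDescription']['Shards'][0]
    #     lo = int(shard['HashKeyRange']['StartingHashKey'])
    #     hi = int(shard['HashKeyRange']['EndingHashKey'])
    #     midpoint = str((lo + hi) // 2)
    #     conn.split_shard('my-stream', shard['ShardId'], midpoint)
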
    def make_request(self, action, body):
        headers = {
            'X-Amz-Target': '%s.%s' % (self.TargetPrefix, action),
            'Host': self.region.endpoint,
            'Content-Type': 'application/x-amz-json-1.1',
            'Content-Length': str(len(body)),
        }
        http_request = self.build_base_http_request(
            method='POST', path='/', auth_path='/', params={},
            headers=headers, data=body)
        response = self._mexe(http_request, sender=None,
                              override_num_retries=10)
        response_body = response.read().decode('utf-8')
        boto.log.debug(response.getheaders())
        boto.log.debug(response_body)
        if response.status == 200:
            if response_body:
                return json.loads(response_body)
        else:
            json_body = json.loads(response_body)
            fault_name = json_body.get('__type', None)
            exception_class = self._faults.get(fault_name, self.ResponseError)
            raise exception_class(response.status, response.reason,
                                  body=json_body)