author    | Daniel G. Taylor <danielgtaylor@gmail.com> | 2013-12-12 13:28:28 -0800
committer | Daniel G. Taylor <danielgtaylor@gmail.com> | 2013-12-12 13:51:00 -0800
commit    | 3382b4d5fa3dc11a0c1091dfeef98b5251c204f9 (patch)
tree      | 7d06322459ca3951ad8f95471f752f918c6caa02
parent    | 40baa716a667abc4ea037197131aced07b8dc54e (diff)
download  | boto-3382b4d5fa3dc11a0c1091dfeef98b5251c204f9.tar.gz
Update to the latest Kinesis API and modify unit tests based on team feedback.
-rw-r--r-- | boto/kinesis/layer1.py                    | 108
-rw-r--r-- | tests/integration/kinesis/test_kinesis.py |  16
2 files changed, 63 insertions, 61 deletions
diff --git a/boto/kinesis/layer1.py b/boto/kinesis/layer1.py
index 681a013f..2f486e99 100644
--- a/boto/kinesis/layer1.py
+++ b/boto/kinesis/layer1.py
@@ -40,11 +40,11 @@ class KinesisConnection(AWSQueryConnection):
     Amazon Kinesis is a managed service that scales elastically for
     real time processing of streaming big data.
     """
-    APIVersion = "2013-11-04"
+    APIVersion = "2013-12-02"
     DefaultRegionName = "us-east-1"
     DefaultRegionEndpoint = "kinesis.us-east-1.amazonaws.com"
     ServiceName = "Kinesis"
-    TargetPrefix = "Kinesis_20131104"
+    TargetPrefix = "Kinesis_20131202"
     ResponseError = JSONResponseError

     _faults = {
@@ -159,8 +159,8 @@ class KinesisConnection(AWSQueryConnection):
         DELETING state until Amazon Kinesis completes the deletion.

         **Note:** Amazon Kinesis might continue to accept data read
-        and write operations, such as PutRecord and GetNextRecords, on
-        a stream in the DELETING state until the stream deletion is
+        and write operations, such as PutRecord and GetRecords, on a
+        stream in the DELETING state until the stream deletion is
         complete.

         When you delete a stream, any shards in that stream are also
@@ -198,9 +198,9 @@ class KinesisConnection(AWSQueryConnection):
         You can limit the number of returned shards using the `Limit`
         parameter. The number of shards in a stream may be too large
         to return from a single call to `DescribeStream`. You can
-        detect this by using the `IsMoreDataAvailable` flag in the
-        returned output. `IsMoreDataAvailable` is set to `True` when
-        there is more data available.
+        detect this by using the `HasMoreShards` flag in the returned
+        output. `HasMoreShards` is set to `True` when there is more
+        data available.

         If there are more shards available, you can request more
         shards by using the shard ID of the last shard returned by the
@@ -208,7 +208,7 @@ class KinesisConnection(AWSQueryConnection):
         parameter in a subsequent request to `DescribeStream`.
         `DescribeStream` is a paginated operation.

-        `DescribeStream` has limit of 10 transactions per second per
+        `DescribeStream` has a limit of 10 transactions per second per
         account.

         :type stream_name: string
@@ -230,11 +230,11 @@ class KinesisConnection(AWSQueryConnection):
         return self.make_request(action='DescribeStream',
                                  body=json.dumps(params))

-    def get_next_records(self, shard_iterator, limit=None, b64_decode=True):
+    def get_records(self, shard_iterator, limit=None, b64_decode=True):
         """
         This operation returns one or more data records from a shard.
-        A `GetNextRecords` operation request can retrieve up to 10 MB
-        of data.
+        A `GetRecords` operation request can retrieve up to 10 MB of
+        data.

         You specify a shard iterator for the shard that you want to
         read data from in the `ShardIterator` parameter. The shard
@@ -244,34 +244,32 @@ class KinesisConnection(AWSQueryConnection):
         a data record in the shard. For more information about the
         shard iterator, see GetShardIterator.

-        `GetNextRecords` may return a partial result if the response
-        size limit is exceeded. You will get an error, but not a
-        partial result if the shard's provisioned throughput is
-        exceeded, the shard iterator has expired, or an internal
-        processing failure has occurred. Clients can request a smaller
-        amount of data by specifying a maximum number of returned
-        records using the `Limit` parameter. The `Limit` parameter can
-        be set to an integer value of up to 10,000. If you set the
-        value to an integer greater than 10,000, you will receive
+        `GetRecords` may return a partial result if the response size
+        limit is exceeded. You will get an error, but not a partial
+        result if the shard's provisioned throughput is exceeded, the
+        shard iterator has expired, or an internal processing failure
+        has occurred. Clients can request a smaller amount of data by
+        specifying a maximum number of returned records using the
+        `Limit` parameter. The `Limit` parameter can be set to an
+        integer value of up to 10,000. If you set the value to an
+        integer greater than 10,000, you will receive
         `InvalidArgumentException`.

-        A new shard iterator is returned by every `GetNextRecords`
-        request in `NextShardIterator`, which you use in the
-        `ShardIterator` parameter of the next `GetNextRecords`
-        request. When you repeatedly read from an Amazon Kinesis
-        stream use a GetShardIterator request to get the first shard
-        iterator to use in your first `GetNextRecords` request and
-        then use the shard iterator returned in `NextShardIterator`
-        for subsequent reads.
+        A new shard iterator is returned by every `GetRecords` request
+        in `NextShardIterator`, which you use in the `ShardIterator`
+        parameter of the next `GetRecords` request. When you
+        repeatedly read from an Amazon Kinesis stream use a
+        GetShardIterator request to get the first shard iterator to
+        use in your first `GetRecords` request and then use the shard
+        iterator returned in `NextShardIterator` for subsequent reads.

-        `GetNextRecords` can return `null` for the `NextShardIterator`
-        to reflect that the shard has been closed and that the
-        requested shard iterator would never have returned more data.
+        `GetRecords` can return `null` for the `NextShardIterator` to
+        reflect that the shard has been closed and that the requested
+        shard iterator would never have returned more data.

         If no items can be processed because of insufficient
         provisioned throughput on the shard involved in the request,
-        `GetNextRecords` throws
-        `ProvisionedThroughputExceededException`.
+        `GetRecords` throws `ProvisionedThroughputExceededException`.

         :type shard_iterator: string
         :param shard_iterator: The position in the shard from which you want to
@@ -288,7 +286,8 @@ class KinesisConnection(AWSQueryConnection):
         params = {'ShardIterator': shard_iterator, }
         if limit is not None:
             params['Limit'] = limit
-        response = self.make_request(action='GetNextRecords',
+
+        response = self.make_request(action='GetRecords',
                                      body=json.dumps(params))

         # Base64 decode the data
@@ -317,7 +316,7 @@ class KinesisConnection(AWSQueryConnection):
         AT_SEQUENCE_NUMBER shard iterator type, or right after the
         sequence number by using the AFTER_SEQUENCE_NUMBER shard
         iterator type, using sequence numbers returned by earlier
-        PutRecord, GetNextRecords or DescribeStream requests. You can
+        PutRecord, GetRecords or DescribeStream requests. You can
         specify the shard iterator type TRIM_HORIZON in the request to
         cause `ShardIterator` to point to the last untrimmed record in
         the shard in the system, which is the oldest data record in
@@ -330,12 +329,12 @@ class KinesisConnection(AWSQueryConnection):

         When you repeatedly read from an Amazon Kinesis stream use a
         GetShardIterator request to get the first shard iterator to to
-        use in your first `GetNextRecords` request and then use the
-        shard iterator returned by the `GetNextRecords` request in
+        use in your first `GetRecords` request and then use the shard
+        iterator returned by the `GetRecords` request in
         `NextShardIterator` for subsequent reads. A new shard iterator
-        is returned by every `GetNextRecords` request in
+        is returned by every `GetRecords` request in
         `NextShardIterator`, which you use in the `ShardIterator`
-        parameter of the next `GetNextRecords` request.
+        parameter of the next `GetRecords` request.

         If a `GetShardIterator` request is made too often, you will
         receive a `ProvisionedThroughputExceededException`. For more
@@ -402,9 +401,9 @@ class KinesisConnection(AWSQueryConnection):
         default limit, which is currently 10.

         You can detect if there are more streams available to list by
-        using the `IsMoreDataAvailable` flag from the returned output.
-        If there are more streams available, you can request more
-        streams by using the name of the last stream returned by the
+        using the `HasMoreStreams` flag from the returned output. If
+        there are more streams available, you can request more streams
+        by using the name of the last stream returned by the
         `ListStreams` request in the `ExclusiveStartStreamName`
         parameter in a subsequent request to `ListStreams`. The group
         of stream names returned by the subsequent request is then
@@ -500,6 +499,7 @@ class KinesisConnection(AWSQueryConnection):

     def put_record(self, stream_name, data, partition_key,
                    explicit_hash_key=None,
+                   sequence_number_for_ordering=None,
                    exclusive_minimum_sequence_number=None,
                    b64_encode=True):
         """
@@ -532,11 +532,11 @@ class KinesisConnection(AWSQueryConnection):
         placed and the sequence number that was assigned to the data
         record.

-        To enforce ordering in how Amazon Kinesis stores and returns
-        the relevant records through calls to GetNextRecords, supply
-        the sequence number returned by an earlier `PutRecord` request
-        as the `ExclusiveMinimumSequenceNumber` in a later `PutRecord`
-        request.
+        The `SequenceNumberForOrdering` sets the initial sequence
+        number for the partition key. Later `PutRecord` requests to
+        the same partition key (from the same client) will
+        automatically increase from `SequenceNumberForOrdering`,
+        ensuring strict sequential ordering.

         If a `PutRecord` request cannot be processed because of
         insufficient provisioned throughput on the shard involved in
@@ -570,10 +570,11 @@ class KinesisConnection(AWSQueryConnection):
            the shard the data record is assigned to by overriding the
            partition key hash.

-        :type exclusive_minimum_sequence_number: string
-        :param exclusive_minimum_sequence_number: The sequence number from a
-            previous call to `PutRecord` that is used to keep the put data
-            record in order with the data record put in the previous call.
+        :type sequence_number_for_ordering: string
+        :param sequence_number_for_ordering: The sequence number to use as the
+            initial number for the partition key. Subsequent calls to
+            `PutRecord` from the same client and for the same partition key
+            will increase from the `SequenceNumberForOrdering` value.

         :type b64_encode: boolean
         :param b64_encode: Whether to Base64 encode `data`. Can be set to
@@ -587,8 +588,8 @@ class KinesisConnection(AWSQueryConnection):
         }
         if explicit_hash_key is not None:
             params['ExplicitHashKey'] = explicit_hash_key
-        if exclusive_minimum_sequence_number is not None:
-            params['ExclusiveMinimumSequenceNumber'] = exclusive_minimum_sequence_number
+        if sequence_number_for_ordering is not None:
+            params['SequenceNumberForOrdering'] = sequence_number_for_ordering
         if b64_encode:
             params['Data'] = base64.b64encode(params['Data'])
         return self.make_request(action='PutRecord',
@@ -692,6 +693,7 @@ class KinesisConnection(AWSQueryConnection):
         response = self._mexe(http_request, sender=None,
                               override_num_retries=10)
         response_body = response.read()
+        boto.log.debug(response.getheaders())
         boto.log.debug(response_body)
         if response.status == 200:
             if response_body:
diff --git a/tests/integration/kinesis/test_kinesis.py b/tests/integration/kinesis/test_kinesis.py
index 039100b3..3930f2f7 100644
--- a/tests/integration/kinesis/test_kinesis.py
+++ b/tests/integration/kinesis/test_kinesis.py
@@ -52,26 +52,26 @@ class TestKinesis(TestCase):
             response = kinesis.describe_stream('test')

             if response['StreamDescription']['StreamStatus'] == 'ACTIVE':
+                shard_id = response['StreamDescription']['Shards'][0]['ShardId']
                 break
         else:
             raise TimeoutError('Stream is still not active, aborting...')

+        # Get ready to process some data from the stream
+        response = kinesis.get_shard_iterator('test', shard_id, 'TRIM_HORIZON')
+        shard_iterator = response['ShardIterator']
+
         # Write some data to the stream
        data = 'Some data ...'
        response = kinesis.put_record('test', data, data)
-        shard_id = response['ShardId']
-
-        # Process some data from the stream
-        response = kinesis.get_shard_iterator('test', shard_id, 'TRIM_HORIZON')
-        shard_iterator = response['ShardIterator']

         # Wait for the data to show up
         tries = 0
-        while tries < 20:
+        while tries < 100:
             tries += 1
-            time.sleep(5)
+            time.sleep(1)

-            response = kinesis.get_next_records(shard_iterator, limit=5)
+            response = kinesis.get_records(shard_iterator)
             shard_iterator = response['NextShardIterator']

             if len(response['Records']):
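
Usage sketch (not part of the commit): the renamed read path, mirroring the flow in the updated integration test. It assumes an existing, already-ACTIVE stream named 'test', AWS credentials that boto can find, and a KinesisConnection built with its defaults; the 'Data' key on each record follows the standard GetRecords response shape.

    import time

    from boto.kinesis.layer1 import KinesisConnection

    kinesis = KinesisConnection()  # credentials are assumed to come from the environment/boto config

    # Find the first shard of the (assumed) 'test' stream.
    description = kinesis.describe_stream('test')['StreamDescription']
    shard_id = description['Shards'][0]['ShardId']

    # Start reading at the oldest untrimmed record in the shard.
    response = kinesis.get_shard_iterator('test', shard_id, 'TRIM_HORIZON')
    shard_iterator = response['ShardIterator']

    # get_records (formerly get_next_records) returns NextShardIterator for
    # the next call; reuse it rather than requesting a fresh iterator each time.
    for _ in range(10):
        response = kinesis.get_records(shard_iterator, limit=100)
        for record in response['Records']:
            print(record['Data'])  # already decoded, since b64_decode defaults to True
        shard_iterator = response['NextShardIterator']
        if shard_iterator is None:
            break  # the shard was closed; no more data will be returned
        time.sleep(1)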
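
A second sketch covers the write side: put_record with the new sequence_number_for_ordering argument. Per the updated docstring, passing the sequence number returned by an earlier put for the same partition key keeps later records ordered after it. The 'ShardId'/'SequenceNumber' response keys are the standard PutRecord output and are assumed here, and the stream and partition-key names are made up; the kinesis connection is reused from the previous sketch.

    # Put an initial record; Kinesis assigns it a sequence number.
    response = kinesis.put_record('test', 'first payload', 'example-partition-key')
    sequence_number = response['SequenceNumber']

    # Later puts to the same partition key pass the previous record's
    # sequence number as SequenceNumberForOrdering.
    for payload in ('second payload', 'third payload'):
        response = kinesis.put_record(
            'test', payload, 'example-partition-key',
            sequence_number_for_ordering=sequence_number)
        sequence_number = response['SequenceNumber']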
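
Finally, the IsMoreDataAvailable → HasMoreShards/HasMoreStreams rename changes how callers detect pagination. The sketch below walks every shard of a stream; note that the exclusive_start_shard_id keyword is an assumption (the ExclusiveStartShardId request parameter is described in the DescribeStream documentation, but its Python spelling does not appear in this diff).

    def all_shard_ids(conn, stream_name):
        """Collect every shard ID, paginating while HasMoreShards is True."""
        shard_ids = []
        last_shard_id = None
        while True:
            # exclusive_start_shard_id is the assumed layer1 spelling of the
            # ExclusiveStartShardId request parameter.
            response = conn.describe_stream(stream_name,
                                            exclusive_start_shard_id=last_shard_id)
            description = response['StreamDescription']
            shard_ids.extend(shard['ShardId'] for shard in description['Shards'])
            if not description['HasMoreShards']:
                return shard_ids
            last_shard_id = shard_ids[-1]

    # e.g. all_shard_ids(kinesis, 'test')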