author    Daniel G. Taylor <danielgtaylor@gmail.com>  2013-12-12 13:28:28 -0800
committer Daniel G. Taylor <danielgtaylor@gmail.com>  2013-12-12 13:51:00 -0800
commit    3382b4d5fa3dc11a0c1091dfeef98b5251c204f9 (patch)
tree      7d06322459ca3951ad8f95471f752f918c6caa02
parent    40baa716a667abc4ea037197131aced07b8dc54e (diff)
download  boto-3382b4d5fa3dc11a0c1091dfeef98b5251c204f9.tar.gz
Update to the latest Kinesis API and modify unit tests based on team feedback.
-rw-r--r--  boto/kinesis/layer1.py                      108
-rw-r--r--  tests/integration/kinesis/test_kinesis.py    16
2 files changed, 63 insertions, 61 deletions
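
For orientation, a minimal before/after sketch of the renamed read call (not part of the commit; it assumes `conn` is a `KinesisConnection` and `shard_iterator` was obtained from `get_shard_iterator`):

    # Before this commit:
    response = conn.get_next_records(shard_iterator, limit=5)
    # After this commit (same response shape, new operation name):
    response = conn.get_records(shard_iterator, limit=5)
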
diff --git a/boto/kinesis/layer1.py b/boto/kinesis/layer1.py
index 681a013f..2f486e99 100644
--- a/boto/kinesis/layer1.py
+++ b/boto/kinesis/layer1.py
@@ -40,11 +40,11 @@ class KinesisConnection(AWSQueryConnection):
Amazon Kinesis is a managed service that scales elastically for
real time processing of streaming big data.
"""
- APIVersion = "2013-11-04"
+ APIVersion = "2013-12-02"
DefaultRegionName = "us-east-1"
DefaultRegionEndpoint = "kinesis.us-east-1.amazonaws.com"
ServiceName = "Kinesis"
- TargetPrefix = "Kinesis_20131104"
+ TargetPrefix = "Kinesis_20131202"
ResponseError = JSONResponseError
_faults = {
@@ -159,8 +159,8 @@ class KinesisConnection(AWSQueryConnection):
DELETING state until Amazon Kinesis completes the deletion.
**Note:** Amazon Kinesis might continue to accept data read
- and write operations, such as PutRecord and GetNextRecords, on
- a stream in the DELETING state until the stream deletion is
+ and write operations, such as PutRecord and GetRecords, on a
+ stream in the DELETING state until the stream deletion is
complete.
When you delete a stream, any shards in that stream are also
@@ -198,9 +198,9 @@ class KinesisConnection(AWSQueryConnection):
You can limit the number of returned shards using the `Limit`
parameter. The number of shards in a stream may be too large
to return from a single call to `DescribeStream`. You can
- detect this by using the `IsMoreDataAvailable` flag in the
- returned output. `IsMoreDataAvailable` is set to `True` when
- there is more data available.
+ detect this by using the `HasMoreShards` flag in the returned
+ output. `HasMoreShards` is set to `True` when there is more
+ data available.
If there are more shards available, you can request more
shards by using the shard ID of the last shard returned by the
@@ -208,7 +208,7 @@ class KinesisConnection(AWSQueryConnection):
parameter in a subsequent request to `DescribeStream`.
`DescribeStream` is a paginated operation.
- `DescribeStream` has limit of 10 transactions per second per
+ `DescribeStream` has a limit of 10 transactions per second per
account.
:type stream_name: string
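
A minimal sketch of the pagination described above (illustrative only; it assumes `conn` is a `KinesisConnection`, the stream name 'my-stream' is a placeholder, and the keyword argument mapping to `ExclusiveStartShardId` is `exclusive_start_shard_id`):

    shards = []
    last_shard_id = None
    while True:
        desc = conn.describe_stream(
            'my-stream',
            exclusive_start_shard_id=last_shard_id)['StreamDescription']
        shards.extend(desc['Shards'])
        if not desc['HasMoreShards']:
            break  # all shards have been returned
        last_shard_id = shards[-1]['ShardId']
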
@@ -230,11 +230,11 @@ class KinesisConnection(AWSQueryConnection):
return self.make_request(action='DescribeStream',
body=json.dumps(params))
- def get_next_records(self, shard_iterator, limit=None, b64_decode=True):
+ def get_records(self, shard_iterator, limit=None, b64_decode=True):
"""
This operation returns one or more data records from a shard.
- A `GetNextRecords` operation request can retrieve up to 10 MB
- of data.
+ A `GetRecords` operation request can retrieve up to 10 MB of
+ data.
You specify a shard iterator for the shard that you want to
read data from in the `ShardIterator` parameter. The shard
@@ -244,34 +244,32 @@ class KinesisConnection(AWSQueryConnection):
a data record in the shard. For more information about the
shard iterator, see GetShardIterator.
- `GetNextRecords` may return a partial result if the response
- size limit is exceeded. You will get an error, but not a
- partial result if the shard's provisioned throughput is
- exceeded, the shard iterator has expired, or an internal
- processing failure has occurred. Clients can request a smaller
- amount of data by specifying a maximum number of returned
- records using the `Limit` parameter. The `Limit` parameter can
- be set to an integer value of up to 10,000. If you set the
- value to an integer greater than 10,000, you will receive
+ `GetRecords` may return a partial result if the response size
+ limit is exceeded. You will get an error, but not a partial
+ result if the shard's provisioned throughput is exceeded, the
+ shard iterator has expired, or an internal processing failure
+ has occurred. Clients can request a smaller amount of data by
+ specifying a maximum number of returned records using the
+ `Limit` parameter. The `Limit` parameter can be set to an
+ integer value of up to 10,000. If you set the value to an
+ integer greater than 10,000, you will receive
`InvalidArgumentException`.
- A new shard iterator is returned by every `GetNextRecords`
- request in `NextShardIterator`, which you use in the
- `ShardIterator` parameter of the next `GetNextRecords`
- request. When you repeatedly read from an Amazon Kinesis
- stream use a GetShardIterator request to get the first shard
- iterator to use in your first `GetNextRecords` request and
- then use the shard iterator returned in `NextShardIterator`
- for subsequent reads.
+ A new shard iterator is returned by every `GetRecords` request
+ in `NextShardIterator`, which you use in the `ShardIterator`
+ parameter of the next `GetRecords` request. When you
+ repeatedly read from an Amazon Kinesis stream use a
+ GetShardIterator request to get the first shard iterator to
+ use in your first `GetRecords` request and then use the shard
+ iterator returned in `NextShardIterator` for subsequent reads.
- `GetNextRecords` can return `null` for the `NextShardIterator`
- to reflect that the shard has been closed and that the
- requested shard iterator would never have returned more data.
+ `GetRecords` can return `null` for the `NextShardIterator` to
+ reflect that the shard has been closed and that the requested
+ shard iterator would never have returned more data.
If no items can be processed because of insufficient
provisioned throughput on the shard involved in the request,
- `GetNextRecords` throws
- `ProvisionedThroughputExceededException`.
+ `GetRecords` throws `ProvisionedThroughputExceededException`.
:type shard_iterator: string
:param shard_iterator: The position in the shard from which you want to
@@ -288,7 +286,8 @@ class KinesisConnection(AWSQueryConnection):
params = {'ShardIterator': shard_iterator, }
if limit is not None:
params['Limit'] = limit
- response = self.make_request(action='GetNextRecords',
+
+ response = self.make_request(action='GetRecords',
body=json.dumps(params))
# Base64 decode the data
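
A minimal read-loop sketch for the renamed operation (not part of the commit; it assumes `conn` is a `KinesisConnection`, `shard_id` came from `describe_stream`, and `handle()` is a hypothetical callback):

    import time

    iterator = conn.get_shard_iterator('my-stream', shard_id,
                                       'TRIM_HORIZON')['ShardIterator']
    while iterator is not None:
        result = conn.get_records(iterator, limit=100)
        for record in result['Records']:
            handle(record['Data'])  # already Base64-decoded (b64_decode=True)
        iterator = result['NextShardIterator']
        time.sleep(1)  # throttle to stay under the shard's read limits
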
@@ -317,7 +316,7 @@ class KinesisConnection(AWSQueryConnection):
AT_SEQUENCE_NUMBER shard iterator type, or right after the
sequence number by using the AFTER_SEQUENCE_NUMBER shard
iterator type, using sequence numbers returned by earlier
- PutRecord, GetNextRecords or DescribeStream requests. You can
+ PutRecord, GetRecords or DescribeStream requests. You can
specify the shard iterator type TRIM_HORIZON in the request to
cause `ShardIterator` to point to the last untrimmed record in
the shard in the system, which is the oldest data record in
@@ -330,12 +329,12 @@ class KinesisConnection(AWSQueryConnection):
When you repeatedly read from an Amazon Kinesis stream use a
GetShardIterator request to get the first shard iterator to
- use in your first `GetNextRecords` request and then use the
- shard iterator returned by the `GetNextRecords` request in
+ use in your first `GetRecords` request and then use the shard
+ iterator returned by the `GetRecords` request in
`NextShardIterator` for subsequent reads. A new shard iterator
- is returned by every `GetNextRecords` request in
+ is returned by every `GetRecords` request in
`NextShardIterator`, which you use in the `ShardIterator`
- parameter of the next `GetNextRecords` request.
+ parameter of the next `GetRecords` request.
If a `GetShardIterator` request is made too often, you will
receive a `ProvisionedThroughputExceededException`. For more
@@ -402,9 +401,9 @@ class KinesisConnection(AWSQueryConnection):
default limit, which is currently 10.
You can detect if there are more streams available to list by
- using the `IsMoreDataAvailable` flag from the returned output.
- If there are more streams available, you can request more
- streams by using the name of the last stream returned by the
+ using the `HasMoreStreams` flag from the returned output. If
+ there are more streams available, you can request more streams
+ by using the name of the last stream returned by the
`ListStreams` request in the `ExclusiveStartStreamName`
parameter in a subsequent request to `ListStreams`. The group
of stream names returned by the subsequent request is then
@@ -500,6 +499,7 @@ class KinesisConnection(AWSQueryConnection):
def put_record(self, stream_name, data, partition_key,
explicit_hash_key=None,
+ sequence_number_for_ordering=None,
exclusive_minimum_sequence_number=None,
b64_encode=True):
"""
@@ -532,11 +532,11 @@ class KinesisConnection(AWSQueryConnection):
placed and the sequence number that was assigned to the data
record.
- To enforce ordering in how Amazon Kinesis stores and returns
- the relevant records through calls to GetNextRecords, supply
- the sequence number returned by an earlier `PutRecord` request
- as the `ExclusiveMinimumSequenceNumber` in a later `PutRecord`
- request.
+ The `SequenceNumberForOrdering` sets the initial sequence
+ number for the partition key. Later `PutRecord` requests to
+ the same partition key (from the same client) will
+ automatically increase from `SequenceNumberForOrdering`,
+ ensuring strict sequential ordering.
If a `PutRecord` request cannot be processed because of
insufficient provisioned throughput on the shard involved in
@@ -570,10 +570,11 @@ class KinesisConnection(AWSQueryConnection):
the shard the data record is assigned to by overriding the
partition key hash.
- :type exclusive_minimum_sequence_number: string
- :param exclusive_minimum_sequence_number: The sequence number from a
- previous call to `PutRecord` that is used to keep the put data
- record in order with the data record put in the previous call.
+ :type sequence_number_for_ordering: string
+ :param sequence_number_for_ordering: The sequence number to use as the
+ initial number for the partition key. Subsequent calls to
+ `PutRecord` from the same client and for the same partition key
+ will increase from the `SequenceNumberForOrdering` value.
:type b64_encode: boolean
:param b64_encode: Whether to Base64 encode `data`. Can be set to
@@ -587,8 +588,8 @@ class KinesisConnection(AWSQueryConnection):
}
if explicit_hash_key is not None:
params['ExplicitHashKey'] = explicit_hash_key
- if exclusive_minimum_sequence_number is not None:
- params['ExclusiveMinimumSequenceNumber'] = exclusive_minimum_sequence_number
+ if sequence_number_for_ordering is not None:
+ params['SequenceNumberForOrdering'] = sequence_number_for_ordering
if b64_encode:
params['Data'] = base64.b64encode(params['Data'])
return self.make_request(action='PutRecord',
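
A minimal sketch of the new ordering parameter in use (illustrative only; `conn` is assumed to be a `KinesisConnection`, and the stream and partition key names are placeholders):

    first = conn.put_record('my-stream', 'event 1', 'sensor-42')
    # Chain the returned sequence number so the second record is ordered
    # after the first for the same partition key.
    conn.put_record('my-stream', 'event 2', 'sensor-42',
                    sequence_number_for_ordering=first['SequenceNumber'])
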
@@ -692,6 +693,7 @@ class KinesisConnection(AWSQueryConnection):
response = self._mexe(http_request, sender=None,
override_num_retries=10)
response_body = response.read()
+ boto.log.debug(response.getheaders())
boto.log.debug(response_body)
if response.status == 200:
if response_body:
diff --git a/tests/integration/kinesis/test_kinesis.py b/tests/integration/kinesis/test_kinesis.py
index 039100b3..3930f2f7 100644
--- a/tests/integration/kinesis/test_kinesis.py
+++ b/tests/integration/kinesis/test_kinesis.py
@@ -52,26 +52,26 @@ class TestKinesis(TestCase):
response = kinesis.describe_stream('test')
if response['StreamDescription']['StreamStatus'] == 'ACTIVE':
+ shard_id = response['StreamDescription']['Shards'][0]['ShardId']
break
else:
raise TimeoutError('Stream is still not active, aborting...')
+ # Get ready to process some data from the stream
+ response = kinesis.get_shard_iterator('test', shard_id, 'TRIM_HORIZON')
+ shard_iterator = response['ShardIterator']
+
# Write some data to the stream
data = 'Some data ...'
response = kinesis.put_record('test', data, data)
- shard_id = response['ShardId']
-
- # Process some data from the stream
- response = kinesis.get_shard_iterator('test', shard_id, 'TRIM_HORIZON')
- shard_iterator = response['ShardIterator']
# Wait for the data to show up
tries = 0
- while tries < 20:
+ while tries < 100:
tries += 1
- time.sleep(5)
+ time.sleep(1)
- response = kinesis.get_next_records(shard_iterator, limit=5)
+ response = kinesis.get_records(shard_iterator)
shard_iterator = response['NextShardIterator']
if len(response['Records']):