diff options
author | Daniel G. Taylor <danielgtaylor@gmail.com> | 2013-10-09 14:19:40 -0700 |
---|---|---|
committer | Daniel G. Taylor <danielgtaylor@gmail.com> | 2013-10-09 14:19:40 -0700 |
commit | 5005178d88ef8e4bbe8c30949ffab46e46a716b0 (patch) | |
tree | 9a726460c3d64e438dada4e66b584aa189058d5f /boto/dynamodb2/table.py | |
parent | 3c56d13f56d4db34ea59eb526e221c1e07728c98 (diff) | |
parent | a00851636307f971b03e72e1ce812cd3242816f3 (diff) | |
download | boto-2.14.0.tar.gz |
Merge branch 'release-2.14.0'2.14.0
Diffstat (limited to 'boto/dynamodb2/table.py')
-rw-r--r-- | boto/dynamodb2/table.py | 66 |
1 files changed, 52 insertions, 14 deletions
diff --git a/boto/dynamodb2/table.py b/boto/dynamodb2/table.py index c1de437d..5d6803ce 100644 --- a/boto/dynamodb2/table.py +++ b/boto/dynamodb2/table.py @@ -1,3 +1,4 @@ +import boto from boto.dynamodb2 import exceptions from boto.dynamodb2.fields import (HashKey, RangeKey, AllIndex, KeysOnlyIndex, IncludeIndex) @@ -57,7 +58,7 @@ class Table(object): >>> conn = Table('users') # The full, minimum-extra-calls case. - >>> from boto.dynamodb2.layer1 import DynamoDBConnection + >>> from boto import dynamodb2 >>> users = Table('users', schema=[ ... HashKey('username'), ... RangeKey('date_joined', data_type=NUMBER) @@ -69,11 +70,10 @@ class Table(object): ... RangeKey('date_joined') ... ]), ... ], - ... connection=DynamoDBConnection( - ... aws_access_key_id='key', - ... aws_secret_access_key='key', - ... region='us-west-2' - ... )) + ... connection=dynamodb2.connect_to_region('us-west-2', + ... aws_access_key_id='key', + ... aws_secret_access_key='key', + ... )) """ self.table_name = table_name @@ -133,7 +133,7 @@ class Table(object): Example:: - >>> users = Table.create_table('users', schema=[ + >>> users = Table.create('users', schema=[ ... HashKey('username'), ... RangeKey('date_joined', data_type=NUMBER) ... ], throughput={ @@ -611,7 +611,7 @@ class Table(object): 'AttributeValueList': [], 'ComparisonOperator': op, } - + # Special-case the ``NULL/NOT_NULL`` case. if field_bits[-1] == 'null': del lookup['AttributeValueList'] @@ -1071,17 +1071,19 @@ class BatchTable(object): self.table = table self._to_put = [] self._to_delete = [] + self._unprocessed = [] def __enter__(self): return self def __exit__(self, type, value, traceback): - if not self._to_put and not self._to_delete: - return False + if self._to_put or self._to_delete: + # Flush anything that's left. + self.flush() - # Flush anything that's left. - self.flush() - return True + if self._unprocessed: + # Finally, handle anything that wasn't processed. + self.resend_unprocessed() def put_item(self, data, overwrite=False): self._to_put.append(data) @@ -1123,7 +1125,43 @@ class BatchTable(object): } }) - self.table.connection.batch_write_item(batch_data) + resp = self.table.connection.batch_write_item(batch_data) + self.handle_unprocessed(resp) + self._to_put = [] self._to_delete = [] return True + + def handle_unprocessed(self, resp): + if len(resp.get('UnprocessedItems', [])): + table_name = self.table.table_name + unprocessed = resp['UnprocessedItems'].get(table_name, []) + + # Some items have not been processed. Stow them for now & + # re-attempt processing on ``__exit__``. + msg = "%s items were unprocessed. Storing for later." + boto.log.info(msg % len(unprocessed)) + self._unprocessed.extend(unprocessed) + + def resend_unprocessed(self): + # If there are unprocessed records (for instance, the user was over + # their throughput limitations), iterate over them & send until they're + # all there. + boto.log.info( + "Re-sending %s unprocessed items." % len(self._unprocessed) + ) + + while len(self._unprocessed): + # Again, do 25 at a time. + to_resend = self._unprocessed[:25] + # Remove them from the list. + self._unprocessed = self._unprocessed[25:] + batch_data = { + self.table.table_name: to_resend + } + boto.log.info("Sending %s items" % len(to_resend)) + resp = self.table.connection.batch_write_item(batch_data) + self.handle_unprocessed(resp) + boto.log.info( + "%s unprocessed items left" % len(self._unprocessed) + )
\ No newline at end of file |