summaryrefslogtreecommitdiff
path: root/boto/dynamodb2/table.py
diff options
context:
space:
mode:
authorDaniel G. Taylor <danielgtaylor@gmail.com>2013-10-09 14:19:40 -0700
committerDaniel G. Taylor <danielgtaylor@gmail.com>2013-10-09 14:19:40 -0700
commit5005178d88ef8e4bbe8c30949ffab46e46a716b0 (patch)
tree9a726460c3d64e438dada4e66b584aa189058d5f /boto/dynamodb2/table.py
parent3c56d13f56d4db34ea59eb526e221c1e07728c98 (diff)
parenta00851636307f971b03e72e1ce812cd3242816f3 (diff)
downloadboto-2.14.0.tar.gz
Merge branch 'release-2.14.0'2.14.0
Diffstat (limited to 'boto/dynamodb2/table.py')
-rw-r--r--boto/dynamodb2/table.py66
1 files changed, 52 insertions, 14 deletions
diff --git a/boto/dynamodb2/table.py b/boto/dynamodb2/table.py
index c1de437d..5d6803ce 100644
--- a/boto/dynamodb2/table.py
+++ b/boto/dynamodb2/table.py
@@ -1,3 +1,4 @@
+import boto
from boto.dynamodb2 import exceptions
from boto.dynamodb2.fields import (HashKey, RangeKey,
AllIndex, KeysOnlyIndex, IncludeIndex)
@@ -57,7 +58,7 @@ class Table(object):
>>> conn = Table('users')
# The full, minimum-extra-calls case.
- >>> from boto.dynamodb2.layer1 import DynamoDBConnection
+ >>> from boto import dynamodb2
>>> users = Table('users', schema=[
... HashKey('username'),
... RangeKey('date_joined', data_type=NUMBER)
@@ -69,11 +70,10 @@ class Table(object):
... RangeKey('date_joined')
... ]),
... ],
- ... connection=DynamoDBConnection(
- ... aws_access_key_id='key',
- ... aws_secret_access_key='key',
- ... region='us-west-2'
- ... ))
+ ... connection=dynamodb2.connect_to_region('us-west-2',
+ ... aws_access_key_id='key',
+ ... aws_secret_access_key='key',
+ ... ))
"""
self.table_name = table_name
@@ -133,7 +133,7 @@ class Table(object):
Example::
- >>> users = Table.create_table('users', schema=[
+ >>> users = Table.create('users', schema=[
... HashKey('username'),
... RangeKey('date_joined', data_type=NUMBER)
... ], throughput={
@@ -611,7 +611,7 @@ class Table(object):
'AttributeValueList': [],
'ComparisonOperator': op,
}
-
+
# Special-case the ``NULL/NOT_NULL`` case.
if field_bits[-1] == 'null':
del lookup['AttributeValueList']
@@ -1071,17 +1071,19 @@ class BatchTable(object):
self.table = table
self._to_put = []
self._to_delete = []
+ self._unprocessed = []
def __enter__(self):
return self
def __exit__(self, type, value, traceback):
- if not self._to_put and not self._to_delete:
- return False
+ if self._to_put or self._to_delete:
+ # Flush anything that's left.
+ self.flush()
- # Flush anything that's left.
- self.flush()
- return True
+ if self._unprocessed:
+ # Finally, handle anything that wasn't processed.
+ self.resend_unprocessed()
def put_item(self, data, overwrite=False):
self._to_put.append(data)
@@ -1123,7 +1125,43 @@ class BatchTable(object):
}
})
- self.table.connection.batch_write_item(batch_data)
+ resp = self.table.connection.batch_write_item(batch_data)
+ self.handle_unprocessed(resp)
+
self._to_put = []
self._to_delete = []
return True
+
+ def handle_unprocessed(self, resp):
+ if len(resp.get('UnprocessedItems', [])):
+ table_name = self.table.table_name
+ unprocessed = resp['UnprocessedItems'].get(table_name, [])
+
+ # Some items have not been processed. Stow them for now &
+ # re-attempt processing on ``__exit__``.
+ msg = "%s items were unprocessed. Storing for later."
+ boto.log.info(msg % len(unprocessed))
+ self._unprocessed.extend(unprocessed)
+
+ def resend_unprocessed(self):
+ # If there are unprocessed records (for instance, the user was over
+ # their throughput limitations), iterate over them & send until they're
+ # all there.
+ boto.log.info(
+ "Re-sending %s unprocessed items." % len(self._unprocessed)
+ )
+
+ while len(self._unprocessed):
+ # Again, do 25 at a time.
+ to_resend = self._unprocessed[:25]
+ # Remove them from the list.
+ self._unprocessed = self._unprocessed[25:]
+ batch_data = {
+ self.table.table_name: to_resend
+ }
+ boto.log.info("Sending %s items" % len(to_resend))
+ resp = self.table.connection.batch_write_item(batch_data)
+ self.handle_unprocessed(resp)
+ boto.log.info(
+ "%s unprocessed items left" % len(self._unprocessed)
+ ) \ No newline at end of file