summaryrefslogtreecommitdiff
path: root/boto/dynamodb2/table.py
diff options
context:
space:
mode:
Diffstat (limited to 'boto/dynamodb2/table.py')
-rw-r--r--boto/dynamodb2/table.py51
1 files changed, 45 insertions, 6 deletions
diff --git a/boto/dynamodb2/table.py b/boto/dynamodb2/table.py
index d552e4af..5d6803ce 100644
--- a/boto/dynamodb2/table.py
+++ b/boto/dynamodb2/table.py
@@ -1,3 +1,4 @@
+import boto
from boto.dynamodb2 import exceptions
from boto.dynamodb2.fields import (HashKey, RangeKey,
AllIndex, KeysOnlyIndex, IncludeIndex)
@@ -1070,17 +1071,19 @@ class BatchTable(object):
self.table = table
self._to_put = []
self._to_delete = []
+ self._unprocessed = []
def __enter__(self):
return self
def __exit__(self, type, value, traceback):
- if not self._to_put and not self._to_delete:
- return False
+ if self._to_put or self._to_delete:
+ # Flush anything that's left.
+ self.flush()
- # Flush anything that's left.
- self.flush()
- return True
+ if self._unprocessed:
+ # Finally, handle anything that wasn't processed.
+ self.resend_unprocessed()
def put_item(self, data, overwrite=False):
self._to_put.append(data)
@@ -1122,7 +1125,43 @@ class BatchTable(object):
}
})
- self.table.connection.batch_write_item(batch_data)
+ resp = self.table.connection.batch_write_item(batch_data)
+ self.handle_unprocessed(resp)
+
self._to_put = []
self._to_delete = []
return True
+
+ def handle_unprocessed(self, resp):
+ if len(resp.get('UnprocessedItems', [])):
+ table_name = self.table.table_name
+ unprocessed = resp['UnprocessedItems'].get(table_name, [])
+
+ # Some items have not been processed. Stow them for now &
+ # re-attempt processing on ``__exit__``.
+ msg = "%s items were unprocessed. Storing for later."
+ boto.log.info(msg % len(unprocessed))
+ self._unprocessed.extend(unprocessed)
+
+ def resend_unprocessed(self):
+ # If there are unprocessed records (for instance, the user was over
+ # their throughput limitations), iterate over them & send until they're
+ # all there.
+ boto.log.info(
+ "Re-sending %s unprocessed items." % len(self._unprocessed)
+ )
+
+ while len(self._unprocessed):
+ # Again, do 25 at a time.
+ to_resend = self._unprocessed[:25]
+ # Remove them from the list.
+ self._unprocessed = self._unprocessed[25:]
+ batch_data = {
+ self.table.table_name: to_resend
+ }
+ boto.log.info("Sending %s items" % len(to_resend))
+ resp = self.table.connection.batch_write_item(batch_data)
+ self.handle_unprocessed(resp)
+ boto.log.info(
+ "%s unprocessed items left" % len(self._unprocessed)
+ ) \ No newline at end of file