diff options
Diffstat (limited to 'boto/dynamodb2/table.py')
-rw-r--r-- | boto/dynamodb2/table.py | 51 |
1 files changed, 45 insertions, 6 deletions
diff --git a/boto/dynamodb2/table.py b/boto/dynamodb2/table.py index d552e4af..5d6803ce 100644 --- a/boto/dynamodb2/table.py +++ b/boto/dynamodb2/table.py @@ -1,3 +1,4 @@ +import boto from boto.dynamodb2 import exceptions from boto.dynamodb2.fields import (HashKey, RangeKey, AllIndex, KeysOnlyIndex, IncludeIndex) @@ -1070,17 +1071,19 @@ class BatchTable(object): self.table = table self._to_put = [] self._to_delete = [] + self._unprocessed = [] def __enter__(self): return self def __exit__(self, type, value, traceback): - if not self._to_put and not self._to_delete: - return False + if self._to_put or self._to_delete: + # Flush anything that's left. + self.flush() - # Flush anything that's left. - self.flush() - return True + if self._unprocessed: + # Finally, handle anything that wasn't processed. + self.resend_unprocessed() def put_item(self, data, overwrite=False): self._to_put.append(data) @@ -1122,7 +1125,43 @@ class BatchTable(object): } }) - self.table.connection.batch_write_item(batch_data) + resp = self.table.connection.batch_write_item(batch_data) + self.handle_unprocessed(resp) + self._to_put = [] self._to_delete = [] return True + + def handle_unprocessed(self, resp): + if len(resp.get('UnprocessedItems', [])): + table_name = self.table.table_name + unprocessed = resp['UnprocessedItems'].get(table_name, []) + + # Some items have not been processed. Stow them for now & + # re-attempt processing on ``__exit__``. + msg = "%s items were unprocessed. Storing for later." + boto.log.info(msg % len(unprocessed)) + self._unprocessed.extend(unprocessed) + + def resend_unprocessed(self): + # If there are unprocessed records (for instance, the user was over + # their throughput limitations), iterate over them & send until they're + # all there. + boto.log.info( + "Re-sending %s unprocessed items." % len(self._unprocessed) + ) + + while len(self._unprocessed): + # Again, do 25 at a time. + to_resend = self._unprocessed[:25] + # Remove them from the list. + self._unprocessed = self._unprocessed[25:] + batch_data = { + self.table.table_name: to_resend + } + boto.log.info("Sending %s items" % len(to_resend)) + resp = self.table.connection.batch_write_item(batch_data) + self.handle_unprocessed(resp) + boto.log.info( + "%s unprocessed items left" % len(self._unprocessed) + )
\ No newline at end of file |