summaryrefslogtreecommitdiff
path: root/boto/dynamodb2/table.py
diff options
context:
space:
mode:
authorDaniel Lindsley <daniel@toastdriven.com>2013-09-12 15:42:59 -0700
committerDaniel Lindsley <daniel@toastdriven.com>2013-09-12 15:42:59 -0700
commit7b70eafa9c91dfd5842eb6dfa7b7da0fee8074ff (patch)
tree2576889bebc71596bfd1f723dc08446bae28d703 /boto/dynamodb2/table.py
parentad40af14b6989f940deac74e0276e8e8a8bd3372 (diff)
parent0efd52ea646fe536822dcf9d85bfe51cf4b7950a (diff)
downloadboto-2.13.0.tar.gz
Merge branch 'release-2.13.0'2.13.0
* release-2.13.0: (21 commits) Bumping version to 2.13.0 Updated release notes for everything that's been merged. Added support for modifying reserved instances in EC2. Added ``dry_run`` to. All. The. Things. Fixes #1679, #1566 - Altered DDBv2's ``batch_write`` to appropriately queue & retry unprocessed items. Adding API reference for boto.swf.layer2. Fixes #1709 - Cannot create Launch Configuration with Block Device Mappings Removing duplicate boto.s3.prefix entry to prevent Sphinx build errors. Removed the incomplete copyright notice from the README. More release notes for recent changes. Fixes #1664 - Corrected the behavior of ``dynamodb_load`` when working with sets. document get_all_images magic strings Added release notes for the recent merges. SNS ``publish`` now uses POST. Updated release notes to mention the Opsworks VPC support. Altered SDB to no longer cause errors when building the docs. Added Opsworks to the docs & cleaned up a bunch of Sphinx build errors. Fixed the creation of EC2 VPC instances with public IPs. Added dev release notes back in, as well as a template for future laziness. Updated Opsworks docstrings to latest. ...
Diffstat (limited to 'boto/dynamodb2/table.py')
-rw-r--r--boto/dynamodb2/table.py51
1 files changed, 45 insertions, 6 deletions
diff --git a/boto/dynamodb2/table.py b/boto/dynamodb2/table.py
index d552e4af..5d6803ce 100644
--- a/boto/dynamodb2/table.py
+++ b/boto/dynamodb2/table.py
@@ -1,3 +1,4 @@
+import boto
from boto.dynamodb2 import exceptions
from boto.dynamodb2.fields import (HashKey, RangeKey,
AllIndex, KeysOnlyIndex, IncludeIndex)
@@ -1070,17 +1071,19 @@ class BatchTable(object):
self.table = table
self._to_put = []
self._to_delete = []
+ self._unprocessed = []
def __enter__(self):
return self
def __exit__(self, type, value, traceback):
- if not self._to_put and not self._to_delete:
- return False
+ if self._to_put or self._to_delete:
+ # Flush anything that's left.
+ self.flush()
- # Flush anything that's left.
- self.flush()
- return True
+ if self._unprocessed:
+ # Finally, handle anything that wasn't processed.
+ self.resend_unprocessed()
def put_item(self, data, overwrite=False):
self._to_put.append(data)
@@ -1122,7 +1125,43 @@ class BatchTable(object):
}
})
- self.table.connection.batch_write_item(batch_data)
+ resp = self.table.connection.batch_write_item(batch_data)
+ self.handle_unprocessed(resp)
+
self._to_put = []
self._to_delete = []
return True
+
+ def handle_unprocessed(self, resp):
+ if len(resp.get('UnprocessedItems', [])):
+ table_name = self.table.table_name
+ unprocessed = resp['UnprocessedItems'].get(table_name, [])
+
+ # Some items have not been processed. Stow them for now &
+ # re-attempt processing on ``__exit__``.
+ msg = "%s items were unprocessed. Storing for later."
+ boto.log.info(msg % len(unprocessed))
+ self._unprocessed.extend(unprocessed)
+
+ def resend_unprocessed(self):
+ # If there are unprocessed records (for instance, the user was over
+ # their throughput limitations), iterate over them & send until they're
+ # all there.
+ boto.log.info(
+ "Re-sending %s unprocessed items." % len(self._unprocessed)
+ )
+
+ while len(self._unprocessed):
+ # Again, do 25 at a time.
+ to_resend = self._unprocessed[:25]
+ # Remove them from the list.
+ self._unprocessed = self._unprocessed[25:]
+ batch_data = {
+ self.table.table_name: to_resend
+ }
+ boto.log.info("Sending %s items" % len(to_resend))
+ resp = self.table.connection.batch_write_item(batch_data)
+ self.handle_unprocessed(resp)
+ boto.log.info(
+ "%s unprocessed items left" % len(self._unprocessed)
+ ) \ No newline at end of file