author     Mitch Garnaat <mitch@garnaat.com>    2011-07-08 12:01:18 -0400
committer  Mitch Garnaat <mitch@garnaat.com>    2011-07-08 12:01:18 -0400
commit     74ed4ef4482275bef2a6e2df8c0f1265da1a9833 (patch)
tree       0872b606e38956a9850d772e6e70d15cbbe07ab4
parent     dce2719a9660c36b2faf95b5ac81205b13937064 (diff)
parent     425499b616dea116d37d230290680e25e18e8065 (diff)
download   boto-74ed4ef4482275bef2a6e2df8c0f1265da1a9833.tar.gz
Merge branch 'master' of https://github.com/bwbeach/boto into bwbeach-master
-rw-r--r--  boto/connection.py              7
-rw-r--r--  tests/s3/test_pool.py         232
-rw-r--r--  tests/sqs/test_connection.py    3
3 files changed, 239 insertions, 3 deletions
diff --git a/boto/connection.py b/boto/connection.py
index 8e4c72fe..9212f4ba 100644
--- a/boto/connection.py
+++ b/boto/connection.py
@@ -430,8 +430,11 @@ class AWSAuthConnection(object):
try:
self._pool[self._cached_name(host, is_secure)].put_nowait(connection)
except Queue.Full:
- # gracefully fail in case of pool overflow
- connection.close()
+ # Don't close the connection here, because the caller of
+ # _mexe, which calls this, has not read the response body
+ # yet. We'll just let the connection object close itself
+ # when it is freed.
+ pass
def proxy_ssl(self):
host = '%s:%d' % (self.host, self.port)
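
The change above deliberately leaves the connection open when the per-host pool is full. A minimal sketch of that checkin pattern, assuming Python 2's Queue module (as in the except clause above); ToyConnectionPool and put_http_connection are made-up names for illustration, not boto's actual API:

import Queue

class ToyConnectionPool(object):
    """Keeps at most `size` idle connections per (host, is_secure) key."""

    def __init__(self, size=20):
        self._pool = {}
        self._size = size

    def put_http_connection(self, host, is_secure, connection):
        key = (host, is_secure)  # stands in for _cached_name(host, is_secure)
        queue = self._pool.setdefault(key, Queue.Queue(self._size))
        try:
            queue.put_nowait(connection)
        except Queue.Full:
            # Don't call connection.close() here: the caller may still be
            # reading the response body.  Dropping the reference lets the
            # connection close itself when it is garbage-collected.
            pass
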
diff --git a/tests/s3/test_pool.py b/tests/s3/test_pool.py
new file mode 100644
index 00000000..65772b36
--- /dev/null
+++ b/tests/s3/test_pool.py
@@ -0,0 +1,232 @@
+# Copyright (c) 2011 Brian Beach
+# All rights reserved.
+#
+# Permission is hereby granted, free of charge, to any person obtaining a
+# copy of this software and associated documentation files (the
+# "Software"), to deal in the Software without restriction, including
+# without limitation the rights to use, copy, modify, merge, publish, dis-
+# tribute, sublicense, and/or sell copies of the Software, and to permit
+# persons to whom the Software is furnished to do so, subject to the fol-
+# lowing conditions:
+#
+# The above copyright notice and this permission notice shall be included
+# in all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
+# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
+# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+# IN THE SOFTWARE.
+
+"""
+Some multi-threading tests of boto in a greenlet environment.
+"""
+
+import gevent.monkey
+gevent.monkey.patch_all()
+
+import boto
+import time
+import uuid
+
+from StringIO import StringIO
+
+from gevent.greenlet import Greenlet
+
+def put_object(bucket, name):
+ bucket.new_key(name).set_contents_from_string(name)
+
+def get_object(bucket, name):
+ assert bucket.get_key(name).get_contents_as_string() == name
+
+def test_close_connections():
+ """
+ A test that exposes the problem where connections are returned to the
+ connection pool (and closed) before the caller reads the response.
+
+    I couldn't think of a way to test it without greenlets, so this test
+    doesn't run as part of the standard test suite. That keeps gevent
+    from becoming a dependency of the standard tests.
+ """
+
+ print "Running test_close_connections"
+
+ # Connect to S3
+ s3 = boto.connect_s3()
+
+ # Clean previous tests.
+ for b in s3.get_all_buckets():
+ if b.name.startswith('test-'):
+ for key in b.get_all_keys():
+ key.delete()
+ b.delete()
+
+ # Make a test bucket
+ bucket = s3.create_bucket('test-%d' % int(time.time()))
+
+ # Create 30 threads that each create an object in S3. The number
+ # 30 is chosen because it is larger than the connection pool size
+ # (20).
+    names = [str(uuid.uuid4()) for _ in range(30)]
+ threads = [
+ Greenlet.spawn(put_object, bucket, name)
+ for name in names
+ ]
+ for t in threads:
+ t.get()
+
+ # Create 30 threads to read the contents of the new objects. This
+ # is where closing the connection early is a problem, because
+ # there is a response that needs to be read, and it can't be read
+ # if the connection has already been closed.
+ threads = [
+ Greenlet.spawn(get_object, bucket, name)
+ for name in names
+ ]
+ for t in threads:
+ t.get()
+
+# test_reuse_connections needs to read a file that is big enough that
+# one read() call on the socket won't read the whole thing.
+BIG_SIZE = 10000
+
+class WriteAndCount(object):
+
+ """
+ A file-like object that counts the number of characters written.
+ """
+
+ def __init__(self):
+ self.size = 0
+
+ def write(self, data):
+ self.size += len(data)
+ time.sleep(0) # yield to other threads
+
+def read_big_object(bucket, name, count):
+ for _ in range(count):
+ key = bucket.get_key(name)
+ out = WriteAndCount()
+ key.get_contents_to_file(out)
+ if out.size != BIG_SIZE:
+ print out.size, BIG_SIZE
+ assert out.size == BIG_SIZE
+
+class LittleQuerier(object):
+
+ """
+ An object that manages a thread that keeps pulling down small
+ objects from S3 and checking the answers until told to stop.
+ """
+
+ def __init__(self, bucket, small_names):
+ self.running = True
+ self.bucket = bucket
+ self.small_names = small_names
+ self.thread = Greenlet.spawn(self.run)
+
+ def stop(self):
+ self.running = False
+ self.thread.get()
+
+ def run(self):
+ count = 0
+ while self.running:
+ i = count % 4
+ key = self.bucket.get_key(self.small_names[i])
+ expected = str(i)
+ rh = { 'response-content-type' : 'small/' + str(i) }
+ actual = key.get_contents_as_string(response_headers = rh)
+ if expected != actual:
+ print "AHA:", repr(expected), repr(actual)
+ assert expected == actual
+ count += 1
+
+def test_reuse_connections():
+ """
+    This test tries to expose problems caused by boto returning
+    connections to the connection pool before the response has been
+    read. The strategy is to start a couple of big reads from S3,
+    where reading the response takes a while, and then start other
+    requests that will reuse the same connection from the pool while
+    the big response is still being read.
+
+ The test passes because of an interesting combination of factors.
+ I was expecting that it would fail because two threads would be
+ reading the same connection at the same time. That doesn't happen
+ because httplib catches the problem before it happens and raises
+ an exception.
+
+ Here's the sequence of events:
+
+ - Thread 1: Send a request to read a big S3 object.
+ - Thread 1: Returns connection to pool.
+    - Thread 1: Start reading the body of the response.
+
+ - Thread 2: Get the same connection from the pool.
+ - Thread 2: Send another request on the same connection.
+ - Thread 2: Try to read the response, but
+                HTTPConnection.getresponse notices that the
+ previous response isn't done reading yet, and
+ raises a ResponseNotReady exception.
+ - Thread 2: _mexe catches the exception, does not return the
+ connection to the pool, gets a new connection, and
+ retries.
+
+ - Thread 1: Finish reading the body of its response.
+
+ - Server: Gets the second request on the connection, and
+ sends a response. This response is ignored because
+ the connection has been dropped on the client end.
+
+    If you add a print statement in HTTPConnection.getresponse at the
+ point where it raises ResponseNotReady, and then run this test,
+ you can see that it's happening.
+ """
+
+ print "Running test_reuse_connections"
+
+ # Connect to S3
+ s3 = boto.connect_s3()
+
+ # Make a test bucket
+ bucket = s3.create_bucket('test-%d' % int(time.time()))
+
+ # Create some small objects in S3.
+ small_names = [str(uuid.uuid4()) for _ in range(4)]
+ for (i, name) in enumerate(small_names):
+ bucket.new_key(name).set_contents_from_string(str(i))
+
+ # Create a big object in S3.
+ big_name = str(uuid.uuid4())
+ contents = "-" * BIG_SIZE
+ bucket.new_key(big_name).set_contents_from_string(contents)
+
+ # Start some threads to read it and check that they are reading
+ # the correct thing. Each thread will read the object 40 times.
+ threads = [
+ Greenlet.spawn(read_big_object, bucket, big_name, 40)
+ for _ in range(5)
+ ]
+
+ # Do some other things that may (incorrectly) re-use the same
+ # connections while the big objects are being read.
+ queriers = [
+ LittleQuerier(bucket, small_names)
+ for _ in range(5)
+ ]
+
+ # Clean up.
+ for t in threads:
+ t.get()
+ for q in queriers:
+ q.stop()
+
+def main():
+ test_close_connections()
+ test_reuse_connections()
+
+if __name__ == '__main__':
+ main()
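
The sequence of events described in the test_reuse_connections docstring can be reproduced with httplib on its own. A rough standalone sketch (Python 2; example.com and the paths are placeholders, not part of the commit, and a keep-alive HTTP/1.1 response is assumed):

import httplib

conn = httplib.HTTPConnection('example.com')

conn.request('GET', '/big-object')
resp1 = conn.getresponse()             # headers parsed, body still unread

conn.request('GET', '/small-object')   # the request still goes out on the wire
try:
    conn.getresponse()
except httplib.ResponseNotReady:
    # httplib refuses to start a new response while resp1 is unread.
    # This is the exception that lets _mexe drop the connection and
    # retry the request on a fresh one.
    pass

resp1.read()                           # the first body can still be read

The server's answer to the second request is simply abandoned along with the dropped connection, matching the last step in the docstring above.
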
diff --git a/tests/sqs/test_connection.py b/tests/sqs/test_connection.py
index 8c3e20c6..6996a54a 100644
--- a/tests/sqs/test_connection.py
+++ b/tests/sqs/test_connection.py
@@ -43,7 +43,8 @@ class SQSConnectionTest (unittest.TestCase):
# try illegal name
try:
- queue = c.create_queue('bad_queue_name')
+ queue = c.create_queue('bad*queue*name')
+ self.fail('queue name should have been bad')
except SQSError:
pass
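
For reference, the same check that create_queue rejects the starred name could be written with unittest's assertRaises; this is just an alternative sketch using the names from the test above, not part of the commit:

# Passes only if create_queue raises SQSError for the invalid name.
self.assertRaises(SQSError, c.create_queue, 'bad*queue*name')
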