diff options
author | Mitch Garnaat <mitch@garnaat.com> | 2011-07-08 12:01:18 -0400 |
---|---|---|
committer | Mitch Garnaat <mitch@garnaat.com> | 2011-07-08 12:01:18 -0400 |
commit | 74ed4ef4482275bef2a6e2df8c0f1265da1a9833 (patch) | |
tree | 0872b606e38956a9850d772e6e70d15cbbe07ab4 | |
parent | dce2719a9660c36b2faf95b5ac81205b13937064 (diff) | |
parent | 425499b616dea116d37d230290680e25e18e8065 (diff) | |
download | boto-74ed4ef4482275bef2a6e2df8c0f1265da1a9833.tar.gz |
Merge branch 'master' of https://github.com/bwbeach/boto into bwbeach-master
-rw-r--r-- | boto/connection.py | 7 | ||||
-rw-r--r-- | tests/s3/test_pool.py | 232 | ||||
-rw-r--r-- | tests/sqs/test_connection.py | 3 |
3 files changed, 239 insertions, 3 deletions
diff --git a/boto/connection.py b/boto/connection.py index 8e4c72fe..9212f4ba 100644 --- a/boto/connection.py +++ b/boto/connection.py @@ -430,8 +430,11 @@ class AWSAuthConnection(object): try: self._pool[self._cached_name(host, is_secure)].put_nowait(connection) except Queue.Full: - # gracefully fail in case of pool overflow - connection.close() + # Don't close the connection here, because the caller of + # _mexe, which calls this, has not read the response body + # yet. We'll just let the connection object close itself + # when it is freed. + pass def proxy_ssl(self): host = '%s:%d' % (self.host, self.port) diff --git a/tests/s3/test_pool.py b/tests/s3/test_pool.py new file mode 100644 index 00000000..65772b36 --- /dev/null +++ b/tests/s3/test_pool.py @@ -0,0 +1,232 @@ +# Copyright (c) 2011 Brian Beach +# All rights reserved. +# +# Permission is hereby granted, free of charge, to any person obtaining a +# copy of this software and associated documentation files (the +# "Software"), to deal in the Software without restriction, including +# without limitation the rights to use, copy, modify, merge, publish, dis- +# tribute, sublicense, and/or sell copies of the Software, and to permit +# persons to whom the Software is furnished to do so, subject to the fol- +# lowing conditions: +# +# The above copyright notice and this permission notice shall be included +# in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL- +# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, +# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS +# IN THE SOFTWARE. + +""" +Some multi-threading tests of boto in a greenlet environment. 
+""" + +import gevent.monkey +gevent.monkey.patch_all() + +import boto +import time +import uuid + +from StringIO import StringIO + +from gevent.greenlet import Greenlet + +def put_object(bucket, name): + bucket.new_key(name).set_contents_from_string(name) + +def get_object(bucket, name): + assert bucket.get_key(name).get_contents_as_string() == name + +def test_close_connections(): + """ + A test that exposes the problem where connections are returned to the + connection pool (and closed) before the caller reads the response. + + I couldn't think of a way to test it without greenlets, so this test + doesn't run as part of the standard test suite. That way, no more + dependencies are added to the test suite. + """ + + print "Running test_close_connections" + + # Connect to S3 + s3 = boto.connect_s3() + + # Clean previous tests. + for b in s3.get_all_buckets(): + if b.name.startswith('test-'): + for key in b.get_all_keys(): + key.delete() + b.delete() + + # Make a test bucket + bucket = s3.create_bucket('test-%d' % int(time.time())) + + # Create 30 threads that each create an object in S3. The number + # 30 is chosen because it is larger than the connection pool size + # (20). + names = [str(uuid.uuid4()) for _ in range(30)] + threads = [ + Greenlet.spawn(put_object, bucket, name) + for name in names + ] + for t in threads: + t.get() + + # Create 30 threads to read the contents of the new objects. This + # is where closing the connection early is a problem, because + # there is a response that needs to be read, and it can't be read + # if the connection has already been closed. + threads = [ + Greenlet.spawn(get_object, bucket, name) + for name in names + ] + for t in threads: + t.get() + +# test_reuse_connections needs to read a file that is big enough that +# one read() call on the socket won't read the whole thing. +BIG_SIZE = 10000 + +class WriteAndCount(object): + + """ + A file-like object that counts the number of characters written. 
+ """ + + def __init__(self): + self.size = 0 + + def write(self, data): + self.size += len(data) + time.sleep(0) # yield to other threads + +def read_big_object(bucket, name, count): + for _ in range(count): + key = bucket.get_key(name) + out = WriteAndCount() + key.get_contents_to_file(out) + if out.size != BIG_SIZE: + print out.size, BIG_SIZE + assert out.size == BIG_SIZE + +class LittleQuerier(object): + + """ + An object that manages a thread that keeps pulling down small + objects from S3 and checking the answers until told to stop. + """ + + def __init__(self, bucket, small_names): + self.running = True + self.bucket = bucket + self.small_names = small_names + self.thread = Greenlet.spawn(self.run) + + def stop(self): + self.running = False + self.thread.get() + + def run(self): + count = 0 + while self.running: + i = count % 4 + key = self.bucket.get_key(self.small_names[i]) + expected = str(i) + rh = { 'response-content-type' : 'small/' + str(i) } + actual = key.get_contents_as_string(response_headers = rh) + if expected != actual: + print "AHA:", repr(expected), repr(actual) + assert expected == actual + count += 1 + +def test_reuse_connections(): + """ + This test is an attempt to expose problems because of the fact + that boto returns connections to the connection pool before + reading the response. The strategy is to start a couple big reads + from S3, where it will take time to read the response, and then + start other requests that will reuse the same connection from the + pool while the big response is still being read. + + The test passes because of an interesting combination of factors. + I was expecting that it would fail because two threads would be + reading the same connection at the same time. That doesn't happen + because httplib catches the problem before it happens and raises + an exception. + + Here's the sequence of events: + + - Thread 1: Send a request to read a big S3 object. + - Thread 1: Returns connection to pool. 
+ - Thread 1: Start reading the body of the response. + + - Thread 2: Get the same connection from the pool. + - Thread 2: Send another request on the same connection. + - Thread 2: Try to read the response, but + HTTPConnection.get_response notices that the + previous response isn't done reading yet, and + raises a ResponseNotReady exception. + - Thread 2: _mexe catches the exception, does not return the + connection to the pool, gets a new connection, and + retries. + + - Thread 1: Finish reading the body of its response. + + - Server: Gets the second request on the connection, and + sends a response. This response is ignored because + the connection has been dropped on the client end. + + If you add a print statement in HTTPConnection.get_response at the + point where it raises ResponseNotReady, and then run this test, + you can see that it's happening. + """ + + print "Running test_reuse_connections" + + # Connect to S3 + s3 = boto.connect_s3() + + # Make a test bucket + bucket = s3.create_bucket('test-%d' % int(time.time())) + + # Create some small objects in S3. + small_names = [str(uuid.uuid4()) for _ in range(4)] + for (i, name) in enumerate(small_names): + bucket.new_key(name).set_contents_from_string(str(i)) + + # Create a big object in S3. + big_name = str(uuid.uuid4()) + contents = "-" * BIG_SIZE + bucket.new_key(big_name).set_contents_from_string(contents) + + # Start some threads to read it and check that they are reading + # the correct thing. Each thread will read the object 40 times. + threads = [ + Greenlet.spawn(read_big_object, bucket, big_name, 40) + for _ in range(5) + ] + + # Do some other things that may (incorrectly) re-use the same + # connections while the big objects are being read. + queriers = [ + LittleQuerier(bucket, small_names) + for _ in range(5) + ] + + # Clean up. 
+ for t in threads: + t.get() + for q in queriers: + q.stop() + +def main(): + test_close_connections() + test_reuse_connections() + +if __name__ == '__main__': + main() diff --git a/tests/sqs/test_connection.py b/tests/sqs/test_connection.py index 8c3e20c6..6996a54a 100644 --- a/tests/sqs/test_connection.py +++ b/tests/sqs/test_connection.py @@ -43,7 +43,8 @@ class SQSConnectionTest (unittest.TestCase): # try illegal name try: - queue = c.create_queue('bad_queue_name') + queue = c.create_queue('bad*queue*name') + self.fail('queue name should have been bad') except SQSError: pass |