diff options
author | Jyrki Pulliainen <jyrki@spotify.com> | 2013-12-05 09:07:04 +0100 |
---|---|---|
committer | Jyrki Pulliainen <jyrki@spotify.com> | 2013-12-09 12:04:24 +0100 |
commit | ab1006ca51ca5035c23859806feda1342ce55fb4 (patch) | |
tree | fdb52b62fd439908ad4ff7d6b0db1f8af5aaace0 | |
parent | 2a9adc48d5b6db28ee5f3426ba436e7444db96f2 (diff) | |
download | kazoo-ab1006ca51ca5035c23859806feda1342ce55fb4.tar.gz |
create: Make detecting _call short circuit more deterministic
-rw-r--r-- | kazoo/client.py | 30 | ||||
-rw-r--r-- | kazoo/tests/test_client.py | 18 |
2 files changed, 36 insertions, 12 deletions
diff --git a/kazoo/client.py b/kazoo/client.py index b96dbbb..ed7781b 100644 --- a/kazoo/client.py +++ b/kazoo/client.py @@ -469,19 +469,24 @@ class KazooClient(object): def _call(self, request, async_object): """Ensure there's an active connection and put the request in - the queue if there is.""" + the queue if there is. + + Returns False if the call short circuits due to AUTH_FAILED, + CLOSED, EXPIRED_SESSION or CONNECTING state. + + """ if self._state == KeeperState.AUTH_FAILED: async_object.set_exception(AuthFailedError()) - return + return False elif self._state == KeeperState.CLOSED: async_object.set_exception(ConnectionClosedError( "Connection has been closed")) - return + return False elif self._state in (KeeperState.EXPIRED_SESSION, KeeperState.CONNECTING): async_object.set_exception(SessionExpiredError()) - return + return False self._queue.append((request, async_object)) @@ -791,11 +796,6 @@ class KazooClient(object): @capture_exceptions(async_result) def do_create(): result = self._create_async_inner(path, value, acl, flags, trailing=sequence) - if isinstance(result.exception, ConnectionClosedError): - # Closed connection might have set the exception for - # the async_result object. If that is the case, we'll - # raise the exception immediately - raise result.exception result.rawlink(create_completion) @capture_exceptions(async_result) @@ -821,8 +821,16 @@ class KazooClient(object): def _create_async_inner(self, path, value, acl, flags, trailing=False): async_result = self.handler.async_result() - self._call(Create(_prefix_root(self.chroot, path, trailing=trailing), value, acl, flags), - async_result) + call_result = self._call( + Create(_prefix_root(self.chroot, path, trailing=trailing), + value, acl, flags), async_result) + if call_result is False: + # We hit a short-circuit exit on the _call. Because we are + # not using the original async_result here, we bubble the + # exception upwards to the do_create function in + # KazooClient.create so that it gets set on the correct + # async_result object + raise async_result.exception return async_result def ensure_path(self, path, acl=None): diff --git a/kazoo/tests/test_client.py b/kazoo/tests/test_client.py index 0cdeb1b..aba9e60 100644 --- a/kazoo/tests/test_client.py +++ b/kazoo/tests/test_client.py @@ -11,6 +11,7 @@ from nose.tools import raises from kazoo.testing import KazooTestCase from kazoo.exceptions import ( + AuthFailedError, BadArgumentsError, ConfigurationError, ConnectionClosedError, @@ -19,7 +20,9 @@ from kazoo.exceptions import ( NoAuthError, NoNodeError, NodeExistsError, + SessionExpiredError, ) +from kazoo.protocol.states import KeeperState if sys.version_info > (3, ): # pragma: nocover @@ -342,8 +345,21 @@ class TestClient(KazooTestCase): eq_(path, "/1") self.assertTrue(client.exists("/1")) - def test_create_on_closed_connection(self): + def test_create_on_broken_connection(self): client = self.client + client.start() + + client._state = KeeperState.EXPIRED_SESSION + self.assertRaises(SessionExpiredError, client.create, + '/closedpath', b'bar') + + client._state = KeeperState.AUTH_FAILED + self.assertRaises(AuthFailedError, client.create, + '/closedpath', b'bar') + + client._state = KeeperState.CONNECTING + self.assertRaises(SessionExpiredError, client.create, + '/closedpath', b'bar') client.stop() client.close() |