summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJyrki Pulliainen <jyrki@spotify.com>2013-12-05 09:07:04 +0100
committerJyrki Pulliainen <jyrki@spotify.com>2013-12-09 12:04:24 +0100
commitab1006ca51ca5035c23859806feda1342ce55fb4 (patch)
treefdb52b62fd439908ad4ff7d6b0db1f8af5aaace0
parent2a9adc48d5b6db28ee5f3426ba436e7444db96f2 (diff)
downloadkazoo-ab1006ca51ca5035c23859806feda1342ce55fb4.tar.gz
create: Make detecting _call short circuit more deterministic
-rw-r--r--kazoo/client.py30
-rw-r--r--kazoo/tests/test_client.py18
2 files changed, 36 insertions, 12 deletions
diff --git a/kazoo/client.py b/kazoo/client.py
index b96dbbb..ed7781b 100644
--- a/kazoo/client.py
+++ b/kazoo/client.py
@@ -469,19 +469,24 @@ class KazooClient(object):
def _call(self, request, async_object):
"""Ensure there's an active connection and put the request in
- the queue if there is."""
+ the queue if there is.
+
+ Returns False if the call short circuits due to AUTH_FAILED,
+ CLOSED, EXPIRED_SESSION or CONNECTING state.
+
+ """
if self._state == KeeperState.AUTH_FAILED:
async_object.set_exception(AuthFailedError())
- return
+ return False
elif self._state == KeeperState.CLOSED:
async_object.set_exception(ConnectionClosedError(
"Connection has been closed"))
- return
+ return False
elif self._state in (KeeperState.EXPIRED_SESSION,
KeeperState.CONNECTING):
async_object.set_exception(SessionExpiredError())
- return
+ return False
self._queue.append((request, async_object))
@@ -791,11 +796,6 @@ class KazooClient(object):
@capture_exceptions(async_result)
def do_create():
result = self._create_async_inner(path, value, acl, flags, trailing=sequence)
- if isinstance(result.exception, ConnectionClosedError):
- # Closed connection might have set the exception for
- # the async_result object. If that is the case, we'll
- # raise the exception immediately
- raise result.exception
result.rawlink(create_completion)
@capture_exceptions(async_result)
@@ -821,8 +821,16 @@ class KazooClient(object):
def _create_async_inner(self, path, value, acl, flags, trailing=False):
async_result = self.handler.async_result()
- self._call(Create(_prefix_root(self.chroot, path, trailing=trailing), value, acl, flags),
- async_result)
+ call_result = self._call(
+ Create(_prefix_root(self.chroot, path, trailing=trailing),
+ value, acl, flags), async_result)
+ if call_result is False:
+ # We hit a short-circuit exit on the _call. Because we are
+ # not using the original async_result here, we bubble the
+ # exception upwards to the do_create function in
+ # KazooClient.create so that it gets set on the correct
+ # async_result object
+ raise async_result.exception
return async_result
def ensure_path(self, path, acl=None):
diff --git a/kazoo/tests/test_client.py b/kazoo/tests/test_client.py
index 0cdeb1b..aba9e60 100644
--- a/kazoo/tests/test_client.py
+++ b/kazoo/tests/test_client.py
@@ -11,6 +11,7 @@ from nose.tools import raises
from kazoo.testing import KazooTestCase
from kazoo.exceptions import (
+ AuthFailedError,
BadArgumentsError,
ConfigurationError,
ConnectionClosedError,
@@ -19,7 +20,9 @@ from kazoo.exceptions import (
NoAuthError,
NoNodeError,
NodeExistsError,
+ SessionExpiredError,
)
+from kazoo.protocol.states import KeeperState
if sys.version_info > (3, ): # pragma: nocover
@@ -342,8 +345,21 @@ class TestClient(KazooTestCase):
eq_(path, "/1")
self.assertTrue(client.exists("/1"))
- def test_create_on_closed_connection(self):
+ def test_create_on_broken_connection(self):
client = self.client
+ client.start()
+
+ client._state = KeeperState.EXPIRED_SESSION
+ self.assertRaises(SessionExpiredError, client.create,
+ '/closedpath', b'bar')
+
+ client._state = KeeperState.AUTH_FAILED
+ self.assertRaises(AuthFailedError, client.create,
+ '/closedpath', b'bar')
+
+ client._state = KeeperState.CONNECTING
+ self.assertRaises(SessionExpiredError, client.create,
+ '/closedpath', b'bar')
client.stop()
client.close()