summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDamien Diederen <dd@crosstwine.com>2020-03-09 18:26:41 +0100
committerGitHub <noreply@github.com>2020-03-09 18:26:41 +0100
commita636d7a6bb88ab9080e902983b10c8f0f1bf60a8 (patch)
treef91cf59ee0e4d7d37cb4e8f2a09fe1de37d3e408
parent933b38b5506d84e05ed6f6c5c8c208a3367400da (diff)
downloadkazoo-a636d7a6bb88ab9080e902983b10c8f0f1bf60a8.tar.gz
fix(core): allow requests to be queued in CONNECTING state (#374) (#588)
With this patch, requests issued while the client is in the 'CONNECTING' state get queued instead of raising a misleading 'SessionExpiredError'. This fixes https://github.com/python-zk/kazoo/issues/374, and brings Kazoo more in line with the Java and C clients. See the 'kazoo.client.KazooClient.state' documentation as well as these discussions for more details: https://github.com/python-zk/kazoo/pull/570#issuecomment-554798550 https://github.com/python-zk/kazoo/pull/583#issuecomment-586422386
-rw-r--r--docs/api/client.rst21
-rw-r--r--kazoo/client.py9
-rw-r--r--kazoo/tests/test_client.py106
3 files changed, 127 insertions, 9 deletions
diff --git a/docs/api/client.rst b/docs/api/client.rst
index 8efe955..51335e6 100644
--- a/docs/api/client.rst
+++ b/docs/api/client.rst
@@ -30,6 +30,27 @@ Public API
A :class:`~kazoo.protocol.states.KazooState` attribute indicating
the current higher-level connection state.
+ .. note::
+
+ Up to version 2.6.1, requests could only be submitted
+ in the CONNECTED state. Requests submitted while
+ SUSPENDED would immediately raise a
+ :exc:`~kazoo.exceptions.SessionExpiredError`. This
+ was problematic, as sessions are usually recovered on
+ reconnect.
+
+ Kazoo now simply queues requests submitted in the
+ SUSPENDED state, expecting a recovery. This matches
+ the behavior of the Java and C clients.
+
+ Requests submitted in a LOST state still fail
+ immediately with the corresponding exception.
+
+ See:
+
+ * https://github.com/python-zk/kazoo/issues/374 and
+ * https://github.com/python-zk/kazoo/pull/570
+
.. autoclass:: TransactionRequest
:members:
:member-order: bysource
diff --git a/kazoo/client.py b/kazoo/client.py
index a129fc5..a5bdae4 100644
--- a/kazoo/client.py
+++ b/kazoo/client.py
@@ -584,11 +584,11 @@ class KazooClient(object):
"and wouldn't close after %s seconds" % timeout)
def _call(self, request, async_object):
- """Ensure there's an active connection and put the request in
- the queue if there is.
+ """Ensure the client is in CONNECTED or SUSPENDED state and put the
+ request in the queue if it is.
Returns False if the call short circuits due to AUTH_FAILED,
- CLOSED, EXPIRED_SESSION or CONNECTING state.
+ CLOSED, or EXPIRED_SESSION state.
"""
@@ -599,8 +599,7 @@ class KazooClient(object):
async_object.set_exception(ConnectionClosedError(
"Connection has been closed"))
return False
- elif self._state in (KeeperState.EXPIRED_SESSION,
- KeeperState.CONNECTING):
+ elif self._state == KeeperState.EXPIRED_SESSION:
async_object.set_exception(SessionExpiredError())
return False
diff --git a/kazoo/tests/test_client.py b/kazoo/tests/test_client.py
index fb6d2d9..32cc0d1 100644
--- a/kazoo/tests/test_client.py
+++ b/kazoo/tests/test_client.py
@@ -8,7 +8,7 @@ import unittest
import mock
from mock import patch
from nose import SkipTest
-from nose.tools import eq_
+from nose.tools import eq_, ok_, assert_not_equal
from nose.tools import raises
from kazoo.testing import KazooTestCase
@@ -492,9 +492,6 @@ class TestClient(KazooTestCase):
self.assertRaises(AuthFailedError, client.create,
'/closedpath', b'bar')
- client._state = KeeperState.CONNECTING
- self.assertRaises(SessionExpiredError, client.create,
- '/closedpath', b'bar')
client.stop()
client.close()
@@ -982,6 +979,107 @@ class TestClient(KazooTestCase):
finally:
self.cluster[0].run()
+ # utility for test_request_queuing*
+ def _make_request_queuing_client(self):
+ from kazoo.client import KazooClient
+ server = self.cluster[0]
+ handler = self._makeOne()
+ # create a client with only one server in its list, and
+ # infinite retries
+ client = KazooClient(
+ hosts=server.address + self.client.chroot,
+ handler=handler,
+ connection_retry=dict(
+ max_tries=-1,
+ delay=0.1,
+ backoff=1,
+ max_jitter=0.0,
+ sleep_func=handler.sleep_func
+ )
+ )
+
+ return client, server
+
+ # utility for test_request_queuing*
+ def _request_queuing_common(self, client, server, path, expire_session):
+ ev_suspended = client.handler.event_object()
+ ev_connected = client.handler.event_object()
+
+ def listener(state):
+ if state == KazooState.SUSPENDED:
+ ev_suspended.set()
+ elif state == KazooState.CONNECTED:
+ ev_connected.set()
+ client.add_listener(listener)
+
+ # wait for the client to connect
+ client.start()
+
+ try:
+ # force the client to suspend
+ server.stop()
+
+ ev_suspended.wait(5)
+ ok_(ev_suspended.is_set())
+ ev_connected.clear()
+
+ # submit a request, expecting it to be queued
+ result = client.create_async(path)
+ assert_not_equal(len(client._queue), 0)
+ eq_(result.ready(), False)
+ eq_(client.state, KazooState.SUSPENDED)
+
+ # optionally cause a SessionExpiredError to occur by
+ # mangling the first byte of the session password.
+ if expire_session:
+ b0 = b'\x00'
+ if client._session_passwd[0] == 0:
+ b0 = b'\xff'
+ client._session_passwd = b0 + client._session_passwd[1:]
+ finally:
+ server.run()
+
+ # wait for the client to reconnect (either with a recovered
+ # session, or with a new one if expire_session was set)
+ ev_connected.wait(5)
+ ok_(ev_connected.is_set())
+
+ return result
+
+ def test_request_queuing_session_recovered(self):
+ path = "/" + uuid.uuid4().hex
+ client, server = self._make_request_queuing_client()
+
+ try:
+ result = self._request_queuing_common(
+ client=client,
+ server=server,
+ path=path,
+ expire_session=False
+ )
+
+ eq_(result.get(), path)
+ assert_not_equal(client.exists(path), None)
+ finally:
+ client.stop()
+
+ def test_request_queuing_session_expired(self):
+ path = "/" + uuid.uuid4().hex
+ client, server = self._make_request_queuing_client()
+
+ try:
+ result = self._request_queuing_common(
+ client=client,
+ server=server,
+ path=path,
+ expire_session=True
+ )
+
+ eq_(len(client._queue), 0)
+ self.assertRaises(SessionExpiredError, result.get)
+ finally:
+ client.stop()
+
dummy_dict = {
'aversion': 1, 'ctime': 0, 'cversion': 1,