diff options
author | Jiangge Zhang <tonyseek@gmail.com> | 2016-07-29 11:41:29 +0800 |
---|---|---|
committer | Jiangge Zhang <tonyseek@gmail.com> | 2016-07-29 18:19:41 +0800 |
commit | 61a3576a822a72888eb330e32dee02fddf7f2acc (patch) | |
tree | d170ca39132b951ba44698bde5cfb44e46b67718 | |
parent | 1b4bca7cae660c8f9edd5c9693f9159e69bef320 (diff) | |
download | kazoo-61a3576a822a72888eb330e32dee02fddf7f2acc.tar.gz |
Fix the client.add_auth hangs by xids mismatch.
This commit fixes https://github.com/python-zk/kazoo/issues/229 by throwing
the runtime exception into the dequeued async_result. It will end the waiting
of user threading.
But this commit doesn't fix the xids mismatch itself. The unordered xids may
caused by a bug from the ZooKeeper server side, such as the official issues
https://issues.apache.org/jira/browse/ZOOKEEPER-1863 described.
-rw-r--r-- | kazoo/protocol/connection.py | 4 | ||||
-rw-r--r-- | kazoo/tests/test_connection.py | 73 |
2 files changed, 75 insertions, 2 deletions
diff --git a/kazoo/protocol/connection.py b/kazoo/protocol/connection.py index 067a629..1b398b9 100644 --- a/kazoo/protocol/connection.py +++ b/kazoo/protocol/connection.py @@ -338,8 +338,10 @@ class ConnectionHandler(object): if header.zxid and header.zxid > 0: client.last_zxid = header.zxid if header.xid != xid: - raise RuntimeError('xids do not match, expected %r ' + exc = RuntimeError('xids do not match, expected %r ' 'received %r', xid, header.xid) + async_object.set_exception(exc) + raise exc # Determine if its an exists request and a no node error exists_error = (header.err == NoNodeError.code and diff --git a/kazoo/tests/test_connection.py b/kazoo/tests/test_connection.py index 7067945..cda6b6f 100644 --- a/kazoo/tests/test_connection.py +++ b/kazoo/tests/test_connection.py @@ -1,9 +1,10 @@ -from collections import namedtuple +from collections import namedtuple, deque import os import threading import time import uuid import struct +import sys from nose import SkipTest from nose.tools import eq_ @@ -304,3 +305,73 @@ class TestReadOnlyMode(KazooTestCase): client.remove_listener(listen) self.cluster[1].run() self.cluster[2].run() + + +class TestUnorderedXids(KazooTestCase): + + def setUp(self): + super(TestUnorderedXids, self).setUp() + + self.connection = self.client._connection + self.connection_routine = self.connection._connection_routine + + self._pending = self.client._pending + self.client._pending = _naughty_deque() + + def tearDown(self): + self.client._pending = self._pending + super(TestUnorderedXids, self).tearDown() + + def _get_client(self, **kwargs): + # overrides for patching zk_loop + c = KazooTestCase._get_client(self, **kwargs) + self._zk_loop = c._connection.zk_loop + self._zk_loop_errors = [] + c._connection.zk_loop = self._zk_loop_func + return c + + def _zk_loop_func(self, *args, **kwargs): + # patched zk_loop which will catch and collect all RuntimeError + try: + self._zk_loop(*args, **kwargs) + except RuntimeError as e: + self._zk_loop_errors.append(e) + + def test_xids_mismatch(self): + from kazoo.protocol.states import KeeperState + + ev = threading.Event() + error_stack = [] + + @self.client.add_listener + def listen(state): + if self.client.client_state == KeeperState.CLOSED: + ev.set() + + def log_exception(*args): + error_stack.append((args, sys.exc_info())) + + self.connection.logger.exception = log_exception + + ev.clear() + self.assertRaises(RuntimeError, self.client.get_children, '/') + + ev.wait() + eq_(self.client.connected, False) + eq_(self.client.state, 'LOST') + eq_(self.client.client_state, KeeperState.CLOSED) + + args, exc_info = error_stack[-1] + eq_(args, ('Unhandled exception in connection loop',)) + eq_(exc_info[0], RuntimeError) + + self.client.handler.sleep_func(0.2) + assert not self.connection_routine.is_alive() + assert len(self._zk_loop_errors) == 1 + assert self._zk_loop_errors[0] == exc_info[1] + + +class _naughty_deque(deque): + def append(self, s): + request, async_object, xid = s + return deque.append(self, (request, async_object, xid + 1)) # +1s |