summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJiangge Zhang <tonyseek@gmail.com>2016-07-29 11:41:29 +0800
committerJiangge Zhang <tonyseek@gmail.com>2016-07-29 18:19:41 +0800
commit61a3576a822a72888eb330e32dee02fddf7f2acc (patch)
treed170ca39132b951ba44698bde5cfb44e46b67718
parent1b4bca7cae660c8f9edd5c9693f9159e69bef320 (diff)
downloadkazoo-61a3576a822a72888eb330e32dee02fddf7f2acc.tar.gz
Fix the client.add_auth hangs by xids mismatch.
This commit fixes https://github.com/python-zk/kazoo/issues/229 by throwing the runtime exception into the dequeued async_result. It will end the waiting of user threading. But this commit doesn't fix the xids mismatch itself. The unordered xids may caused by a bug from the ZooKeeper server side, such as the official issues https://issues.apache.org/jira/browse/ZOOKEEPER-1863 described.
-rw-r--r--kazoo/protocol/connection.py4
-rw-r--r--kazoo/tests/test_connection.py73
2 files changed, 75 insertions, 2 deletions
diff --git a/kazoo/protocol/connection.py b/kazoo/protocol/connection.py
index 067a629..1b398b9 100644
--- a/kazoo/protocol/connection.py
+++ b/kazoo/protocol/connection.py
@@ -338,8 +338,10 @@ class ConnectionHandler(object):
if header.zxid and header.zxid > 0:
client.last_zxid = header.zxid
if header.xid != xid:
- raise RuntimeError('xids do not match, expected %r '
+ exc = RuntimeError('xids do not match, expected %r '
'received %r', xid, header.xid)
+ async_object.set_exception(exc)
+ raise exc
# Determine if its an exists request and a no node error
exists_error = (header.err == NoNodeError.code and
diff --git a/kazoo/tests/test_connection.py b/kazoo/tests/test_connection.py
index 7067945..cda6b6f 100644
--- a/kazoo/tests/test_connection.py
+++ b/kazoo/tests/test_connection.py
@@ -1,9 +1,10 @@
-from collections import namedtuple
+from collections import namedtuple, deque
import os
import threading
import time
import uuid
import struct
+import sys
from nose import SkipTest
from nose.tools import eq_
@@ -304,3 +305,73 @@ class TestReadOnlyMode(KazooTestCase):
client.remove_listener(listen)
self.cluster[1].run()
self.cluster[2].run()
+
+
+class TestUnorderedXids(KazooTestCase):
+
+ def setUp(self):
+ super(TestUnorderedXids, self).setUp()
+
+ self.connection = self.client._connection
+ self.connection_routine = self.connection._connection_routine
+
+ self._pending = self.client._pending
+ self.client._pending = _naughty_deque()
+
+ def tearDown(self):
+ self.client._pending = self._pending
+ super(TestUnorderedXids, self).tearDown()
+
+ def _get_client(self, **kwargs):
+ # overrides for patching zk_loop
+ c = KazooTestCase._get_client(self, **kwargs)
+ self._zk_loop = c._connection.zk_loop
+ self._zk_loop_errors = []
+ c._connection.zk_loop = self._zk_loop_func
+ return c
+
+ def _zk_loop_func(self, *args, **kwargs):
+ # patched zk_loop which will catch and collect all RuntimeError
+ try:
+ self._zk_loop(*args, **kwargs)
+ except RuntimeError as e:
+ self._zk_loop_errors.append(e)
+
+ def test_xids_mismatch(self):
+ from kazoo.protocol.states import KeeperState
+
+ ev = threading.Event()
+ error_stack = []
+
+ @self.client.add_listener
+ def listen(state):
+ if self.client.client_state == KeeperState.CLOSED:
+ ev.set()
+
+ def log_exception(*args):
+ error_stack.append((args, sys.exc_info()))
+
+ self.connection.logger.exception = log_exception
+
+ ev.clear()
+ self.assertRaises(RuntimeError, self.client.get_children, '/')
+
+ ev.wait()
+ eq_(self.client.connected, False)
+ eq_(self.client.state, 'LOST')
+ eq_(self.client.client_state, KeeperState.CLOSED)
+
+ args, exc_info = error_stack[-1]
+ eq_(args, ('Unhandled exception in connection loop',))
+ eq_(exc_info[0], RuntimeError)
+
+ self.client.handler.sleep_func(0.2)
+ assert not self.connection_routine.is_alive()
+ assert len(self._zk_loop_errors) == 1
+ assert self._zk_loop_errors[0] == exc_info[1]
+
+
+class _naughty_deque(deque):
+ def append(self, s):
+ request, async_object, xid = s
+ return deque.append(self, (request, async_object, xid + 1)) # +1s