summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBen Bangert <ben@groovie.org>2017-06-12 11:00:21 -0700
committerBen Bangert <ben@groovie.org>2017-06-12 11:00:21 -0700
commita7b45390f3720a33c9ad3896a8a185bfb2628839 (patch)
tree1b2afbfb03e2ef13fda48fc8fcded35658e7c860
parent1956bab9596895b19b8f2fce9f8415e4c0e9a266 (diff)
downloadkazoo-feat/revert-pr-305.tar.gz
fix(core): revert PR #305 SetWatches which caused RuntimeErrorfeat/revert-pr-305
PR #305 introduced a feature to restore watches on reconnect. Unfortunately this introduced RuntimeError's under various cases, so reverting it is necessary.
-rw-r--r--kazoo/client.py3
-rw-r--r--kazoo/protocol/connection.py22
-rw-r--r--kazoo/protocol/serialization.py23
-rw-r--r--kazoo/tests/test_client.py37
4 files changed, 1 insertions, 84 deletions
diff --git a/kazoo/client.py b/kazoo/client.py
index b03f750..8a4c699 100644
--- a/kazoo/client.py
+++ b/kazoo/client.py
@@ -477,8 +477,7 @@ class KazooClient(object):
self._live.clear()
self._notify_pending(state)
self._make_state_change(KazooState.SUSPENDED)
- if state != KeeperState.CONNECTING:
- self._reset_watchers()
+ self._reset_watchers()
def _notify_pending(self, state):
"""Used to clear a pending response queue and request queue
diff --git a/kazoo/protocol/connection.py b/kazoo/protocol/connection.py
index 9cfb7a2..a63f470 100644
--- a/kazoo/protocol/connection.py
+++ b/kazoo/protocol/connection.py
@@ -26,7 +26,6 @@ from kazoo.protocol.serialization import (
Ping,
PingInstance,
ReplyHeader,
- SetWatches,
Transaction,
Watch,
int_struct
@@ -60,7 +59,6 @@ CHILD_EVENT = 4
WATCH_XID = -1
PING_XID = -2
AUTH_XID = -4
-SET_WATCHES_XID = -8
CLOSE_RESPONSE = Close.type
@@ -410,8 +408,6 @@ class ConnectionHandler(object):
async_object.set(True)
elif header.xid == WATCH_XID:
self._read_watch_event(buffer, offset)
- elif header.xid == SET_WATCHES_XID:
- self.logger.log(BLATHER, 'Received SetWatches reply')
else:
self.logger.log(BLATHER, 'Reading for header %r', header)
@@ -444,8 +440,6 @@ class ConnectionHandler(object):
# Special case for auth packets
if request.type == Auth.type:
xid = AUTH_XID
- elif request.type == SetWatches.type:
- xid = SET_WATCHES_XID
else:
self._xid = (self._xid % 2147483647) + 1
xid = self._xid
@@ -619,11 +613,6 @@ class ConnectionHandler(object):
client._session_id or 0, client._session_passwd,
client.read_only)
- # save the client's last_zxid before it gets overwritten by the
- # server's.
- # we'll need this to reset watches via SetWatches further below.
- last_zxid = client.last_zxid
-
connect_result, zxid = self._invoke(
client._session_timeout / 1000.0, connect)
@@ -663,15 +652,4 @@ class ConnectionHandler(object):
if zxid:
client.last_zxid = zxid
- # TODO: separate exist from data watches
- if client._data_watchers or client._child_watchers.keys():
- sw = SetWatches(last_zxid,
- client._data_watchers.keys(),
- client._data_watchers.keys(),
- client._child_watchers.keys())
- zxid = self._invoke(connect_timeout / 1000.0, sw,
- xid=SET_WATCHES_XID)
- if zxid:
- client.last_zxid = zxid
-
return read_timeout, connect_timeout
diff --git a/kazoo/protocol/serialization.py b/kazoo/protocol/serialization.py
index e3249f7..8a9ef0f 100644
--- a/kazoo/protocol/serialization.py
+++ b/kazoo/protocol/serialization.py
@@ -12,7 +12,6 @@ import six
# Struct objects with formats compiled
bool_struct = struct.Struct('B')
int_struct = struct.Struct('!i')
-long_struct = struct.Struct('!q')
int_int_struct = struct.Struct('!ii')
int_int_long_struct = struct.Struct('!iiq')
@@ -52,14 +51,6 @@ def write_string(bytes):
return int_struct.pack(len(utf8_str)) + utf8_str
-def write_string_vector(v):
- b = bytearray()
- b.extend(int_struct.pack(len(v)))
- for s in v:
- b.extend(write_string(s))
- return b
-
-
def write_buffer(bytes):
if bytes is None:
return int_struct.pack(-1)
@@ -386,20 +377,6 @@ class Auth(namedtuple('Auth', 'auth_type scheme auth')):
write_string(self.auth))
-class SetWatches(
- namedtuple('SetWatches',
- 'relativeZxid, dataWatches, existWatches, childWatches')):
- type = 101
-
- def serialize(self):
- b = bytearray()
- b.extend(long_struct.pack(self.relativeZxid))
- b.extend(write_string_vector(self.dataWatches))
- b.extend(write_string_vector(self.existWatches))
- b.extend(write_string_vector(self.childWatches))
- return b
-
-
class Watch(namedtuple('Watch', 'type state path')):
@classmethod
def deserialize(cls, bytes, offset):
diff --git a/kazoo/tests/test_client.py b/kazoo/tests/test_client.py
index af8ecf5..5bc7999 100644
--- a/kazoo/tests/test_client.py
+++ b/kazoo/tests/test_client.py
@@ -963,43 +963,6 @@ class TestClient(KazooTestCase):
finally:
self.cluster[0].run()
- def test_set_watches_on_reconnect(self):
- client = self.client
- watch_event = client.handler.event_object()
-
- client.create("/tacos")
-
- # set the watch
- def w(we):
- eq_(we.path, "/tacos")
- watch_event.set()
-
- client.get_children("/tacos", watch=w)
-
- # force a reconnect
- states = []
- rc = client.handler.event_object()
-
- @client.add_listener
- def listener(state):
- if state == KazooState.CONNECTED:
- states.append(state)
- rc.set()
-
- client._connection._socket.shutdown(socket.SHUT_RDWR)
-
- rc.wait(10)
- eq_(states, [KazooState.CONNECTED])
-
- # watches should still be there
- self.assertTrue(len(client._child_watchers) == 1)
-
- # ... and they should fire
- client.create("/tacos/hello_", b"", ephemeral=True, sequence=True)
-
- watch_event.wait(1)
- self.assertTrue(watch_event.is_set())
-
dummy_dict = {
'aversion': 1, 'ctime': 0, 'cversion': 1,