summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBen Bangert <ben@groovie.org>2017-05-31 14:45:48 -0700
committerGitHub <noreply@github.com>2017-05-31 14:45:48 -0700
commite9815d1967af3f13ca9be61a67ce6070039fc950 (patch)
treeb59a0f0b323297960d42ad20e1c1ae95027f6425
parent4e0cad0481c94b4516426492dfe00ba99bdcb1e6 (diff)
parentf4d7ebcec070e9dd109ab9a156363bdf24909992 (diff)
downloadkazoo-e9815d1967af3f13ca9be61a67ce6070039fc950.tar.gz
Merge pull request #391 from michielbaird/watcher_changes
Ensure pending watches are not dropped when connection is lost
-rw-r--r--kazoo/client.py17
-rw-r--r--kazoo/protocol/states.py6
-rw-r--r--kazoo/recipe/watchers.py3
-rw-r--r--kazoo/tests/test_client.py16
4 files changed, 40 insertions, 2 deletions
diff --git a/kazoo/client.py b/kazoo/client.py
index 07c41a9..f6e985d 100644
--- a/kazoo/client.py
+++ b/kazoo/client.py
@@ -44,8 +44,11 @@ from kazoo.protocol.serialization import (
Sync,
Transaction
)
+from kazoo.protocol.states import Callback
+from kazoo.protocol.states import EventType
from kazoo.protocol.states import KazooState
from kazoo.protocol.states import KeeperState
+from kazoo.protocol.states import WatchedEvent
from kazoo.retry import KazooRetry
from kazoo.security import ACL
from kazoo.security import OPEN_ACL_UNSAFE
@@ -192,7 +195,8 @@ class KazooClient(object):
self._state = KeeperState.CLOSED
self.state = KazooState.LOST
self.state_listeners = set()
-
+ self._child_watchers = defaultdict(set)
+ self._data_watchers = defaultdict(set)
self._reset()
self.read_only = read_only
@@ -309,9 +313,20 @@ class KazooClient(object):
self._protocol_version = None
def _reset_watchers(self):
+ watchers = []
+ for child_watchers in six.itervalues(self._child_watchers):
+ watchers.extend(child_watchers)
+
+ for data_watchers in six.itervalues(self._data_watchers):
+ watchers.extend(data_watchers)
+
self._child_watchers = defaultdict(set)
self._data_watchers = defaultdict(set)
+ ev = WatchedEvent(EventType.NONE, self._state, None)
+ for watch in watchers:
+ self.handler.dispatch_callback(Callback("watch", watch, (ev,)))
+
def _reset_session(self):
self._session_id = None
self._session_passwd = b'\x00' * 16
diff --git a/kazoo/protocol/states.py b/kazoo/protocol/states.py
index 395c013..7eb09d7 100644
--- a/kazoo/protocol/states.py
+++ b/kazoo/protocol/states.py
@@ -93,13 +93,19 @@ class EventType(object):
removed). This event does not indicate the data for a child
node has changed, which must have its own watch established.
+ .. attribute:: NONE
+
+ The connection state has been altered.
+
"""
CREATED = 'CREATED'
DELETED = 'DELETED'
CHANGED = 'CHANGED'
CHILD = 'CHILD'
+ NONE = 'NONE'
EVENT_TYPE_MAP = {
+ -1:EventType.NONE,
1: EventType.CREATED,
2: EventType.DELETED,
3: EventType.CHANGED,
diff --git a/kazoo/recipe/watchers.py b/kazoo/recipe/watchers.py
index 7294eca..83237d4 100644
--- a/kazoo/recipe/watchers.py
+++ b/kazoo/recipe/watchers.py
@@ -345,7 +345,8 @@ class ChildrenWatch(object):
raise
def _watcher(self, event):
- self._get_children(event)
+ if event.type != "NONE":
+ self._get_children(event)
def _session_watcher(self, state):
if state in (KazooState.LOST, KazooState.SUSPENDED):
diff --git a/kazoo/tests/test_client.py b/kazoo/tests/test_client.py
index 1b1fd34..ab18611 100644
--- a/kazoo/tests/test_client.py
+++ b/kazoo/tests/test_client.py
@@ -414,6 +414,22 @@ class TestConnection(KazooTestCase):
client._state = oldstate
client._connection._write_sock = None
+ def test_watch_trigger_expire(self):
+ client = self.client
+ cv = self.make_event()
+
+ client.create("/test", b"")
+
+ def test_watch(event):
+ cv.set()
+
+ client.get("/test/", watch=test_watch)
+ self.expire_session(self.make_event)
+
+
+ cv.wait(3)
+ assert cv.is_set()
+
class TestClient(KazooTestCase):
def _makeOne(self, *args):