diff options
author | Ben Bangert <ben@groovie.org> | 2017-05-31 14:45:48 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-05-31 14:45:48 -0700 |
commit | e9815d1967af3f13ca9be61a67ce6070039fc950 (patch) | |
tree | b59a0f0b323297960d42ad20e1c1ae95027f6425 | |
parent | 4e0cad0481c94b4516426492dfe00ba99bdcb1e6 (diff) | |
parent | f4d7ebcec070e9dd109ab9a156363bdf24909992 (diff) | |
download | kazoo-e9815d1967af3f13ca9be61a67ce6070039fc950.tar.gz |
Merge pull request #391 from michielbaird/watcher_changes
Ensure pending watches are not dropped when connection is lost
-rw-r--r-- | kazoo/client.py | 17 | ||||
-rw-r--r-- | kazoo/protocol/states.py | 6 | ||||
-rw-r--r-- | kazoo/recipe/watchers.py | 3 | ||||
-rw-r--r-- | kazoo/tests/test_client.py | 16 |
4 files changed, 40 insertions, 2 deletions
diff --git a/kazoo/client.py b/kazoo/client.py index 07c41a9..f6e985d 100644 --- a/kazoo/client.py +++ b/kazoo/client.py @@ -44,8 +44,11 @@ from kazoo.protocol.serialization import ( Sync, Transaction ) +from kazoo.protocol.states import Callback +from kazoo.protocol.states import EventType from kazoo.protocol.states import KazooState from kazoo.protocol.states import KeeperState +from kazoo.protocol.states import WatchedEvent from kazoo.retry import KazooRetry from kazoo.security import ACL from kazoo.security import OPEN_ACL_UNSAFE @@ -192,7 +195,8 @@ class KazooClient(object): self._state = KeeperState.CLOSED self.state = KazooState.LOST self.state_listeners = set() - + self._child_watchers = defaultdict(set) + self._data_watchers = defaultdict(set) self._reset() self.read_only = read_only @@ -309,9 +313,20 @@ class KazooClient(object): self._protocol_version = None def _reset_watchers(self): + watchers = [] + for child_watchers in six.itervalues(self._child_watchers): + watchers.extend(child_watchers) + + for data_watchers in six.itervalues(self._data_watchers): + watchers.extend(data_watchers) + self._child_watchers = defaultdict(set) self._data_watchers = defaultdict(set) + ev = WatchedEvent(EventType.NONE, self._state, None) + for watch in watchers: + self.handler.dispatch_callback(Callback("watch", watch, (ev,))) + def _reset_session(self): self._session_id = None self._session_passwd = b'\x00' * 16 diff --git a/kazoo/protocol/states.py b/kazoo/protocol/states.py index 395c013..7eb09d7 100644 --- a/kazoo/protocol/states.py +++ b/kazoo/protocol/states.py @@ -93,13 +93,19 @@ class EventType(object): removed). This event does not indicate the data for a child node has changed, which must have its own watch established. + .. attribute:: NONE + + The connection state has been altered. + """ CREATED = 'CREATED' DELETED = 'DELETED' CHANGED = 'CHANGED' CHILD = 'CHILD' + NONE = 'NONE' EVENT_TYPE_MAP = { + -1:EventType.NONE, 1: EventType.CREATED, 2: EventType.DELETED, 3: EventType.CHANGED, diff --git a/kazoo/recipe/watchers.py b/kazoo/recipe/watchers.py index 7294eca..83237d4 100644 --- a/kazoo/recipe/watchers.py +++ b/kazoo/recipe/watchers.py @@ -345,7 +345,8 @@ class ChildrenWatch(object): raise def _watcher(self, event): - self._get_children(event) + if event.type != "NONE": + self._get_children(event) def _session_watcher(self, state): if state in (KazooState.LOST, KazooState.SUSPENDED): diff --git a/kazoo/tests/test_client.py b/kazoo/tests/test_client.py index 1b1fd34..ab18611 100644 --- a/kazoo/tests/test_client.py +++ b/kazoo/tests/test_client.py @@ -414,6 +414,22 @@ class TestConnection(KazooTestCase): client._state = oldstate client._connection._write_sock = None + def test_watch_trigger_expire(self): + client = self.client + cv = self.make_event() + + client.create("/test", b"") + + def test_watch(event): + cv.set() + + client.get("/test/", watch=test_watch) + self.expire_session(self.make_event) + + + cv.wait(3) + assert cv.is_set() + class TestClient(KazooTestCase): def _makeOne(self, *args): |