diff options
author | Joshua Harlow <harlowja@gmail.com> | 2015-06-09 10:55:19 -0700 |
---|---|---|
committer | Joshua Harlow <harlowja@gmail.com> | 2015-06-09 10:55:19 -0700 |
commit | 86a184367c3cb99b3364d57f9674c6ae57ea3f27 (patch) | |
tree | 5e9fcc6d5cac1ea87299ed1bc787470cb8818224 | |
parent | 62bdb346993c9aef8bc861b6b4bce62468e8b837 (diff) | |
parent | 8d2db77794d59ce6d9ebab19cf281b8d24093810 (diff) | |
download | kazoo-86a184367c3cb99b3364d57f9674c6ae57ea3f27.tar.gz |
Merge pull request #333 from rgs1/support-reconfig
Add support for reconfig cluster membership operation (issue #234)
-rw-r--r-- | CHANGES.rst | 2 | ||||
-rw-r--r-- | kazoo/client.py | 93 | ||||
-rw-r--r-- | kazoo/exceptions.py | 10 | ||||
-rw-r--r-- | kazoo/protocol/serialization.py | 19 | ||||
-rw-r--r-- | kazoo/tests/test_client.py | 55 |
5 files changed, 179 insertions, 0 deletions
diff --git a/CHANGES.rst b/CHANGES.rst index 1df11e8..c9abb4d 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -7,6 +7,8 @@ Changelog Features ******** +- Issue #234: Add support for reconfig cluster membership operation + Bug Handling ************ diff --git a/kazoo/client.py b/kazoo/client.py index 11f27fd..d706739 100644 --- a/kazoo/client.py +++ b/kazoo/client.py @@ -39,6 +39,7 @@ from kazoo.protocol.serialization import ( GetACL, SetACL, GetData, + Reconfig, SetData, Sync, Transaction @@ -1326,6 +1327,98 @@ class KazooClient(object): except NoNodeError: # pragma: nocover pass + def reconfig(self, joining, leaving, new_members, from_config=-1): + """Reconfig a cluster. + + This call will succeed if the cluster was reconfigured accordingly. + + :param joining: a comma separated list of servers being added + (see example for format) (incremental reconfiguration) + :param leaving: a comma separated list of servers being removed + (see example for format) (incremental reconfiguration) + :param new_members: a comma separated list of new membership + (non-incremental reconfiguration) + :param from_config: version of the current configuration (optional - + causes reconfiguration to throw an exception if + configuration is no longer current) + :type from_config: int + :returns: + Tuple (value, :class:`~kazoo.protocol.states.ZnodeStat`) of + node. + :rtype: tuple + + Basic Example: + + .. code-block:: python + + zk = KazooClient() + zk.start() + + # first add an observer (incremental reconfiguration) + joining = 'server.100=10.0.0.10:2889:3888:observer;0.0.0.0:2181' + data, _ = zk.reconfig( + joining=joining, leaving=None, new_members=None) + + # wait and then remove it (just by using its id) (incremental) + data, _ = zk.reconfig(joining=None, leaving='100', new_members=None) + + # now do a full change of the cluster (non-incremental) + new = [ + 'server.100=10.0.0.10:2889:3888:observer;0.0.0.0:2181', + 'server.100=10.0.0.11:2889:3888:observer;0.0.0.0:2181', + 'server.100=10.0.0.12:2889:3888:observer;0.0.0.0:2181', + ] + data, _ = zk.reconfig( + joining=None, leaving=None, new_members=','.join(new)) + + zk.stop() + + :raises: + :exc:`~kazoo.exceptions.UnimplementedError` if not supported. + + :exc:`~kazoo.exceptions.NewConfigNoQuorumError` if no quorum of new + config is connected and up-to-date with the leader of last + commmitted config - try invoking reconfiguration after new servers + are connected and synced. + + :exc:`~kazoo.exceptions.ReconfigInProcessError` if another + reconfiguration is in progress. + + :exc:`~kazoo.exceptions.BadVersionError` if version doesn't + match. + + :exc:`~kazoo.exceptions.BadArgumentsError` if any of the given + lists of servers has a bad format. + + :exc:`~kazoo.exceptions.ZookeeperError` if the server + returns a non-zero error code. + + """ + result = self.reconfig_async(joining, leaving, new_members, from_config) + return result.get() + + def reconfig_async(self, joining, leaving, new_members, from_config): + """Asynchronously reconfig a cluster. Takes the same arguments as + :meth:`reconfig`. + + :rtype: :class:`~kazoo.interfaces.IAsyncResult` + + """ + if joining and not isinstance(joining, basestring): + raise TypeError("joining must be a string") + if leaving and not isinstance(leaving, basestring): + raise TypeError("leaving must be a string") + if new_members and not isinstance(new_members, basestring): + raise TypeError("new_members must be a string") + if not isinstance(from_config, int): + raise TypeError("from_config must be an int") + + async_result = self.handler.async_result() + reconfig = Reconfig(joining, leaving, new_members, from_config) + self._call(reconfig, async_result) + + return async_result + class TransactionRequest(object): """A Zookeeper Transaction Request diff --git a/kazoo/exceptions.py b/kazoo/exceptions.py index 8d4f0f3..6f32b4f 100644 --- a/kazoo/exceptions.py +++ b/kazoo/exceptions.py @@ -107,6 +107,16 @@ class BadArgumentsError(ZookeeperError): pass +@_zookeeper_exception(-13) +class NewConfigNoQuorumError(ZookeeperError): + pass + + +@_zookeeper_exception(-14) +class ReconfigInProcessError(ZookeeperError): + pass + + @_zookeeper_exception(-100) class APIError(ZookeeperError): pass diff --git a/kazoo/protocol/serialization.py b/kazoo/protocol/serialization.py index f44f49a..88bb9e2 100644 --- a/kazoo/protocol/serialization.py +++ b/kazoo/protocol/serialization.py @@ -10,6 +10,7 @@ from kazoo.security import Id # Struct objects with formats compiled bool_struct = struct.Struct('B') int_struct = struct.Struct('!i') +long_struct = struct.Struct('!q') int_int_struct = struct.Struct('!ii') int_int_long_struct = struct.Struct('!iiq') @@ -352,6 +353,24 @@ class Transaction(namedtuple('Transaction', 'operations')): return resp +class Reconfig(namedtuple('Reconfig', 'joining leaving new_members config_id')): + type = 16 + + def serialize(self): + b = bytearray() + b.extend(write_string(self.joining)) + b.extend(write_string(self.leaving)) + b.extend(write_string(self.new_members)) + b.extend(long_struct.pack(self.config_id)) + return b + + @classmethod + def deserialize(cls, bytes, offset): + data, offset = read_buffer(bytes, offset) + stat = ZnodeStat._make(stat_struct.unpack_from(bytes, offset)) + return data, stat + + class Auth(namedtuple('Auth', 'auth_type scheme auth')): type = 100 diff --git a/kazoo/tests/test_client.py b/kazoo/tests/test_client.py index a071e2e..b540e68 100644 --- a/kazoo/tests/test_client.py +++ b/kazoo/tests/test_client.py @@ -15,6 +15,7 @@ from kazoo.testing import KazooTestCase from kazoo.exceptions import ( AuthFailedError, BadArgumentsError, + BadVersionError, ConfigurationError, ConnectionClosedError, ConnectionLoss, @@ -1147,3 +1148,57 @@ class TestNonChrootClient(KazooTestCase): client.chroot = '/a' self.assertEquals(client.unchroot('/a/b'), '/b') self.assertEquals(client.unchroot('/b/c'), '/b/c') + + +class TestReconfig(KazooTestCase): + + def setUp(self): + KazooTestCase.setUp(self) + if TRAVIS_ZK_VERSION: + version = TRAVIS_ZK_VERSION + else: + version = self.client.server_version() + if not version or version < (3, 5): + raise SkipTest("Must use Zookeeper 3.5 or above") + + def test_add_remove_observer(self): + def free_sock_port(): + s = socket.socket() + s.bind(('', 0)) + return s, s.getsockname()[1] + + # get ports for election, zab and client endpoints. we need to use + # ports for which we'd immediately get a RST upon connect(); otherwise + # the cluster could crash if it gets a SocketTimeoutException: + # https://issues.apache.org/jira/browse/ZOOKEEPER-2202 + s1, port1 = free_sock_port() + s2, port2 = free_sock_port() + s3, port3 = free_sock_port() + + joining = 'server.100=0.0.0.0:%d:%d:observer;0.0.0.0:%d' % ( + port1, port2, port3) + data, _ = self.client.reconfig(joining=joining, + leaving=None, + new_members=None) + self.assertIn(joining, data) + + data, _ = self.client.reconfig(joining=None, + leaving='100', + new_members=None) + self.assertNotIn(joining, data) + + # try to add it again, but a config number in the future + curver = int(data.split('\n')[-1].split('=')[1], base=16) + self.assertRaises(BadVersionError, + self.client.reconfig, + joining=joining, + leaving=None, + new_members=None, + from_config=curver + 1) + + def test_bad_input(self): + self.assertRaises(BadArgumentsError, + self.client.reconfig, + joining='some thing', + leaving=None, + new_members=None) |