summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJoshua Harlow <harlowja@gmail.com>2015-06-09 10:55:19 -0700
committerJoshua Harlow <harlowja@gmail.com>2015-06-09 10:55:19 -0700
commit86a184367c3cb99b3364d57f9674c6ae57ea3f27 (patch)
tree5e9fcc6d5cac1ea87299ed1bc787470cb8818224
parent62bdb346993c9aef8bc861b6b4bce62468e8b837 (diff)
parent8d2db77794d59ce6d9ebab19cf281b8d24093810 (diff)
downloadkazoo-86a184367c3cb99b3364d57f9674c6ae57ea3f27.tar.gz
Merge pull request #333 from rgs1/support-reconfig
Add support for reconfig cluster membership operation (issue #234)
-rw-r--r--CHANGES.rst2
-rw-r--r--kazoo/client.py93
-rw-r--r--kazoo/exceptions.py10
-rw-r--r--kazoo/protocol/serialization.py19
-rw-r--r--kazoo/tests/test_client.py55
5 files changed, 179 insertions, 0 deletions
diff --git a/CHANGES.rst b/CHANGES.rst
index 1df11e8..c9abb4d 100644
--- a/CHANGES.rst
+++ b/CHANGES.rst
@@ -7,6 +7,8 @@ Changelog
Features
********
+- Issue #234: Add support for reconfig cluster membership operation
+
Bug Handling
************
diff --git a/kazoo/client.py b/kazoo/client.py
index 11f27fd..d706739 100644
--- a/kazoo/client.py
+++ b/kazoo/client.py
@@ -39,6 +39,7 @@ from kazoo.protocol.serialization import (
GetACL,
SetACL,
GetData,
+ Reconfig,
SetData,
Sync,
Transaction
@@ -1326,6 +1327,98 @@ class KazooClient(object):
except NoNodeError: # pragma: nocover
pass
+ def reconfig(self, joining, leaving, new_members, from_config=-1):
+ """Reconfig a cluster.
+
+ This call will succeed if the cluster was reconfigured accordingly.
+
+ :param joining: a comma separated list of servers being added
+ (see example for format) (incremental reconfiguration)
+ :param leaving: a comma separated list of servers being removed
+ (see example for format) (incremental reconfiguration)
+ :param new_members: a comma separated list of new membership
+ (non-incremental reconfiguration)
+ :param from_config: version of the current configuration (optional -
+ causes reconfiguration to throw an exception if
+ configuration is no longer current)
+ :type from_config: int
+ :returns:
+ Tuple (value, :class:`~kazoo.protocol.states.ZnodeStat`) of
+ node.
+ :rtype: tuple
+
+ Basic Example:
+
+ .. code-block:: python
+
+ zk = KazooClient()
+ zk.start()
+
+ # first add an observer (incremental reconfiguration)
+ joining = 'server.100=10.0.0.10:2889:3888:observer;0.0.0.0:2181'
+ data, _ = zk.reconfig(
+ joining=joining, leaving=None, new_members=None)
+
+ # wait and then remove it (just by using its id) (incremental)
+ data, _ = zk.reconfig(joining=None, leaving='100', new_members=None)
+
+ # now do a full change of the cluster (non-incremental)
+ new = [
+ 'server.100=10.0.0.10:2889:3888:observer;0.0.0.0:2181',
+ 'server.100=10.0.0.11:2889:3888:observer;0.0.0.0:2181',
+ 'server.100=10.0.0.12:2889:3888:observer;0.0.0.0:2181',
+ ]
+ data, _ = zk.reconfig(
+ joining=None, leaving=None, new_members=','.join(new))
+
+ zk.stop()
+
+ :raises:
+ :exc:`~kazoo.exceptions.UnimplementedError` if not supported.
+
+ :exc:`~kazoo.exceptions.NewConfigNoQuorumError` if no quorum of new
+ config is connected and up-to-date with the leader of last
+ commmitted config - try invoking reconfiguration after new servers
+ are connected and synced.
+
+ :exc:`~kazoo.exceptions.ReconfigInProcessError` if another
+ reconfiguration is in progress.
+
+ :exc:`~kazoo.exceptions.BadVersionError` if version doesn't
+ match.
+
+ :exc:`~kazoo.exceptions.BadArgumentsError` if any of the given
+ lists of servers has a bad format.
+
+ :exc:`~kazoo.exceptions.ZookeeperError` if the server
+ returns a non-zero error code.
+
+ """
+ result = self.reconfig_async(joining, leaving, new_members, from_config)
+ return result.get()
+
+ def reconfig_async(self, joining, leaving, new_members, from_config):
+ """Asynchronously reconfig a cluster. Takes the same arguments as
+ :meth:`reconfig`.
+
+ :rtype: :class:`~kazoo.interfaces.IAsyncResult`
+
+ """
+ if joining and not isinstance(joining, basestring):
+ raise TypeError("joining must be a string")
+ if leaving and not isinstance(leaving, basestring):
+ raise TypeError("leaving must be a string")
+ if new_members and not isinstance(new_members, basestring):
+ raise TypeError("new_members must be a string")
+ if not isinstance(from_config, int):
+ raise TypeError("from_config must be an int")
+
+ async_result = self.handler.async_result()
+ reconfig = Reconfig(joining, leaving, new_members, from_config)
+ self._call(reconfig, async_result)
+
+ return async_result
+
class TransactionRequest(object):
"""A Zookeeper Transaction Request
diff --git a/kazoo/exceptions.py b/kazoo/exceptions.py
index 8d4f0f3..6f32b4f 100644
--- a/kazoo/exceptions.py
+++ b/kazoo/exceptions.py
@@ -107,6 +107,16 @@ class BadArgumentsError(ZookeeperError):
pass
+@_zookeeper_exception(-13)
+class NewConfigNoQuorumError(ZookeeperError):
+ pass
+
+
+@_zookeeper_exception(-14)
+class ReconfigInProcessError(ZookeeperError):
+ pass
+
+
@_zookeeper_exception(-100)
class APIError(ZookeeperError):
pass
diff --git a/kazoo/protocol/serialization.py b/kazoo/protocol/serialization.py
index f44f49a..88bb9e2 100644
--- a/kazoo/protocol/serialization.py
+++ b/kazoo/protocol/serialization.py
@@ -10,6 +10,7 @@ from kazoo.security import Id
# Struct objects with formats compiled
bool_struct = struct.Struct('B')
int_struct = struct.Struct('!i')
+long_struct = struct.Struct('!q')
int_int_struct = struct.Struct('!ii')
int_int_long_struct = struct.Struct('!iiq')
@@ -352,6 +353,24 @@ class Transaction(namedtuple('Transaction', 'operations')):
return resp
+class Reconfig(namedtuple('Reconfig', 'joining leaving new_members config_id')):
+ type = 16
+
+ def serialize(self):
+ b = bytearray()
+ b.extend(write_string(self.joining))
+ b.extend(write_string(self.leaving))
+ b.extend(write_string(self.new_members))
+ b.extend(long_struct.pack(self.config_id))
+ return b
+
+ @classmethod
+ def deserialize(cls, bytes, offset):
+ data, offset = read_buffer(bytes, offset)
+ stat = ZnodeStat._make(stat_struct.unpack_from(bytes, offset))
+ return data, stat
+
+
class Auth(namedtuple('Auth', 'auth_type scheme auth')):
type = 100
diff --git a/kazoo/tests/test_client.py b/kazoo/tests/test_client.py
index a071e2e..b540e68 100644
--- a/kazoo/tests/test_client.py
+++ b/kazoo/tests/test_client.py
@@ -15,6 +15,7 @@ from kazoo.testing import KazooTestCase
from kazoo.exceptions import (
AuthFailedError,
BadArgumentsError,
+ BadVersionError,
ConfigurationError,
ConnectionClosedError,
ConnectionLoss,
@@ -1147,3 +1148,57 @@ class TestNonChrootClient(KazooTestCase):
client.chroot = '/a'
self.assertEquals(client.unchroot('/a/b'), '/b')
self.assertEquals(client.unchroot('/b/c'), '/b/c')
+
+
+class TestReconfig(KazooTestCase):
+
+ def setUp(self):
+ KazooTestCase.setUp(self)
+ if TRAVIS_ZK_VERSION:
+ version = TRAVIS_ZK_VERSION
+ else:
+ version = self.client.server_version()
+ if not version or version < (3, 5):
+ raise SkipTest("Must use Zookeeper 3.5 or above")
+
+ def test_add_remove_observer(self):
+ def free_sock_port():
+ s = socket.socket()
+ s.bind(('', 0))
+ return s, s.getsockname()[1]
+
+ # get ports for election, zab and client endpoints. we need to use
+ # ports for which we'd immediately get a RST upon connect(); otherwise
+ # the cluster could crash if it gets a SocketTimeoutException:
+ # https://issues.apache.org/jira/browse/ZOOKEEPER-2202
+ s1, port1 = free_sock_port()
+ s2, port2 = free_sock_port()
+ s3, port3 = free_sock_port()
+
+ joining = 'server.100=0.0.0.0:%d:%d:observer;0.0.0.0:%d' % (
+ port1, port2, port3)
+ data, _ = self.client.reconfig(joining=joining,
+ leaving=None,
+ new_members=None)
+ self.assertIn(joining, data)
+
+ data, _ = self.client.reconfig(joining=None,
+ leaving='100',
+ new_members=None)
+ self.assertNotIn(joining, data)
+
+ # try to add it again, but a config number in the future
+ curver = int(data.split('\n')[-1].split('=')[1], base=16)
+ self.assertRaises(BadVersionError,
+ self.client.reconfig,
+ joining=joining,
+ leaving=None,
+ new_members=None,
+ from_config=curver + 1)
+
+ def test_bad_input(self):
+ self.assertRaises(BadArgumentsError,
+ self.client.reconfig,
+ joining='some thing',
+ leaving=None,
+ new_members=None)