summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJulien Danjou <julien@danjou.info>2014-08-04 15:51:14 +0200
committerJulien Danjou <julien@danjou.info>2014-08-07 18:12:51 +0200
commitd38fe0301d991300a29ab09175f110356fe47653 (patch)
tree9c865c49977e7cb4409d77902280fc447c6a878e
parent30588b877a813d1ced820313855dfb9826e2c083 (diff)
downloadtooz-d38fe0301d991300a29ab09175f110356fe47653.tar.gz
Switch to URL for loading backends0.3
This allow to pass options in a single string, which is going to be easier for managing options. Change-Id: I32409c09153b8abaf2b36c31f0bbf658a9d653bc
-rw-r--r--examples/coordinator.py2
-rw-r--r--examples/coordinator_heartbeat.py2
-rw-r--r--examples/group_membership.py2
-rw-r--r--examples/group_membership_watch.py2
-rw-r--r--examples/leader_election.py2
-rw-r--r--requirements.txt2
-rw-r--r--tooz/coordination.py27
-rw-r--r--tooz/drivers/ipc.py4
-rw-r--r--tooz/drivers/memcached.py24
-rw-r--r--tooz/drivers/zookeeper.py40
-rw-r--r--tooz/tests/test_coordination.py81
11 files changed, 80 insertions, 108 deletions
diff --git a/examples/coordinator.py b/examples/coordinator.py
index c6d0792..8850115 100644
--- a/examples/coordinator.py
+++ b/examples/coordinator.py
@@ -1,5 +1,5 @@
from tooz import coordination
-coordinator = coordination.get_coordinator('zookeeper', b'host-1')
+coordinator = coordination.get_coordinator('zookeeper://localhost', b'host-1')
coordinator.start()
coordinator.stop()
diff --git a/examples/coordinator_heartbeat.py b/examples/coordinator_heartbeat.py
index 45539eb..7f8c81b 100644
--- a/examples/coordinator_heartbeat.py
+++ b/examples/coordinator_heartbeat.py
@@ -2,7 +2,7 @@ import time
from tooz import coordination
-coordinator = coordination.get_coordinator('memcached', b'host-1')
+coordinator = coordination.get_coordinator('memcached://localhost', b'host-1')
coordinator.start()
while True:
diff --git a/examples/group_membership.py b/examples/group_membership.py
index a94b5e5..963bbb3 100644
--- a/examples/group_membership.py
+++ b/examples/group_membership.py
@@ -1,6 +1,6 @@
from tooz import coordination
-coordinator = coordination.get_coordinator('zookeeper', b'host-1')
+coordinator = coordination.get_coordinator('zookeeper://localhost', b'host-1')
coordinator.start()
# Create a group
diff --git a/examples/group_membership_watch.py b/examples/group_membership_watch.py
index 4da1963..5204df2 100644
--- a/examples/group_membership_watch.py
+++ b/examples/group_membership_watch.py
@@ -1,6 +1,6 @@
from tooz import coordination
-coordinator = coordination.get_coordinator('zookeeper', b'host-1')
+coordinator = coordination.get_coordinator('zookeeper://localhost', b'host-1')
coordinator.start()
# Create a group
diff --git a/examples/leader_election.py b/examples/leader_election.py
index 2daea4a..ff1d75f 100644
--- a/examples/leader_election.py
+++ b/examples/leader_election.py
@@ -1,6 +1,6 @@
from tooz import coordination
-coordinator = coordination.get_coordinator('zookeeper', b'host-1')
+coordinator = coordination.get_coordinator('zookeeper://localhost', b'host-1')
coordinator.start()
# Create a group
diff --git a/requirements.txt b/requirements.txt
index ef2e5db..dc29e50 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,5 +1,5 @@
pbr>=0.6,!=0.7,<1.0
-babel
+Babel>=1.3
stevedore>=0.14
six>=1.7.0
iso8601
diff --git a/tooz/coordination.py b/tooz/coordination.py
index c17a822..98daabe 100644
--- a/tooz/coordination.py
+++ b/tooz/coordination.py
@@ -20,6 +20,8 @@ import collections
import six
from stevedore import driver
+from tooz.openstack.common import network_utils
+
TOOZ_BACKENDS_NAMESPACE = "tooz.backends"
@@ -166,14 +168,11 @@ class CoordinationDriver(object):
"""
raise NotImplementedError
- def start(self, timeout=10):
+ def start(self):
"""Start the service engine.
If needed, the establishment of a connection to the servers
is initiated.
-
- :param timeout: Time in seconds to wait for connection to succeed.
- :type timeout: int
"""
@staticmethod
@@ -314,23 +313,21 @@ class CoordAsyncResult(object):
"""Returns True if the task is done, False otherwise."""
-# TODO(yassine)
-# Replace kwargs by something more simple.
-def get_coordinator(backend, member_id, **kwargs):
+def get_coordinator(backend_url, member_id):
"""Initialize and load the backend.
- :param backend: the current tooz provided backends are 'zookeeper'
+ :param backend_url: the backend URL to use
:type backend: str
:param member_id: the id of the member
:type member_id: str
- :param kwargs: additional backend specific options
- :type kwargs: dict
"""
- return driver.DriverManager(namespace=TOOZ_BACKENDS_NAMESPACE,
- name=backend,
- invoke_on_load=True,
- invoke_args=(member_id,),
- invoke_kwds=kwargs).driver
+ parsed_url = network_utils.urlsplit(backend_url)
+ parsed_qs = six.moves.urllib.parse.parse_qs(parsed_url.query)
+ return driver.DriverManager(
+ namespace=TOOZ_BACKENDS_NAMESPACE,
+ name=parsed_url.scheme,
+ invoke_on_load=True,
+ invoke_args=(member_id, parsed_url, parsed_qs)).driver
class ToozError(Exception):
diff --git a/tooz/drivers/ipc.py b/tooz/drivers/ipc.py
index 748dac2..50cbbda 100644
--- a/tooz/drivers/ipc.py
+++ b/tooz/drivers/ipc.py
@@ -44,7 +44,7 @@ class IPCLock(locking.Lock):
class IPCDriver(coordination.CoordinationDriver):
- def __init__(self, member_id, lock_timeout=30):
+ def __init__(self, member_id, parsed_url, options):
"""Initialize the IPC driver.
:param lock_timeout: how many seconds to wait when trying to acquire
@@ -55,7 +55,7 @@ class IPCDriver(coordination.CoordinationDriver):
lock_timeout
"""
super(IPCDriver, self).__init__()
- self.lock_timeout = lock_timeout
+ self.lock_timeout = int(options.get('lock_timeout', ['30'])[-1])
def get_lock(self, name):
return IPCLock(name, self.lock_timeout)
diff --git a/tooz/drivers/memcached.py b/tooz/drivers/memcached.py
index ab41e2a..026bd63 100644
--- a/tooz/drivers/memcached.py
+++ b/tooz/drivers/memcached.py
@@ -79,14 +79,20 @@ class MemcachedDriver(coordination.CoordinationDriver):
_MEMBER_PREFIX = b'_TOOZ_MEMBER_'
_GROUP_LIST_KEY = b'_TOOZ_GROUP_LIST'
- def __init__(self, member_id, membership_timeout=30, lock_timeout=30,
- leader_timeout=30):
+ def __init__(self, member_id, parsed_url, options):
super(MemcachedDriver, self).__init__()
self._member_id = member_id
self._groups = set()
- self.membership_timeout = membership_timeout
- self.lock_timeout = lock_timeout
- self.leader_timeout = leader_timeout
+ self.host = (parsed_url.hostname or "localhost",
+ parsed_url.port or 11211)
+ default_timeout = options.get('timeout', ['30'])
+ self.timeout = int(default_timeout[-1])
+ self.membership_timeout = int(options.get(
+ 'membership_timeout', default_timeout)[-1])
+ self.lock_timeout = int(options.get(
+ 'lock_timeout', default_timeout)[-1])
+ self.leader_timeout = int(options.get(
+ 'leader_timeout', default_timeout)[-1])
@staticmethod
def _msgpack_serializer(key, value):
@@ -102,14 +108,14 @@ class MemcachedDriver(coordination.CoordinationDriver):
return msgpack.loads(value)
raise Exception("Unknown serialization format")
- def start(self, host=("127.0.0.1", 11211), timeout=5):
+ def start(self):
try:
self.client = pymemcache.client.Client(
- host,
+ self.host,
serializer=self._msgpack_serializer,
deserializer=self._msgpack_deserializer,
- timeout=timeout,
- connect_timeout=timeout)
+ timeout=self.timeout,
+ connect_timeout=self.timeout)
except Exception as e:
raise coordination.ToozConnectionError(e)
self._group_members = collections.defaultdict(set)
diff --git a/tooz/drivers/zookeeper.py b/tooz/drivers/zookeeper.py
index 1ba581c..6ada6f7 100644
--- a/tooz/drivers/zookeeper.py
+++ b/tooz/drivers/zookeeper.py
@@ -16,12 +16,14 @@
import collections
import copy
+import threading
from kazoo import client
from kazoo import exceptions
from kazoo.protocol import paths
import six
-from zake import fake_client
+import zake.fake_client
+import zake.fake_storage
from tooz import coordination
from tooz import locking
@@ -43,9 +45,14 @@ class BaseZooKeeperDriver(coordination.CoordinationDriver):
_TOOZ_NAMESPACE = b"tooz"
- def start(self, timeout=10):
+ def __init__(self, member_id, parsed_url, options):
+ super(BaseZooKeeperDriver, self).__init__()
+ self._member_id = member_id
+ self.timeout = int(options.get('timeout', ['10'])[-1])
+
+ def start(self):
try:
- self._coord.start(timeout=timeout)
+ self._coord.start(timeout=self.timeout)
except self._coord.handler.timeout_exception as e:
raise coordination.ToozConnectionError("operation error: %s" % (e))
@@ -201,20 +208,10 @@ class BaseZooKeeperDriver(coordination.CoordinationDriver):
class KazooDriver(BaseZooKeeperDriver):
"""The driver using the Kazoo client against real ZooKeeper servers."""
- def __init__(self, member_id, hosts="127.0.0.1:2181", handler=None,
- **kwargs):
- """:param hosts: the list of zookeeper servers in the
- form "ip:port2, ip2:port2".
-
- :param handler: a kazoo async handler to use if provided, if not
- provided the default that kazoo uses internally will be used instead.
- """
-
- if not all((hosts, member_id)):
- raise KeyError("hosts=%r, member_id=%r" % hosts, member_id)
+ def __init__(self, member_id, parsed_url, options):
+ super(KazooDriver, self).__init__(member_id, parsed_url, options)
+ self._coord = client.KazooClient(hosts=parsed_url.netloc)
self._member_id = member_id
- self._coord = client.KazooClient(hosts=hosts, handler=handler)
- super(KazooDriver, self).__init__()
def _watch_group(self, group_id):
get_members_req = self.get_members(group_id)
@@ -361,14 +358,11 @@ class ZakeDriver(BaseZooKeeperDriver):
without the need of real ZooKeeper servers.
"""
- def __init__(self, member_id, storage=None, **kwargs):
- """:param storage: a fake storage object."""
+ fake_storage = zake.fake_storage.FakeStorage(threading.RLock())
- if not all((storage, member_id)):
- raise KeyError("storage=%r, member_id=%r" % storage, member_id)
- self._member_id = member_id
- self._coord = fake_client.FakeClient(storage=storage)
- super(ZakeDriver, self).__init__()
+ def __init__(self, member_id, parsed_url, options):
+ super(ZakeDriver, self).__init__(member_id, parsed_url, options)
+ self._coord = zake.fake_client.FakeClient(storage=self.fake_storage)
@staticmethod
def watch_join_group(group_id, callback):
diff --git a/tooz/tests/test_coordination.py b/tooz/tests/test_coordination.py
index 22dab71..7b119e9 100644
--- a/tooz/tests/test_coordination.py
+++ b/tooz/tests/test_coordination.py
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
#
-# Copyright (C) 2013 eNovance Inc. All Rights Reserved.
+# Copyright © 2013-2014 eNovance Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@@ -13,50 +13,34 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
-
-import threading
import time
import uuid
import testscenarios
from testtools import testcase
-from zake import fake_storage
import tooz.coordination
from tooz import tests
-# Real ZooKeeper server scenario
-zookeeper_tests = ('zookeeper_tests', {'backend': 'kazoo',
- 'kwargs': {'hosts': '127.0.0.1:2181'}})
-
-# Fake Kazoo client scenario
-fake_storage = fake_storage.FakeStorage(threading.RLock())
-fake_zookeeper_tests = ('fake_zookeeper_tests', {'backend': 'zake',
- 'kwargs': {'storage':
- fake_storage}})
-
class TestAPI(testscenarios.TestWithScenarios,
tests.TestCaseSkipNotImplemented):
scenarios = [
- zookeeper_tests,
- fake_zookeeper_tests,
- ('memcached', {'backend': 'memcached',
- 'kwargs': {'membership_timeout': 5}}),
- ('ipc', {'backend': 'ipc',
- 'kwargs': {'lock_timeout': 2}}),
+ ('zookeeper', {'url': 'kazoo://127.0.0.1:2181?timeout=5'}),
+ ('zake', {'url': 'zake://?timeout=5'}),
+ ('memcached', {'url': 'memcached://?timeout=5'}),
+ ('ipc', {'url': 'ipc://'}),
]
def setUp(self):
super(TestAPI, self).setUp()
self.group_id = self._get_random_uuid()
self.member_id = self._get_random_uuid()
- self._coord = tooz.coordination.get_coordinator(self.backend,
- self.member_id,
- **self.kwargs)
+ self._coord = tooz.coordination.get_coordinator(self.url,
+ self.member_id)
try:
- self._coord.start(timeout=5)
+ self._coord.start()
except tooz.coordination.ToozConnectionError as e:
raise testcase.TestSkipped(str(e))
@@ -98,9 +82,8 @@ class TestAPI(testscenarios.TestWithScenarios,
def test_join_group_with_member_id_already_exists(self):
self._coord.create_group(self.group_id).get()
self._coord.join_group(self.group_id).get()
- client = tooz.coordination.get_coordinator(self.backend,
- self.member_id,
- **self.kwargs)
+ client = tooz.coordination.get_coordinator(self.url,
+ self.member_id)
client.start()
join_group = client.join_group(self.group_id)
self.assertRaises(tooz.coordination.MemberAlreadyExist,
@@ -144,9 +127,8 @@ class TestAPI(testscenarios.TestWithScenarios,
def test_get_members(self):
group_id_test2 = self._get_random_uuid()
member_id_test2 = self._get_random_uuid()
- client2 = tooz.coordination.get_coordinator(self.backend,
- member_id_test2,
- **self.kwargs)
+ client2 = tooz.coordination.get_coordinator(self.url,
+ member_id_test2)
client2.start()
self._coord.create_group(group_id_test2).get()
@@ -213,12 +195,11 @@ class TestAPI(testscenarios.TestWithScenarios,
self._coord.heartbeat()
def test_disconnect_leave_group(self):
- if self.backend == 'zake':
+ if self.url.startswith('zake://'):
self.skipTest("Zake has a bug that prevent this test from working")
member_id_test2 = self._get_random_uuid()
- client2 = tooz.coordination.get_coordinator(self.backend,
- member_id_test2,
- **self.kwargs)
+ client2 = tooz.coordination.get_coordinator(self.url,
+ member_id_test2)
client2.start()
self._coord.create_group(self.group_id).get()
self._coord.join_group(self.group_id).get()
@@ -232,12 +213,11 @@ class TestAPI(testscenarios.TestWithScenarios,
self.assertTrue(member_id_test2 not in members_ids)
def test_timeout(self):
- if self.backend != 'memcached':
+ if not self.url.startswith('memcached://'):
self.skipTest("This test only works with memcached for now")
member_id_test2 = self._get_random_uuid()
- client2 = tooz.coordination.get_coordinator(self.backend,
- member_id_test2,
- **self.kwargs)
+ client2 = tooz.coordination.get_coordinator(self.url,
+ member_id_test2)
client2.start()
self._coord.create_group(self.group_id).get()
self._coord.join_group(self.group_id).get()
@@ -258,9 +238,8 @@ class TestAPI(testscenarios.TestWithScenarios,
def test_watch_group_join(self):
member_id_test2 = self._get_random_uuid()
- client2 = tooz.coordination.get_coordinator(self.backend,
- member_id_test2,
- **self.kwargs)
+ client2 = tooz.coordination.get_coordinator(self.url,
+ member_id_test2)
client2.start()
self._coord.create_group(self.group_id).get()
@@ -293,9 +272,8 @@ class TestAPI(testscenarios.TestWithScenarios,
def test_watch_leave_group(self):
member_id_test2 = self._get_random_uuid()
- client2 = tooz.coordination.get_coordinator(self.backend,
- member_id_test2,
- **self.kwargs)
+ client2 = tooz.coordination.get_coordinator(self.url,
+ member_id_test2)
client2.start()
self._coord.create_group(self.group_id).get()
@@ -367,9 +345,8 @@ class TestAPI(testscenarios.TestWithScenarios,
self._coord.run_watchers()
member_id_test2 = self._get_random_uuid()
- client2 = tooz.coordination.get_coordinator(self.backend,
- member_id_test2,
- **self.kwargs)
+ client2 = tooz.coordination.get_coordinator(self.url,
+ member_id_test2)
client2.start()
client2.watch_elected_as_leader(self.group_id, self._set_event)
client2.run_watchers()
@@ -421,9 +398,8 @@ class TestAPI(testscenarios.TestWithScenarios,
self._coord.run_watchers()
member_id_test2 = self._get_random_uuid()
- client2 = tooz.coordination.get_coordinator(self.backend,
- member_id_test2,
- **self.kwargs)
+ client2 = tooz.coordination.get_coordinator(self.url,
+ member_id_test2)
client2.start()
client2.watch_elected_as_leader(self.group_id, self._set_event)
client2.run_watchers()
@@ -468,9 +444,8 @@ class TestAPI(testscenarios.TestWithScenarios,
def test_get_lock_multiple_coords(self):
member_id2 = self._get_random_uuid()
- client2 = tooz.coordination.get_coordinator(self.backend,
- member_id2,
- **self.kwargs)
+ client2 = tooz.coordination.get_coordinator(self.url,
+ member_id2)
client2.start()
lock_name = self._get_random_uuid()