diff options
author | Jenkins <jenkins@review.openstack.org> | 2014-12-26 10:45:50 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2014-12-26 10:45:50 +0000 |
commit | 7debf06f04f22c70aef523aa247a87ee8ed22708 (patch) | |
tree | 5d6cd1b2dc7464e6329e3922f4d8b5b86418b74f | |
parent | 74a85505c4688b5bf055c0d8f07955d632e3c3d9 (diff) | |
parent | 1ac3e8314bd772dc1eb84f1f5bc6f9e1b7fc9ce3 (diff) | |
download | tooz-7debf06f04f22c70aef523aa247a87ee8ed22708.tar.gz |
Merge "Add create/join/leave group support in IPC driver"
-rw-r--r-- | doc/source/drivers.rst | 5 | ||||
-rw-r--r-- | tooz/drivers/ipc.py | 119 |
2 files changed, 108 insertions, 16 deletions
diff --git a/doc/source/drivers.rst b/doc/source/drivers.rst index 4c6a560..5b828c6 100644 --- a/doc/source/drivers.rst +++ b/doc/source/drivers.rst @@ -19,8 +19,9 @@ API, some of them have different properties: A lot of the features provided in tooz are based on timeout (heartbeats, locks, etc) so are less resilient than other backends. -* `ipc` is based on Posix IPC and only implements a lock mechanism for now. - The lock can only be distributed locally to a computer processes. +* `ipc` is based on Posix IPC and only implements a lock mechanism for now, and + some basic group primitives (with huge limitations). The lock can only be + distributed locally to a computer processes. * `zake`_ is a driver using a fake implementation of ZooKeeper and can be used to use Tooz in your unit tests suite for example. diff --git a/tooz/drivers/ipc.py b/tooz/drivers/ipc.py index 21d8647..dfd27e3 100644 --- a/tooz/drivers/ipc.py +++ b/tooz/drivers/ipc.py @@ -17,14 +17,18 @@ # under the License. import hashlib +import struct import time +from concurrent import futures +import msgpack import six import sysv_ipc import tooz from tooz import coordination from tooz import locking +from tooz import utils if sysv_ipc.KEY_MIN <= 0: _KEY_RANGE = abs(sysv_ipc.KEY_MIN) + sysv_ipc.KEY_MAX @@ -32,6 +36,19 @@ else: _KEY_RANGE = sysv_ipc.KEY_MAX - sysv_ipc.KEY_MIN +def ftok(name, project): + # Similar to ftok & http://semanchuk.com/philip/sysv_ipc/#ftok_weakness + # but hopefully without as many weaknesses... + h = hashlib.md5() + if not isinstance(project, six.binary_type): + project = project.encode('ascii') + h.update(project) + if not isinstance(name, six.binary_type): + name = name.encode('ascii') + h.update(name) + return (int(h.hexdigest(), 16) % _KEY_RANGE) + sysv_ipc.KEY_MIN + + class IPCLock(locking.Lock): """A sysv IPC based lock. @@ -45,22 +62,9 @@ class IPCLock(locking.Lock): def __init__(self, name): super(IPCLock, self).__init__(name) - self.key = self.ftok(name, self._LOCK_PROJECT) + self.key = ftok(name, self._LOCK_PROJECT) self._lock = None - @staticmethod - def ftok(name, project): - # Similar to ftok & http://semanchuk.com/philip/sysv_ipc/#ftok_weakness - # but hopefully without as many weaknesses... - h = hashlib.md5() - if not isinstance(project, six.binary_type): - project = project.encode('ascii') - h.update(project) - if not isinstance(name, six.binary_type): - name = name.encode('ascii') - h.update(name) - return (int(h.hexdigest(), 16) % _KEY_RANGE) + sysv_ipc.KEY_MIN - def acquire(self, blocking=True): if (blocking is not True and sysv_ipc.SEMAPHORE_TIMEOUT_SUPPORTED is False): @@ -122,10 +126,82 @@ class IPCLock(locking.Lock): class IPCDriver(coordination.CoordinationDriver): """A IPC based driver.""" + _SEGMENT_SIZE = 1024 + _GROUP_LIST_KEY = "GROUP_LIST" + _GROUP_PROJECT = "_TOOZ_INTERNAL" + _INTERNAL_LOCK_NAME = "TOOZ_INTERNAL_LOCK" + def __init__(self, member_id, parsed_url, options): """Initialize the IPC driver.""" super(IPCDriver, self).__init__() + def _start(self): + try: + self._group_list = sysv_ipc.SharedMemory( + ftok(self._GROUP_LIST_KEY, self._GROUP_PROJECT), + sysv_ipc.IPC_CREAT, + size=self._SEGMENT_SIZE) + except sysv_ipc.ExistentialError: + raise + self._lock = self.get_lock(self._INTERNAL_LOCK_NAME) + self._executor = futures.ThreadPoolExecutor(max_workers=1) + + def _stop(self): + self._executor.shutdown(wait=True) + + def __del__(self): + if hasattr(self, "_group_list"): + try: + self._group_list.detach() + self._group_list.remove() + except sysv_ipc.ExistentialError: + pass + + def _read_group_list(self): + data = self._group_list.read(byte_count=2) + length = struct.unpack("H", data)[0] + if length == 0: + return set() + data = self._group_list.read(byte_count=length, offset=2) + return set(msgpack.loads(data)) + + def _write_group_list(self, group_list): + data = msgpack.dumps(list(group_list)) + if len(data) >= self._SEGMENT_SIZE - 2: + raise coordination.ToozError("Group list is too big") + self._group_list.write(struct.pack('H', len(data))) + self._group_list.write(data, offset=2) + + def create_group(self, group_id): + def _create_group(): + with self._lock: + group_list = self._read_group_list() + if group_id in group_list: + raise coordination.GroupAlreadyExist(group_id) + group_list.add(group_id) + self._write_group_list(group_list) + + return IPCFutureResult(self._executor.submit(_create_group)) + + def delete_group(self, group_id): + def _delete_group(): + with self._lock: + group_list = self._read_group_list() + if group_id not in group_list: + raise coordination.GroupNotCreated(group_id) + group_list.remove(group_id) + self._write_group_list(group_list) + + return IPCFutureResult(self._executor.submit(_delete_group)) + + def _get_groups_handler(self): + with self._lock: + return self._read_group_list() + + def get_groups(self): + return IPCFutureResult(self._executor.submit( + self._get_groups_handler)) + @staticmethod def get_lock(name): return IPCLock(name) @@ -153,3 +229,18 @@ class IPCDriver(coordination.CoordinationDriver): @staticmethod def unwatch_elected_as_leader(group_id, callback): raise tooz.NotImplemented + + +class IPCFutureResult(coordination.CoordAsyncResult): + """IPC asynchronous result that references a future.""" + def __init__(self, fut): + self._fut = fut + + def get(self, timeout=10): + try: + return self._fut.result(timeout=timeout) + except futures.TimeoutError as e: + raise coordination.OperationTimedOut(utils.exception_message(e)) + + def done(self): + return self._fut.done() |