summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2014-12-26 10:45:50 +0000
committerGerrit Code Review <review@openstack.org>2014-12-26 10:45:50 +0000
commit7debf06f04f22c70aef523aa247a87ee8ed22708 (patch)
tree5d6cd1b2dc7464e6329e3922f4d8b5b86418b74f
parent74a85505c4688b5bf055c0d8f07955d632e3c3d9 (diff)
parent1ac3e8314bd772dc1eb84f1f5bc6f9e1b7fc9ce3 (diff)
downloadtooz-7debf06f04f22c70aef523aa247a87ee8ed22708.tar.gz
Merge "Add create/join/leave group support in IPC driver"
-rw-r--r--doc/source/drivers.rst5
-rw-r--r--tooz/drivers/ipc.py119
2 files changed, 108 insertions, 16 deletions
diff --git a/doc/source/drivers.rst b/doc/source/drivers.rst
index 4c6a560..5b828c6 100644
--- a/doc/source/drivers.rst
+++ b/doc/source/drivers.rst
@@ -19,8 +19,9 @@ API, some of them have different properties:
A lot of the features provided in tooz are based on timeout (heartbeats,
locks, etc) so are less resilient than other backends.
-* `ipc` is based on Posix IPC and only implements a lock mechanism for now.
- The lock can only be distributed locally to a computer processes.
+* `ipc` is based on Posix IPC and only implements a lock mechanism for now, and
+ some basic group primitives (with huge limitations). The lock can only be
+ distributed locally to a computer processes.
* `zake`_ is a driver using a fake implementation of ZooKeeper and can be
used to use Tooz in your unit tests suite for example.
diff --git a/tooz/drivers/ipc.py b/tooz/drivers/ipc.py
index 21d8647..dfd27e3 100644
--- a/tooz/drivers/ipc.py
+++ b/tooz/drivers/ipc.py
@@ -17,14 +17,18 @@
# under the License.
import hashlib
+import struct
import time
+from concurrent import futures
+import msgpack
import six
import sysv_ipc
import tooz
from tooz import coordination
from tooz import locking
+from tooz import utils
if sysv_ipc.KEY_MIN <= 0:
_KEY_RANGE = abs(sysv_ipc.KEY_MIN) + sysv_ipc.KEY_MAX
@@ -32,6 +36,19 @@ else:
_KEY_RANGE = sysv_ipc.KEY_MAX - sysv_ipc.KEY_MIN
+def ftok(name, project):
+ # Similar to ftok & http://semanchuk.com/philip/sysv_ipc/#ftok_weakness
+ # but hopefully without as many weaknesses...
+ h = hashlib.md5()
+ if not isinstance(project, six.binary_type):
+ project = project.encode('ascii')
+ h.update(project)
+ if not isinstance(name, six.binary_type):
+ name = name.encode('ascii')
+ h.update(name)
+ return (int(h.hexdigest(), 16) % _KEY_RANGE) + sysv_ipc.KEY_MIN
+
+
class IPCLock(locking.Lock):
"""A sysv IPC based lock.
@@ -45,22 +62,9 @@ class IPCLock(locking.Lock):
def __init__(self, name):
super(IPCLock, self).__init__(name)
- self.key = self.ftok(name, self._LOCK_PROJECT)
+ self.key = ftok(name, self._LOCK_PROJECT)
self._lock = None
- @staticmethod
- def ftok(name, project):
- # Similar to ftok & http://semanchuk.com/philip/sysv_ipc/#ftok_weakness
- # but hopefully without as many weaknesses...
- h = hashlib.md5()
- if not isinstance(project, six.binary_type):
- project = project.encode('ascii')
- h.update(project)
- if not isinstance(name, six.binary_type):
- name = name.encode('ascii')
- h.update(name)
- return (int(h.hexdigest(), 16) % _KEY_RANGE) + sysv_ipc.KEY_MIN
-
def acquire(self, blocking=True):
if (blocking is not True
and sysv_ipc.SEMAPHORE_TIMEOUT_SUPPORTED is False):
@@ -122,10 +126,82 @@ class IPCLock(locking.Lock):
class IPCDriver(coordination.CoordinationDriver):
"""A IPC based driver."""
+ _SEGMENT_SIZE = 1024
+ _GROUP_LIST_KEY = "GROUP_LIST"
+ _GROUP_PROJECT = "_TOOZ_INTERNAL"
+ _INTERNAL_LOCK_NAME = "TOOZ_INTERNAL_LOCK"
+
def __init__(self, member_id, parsed_url, options):
"""Initialize the IPC driver."""
super(IPCDriver, self).__init__()
+ def _start(self):
+ try:
+ self._group_list = sysv_ipc.SharedMemory(
+ ftok(self._GROUP_LIST_KEY, self._GROUP_PROJECT),
+ sysv_ipc.IPC_CREAT,
+ size=self._SEGMENT_SIZE)
+ except sysv_ipc.ExistentialError:
+ raise
+ self._lock = self.get_lock(self._INTERNAL_LOCK_NAME)
+ self._executor = futures.ThreadPoolExecutor(max_workers=1)
+
+ def _stop(self):
+ self._executor.shutdown(wait=True)
+
+ def __del__(self):
+ if hasattr(self, "_group_list"):
+ try:
+ self._group_list.detach()
+ self._group_list.remove()
+ except sysv_ipc.ExistentialError:
+ pass
+
+ def _read_group_list(self):
+ data = self._group_list.read(byte_count=2)
+ length = struct.unpack("H", data)[0]
+ if length == 0:
+ return set()
+ data = self._group_list.read(byte_count=length, offset=2)
+ return set(msgpack.loads(data))
+
+ def _write_group_list(self, group_list):
+ data = msgpack.dumps(list(group_list))
+ if len(data) >= self._SEGMENT_SIZE - 2:
+ raise coordination.ToozError("Group list is too big")
+ self._group_list.write(struct.pack('H', len(data)))
+ self._group_list.write(data, offset=2)
+
+ def create_group(self, group_id):
+ def _create_group():
+ with self._lock:
+ group_list = self._read_group_list()
+ if group_id in group_list:
+ raise coordination.GroupAlreadyExist(group_id)
+ group_list.add(group_id)
+ self._write_group_list(group_list)
+
+ return IPCFutureResult(self._executor.submit(_create_group))
+
+ def delete_group(self, group_id):
+ def _delete_group():
+ with self._lock:
+ group_list = self._read_group_list()
+ if group_id not in group_list:
+ raise coordination.GroupNotCreated(group_id)
+ group_list.remove(group_id)
+ self._write_group_list(group_list)
+
+ return IPCFutureResult(self._executor.submit(_delete_group))
+
+ def _get_groups_handler(self):
+ with self._lock:
+ return self._read_group_list()
+
+ def get_groups(self):
+ return IPCFutureResult(self._executor.submit(
+ self._get_groups_handler))
+
@staticmethod
def get_lock(name):
return IPCLock(name)
@@ -153,3 +229,18 @@ class IPCDriver(coordination.CoordinationDriver):
@staticmethod
def unwatch_elected_as_leader(group_id, callback):
raise tooz.NotImplemented
+
+
+class IPCFutureResult(coordination.CoordAsyncResult):
+ """IPC asynchronous result that references a future."""
+ def __init__(self, fut):
+ self._fut = fut
+
+ def get(self, timeout=10):
+ try:
+ return self._fut.result(timeout=timeout)
+ except futures.TimeoutError as e:
+ raise coordination.OperationTimedOut(utils.exception_message(e))
+
+ def done(self):
+ return self._fut.done()