summaryrefslogtreecommitdiff
path: root/taskflow/utils
diff options
context:
space:
mode:
authorJoshua Harlow <harlowja@gmail.com>2013-10-06 21:38:43 +0000
committerJoshua Harlow <harlowja@yahoo-inc.com>2013-10-07 15:43:21 -0700
commite70032d0d9749a48bf56f14b7b677fb5d92ff332 (patch)
treec2000d41956b7aa839248e428bab3ce06a5d0d55 /taskflow/utils
parent8750840ac872cb1bf983be5cbad3d0448e4f3ee6 (diff)
downloadtaskflow-e70032d0d9749a48bf56f14b7b677fb5d92ff332.tar.gz
Remove decorators and move to utils
In order to avoid the circular import in threading utils move the decorators functionality to utils/misc and move the locking functionality to utils/lock_utils and then use these functions from the threading util (and elsewhere). Fixes bug: 1236080 Change-Id: I9e71c2ba15782cbb6dd5ab7e1264b77ed47bc29e
Diffstat (limited to 'taskflow/utils')
-rw-r--r--taskflow/utils/lock_utils.py73
-rw-r--r--taskflow/utils/misc.py14
-rw-r--r--taskflow/utils/threading_utils.py44
3 files changed, 89 insertions, 42 deletions
diff --git a/taskflow/utils/lock_utils.py b/taskflow/utils/lock_utils.py
index 47a2bbc..3344ceb 100644
--- a/taskflow/utils/lock_utils.py
+++ b/taskflow/utils/lock_utils.py
@@ -24,12 +24,85 @@
import errno
import logging
import os
+import threading
import time
+from taskflow.utils import misc
+
LOG = logging.getLogger(__name__)
WAIT_TIME = 0.01
+def locked(*args, **kwargs):
+ """A decorator that looks for a given attribute (typically a lock or a list
+ of locks) and before executing the decorated function uses the given lock
+ or list of locks as a context manager, automatically releasing on exit.
+ """
+
+ def decorator(f):
+ attr_name = kwargs.get('lock', '_lock')
+
+ @misc.wraps(f)
+ def wrapper(*args, **kwargs):
+ lock = getattr(args[0], attr_name)
+ if isinstance(lock, (tuple, list)):
+ lock = MultiLock(locks=list(lock))
+ with lock:
+ return f(*args, **kwargs)
+
+ return wrapper
+
+ # This is needed to handle when the decorator has args or the decorator
+ # doesn't have args, python is rather weird here...
+ if kwargs or not args:
+ return decorator
+ else:
+ if len(args) == 1:
+ return decorator(args[0])
+ else:
+ return decorator
+
+
+class MultiLock(object):
+ """A class which can attempt to obtain many locks at once and release
+ said locks when exiting.
+
+ Useful as a context manager around many locks (instead of having to nest
+ said individual context managers).
+ """
+
+ def __init__(self, locks):
+ assert len(locks) > 0, "Zero locks requested"
+ self._locks = locks
+ self._locked = [False] * len(locks)
+
+ def __enter__(self):
+
+ def is_locked(lock):
+ # NOTE(harlowja): the threading2 lock doesn't seem to have this
+ # attribute, so thats why we are checking it existing first.
+ if hasattr(lock, 'locked'):
+ return lock.locked()
+ return False
+
+ for i in xrange(0, len(self._locked)):
+ if self._locked[i] or is_locked(self._locks[i]):
+ raise threading.ThreadError("Lock %s not previously released"
+ % (i + 1))
+ self._locked[i] = False
+ for (i, lock) in enumerate(self._locks):
+ self._locked[i] = lock.acquire()
+
+ def __exit__(self, type, value, traceback):
+ for (i, locked) in enumerate(self._locked):
+ try:
+ if locked:
+ self._locks[i].release()
+ self._locked[i] = False
+ except threading.ThreadError:
+ LOG.exception("Unable to release lock %s", i + 1)
+
+
class _InterProcessLock(object):
"""Lock implementation which allows multiple locks, working around
issues like bugs.debian.org/cgi-bin/bugreport.cgi?bug=632857 and does
diff --git a/taskflow/utils/misc.py b/taskflow/utils/misc.py
index bbaa49f..d2afacf 100644
--- a/taskflow/utils/misc.py
+++ b/taskflow/utils/misc.py
@@ -22,6 +22,7 @@ from distutils import version
import collections
import copy
import errno
+import functools
import itertools
import logging
import os
@@ -36,6 +37,19 @@ import six
LOG = logging.getLogger(__name__)
+def wraps(fn):
+ """This will not be needed in python 3.2 or greater which already has this
+ built-in to its functools.wraps method.
+ """
+
+ def wrapper(f):
+ f = functools.wraps(fn)(f)
+ f.__wrapped__ = getattr(fn, '__wrapped__', fn)
+ return f
+
+ return wrapper
+
+
def get_version_string(obj):
"""Gets a object's version as a string.
diff --git a/taskflow/utils/threading_utils.py b/taskflow/utils/threading_utils.py
index 5c6e0bc..dc24353 100644
--- a/taskflow/utils/threading_utils.py
+++ b/taskflow/utils/threading_utils.py
@@ -22,6 +22,7 @@ import threading
import time
import types
+from taskflow.utils import lock_utils
LOG = logging.getLogger(__name__)
@@ -57,46 +58,6 @@ def get_optimal_thread_count():
return 2
-class MultiLock(object):
- """A class which can attempt to obtain many locks at once and release
- said locks when exiting.
-
- Useful as a context manager around many locks (instead of having to nest
- said individual context managers).
- """
-
- def __init__(self, locks):
- assert len(locks) > 0, "Zero locks requested"
- self._locks = locks
- self._locked = [False] * len(locks)
-
- def __enter__(self):
-
- def is_locked(lock):
- # NOTE(harlowja): the threading2 lock doesn't seem to have this
- # attribute, so thats why we are checking it existing first.
- if hasattr(lock, 'locked'):
- return lock.locked()
- return False
-
- for i in xrange(0, len(self._locked)):
- if self._locked[i] or is_locked(self._locks[i]):
- raise threading.ThreadError("Lock %s not previously released"
- % (i + 1))
- self._locked[i] = False
- for (i, lock) in enumerate(self._locks):
- self._locked[i] = lock.acquire()
-
- def __exit__(self, type, value, traceback):
- for (i, locked) in enumerate(self._locked):
- try:
- if locked:
- self._locks[i].release()
- self._locked[i] = False
- except threading.ThreadError:
- LOG.exception("Unable to release lock %s", i + 1)
-
-
class CountDownLatch(object):
"""Similar in concept to the java count down latch."""
@@ -137,11 +98,10 @@ class ThreadSafeMeta(type):
"""Metaclass that adds locking to all pubic methods of a class"""
def __new__(cls, name, bases, attrs):
- from taskflow import decorators
for attr_name, attr_value in attrs.iteritems():
if isinstance(attr_value, types.FunctionType):
if attr_name[0] != '_':
- attrs[attr_name] = decorators.locked(attr_value)
+ attrs[attr_name] = lock_utils.locked(attr_value)
return super(ThreadSafeMeta, cls).__new__(cls, name, bases, attrs)
def __call__(cls, *args, **kwargs):