summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorVinay Karanam <vinayinvicible@gmail.com>2020-05-17 00:12:05 +0530
committerAsif Saif Uddin <auvipy@gmail.com>2020-05-17 11:15:19 +0600
commit7fefb6fe9ab85948aeeccce7d96f5a5289d6d337 (patch)
tree2c1f105d01108db02435ce00ada2de0f6b6500f6
parentc9451787be25a03edc9da7f5375290753eafe00a (diff)
downloadkombu-7fefb6fe9ab85948aeeccce7d96f5a5289d6d337.tar.gz
Modified Mutex to use redis LuaLock implementation
Fixes #1190
-rw-r--r--kombu/transport/redis.py29
-rw-r--r--t/unit/transport/test_redis.py73
2 files changed, 44 insertions, 58 deletions
diff --git a/kombu/transport/redis.py b/kombu/transport/redis.py
index 7eb3d400..cf6dd296 100644
--- a/kombu/transport/redis.py
+++ b/kombu/transport/redis.py
@@ -21,7 +21,6 @@ from kombu.utils.json import loads, dumps
from kombu.utils.objects import cached_property
from kombu.utils.scheduling import cycle_by_name
from kombu.utils.url import _parse_url
-from kombu.utils.uuid import uuid
from kombu.utils.compat import _detect_environment
from kombu.utils.functional import accepts_argument
@@ -108,28 +107,24 @@ class MutexHeld(Exception):
@contextmanager
def Mutex(client, name, expire):
- """The Redis lock implementation (probably shaky)."""
- lock_id = uuid()
- i_won = client.setnx(name, lock_id)
+ """Acquire redis lock in non blocking way.
+
+ Raise MutexHeld if not successful.
+ """
+ lock = client.lock(name, timeout=expire)
+ lock_acquired = False
try:
- if i_won:
- client.expire(name, expire)
+ lock_acquired = lock.acquire(blocking=False)
+ if lock_acquired:
yield
else:
- if not client.ttl(name):
- client.expire(name, expire)
raise MutexHeld()
finally:
- if i_won:
+ if lock_acquired:
try:
- with client.pipeline(True) as pipe:
- pipe.watch(name)
- if bytes_to_str(pipe.get(name)) == lock_id:
- pipe.multi()
- pipe.delete(name)
- pipe.execute()
- pipe.unwatch()
- except redis.WatchError:
+ lock.release()
+ except redis.exceptions.LockNotOwnedError:
+ # when lock is expired
pass
diff --git a/t/unit/transport/test_redis.py b/t/unit/transport/test_redis.py
index 572e8a9c..a36dfc2b 100644
--- a/t/unit/transport/test_redis.py
+++ b/t/unit/transport/test_redis.py
@@ -14,7 +14,6 @@ from kombu.exceptions import InconsistencyError, VersionMismatch
from kombu.five import Empty, Queue as _Queue, bytes_if_py2
from kombu.transport import virtual
from kombu.utils import eventio # patch poll
-from kombu.utils.encoding import str_to_bytes
from kombu.utils.json import dumps
@@ -1358,48 +1357,40 @@ class test_Mutex:
def test_mutex(self, lock_id='xxx'):
client = Mock(name='client')
- with patch('kombu.transport.redis.uuid') as uuid:
- # Won
- uuid.return_value = lock_id
- client.setnx.return_value = True
- client.pipeline = ContextMock()
- pipe = client.pipeline.return_value
- pipe.get.return_value = str_to_bytes(lock_id) # redis gives bytes
- held = False
- with redis.Mutex(client, 'foo1', 100):
- held = True
- assert held
- client.setnx.assert_called_with('foo1', lock_id)
- pipe.get.return_value = b'yyy'
- held = False
- with redis.Mutex(client, 'foo1', 100):
- held = True
- assert held
-
- # Did not win
- client.expire.reset_mock()
- pipe.get.return_value = str_to_bytes(lock_id)
- client.setnx.return_value = False
- with pytest.raises(redis.MutexHeld):
- held = False
- with redis.Mutex(client, 'foo1', '100'):
- held = True
- assert not held
- client.ttl.return_value = 0
- with pytest.raises(redis.MutexHeld):
- held = False
- with redis.Mutex(client, 'foo1', '100'):
- held = True
- assert not held
- client.expire.assert_called()
-
- # Wins but raises WatchError (and that is ignored)
- client.setnx.return_value = True
- pipe.watch.side_effect = redis.redis.WatchError()
- held = False
+ lock = client.lock.return_value = Mock(name='lock')
+
+ # Won
+ lock.acquire.return_value = True
+ held = False
+ with redis.Mutex(client, 'foo1', 100):
+ held = True
+ assert held
+ lock.acquire.assert_called_with(blocking=False)
+ client.lock.assert_called_with('foo1', timeout=100)
+
+ client.reset_mock()
+ lock.reset_mock()
+
+ # Did not win
+ lock.acquire.return_value = False
+ held = False
+ with pytest.raises(redis.MutexHeld):
with redis.Mutex(client, 'foo1', 100):
held = True
- assert held
+ assert not held
+ lock.acquire.assert_called_with(blocking=False)
+ client.lock.assert_called_with('foo1', timeout=100)
+
+ client.reset_mock()
+ lock.reset_mock()
+
+ # Wins but raises LockNotOwnedError (and that is ignored)
+ lock.acquire.return_value = True
+ lock.release.side_effect = redis.redis.exceptions.LockNotOwnedError()
+ held = False
+ with redis.Mutex(client, 'foo1', 100):
+ held = True
+ assert held
@skip.unless_module('redis.sentinel')