diff options
author | Vinay Karanam <vinayinvicible@gmail.com> | 2020-05-17 00:12:05 +0530 |
---|---|---|
committer | Asif Saif Uddin <auvipy@gmail.com> | 2020-05-17 11:15:19 +0600 |
commit | 7fefb6fe9ab85948aeeccce7d96f5a5289d6d337 (patch) | |
tree | 2c1f105d01108db02435ce00ada2de0f6b6500f6 | |
parent | c9451787be25a03edc9da7f5375290753eafe00a (diff) | |
download | kombu-7fefb6fe9ab85948aeeccce7d96f5a5289d6d337.tar.gz |
Modified Mutex to use redis LuaLock implementation
Fixes #1190
-rw-r--r-- | kombu/transport/redis.py | 29 | ||||
-rw-r--r-- | t/unit/transport/test_redis.py | 73 |
2 files changed, 44 insertions, 58 deletions
diff --git a/kombu/transport/redis.py b/kombu/transport/redis.py index 7eb3d400..cf6dd296 100644 --- a/kombu/transport/redis.py +++ b/kombu/transport/redis.py @@ -21,7 +21,6 @@ from kombu.utils.json import loads, dumps from kombu.utils.objects import cached_property from kombu.utils.scheduling import cycle_by_name from kombu.utils.url import _parse_url -from kombu.utils.uuid import uuid from kombu.utils.compat import _detect_environment from kombu.utils.functional import accepts_argument @@ -108,28 +107,24 @@ class MutexHeld(Exception): @contextmanager def Mutex(client, name, expire): - """The Redis lock implementation (probably shaky).""" - lock_id = uuid() - i_won = client.setnx(name, lock_id) + """Acquire redis lock in non blocking way. + + Raise MutexHeld if not successful. + """ + lock = client.lock(name, timeout=expire) + lock_acquired = False try: - if i_won: - client.expire(name, expire) + lock_acquired = lock.acquire(blocking=False) + if lock_acquired: yield else: - if not client.ttl(name): - client.expire(name, expire) raise MutexHeld() finally: - if i_won: + if lock_acquired: try: - with client.pipeline(True) as pipe: - pipe.watch(name) - if bytes_to_str(pipe.get(name)) == lock_id: - pipe.multi() - pipe.delete(name) - pipe.execute() - pipe.unwatch() - except redis.WatchError: + lock.release() + except redis.exceptions.LockNotOwnedError: + # when lock is expired pass diff --git a/t/unit/transport/test_redis.py b/t/unit/transport/test_redis.py index 572e8a9c..a36dfc2b 100644 --- a/t/unit/transport/test_redis.py +++ b/t/unit/transport/test_redis.py @@ -14,7 +14,6 @@ from kombu.exceptions import InconsistencyError, VersionMismatch from kombu.five import Empty, Queue as _Queue, bytes_if_py2 from kombu.transport import virtual from kombu.utils import eventio # patch poll -from kombu.utils.encoding import str_to_bytes from kombu.utils.json import dumps @@ -1358,48 +1357,40 @@ class test_Mutex: def test_mutex(self, lock_id='xxx'): client = Mock(name='client') - with patch('kombu.transport.redis.uuid') as uuid: - # Won - uuid.return_value = lock_id - client.setnx.return_value = True - client.pipeline = ContextMock() - pipe = client.pipeline.return_value - pipe.get.return_value = str_to_bytes(lock_id) # redis gives bytes - held = False - with redis.Mutex(client, 'foo1', 100): - held = True - assert held - client.setnx.assert_called_with('foo1', lock_id) - pipe.get.return_value = b'yyy' - held = False - with redis.Mutex(client, 'foo1', 100): - held = True - assert held - - # Did not win - client.expire.reset_mock() - pipe.get.return_value = str_to_bytes(lock_id) - client.setnx.return_value = False - with pytest.raises(redis.MutexHeld): - held = False - with redis.Mutex(client, 'foo1', '100'): - held = True - assert not held - client.ttl.return_value = 0 - with pytest.raises(redis.MutexHeld): - held = False - with redis.Mutex(client, 'foo1', '100'): - held = True - assert not held - client.expire.assert_called() - - # Wins but raises WatchError (and that is ignored) - client.setnx.return_value = True - pipe.watch.side_effect = redis.redis.WatchError() - held = False + lock = client.lock.return_value = Mock(name='lock') + + # Won + lock.acquire.return_value = True + held = False + with redis.Mutex(client, 'foo1', 100): + held = True + assert held + lock.acquire.assert_called_with(blocking=False) + client.lock.assert_called_with('foo1', timeout=100) + + client.reset_mock() + lock.reset_mock() + + # Did not win + lock.acquire.return_value = False + held = False + with pytest.raises(redis.MutexHeld): with redis.Mutex(client, 'foo1', 100): held = True - assert held + assert not held + lock.acquire.assert_called_with(blocking=False) + client.lock.assert_called_with('foo1', timeout=100) + + client.reset_mock() + lock.reset_mock() + + # Wins but raises LockNotOwnedError (and that is ignored) + lock.acquire.return_value = True + lock.release.side_effect = redis.redis.exceptions.LockNotOwnedError() + held = False + with redis.Mutex(client, 'foo1', 100): + held = True + assert held @skip.unless_module('redis.sentinel') |