summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJason Held <jasonsheld@gmail.com>2019-02-27 09:23:12 -0500
committerAsif Saif Uddin <auvipy@gmail.com>2019-02-27 20:23:12 +0600
commit41dbbe3063f4c1e5a11fc8a0ce6c366c14917df6 (patch)
treefc50ed257ae41c7d8204e4b19925c11e8317768c
parenteb6e4c8d512c6451d2430744322014439e8a0df8 (diff)
downloadkombu-41dbbe3063f4c1e5a11fc8a0ce6c366c14917df6.tar.gz
Control pattern matching (#997)
* Added pattern/matcher to Mailbox * pattern/match for kombu 4 * Ensure kombu.matcher is covered by our documentation. * Adds test_matcher & pidbox unit tests. * Added tests to ensure exception is raised when matcher is not registered. * Adds to test for destination passed in to process.
-rw-r--r--docs/reference/index.rst1
-rw-r--r--docs/reference/kombu.matcher.rst11
-rw-r--r--kombu/matcher.py140
-rw-r--r--kombu/pidbox.py38
-rw-r--r--t/unit/test_matcher.py32
-rw-r--r--t/unit/test_pidbox.py18
6 files changed, 234 insertions, 6 deletions
diff --git a/docs/reference/index.rst b/docs/reference/index.rst
index 471fe4f8..e022ead7 100644
--- a/docs/reference/index.rst
+++ b/docs/reference/index.rst
@@ -10,6 +10,7 @@
kombu
kombu.common
+ kombu.matcher
kombu.mixins
kombu.simple
kombu.clocks
diff --git a/docs/reference/kombu.matcher.rst b/docs/reference/kombu.matcher.rst
new file mode 100644
index 00000000..0dc15521
--- /dev/null
+++ b/docs/reference/kombu.matcher.rst
@@ -0,0 +1,11 @@
+==============================================
+ Pattern matching registry - ``kombu.matcher``
+==============================================
+
+.. contents::
+ :local:
+.. currentmodule:: kombu.matcher
+
+.. automodule:: kombu.matcher
+ :members:
+ :undoc-members:
diff --git a/kombu/matcher.py b/kombu/matcher.py
new file mode 100644
index 00000000..018a0955
--- /dev/null
+++ b/kombu/matcher.py
@@ -0,0 +1,140 @@
+"""Pattern matching registry."""
+from __future__ import absolute_import, unicode_literals
+
+from re import match as rematch
+from fnmatch import fnmatch
+
+from .utils.compat import entrypoints
+from .utils.encoding import bytes_to_str
+
+
+class MatcherNotInstalled(Exception):
+ """Matcher not installed/found."""
+
+ pass
+
+
+class MatcherRegistry(object):
+ """Pattern matching function registry."""
+
+ MatcherNotInstalled = MatcherNotInstalled
+ matcher_pattern_first = ["pcre", ]
+
+ def __init__(self):
+ self._matchers = {}
+ self._default_matcher = None
+
+ def register(self, name, matcher):
+ """Add matcher by name to the registry."""
+ self._matchers[name] = matcher
+
+ def unregister(self, name):
+ """Remove matcher by name from the registry."""
+ try:
+ self._matchers.pop(name)
+ except KeyError:
+ raise self.MatcherNotInstalled(
+ 'No matcher installed for {}'.format(name)
+ )
+
+ def _set_default_matcher(self, name):
+ """Set the default matching method.
+
+ :param name: The name of the registered matching method.
+ For example, `glob` (default), `pcre`, or any custom
+ methods registered using :meth:`register`.
+
+ :raises MatcherNotInstalled: If the matching method requested
+ is not available.
+ """
+ try:
+ self._default_matcher = self._matchers[name]
+ except KeyError:
+ raise self.MatcherNotInstalled(
+ 'No matcher installed for {}'.format(name)
+ )
+
+ def match(self, data, pattern, matcher=None, matcher_kwargs=None):
+ """Call the matcher."""
+ if matcher and not self._matchers.get(matcher):
+ raise self.MatcherNotInstalled(
+ 'No matcher installed for {}'.format(matcher)
+ )
+ match_func = self._matchers[matcher or 'glob']
+ if matcher in self.matcher_pattern_first:
+ first_arg = bytes_to_str(pattern)
+ second_arg = bytes_to_str(data)
+ else:
+ first_arg = bytes_to_str(data)
+ second_arg = bytes_to_str(pattern)
+ return match_func(first_arg, second_arg, **matcher_kwargs or {})
+
+
+#: Global registry of matchers.
+registry = MatcherRegistry()
+
+
+"""
+.. function:: match(data, pattern, matcher=default_matcher,
+ matcher_kwargs=None):
+
+ Match `data` by `pattern` using `matcher`.
+
+ :param data: The data that should be matched. Must be string.
+ :param pattern: The pattern that should be applied. Must be string.
+ :keyword matcher: An optional string representing the mathcing
+ method (for example, `glob` or `pcre`).
+
+ If :const:`None` (default), then `glob` will be used.
+
+ :keyword matcher_kwargs: Additional keyword arguments that will be passed
+ to the specified `matcher`.
+ :returns: :const:`True` if `data` matches pattern,
+ :const:`False` otherwise.
+
+ :raises MatcherNotInstalled: If the matching method requested is not
+ available.
+"""
+match = registry.match
+
+
+"""
+.. function:: register(name, matcher):
+ Register a new matching method.
+
+ :param name: A convience name for the mathing method.
+ :param matcher: A method that will be passed data and pattern.
+"""
+register = registry.register
+
+
+"""
+.. function:: unregister(name):
+ Unregister registered matching method.
+
+ :param name: Registered matching method name.
+"""
+unregister = registry.unregister
+
+
+def register_glob():
+ """Register glob into default registry."""
+ registry.register('glob', fnmatch)
+
+
+def register_pcre():
+ """Register pcre into default registry."""
+ registry.register('pcre', rematch)
+
+
+# Register the base matching methods.
+register_glob()
+register_pcre()
+
+# Default matching method is 'glob'
+registry._set_default_matcher('glob')
+
+
+# Load entrypoints from installed extensions
+for ep, args in entrypoints('kombu.matchers'):
+ register(ep.name, *args)
diff --git a/kombu/pidbox.py b/kombu/pidbox.py
index f8fcecb0..266c75ac 100644
--- a/kombu/pidbox.py
+++ b/kombu/pidbox.py
@@ -15,11 +15,14 @@ from . import Exchange, Queue, Consumer, Producer
from .clocks import LamportClock
from .common import maybe_declare, oid_from
from .exceptions import InconsistencyError
-from .five import range
+from .five import range, string_t
from .log import get_logger
from .utils.functional import maybe_evaluate, reprcall
from .utils.objects import cached_property
from .utils.uuid import uuid
+from .matcher import match
+
+REPLY_QUEUE_EXPIRES = 10
W_PIDBOX_IN_USE = """\
A node named {node.hostname} is already using this process mailbox!
@@ -123,9 +126,21 @@ class Node(object):
def handle_message(self, body, message=None):
destination = body.get('destination')
+ pattern = body.get('pattern')
+ matcher = body.get('matcher')
if message:
self.adjust_clock(message.headers.get('clock') or 0)
- if not destination or self.hostname in destination:
+ hostname = self.hostname
+ run_dispatch = False
+ if destination:
+ if hostname in destination:
+ run_dispatch = True
+ elif pattern and matcher:
+ if match(hostname, pattern, matcher):
+ run_dispatch = True
+ else:
+ run_dispatch = True
+ if run_dispatch:
return self.dispatch(**body)
dispatch_from_message = handle_message
@@ -270,10 +285,12 @@ class Mailbox(object):
def _publish(self, type, arguments, destination=None,
reply_ticket=None, channel=None, timeout=None,
- serializer=None, producer=None):
+ serializer=None, producer=None, pattern=None, matcher=None):
message = {'method': type,
'arguments': arguments,
- 'destination': destination}
+ 'destination': destination,
+ 'pattern': pattern,
+ 'matcher': matcher}
chan = channel or self.connection.default_channel
exchange = self.exchange
if reply_ticket:
@@ -292,12 +309,19 @@ class Mailbox(object):
def _broadcast(self, command, arguments=None, destination=None,
reply=False, timeout=1, limit=None,
- callback=None, channel=None, serializer=None):
+ callback=None, channel=None, serializer=None,
+ pattern=None, matcher=None):
if destination is not None and \
not isinstance(destination, (list, tuple)):
raise ValueError(
'destination must be a list/tuple not {0}'.format(
type(destination)))
+ if (pattern is not None and not isinstance(pattern, string_t) and
+ matcher is not None and not isinstance(matcher, string_t)):
+ raise ValueError(
+ 'pattern and matcher must be '
+ 'strings not {}, {}'.format(type(pattern), type(matcher))
+ )
arguments = arguments or {}
reply_ticket = reply and uuid() or None
@@ -312,7 +336,9 @@ class Mailbox(object):
reply_ticket=reply_ticket,
channel=chan,
timeout=timeout,
- serializer=serializer)
+ serializer=serializer,
+ pattern=pattern,
+ matcher=matcher)
if reply_ticket:
return self._collect(reply_ticket, limit=limit,
diff --git a/t/unit/test_matcher.py b/t/unit/test_matcher.py
new file mode 100644
index 00000000..8433a19f
--- /dev/null
+++ b/t/unit/test_matcher.py
@@ -0,0 +1,32 @@
+from __future__ import absolute_import, unicode_literals
+
+from kombu.matcher import (
+ match, register, registry, unregister, fnmatch, rematch,
+ MatcherNotInstalled
+)
+
+import pytest
+
+
+class test_Matcher(object):
+
+ def test_register_match_unregister_matcher(self):
+ register("test_matcher", rematch)
+ registry.matcher_pattern_first.append("test_matcher")
+ assert registry._matchers["test_matcher"] == rematch
+ assert match("data", r"d.*", "test_matcher") is not None
+ assert registry._default_matcher == fnmatch
+ registry._set_default_matcher("test_matcher")
+ assert registry._default_matcher == rematch
+ unregister("test_matcher")
+ assert "test_matcher" not in registry._matchers
+ registry._set_default_matcher("glob")
+ assert registry._default_matcher == fnmatch
+
+ def test_unregister_matcher_not_registered(self):
+ with pytest.raises(MatcherNotInstalled):
+ unregister('notinstalled')
+
+ def test_match_using_unregistered_matcher(self):
+ with pytest.raises(MatcherNotInstalled):
+ match("data", r"d.*", "notinstalled")
diff --git a/t/unit/test_pidbox.py b/t/unit/test_pidbox.py
index c2dfccb0..a187af37 100644
--- a/t/unit/test_pidbox.py
+++ b/t/unit/test_pidbox.py
@@ -43,6 +43,11 @@ class test_Mailbox:
def _handler(self, state):
return self.stats['var']
+ def test_broadcast_matcher_pattern_string_type(self):
+ mailbox = pidbox.Mailbox("test_matcher_str")(self.connection)
+ with pytest.raises(ValueError):
+ mailbox._broadcast("ping", pattern=1, matcher=2)
+
def test_publish_reply_ignores_InconsistencyError(self):
mailbox = pidbox.Mailbox('test_reply__collect')(self.connection)
with patch('kombu.pidbox.Producer') as Producer:
@@ -233,6 +238,19 @@ class test_Mailbox:
body['destination'] = ['some_other_node']
assert node.handle_message(body, None) is None
+ # message for me should be processed.
+ body['destination'] = ['test_dispatch_from_message']
+ assert node.handle_message(body, None) is not None
+
+ # message not for me should not be processed.
+ body.pop("destination")
+ body['matcher'] = 'glob'
+ body["pattern"] = "something*"
+ assert node.handle_message(body, None) is None
+
+ body["pattern"] = "test*"
+ assert node.handle_message(body, None) is not None
+
def test_handle_message_adjusts_clock(self):
node = self.bound.Node('test_adjusts_clock')