From 86039050f8735314bda7050aade450156be80e20 Mon Sep 17 00:00:00 2001 From: Gordon Sim Date: Sat, 17 Nov 2012 17:08:14 +0000 Subject: QPID-4368: Added support for subject filtering on queues git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1410750 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/broker/amqp/Filter.cpp | 5 ++++ cpp/src/qpid/broker/amqp/Filter.h | 1 + cpp/src/qpid/broker/amqp/Outgoing.cpp | 52 +++++++++++++++++++++++++++++++++++ cpp/src/qpid/broker/amqp/Outgoing.h | 3 ++ cpp/src/qpid/broker/amqp/Session.cpp | 7 +++-- 5 files changed, 66 insertions(+), 2 deletions(-) (limited to 'cpp/src') diff --git a/cpp/src/qpid/broker/amqp/Filter.cpp b/cpp/src/qpid/broker/amqp/Filter.cpp index 61e377c72f..38baba0df1 100644 --- a/cpp/src/qpid/broker/amqp/Filter.cpp +++ b/cpp/src/qpid/broker/amqp/Filter.cpp @@ -74,6 +74,11 @@ bool Filter::hasSubjectFilter() const return !subjectFilter.value.empty(); } +std::string Filter::getSubjectFilter() const +{ + return subjectFilter.value; +} + void Filter::setSubjectFilter(const StringFilter& filter) { diff --git a/cpp/src/qpid/broker/amqp/Filter.h b/cpp/src/qpid/broker/amqp/Filter.h index 5e2dee4d6e..20cceb372a 100644 --- a/cpp/src/qpid/broker/amqp/Filter.h +++ b/cpp/src/qpid/broker/amqp/Filter.h @@ -39,6 +39,7 @@ class Filter : qpid::amqp::MapReader void read(pn_data_t*); void write(pn_data_t*); bool hasSubjectFilter() const; + std::string getSubjectFilter() const; void bind(boost::shared_ptr exchange, boost::shared_ptr queue); private: struct StringFilter diff --git a/cpp/src/qpid/broker/amqp/Outgoing.cpp b/cpp/src/qpid/broker/amqp/Outgoing.cpp index 70c6b9ebd5..665bf2def4 100644 --- a/cpp/src/qpid/broker/amqp/Outgoing.cpp +++ b/cpp/src/qpid/broker/amqp/Outgoing.cpp @@ -22,6 +22,7 @@ #include "qpid/broker/amqp/Header.h" #include "qpid/broker/amqp/Translation.h" #include "qpid/broker/Queue.h" +#include "qpid/broker/TopicKeyNode.h" #include "qpid/sys/OutputControl.h" #include "qpid/amqp/MessageEncoder.h" #include "qpid/log/Statement.h" @@ -163,6 +164,57 @@ bool Outgoing::accept(const qpid::broker::Message&) return canDeliver(); } +void Outgoing::setSubjectFilter(const std::string& f) +{ + subjectFilter = f; +} + +namespace { + +bool match(TokenIterator& filter, TokenIterator& target) +{ + bool wild = false; + while (!filter.finished()) + { + if (filter.match1('*')) { + if (target.finished()) return false; + //else move to next word in filter target + filter.next(); + target.next(); + } else if (filter.match1('#')) { + // i.e. filter word is '#' which can match a variable number of words in the target + filter.next(); + if (filter.finished()) return true; + else if (target.finished()) return false; + wild = true; + } else { + //filter word needs to match target exactly + if (target.finished()) return false; + std::string word; + target.pop(word); + if (filter.match(word)) { + wild = false; + filter.next(); + } else if (!wild) { + return false; + } + } + } + return target.finished(); +} +bool match(const std::string& filter, const std::string& target) +{ + TokenIterator lhs(filter); + TokenIterator rhs(target); + return match(lhs, rhs); +} +} + +bool Outgoing::filter(const qpid::broker::Message& m) +{ + return subjectFilter.empty() || subjectFilter == m.getRoutingKey() || match(subjectFilter, m.getRoutingKey()); +} + void Outgoing::cancel() {} void Outgoing::acknowledged(const qpid::broker::DeliveryRecord&) {} diff --git a/cpp/src/qpid/broker/amqp/Outgoing.h b/cpp/src/qpid/broker/amqp/Outgoing.h index 91670bcd79..a8450a48cf 100644 --- a/cpp/src/qpid/broker/amqp/Outgoing.h +++ b/cpp/src/qpid/broker/amqp/Outgoing.h @@ -60,6 +60,7 @@ class Outgoing : public qpid::broker::Consumer, public boost::enable_shared_from { public: Outgoing(Broker&,boost::shared_ptr q, pn_link_t* l, ManagedSession&, qpid::sys::OutputControl& o, bool topic); + void setSubjectFilter(const std::string&); void init(); bool dispatch(); void write(const char* data, size_t size); @@ -71,6 +72,7 @@ class Outgoing : public qpid::broker::Consumer, public boost::enable_shared_from bool deliver(const QueueCursor& cursor, const qpid::broker::Message& msg); void notify(); bool accept(const qpid::broker::Message&); + bool filter(const qpid::broker::Message&); void cancel(); void acknowledged(const qpid::broker::DeliveryRecord&); qpid::broker::OwnershipToken* getSession(); @@ -99,6 +101,7 @@ class Outgoing : public qpid::broker::Consumer, public boost::enable_shared_from size_t current; int outstanding; std::vector buffer; + std::string subjectFilter; }; }}} // namespace qpid::broker::amqp diff --git a/cpp/src/qpid/broker/amqp/Session.cpp b/cpp/src/qpid/broker/amqp/Session.cpp index 93b8a747dc..760fa2d902 100644 --- a/cpp/src/qpid/broker/amqp/Session.cpp +++ b/cpp/src/qpid/broker/amqp/Session.cpp @@ -122,18 +122,21 @@ void Session::attach(pn_link_t* link) pn_terminus_set_address(pn_link_source(link), name.c_str()); ResolvedNode node = resolve(name, source); + Filter filter; + filter.read(pn_terminus_filter(source)); if (node.queue) { boost::shared_ptr q(new Outgoing(broker, node.queue, link, *this, out, false)); q->init(); + if (filter.hasSubjectFilter()) { + q->setSubjectFilter(filter.getSubjectFilter()); + } senders[link] = q; } else if (node.exchange) { QueueSettings settings(false, true); //TODO: populate settings from source details when available from engine boost::shared_ptr queue = broker.createQueue(name + qpid::types::Uuid(true).str(), settings, this, "", connection.getUserid(), connection.getId()).first; - Filter filter; - filter.read(pn_terminus_filter(source)); if (filter.hasSubjectFilter()) { filter.bind(node.exchange, queue); filter.write(pn_terminus_filter(pn_link_source(link))); -- cgit v1.2.1