diff options
| author | Alan Conway <aconway@apache.org> | 2013-05-29 18:03:59 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2013-05-29 18:03:59 +0000 |
| commit | fa16af0d7413cdad175f6b435e05a86273e695ec (patch) | |
| tree | 0653c9944a9c9350f1c754949d8b8388111eb988 | |
| parent | efa84dfce326ea33e2eddfd3cb8142a8d708e101 (diff) | |
| download | qpid-python-fa16af0d7413cdad175f6b435e05a86273e695ec.tar.gz | |
QPID-4886: Pass non-const reference to Message in QueueObserver functions.
Instead of modifying QueueObserver, a new class MessageInterceptor was
introduced to allow messages to be modified.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1487579 13f79535-47bb-0310-9956-ffa450edef68
| -rw-r--r-- | cpp/src/qpid/broker/MessageInterceptor.h | 53 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 4 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Queue.h | 4 |
3 files changed, 60 insertions, 1 deletions
diff --git a/cpp/src/qpid/broker/MessageInterceptor.h b/cpp/src/qpid/broker/MessageInterceptor.h new file mode 100644 index 0000000000..0d0bc93f06 --- /dev/null +++ b/cpp/src/qpid/broker/MessageInterceptor.h @@ -0,0 +1,53 @@ +#ifndef QPID_BROKER_MESSAGEINTERCEPTOR_H +#define QPID_BROKER_MESSAGEINTERCEPTOR_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Observers.h" + +namespace qpid { +namespace broker { + +class Message; + +/** + * Interface for classes that want to modify a message as it is processed by the queue. + */ +class MessageInterceptor +{ + public: + virtual ~MessageInterceptor() {} + + /** Modify a message as it is being published onto the queue. */ + virtual void publish(Message&) = 0; +}; + +class MessageInterceptors : public Observers<MessageInterceptor> { + public: + void publish(Message& m) { + each(boost::bind(&MessageInterceptor::publish, _1, boost::ref(m))); + } +}; + +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_MESSAGEINTERCEPTOR_H*/ diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index c852a61f6e..e068ce6fe4 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -429,7 +429,8 @@ bool Queue::getNextMessage(Message& m, Consumer::shared_ptr& c) continue; //try another message } } - QPID_LOG(debug, "Message retrieved from '" << name << "'"); + QPID_LOG(debug, "Message " << msg->getSequence() << " retrieved from '" + << name << "'"); m = *msg; return true; } else { @@ -767,6 +768,7 @@ void Queue::push(Message& message, bool /*isRecovery*/) Mutex::ScopedLock locker(messageLock); message.setSequence(++sequence); if (settings.sequencing) message.addAnnotation(settings.sequenceKey, (uint32_t)sequence); + interceptors.publish(message); messages->publish(message); listeners.populate(copy); observeEnqueue(message, locker); diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h index 68d793c970..8b8ef442b2 100644 --- a/cpp/src/qpid/broker/Queue.h +++ b/cpp/src/qpid/broker/Queue.h @@ -27,6 +27,7 @@ #include "qpid/broker/Consumer.h" #include "qpid/broker/Message.h" #include "qpid/broker/Messages.h" +#include "qpid/broker/MessageInterceptor.h" #include "qpid/broker/PersistableQueue.h" #include "qpid/broker/QueueBindings.h" #include "qpid/broker/QueueListeners.h" @@ -164,6 +165,7 @@ class Queue : public boost::enable_shared_from_this<Queue>, sys::AtomicValue<uint32_t> dequeueSincePurge; // Count dequeues since last purge. int eventMode; Observers observers; + MessageInterceptors interceptors; std::string seqNoKey; Broker* broker; bool deleted; @@ -414,6 +416,8 @@ class Queue : public boost::enable_shared_from_this<Queue>, QPID_BROKER_EXTERN void addObserver(boost::shared_ptr<QueueObserver>); QPID_BROKER_EXTERN void removeObserver(boost::shared_ptr<QueueObserver>); QPID_BROKER_EXTERN void insertSequenceNumbers(const std::string& key); + + QPID_BROKER_EXTERN MessageInterceptors& getMessageInterceptors() { return interceptors; } /** * Notify queue that recovery has completed. */ |
