diff options
author | Alan Conway <aconway@apache.org> | 2008-10-30 21:07:28 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-10-30 21:07:28 +0000 |
commit | 364b62744a35f7e48332af00217a1848345cd39a (patch) | |
tree | d8cab9a5c824133e4429919a9c129beaec0ce456 | |
parent | 5e79599484a675baabf45e6b2c50635dbd6b1119 (diff) | |
download | qpid-python-364b62744a35f7e48332af00217a1848345cd39a.tar.gz |
Replicate session state for un-acknowledged messages to new cluster members.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@709242 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 16 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Queue.h | 8 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionState.h | 4 | ||||
-rw-r--r-- | cpp/src/qpid/client/LocalQueue.cpp | 8 | ||||
-rw-r--r-- | cpp/src/qpid/client/LocalQueue.h | 10 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Connection.cpp | 49 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Connection.h | 25 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/DumpClient.cpp | 98 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/DumpClient.h | 7 | ||||
-rw-r--r-- | cpp/src/tests/cluster_test.cpp | 75 | ||||
-rw-r--r-- | cpp/src/tests/exception_test.cpp | 2 | ||||
-rw-r--r-- | cpp/xml/cluster.xml | 30 |
12 files changed, 257 insertions, 75 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 968720050d..388b2d77dd 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -380,15 +380,12 @@ struct PositionEquals { }; }// namespace -bool Queue::find(QueuedMessage& msg, SequenceNumber pos) const { +QueuedMessage Queue::find(SequenceNumber pos) const { Mutex::ScopedLock locker(messageLock); Messages::const_iterator i = std::find_if(messages.begin(), messages.end(), PositionEquals(pos)); - if (i == messages.end()) - return false; - else { - msg = *i; - return true; - } + if (i != messages.end()) + return *i; + return QueuedMessage(); } void Queue::consume(Consumer::shared_ptr c, bool requestExclusive){ @@ -876,9 +873,6 @@ Manageable::status_t Queue::ManagementMethod (uint32_t methodId, Args& args, str } void Queue::setPosition(SequenceNumber n) { - if (n <= sequence) - throw InvalidArgumentException(QPID_MSG("Invalid position " << n << " < " << sequence - << " for queue " << name)); + Mutex::ScopedLock locker(messageLock); sequence = n; - --sequence; // Decrement so ++sequence will return n. } diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h index bca01f7ef5..21cb5ad42d 100644 --- a/cpp/src/qpid/broker/Queue.h +++ b/cpp/src/qpid/broker/Queue.h @@ -230,12 +230,8 @@ RateTracker dequeueTracker; */ QueuedMessage get(); - /** Get the message at position pos - *@param msg out parameter, assigned to the message found. - *@param pos position to search for. - *@return True if there is a message at pos, false otherwise. - */ - bool find(QueuedMessage& msg, framing::SequenceNumber pos) const; + /** Get the message at position pos */ + QueuedMessage find(framing::SequenceNumber pos) const; const QueuePolicy* getPolicy(); diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h index d08cc5fa2c..250d520145 100644 --- a/cpp/src/qpid/broker/SessionState.h +++ b/cpp/src/qpid/broker/SessionState.h @@ -100,10 +100,12 @@ class SessionState : public qpid::SessionState, void readyToSend(); + // Used by cluster to create replica sessions. template <class F> void eachConsumer(F f) { semanticState.eachConsumer(f); } + template <class F> void eachUnacked(F f) { semanticState.eachUnacked(f); } SemanticState::ConsumerImpl& getConsumer(const string& dest) { return semanticState.find(dest); } - boost::intrusive_ptr<Message> getMessageInProgress() { return msgBuilder.getMessage(); } + void record(const DeliveryRecord& delivery) { semanticState.record(delivery); } private: diff --git a/cpp/src/qpid/client/LocalQueue.cpp b/cpp/src/qpid/client/LocalQueue.cpp index 229d3766ef..e449c9f795 100644 --- a/cpp/src/qpid/client/LocalQueue.cpp +++ b/cpp/src/qpid/client/LocalQueue.cpp @@ -33,12 +33,12 @@ using namespace framing; LocalQueue::LocalQueue() {} LocalQueue::~LocalQueue() {} -Message LocalQueue::pop() { return get(); } +Message LocalQueue::pop(sys::Duration timeout) { return get(timeout); } -Message LocalQueue::get() { +Message LocalQueue::get(sys::Duration timeout) { Message result; - bool ok = get(result, sys::TIME_INFINITE); - assert(ok); (void) ok; + bool ok = get(result, timeout); + if (!ok) throw Exception("Timed out waiting for a message"); return result; } diff --git a/cpp/src/qpid/client/LocalQueue.h b/cpp/src/qpid/client/LocalQueue.h index 9fe72762c3..3be2293810 100644 --- a/cpp/src/qpid/client/LocalQueue.h +++ b/cpp/src/qpid/client/LocalQueue.h @@ -56,14 +56,14 @@ class LocalQueue { */ bool get(Message& result, sys::Duration timeout=0); - /** Get the next message off the local queue, or wait for a - * message from the broker queue. - *@exception ClosedException if subscription has been closed. + /** Get the next message off the local queue, or wait up to the timeout + * for message from the broker queue. + *@exception ClosedException if subscription is closed or timeout exceeded. */ - Message get(); + Message get(sys::Duration timeout=sys::TIME_INFINITE); /** Synonym for get(). */ - Message pop(); + Message pop(sys::Duration timeout=sys::TIME_INFINITE); /** Return true if local queue is empty. */ bool empty() const; diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index 28391a5c78..d8d41027cc 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -19,6 +19,7 @@ * */ #include "Connection.h" +#include "DumpClient.h" #include "Cluster.h" #include "qpid/broker/SessionState.h" @@ -73,8 +74,6 @@ void Connection::deliverDoOutput(uint32_t requested) { output.deliverDoOutput(requested); } -// FIXME aconway 2008-10-15: changes here, dubious. - // Received from a directly connected client. void Connection::received(framing::AMQFrame& f) { QPID_LOG(trace, cluster << " RECV " << *this << ": " << f); @@ -214,6 +213,50 @@ bool Connection::isDumped() const { return self.first == cluster.getId() && self.second == 0; } +void Connection::deliveryRecord(const string& qname, + const SequenceNumber& position, + const string& tag, + const SequenceNumber& id, + bool acquired, + bool accepted, + bool cancelled, + bool completed, + bool ended, + bool windowing) +{ + broker::QueuedMessage m; + broker::Queue::shared_ptr queue = cluster.getBroker().getQueues().find(qname); + if (!queue) throw Exception(QPID_MSG(cluster << " bad deliveryRecord queue " << qname)); + broker::Queue::shared_ptr dumpQueue = cluster.getBroker().getQueues().find(DumpClient::DUMP); + if (!dumpQueue) throw Exception(QPID_MSG(cluster << " deliveryRecord missing dump queue")); + + if (!ended) { // Has a message + if (acquired) // Message at front of dump queue + m = dumpQueue->get(); + else // Message at original position in original queue + m = queue->find(position); + if (!m.payload) + throw Exception(QPID_MSG("deliveryRecord no dump message")); + } + + broker::DeliveryRecord dr(m, queue, tag, acquired, accepted, windowing); + dr.setId(id); + if (cancelled) dr.cancel(dr.getTag()); + if (completed) dr.complete(); + if (ended) dr.setEnded(); // Exsitance of message + + broker::SessionHandler& h = connection.getChannel(currentChannel); + broker::SessionState* s = h.getSession(); + assert(s); + s->record(dr); +} + +void Connection::queuePosition(const string& qname, const SequenceNumber& position) { + shared_ptr<broker::Queue> q = cluster.getBroker().getQueues().find(qname); + if (!q) throw InvalidArgumentException(QPID_MSG("Invalid queue name " << qname)); + q->setPosition(position); +} + std::ostream& operator<<(std::ostream& o, const Connection& c) { const char* type="unknown"; if (c.isLocal()) type = "local"; @@ -222,5 +265,7 @@ std::ostream& operator<<(std::ostream& o, const Connection& c) { return o << c.getId() << "(" << type << (c.isCatchUp() ? ",catchup" : "") << ")"; } + + }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h index 3b5298a8a1..b4f8128632 100644 --- a/cpp/src/qpid/cluster/Connection.h +++ b/cpp/src/qpid/cluster/Connection.h @@ -94,17 +94,30 @@ class Connection : // ==== Used in catch-up mode to build initial state. // // State dump methods. - void sessionState(const SequenceNumber& replayStart, - const SequenceNumber& sendCommandPoint, - const SequenceSet& sentIncomplete, - const SequenceNumber& expected, - const SequenceNumber& received, - const SequenceSet& unknownCompleted, const SequenceSet& receivedIncomplete); + void sessionState(const framing::SequenceNumber& replayStart, + const framing::SequenceNumber& sendCommandPoint, + const framing::SequenceSet& sentIncomplete, + const framing::SequenceNumber& expected, + const framing::SequenceNumber& received, + const framing::SequenceSet& unknownCompleted, const SequenceSet& receivedIncomplete); void shadowReady(uint64_t memberId, uint64_t connectionId); void membership(const framing::FieldTable&, const framing::FieldTable&); + void deliveryRecord(const std::string& queue, + const framing::SequenceNumber& position, + const std::string& tag, + const framing::SequenceNumber& id, + bool acquired, + bool accepted, + bool cancelled, + bool completed, + bool ended, + bool windowing); + + void queuePosition(const std::string&, const framing::SequenceNumber&); + private: bool catcUp; diff --git a/cpp/src/qpid/cluster/DumpClient.cpp b/cpp/src/qpid/cluster/DumpClient.cpp index 802019feb1..40852a0411 100644 --- a/cpp/src/qpid/cluster/DumpClient.cpp +++ b/cpp/src/qpid/cluster/DumpClient.cpp @@ -56,14 +56,11 @@ namespace arg=client::arg; using client::SessionBase_0_10Access; struct ClusterConnectionProxy : public AMQP_AllProxy::ClusterConnection { - ClusterConnectionProxy(client::Connection& c) : + ClusterConnectionProxy(client::Connection c) : AMQP_AllProxy::ClusterConnection(*client::ConnectionAccess::getImpl(c)) {} + ClusterConnectionProxy(client::AsyncSession s) : + AMQP_AllProxy::ClusterConnection(SessionBase_0_10Access(s).get()->out) {} }; -struct ClusterProxy : public AMQP_AllProxy::Cluster { - ClusterProxy(client::Connection& c) : - AMQP_AllProxy::Cluster(*client::ConnectionAccess::getImpl(c)) {} -}; - // Create a connection with special version that marks it as a catch-up connection. client::Connection catchUpConnection() { @@ -73,7 +70,7 @@ client::Connection catchUpConnection() { } // Send a control body directly to the session. -void send(client::Session& s, const AMQBody& body) { +void send(client::AsyncSession& s, const AMQBody& body) { client::SessionBase_0_10Access sb(s); sb.get()->send(body); } @@ -94,19 +91,23 @@ DumpClient::DumpClient(const MemberId& from, const MemberId& to, const Url& url, DumpClient::~DumpClient() {} -// Catch-up exchange name: an illegal AMQP exchange name to avoid clashes. -static const char CATCH_UP_CHARS[] = "\000qpid-dump-exchange"; -static const std::string CATCH_UP(CATCH_UP_CHARS, sizeof(CATCH_UP_CHARS)); +// Illegal exchange/queue name for catch-up, avoid clashes with user queues/exchanges. +static const char DUMP_CHARS[] = "\000qpid-dump"; +const std::string DumpClient::DUMP(DUMP_CHARS, sizeof(DUMP_CHARS)); void DumpClient::dump() { QPID_LOG(debug, dumperId << " dumping state to " << dumpeeId << " at " << dumpeeUrl); Broker& b = dumperBroker; b.getExchanges().eachExchange(boost::bind(&DumpClient::dumpExchange, this, _1)); - // Catch-up exchange is used to route messages to the proper queue without modifying routing key. - session.exchangeDeclare(arg::exchange=CATCH_UP, arg::type="fanout", arg::autoDelete=true); + + // Dump exchange is used to route messages to the proper queue without modifying routing key. + session.exchangeDeclare(arg::exchange=DUMP, arg::type="fanout", arg::autoDelete=true); b.getQueues().eachQueue(boost::bind(&DumpClient::dumpQueue, this, _1)); +// Dump queue is used to transfer acquired messages that are no longer on their original queue. + session.queueDeclare(arg::queue=DUMP, arg::autoDelete=true); session.sync(); session.close(); + std::for_each(connections.begin(), connections.end(), boost::bind(&DumpClient::dumpConnection, this, _1)); AMQFrame frame(map.asMethodBody()); client::ConnectionAccess::getImpl(connection)->handle(frame); @@ -134,6 +135,39 @@ void DumpClient::dumpExchange(const boost::shared_ptr<Exchange>& ex) { arg::arguments=ex->getArgs()); } +/** Bind a queue to the dump exchange and dump messges to it + * setting the message possition as needed. + */ +class MessageDumper { + std::string queue; + bool haveLastPos; + framing::SequenceNumber lastPos; + client::AsyncSession session; + + public: + + MessageDumper(const string& q, const client::AsyncSession s) : queue(q), haveLastPos(false), session(s) { + session.exchangeBind(queue, DumpClient::DUMP); + } + + ~MessageDumper() { + session.exchangeUnbind(queue, DumpClient::DUMP); + } + + void dump(const broker::QueuedMessage& message) { + if (!haveLastPos || message.position - lastPos != 1) { + ClusterConnectionProxy(session).queuePosition(queue, message.position.getValue()-1); + haveLastPos = true; + } + lastPos = message.position; + SessionBase_0_10Access sb(session); + framing::MessageTransferBody transfer( + framing::ProtocolVersion(), DumpClient::DUMP, message::ACCEPT_MODE_NONE, message::ACQUIRE_MODE_PRE_ACQUIRED); + sb.get()->send(transfer, message.payload->getFrames()); + } +}; + + void DumpClient::dumpQueue(const boost::shared_ptr<Queue>& q) { session.queueDeclare( q->getName(), @@ -143,19 +177,11 @@ void DumpClient::dumpQueue(const boost::shared_ptr<Queue>& q) { arg::exclusive=q->hasExclusiveConsumer(), arg::autoDelete=q->isAutoDelete(), arg::arguments=q->getSettings()); - - session.exchangeBind(q->getName(), CATCH_UP, std::string()); - q->eachMessage(boost::bind(&DumpClient::dumpMessage, this, _1)); - session.exchangeUnbind(q->getName(), CATCH_UP, std::string()); + MessageDumper dumper(q->getName(), session); + q->eachMessage(boost::bind(&MessageDumper::dump, &dumper, _1)); q->eachBinding(boost::bind(&DumpClient::dumpBinding, this, q->getName(), _1)); } -void DumpClient::dumpMessage(const broker::QueuedMessage& message) { - SessionBase_0_10Access sb(session); - framing::MessageTransferBody transfer( - framing::ProtocolVersion(), CATCH_UP, message::ACCEPT_MODE_NONE, message::ACQUIRE_MODE_PRE_ACQUIRED); - sb.get()->send(transfer, message.payload->getFrames()); -} void DumpClient::dumpBinding(const std::string& queue, const QueueBinding& binding) { session.exchangeBind(queue, binding.exchange, binding.key, binding.args); @@ -190,11 +216,11 @@ void DumpClient::dumpSession(broker::SessionHandler& sh) { // Re-create session state on remote connection. - // For reasons unknown, boost::bind does not work here with boost 1.33. + // Dump consumers. For reasons unknown, boost::bind does not work here with boost 1.33. ss->eachConsumer(std::bind1st(std::mem_fun(&DumpClient::dumpConsumer),this)); + ss->eachUnacked(boost::bind(&DumpClient::dumpUnacked, this, _1)); boost::intrusive_ptr<Message> inProgress = ss->getMessageInProgress(); - // Adjust for message in progress, will be sent after state update. SequenceNumber received = ss->receiverGetReceived().command; if (inProgress) @@ -221,7 +247,7 @@ void DumpClient::dumpSession(broker::SessionHandler& sh) { QPID_LOG(debug, dumperId << " dumped session " << sh.getSession()->getId()); } -void DumpClient::dumpConsumer(broker::SemanticState::ConsumerImpl* ci) { +void DumpClient::dumpConsumer(const broker::SemanticState::ConsumerImpl* ci) { QPID_LOG(debug, dumperId << " dumping consumer " << ci->getName() << " on " << shadowSession.getId()); using namespace message; shadowSession.messageSubscribe( @@ -246,5 +272,27 @@ void DumpClient::dumpConsumer(broker::SemanticState::ConsumerImpl* ci) { client::SessionBase_0_10Access(shadowSession).get()->send(state); QPID_LOG(debug, dumperId << " dumped consumer " << ci->getName() << " on " << shadowSession.getId()); } + +void DumpClient::dumpUnacked(const broker::DeliveryRecord& dr) { + assert(dr.isEnded() || dr.getMessage().payload); + + if (!dr.isEnded() && dr.isAcquired()) { + // If the message is acquired then it is no longer on the + // dumpees queue, put it on the dump queue for dumpee to pick up. + // + MessageDumper(DUMP, shadowSession).dump(dr.getMessage()); + } + ClusterConnectionProxy(shadowSession).deliveryRecord( + dr.getQueue()->getName(), + dr.getMessage().position, + dr.getTag(), + dr.getId(), + dr.isAcquired(), + dr.isAccepted(), + dr.isCancelled(), + dr.isComplete(), + dr.isEnded(), + dr.isWindowing()); +} }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/DumpClient.h b/cpp/src/qpid/cluster/DumpClient.h index d61779319a..bb349a39ee 100644 --- a/cpp/src/qpid/cluster/DumpClient.h +++ b/cpp/src/qpid/cluster/DumpClient.h @@ -43,6 +43,7 @@ class QueueBindings; class QueueBinding; class QueuedMessage; class SessionHandler; +class DeliveryRecord; } // namespace broker @@ -57,6 +58,8 @@ class ClusterMap; */ class DumpClient : public sys::Runnable { public: + static const std::string DUMP; // Name for special dump queue and exchange. + DumpClient(const MemberId& dumper, const MemberId& dumpee, const Url&, broker::Broker& donor, const ClusterMap& map, const std::vector<boost::intrusive_ptr<Connection> >& , const boost::function<void()>& done, @@ -70,10 +73,12 @@ class DumpClient : public sys::Runnable { void dumpQueue(const boost::shared_ptr<broker::Queue>&); void dumpExchange(const boost::shared_ptr<broker::Exchange>&); void dumpMessage(const broker::QueuedMessage&); + void dumpMessageTo(const broker::QueuedMessage&, const std::string& queue, client::Session s); void dumpBinding(const std::string& queue, const broker::QueueBinding& binding); void dumpConnection(const boost::intrusive_ptr<Connection>& connection); void dumpSession(broker::SessionHandler& s); - void dumpConsumer(broker::SemanticState::ConsumerImpl*); + void dumpConsumer(const broker::SemanticState::ConsumerImpl*); + void dumpUnacked(const broker::DeliveryRecord&); private: MemberId dumperId; diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp index a20a3841a9..887a0716e7 100644 --- a/cpp/src/tests/cluster_test.cpp +++ b/cpp/src/tests/cluster_test.cpp @@ -220,31 +220,90 @@ class Sender { uint16_t channel; }; -// FIXME aconway 2008-10-20: dump Tx state. +QPID_AUTO_TEST_CASE(testUnacked) { + // Verify replication of unacknowledged messages. + ClusterFixture cluster(1); + Client c0(cluster[0], "c0"); + + Message m; + + SubscriptionSettings manualAccept(FlowControl::unlimited(), ACCEPT_MODE_EXPLICIT, ACQUIRE_MODE_PRE_ACQUIRED, 0); + c0.session.queueDeclare("q1"); + c0.session.messageTransfer(arg::content=Message("11","q1")); + LocalQueue q1; + c0.subs.subscribe(q1, "q1", manualAccept); + BOOST_CHECK_EQUAL(q1.get(TIME_SEC).getData(), "11"); // Acquired but not accepted + BOOST_CHECK_EQUAL(c0.session.queueQuery("q1").getMessageCount(), 0u); // Gone from queue + + SubscriptionSettings manualAcquire(FlowControl::unlimited(), ACCEPT_MODE_EXPLICIT, ACQUIRE_MODE_NOT_ACQUIRED, 0); + c0.session.queueDeclare("q2"); + c0.session.messageTransfer(arg::content=Message("21","q2")); + c0.session.messageTransfer(arg::content=Message("22","q2")); + + LocalQueue q2; + c0.subs.subscribe(q2, "q2", manualAcquire); + m = q2.get(TIME_SEC); // Not acquired or accepted, still on queue + BOOST_CHECK_EQUAL(m.getData(), "21"); + BOOST_CHECK_EQUAL(c0.session.queueQuery("q2").getMessageCount(), 2u); // Not removed + c0.subs.getSubscription("q2").acquire(m); // Acquire manually + BOOST_CHECK_EQUAL(c0.session.queueQuery("q2").getMessageCount(), 1u); // Removed + BOOST_CHECK_EQUAL(q2.get(TIME_SEC).getData(), "22"); // Not acquired or accepted, still on queue + BOOST_CHECK_EQUAL(c0.session.queueQuery("q2").getMessageCount(), 1u); // 1 not acquired. + + // Add new member while there are unacked messages. + cluster.add(); + cluster.waitFor(2); + Client c1(cluster[1], "c1"); + + // Check queue counts + BOOST_CHECK_EQUAL(c1.session.queueQuery("q1").getMessageCount(), 0u); + BOOST_CHECK_EQUAL(c1.session.queueQuery("q2").getMessageCount(), 1u); + + // Unacked messages should be requeued when session is closed. + c0.session.close(); + BOOST_CHECK_EQUAL(c1.session.queueQuery("q1").getMessageCount(), 1u); + BOOST_CHECK_EQUAL(c1.session.queueQuery("q2").getMessageCount(), 2u); + + BOOST_CHECK(c1.subs.get(m, "q1", TIME_SEC)); + BOOST_CHECK_EQUAL(m.getData(), "11"); + BOOST_CHECK(c1.subs.get(m, "q2", TIME_SEC)); + BOOST_CHECK_EQUAL(m.getData(), "21"); + BOOST_CHECK(c1.subs.get(m, "q2", TIME_SEC)); + BOOST_CHECK_EQUAL(m.getData(), "22"); +} + QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testDumpTxState, 1) { // Verify that we dump transaction state correctly to new members. ClusterFixture cluster(1); Client c0(cluster[0], "c0"); + + // Do work in a transaction. c0.session.txSelect(); c0.session.queueDeclare("q"); c0.session.messageTransfer(arg::content=Message("1","q")); - c0.session.txCommit(); - - c0.subs.subscribe(c0.lq, "q", FlowControl::messageCredit(1)); + c0.session.messageTransfer(arg::content=Message("2","q")); Message m; - BOOST_CHECK(c0.lq.get(m, TIME_SEC)); + BOOST_CHECK(c0.subs.get(m, "q", TIME_SEC)); BOOST_CHECK_EQUAL(m.getData(), "1"); - c0.session.messageTransfer(arg::content=Message("2","q")); + // New member, TX not comitted, c1 should see nothing. cluster.add(); Client c1(cluster[1], "c1"); - // Not yet comitted, c1 should see nothing. BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 0u); + + // After commit c1 shoudl see results of tx. c0.session.txCommit(); - // c1 shoudl see results of tx. BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 1u); BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC)); BOOST_CHECK_EQUAL(m.getData(), "2"); + + // Another transaction with both members active. + c0.session.messageTransfer(arg::content=Message("3","q")); + BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 0u); + c0.session.txCommit(); + BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 1u); + BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC)); + BOOST_CHECK_EQUAL(m.getData(), "3"); } QPID_AUTO_TEST_CASE(testDumpMessageBuilder) { diff --git a/cpp/src/tests/exception_test.cpp b/cpp/src/tests/exception_test.cpp index a73ea9e36b..f3f5435699 100644 --- a/cpp/src/tests/exception_test.cpp +++ b/cpp/src/tests/exception_test.cpp @@ -92,7 +92,7 @@ QPID_AUTO_TEST_CASE(DisconnectedPop) { ProxyConnection c(fix.broker->getPort(Broker::TCP_TRANSPORT)); fix.session.queueDeclare(arg::queue="q"); fix.subs.subscribe(fix.lq, "q"); - Catcher<ConnectionException> pop(bind(&LocalQueue::pop, boost::ref(fix.lq))); + Catcher<ConnectionException> pop(bind(&LocalQueue::pop, &fix.lq, sys::TIME_SEC)); fix.connection.proxy.close(); BOOST_CHECK(pop.join()); } diff --git a/cpp/xml/cluster.xml b/cpp/xml/cluster.xml index 1eb33e8333..d213b271a2 100644 --- a/cpp/xml/cluster.xml +++ b/cpp/xml/cluster.xml @@ -70,8 +70,7 @@ A connection is dumped as followed: - open as a normal connection. - attach sessions, create consumers, set flow with normal AMQP cokmmands. - - reset session state by sending session-state for each session. - - frames following session-state are replay frames. + - send /reset additional session state with controls below. - send shadow-ready to mark end of shadow dump. - send dump-complete when entire dump is complete. --> @@ -83,8 +82,22 @@ <field name="notifyEnabled" type="bit"/> </control> + <!-- Delivery-record for outgoing messages sent but not yet accepted. --> + <control name="delivery-record" code ="0x11"> + <field name="queue" type="str8"/> + <field name="position" type="sequence-no"/> + <field name="tag" type="str8"/> + <field name="id" type="sequence-no"/> + <field name="acquired" type="bit"/> <!--If not set, message follows. --> + <field name="accepted" type="bit"/> + <field name="cancelled" type="bit"/> + <field name="completed" type="bit"/> + <field name="ended" type="bit"/> + <field name="windowing" type="bit"/> + </control> + <!-- Complete a session state dump. --> - <control name="session-state" code="0x11" label="Set session state during a brain dump."> + <control name="session-state" code="0x1F" label="Set session state during a brain dump."> <!-- Target session deduced from channel number. --> <field name="replay-start" type="sequence-no"/> <!-- Replay frames will start from this point.--> <field name="command-point" type="sequence-no"/> <!-- Id of next command sent --> @@ -97,15 +110,22 @@ </control> <!-- Complete a shadow connection dump. --> - <control name="shadow-ready" code="0x12" label="End of shadow connection dump."> + <control name="shadow-ready" code="0x20" label="End of shadow connection dump."> <field name="member-id" type="uint64"/> <field name="connection-id" type="uint64"/> </control> <!-- Complete a cluster state dump. --> - <control name="membership" code="0x13" label="Cluster membership details."> + <control name="membership" code="0x21" label="Cluster membership details."> <field name="newbies" type="map"/> <!-- member-id -> URL --> <field name="members" type="map"/> <!-- member-id -> state --> </control> + + <!-- Set the position of a replicated queue. --> + <control name="queue-position" code="0x30"> + <field name="queue" type="str8"/> + <field name="position" type="sequence-no"/> + </control> + </class> </amqp> |