diff options
Diffstat (limited to 'cpp/src/qpid/cluster')
28 files changed, 696 insertions, 417 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index dd4882774b..e6e3de64f2 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -36,45 +36,45 @@ * * IMPORTANT NOTE: any time code is added to the broker that uses timers, * the cluster may need to be updated to take account of this. - * + * * * USE OF TIMESTAMPS IN THE BROKER - * + * * The following are the current areas where broker uses timers or timestamps: - * + * * - Producer flow control: broker::SemanticState uses * connection::getClusterOrderOutput. a FrameHandler that sends * frames to the client via the cluster. Used by broker::SessionState - * + * * - QueueCleaner, Message TTL: uses ExpiryPolicy, which is * implemented by cluster::ExpiryPolicy. - * + * * - Connection heartbeat: sends connection controls, not part of * session command counting so OK to ignore. - * + * * - LinkRegistry: only cluster elder is ever active for links. - * + * * - management::ManagementBroker: uses MessageHandler supplied by cluster * to send messages to the broker via the cluster. - * - * - Dtx: not yet supported with cluster. * - * cluster::ExpiryPolicy implements the strategy for message expiry. + * cluster::ExpiryPolicy uses cluster time. * * ClusterTimer implements periodic timed events in the cluster context. - * Used for periodic management events. + * Used for: + * - periodic management events. + * - DTX transaction timeouts. * * <h1>CLUSTER PROTOCOL OVERVIEW</h1> - * + * * Messages sent to/from CPG are called Events. * * An Event carries a ConnectionId, which includes a MemberId and a * connection number. - * + * * Events are either * - Connection events: non-0 connection number and are associated with a connection. * - Cluster Events: 0 connection number, are not associated with a connection. - * + * * Events are further categorized as: * - Control: carries method frame(s) that affect cluster behavior. * - Data: carries raw data received from a client connection. @@ -146,6 +146,7 @@ #include "qpid/framing/AMQP_AllOperations.h" #include "qpid/framing/AllInvoker.h" #include "qpid/framing/ClusterConfigChangeBody.h" +#include "qpid/framing/ClusterClockBody.h" #include "qpid/framing/ClusterConnectionDeliverCloseBody.h" #include "qpid/framing/ClusterConnectionAbortBody.h" #include "qpid/framing/ClusterRetractOfferBody.h" @@ -198,7 +199,7 @@ namespace _qmf = ::qmf::org::apache::qpid::cluster; * Currently use SVN revision to avoid clashes with versions from * different branches. */ -const uint32_t Cluster::CLUSTER_VERSION = 1058747; +const uint32_t Cluster::CLUSTER_VERSION = 1159329; struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { qpid::cluster::Cluster& cluster; @@ -214,7 +215,7 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { { cluster.initialStatus( member, version, active, clusterId, - framing::cluster::StoreState(storeState), shutdownId, + framing::cluster::StoreState(storeState), shutdownId, firstConfig, l); } void ready(const std::string& url) { @@ -230,21 +231,21 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { cluster.updateOffer(member, updatee, l); } void retractOffer(uint64_t updatee) { cluster.retractOffer(member, updatee, l); } - void messageExpired(uint64_t id) { cluster.messageExpired(member, id, l); } void errorCheck(uint8_t type, const framing::SequenceNumber& frameSeq) { cluster.errorCheck(member, type, frameSeq, l); } void timerWakeup(const std::string& name) { cluster.timerWakeup(member, name, l); } - void timerDrop(const std::string& name) { cluster.timerWakeup(member, name, l); } + void timerDrop(const std::string& name) { cluster.timerDrop(member, name, l); } void shutdown(const Uuid& id) { cluster.shutdown(member, id, l); } void deliverToQueue(const std::string& queue, const std::string& message) { cluster.deliverToQueue(queue, message, l); } + void clock(uint64_t time) { cluster.clock(time, l); } bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); } }; Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : - settings(set), + settings(set), broker(b), mgmtObject(0), poller(b.getPoller()), @@ -253,7 +254,7 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : self(cpg.self()), clusterId(true), mAgent(0), - expiryPolicy(new ExpiryPolicy(mcast, self, broker.getTimer())), + expiryPolicy(new ExpiryPolicy(*this)), mcast(cpg, poller, boost::bind(&Cluster::leave, this)), dispatcher(cpg, poller, boost::bind(&Cluster::leave, this)), deliverEventQueue(boost::bind(&Cluster::deliveredEvent, this, _1), @@ -277,8 +278,11 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : lastBroker(false), updateRetracted(false), updateClosed(false), - error(*this) + error(*this), + acl(0) { + broker.setInCluster(true); + // We give ownership of the timer to the broker and keep a plain pointer. // This is OK as it means the timer has the same lifetime as the broker. timer = new ClusterTimer(*this); @@ -299,7 +303,7 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : // Load my store status before we go into initialization if (! broker::NullMessageStore::isNullStore(&broker.getStore())) { store.load(); - clusterId = store.getClusterId(); + clusterId = store.getClusterId(); QPID_LOG(notice, "Cluster store state: " << store) } cpg.join(name); @@ -360,14 +364,15 @@ void Cluster::addShadowConnection(const boost::intrusive_ptr<Connection>& c) { // Safe to use connections here because we're pre-catchup, stalled // and discarding, so deliveredFrame is not processing any // connection events. - assert(discarding); + assert(discarding); pair<ConnectionMap::iterator, bool> ib = connections.insert(ConnectionMap::value_type(c->getId(), c)); - assert(ib.second); + // Like this to avoid tripping up unused variable warning when NDEBUG set + if (!ib.second) assert(ib.second); } void Cluster::erase(const ConnectionId& id) { - Lock l(lock); + Lock l(lock); erase(id,l); } @@ -393,9 +398,9 @@ std::vector<Url> Cluster::getUrls() const { std::vector<Url> Cluster::getUrls(Lock&) const { return map.memberUrls(); -} +} -void Cluster::leave() { +void Cluster::leave() { Lock l(lock); leave(l); } @@ -405,7 +410,7 @@ void Cluster::leave() { QPID_LOG(warning, *this << " error leaving cluster: " << e.what()); \ } do {} while(0) -void Cluster::leave(Lock&) { +void Cluster::leave(Lock&) { if (state != LEFT) { state = LEFT; QPID_LOG(notice, *this << " leaving cluster " << name); @@ -424,7 +429,7 @@ void Cluster::deliver( uint32_t nodeid, uint32_t pid, void* msg, - int msg_len) + int msg_len) { MemberId from(nodeid, pid); framing::Buffer buf(static_cast<char*>(msg), msg_len); @@ -455,7 +460,7 @@ void Cluster::deliveredEvent(const Event& e) { EventFrame ef(e, e.getFrame()); // Stop the deliverEventQueue on update offers. // This preserves the connection decoder fragments for an update. - // Only do this for the two brokers that are directly involved in this + // Only do this for the two brokers that are directly involved in this // offer: the one making the offer, or the one receiving it. const ClusterUpdateOfferBody* offer = castUpdateOffer(ef.frame.getBody()); if (offer && ( e.getMemberId() == self || MemberId(offer->getUpdatee()) == self) ) { @@ -465,7 +470,7 @@ void Cluster::deliveredEvent(const Event& e) { } deliverFrame(ef); } - else if(!discarding) { + else if(!discarding) { if (e.isControl()) deliverFrame(EventFrame(e, e.getFrame())); else { @@ -507,7 +512,7 @@ void Cluster::deliveredFrame(const EventFrame& efConst) { // the event queue. e.frame = AMQFrame( ClusterRetractOfferBody(ProtocolVersion(), offer->getUpdatee())); - deliverEventQueue.start(); + deliverEventQueue.start(); } // Process each frame through the error checker. if (error.isUnresolved()) { @@ -515,14 +520,14 @@ void Cluster::deliveredFrame(const EventFrame& efConst) { while (error.canProcess()) // There is a frame ready to process. processFrame(error.getNext(), l); } - else + else processFrame(e, l); } void Cluster::processFrame(const EventFrame& e, Lock& l) { if (e.isCluster()) { - QPID_LOG(trace, *this << " DLVR: " << e); + QPID_LOG_IF(trace, loggable(e.frame), *this << " DLVR: " << e); ClusterDispatcher dispatch(*this, e.connectionId.getMember(), l); if (!framing::invoke(dispatch, *e.frame.getBody()).wasHandled()) throw Exception(QPID_MSG("Invalid cluster control")); @@ -531,14 +536,15 @@ void Cluster::processFrame(const EventFrame& e, Lock& l) { map.incrementFrameSeq(); ConnectionPtr connection = getConnection(e, l); if (connection) { - QPID_LOG(trace, *this << " DLVR " << map.getFrameSeq() << ": " << e); + QPID_LOG_IF(trace, loggable(e.frame), + *this << " DLVR " << map.getFrameSeq() << ": " << e); connection->deliveredFrame(e); } else - QPID_LOG(trace, *this << " DROP (no connection): " << e); + throw Exception(QPID_MSG("Unknown connection: " << e)); } else // Drop connection frames while state < CATCHUP - QPID_LOG(trace, *this << " DROP (joining): " << e); + QPID_LOG_IF(trace, loggable(e.frame), *this << " DROP (joining): " << e); } // Called in deliverFrameQueue thread @@ -577,7 +583,7 @@ Cluster::ConnectionVector Cluster::getConnections(Lock&) { } // CPG config-change callback. -void Cluster::configChange ( +void Cluster::configChange ( cpg_handle_t /*handle*/, const cpg_name */*group*/, const cpg_address *members, int nMembers, @@ -607,7 +613,7 @@ void Cluster::setReady(Lock&) { } // Set the management status from the Cluster::state. -// +// // NOTE: Management updates are sent based on property changes. In // order to keep consistency across the cluster, we touch the local // management status property even if it is locally unchanged for any @@ -618,7 +624,7 @@ void Cluster::setMgmtStatus(Lock&) { } void Cluster::initMapCompleted(Lock& l) { - // Called on completion of the initial status map. + // Called on completion of the initial status map. QPID_LOG(debug, *this << " initial status map complete. "); setMgmtStatus(l); if (state == PRE_INIT) { @@ -665,6 +671,8 @@ void Cluster::initMapCompleted(Lock& l) { else { // I can go ready. discarding = false; setReady(l); + // Must be called *before* memberUpdate so first update will be generated. + failoverExchange->setReady(); memberUpdate(l); updateMgmtMembership(l); mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self); @@ -701,8 +709,8 @@ void Cluster::configChange(const MemberId&, if (initMap.isResendNeeded()) { mcast.mcastControl( ClusterInitialStatusBody( - ProtocolVersion(), CLUSTER_VERSION, state > INIT, clusterId, - store.getState(), store.getShutdownId(), + ProtocolVersion(), CLUSTER_VERSION, state > INIT, clusterId, + store.getState(), store.getShutdownId(), initMap.getFirstConfigStr() ), self); @@ -717,6 +725,20 @@ void Cluster::configChange(const MemberId&, updateMgmtMembership(l); // Update on every config change for consistency } +struct ClusterClockTask : public sys::TimerTask { + Cluster& cluster; + sys::Timer& timer; + + ClusterClockTask(Cluster& cluster, sys::Timer& timer, uint16_t clockInterval) + : TimerTask(Duration(clockInterval * TIME_MSEC),"ClusterClock"), cluster(cluster), timer(timer) {} + + void fire() { + cluster.sendClockUpdate(); + setupNextFire(); + timer.add(this); + } +}; + void Cluster::becomeElder(Lock&) { if (elder) return; // We were already the elder. // We are the oldest, reactive links if necessary @@ -724,6 +746,8 @@ void Cluster::becomeElder(Lock&) { elder = true; broker.getLinks().setPassive(false); timer->becomeElder(); + + clockTimer.add(new ClusterClockTask(*this, clockTimer, settings.clockInterval)); } void Cluster::makeOffer(const MemberId& id, Lock& ) { @@ -759,7 +783,7 @@ std::string Cluster::debugSnapshot() { // point we know the poller has stopped so no poller callbacks will be // invoked. We must ensure that CPG has also shut down so no CPG // callbacks will be invoked. -// +// void Cluster::brokerShutdown() { sys::ClusterSafeScope css; // Don't trigger cluster-safe asserts. try { cpg.shutdown(); } @@ -775,7 +799,7 @@ void Cluster::updateRequest(const MemberId& id, const std::string& url, Lock& l) } void Cluster::initialStatus(const MemberId& member, uint32_t version, bool active, - const framing::Uuid& id, + const framing::Uuid& id, framing::cluster::StoreState store, const framing::Uuid& shutdownId, const std::string& firstConfig, @@ -833,6 +857,8 @@ void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, Lock& l) else if (updatee == self && url) { assert(state == JOINER); state = UPDATEE; + acl = broker.getAcl(); + broker.setAcl(0); // Disable ACL during update QPID_LOG(notice, *this << " receiving update from " << updater); checkUpdateIn(l); } @@ -844,7 +870,7 @@ void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, Lock& l) if (updatee != self && url) { QPID_LOG(debug, debugSnapshot()); if (mAgent) mAgent->clusterUpdate(); - // Updatee will call clusterUpdate when update completes + // Updatee will call clusterUpdate() via checkUpdateIn() when update completes } } @@ -925,13 +951,15 @@ void Cluster::checkUpdateIn(Lock& l) { if (!updateClosed) return; // Wait till update connection closes. if (updatedMap) { // We're up to date map = *updatedMap; - failoverExchange->setUrls(getUrls(l)); mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self); state = CATCHUP; memberUpdate(l); + // Must be called *after* memberUpdate() to avoid sending an extra update. + failoverExchange->setReady(); // NB: don't updateMgmtMembership() here as we are not in the deliver // thread. It will be updated on delivery of the "ready" we just mcast. broker.setClusterUpdatee(false); + broker.setAcl(acl); // Restore ACL discarding = false; // OK to set, we're stalled for update. QPID_LOG(notice, *this << " update complete, starting catch-up."); QPID_LOG(debug, debugSnapshot()); // OK to call because we're stalled. @@ -941,6 +969,10 @@ void Cluster::checkUpdateIn(Lock& l) { mAgent->suppress(false); // Enable management output. mAgent->clusterUpdate(); } + // Restore alternate exchange settings on exchanges. + broker.getExchanges().eachExchange( + boost::bind(&broker::Exchange::recoveryComplete, _1, + boost::ref(broker.getExchanges()))); enableClusterSafe(); // Enable cluster-safe assertions deliverEventQueue.start(); } @@ -969,7 +1001,7 @@ void Cluster::updateOutDone(Lock& l) { void Cluster::updateOutError(const std::exception& e) { Monitor::ScopedLock l(lock); - QPID_LOG(error, *this << " error sending update: " << e.what()); + QPID_LOG(error, *this << " error sending update: " << e.what()); updateOutDone(l); } @@ -1067,7 +1099,7 @@ void Cluster::memberUpdate(Lock& l) { void Cluster::updateMgmtMembership(Lock& l) { if (!mgmtObject) return; std::vector<Url> urls = getUrls(l); - mgmtObject->set_clusterSize(urls.size()); + mgmtObject->set_clusterSize(urls.size()); string urlstr; for(std::vector<Url>::iterator i = urls.begin(); i != urls.end(); i++ ) { if (i != urls.begin()) urlstr += ";"; @@ -1114,10 +1146,6 @@ void Cluster::setClusterId(const Uuid& uuid, Lock&) { QPID_LOG(notice, *this << " cluster-uuid = " << clusterId); } -void Cluster::messageExpired(const MemberId&, uint64_t id, Lock&) { - expiryPolicy->deliverExpire(id); -} - void Cluster::errorCheck(const MemberId& from, uint8_t type, framing::SequenceNumber frameSeq, Lock&) { // If we see an errorCheck here (rather than in the ErrorCheck // class) then we have processed succesfully past the point of the @@ -1155,6 +1183,35 @@ void Cluster::deliverToQueue(const std::string& queue, const std::string& messag q->deliver(msg); } +sys::AbsTime Cluster::getClusterTime() { + Mutex::ScopedLock l(lock); + return clusterTime; +} + +// This method is called during update on the updatee to set the initial cluster time. +void Cluster::clock(const uint64_t time) { + Mutex::ScopedLock l(lock); + clock(time, l); +} + +// called when broadcast message received +void Cluster::clock(const uint64_t time, Lock&) { + clusterTime = AbsTime(EPOCH, time); + AbsTime now = AbsTime::now(); + + if (!elder) { + clusterTimeOffset = Duration(now, clusterTime); + } +} + +// called by elder timer to send clock broadcast +void Cluster::sendClockUpdate() { + Mutex::ScopedLock l(lock); + int64_t nanosecondsSinceEpoch = Duration(EPOCH, now()); + nanosecondsSinceEpoch += clusterTimeOffset; + mcast.mcastControl(ClusterClockBody(ProtocolVersion(), nanosecondsSinceEpoch), self); +} + bool Cluster::deferDeliveryImpl(const std::string& queue, const boost::intrusive_ptr<broker::Message>& msg) { @@ -1167,4 +1224,12 @@ bool Cluster::deferDeliveryImpl(const std::string& queue, return true; } +bool Cluster::loggable(const AMQFrame& f) { + const AMQMethodBody* method = (f.getMethod()); + if (!method) return true; // Not a method + bool isClock = method->amqpClassId() == ClusterClockBody::CLASS_ID + && method->amqpMethodId() == ClusterClockBody::METHOD_ID; + return !isClock; +} + }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index 8f73c6acca..ccec4948e6 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -56,17 +56,25 @@ namespace qpid { namespace broker { class Message; +class AclModule; } namespace framing { +class AMQFrame; class AMQBody; -class Uuid; +struct Uuid; +} + +namespace sys { +class Timer; +class AbsTime; +class Duration; } namespace cluster { class Connection; -class EventFrame; +struct EventFrame; class ClusterTimer; class UpdateDataExchange; @@ -89,10 +97,10 @@ class Cluster : private Cpg::Handler, public management::Manageable { void initialize(); // Connection map. - void addLocalConnection(const ConnectionPtr&); - void addShadowConnection(const ConnectionPtr&); - void erase(const ConnectionId&); - + void addLocalConnection(const ConnectionPtr&); + void addShadowConnection(const ConnectionPtr&); + void erase(const ConnectionId&); + // URLs of current cluster members. std::vector<std::string> getIds() const; std::vector<Url> getUrls() const; @@ -107,7 +115,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { void updateInRetracted(); // True if we are expecting to receive catch-up connections. bool isExpectingUpdate(); - + MemberId getId() const; broker::Broker& getBroker() const; Multicaster& getMulticast() { return mcast; } @@ -135,6 +143,12 @@ class Cluster : private Cpg::Handler, public management::Manageable { bool deferDeliveryImpl(const std::string& queue, const boost::intrusive_ptr<broker::Message>& msg); + sys::AbsTime getClusterTime(); + void sendClockUpdate(); + void clock(const uint64_t time); + + static bool loggable(const framing::AMQFrame&); // True if the frame should be logged. + private: typedef sys::Monitor::ScopedLock Lock; @@ -144,10 +158,10 @@ class Cluster : private Cpg::Handler, public management::Manageable { /** Version number of the cluster protocol, to avoid mixed versions. */ static const uint32_t CLUSTER_VERSION; - + // NB: A dummy Lock& parameter marks functions that must only be // called with Cluster::lock locked. - + void leave(Lock&); std::vector<std::string> getIds(Lock&) const; std::vector<Url> getUrls(Lock&) const; @@ -156,11 +170,11 @@ class Cluster : private Cpg::Handler, public management::Manageable { void brokerShutdown(); // == Called in deliverEventQueue thread - void deliveredEvent(const Event&); + void deliveredEvent(const Event&); // == Called in deliverFrameQueue thread - void deliveredFrame(const EventFrame&); - void processFrame(const EventFrame&, Lock&); + void deliveredFrame(const EventFrame&); + void processFrame(const EventFrame&, Lock&); // Cluster controls implement XML methods from cluster.xml. void updateRequest(const MemberId&, const std::string&, Lock&); @@ -180,12 +194,12 @@ class Cluster : private Cpg::Handler, public management::Manageable { const std::string& left, const std::string& joined, Lock& l); - void messageExpired(const MemberId&, uint64_t, Lock& l); void errorCheck(const MemberId&, uint8_t type, SequenceNumber frameSeq, Lock&); void timerWakeup(const MemberId&, const std::string& name, Lock&); void timerDrop(const MemberId&, const std::string& name, Lock&); void shutdown(const MemberId&, const framing::Uuid& shutdownId, Lock&); void deliverToQueue(const std::string& queue, const std::string& message, Lock&); + void clock(const uint64_t time, Lock&); // Helper functions ConnectionPtr getConnection(const EventFrame&, Lock&); @@ -195,7 +209,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { void setReady(Lock&); void memberUpdate(Lock&); void setClusterId(const framing::Uuid&, Lock&); - void erase(const ConnectionId&, Lock&); + void erase(const ConnectionId&, Lock&); void requestUpdate(Lock& ); void initMapCompleted(Lock&); void becomeElder(Lock&); @@ -203,7 +217,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { void updateMgmtMembership(Lock&); // == Called in CPG dispatch thread - void deliver( // CPG deliver callback. + void deliver( // CPG deliver callback. cpg_handle_t /*handle*/, const struct cpg_name *group, uint32_t /*nodeid*/, @@ -212,7 +226,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { int /*msg_len*/); void deliverEvent(const Event&); - + void configChange( // CPG config change callback. cpg_handle_t /*handle*/, const struct cpg_name */*group*/, @@ -263,7 +277,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { // Used only in deliverEventQueue thread or when stalled for update. Decoder decoder; bool discarding; - + // Remaining members are protected by lock. mutable sys::Monitor lock; @@ -276,7 +290,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { JOINER, ///< Sent update request, waiting for update offer. UPDATEE, ///< Stalled receive queue at update offer, waiting for update to complete. CATCHUP, ///< Update complete, unstalled but has not yet seen own "ready" event. - READY, ///< Fully operational + READY, ///< Fully operational OFFER, ///< Sent an offer, waiting for accept/reject. UPDATER, ///< Offer accepted, sending a state update. LEFT ///< Final state, left the cluster. @@ -296,9 +310,13 @@ class Cluster : private Cpg::Handler, public management::Manageable { ErrorCheck error; UpdateReceiver updateReceiver; ClusterTimer* timer; + sys::Timer clockTimer; + sys::AbsTime clusterTime; + sys::Duration clusterTimeOffset; + broker::AclModule* acl; friend std::ostream& operator<<(std::ostream&, const Cluster&); - friend class ClusterDispatcher; + friend struct ClusterDispatcher; }; }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/ClusterMap.cpp b/cpp/src/qpid/cluster/ClusterMap.cpp index 040e129970..a8389095c9 100644 --- a/cpp/src/qpid/cluster/ClusterMap.cpp +++ b/cpp/src/qpid/cluster/ClusterMap.cpp @@ -50,11 +50,6 @@ void insertFieldTableFromMapValue(FieldTable& ft, const ClusterMap::Map::value_t ft.setString(vt.first.str(), vt.second.str()); } -void assignFieldTable(FieldTable& ft, const ClusterMap::Map& map) { - ft.clear(); - for_each(map.begin(), map.end(), bind(&insertFieldTableFromMapValue, ref(ft), _1)); -} - } ClusterMap::ClusterMap() : frameSeq(0) {} diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp index 2962daaa07..69ba095f16 100644 --- a/cpp/src/qpid/cluster/ClusterPlugin.cpp +++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp @@ -72,6 +72,7 @@ struct ClusterOptions : public Options { ("cluster-cman", optValue(settings.quorum), "Integrate with Cluster Manager (CMAN) cluster.") #endif ("cluster-size", optValue(settings.size, "N"), "Wait for N cluster members before allowing clients to connect.") + ("cluster-clock-interval", optValue(settings.clockInterval,"N"), "How often to broadcast the current time to the cluster nodes, in milliseconds. A value between 5 and 1000 is recommended.") ("cluster-read-max", optValue(settings.readMax,"N"), "Experimental: flow-control limit reads per connection. 0=no limit.") ; } diff --git a/cpp/src/qpid/cluster/ClusterSettings.h b/cpp/src/qpid/cluster/ClusterSettings.h index 8e708aa139..2f7b5be20a 100644 --- a/cpp/src/qpid/cluster/ClusterSettings.h +++ b/cpp/src/qpid/cluster/ClusterSettings.h @@ -35,8 +35,9 @@ struct ClusterSettings { size_t readMax; std::string username, password, mechanism; size_t size; + uint16_t clockInterval; - ClusterSettings() : quorum(false), readMax(10), size(1) + ClusterSettings() : quorum(false), readMax(10), size(1), clockInterval(10) {} Url getUrl(uint16_t port) const { diff --git a/cpp/src/qpid/cluster/ClusterTimer.cpp b/cpp/src/qpid/cluster/ClusterTimer.cpp index f6e1c7a849..b4f7d00f38 100644 --- a/cpp/src/qpid/cluster/ClusterTimer.cpp +++ b/cpp/src/qpid/cluster/ClusterTimer.cpp @@ -70,6 +70,7 @@ void ClusterTimer::add(intrusive_ptr<TimerTask> task) if (i != map.end()) throw Exception(QPID_MSG("Task already exists with name " << task->getName())); map[task->getName()] = task; + // Only the elder actually activates the task with the Timer base class. if (cluster.isElder()) { QPID_LOG(trace, "Elder activating cluster timer task " << task->getName()); @@ -112,6 +113,9 @@ void ClusterTimer::deliverWakeup(const std::string& name) { else { intrusive_ptr<TimerTask> t = i->second; map.erase(i); + // Move the nextFireTime so readyToFire() is true. This is to ensure we + // don't get an error if the fired task calls setupNextFire() + t->setFired(); Timer::fire(t); } } diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index e9b718e6de..394749aad2 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -24,6 +24,8 @@ #include "Cluster.h" #include "UpdateReceiver.h" #include "qpid/assert.h" +#include "qpid/broker/DtxAck.h" +#include "qpid/broker/DtxBuffer.h" #include "qpid/broker/SessionState.h" #include "qpid/broker/SemanticState.h" #include "qpid/broker/TxBuffer.h" @@ -35,6 +37,7 @@ #include "qpid/broker/Fairshare.h" #include "qpid/broker/Link.h" #include "qpid/broker/Bridge.h" +#include "qpid/broker/StatefulQueueObserver.h" #include "qpid/broker/Queue.h" #include "qpid/framing/enum.h" #include "qpid/framing/AMQFrame.h" @@ -78,7 +81,7 @@ const std::string shadowPrefix("[shadow]"); Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, const std::string& mgmtId, const ConnectionId& id, const qpid::sys::SecuritySettings& external) - : cluster(c), self(id), catchUp(false), output(*this, out), + : cluster(c), self(id), catchUp(false), announced(false), output(*this, out), connectionCtor(&output, cluster.getBroker(), mgmtId, external, false, 0, true), expectProtocolHeader(false), mcastFrameHandler(cluster.getMulticast(), self), @@ -90,13 +93,15 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, const std::string& mgmtId, MemberId member, bool isCatchUp, bool isLink, const qpid::sys::SecuritySettings& external -) : cluster(c), self(member, ++idCounter), catchUp(isCatchUp), output(*this, out), +) : cluster(c), self(member, ++idCounter), catchUp(isCatchUp), announced(false), output(*this, out), connectionCtor(&output, cluster.getBroker(), mgmtId, external, isLink, isCatchUp ? ++catchUpId : 0, - isCatchUp), // isCatchUp => shadow + // The first catch-up connection is not considered a shadow + // as it needs to be authenticated. + isCatchUp && self.second > 1), expectProtocolHeader(isLink), mcastFrameHandler(cluster.getMulticast(), self), updateIn(c.getUpdateReceiver()), @@ -113,7 +118,7 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, if (!updateIn.nextShadowMgmtId.empty()) connectionCtor.mgmtId = updateIn.nextShadowMgmtId; updateIn.nextShadowMgmtId.clear(); - } + } init(); QPID_LOG(debug, cluster << " local connection " << *this); } @@ -143,7 +148,7 @@ void Connection::init() { // Called when we have consumed a read buffer to give credit to the // connection layer to continue reading. void Connection::giveReadCredit(int credit) { - if (cluster.getSettings().readMax && credit) + if (cluster.getSettings().readMax && credit) output.giveReadCredit(credit); } @@ -166,7 +171,7 @@ void Connection::announce( AMQFrame frame; while (frame.decode(buf)) connection->received(frame); - connection->setUserId(username); + connection->setUserId(username); } // Do managment actions now that the connection is replicated. connection->raiseConnectEvent(); @@ -193,7 +198,7 @@ void Connection::received(framing::AMQFrame& f) { << *this << ": " << f); return; } - QPID_LOG(trace, cluster << " RECV " << *this << ": " << f); + QPID_LOG_IF(trace, Cluster::loggable(f), cluster << " RECV " << *this << ": " << f); if (isLocal()) { // Local catch-up connection. currentChannel = f.getChannel(); if (!framing::invoke(*this, *f.getBody()).wasHandled()) @@ -201,7 +206,7 @@ void Connection::received(framing::AMQFrame& f) { } else { // Shadow or updated catch-up connection. if (f.getMethod() && f.getMethod()->isA<ConnectionCloseBody>()) { - if (isShadow()) + if (isShadow()) cluster.addShadowConnection(this); AMQFrame ok((ConnectionCloseOkBody())); connection->getOutput().send(ok); @@ -213,16 +218,9 @@ void Connection::received(framing::AMQFrame& f) { } } -bool Connection::checkUnsupported(const AMQBody& body) { - std::string message; - if (body.getMethod()) { - switch (body.getMethod()->amqpClassId()) { - case DTX_CLASS_ID: message = "DTX transactions are not currently supported by cluster."; break; - } - } - if (!message.empty()) - connection->close(connection::CLOSE_CODE_FRAMING_ERROR, message); - return !message.empty(); +bool Connection::checkUnsupported(const AMQBody&) { + // Throw an exception for unsupported commands. Currently all are supported. + return false; } struct GiveReadCreditOnExit { @@ -241,7 +239,7 @@ void Connection::deliverDoOutput(uint32_t limit) { void Connection::deliveredFrame(const EventFrame& f) { GiveReadCreditOnExit gc(*this, f.readCredit); assert(!catchUp); - currentChannel = f.frame.getChannel(); + currentChannel = f.frame.getChannel(); if (f.frame.getBody() // frame can be emtpy with just readCredit && !framing::invoke(*this, *f.frame.getBody()).wasHandled() // Connection contol. && !checkUnsupported(*f.frame.getBody())) // Unsupported operation. @@ -255,7 +253,7 @@ void Connection::deliveredFrame(const EventFrame& f) { } } -// A local connection is closed by the network layer. +// A local connection is closed by the network layer. Called in the connection thread. void Connection::closed() { try { if (isUpdated()) { @@ -272,8 +270,9 @@ void Connection::closed() { // closed and process any outstanding frames from the cluster // until self-delivery of deliver-close. output.closeOutput(); - cluster.getMulticast().mcastControl( - ClusterConnectionDeliverCloseBody(), self); + if (announced) + cluster.getMulticast().mcastControl( + ClusterConnectionDeliverCloseBody(), self); } } catch (const std::exception& e) { @@ -287,7 +286,7 @@ void Connection::deliverClose () { cluster.erase(self); } -// Close the connection +// Close the connection void Connection::close() { if (connection.get()) { QPID_LOG(debug, cluster << " closed connection " << *this); @@ -320,10 +319,10 @@ size_t Connection::decode(const char* data, size_t size) { while (localDecoder.decode(buf)) received(localDecoder.getFrame()); if (!wasOpen && connection->isOpen()) { - // Connections marked as federation links are allowed to proxy + // Connections marked with setUserProxyAuth are allowed to proxy // messages with user-ID that doesn't match the connection's // authenticated ID. This is important for updates. - connection->setFederationLink(isCatchUp()); + connection->setUserProxyAuth(isCatchUp()); } } else { // Multicast local connections. @@ -332,9 +331,9 @@ size_t Connection::decode(const char* data, size_t size) { if (!checkProtocolHeader(ptr, size)) // Updates ptr return 0; // Incomplete header - if (!connection->isOpen()) + if (!connection->isOpen()) processInitialFrames(ptr, end-ptr); // Updates ptr - + if (connection->isOpen() && end - ptr > 0) { // We're multi-casting, we will give read credit on delivery. grc.credit = 0; @@ -384,6 +383,7 @@ void Connection::processInitialFrames(const char*& ptr, size_t size) { connection->getUserId(), initialFrames), getId()); + announced = true; initialFrames.clear(); } } @@ -406,11 +406,11 @@ void Connection::shadowSetUser(const std::string& userId) { void Connection::consumerState(const string& name, bool blocked, bool notifyEnabled, const SequenceNumber& position) { - broker::SemanticState::ConsumerImpl& c = semanticState().find(name); - c.position = position; - c.setBlocked(blocked); - if (notifyEnabled) c.enableNotify(); else c.disableNotify(); - updateIn.consumerNumbering.add(c.shared_from_this()); + broker::SemanticState::ConsumerImpl::shared_ptr c = semanticState().find(name); + c->position = position; + c->setBlocked(blocked); + if (notifyEnabled) c->enableNotify(); else c->disableNotify(); + updateIn.consumerNumbering.add(c); } @@ -421,7 +421,8 @@ void Connection::sessionState( const SequenceNumber& expected, const SequenceNumber& received, const SequenceSet& unknownCompleted, - const SequenceSet& receivedIncomplete) + const SequenceSet& receivedIncomplete, + bool dtxSelected) { sessionState().setState( replayStart, @@ -431,8 +432,10 @@ void Connection::sessionState( received, unknownCompleted, receivedIncomplete); - QPID_LOG(debug, cluster << " received session state update for " << sessionState().getId()); - // The output tasks will be added later in the update process. + if (dtxSelected) semanticState().selectDtx(); + QPID_LOG(debug, cluster << " received session state update for " + << sessionState().getId()); + // The output tasks will be added later in the update process. connection->getOutputTasks().removeAll(); } @@ -441,7 +444,7 @@ void Connection::outputTask(uint16_t channel, const std::string& name) { if (!session) throw Exception(QPID_MSG(cluster << " channel not attached " << *this << "[" << channel << "] ")); - OutputTask* task = &session->getSemanticState().find(name); + OutputTask* task = session->getSemanticState().find(name).get(); connection->getOutputTasks().addOutputTask(task); } @@ -461,11 +464,24 @@ void Connection::shadowReady( output.setSendMax(sendMax); } +void Connection::setDtxBuffer(const UpdateReceiver::DtxBufferRef& bufRef) { + broker::DtxManager& mgr = cluster.getBroker().getDtxManager(); + broker::DtxWorkRecord* record = mgr.getWork(bufRef.xid); + broker::DtxBuffer::shared_ptr buffer = (*record)[bufRef.index]; + if (bufRef.suspended) + bufRef.semanticState->getSuspendedXids()[bufRef.xid] = buffer; + else + bufRef.semanticState->setDtxBuffer(buffer); +} + +// Marks the end of the update. void Connection::membership(const FieldTable& joiners, const FieldTable& members, const framing::SequenceNumber& frameSeq) { QPID_LOG(debug, cluster << " incoming update complete on connection " << *this); updateIn.consumerNumbering.clear(); + for_each(updateIn.dtxBuffers.begin(), updateIn.dtxBuffers.end(), + boost::bind(&Connection::setDtxBuffer, this, _1)); closeUpdated(); cluster.updateInDone(ClusterMap(joiners, members, frameSeq)); } @@ -478,7 +494,7 @@ void Connection::retractOffer() { void Connection::closeUpdated() { self.second = 0; // Mark this as completed update connection. - if (connection.get()) + if (connection.get()) connection->close(connection::CLOSE_CODE_NORMAL, "OK"); } @@ -529,12 +545,20 @@ void Connection::deliveryRecord(const string& qname, m = getUpdateMessage(); m.queue = queue.get(); m.position = position; - if (enqueued) queue->updateEnqueued(m); //inform queue of the message + if (enqueued) queue->updateEnqueued(m); //inform queue of the message } else { // Message at original position in original queue - m = queue->find(position); + queue->find(position, m); } - if (!m.payload) - throw Exception(QPID_MSG("deliveryRecord no update message")); + // FIXME aconway 2011-08-19: removed: + // if (!m.payload) + // throw Exception(QPID_MSG("deliveryRecord no update message")); + // + // It seems this could happen legitimately in the case one + // session browses message M, then another session acquires + // it. In that case the browsers delivery record is !acquired + // but the message is not on its original Queue. In that case + // we'll get a deliveryRecord with no payload for the browser. + // } broker::DeliveryRecord dr(m, queue, tag, acquired, accepted, windowing, credit); @@ -542,7 +566,11 @@ void Connection::deliveryRecord(const string& qname, if (cancelled) dr.cancel(dr.getTag()); if (completed) dr.complete(); if (ended) dr.setEnded(); // Exsitance of message - semanticState().record(dr); // Part of the session's unacked list. + + if (dtxBuffer) // Record for next dtx-ack + dtxAckRecords.push_back(dr); + else + semanticState().record(dr); // Record on session's unacked list. } void Connection::queuePosition(const string& qname, const SequenceNumber& position) { @@ -556,8 +584,46 @@ void Connection::queueFairshareState(const std::string& qname, const uint8_t pri } } -void Connection::expiryId(uint64_t id) { - cluster.getExpiryPolicy().setId(id); + +namespace { +// find a StatefulQueueObserver that matches a given identifier +class ObserverFinder { + const std::string id; + boost::shared_ptr<broker::QueueObserver> target; + ObserverFinder(const ObserverFinder&) {} + public: + ObserverFinder(const std::string& _id) : id(_id) {} + broker::StatefulQueueObserver *getObserver() + { + if (target) + return dynamic_cast<broker::StatefulQueueObserver *>(target.get()); + return 0; + } + void operator() (boost::shared_ptr<broker::QueueObserver> o) + { + if (!target) { + broker::StatefulQueueObserver *p = dynamic_cast<broker::StatefulQueueObserver *>(o.get()); + if (p && p->getId() == id) { + target = o; + } + } + } +}; +} + + +void Connection::queueObserverState(const std::string& qname, const std::string& observerId, const FieldTable& state) +{ + boost::shared_ptr<broker::Queue> queue(findQueue(qname)); + ObserverFinder finder(observerId); // find this observer + queue->eachObserver<ObserverFinder &>(finder); + broker::StatefulQueueObserver *so = finder.getObserver(); + if (so) { + so->setState( state ); + QPID_LOG(debug, "updated queue observer " << observerId << "'s state on queue " << qname << "; ..."); + return; + } + QPID_LOG(error, "Failed to find observer " << observerId << " state on queue " << qname << "; this will result in inconsistencies."); } std::ostream& operator<<(std::ostream& o, const Connection& c) { @@ -574,6 +640,7 @@ std::ostream& operator<<(std::ostream& o, const Connection& c) { void Connection::txStart() { txBuffer.reset(new broker::TxBuffer()); } + void Connection::txAccept(const framing::SequenceSet& acked) { txBuffer->enlist(boost::shared_ptr<broker::TxAccept>( new broker::TxAccept(acked, semanticState().getUnacked()))); @@ -589,9 +656,11 @@ void Connection::txEnqueue(const std::string& queue) { new broker::RecoveredEnqueue(findQueue(queue), getUpdateMessage().payload))); } -void Connection::txPublish(const framing::Array& queues, bool delivered) { - boost::shared_ptr<broker::TxPublish> txPub(new broker::TxPublish(getUpdateMessage().payload)); - for (framing::Array::const_iterator i = queues.begin(); i != queues.end(); ++i) +void Connection::txPublish(const framing::Array& queues, bool delivered) +{ + boost::shared_ptr<broker::TxPublish> txPub( + new broker::TxPublish(getUpdateMessage().payload)); + for (framing::Array::const_iterator i = queues.begin(); i != queues.end(); ++i) txPub->deliverTo(findQueue((*i)->get<std::string>())); txPub->delivered = delivered; txBuffer->enlist(txPub); @@ -605,6 +674,51 @@ void Connection::accumulatedAck(const qpid::framing::SequenceSet& s) { semanticState().setAccumulatedAck(s); } +void Connection::dtxStart(const std::string& xid, + bool ended, + bool suspended, + bool failed, + bool expired) +{ + dtxBuffer.reset(new broker::DtxBuffer(xid, ended, suspended, failed, expired)); + txBuffer = dtxBuffer; +} + +void Connection::dtxEnd() { + broker::DtxManager& mgr = cluster.getBroker().getDtxManager(); + std::string xid = dtxBuffer->getXid(); + if (mgr.exists(xid)) + mgr.join(xid, dtxBuffer); + else + mgr.start(xid, dtxBuffer); + dtxBuffer.reset(); + txBuffer.reset(); +} + +// Sent after all DeliveryRecords for a dtx-ack have been collected in dtxAckRecords +void Connection::dtxAck() { + dtxBuffer->enlist( + boost::shared_ptr<broker::DtxAck>(new broker::DtxAck(dtxAckRecords))); + dtxAckRecords.clear(); +} + +void Connection::dtxBufferRef(const std::string& xid, uint32_t index, bool suspended) { + // Save the association between DtxBuffers and the session so we + // can set the DtxBuffers at the end of the update when the + // DtxManager has been replicated. + updateIn.dtxBuffers.push_back( + UpdateReceiver::DtxBufferRef(xid, index, suspended, &semanticState())); +} + +// Sent at end of work record. +void Connection::dtxWorkRecord(const std::string& xid, bool prepared, uint32_t timeout) +{ + broker::DtxManager& mgr = cluster.getBroker().getDtxManager(); + if (timeout) mgr.setTimeout(xid, timeout); + if (prepared) mgr.prepare(xid); +} + + void Connection::exchange(const std::string& encoded) { Buffer buf(const_cast<char*>(encoded.data()), encoded.size()); broker::Exchange::shared_ptr ex = broker::Exchange::decode(cluster.getBroker().getExchanges(), buf); @@ -614,12 +728,6 @@ void Connection::exchange(const std::string& encoded) { QPID_LOG(debug, cluster << " updated exchange " << ex->getName()); } -void Connection::queue(const std::string& encoded) { - Buffer buf(const_cast<char*>(encoded.data()), encoded.size()); - broker::Queue::shared_ptr q = broker::Queue::decode(cluster.getBroker().getQueues(), buf); - QPID_LOG(debug, cluster << " updated queue " << q->getName()); -} - void Connection::sessionError(uint16_t , const std::string& msg) { // Ignore errors before isOpen(), we're not multicasting yet. if (connection->isOpen()) @@ -678,6 +786,23 @@ void Connection::config(const std::string& encoded) { else throw Exception(QPID_MSG("Update failed, invalid kind of config: " << kind)); } +void Connection::doCatchupIoCallbacks() { + // We need to process IO callbacks during the catch-up phase in + // order to service asynchronous completions for messages + // transferred during catch-up. + + if (catchUp) getBrokerConnection()->doIoCallbacks(); +} + +void Connection::clock(uint64_t time) { + QPID_LOG(debug, "Cluster connection received time update"); + cluster.clock(time); +} + +void Connection::queueDequeueSincePurgeState(const std::string& qname, uint32_t dequeueSincePurge) { + boost::shared_ptr<broker::Queue> queue(findQueue(qname)); + queue->setDequeueSincePurge(dequeueSincePurge); +} }} // Namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h index 7ee85bf1aa..fe66b77238 100644 --- a/cpp/src/qpid/cluster/Connection.h +++ b/cpp/src/qpid/cluster/Connection.h @@ -10,9 +10,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -24,11 +24,12 @@ #include "types.h" #include "OutputInterceptor.h" -#include "EventFrame.h" #include "McastFrameHandler.h" #include "UpdateReceiver.h" +#include "qpid/RefCounted.h" #include "qpid/broker/Connection.h" +#include "qpid/broker/DeliveryRecord.h" #include "qpid/broker/SecureConnection.h" #include "qpid/broker/SemanticState.h" #include "qpid/amqp_0_10/Connection.h" @@ -47,7 +48,7 @@ namespace framing { class AMQFrame; } namespace broker { class SemanticState; -class QueuedMessage; +struct QueuedMessage; class TxBuffer; class TxAccept; } @@ -55,6 +56,7 @@ class TxAccept; namespace cluster { class Cluster; class Event; +struct EventFrame; /** Intercept broker::Connection calls for shadow and local cluster connections. */ class Connection : @@ -62,7 +64,7 @@ class Connection : public sys::ConnectionInputHandler, public framing::AMQP_AllOperations::ClusterConnectionHandler, private broker::Connection::ErrorListener - + { public: @@ -73,7 +75,7 @@ class Connection : Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& mgmtId, const ConnectionId& id, const qpid::sys::SecuritySettings& external); ~Connection(); - + ConnectionId getId() const { return self; } broker::Connection* getBrokerConnection() { return connection.get(); } const broker::Connection* getBrokerConnection() const { return connection.get(); } @@ -108,9 +110,9 @@ class Connection : void deliveredFrame(const EventFrame&); void consumerState(const std::string& name, bool blocked, bool notifyEnabled, const qpid::framing::SequenceNumber& position); - + // ==== Used in catch-up mode to build initial state. - // + // // State update methods. void shadowPrepare(const std::string&); @@ -122,10 +124,11 @@ class Connection : const framing::SequenceNumber& expected, const framing::SequenceNumber& received, const framing::SequenceSet& unknownCompleted, - const SequenceSet& receivedIncomplete); - + const SequenceSet& receivedIncomplete, + bool dtxSelected); + void outputTask(uint16_t channel, const std::string& name); - + void shadowReady(uint64_t memberId, uint64_t connectionId, const std::string& managementId, @@ -153,7 +156,7 @@ class Connection : void queuePosition(const std::string&, const framing::SequenceNumber&); void queueFairshareState(const std::string&, const uint8_t priority, const uint8_t count); - void expiryId(uint64_t); + void queueObserverState(const std::string&, const std::string&, const framing::FieldTable&); void txStart(); void txAccept(const framing::SequenceSet&); @@ -163,8 +166,18 @@ class Connection : void txEnd(); void accumulatedAck(const framing::SequenceSet&); - // Encoded queue/exchange replication. - void queue(const std::string& encoded); + // Dtx state + void dtxStart(const std::string& xid, + bool ended, + bool suspended, + bool failed, + bool expired); + void dtxEnd(); + void dtxAck(); + void dtxBufferRef(const std::string& xid, uint32_t index, bool suspended); + void dtxWorkRecord(const std::string& xid, bool prepared, uint32_t timeout); + + // Encoded exchange replication. void exchange(const std::string& encoded); void giveReadCredit(int credit); @@ -189,6 +202,12 @@ class Connection : void setSecureConnection ( broker::SecureConnection * sc ); + void doCatchupIoCallbacks(); + + void clock(uint64_t time); + + void queueDequeueSincePurgeState(const std::string&, uint32_t); + private: struct NullFrameHandler : public framing::FrameHandler { void handle(framing::AMQFrame&) {} @@ -233,7 +252,7 @@ class Connection : // Error listener functions void connectionError(const std::string&); void sessionError(uint16_t channel, const std::string&); - + void init(); bool checkUnsupported(const framing::AMQBody& body); void deliverDoOutput(uint32_t limit); @@ -245,10 +264,11 @@ class Connection : broker::SemanticState& semanticState(); broker::QueuedMessage getUpdateMessage(); void closeUpdated(); - + void setDtxBuffer(const UpdateReceiver::DtxBuffers::value_type &); Cluster& cluster; ConnectionId self; bool catchUp; + bool announced; OutputInterceptor output; framing::FrameDecoder localDecoder; ConnectionCtor connectionCtor; @@ -256,6 +276,9 @@ class Connection : framing::SequenceNumber deliverSeq; framing::ChannelId currentChannel; boost::shared_ptr<broker::TxBuffer> txBuffer; + boost::shared_ptr<broker::DtxBuffer> dtxBuffer; + broker::DeliveryRecords dtxAckRecords; + broker::DtxWorkRecord* dtxCurrent; bool expectProtocolHeader; McastFrameHandler mcastFrameHandler; UpdateReceiver& updateIn; diff --git a/cpp/src/qpid/cluster/Decoder.h b/cpp/src/qpid/cluster/Decoder.h index 2e2af2868f..3b5ada4a81 100644 --- a/cpp/src/qpid/cluster/Decoder.h +++ b/cpp/src/qpid/cluster/Decoder.h @@ -31,7 +31,7 @@ namespace qpid { namespace cluster { -class EventFrame; +struct EventFrame; class EventHeader; /** diff --git a/cpp/src/qpid/cluster/ErrorCheck.h b/cpp/src/qpid/cluster/ErrorCheck.h index de8cedafb3..a417b2ec25 100644 --- a/cpp/src/qpid/cluster/ErrorCheck.h +++ b/cpp/src/qpid/cluster/ErrorCheck.h @@ -33,7 +33,7 @@ namespace qpid { namespace cluster { -class EventFrame; +struct EventFrame; class Cluster; class Multicaster; class Connection; diff --git a/cpp/src/qpid/cluster/Event.cpp b/cpp/src/qpid/cluster/Event.cpp index cd775ce2f1..da2bc89d8c 100644 --- a/cpp/src/qpid/cluster/Event.cpp +++ b/cpp/src/qpid/cluster/Event.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -23,6 +23,7 @@ #include "qpid/cluster/Cpg.h" #include "qpid/framing/Buffer.h" #include "qpid/framing/AMQFrame.h" +#include "qpid/RefCountedBuffer.h" #include "qpid/assert.h" #include <ostream> #include <iterator> diff --git a/cpp/src/qpid/cluster/Event.h b/cpp/src/qpid/cluster/Event.h index 07f74d3ba5..13283edff7 100644 --- a/cpp/src/qpid/cluster/Event.h +++ b/cpp/src/qpid/cluster/Event.h @@ -10,9 +10,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -23,7 +23,7 @@ */ #include "qpid/cluster/types.h" -#include "qpid/RefCountedBuffer.h" +#include "qpid/BufferRef.h" #include "qpid/framing/AMQFrame.h" #include <sys/uio.h> // For iovec #include <iosfwd> @@ -53,7 +53,7 @@ class EventHeader { /** Size of payload data, excluding header. */ size_t getSize() const { return size; } - /** Size of header + payload. */ + /** Size of header + payload. */ size_t getStoreSize() const { return size + HEADER_SIZE; } bool isCluster() const { return connectionId.getNumber() == 0; } @@ -62,7 +62,7 @@ class EventHeader { protected: static const size_t HEADER_SIZE; - + EventType type; ConnectionId connectionId; size_t size; @@ -86,25 +86,25 @@ class Event : public EventHeader { /** Create a control event. */ static Event control(const framing::AMQFrame&, const ConnectionId&); - + // Data excluding header. - char* getData() { return store + HEADER_SIZE; } - const char* getData() const { return store + HEADER_SIZE; } + char* getData() { return store.begin() + HEADER_SIZE; } + const char* getData() const { return store.begin() + HEADER_SIZE; } // Store including header - char* getStore() { return store; } - const char* getStore() const { return store; } + char* getStore() { return store.begin(); } + const char* getStore() const { return store.begin(); } + + const framing::AMQFrame& getFrame() const; - const framing::AMQFrame& getFrame() const; - operator framing::Buffer() const; iovec toIovec() const; - + private: void encodeHeader() const; - RefCountedBuffer::pointer store; + BufferRef store; mutable framing::AMQFrame frame; }; diff --git a/cpp/src/qpid/cluster/EventFrame.h b/cpp/src/qpid/cluster/EventFrame.h index 61447c5525..6b702a9bf8 100644 --- a/cpp/src/qpid/cluster/EventFrame.h +++ b/cpp/src/qpid/cluster/EventFrame.h @@ -10,9 +10,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -48,7 +48,7 @@ struct EventFrame ConnectionId connectionId; - framing::AMQFrame frame; + framing::AMQFrame frame; int readCredit; ///< last frame in an event, give credit when processed. EventType type; }; diff --git a/cpp/src/qpid/cluster/ExpiryPolicy.cpp b/cpp/src/qpid/cluster/ExpiryPolicy.cpp index d9a7b0122a..0ef5c2a35d 100644 --- a/cpp/src/qpid/cluster/ExpiryPolicy.cpp +++ b/cpp/src/qpid/cluster/ExpiryPolicy.cpp @@ -21,106 +21,21 @@ #include "qpid/broker/Message.h" #include "qpid/cluster/ExpiryPolicy.h" -#include "qpid/cluster/Multicaster.h" -#include "qpid/framing/ClusterMessageExpiredBody.h" +#include "qpid/cluster/Cluster.h" #include "qpid/sys/Time.h" -#include "qpid/sys/Timer.h" #include "qpid/log/Statement.h" namespace qpid { namespace cluster { -ExpiryPolicy::ExpiryPolicy(Multicaster& m, const MemberId& id, sys::Timer& t) - : expiryId(1), expiredPolicy(new Expired), mcast(m), memberId(id), timer(t) {} +ExpiryPolicy::ExpiryPolicy(Cluster& cluster) : cluster(cluster) {} -struct ExpiryTask : public sys::TimerTask { - ExpiryTask(const boost::intrusive_ptr<ExpiryPolicy>& policy, uint64_t id, sys::AbsTime when) - : TimerTask(when,"ExpiryPolicy"), expiryPolicy(policy), expiryId(id) {} - void fire() { expiryPolicy->sendExpire(expiryId); } - boost::intrusive_ptr<ExpiryPolicy> expiryPolicy; - const uint64_t expiryId; -}; - -// Called while receiving an update -void ExpiryPolicy::setId(uint64_t id) { - sys::Mutex::ScopedLock l(lock); - expiryId = id; -} - -// Called while giving an update -uint64_t ExpiryPolicy::getId() const { - sys::Mutex::ScopedLock l(lock); - return expiryId; -} - -// Called in enqueuing connection thread -void ExpiryPolicy::willExpire(broker::Message& m) { - uint64_t id; - { - // When messages are fanned out to multiple queues, update sends - // them as independenty messages so we can have multiple messages - // with the same expiry ID. - // - sys::Mutex::ScopedLock l(lock); - id = expiryId++; - if (!id) { // This is an update of an already-expired message. - m.setExpiryPolicy(expiredPolicy); - } - else { - assert(unexpiredByMessage.find(&m) == unexpiredByMessage.end()); - // If this is an update, the id may already exist - unexpiredById.insert(IdMessageMap::value_type(id, &m)); - unexpiredByMessage[&m] = id; - } - } - timer.add(new ExpiryTask(this, id, m.getExpiration())); -} - -// Called in dequeueing connection thread -void ExpiryPolicy::forget(broker::Message& m) { - sys::Mutex::ScopedLock l(lock); - MessageIdMap::iterator i = unexpiredByMessage.find(&m); - assert(i != unexpiredByMessage.end()); - unexpiredById.erase(i->second); - unexpiredByMessage.erase(i); -} - -// Called in dequeueing connection or cleanup thread. bool ExpiryPolicy::hasExpired(broker::Message& m) { - sys::Mutex::ScopedLock l(lock); - return unexpiredByMessage.find(&m) == unexpiredByMessage.end(); -} - -// Called in timer thread -void ExpiryPolicy::sendExpire(uint64_t id) { - { - sys::Mutex::ScopedLock l(lock); - // Don't multicast an expiry notice if message is already forgotten. - if (unexpiredById.find(id) == unexpiredById.end()) return; - } - mcast.mcastControl(framing::ClusterMessageExpiredBody(framing::ProtocolVersion(), id), memberId); + return m.getExpiration() < cluster.getClusterTime(); } -// Called in CPG deliver thread. -void ExpiryPolicy::deliverExpire(uint64_t id) { - sys::Mutex::ScopedLock l(lock); - std::pair<IdMessageMap::iterator, IdMessageMap::iterator> expired = unexpiredById.equal_range(id); - IdMessageMap::iterator i = expired.first; - while (i != expired.second) { - i->second->setExpiryPolicy(expiredPolicy); // hasExpired() == true; - unexpiredByMessage.erase(i->second); - unexpiredById.erase(i++); - } +sys::AbsTime ExpiryPolicy::getCurrentTime() { + return cluster.getClusterTime(); } -// Called in update thread on the updater. -boost::optional<uint64_t> ExpiryPolicy::getId(broker::Message& m) { - sys::Mutex::ScopedLock l(lock); - MessageIdMap::iterator i = unexpiredByMessage.find(&m); - return i == unexpiredByMessage.end() ? boost::optional<uint64_t>() : i->second; -} - -bool ExpiryPolicy::Expired::hasExpired(broker::Message&) { return true; } -void ExpiryPolicy::Expired::willExpire(broker::Message&) { } - }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/ExpiryPolicy.h b/cpp/src/qpid/cluster/ExpiryPolicy.h index 77a656aa68..d8ddbca8b3 100644 --- a/cpp/src/qpid/cluster/ExpiryPolicy.h +++ b/cpp/src/qpid/cluster/ExpiryPolicy.h @@ -36,12 +36,8 @@ namespace broker { class Message; } -namespace sys { -class Timer; -} - namespace cluster { -class Multicaster; +class Cluster; /** * Cluster expiry policy @@ -49,43 +45,13 @@ class Multicaster; class ExpiryPolicy : public broker::ExpiryPolicy { public: - ExpiryPolicy(Multicaster&, const MemberId&, sys::Timer&); + ExpiryPolicy(Cluster& cluster); - void willExpire(broker::Message&); bool hasExpired(broker::Message&); - void forget(broker::Message&); - - // Send expiration notice to cluster. - void sendExpire(uint64_t); + qpid::sys::AbsTime getCurrentTime(); - // Cluster delivers expiry notice. - void deliverExpire(uint64_t); - - void setId(uint64_t id); - uint64_t getId() const; - - boost::optional<uint64_t> getId(broker::Message&); - private: - typedef std::map<broker::Message*, uint64_t> MessageIdMap; - // When messages are fanned out to multiple queues, update sends - // them as independenty messages so we can have multiple messages - // with the same expiry ID. - typedef std::multimap<uint64_t, broker::Message*> IdMessageMap; - - struct Expired : public broker::ExpiryPolicy { - bool hasExpired(broker::Message&); - void willExpire(broker::Message&); - }; - - mutable sys::Mutex lock; - MessageIdMap unexpiredByMessage; - IdMessageMap unexpiredById; - uint64_t expiryId; - boost::intrusive_ptr<Expired> expiredPolicy; - Multicaster& mcast; - MemberId memberId; - sys::Timer& timer; + Cluster& cluster; }; }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/FailoverExchange.cpp b/cpp/src/qpid/cluster/FailoverExchange.cpp index 84232dac1b..cfbe34a460 100644 --- a/cpp/src/qpid/cluster/FailoverExchange.cpp +++ b/cpp/src/qpid/cluster/FailoverExchange.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -39,8 +39,10 @@ using namespace broker; using namespace framing; const string FailoverExchange::typeName("amq.failover"); - -FailoverExchange::FailoverExchange(management::Manageable* parent, Broker* b) : Exchange(typeName, parent, b ) { + +FailoverExchange::FailoverExchange(management::Manageable* parent, Broker* b) + : Exchange(typeName, parent, b ), ready(false) +{ if (mgmtExchange != 0) mgmtExchange->set_type(typeName); } @@ -53,16 +55,17 @@ void FailoverExchange::setUrls(const vector<Url>& u) { void FailoverExchange::updateUrls(const vector<Url>& u) { Lock l(lock); urls=u; - if (urls.empty()) return; - std::for_each(queues.begin(), queues.end(), - boost::bind(&FailoverExchange::sendUpdate, this, _1)); + if (ready && !urls.empty()) { + std::for_each(queues.begin(), queues.end(), + boost::bind(&FailoverExchange::sendUpdate, this, _1)); + } } string FailoverExchange::getType() const { return typeName; } bool FailoverExchange::bind(Queue::shared_ptr queue, const string&, const framing::FieldTable*) { Lock l(lock); - sendUpdate(queue); + if (ready) sendUpdate(queue); return queues.insert(queue).second; } @@ -84,7 +87,7 @@ void FailoverExchange::sendUpdate(const Queue::shared_ptr& queue) { // Called with lock held. if (urls.empty()) return; framing::Array array(0x95); - for (Urls::const_iterator i = urls.begin(); i != urls.end(); ++i) + for (Urls::const_iterator i = urls.begin(); i != urls.end(); ++i) array.add(boost::shared_ptr<Str16Value>(new Str16Value(i->str()))); const ProtocolVersion v; boost::intrusive_ptr<Message> msg(new Message); @@ -96,9 +99,12 @@ void FailoverExchange::sendUpdate(const Queue::shared_ptr& queue) { header.get<MessageProperties>(true)->getApplicationHeaders().setArray(typeName, array); AMQFrame headerFrame(header); headerFrame.setFirstSegment(false); - msg->getFrames().append(headerFrame); + msg->getFrames().append(headerFrame); DeliverableMessage(msg).deliverTo(queue); } +void FailoverExchange::setReady() { + ready = true; +} }} // namespace cluster diff --git a/cpp/src/qpid/cluster/FailoverExchange.h b/cpp/src/qpid/cluster/FailoverExchange.h index 2e1edfc0ae..c3e50c6929 100644 --- a/cpp/src/qpid/cluster/FailoverExchange.h +++ b/cpp/src/qpid/cluster/FailoverExchange.h @@ -10,9 +10,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -46,6 +46,8 @@ class FailoverExchange : public broker::Exchange void setUrls(const std::vector<Url>&); /** Set the URLs and send an update.*/ void updateUrls(const std::vector<Url>&); + /** Flag the failover exchange as ready to generate updates (caught up) */ + void setReady(); // Exchange overrides std::string getType() const; @@ -56,7 +58,7 @@ class FailoverExchange : public broker::Exchange private: void sendUpdate(const boost::shared_ptr<broker::Queue>&); - + typedef sys::Mutex::ScopedLock Lock; typedef std::vector<Url> Urls; typedef std::set<boost::shared_ptr<broker::Queue> > Queues; @@ -64,7 +66,7 @@ class FailoverExchange : public broker::Exchange sys::Mutex lock; Urls urls; Queues queues; - + bool ready; }; }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Multicaster.cpp b/cpp/src/qpid/cluster/Multicaster.cpp index 8916de9628..217641841c 100644 --- a/cpp/src/qpid/cluster/Multicaster.cpp +++ b/cpp/src/qpid/cluster/Multicaster.cpp @@ -21,6 +21,7 @@ #include "qpid/cluster/Multicaster.h" #include "qpid/cluster/Cpg.h" +#include "qpid/cluster/Cluster.h" #include "qpid/log/Statement.h" #include "qpid/framing/AMQBody.h" #include "qpid/framing/AMQFrame.h" @@ -58,7 +59,7 @@ void Multicaster::mcast(const Event& e) { return; } } - QPID_LOG(trace, "MCAST " << e); + QPID_LOG_IF(trace, e.isControl() && Cluster::loggable(e.getFrame()), "MCAST " << e); if (bypass) { // direct, don't queue iovec iov = e.toIovec(); while (!cpg.mcast(&iov, 1)) diff --git a/cpp/src/qpid/cluster/OutputInterceptor.cpp b/cpp/src/qpid/cluster/OutputInterceptor.cpp index 1354dab17b..4bf03eefa2 100644 --- a/cpp/src/qpid/cluster/OutputInterceptor.cpp +++ b/cpp/src/qpid/cluster/OutputInterceptor.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -45,12 +45,11 @@ void OutputInterceptor::send(framing::AMQFrame& f) { } void OutputInterceptor::activateOutput() { - if (parent.isCatchUp()) { - sys::Mutex::ScopedLock l(lock); + sys::Mutex::ScopedLock l(lock); + if (parent.isCatchUp()) next->activateOutput(); - } else - sendDoOutput(sendMax); + sendDoOutput(sendMax, l); } void OutputInterceptor::abort() { @@ -66,29 +65,38 @@ void OutputInterceptor::giveReadCredit(int32_t credit) { } // Called in write thread when the IO layer has no more data to write. -// We do nothing in the write thread, we run doOutput only on delivery -// of doOutput requests. -bool OutputInterceptor::doOutput() { return false; } +// We only process IO callbacks in the write thread during catch-up. +// Normally we run doOutput only on delivery of doOutput requests. +bool OutputInterceptor::doOutput() { + parent.doCatchupIoCallbacks(); + return false; +} -// Send output up to limit, calculate new limit. +// Send output up to limit, calculate new limit. void OutputInterceptor::deliverDoOutput(uint32_t limit) { + sys::Mutex::ScopedLock l(lock); sentDoOutput = false; sendMax = limit; size_t newLimit = limit; if (parent.isLocal()) { - size_t buffered = getBuffered(); + size_t buffered = next->getBuffered(); if (buffered == 0 && sent == sendMax) // Could have sent more, increase the limit. - newLimit = sendMax*2; + newLimit = sendMax*2; else if (buffered > 0 && sent > 1) // Data left unsent, reduce the limit. newLimit = (sendMax + sent) / 2; } sent = 0; - while (sent < limit && parent.getBrokerConnection()->doOutput()) + while (sent < limit) { + { + sys::Mutex::ScopedUnlock u(lock); + if (!parent.getBrokerConnection()->doOutput()) break; + } ++sent; - if (sent == limit) sendDoOutput(newLimit); + } + if (sent == limit) sendDoOutput(newLimit, l); } -void OutputInterceptor::sendDoOutput(size_t newLimit) { +void OutputInterceptor::sendDoOutput(size_t newLimit, const sys::Mutex::ScopedLock&) { if (parent.isLocal() && !sentDoOutput && !closing) { sentDoOutput = true; parent.getCluster().getMulticast().mcastControl( @@ -97,6 +105,7 @@ void OutputInterceptor::sendDoOutput(size_t newLimit) { } } +// Called in connection thread when local connection closes. void OutputInterceptor::closeOutput() { sys::Mutex::ScopedLock l(lock); closing = true; diff --git a/cpp/src/qpid/cluster/OutputInterceptor.h b/cpp/src/qpid/cluster/OutputInterceptor.h index 65bd82a4fc..3abf5273a0 100644 --- a/cpp/src/qpid/cluster/OutputInterceptor.h +++ b/cpp/src/qpid/cluster/OutputInterceptor.h @@ -10,9 +10,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -58,13 +58,13 @@ class OutputInterceptor : public sys::ConnectionOutputHandler { uint32_t getSendMax() const { return sendMax; } void setSendMax(uint32_t sendMax_) { sendMax=sendMax_; } - + cluster::Connection& parent; - + private: typedef sys::Mutex::ScopedLock Locker; - void sendDoOutput(size_t newLimit); + void sendDoOutput(size_t newLimit, const sys::Mutex::ScopedLock&); mutable sys::Mutex lock; bool closing; diff --git a/cpp/src/qpid/cluster/SecureConnectionFactory.cpp b/cpp/src/qpid/cluster/SecureConnectionFactory.cpp index 6ddef66226..2672d8360c 100644 --- a/cpp/src/qpid/cluster/SecureConnectionFactory.cpp +++ b/cpp/src/qpid/cluster/SecureConnectionFactory.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -48,7 +48,7 @@ SecureConnectionFactory::create(ProtocolVersion v, sys::OutputControl& out, cons if (clusterCodec) { SecureConnectionPtr sc(new SecureConnection()); clusterCodec->setSecureConnection(sc.get()); - sc->setCodec(codec); + sc->setCodec(codec); return sc.release(); } return 0; @@ -63,7 +63,7 @@ SecureConnectionFactory::create(sys::OutputControl& out, const std::string& id, if (clusterCodec) { SecureConnectionPtr sc(new SecureConnection()); clusterCodec->setSecureConnection(sc.get()); - sc->setCodec(codec); + sc->setCodec(codec); return sc.release(); } return 0; diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp index 8f751add9b..2446c12f2b 100644 --- a/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/cpp/src/qpid/cluster/UpdateClient.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -26,9 +26,9 @@ #include "qpid/cluster/Decoder.h" #include "qpid/cluster/ExpiryPolicy.h" #include "qpid/cluster/UpdateDataExchange.h" -#include "qpid/client/SessionBase_0_10Access.h" -#include "qpid/client/ConnectionAccess.h" -#include "qpid/client/SessionImpl.h" +#include "qpid/client/SessionBase_0_10Access.h" +#include "qpid/client/ConnectionAccess.h" +#include "qpid/client/SessionImpl.h" #include "qpid/client/ConnectionImpl.h" #include "qpid/client/Future.h" #include "qpid/broker/Broker.h" @@ -45,10 +45,13 @@ #include "qpid/broker/SessionState.h" #include "qpid/broker/TxOpVisitor.h" #include "qpid/broker/DtxAck.h" +#include "qpid/broker/DtxBuffer.h" +#include "qpid/broker/DtxWorkRecord.h" #include "qpid/broker/TxAccept.h" #include "qpid/broker/TxPublish.h" #include "qpid/broker/RecoveredDequeue.h" #include "qpid/broker/RecoveredEnqueue.h" +#include "qpid/broker/StatefulQueueObserver.h" #include "qpid/framing/MessageTransferBody.h" #include "qpid/framing/ClusterConnectionMembershipBody.h" #include "qpid/framing/ClusterConnectionShadowReadyBody.h" @@ -64,6 +67,7 @@ #include <boost/bind.hpp> #include <boost/cast.hpp> #include <algorithm> +#include <iterator> #include <sstream> namespace qpid { @@ -82,11 +86,20 @@ using namespace framing; namespace arg=client::arg; using client::SessionBase_0_10Access; +// Reserved exchange/queue name for catch-up, avoid clashes with user queues/exchanges. +const std::string UpdateClient::UPDATE("x-qpid.cluster-update"); +// Name for header used to carry expiration information. +const std::string UpdateClient::X_QPID_EXPIRATION = "x-qpid.expiration"; +// Headers used to flag headers/properties added by the UpdateClient so they can be +// removed on the other side. +const std::string UpdateClient::X_QPID_NO_MESSAGE_PROPS = "x-qpid.no-message-props"; +const std::string UpdateClient::X_QPID_NO_HEADERS = "x-qpid.no-headers"; + std::ostream& operator<<(std::ostream& o, const UpdateClient& c) { return o << "cluster(" << c.updaterId << " UPDATER)"; } -struct ClusterConnectionProxy : public AMQP_AllProxy::ClusterConnection, public framing::FrameHandler +struct ClusterConnectionProxy : public AMQP_AllProxy::ClusterConnection, public framing::FrameHandler { boost::shared_ptr<qpid::client::ConnectionImpl> connection; @@ -120,7 +133,7 @@ void send(client::AsyncSession& s, const AMQBody& body) { // TODO aconway 2008-09-24: optimization: update connections/sessions in parallel. UpdateClient::UpdateClient(const MemberId& updater, const MemberId& updatee, const Url& url, - broker::Broker& broker, const ClusterMap& m, ExpiryPolicy& expiry_, + broker::Broker& broker, const ClusterMap& m, ExpiryPolicy& expiry_, const Cluster::ConnectionVector& cons, Decoder& decoder_, const boost::function<void()>& ok, const boost::function<void(const std::exception&)>& fail, @@ -134,13 +147,11 @@ UpdateClient::UpdateClient(const MemberId& updater, const MemberId& updatee, con UpdateClient::~UpdateClient() {} -// Reserved exchange/queue name for catch-up, avoid clashes with user queues/exchanges. -const std::string UpdateClient::UPDATE("qpid.cluster-update"); - void UpdateClient::run() { try { connection.open(updateeUrl, connectionSettings); session = connection.newSession(UPDATE); + session.sync(); update(); done(); } catch (const std::exception& e) { @@ -154,6 +165,13 @@ void UpdateClient::update() { << " at " << updateeUrl); Broker& b = updaterBroker; + if(b.getExpiryPolicy()) { + QPID_LOG(debug, *this << "Updating updatee with cluster time"); + qpid::sys::AbsTime clusterTime = b.getExpiryPolicy()->getCurrentTime(); + int64_t time = qpid::sys::Duration(qpid::sys::EPOCH, clusterTime); + ClusterConnectionProxy(session).clock(time); + } + updateManagementSetupState(); b.getExchanges().eachExchange(boost::bind(&UpdateClient::updateExchange, this, _1)); @@ -163,16 +181,20 @@ void UpdateClient::update() { // longer on their original queue. session.queueDeclare(arg::queue=UPDATE, arg::autoDelete=true); session.sync(); + std::for_each(connections.begin(), connections.end(), boost::bind(&UpdateClient::updateConnection, this, _1)); - session.queueDelete(arg::queue=UPDATE); + + // some Queue Observers need session state & msgs synced first, so sync observers now + b.getQueues().eachQueue(boost::bind(&UpdateClient::updateQueueObservers, this, _1)); // Update queue listeners: must come after sessions so consumerNumbering is populated b.getQueues().eachQueue(boost::bind(&UpdateClient::updateQueueListeners, this, _1)); - ClusterConnectionProxy(session).expiryId(expiry.getId()); updateLinks(); updateManagementAgent(); + updateDtxManager(); + session.queueDelete(arg::queue=UPDATE); session.close(); @@ -184,7 +206,7 @@ void UpdateClient::update() { // NOTE: connection will be closed from the other end, don't close // it here as that causes a race. - + // TODO aconway 2010-03-15: This sleep avoids the race condition // described in // https://bugzilla.redhat.com/show_bug.cgi?id=568831. // It allows the connection to fully close before destroying the @@ -276,7 +298,7 @@ class MessageUpdater { framing::SequenceNumber lastPos; client::AsyncSession session; ExpiryPolicy& expiry; - + public: MessageUpdater(const string& q, const client::AsyncSession s, ExpiryPolicy& expiry_) : queue(q), haveLastPos(false), session(s), expiry(expiry_) { @@ -293,7 +315,6 @@ class MessageUpdater { } } - void updateQueuedMessage(const broker::QueuedMessage& message) { // Send the queue position if necessary. if (!haveLastPos || message.position - lastPos != 1) { @@ -302,10 +323,23 @@ class MessageUpdater { } lastPos = message.position; - // Send the expiry ID if necessary. - if (message.payload->getProperties<DeliveryProperties>()->getTtl()) { - boost::optional<uint64_t> expiryId = expiry.getId(*message.payload); - ClusterConnectionProxy(session).expiryId(expiryId?*expiryId:0); + // if the ttl > 0, we need to send the calculated expiration time to the updatee + const DeliveryProperties* dprops = + message.payload->getProperties<DeliveryProperties>(); + if (dprops && dprops->getTtl() > 0) { + bool hadMessageProps = + message.payload->hasProperties<framing::MessageProperties>(); + const framing::MessageProperties* mprops = + message.payload->getProperties<framing::MessageProperties>(); + bool hadApplicationHeaders = mprops->hasApplicationHeaders(); + message.payload->insertCustomProperty(UpdateClient::X_QPID_EXPIRATION, + sys::Duration(sys::EPOCH, message.payload->getExpiration())); + // If message properties or application headers didn't exist + // prior to us adding data, we want to remove them on the other side. + if (!hadMessageProps) + message.payload->insertCustomProperty(UpdateClient::X_QPID_NO_MESSAGE_PROPS, 0); + else if (!hadApplicationHeaders) + message.payload->insertCustomProperty(UpdateClient::X_QPID_NO_HEADERS, 0); } // We can't send a broker::Message via the normal client API, @@ -318,7 +352,7 @@ class MessageUpdater { framing::MessageTransferBody transfer( *message.payload->getFrames().as<framing::MessageTransferBody>()); transfer.setDestination(UpdateClient::UPDATE); - + sb.get()->send(transfer, message.payload->getFrames(), !message.payload->isContentReleased()); if (message.payload->isContentReleased()){ @@ -326,9 +360,10 @@ class MessageUpdater { uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead(); bool morecontent = true; for (uint64_t offset = 0; morecontent; offset += maxContentSize) - { + { AMQFrame frame((AMQContentBody())); - morecontent = message.payload->getContentFrame(*(message.queue), frame, maxContentSize, offset); + morecontent = message.payload->getContentFrame( + *(message.queue), frame, maxContentSize, offset); sb.get()->sendRawFrame(frame); } } @@ -357,6 +392,8 @@ void UpdateClient::updateQueue(client::AsyncSession& s, const boost::shared_ptr< if (qpid::broker::Fairshare::getState(q->getMessages(), priority, count)) { ClusterConnectionProxy(s).queueFairshareState(q->getName(), priority, count); } + + ClusterConnectionProxy(s).queueDequeueSincePurgeState(q->getName(), q->getDequeueSincePurge()); } void UpdateClient::updateExclusiveQueue(const boost::shared_ptr<broker::Queue>& q) { @@ -372,7 +409,11 @@ void UpdateClient::updateNonExclusiveQueue(const boost::shared_ptr<broker::Queue } void UpdateClient::updateBinding(client::AsyncSession& s, const std::string& queue, const QueueBinding& binding) { - s.exchangeBind(queue, binding.exchange, binding.key, binding.args); + if (binding.exchange.size()) + s.exchangeBind(queue, binding.exchange, binding.key, binding.args); + //else its the default exchange and there is no need to replicate + //the binding, the creation of the queue will have done so + //automatically } void UpdateClient::updateOutputTask(const sys::OutputTask* task) { @@ -380,8 +421,8 @@ void UpdateClient::updateOutputTask(const sys::OutputTask* task) { boost::polymorphic_downcast<const SemanticState::ConsumerImpl*> (task); SemanticState::ConsumerImpl* ci = const_cast<SemanticState::ConsumerImpl*>(cci); uint16_t channel = ci->getParent().getSession().getChannel(); - ClusterConnectionProxy(shadowConnection).outputTask(channel, ci->getName()); - QPID_LOG(debug, *this << " updating output task " << ci->getName() + ClusterConnectionProxy(shadowConnection).outputTask(channel, ci->getTag()); + QPID_LOG(debug, *this << " updating output task " << ci->getTag() << " channel=" << channel); } @@ -389,7 +430,7 @@ void UpdateClient::updateConnection(const boost::intrusive_ptr<Connection>& upda QPID_LOG(debug, *this << " updating connection " << *updateConnection); assert(updateConnection->getBrokerConnection()); broker::Connection& bc = *updateConnection->getBrokerConnection(); - + // Send the management ID first on the main connection. std::string mgmtId = updateConnection->getBrokerConnection()->getMgmtId(); ClusterConnectionProxy(session).shadowPrepare(mgmtId); @@ -426,7 +467,7 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) { QPID_LOG(debug, *this << " updating session " << ss->getId()); - // Create a client session to update session state. + // Create a client session to update session state. boost::shared_ptr<client::ConnectionImpl> cimpl = client::ConnectionAccess::getImpl(shadowConnection); boost::shared_ptr<client::SessionImpl> simpl = cimpl->newSession(ss->getId().getName(), ss->getTimeout(), sh.getChannel()); simpl->disableAutoDetach(); @@ -445,19 +486,19 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) { QPID_LOG(debug, *this << " updating unacknowledged messages."); broker::DeliveryRecords& drs = ss->getSemanticState().getUnacked(); std::for_each(drs.begin(), drs.end(), - boost::bind(&UpdateClient::updateUnacked, this, _1)); + boost::bind(&UpdateClient::updateUnacked, this, _1, shadowSession)); - updateTxState(ss->getSemanticState()); // Tx transaction state. + updateTransactionState(ss->getSemanticState()); // Adjust command counter for message in progress, will be sent after state update. boost::intrusive_ptr<Message> inProgress = ss->getMessageInProgress(); SequenceNumber received = ss->receiverGetReceived().command; - if (inProgress) + if (inProgress) --received; // Sync the session to ensure all responses from broker have been processed. shadowSession.sync(); - + // Reset command-sequence state. proxy.sessionState( ss->senderGetReplayPoint().command, @@ -466,7 +507,8 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) { std::max(received, ss->receiverGetExpected().command), received, ss->receiverGetUnknownComplete(), - ss->receiverGetIncomplete() + ss->receiverGetIncomplete(), + ss->getSemanticState().getDtxSelected() ); // Send frames for partial message in progress. @@ -479,13 +521,13 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) { void UpdateClient::updateConsumer( const broker::SemanticState::ConsumerImpl::shared_ptr& ci) { - QPID_LOG(debug, *this << " updating consumer " << ci->getName() << " on " + QPID_LOG(debug, *this << " updating consumer " << ci->getTag() << " on " << shadowSession.getId()); using namespace message; shadowSession.messageSubscribe( arg::queue = ci->getQueue()->getName(), - arg::destination = ci->getName(), + arg::destination = ci->getTag(), arg::acceptMode = ci->isAckExpected() ? ACCEPT_MODE_EXPLICIT : ACCEPT_MODE_NONE, arg::acquireMode = ci->isAcquire() ? ACQUIRE_MODE_PRE_ACQUIRED : ACQUIRE_MODE_NOT_ACQUIRED, arg::exclusive = ci->isExclusive(), @@ -493,29 +535,32 @@ void UpdateClient::updateConsumer( arg::resumeTtl = ci->getResumeTtl(), arg::arguments = ci->getArguments() ); - shadowSession.messageSetFlowMode(ci->getName(), ci->isWindowing() ? FLOW_MODE_WINDOW : FLOW_MODE_CREDIT); - shadowSession.messageFlow(ci->getName(), CREDIT_UNIT_MESSAGE, ci->getMsgCredit()); - shadowSession.messageFlow(ci->getName(), CREDIT_UNIT_BYTE, ci->getByteCredit()); + shadowSession.messageSetFlowMode(ci->getTag(), ci->isWindowing() ? FLOW_MODE_WINDOW : FLOW_MODE_CREDIT); + shadowSession.messageFlow(ci->getTag(), CREDIT_UNIT_MESSAGE, ci->getMsgCredit()); + shadowSession.messageFlow(ci->getTag(), CREDIT_UNIT_BYTE, ci->getByteCredit()); ClusterConnectionProxy(shadowSession).consumerState( - ci->getName(), + ci->getTag(), ci->isBlocked(), ci->isNotifyEnabled(), ci->position ); consumerNumbering.add(ci.get()); - QPID_LOG(debug, *this << " updated consumer " << ci->getName() + QPID_LOG(debug, *this << " updated consumer " << ci->getTag() << " on " << shadowSession.getId()); } - -void UpdateClient::updateUnacked(const broker::DeliveryRecord& dr) { - if (!dr.isEnded() && dr.isAcquired() && dr.getMessage().payload) { + +void UpdateClient::updateUnacked(const broker::DeliveryRecord& dr, + client::AsyncSession& updateSession) +{ + if (!dr.isEnded() && dr.isAcquired()) { + assert(dr.getMessage().payload); // If the message is acquired then it is no longer on the // updatees queue, put it on the update queue for updatee to pick up. // - MessageUpdater(UPDATE, shadowSession, expiry).updateQueuedMessage(dr.getMessage()); + MessageUpdater(UPDATE, updateSession, expiry).updateQueuedMessage(dr.getMessage()); } - ClusterConnectionProxy(shadowSession).deliveryRecord( + ClusterConnectionProxy(updateSession).deliveryRecord( dr.getQueue()->getName(), dr.getMessage().position, dr.getTag(), @@ -536,10 +581,12 @@ class TxOpUpdater : public broker::TxOpConstVisitor, public MessageUpdater { TxOpUpdater(UpdateClient& dc, client::AsyncSession s, ExpiryPolicy& expiry) : MessageUpdater(UpdateClient::UPDATE, s, expiry), parent(dc), session(s), proxy(s) {} - void operator()(const broker::DtxAck& ) { - throw InternalErrorException("DTX transactions not currently supported by cluster."); + void operator()(const broker::DtxAck& ack) { + std::for_each(ack.getPending().begin(), ack.getPending().end(), + boost::bind(&UpdateClient::updateUnacked, &parent, _1, session)); + proxy.dtxAck(); } - + void operator()(const broker::RecoveredDequeue& rdeq) { updateMessage(rdeq.getMessage()); proxy.txEnqueue(rdeq.getQueue()->getName()); @@ -554,13 +601,18 @@ class TxOpUpdater : public broker::TxOpConstVisitor, public MessageUpdater { proxy.txAccept(txAccept.getAcked()); } + typedef std::list<Queue::shared_ptr> QueueList; + + void copy(const QueueList& l, Array& a) { + for (QueueList::const_iterator i = l.begin(); i!=l.end(); ++i) + a.push_back(Array::ValuePtr(new Str8Value((*i)->getName()))); + } + void operator()(const broker::TxPublish& txPub) { updateMessage(txPub.getMessage()); - typedef std::list<Queue::shared_ptr> QueueList; - const QueueList& qlist = txPub.getQueues(); + assert(txPub.getQueues().empty() || txPub.getPrepared().empty()); Array qarray(TYPE_CODE_STR8); - for (QueueList::const_iterator i = qlist.begin(); i != qlist.end(); ++i) - qarray.push_back(Array::ValuePtr(new Str8Value((*i)->getName()))); + copy(txPub.getQueues().empty() ? txPub.getPrepared() : txPub.getQueues(), qarray); proxy.txPublish(qarray, txPub.delivered); } @@ -569,18 +621,44 @@ class TxOpUpdater : public broker::TxOpConstVisitor, public MessageUpdater { client::AsyncSession session; ClusterConnectionProxy proxy; }; - -void UpdateClient::updateTxState(broker::SemanticState& s) { - QPID_LOG(debug, *this << " updating TX transaction state."); + +void UpdateClient::updateBufferRef(const broker::DtxBuffer::shared_ptr& dtx,bool suspended) +{ + ClusterConnectionProxy proxy(shadowSession); + broker::DtxWorkRecord* record = + updaterBroker.getDtxManager().getWork(dtx->getXid()); + proxy.dtxBufferRef(dtx->getXid(), record->indexOf(dtx), suspended); + +} + +void UpdateClient::updateTransactionState(broker::SemanticState& s) { ClusterConnectionProxy proxy(shadowSession); proxy.accumulatedAck(s.getAccumulatedAck()); - broker::TxBuffer::shared_ptr txBuffer = s.getTxBuffer(); - if (txBuffer) { + broker::TxBuffer::shared_ptr tx = s.getTxBuffer(); + broker::DtxBuffer::shared_ptr dtx = s.getDtxBuffer(); + if (dtx) { + updateBufferRef(dtx, false); // Current transaction. + } else if (tx) { proxy.txStart(); TxOpUpdater updater(*this, shadowSession, expiry); - txBuffer->accept(updater); + tx->accept(updater); proxy.txEnd(); } + for (SemanticState::DtxBufferMap::iterator i = s.getSuspendedXids().begin(); + i != s.getSuspendedXids().end(); + ++i) + { + updateBufferRef(i->second, true); + } +} + +void UpdateClient::updateDtxBuffer(const broker::DtxBuffer::shared_ptr& dtx) { + ClusterConnectionProxy proxy(session); + proxy.dtxStart( + dtx->getXid(), dtx->isEnded(), dtx->isSuspended(), dtx->isFailed(), dtx->isExpired()); + TxOpUpdater updater(*this, session, expiry); + dtx->accept(updater); + proxy.dtxEnd(); } void UpdateClient::updateQueueListeners(const boost::shared_ptr<broker::Queue>& queue) { @@ -615,4 +693,35 @@ void UpdateClient::updateBridge(const boost::shared_ptr<broker::Bridge>& bridge) ClusterConnectionProxy(session).config(encode(*bridge)); } +void UpdateClient::updateQueueObservers(const boost::shared_ptr<broker::Queue>& q) +{ + q->eachObserver(boost::bind(&UpdateClient::updateObserver, this, q, _1)); +} + +void UpdateClient::updateObserver(const boost::shared_ptr<broker::Queue>& q, + boost::shared_ptr<broker::QueueObserver> o) +{ + qpid::framing::FieldTable state; + broker::StatefulQueueObserver *so = dynamic_cast<broker::StatefulQueueObserver *>(o.get()); + if (so) { + so->getState( state ); + std::string id(so->getId()); + QPID_LOG(debug, *this << " updating queue " << q->getName() << "'s observer " << id); + ClusterConnectionProxy(session).queueObserverState( q->getName(), id, state ); + } +} + +void UpdateClient::updateDtxManager() { + broker::DtxManager& dtm = updaterBroker.getDtxManager(); + dtm.each(boost::bind(&UpdateClient::updateDtxWorkRecord, this, _1)); +} + +void UpdateClient::updateDtxWorkRecord(const broker::DtxWorkRecord& r) { + QPID_LOG(debug, *this << " updating DTX transaction: " << r.getXid()); + for (size_t i = 0; i < r.size(); ++i) + updateDtxBuffer(r[i]); + ClusterConnectionProxy(session).dtxWorkRecord( + r.getXid(), r.isPrepared(), r.getTimeout()); +} + }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/UpdateClient.h b/cpp/src/qpid/cluster/UpdateClient.h index 7520bb82cb..481ee357c7 100644 --- a/cpp/src/qpid/cluster/UpdateClient.h +++ b/cpp/src/qpid/cluster/UpdateClient.h @@ -10,9 +10,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -34,7 +34,7 @@ namespace qpid { -class Url; +struct Url; namespace broker { @@ -42,8 +42,8 @@ class Broker; class Queue; class Exchange; class QueueBindings; -class QueueBinding; -class QueuedMessage; +struct QueueBinding; +struct QueuedMessage; class SessionHandler; class DeliveryRecord; class SessionState; @@ -51,7 +51,8 @@ class SemanticState; class Decoder; class Link; class Bridge; - +class QueueObserver; +class DtxBuffer; } // namespace broker namespace cluster { @@ -68,21 +69,26 @@ class ExpiryPolicy; class UpdateClient : public sys::Runnable { public: static const std::string UPDATE; // Name for special update queue and exchange. + static const std::string X_QPID_EXPIRATION; // Update message expiration + // Flag to remove props/headers that were added by the UpdateClient + static const std::string X_QPID_NO_MESSAGE_PROPS; + static const std::string X_QPID_NO_HEADERS; + static client::Connection catchUpConnection(); - + UpdateClient(const MemberId& updater, const MemberId& updatee, const Url&, broker::Broker& donor, const ClusterMap& map, ExpiryPolicy& expiry, const std::vector<boost::intrusive_ptr<Connection> >&, Decoder&, const boost::function<void()>& done, const boost::function<void(const std::exception&)>& fail, - const client::ConnectionSettings& + const client::ConnectionSettings& ); ~UpdateClient(); void update(); void run(); // Will delete this when finished. - void updateUnacked(const broker::DeliveryRecord&); + void updateUnacked(const broker::DeliveryRecord&, client::AsyncSession&); private: void updateQueue(client::AsyncSession&, const boost::shared_ptr<broker::Queue>&); @@ -94,7 +100,8 @@ class UpdateClient : public sys::Runnable { void updateBinding(client::AsyncSession&, const std::string& queue, const broker::QueueBinding& binding); void updateConnection(const boost::intrusive_ptr<Connection>& connection); void updateSession(broker::SessionHandler& s); - void updateTxState(broker::SemanticState& s); + void updateBufferRef(const broker::DtxBuffer::shared_ptr& dtx, bool suspended); + void updateTransactionState(broker::SemanticState& s); void updateOutputTask(const sys::OutputTask* task); void updateConsumer(const broker::SemanticState::ConsumerImpl::shared_ptr&); void updateQueueListeners(const boost::shared_ptr<broker::Queue>&); @@ -104,6 +111,11 @@ class UpdateClient : public sys::Runnable { void updateLinks(); void updateLink(const boost::shared_ptr<broker::Link>&); void updateBridge(const boost::shared_ptr<broker::Bridge>&); + void updateQueueObservers(const boost::shared_ptr<broker::Queue>&); + void updateObserver(const boost::shared_ptr<broker::Queue>&, boost::shared_ptr<broker::QueueObserver>); + void updateDtxManager(); + void updateDtxBuffer(const boost::shared_ptr<broker::DtxBuffer>& ); + void updateDtxWorkRecord(const broker::DtxWorkRecord&); Numbering<broker::SemanticState::ConsumerImpl*> consumerNumbering; diff --git a/cpp/src/qpid/cluster/UpdateDataExchange.cpp b/cpp/src/qpid/cluster/UpdateDataExchange.cpp index 2a079b8881..e5cd82e3d3 100644 --- a/cpp/src/qpid/cluster/UpdateDataExchange.cpp +++ b/cpp/src/qpid/cluster/UpdateDataExchange.cpp @@ -36,13 +36,8 @@ const std::string UpdateDataExchange::MANAGEMENT_AGENTS_KEY("management-agents") const std::string UpdateDataExchange::MANAGEMENT_SCHEMAS_KEY("management-schemas"); const std::string UpdateDataExchange::MANAGEMENT_DELETED_OBJECTS_KEY("management-deleted-objects"); -std::ostream& operator<<(std::ostream& o, const UpdateDataExchange& c) { - return o << "cluster(" << c.clusterId << " UPDATER)"; -} - UpdateDataExchange::UpdateDataExchange(Cluster& cluster) : - Exchange(EXCHANGE_NAME, &cluster), - clusterId(cluster.getId()) + Exchange(EXCHANGE_NAME, &cluster) {} void UpdateDataExchange::route(broker::Deliverable& msg, const std::string& routingKey, @@ -62,11 +57,9 @@ void UpdateDataExchange::updateManagementAgent(management::ManagementAgent* agen framing::Buffer buf1(const_cast<char*>(managementAgents.data()), managementAgents.size()); agent->importAgents(buf1); - QPID_LOG(debug, *this << " updated management agents."); framing::Buffer buf2(const_cast<char*>(managementSchemas.data()), managementSchemas.size()); agent->importSchemas(buf2); - QPID_LOG(debug, *this << " updated management schemas."); using amqp_0_10::ListCodec; using types::Variant; @@ -78,7 +71,6 @@ void UpdateDataExchange::updateManagementAgent(management::ManagementAgent* agen new management::ManagementAgent::DeletedObject(*i))); } agent->importDeletedObjects(objects); - QPID_LOG(debug, *this << " updated management deleted objects."); } diff --git a/cpp/src/qpid/cluster/UpdateDataExchange.h b/cpp/src/qpid/cluster/UpdateDataExchange.h index 8c493e400a..d2f6c35ad0 100644 --- a/cpp/src/qpid/cluster/UpdateDataExchange.h +++ b/cpp/src/qpid/cluster/UpdateDataExchange.h @@ -74,11 +74,9 @@ class UpdateDataExchange : public broker::Exchange void updateManagementAgent(management::ManagementAgent* agent); private: - MemberId clusterId; std::string managementAgents; std::string managementSchemas; std::string managementDeletedObjects; - friend std::ostream& operator<<(std::ostream&, const UpdateDataExchange&); }; }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/UpdateExchange.cpp b/cpp/src/qpid/cluster/UpdateExchange.cpp index 11937f296f..cb1376004e 100644 --- a/cpp/src/qpid/cluster/UpdateExchange.cpp +++ b/cpp/src/qpid/cluster/UpdateExchange.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -19,6 +19,7 @@ * */ #include "qpid/framing/MessageTransferBody.h" +#include "qpid/framing/FieldTable.h" #include "qpid/broker/Message.h" #include "UpdateExchange.h" @@ -27,6 +28,8 @@ namespace cluster { using framing::MessageTransferBody; using framing::DeliveryProperties; +using framing::MessageProperties; +using framing::FieldTable; UpdateExchange::UpdateExchange(management::Manageable* parent) : broker::Exchange(UpdateClient::UPDATE, parent), @@ -34,6 +37,7 @@ UpdateExchange::UpdateExchange(management::Manageable* parent) void UpdateExchange::setProperties(const boost::intrusive_ptr<broker::Message>& msg) { + // Copy exchange name to destination property. MessageTransferBody* transfer = msg->getMethod<MessageTransferBody>(); assert(transfer); const DeliveryProperties* props = msg->getProperties<DeliveryProperties>(); @@ -42,6 +46,23 @@ void UpdateExchange::setProperties(const boost::intrusive_ptr<broker::Message>& transfer->setDestination(props->getExchange()); else transfer->clearDestinationFlag(); -} + // Copy expiration from x-property if present. + if (msg->hasProperties<MessageProperties>()) { + const MessageProperties* mprops = msg->getProperties<MessageProperties>(); + if (mprops->hasApplicationHeaders()) { + const FieldTable& headers = mprops->getApplicationHeaders(); + if (headers.isSet(UpdateClient::X_QPID_EXPIRATION)) { + msg->setExpiration( + sys::AbsTime(sys::EPOCH, headers.getAsInt64(UpdateClient::X_QPID_EXPIRATION))); + msg->removeCustomProperty(UpdateClient::X_QPID_EXPIRATION); + // Erase props/headers that were added by the UpdateClient + if (headers.isSet(UpdateClient::X_QPID_NO_MESSAGE_PROPS)) + msg->eraseProperties<MessageProperties>(); + else if (headers.isSet(UpdateClient::X_QPID_NO_HEADERS)) + msg->clearApplicationHeadersFlag(); + } + } + } +} }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/UpdateReceiver.h b/cpp/src/qpid/cluster/UpdateReceiver.h index 7e8ce47662..81ee3a5ffe 100644 --- a/cpp/src/qpid/cluster/UpdateReceiver.h +++ b/cpp/src/qpid/cluster/UpdateReceiver.h @@ -39,6 +39,20 @@ class UpdateReceiver { /** Management-id for the next shadow connection */ std::string nextShadowMgmtId; + + /** Record the position of a DtxBuffer in the DtxManager (xid + index) + * and the association with a session, either suspended or current. + */ + struct DtxBufferRef { + std::string xid; + uint32_t index; // Index in WorkRecord in DtxManager + bool suspended; // Is this a suspended or current transaction? + broker::SemanticState* semanticState; // Associated session + DtxBufferRef(const std::string& x, uint32_t i, bool s, broker::SemanticState* ss) + : xid(x), index(i), suspended(s), semanticState(ss) {} + }; + typedef std::vector<DtxBufferRef> DtxBuffers; + DtxBuffers dtxBuffers; }; }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/types.h b/cpp/src/qpid/cluster/types.h index 0795e5e77a..bfb4fd5b9e 100644 --- a/cpp/src/qpid/cluster/types.h +++ b/cpp/src/qpid/cluster/types.h @@ -24,6 +24,7 @@ #include "config.h" #include "qpid/Url.h" +#include "qpid/RefCounted.h" #include "qpid/sys/IntegerTypes.h" #include <boost/intrusive_ptr.hpp> #include <utility> |