diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.h')
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.h | 56 |
1 files changed, 37 insertions, 19 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index 8f73c6acca..ccec4948e6 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -56,17 +56,25 @@ namespace qpid { namespace broker { class Message; +class AclModule; } namespace framing { +class AMQFrame; class AMQBody; -class Uuid; +struct Uuid; +} + +namespace sys { +class Timer; +class AbsTime; +class Duration; } namespace cluster { class Connection; -class EventFrame; +struct EventFrame; class ClusterTimer; class UpdateDataExchange; @@ -89,10 +97,10 @@ class Cluster : private Cpg::Handler, public management::Manageable { void initialize(); // Connection map. - void addLocalConnection(const ConnectionPtr&); - void addShadowConnection(const ConnectionPtr&); - void erase(const ConnectionId&); - + void addLocalConnection(const ConnectionPtr&); + void addShadowConnection(const ConnectionPtr&); + void erase(const ConnectionId&); + // URLs of current cluster members. std::vector<std::string> getIds() const; std::vector<Url> getUrls() const; @@ -107,7 +115,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { void updateInRetracted(); // True if we are expecting to receive catch-up connections. bool isExpectingUpdate(); - + MemberId getId() const; broker::Broker& getBroker() const; Multicaster& getMulticast() { return mcast; } @@ -135,6 +143,12 @@ class Cluster : private Cpg::Handler, public management::Manageable { bool deferDeliveryImpl(const std::string& queue, const boost::intrusive_ptr<broker::Message>& msg); + sys::AbsTime getClusterTime(); + void sendClockUpdate(); + void clock(const uint64_t time); + + static bool loggable(const framing::AMQFrame&); // True if the frame should be logged. + private: typedef sys::Monitor::ScopedLock Lock; @@ -144,10 +158,10 @@ class Cluster : private Cpg::Handler, public management::Manageable { /** Version number of the cluster protocol, to avoid mixed versions. */ static const uint32_t CLUSTER_VERSION; - + // NB: A dummy Lock& parameter marks functions that must only be // called with Cluster::lock locked. - + void leave(Lock&); std::vector<std::string> getIds(Lock&) const; std::vector<Url> getUrls(Lock&) const; @@ -156,11 +170,11 @@ class Cluster : private Cpg::Handler, public management::Manageable { void brokerShutdown(); // == Called in deliverEventQueue thread - void deliveredEvent(const Event&); + void deliveredEvent(const Event&); // == Called in deliverFrameQueue thread - void deliveredFrame(const EventFrame&); - void processFrame(const EventFrame&, Lock&); + void deliveredFrame(const EventFrame&); + void processFrame(const EventFrame&, Lock&); // Cluster controls implement XML methods from cluster.xml. void updateRequest(const MemberId&, const std::string&, Lock&); @@ -180,12 +194,12 @@ class Cluster : private Cpg::Handler, public management::Manageable { const std::string& left, const std::string& joined, Lock& l); - void messageExpired(const MemberId&, uint64_t, Lock& l); void errorCheck(const MemberId&, uint8_t type, SequenceNumber frameSeq, Lock&); void timerWakeup(const MemberId&, const std::string& name, Lock&); void timerDrop(const MemberId&, const std::string& name, Lock&); void shutdown(const MemberId&, const framing::Uuid& shutdownId, Lock&); void deliverToQueue(const std::string& queue, const std::string& message, Lock&); + void clock(const uint64_t time, Lock&); // Helper functions ConnectionPtr getConnection(const EventFrame&, Lock&); @@ -195,7 +209,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { void setReady(Lock&); void memberUpdate(Lock&); void setClusterId(const framing::Uuid&, Lock&); - void erase(const ConnectionId&, Lock&); + void erase(const ConnectionId&, Lock&); void requestUpdate(Lock& ); void initMapCompleted(Lock&); void becomeElder(Lock&); @@ -203,7 +217,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { void updateMgmtMembership(Lock&); // == Called in CPG dispatch thread - void deliver( // CPG deliver callback. + void deliver( // CPG deliver callback. cpg_handle_t /*handle*/, const struct cpg_name *group, uint32_t /*nodeid*/, @@ -212,7 +226,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { int /*msg_len*/); void deliverEvent(const Event&); - + void configChange( // CPG config change callback. cpg_handle_t /*handle*/, const struct cpg_name */*group*/, @@ -263,7 +277,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { // Used only in deliverEventQueue thread or when stalled for update. Decoder decoder; bool discarding; - + // Remaining members are protected by lock. mutable sys::Monitor lock; @@ -276,7 +290,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { JOINER, ///< Sent update request, waiting for update offer. UPDATEE, ///< Stalled receive queue at update offer, waiting for update to complete. CATCHUP, ///< Update complete, unstalled but has not yet seen own "ready" event. - READY, ///< Fully operational + READY, ///< Fully operational OFFER, ///< Sent an offer, waiting for accept/reject. UPDATER, ///< Offer accepted, sending a state update. LEFT ///< Final state, left the cluster. @@ -296,9 +310,13 @@ class Cluster : private Cpg::Handler, public management::Manageable { ErrorCheck error; UpdateReceiver updateReceiver; ClusterTimer* timer; + sys::Timer clockTimer; + sys::AbsTime clusterTime; + sys::Duration clusterTimeOffset; + broker::AclModule* acl; friend std::ostream& operator<<(std::ostream&, const Cluster&); - friend class ClusterDispatcher; + friend struct ClusterDispatcher; }; }} // namespace qpid::cluster |