diff options
| author | Alan Conway <aconway@apache.org> | 2008-05-20 13:44:34 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2008-05-20 13:44:34 +0000 |
| commit | 0333573627c831142aa251bfb1cabdb1e2bf438e (patch) | |
| tree | 953bf8c624374c57953aa3f2888254d175609d9a /cpp/src/qpid/broker | |
| parent | 96024622ccfcc8fdd24b3c9ace44f7c8849fac46 (diff) | |
| download | qpid-python-0333573627c831142aa251bfb1cabdb1e2bf438e.tar.gz | |
Support for AMQP 0-10 sessions in C++ broker.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@658246 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker')
| -rw-r--r-- | cpp/src/qpid/broker/Bridge.cpp | 1 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 55 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Broker.h | 3 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Connection.cpp | 4 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/SaslAuthenticator.cpp | 17 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/SessionAdapter.cpp | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/SessionHandler.cpp | 209 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/SessionHandler.h | 64 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/SessionManager.cpp | 78 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/SessionManager.h | 39 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/SessionState.cpp | 124 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/SessionState.h | 47 |
12 files changed, 203 insertions, 440 deletions
diff --git a/cpp/src/qpid/broker/Bridge.cpp b/cpp/src/qpid/broker/Bridge.cpp index 337992992f..f3e103dfaf 100644 --- a/cpp/src/qpid/broker/Bridge.cpp +++ b/cpp/src/qpid/broker/Bridge.cpp @@ -59,6 +59,7 @@ void Bridge::create(ConnectionState& c) peer.reset(new framing::AMQP_ServerProxy(*channelHandler)); session->attach(name, false); + session->commandPoint(0,0); if (args.i_src_is_local) { //TODO: handle 'push' here... simplest way is to create frames and pass them to Connection::received() diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index 0a45c46d30..4f7686aac4 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -53,6 +53,9 @@ #if HAVE_SASL #include <sasl/sasl.h> +static const bool AUTH_DEFAULT=true; +#else +static const bool AUTH_DEFAULT=false; #endif using qpid::sys::ProtocolFactory; @@ -81,41 +84,25 @@ Broker::Options::Options(const std::string& name) : stagingThreshold(5000000), enableMgmt(1), mgmtPubInterval(10), -#if HAVE_SASL - auth(true), -#else - auth(false), -#endif - realm("QPID"), - ack(0) + auth(AUTH_DEFAULT), + replayFlushLimit(64), + replayHardLimit(0) { int c = sys::SystemInfo::concurrency(); workerThreads=c+1; addOptions() - ("data-dir", optValue(dataDir,"DIR"), - "Directory to contain persistent data generated by the broker") - ("no-data-dir", optValue(noDataDir), - "Don't use a data directory. No persistent configuration will be loaded or stored") - ("port,p", optValue(port,"PORT"), - "Tells the broker to listen on PORT") - ("worker-threads", optValue(workerThreads, "N"), - "Sets the broker thread pool size") - ("max-connections", optValue(maxConnections, "N"), - "Sets the maximum allowed connections") - ("connection-backlog", optValue(connectionBacklog, "N"), - "Sets the connection backlog limit for the server socket") - ("staging-threshold", optValue(stagingThreshold, "N"), - "Stages messages over N bytes to disk") - ("mgmt-enable,m", optValue(enableMgmt,"yes|no"), - "Enable Management") - ("mgmt-pub-interval", optValue(mgmtPubInterval, "SECONDS"), - "Management Publish Interval") - ("auth", optValue(auth, "yes|no"), - "Enable authentication, if disabled all incoming connections will be trusted") - ("realm", optValue(realm, "REALM"), - "Use the given realm when performing authentication") - ("ack", optValue(ack, "N"), - "Send session.ack/solicit-ack at least every N frames. 0 disables voluntary ack/solitict-ack"); + ("data-dir", optValue(dataDir,"DIR"), "Directory to contain persistent data generated by the broker") + ("no-data-dir", optValue(noDataDir), "Don't use a data directory. No persistent configuration will be loaded or stored") + ("port,p", optValue(port,"PORT"), "Tells the broker to listen on PORT") + ("worker-threads", optValue(workerThreads, "N"), "Sets the broker thread pool size") + ("max-connections", optValue(maxConnections, "N"), "Sets the maximum allowed connections") + ("connection-backlog", optValue(connectionBacklog, "N"), "Sets the connection backlog limit for the server socket") + ("staging-threshold", optValue(stagingThreshold, "N"), "Stages messages over N bytes to disk") + ("mgmt-enable,m", optValue(enableMgmt,"yes|no"), "Enable Management") + ("mgmt-pub-interval", optValue(mgmtPubInterval, "SECONDS"), "Management Publish Interval") + ("auth", optValue(auth, "yes|no"), "Enable authentication, if disabled all incoming connections will be trusted") + ("replay-flush-limit", optValue(replayFlushLimit, "KB"), "Send flush request when the replay buffer reaches this limit. 0 means no limit.") + ("replay-hard-limit", optValue(replayHardLimit, "KB"), "Kill a session if its replay buffer exceeds this limit. 0 means no limit."); } const std::string empty; @@ -132,7 +119,11 @@ Broker::Broker(const Broker::Options& conf) : dataDir(conf.noDataDir ? std::string () : conf.dataDir), links(this), factory(*this), - sessionManager(conf.ack) + sessionManager( + qpid::SessionState::Configuration( + conf.replayFlushLimit*1024, // convert kb to bytes. + conf.replayHardLimit*1024), + *this) { if(conf.enableMgmt){ QPID_LOG(info, "Management enabled"); diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index a1eaf4f62f..7092a86181 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -83,7 +83,8 @@ class Broker : public sys::Runnable, public Plugin::Target, uint16_t mgmtPubInterval; bool auth; std::string realm; - uint32_t ack; + size_t replayFlushLimit; + size_t replayHardLimit; }; virtual ~Broker(); diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp index d156b4a914..463193a346 100644 --- a/cpp/src/qpid/broker/Connection.cpp +++ b/cpp/src/qpid/broker/Connection.cpp @@ -124,8 +124,8 @@ void Connection::idleIn(){} void Connection::closed(){ // Physically closed, suspend open sessions. try { - for (ChannelMap::iterator i = channels.begin(); i != channels.end(); ++i) - ptr_map_ptr(i)->handleDetach(); + while (!channels.empty()) + ptr_map_ptr(channels.begin())->handleDetach(); while (!exclusiveQueues.empty()) { Queue::shared_ptr q(exclusiveQueues.front()); q->releaseExclusiveOwnership(); diff --git a/cpp/src/qpid/broker/SaslAuthenticator.cpp b/cpp/src/qpid/broker/SaslAuthenticator.cpp index d48b258ba2..a542211147 100644 --- a/cpp/src/qpid/broker/SaslAuthenticator.cpp +++ b/cpp/src/qpid/broker/SaslAuthenticator.cpp @@ -89,13 +89,20 @@ void NullAuthenticator::getMechanisms(Array& mechanisms) mechanisms.add(boost::shared_ptr<FieldValue>(new Str16Value("ANONYMOUS"))); } -void NullAuthenticator::start(const string& /*mechanism*/, const string& /*response*/) +void NullAuthenticator::start(const string& mechanism, const string& response) { QPID_LOG(warning, "SASL: No Authentication Performed"); - - // TODO: Figure out what should actually be set in this case - connection.setUserId("anonymous"); - + if (mechanism == "PLAIN") { // Old behavior + if (response.size() > 0 && response[0] == (char) 0) { + string temp = response.substr(1); + string::size_type i = temp.find((char)0); + string uid = temp.substr(0, i); + string pwd = temp.substr(i + 1); + connection.setUserId(uid); + } + } else { + connection.setUserId("anonymous"); + } client.tune(framing::CHANNEL_MAX, connection.getFrameMax(), 0, 0); } diff --git a/cpp/src/qpid/broker/SessionAdapter.cpp b/cpp/src/qpid/broker/SessionAdapter.cpp index e284451d14..2d0edde27b 100644 --- a/cpp/src/qpid/broker/SessionAdapter.cpp +++ b/cpp/src/qpid/broker/SessionAdapter.cpp @@ -410,7 +410,7 @@ void SessionAdapter::MessageHandlerImpl::accept(const framing::SequenceSet& comm framing::MessageAcquireResult SessionAdapter::MessageHandlerImpl::acquire(const framing::SequenceSet& transfers) { - //TODO: change this when SequenceNumberSet is deleted along with preview code + // FIXME aconway 2008-05-12: create SequenceSet directly, no need for intermediate results vector. SequenceNumberSet results; RangedOperation f = boost::bind(&SemanticState::acquire, &state, _1, _2, boost::ref(results)); transfers.for_each(f); diff --git a/cpp/src/qpid/broker/SessionHandler.cpp b/cpp/src/qpid/broker/SessionHandler.cpp index 2ec0988fc0..2f09c6b5ac 100644 --- a/cpp/src/qpid/broker/SessionHandler.cpp +++ b/cpp/src/qpid/broker/SessionHandler.cpp @@ -21,12 +21,6 @@ #include "SessionHandler.h" #include "SessionState.h" #include "Connection.h" -#include "ConnectionState.h" -#include "qpid/framing/reply_exceptions.h" -#include "qpid/framing/constants.h" -#include "qpid/framing/ClientInvoker.h" -#include "qpid/framing/ServerInvoker.h" -#include "qpid/framing/all_method_bodies.h" #include "qpid/log/Statement.h" #include <boost/bind.hpp> @@ -38,11 +32,9 @@ using namespace std; using namespace qpid::sys; SessionHandler::SessionHandler(Connection& c, ChannelId ch) - : InOutHandler(0, &out), - connection(c), channel(ch, &c.getOutput()), - proxy(out), // Via my own handleOut() for L2 data. - peerSession(channel), // Direct to channel for L2 commands. - ignoring(false) + : amqp_0_10::SessionHandler(&c.getOutput(), ch), + connection(c), + proxy(out) {} SessionHandler::~SessionHandler() {} @@ -52,191 +44,52 @@ ClassId classId(AMQMethodBody* m) { return m ? m->amqpMethodId() : 0; } MethodId methodId(AMQMethodBody* m) { return m ? m->amqpClassId() : 0; } } // namespace -void SessionHandler::handleIn(AMQFrame& f) { - // Note on channel states: a channel is attached if session != 0 - AMQMethodBody* m = f.getBody()->getMethod(); - try { - if (ignoring && !(m && m->isA<SessionDetachedBody>())) { - return; - } - if (m && isValid(m) && invoke(static_cast<AMQP_ServerOperations::SessionHandler&>(*this), *m)) { - //frame was a valid session control and has been handled - return; - } else if (session.get()) { - //we are attached and frame was not a session control so it is for upper layers - session->handle(f); - } else if (m && m->isA<SessionDetachedBody>()) { - handleDetach(); - connection.closeChannel(channel.get()); - } else { - throw NotAttachedException(QPID_MSG("Channel " << channel.get() << " is not attached")); - } - }catch(const ChannelException& e){ - QPID_LOG(error, "Session detached due to: " << e.what()); - peerSession.detached(name, e.code); +void SessionHandler::channelException(uint16_t, const std::string&) { handleDetach(); - connection.closeChannel(channel.get()); - }catch(const ConnectionException& e){ - connection.close(e.code, e.what(), classId(m), methodId(m)); - }catch(const std::exception& e){ - connection.close(501, e.what(), classId(m), methodId(m)); - } } -bool SessionHandler::isValid(AMQMethodBody* m) { - return session.get() || m->isA<SessionAttachBody>() || m->isA<SessionAttachedBody>(); -} - -void SessionHandler::handleOut(AMQFrame& f) { - channel.handle(f); // Send it. - if (session->sent(f)) - peerSession.flush(false, false, true); -} - -void SessionHandler::assertAttached(const char* method) const { - if (!session.get()) { - std::cout << "SessionHandler::assertAttached() failed for " << method << std::endl; - throw NotAttachedException( - QPID_MSG(method << " failed: No session for channel " - << getChannel())); - } -} - -void SessionHandler::assertClosed(const char* method) const { - if (session.get()) - throw SessionBusyException( - QPID_MSG(method << " failed: channel " << channel.get() - << " is already open.")); +void SessionHandler::connectionException(uint16_t code, const std::string& msg) { + connection.close(code, msg, 0, 0); } ConnectionState& SessionHandler::getConnection() { return connection; } -const ConnectionState& SessionHandler::getConnection() const { return connection; } - -//new methods: -void SessionHandler::attach(const std::string& _name, bool /*force*/) -{ - name = _name;//TODO: this should be used in conjunction with - //userid for connection as sessions identity - //TODO: need to revise session manager to support resume as well - assertClosed("attach"); - session.reset(new SessionState(0, this, 0, 0, name)); - peerSession.attached(name); - peerSession.commandPoint(session->nextOut, 0); -} - -void SessionHandler::attached(const std::string& _name) -{ - name = _name;//TODO: this should be used in conjunction with - //userid for connection as sessions identity - session.reset(new SessionState(0, this, 0, 0, name)); - peerSession.commandPoint(session->nextOut, 0); -} +const ConnectionState& SessionHandler::getConnection() const { return connection; } -void SessionHandler::detach(const std::string& name) -{ - assertAttached("detach"); - peerSession.detached(name, session::NORMAL); - handleDetach(); +void SessionHandler::handleDetach() { + amqp_0_10::SessionHandler::handleDetach(); assert(&connection.getChannel(channel.get()) == this); + if (session.get()) + connection.getBroker().getSessionManager().detach(session); + assert(!session.get()); connection.closeChannel(channel.get()); } -void SessionHandler::detached(const std::string& name, uint8_t code) -{ - ignoring = false; - handleDetach(); - if (code) { - //no error - } else { - //error occured - QPID_LOG(warning, "Received session.closed: "<< name << " " << code); - } - connection.closeChannel(channel.get()); -} - -void SessionHandler::handleDetach() -{ - if (session.get()) { - session->detach(); - session.reset(); - } -} - -void SessionHandler::requestDetach() -{ - //TODO: request timeout when python can handle it - //peerSession.requestTimeout(0); - ignoring = true; - peerSession.detach(name); -} - -void SessionHandler::requestTimeout(uint32_t t) -{ - session->setTimeout(t); - peerSession.timeout(t); -} - -void SessionHandler::timeout(uint32_t t) -{ - session->setTimeout(t); -} - -void SessionHandler::commandPoint(const framing::SequenceNumber& id, uint64_t offset) -{ - if (offset) throw NotImplementedException("Non-zero byte offset not yet supported for command-point"); - - session->nextIn = id; -} - -void SessionHandler::expected(const framing::SequenceSet& commands, const framing::Array& fragments) -{ - if (!commands.empty() || fragments.size()) { - throw NotImplementedException("Session resumption not yet supported"); - } +void SessionHandler::setState(const std::string& name, bool force) { + assert(!session.get()); + SessionId id(connection.getUserId(), name); + session = connection.broker.getSessionManager().attach(*this, id, force); } -void SessionHandler::confirmed(const framing::SequenceSet& /*commands*/, const framing::Array& /*fragments*/) -{ - //don't really care too much about this yet -} +FrameHandler* SessionHandler::getInHandler() { return session.get(); } +qpid::SessionState* SessionHandler::getState() { return session.get(); } -void SessionHandler::completed(const framing::SequenceSet& commands, bool timelyReply) -{ - session->complete(commands); - if (timelyReply) { - peerSession.knownCompleted(session->knownCompleted); - session->knownCompleted.clear(); - } +void SessionHandler::readyToSend() { + if (session.get()) session->readyToSend(); } -void SessionHandler::knownCompleted(const framing::SequenceSet& commands) -{ - session->completed.remove(commands); -} - -void SessionHandler::flush(bool expected, bool confirmed, bool completed) -{ - if (expected) { - peerSession.expected(SequenceSet(session->nextIn), Array()); - } - if (confirmed) { - peerSession.confirmed(session->completed, Array()); - } - if (completed) { - peerSession.completed(session->completed, true); - } -} - - -void SessionHandler::sendCompletion() -{ - peerSession.completed(session->completed, true); +// TODO aconway 2008-05-12: hacky - handle attached for bridge clients. +// We need to integrate the client code so we can run a real client +// in the bridge. +// +void SessionHandler::attached(const std::string& name) { + if (session.get()) + checkName(name); + else { + SessionId id(connection.getUserId(), name); + SessionState::Configuration config = connection.broker.getSessionManager().getSessionConfig(); + session.reset(new SessionState(connection.getBroker(), *this, id, config)); } - -void SessionHandler::gap(const framing::SequenceSet& /*commands*/) -{ - throw NotImplementedException("gap not yet supported"); } }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SessionHandler.h b/cpp/src/qpid/broker/SessionHandler.h index 47c534441a..1aa3137fdf 100644 --- a/cpp/src/qpid/broker/SessionHandler.h +++ b/cpp/src/qpid/broker/SessionHandler.h @@ -22,19 +22,12 @@ * */ -#include "qpid/framing/FrameHandler.h" -#include "qpid/framing/AMQP_ClientOperations.h" -#include "qpid/framing/AMQP_ServerOperations.h" +#include "qpid/amqp_0_10/SessionHandler.h" #include "qpid/framing/AMQP_ClientProxy.h" -#include "qpid/framing/amqp_types.h" -#include "qpid/framing/Array.h" -#include "qpid/framing/ChannelHandler.h" -#include "qpid/framing/SequenceNumber.h" - - -#include <boost/noncopyable.hpp> namespace qpid { +class SessionState; + namespace broker { class Connection; @@ -46,65 +39,38 @@ class SessionState; * receives incoming frames, handles session controls and manages the * association between the channel and a session. */ -class SessionHandler : public framing::AMQP_ServerOperations::SessionHandler, - public framing::FrameHandler::InOutHandler, - private boost::noncopyable -{ +class SessionHandler : public amqp_0_10::SessionHandler { public: SessionHandler(Connection&, framing::ChannelId); ~SessionHandler(); - /** Returns 0 if not attached to a session */ + /** Get broker::SessionState */ SessionState* getSession() { return session.get(); } const SessionState* getSession() const { return session.get(); } - framing::ChannelId getChannel() const { return channel.get(); } - ConnectionState& getConnection(); const ConnectionState& getConnection() const; framing::AMQP_ClientProxy& getProxy() { return proxy; } const framing::AMQP_ClientProxy& getProxy() const { return proxy; } - void requestDetach(); - void handleDetach(); - void sendCompletion(); - - protected: - void handleIn(framing::AMQFrame&); - void handleOut(framing::AMQFrame&); + virtual void handleDetach(); - private: - //new methods: - void attach(const std::string& name, bool force); + // Overrides void attached(const std::string& name); - void detach(const std::string& name); - void detached(const std::string& name, uint8_t code); - - void requestTimeout(uint32_t t); - void timeout(uint32_t t); - - void commandPoint(const framing::SequenceNumber& id, uint64_t offset); - void expected(const framing::SequenceSet& commands, const framing::Array& fragments); - void confirmed(const framing::SequenceSet& commands,const framing::Array& fragments); - void completed(const framing::SequenceSet& commands, bool timelyReply); - void knownCompleted(const framing::SequenceSet& commands); - void flush(bool expected, bool confirmed, bool completed); - void gap(const framing::SequenceSet& commands); - void assertAttached(const char* method) const; - void assertActive(const char* method) const; - void assertClosed(const char* method) const; - - bool isValid(framing::AMQMethodBody*); + protected: + virtual void setState(const std::string& sessionName, bool force); + virtual qpid::SessionState* getState(); + virtual framing::FrameHandler* getInHandler(); + virtual void channelException(uint16_t code, const std::string& msg); + virtual void connectionException(uint16_t code, const std::string& msg); + virtual void readyToSend(); + private: Connection& connection; - framing::ChannelHandler channel; framing::AMQP_ClientProxy proxy; - framing::AMQP_ClientProxy::Session peerSession; - bool ignoring; std::auto_ptr<SessionState> session; - std::string name;//TODO: this should be part of the session state and replace the id }; }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SessionManager.cpp b/cpp/src/qpid/broker/SessionManager.cpp index d7bae737fc..789e43b902 100644 --- a/cpp/src/qpid/broker/SessionManager.cpp +++ b/cpp/src/qpid/broker/SessionManager.cpp @@ -40,68 +40,58 @@ using boost::intrusive_ptr; using namespace sys; using namespace framing; -SessionManager::SessionManager(uint32_t a) : ack(a) {} +SessionManager::SessionManager(const SessionState::Configuration& c, Broker& b) + : config(c), broker(b) {} -SessionManager::~SessionManager() {} +SessionManager::~SessionManager() { + detached.clear(); // Must clear before destructor as session dtor will call forget() +} -// FIXME aconway 2008-02-01: pass handler*, allow open unattached. -std::auto_ptr<SessionState> SessionManager::open( - SessionHandler& h, uint32_t timeout_, std::string _name) -{ +std::auto_ptr<SessionState> SessionManager::attach(SessionHandler& h, const SessionId& id, bool/*force*/) { Mutex::ScopedLock l(lock); - std::auto_ptr<SessionState> session( - new SessionState(this, &h, timeout_, ack, _name)); - active.insert(session->getId()); + eraseExpired(); // Clean up expired table + std::pair<Attached::iterator, bool> insert = attached.insert(id); + if (!insert.second) + throw SessionBusyException(QPID_MSG("Session already attached: " << id)); + Detached::iterator i = std::find(detached.begin(), detached.end(), id); + std::auto_ptr<SessionState> state; + if (i == detached.end()) { + state.reset(new SessionState(broker, h, id, config)); for_each(observers.begin(), observers.end(), - boost::bind(&Observer::opened, _1,boost::ref(*session))); - return session; + boost::bind(&Observer::opened, _1,boost::ref(*state))); + } + else { + state.reset(detached.release(i).release()); + state->attach(h); + } + return state; + // FIXME aconway 2008-04-29: implement force } -void SessionManager::suspend(std::auto_ptr<SessionState> session) { +void SessionManager::detach(std::auto_ptr<SessionState> session) { Mutex::ScopedLock l(lock); - active.erase(session->getId()); - session->suspend(); + attached.erase(session->getId()); + session->detach(); + if (session->getTimeout() > 0) { session->expiry = AbsTime(now(),session->getTimeout()*TIME_SEC); if (session->mgmtObject.get() != 0) session->mgmtObject->set_expireTime ((uint64_t) Duration (session->expiry)); - suspended.push_back(session.release()); // In expiry order + detached.push_back(session.release()); // In expiry order eraseExpired(); } - -std::auto_ptr<SessionState> SessionManager::resume(const Uuid& id) -{ - Mutex::ScopedLock l(lock); - eraseExpired(); - if (active.find(id) != active.end()) - throw SessionBusyException( - QPID_MSG("Session already active: " << id)); - Suspended::iterator i = std::find_if( - suspended.begin(), suspended.end(), - boost::bind(std::equal_to<Uuid>(), id, boost::bind(&SessionState::getId, _1)) - ); - if (i == suspended.end()) - throw InvalidArgumentException( - QPID_MSG("No suspended session with id=" << id)); - active.insert(id); - std::auto_ptr<SessionState> state(suspended.release(i).release()); - return state; } -void SessionManager::erase(const framing::Uuid& id) -{ - Mutex::ScopedLock l(lock); - active.erase(id); -} +void SessionManager::forget(const SessionId& id) { attached.erase(id); } void SessionManager::eraseExpired() { // Called with lock held. - if (!suspended.empty()) { - Suspended::iterator keep = std::lower_bound( - suspended.begin(), suspended.end(), now(), + if (!detached.empty()) { + Detached::iterator keep = std::lower_bound( + detached.begin(), detached.end(), now(), boost::bind(std::less<AbsTime>(), boost::bind(&SessionState::expiry, _1), _2)); - if (suspended.begin() != keep) { - QPID_LOG(debug, "Expiring sessions: " << log::formatList(suspended.begin(), keep)); - suspended.erase(suspended.begin(), keep); + if (detached.begin() != keep) { + QPID_LOG(debug, "Expiring sessions: " << log::formatList(detached.begin(), keep)); + detached.erase(detached.begin(), keep); } } } diff --git a/cpp/src/qpid/broker/SessionManager.h b/cpp/src/qpid/broker/SessionManager.h index ad064c69bb..9a4142f613 100644 --- a/cpp/src/qpid/broker/SessionManager.h +++ b/cpp/src/qpid/broker/SessionManager.h @@ -22,7 +22,7 @@ * */ -#include <qpid/framing/Uuid.h> +#include <qpid/SessionState.h> #include <qpid/sys/Time.h> #include <qpid/sys/Mutex.h> #include <qpid/RefCounted.h> @@ -37,7 +37,7 @@ namespace qpid { namespace broker { - +class Broker; class SessionState; class SessionHandler; @@ -50,44 +50,43 @@ class SessionManager : private boost::noncopyable { * Observer notified of SessionManager events. */ struct Observer : public RefCounted { + /** Called when a stateless session is attached. */ virtual void opened(SessionState&) {} }; - SessionManager(uint32_t ack); + SessionManager(const qpid::SessionState::Configuration&, Broker&); ~SessionManager(); /** Open a new active session, caller takes ownership */ - std::auto_ptr<SessionState> open(SessionHandler& c, uint32_t timeout_, std::string name); + std::auto_ptr<SessionState> attach(SessionHandler& h, const SessionId& id, bool/*force*/); - /** Suspend a session, start it's timeout counter. - * The factory takes ownership. - */ - void suspend(std::auto_ptr<SessionState> session); + /** Return a detached session to the manager, start the timeout counter. */ + void detach(std::auto_ptr<SessionState>); - /** Resume a suspended session. - *@throw Exception if timed out or non-existant. - */ - std::auto_ptr<SessionState> resume(const framing::Uuid&); + /** Forget about an attached session. Called by SessionState destructor. */ + void forget(const SessionId&); /** Add an Observer. */ void add(const boost::intrusive_ptr<Observer>&); + Broker& getBroker() const { return broker; } + + const qpid::SessionState::Configuration& getSessionConfig() const { return config; } + private: - typedef boost::ptr_vector<SessionState> Suspended; - typedef std::set<framing::Uuid> Active; + typedef boost::ptr_vector<SessionState> Detached; // Sorted in expiry order. + typedef std::set<SessionId> Attached; typedef std::vector<boost::intrusive_ptr<Observer> > Observers; - void erase(const framing::Uuid&); void eraseExpired(); sys::Mutex lock; - Suspended suspended; - Active active; - uint32_t ack; + Detached detached; + Attached attached; + qpid::SessionState::Configuration config; Observers observers; - - friend class SessionState; // removes deleted sessions from active set. + Broker& broker; }; diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp index 2ef1ed2de4..c851162046 100644 --- a/cpp/src/qpid/broker/SessionState.cpp +++ b/cpp/src/qpid/broker/SessionState.cpp @@ -32,6 +32,7 @@ #include "qpid/log/Statement.h" #include <boost/bind.hpp> +#include <boost/lexical_cast.hpp> namespace qpid { namespace broker { @@ -45,59 +46,46 @@ using qpid::management::Manageable; using qpid::management::Args; SessionState::SessionState( - SessionManager* f, SessionHandler* h, uint32_t timeout_, uint32_t ack, string& _name) - : framing::SessionState(ack, timeout_ > 0), nextOut(0), - factory(f), handler(h), id(true), timeout(timeout_), - broker(h->getConnection().broker), - version(h->getConnection().getVersion()), - ignoring(false), name(_name), + Broker& b, SessionHandler& h, const SessionId& id, const SessionState::Configuration& config) + : qpid::SessionState(id, config), + broker(b), handler(&h), + ignoring(false), semanticState(*this, *this), adapter(semanticState), msgBuilder(&broker.getStore(), broker.getStagingThreshold()), - ackOp(boost::bind(&SemanticState::completed, &semanticState, _1, _2)), enqueuedOp(boost::bind(&SessionState::enqueued, this, _1)) { - getConnection().outputTasks.addOutputTask(&semanticState); - Manageable* parent = broker.GetVhostObject (); - - if (parent != 0) - { + if (parent != 0) { ManagementAgent::shared_ptr agent = ManagementAgent::getAgent (); - - if (agent.get () != 0) - { + if (agent.get () != 0) { mgmtObject = management::Session::shared_ptr - (new management::Session (this, parent, name)); - mgmtObject->set_attached (1); - mgmtObject->set_clientRef (h->getConnection().GetManagementObject()->getObjectId()); - mgmtObject->set_channelId (h->getChannel()); - mgmtObject->set_detachedLifespan (getTimeout()); + (new management::Session (this, parent, getId().getName())); + mgmtObject->set_attached (0); agent->addObject (mgmtObject); } } + attach(h); } SessionState::~SessionState() { // Remove ID from active session list. - if (factory) - factory->erase(getId()); + // FIXME aconway 2008-05-12: Need to distinguish outgoing sessions established by bridge, + // they don't belong in the manager. For now rely on uniqueness of UUIDs. + // + broker.getSessionManager().forget(getId()); if (mgmtObject.get () != 0) mgmtObject->resourceDestroy (); } -SessionHandler* SessionState::getHandler() { - return handler; -} - AMQP_ClientProxy& SessionState::getProxy() { assert(isAttached()); - return getHandler()->getProxy(); + return handler->getProxy(); } ConnectionState& SessionState::getConnection() { assert(isAttached()); - return getHandler()->getConnection(); + return handler->getConnection(); } bool SessionState::isLocal(const ConnectionToken* t) const @@ -106,18 +94,19 @@ bool SessionState::isLocal(const ConnectionToken* t) const } void SessionState::detach() { - getConnection().outputTasks.removeOutputTask(&semanticState); + // activateOutput can be called in a different thread, lock to protect attached status Mutex::ScopedLock l(lock); + QPID_LOG(debug, getId() << ": detached on broker."); + getConnection().outputTasks.removeOutputTask(&semanticState); handler = 0; if (mgmtObject.get() != 0) - { mgmtObject->set_attached (0); } -} void SessionState::attach(SessionHandler& h) { - { + // activateOutput can be called in a different thread, lock to protect attached status Mutex::ScopedLock l(lock); + QPID_LOG(debug, getId() << ": attached on broker."); handler = &h; if (mgmtObject.get() != 0) { @@ -126,16 +115,13 @@ void SessionState::attach(SessionHandler& h) { mgmtObject->set_channelId (h.getChannel()); } } - h.getConnection().outputTasks.addOutputTask(&semanticState); -} -void SessionState::activateOutput() -{ +void SessionState::activateOutput() { + // activateOutput can be called in a different thread, lock to protect attached status Mutex::ScopedLock l(lock); - if (isAttached()) { + if (isAttached()) getConnection().outputTasks.activateOutput(); } -} //This class could be used as the callback for queue notifications //if not attached, it can simply ignore the callback, else pass it //on to the connection @@ -155,7 +141,7 @@ Manageable::status_t SessionState::ManagementMethod (uint32_t methodId, case management::Session::METHOD_DETACH : if (handler != 0) { - handler->requestDetach(); + handler->sendDetach(); } status = Manageable::STATUS_OK; break; @@ -179,35 +165,25 @@ Manageable::status_t SessionState::ManagementMethod (uint32_t methodId, return status; } -void SessionState::handleCommand(framing::AMQMethodBody* method, SequenceNumber& id) -{ - id = nextIn++; +void SessionState::handleCommand(framing::AMQMethodBody* method, const SequenceNumber& id) { Invoker::Result invocation = invoke(adapter, *method); - completed.add(id); - + receiverCompleted(id); if (!invocation.wasHandled()) { throw NotImplementedException(QPID_MSG("Not implemented: " << *method)); } else if (invocation.hasResult()) { - nextOut++;//execution result is now a command, so the counter must be incremented getProxy().getExecution().result(id, invocation.getResult()); } if (method->isSync()) { incomplete.process(enqueuedOp, true); sendCompletion(); } - //TODO: if window gets too large send unsolicited completion } -void SessionState::handleContent(AMQFrame& frame, SequenceNumber& id) +void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id) { - intrusive_ptr<Message> msg(msgBuilder.getMessage()); - if (frame.getBof() && frame.getBos()) {//start of frameset - id = nextIn++; + if (frame.getBof() && frame.getBos()) //start of frameset msgBuilder.start(id); - msg = msgBuilder.getMessage(); - } else { - id = msg->getCommandId(); - } + intrusive_ptr<Message> msg(msgBuilder.getMessage()); msgBuilder.handle(frame); if (frame.getEof() && frame.getEos()) {//end of frameset if (frame.getBof()) { @@ -240,19 +216,14 @@ void SessionState::handleContent(AMQFrame& frame, SequenceNumber& id) void SessionState::enqueued(boost::intrusive_ptr<Message> msg) { - completed.add(msg->getCommandId()); - if (msg->requiresAccept()) { - nextOut++;//accept is a command, so the counter must be incremented + receiverCompleted(msg->getCommandId()); + if (msg->requiresAccept()) getProxy().getMessage().accept(SequenceSet(msg->getCommandId())); } -} void SessionState::handle(AMQFrame& frame) { - if (ignoring) return; - received(frame); - - SequenceNumber commandId; + SequenceNumber commandId = receiverGetCurrent(); try { //TODO: make command handling more uniform, regardless of whether //commands carry content. @@ -277,29 +248,36 @@ void SessionState::handle(AMQFrame& frame) } else { getProxy().getExecution().exception(e.code, commandId, 0, 0, 0, e.what(), FieldTable()); } - timeout = 0; ignoring = true; - handler->requestDetach(); + handler->sendDetach(); } } DeliveryId SessionState::deliver(QueuedMessage& msg, DeliveryToken::shared_ptr token) { uint32_t maxFrameSize = getConnection().getFrameMax(); - MessageDelivery::deliver(msg, getProxy().getHandler(), nextOut, token, maxFrameSize); - return nextOut++; + assert(senderGetCommandPoint().offset == 0); + SequenceNumber commandId = senderGetCommandPoint().command; + MessageDelivery::deliver(msg, getProxy().getHandler(), commandId, token, maxFrameSize); + assert(senderGetCommandPoint() == SessionPoint(commandId+1, 0)); // Delivery has moved sendPoint. + return commandId; } -void SessionState::sendCompletion() -{ - handler->sendCompletion(); +void SessionState::sendCompletion() { handler->sendCompletion(); } + +void SessionState::senderCompleted(const SequenceSet& commands) { + qpid::SessionState::senderCompleted(commands); + commands.for_each(boost::bind(&SemanticState::completed, &semanticState, _1, _2)); } -void SessionState::complete(const SequenceSet& commands) -{ - knownCompleted.add(commands); - commands.for_each(ackOp); +void SessionState::readyToSend() { + QPID_LOG(debug, getId() << ": ready to send, activating output."); + assert(handler); + sys::AggregateOutput& tasks = handler->getConnection().outputTasks; + tasks.addOutputTask(&semanticState); + tasks.activateOutput(); } +Broker& SessionState::getBroker() { return broker; } }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h index ae860e84c9..7b70789161 100644 --- a/cpp/src/qpid/broker/SessionState.h +++ b/cpp/src/qpid/broker/SessionState.h @@ -22,11 +22,9 @@ * */ -#include "qpid/framing/Uuid.h" +#include "qpid/SessionState.h" #include "qpid/framing/FrameHandler.h" -#include "qpid/framing/SessionState.h" #include "qpid/framing/SequenceSet.h" -#include "qpid/framing/ProtocolVersion.h" #include "qpid/sys/Mutex.h" #include "qpid/sys/Time.h" #include "qpid/management/Manageable.h" @@ -63,21 +61,20 @@ class SessionManager; * Broker-side session state includes sessions handler chains, which may * themselves have state. */ -class SessionState : public framing::SessionState, +class SessionState : public qpid::SessionState, public SessionContext, public DeliveryAdapter, - public management::Manageable + public management::Manageable, + public framing::FrameHandler { public: + SessionState(Broker&, SessionHandler&, const SessionId&, const SessionState::Configuration&); ~SessionState(); bool isAttached() const { return handler; } void detach(); void attach(SessionHandler& handler); - - SessionHandler* getHandler(); - /** @pre isAttached() */ framing::AMQP_ClientProxy& getProxy(); @@ -85,18 +82,15 @@ class SessionState : public framing::SessionState, ConnectionState& getConnection(); bool isLocal(const ConnectionToken* t) const; - uint32_t getTimeout() const { return timeout; } - void setTimeout(uint32_t t) { timeout = t; } - - Broker& getBroker() { return broker; } - framing::ProtocolVersion getVersion() const { return version; } + Broker& getBroker(); /** OutputControl **/ void activateOutput(); void handle(framing::AMQFrame& frame); - void complete(const framing::SequenceSet& ranges); + void senderCompleted(const framing::SequenceSet& ranges); + void sendCompletion(); //delivery adapter methods: @@ -107,29 +101,13 @@ class SessionState : public framing::SessionState, management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args); - // Normally SessionManager creates sessions. - SessionState(SessionManager*, - SessionHandler* out, - uint32_t timeout, - uint32_t ackInterval, - std::string& name); - - - framing::SequenceSet completed; - framing::SequenceSet knownCompleted; - framing::SequenceNumber nextIn; - framing::SequenceNumber nextOut; + void readyToSend(); private: - typedef boost::function<void(DeliveryId, DeliveryId)> RangedOperation; - SessionManager* factory; + Broker& broker; SessionHandler* handler; - framing::Uuid id; - uint32_t timeout; sys::AbsTime expiry; // Used by SessionManager. - Broker& broker; - framing::ProtocolVersion version; sys::Mutex lock; bool ignoring; std::string name; @@ -139,12 +117,11 @@ class SessionState : public framing::SessionState, MessageBuilder msgBuilder; IncompleteMessageList incomplete; - RangedOperation ackOp; IncompleteMessageList::CompletionListener enqueuedOp; management::Session::shared_ptr mgmtObject; - void handleCommand(framing::AMQMethodBody* method, framing::SequenceNumber& id); - void handleContent(framing::AMQFrame& frame, framing::SequenceNumber& id); + void handleCommand(framing::AMQMethodBody* method, const framing::SequenceNumber& id); + void handleContent(framing::AMQFrame& frame, const framing::SequenceNumber& id); void enqueued(boost::intrusive_ptr<Message> msg); friend class SessionManager; |
