summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-05-20 13:44:34 +0000
committerAlan Conway <aconway@apache.org>2008-05-20 13:44:34 +0000
commit0333573627c831142aa251bfb1cabdb1e2bf438e (patch)
tree953bf8c624374c57953aa3f2888254d175609d9a /cpp/src/qpid/broker
parent96024622ccfcc8fdd24b3c9ace44f7c8849fac46 (diff)
downloadqpid-python-0333573627c831142aa251bfb1cabdb1e2bf438e.tar.gz
Support for AMQP 0-10 sessions in C++ broker.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@658246 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker')
-rw-r--r--cpp/src/qpid/broker/Bridge.cpp1
-rw-r--r--cpp/src/qpid/broker/Broker.cpp55
-rw-r--r--cpp/src/qpid/broker/Broker.h3
-rw-r--r--cpp/src/qpid/broker/Connection.cpp4
-rw-r--r--cpp/src/qpid/broker/SaslAuthenticator.cpp17
-rw-r--r--cpp/src/qpid/broker/SessionAdapter.cpp2
-rw-r--r--cpp/src/qpid/broker/SessionHandler.cpp209
-rw-r--r--cpp/src/qpid/broker/SessionHandler.h64
-rw-r--r--cpp/src/qpid/broker/SessionManager.cpp78
-rw-r--r--cpp/src/qpid/broker/SessionManager.h39
-rw-r--r--cpp/src/qpid/broker/SessionState.cpp124
-rw-r--r--cpp/src/qpid/broker/SessionState.h47
12 files changed, 203 insertions, 440 deletions
diff --git a/cpp/src/qpid/broker/Bridge.cpp b/cpp/src/qpid/broker/Bridge.cpp
index 337992992f..f3e103dfaf 100644
--- a/cpp/src/qpid/broker/Bridge.cpp
+++ b/cpp/src/qpid/broker/Bridge.cpp
@@ -59,6 +59,7 @@ void Bridge::create(ConnectionState& c)
peer.reset(new framing::AMQP_ServerProxy(*channelHandler));
session->attach(name, false);
+ session->commandPoint(0,0);
if (args.i_src_is_local) {
//TODO: handle 'push' here... simplest way is to create frames and pass them to Connection::received()
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index 0a45c46d30..4f7686aac4 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -53,6 +53,9 @@
#if HAVE_SASL
#include <sasl/sasl.h>
+static const bool AUTH_DEFAULT=true;
+#else
+static const bool AUTH_DEFAULT=false;
#endif
using qpid::sys::ProtocolFactory;
@@ -81,41 +84,25 @@ Broker::Options::Options(const std::string& name) :
stagingThreshold(5000000),
enableMgmt(1),
mgmtPubInterval(10),
-#if HAVE_SASL
- auth(true),
-#else
- auth(false),
-#endif
- realm("QPID"),
- ack(0)
+ auth(AUTH_DEFAULT),
+ replayFlushLimit(64),
+ replayHardLimit(0)
{
int c = sys::SystemInfo::concurrency();
workerThreads=c+1;
addOptions()
- ("data-dir", optValue(dataDir,"DIR"),
- "Directory to contain persistent data generated by the broker")
- ("no-data-dir", optValue(noDataDir),
- "Don't use a data directory. No persistent configuration will be loaded or stored")
- ("port,p", optValue(port,"PORT"),
- "Tells the broker to listen on PORT")
- ("worker-threads", optValue(workerThreads, "N"),
- "Sets the broker thread pool size")
- ("max-connections", optValue(maxConnections, "N"),
- "Sets the maximum allowed connections")
- ("connection-backlog", optValue(connectionBacklog, "N"),
- "Sets the connection backlog limit for the server socket")
- ("staging-threshold", optValue(stagingThreshold, "N"),
- "Stages messages over N bytes to disk")
- ("mgmt-enable,m", optValue(enableMgmt,"yes|no"),
- "Enable Management")
- ("mgmt-pub-interval", optValue(mgmtPubInterval, "SECONDS"),
- "Management Publish Interval")
- ("auth", optValue(auth, "yes|no"),
- "Enable authentication, if disabled all incoming connections will be trusted")
- ("realm", optValue(realm, "REALM"),
- "Use the given realm when performing authentication")
- ("ack", optValue(ack, "N"),
- "Send session.ack/solicit-ack at least every N frames. 0 disables voluntary ack/solitict-ack");
+ ("data-dir", optValue(dataDir,"DIR"), "Directory to contain persistent data generated by the broker")
+ ("no-data-dir", optValue(noDataDir), "Don't use a data directory. No persistent configuration will be loaded or stored")
+ ("port,p", optValue(port,"PORT"), "Tells the broker to listen on PORT")
+ ("worker-threads", optValue(workerThreads, "N"), "Sets the broker thread pool size")
+ ("max-connections", optValue(maxConnections, "N"), "Sets the maximum allowed connections")
+ ("connection-backlog", optValue(connectionBacklog, "N"), "Sets the connection backlog limit for the server socket")
+ ("staging-threshold", optValue(stagingThreshold, "N"), "Stages messages over N bytes to disk")
+ ("mgmt-enable,m", optValue(enableMgmt,"yes|no"), "Enable Management")
+ ("mgmt-pub-interval", optValue(mgmtPubInterval, "SECONDS"), "Management Publish Interval")
+ ("auth", optValue(auth, "yes|no"), "Enable authentication, if disabled all incoming connections will be trusted")
+ ("replay-flush-limit", optValue(replayFlushLimit, "KB"), "Send flush request when the replay buffer reaches this limit. 0 means no limit.")
+ ("replay-hard-limit", optValue(replayHardLimit, "KB"), "Kill a session if its replay buffer exceeds this limit. 0 means no limit.");
}
const std::string empty;
@@ -132,7 +119,11 @@ Broker::Broker(const Broker::Options& conf) :
dataDir(conf.noDataDir ? std::string () : conf.dataDir),
links(this),
factory(*this),
- sessionManager(conf.ack)
+ sessionManager(
+ qpid::SessionState::Configuration(
+ conf.replayFlushLimit*1024, // convert kb to bytes.
+ conf.replayHardLimit*1024),
+ *this)
{
if(conf.enableMgmt){
QPID_LOG(info, "Management enabled");
diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h
index a1eaf4f62f..7092a86181 100644
--- a/cpp/src/qpid/broker/Broker.h
+++ b/cpp/src/qpid/broker/Broker.h
@@ -83,7 +83,8 @@ class Broker : public sys::Runnable, public Plugin::Target,
uint16_t mgmtPubInterval;
bool auth;
std::string realm;
- uint32_t ack;
+ size_t replayFlushLimit;
+ size_t replayHardLimit;
};
virtual ~Broker();
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp
index d156b4a914..463193a346 100644
--- a/cpp/src/qpid/broker/Connection.cpp
+++ b/cpp/src/qpid/broker/Connection.cpp
@@ -124,8 +124,8 @@ void Connection::idleIn(){}
void Connection::closed(){ // Physically closed, suspend open sessions.
try {
- for (ChannelMap::iterator i = channels.begin(); i != channels.end(); ++i)
- ptr_map_ptr(i)->handleDetach();
+ while (!channels.empty())
+ ptr_map_ptr(channels.begin())->handleDetach();
while (!exclusiveQueues.empty()) {
Queue::shared_ptr q(exclusiveQueues.front());
q->releaseExclusiveOwnership();
diff --git a/cpp/src/qpid/broker/SaslAuthenticator.cpp b/cpp/src/qpid/broker/SaslAuthenticator.cpp
index d48b258ba2..a542211147 100644
--- a/cpp/src/qpid/broker/SaslAuthenticator.cpp
+++ b/cpp/src/qpid/broker/SaslAuthenticator.cpp
@@ -89,13 +89,20 @@ void NullAuthenticator::getMechanisms(Array& mechanisms)
mechanisms.add(boost::shared_ptr<FieldValue>(new Str16Value("ANONYMOUS")));
}
-void NullAuthenticator::start(const string& /*mechanism*/, const string& /*response*/)
+void NullAuthenticator::start(const string& mechanism, const string& response)
{
QPID_LOG(warning, "SASL: No Authentication Performed");
-
- // TODO: Figure out what should actually be set in this case
- connection.setUserId("anonymous");
-
+ if (mechanism == "PLAIN") { // Old behavior
+ if (response.size() > 0 && response[0] == (char) 0) {
+ string temp = response.substr(1);
+ string::size_type i = temp.find((char)0);
+ string uid = temp.substr(0, i);
+ string pwd = temp.substr(i + 1);
+ connection.setUserId(uid);
+ }
+ } else {
+ connection.setUserId("anonymous");
+ }
client.tune(framing::CHANNEL_MAX, connection.getFrameMax(), 0, 0);
}
diff --git a/cpp/src/qpid/broker/SessionAdapter.cpp b/cpp/src/qpid/broker/SessionAdapter.cpp
index e284451d14..2d0edde27b 100644
--- a/cpp/src/qpid/broker/SessionAdapter.cpp
+++ b/cpp/src/qpid/broker/SessionAdapter.cpp
@@ -410,7 +410,7 @@ void SessionAdapter::MessageHandlerImpl::accept(const framing::SequenceSet& comm
framing::MessageAcquireResult SessionAdapter::MessageHandlerImpl::acquire(const framing::SequenceSet& transfers)
{
- //TODO: change this when SequenceNumberSet is deleted along with preview code
+ // FIXME aconway 2008-05-12: create SequenceSet directly, no need for intermediate results vector.
SequenceNumberSet results;
RangedOperation f = boost::bind(&SemanticState::acquire, &state, _1, _2, boost::ref(results));
transfers.for_each(f);
diff --git a/cpp/src/qpid/broker/SessionHandler.cpp b/cpp/src/qpid/broker/SessionHandler.cpp
index 2ec0988fc0..2f09c6b5ac 100644
--- a/cpp/src/qpid/broker/SessionHandler.cpp
+++ b/cpp/src/qpid/broker/SessionHandler.cpp
@@ -21,12 +21,6 @@
#include "SessionHandler.h"
#include "SessionState.h"
#include "Connection.h"
-#include "ConnectionState.h"
-#include "qpid/framing/reply_exceptions.h"
-#include "qpid/framing/constants.h"
-#include "qpid/framing/ClientInvoker.h"
-#include "qpid/framing/ServerInvoker.h"
-#include "qpid/framing/all_method_bodies.h"
#include "qpid/log/Statement.h"
#include <boost/bind.hpp>
@@ -38,11 +32,9 @@ using namespace std;
using namespace qpid::sys;
SessionHandler::SessionHandler(Connection& c, ChannelId ch)
- : InOutHandler(0, &out),
- connection(c), channel(ch, &c.getOutput()),
- proxy(out), // Via my own handleOut() for L2 data.
- peerSession(channel), // Direct to channel for L2 commands.
- ignoring(false)
+ : amqp_0_10::SessionHandler(&c.getOutput(), ch),
+ connection(c),
+ proxy(out)
{}
SessionHandler::~SessionHandler() {}
@@ -52,191 +44,52 @@ ClassId classId(AMQMethodBody* m) { return m ? m->amqpMethodId() : 0; }
MethodId methodId(AMQMethodBody* m) { return m ? m->amqpClassId() : 0; }
} // namespace
-void SessionHandler::handleIn(AMQFrame& f) {
- // Note on channel states: a channel is attached if session != 0
- AMQMethodBody* m = f.getBody()->getMethod();
- try {
- if (ignoring && !(m && m->isA<SessionDetachedBody>())) {
- return;
- }
- if (m && isValid(m) && invoke(static_cast<AMQP_ServerOperations::SessionHandler&>(*this), *m)) {
- //frame was a valid session control and has been handled
- return;
- } else if (session.get()) {
- //we are attached and frame was not a session control so it is for upper layers
- session->handle(f);
- } else if (m && m->isA<SessionDetachedBody>()) {
- handleDetach();
- connection.closeChannel(channel.get());
- } else {
- throw NotAttachedException(QPID_MSG("Channel " << channel.get() << " is not attached"));
- }
- }catch(const ChannelException& e){
- QPID_LOG(error, "Session detached due to: " << e.what());
- peerSession.detached(name, e.code);
+void SessionHandler::channelException(uint16_t, const std::string&) {
handleDetach();
- connection.closeChannel(channel.get());
- }catch(const ConnectionException& e){
- connection.close(e.code, e.what(), classId(m), methodId(m));
- }catch(const std::exception& e){
- connection.close(501, e.what(), classId(m), methodId(m));
- }
}
-bool SessionHandler::isValid(AMQMethodBody* m) {
- return session.get() || m->isA<SessionAttachBody>() || m->isA<SessionAttachedBody>();
-}
-
-void SessionHandler::handleOut(AMQFrame& f) {
- channel.handle(f); // Send it.
- if (session->sent(f))
- peerSession.flush(false, false, true);
-}
-
-void SessionHandler::assertAttached(const char* method) const {
- if (!session.get()) {
- std::cout << "SessionHandler::assertAttached() failed for " << method << std::endl;
- throw NotAttachedException(
- QPID_MSG(method << " failed: No session for channel "
- << getChannel()));
- }
-}
-
-void SessionHandler::assertClosed(const char* method) const {
- if (session.get())
- throw SessionBusyException(
- QPID_MSG(method << " failed: channel " << channel.get()
- << " is already open."));
+void SessionHandler::connectionException(uint16_t code, const std::string& msg) {
+ connection.close(code, msg, 0, 0);
}
ConnectionState& SessionHandler::getConnection() { return connection; }
-const ConnectionState& SessionHandler::getConnection() const { return connection; }
-
-//new methods:
-void SessionHandler::attach(const std::string& _name, bool /*force*/)
-{
- name = _name;//TODO: this should be used in conjunction with
- //userid for connection as sessions identity
- //TODO: need to revise session manager to support resume as well
- assertClosed("attach");
- session.reset(new SessionState(0, this, 0, 0, name));
- peerSession.attached(name);
- peerSession.commandPoint(session->nextOut, 0);
-}
-
-void SessionHandler::attached(const std::string& _name)
-{
- name = _name;//TODO: this should be used in conjunction with
- //userid for connection as sessions identity
- session.reset(new SessionState(0, this, 0, 0, name));
- peerSession.commandPoint(session->nextOut, 0);
-}
+const ConnectionState& SessionHandler::getConnection() const { return connection; }
-void SessionHandler::detach(const std::string& name)
-{
- assertAttached("detach");
- peerSession.detached(name, session::NORMAL);
- handleDetach();
+void SessionHandler::handleDetach() {
+ amqp_0_10::SessionHandler::handleDetach();
assert(&connection.getChannel(channel.get()) == this);
+ if (session.get())
+ connection.getBroker().getSessionManager().detach(session);
+ assert(!session.get());
connection.closeChannel(channel.get());
}
-void SessionHandler::detached(const std::string& name, uint8_t code)
-{
- ignoring = false;
- handleDetach();
- if (code) {
- //no error
- } else {
- //error occured
- QPID_LOG(warning, "Received session.closed: "<< name << " " << code);
- }
- connection.closeChannel(channel.get());
-}
-
-void SessionHandler::handleDetach()
-{
- if (session.get()) {
- session->detach();
- session.reset();
- }
-}
-
-void SessionHandler::requestDetach()
-{
- //TODO: request timeout when python can handle it
- //peerSession.requestTimeout(0);
- ignoring = true;
- peerSession.detach(name);
-}
-
-void SessionHandler::requestTimeout(uint32_t t)
-{
- session->setTimeout(t);
- peerSession.timeout(t);
-}
-
-void SessionHandler::timeout(uint32_t t)
-{
- session->setTimeout(t);
-}
-
-void SessionHandler::commandPoint(const framing::SequenceNumber& id, uint64_t offset)
-{
- if (offset) throw NotImplementedException("Non-zero byte offset not yet supported for command-point");
-
- session->nextIn = id;
-}
-
-void SessionHandler::expected(const framing::SequenceSet& commands, const framing::Array& fragments)
-{
- if (!commands.empty() || fragments.size()) {
- throw NotImplementedException("Session resumption not yet supported");
- }
+void SessionHandler::setState(const std::string& name, bool force) {
+ assert(!session.get());
+ SessionId id(connection.getUserId(), name);
+ session = connection.broker.getSessionManager().attach(*this, id, force);
}
-void SessionHandler::confirmed(const framing::SequenceSet& /*commands*/, const framing::Array& /*fragments*/)
-{
- //don't really care too much about this yet
-}
+FrameHandler* SessionHandler::getInHandler() { return session.get(); }
+qpid::SessionState* SessionHandler::getState() { return session.get(); }
-void SessionHandler::completed(const framing::SequenceSet& commands, bool timelyReply)
-{
- session->complete(commands);
- if (timelyReply) {
- peerSession.knownCompleted(session->knownCompleted);
- session->knownCompleted.clear();
- }
+void SessionHandler::readyToSend() {
+ if (session.get()) session->readyToSend();
}
-void SessionHandler::knownCompleted(const framing::SequenceSet& commands)
-{
- session->completed.remove(commands);
-}
-
-void SessionHandler::flush(bool expected, bool confirmed, bool completed)
-{
- if (expected) {
- peerSession.expected(SequenceSet(session->nextIn), Array());
- }
- if (confirmed) {
- peerSession.confirmed(session->completed, Array());
- }
- if (completed) {
- peerSession.completed(session->completed, true);
- }
-}
-
-
-void SessionHandler::sendCompletion()
-{
- peerSession.completed(session->completed, true);
+// TODO aconway 2008-05-12: hacky - handle attached for bridge clients.
+// We need to integrate the client code so we can run a real client
+// in the bridge.
+//
+void SessionHandler::attached(const std::string& name) {
+ if (session.get())
+ checkName(name);
+ else {
+ SessionId id(connection.getUserId(), name);
+ SessionState::Configuration config = connection.broker.getSessionManager().getSessionConfig();
+ session.reset(new SessionState(connection.getBroker(), *this, id, config));
}
-
-void SessionHandler::gap(const framing::SequenceSet& /*commands*/)
-{
- throw NotImplementedException("gap not yet supported");
}
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/SessionHandler.h b/cpp/src/qpid/broker/SessionHandler.h
index 47c534441a..1aa3137fdf 100644
--- a/cpp/src/qpid/broker/SessionHandler.h
+++ b/cpp/src/qpid/broker/SessionHandler.h
@@ -22,19 +22,12 @@
*
*/
-#include "qpid/framing/FrameHandler.h"
-#include "qpid/framing/AMQP_ClientOperations.h"
-#include "qpid/framing/AMQP_ServerOperations.h"
+#include "qpid/amqp_0_10/SessionHandler.h"
#include "qpid/framing/AMQP_ClientProxy.h"
-#include "qpid/framing/amqp_types.h"
-#include "qpid/framing/Array.h"
-#include "qpid/framing/ChannelHandler.h"
-#include "qpid/framing/SequenceNumber.h"
-
-
-#include <boost/noncopyable.hpp>
namespace qpid {
+class SessionState;
+
namespace broker {
class Connection;
@@ -46,65 +39,38 @@ class SessionState;
* receives incoming frames, handles session controls and manages the
* association between the channel and a session.
*/
-class SessionHandler : public framing::AMQP_ServerOperations::SessionHandler,
- public framing::FrameHandler::InOutHandler,
- private boost::noncopyable
-{
+class SessionHandler : public amqp_0_10::SessionHandler {
public:
SessionHandler(Connection&, framing::ChannelId);
~SessionHandler();
- /** Returns 0 if not attached to a session */
+ /** Get broker::SessionState */
SessionState* getSession() { return session.get(); }
const SessionState* getSession() const { return session.get(); }
- framing::ChannelId getChannel() const { return channel.get(); }
-
ConnectionState& getConnection();
const ConnectionState& getConnection() const;
framing::AMQP_ClientProxy& getProxy() { return proxy; }
const framing::AMQP_ClientProxy& getProxy() const { return proxy; }
- void requestDetach();
- void handleDetach();
- void sendCompletion();
-
- protected:
- void handleIn(framing::AMQFrame&);
- void handleOut(framing::AMQFrame&);
+ virtual void handleDetach();
- private:
- //new methods:
- void attach(const std::string& name, bool force);
+ // Overrides
void attached(const std::string& name);
- void detach(const std::string& name);
- void detached(const std::string& name, uint8_t code);
-
- void requestTimeout(uint32_t t);
- void timeout(uint32_t t);
-
- void commandPoint(const framing::SequenceNumber& id, uint64_t offset);
- void expected(const framing::SequenceSet& commands, const framing::Array& fragments);
- void confirmed(const framing::SequenceSet& commands,const framing::Array& fragments);
- void completed(const framing::SequenceSet& commands, bool timelyReply);
- void knownCompleted(const framing::SequenceSet& commands);
- void flush(bool expected, bool confirmed, bool completed);
- void gap(const framing::SequenceSet& commands);
- void assertAttached(const char* method) const;
- void assertActive(const char* method) const;
- void assertClosed(const char* method) const;
-
- bool isValid(framing::AMQMethodBody*);
+ protected:
+ virtual void setState(const std::string& sessionName, bool force);
+ virtual qpid::SessionState* getState();
+ virtual framing::FrameHandler* getInHandler();
+ virtual void channelException(uint16_t code, const std::string& msg);
+ virtual void connectionException(uint16_t code, const std::string& msg);
+ virtual void readyToSend();
+ private:
Connection& connection;
- framing::ChannelHandler channel;
framing::AMQP_ClientProxy proxy;
- framing::AMQP_ClientProxy::Session peerSession;
- bool ignoring;
std::auto_ptr<SessionState> session;
- std::string name;//TODO: this should be part of the session state and replace the id
};
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/SessionManager.cpp b/cpp/src/qpid/broker/SessionManager.cpp
index d7bae737fc..789e43b902 100644
--- a/cpp/src/qpid/broker/SessionManager.cpp
+++ b/cpp/src/qpid/broker/SessionManager.cpp
@@ -40,68 +40,58 @@ using boost::intrusive_ptr;
using namespace sys;
using namespace framing;
-SessionManager::SessionManager(uint32_t a) : ack(a) {}
+SessionManager::SessionManager(const SessionState::Configuration& c, Broker& b)
+ : config(c), broker(b) {}
-SessionManager::~SessionManager() {}
+SessionManager::~SessionManager() {
+ detached.clear(); // Must clear before destructor as session dtor will call forget()
+}
-// FIXME aconway 2008-02-01: pass handler*, allow open unattached.
-std::auto_ptr<SessionState> SessionManager::open(
- SessionHandler& h, uint32_t timeout_, std::string _name)
-{
+std::auto_ptr<SessionState> SessionManager::attach(SessionHandler& h, const SessionId& id, bool/*force*/) {
Mutex::ScopedLock l(lock);
- std::auto_ptr<SessionState> session(
- new SessionState(this, &h, timeout_, ack, _name));
- active.insert(session->getId());
+ eraseExpired(); // Clean up expired table
+ std::pair<Attached::iterator, bool> insert = attached.insert(id);
+ if (!insert.second)
+ throw SessionBusyException(QPID_MSG("Session already attached: " << id));
+ Detached::iterator i = std::find(detached.begin(), detached.end(), id);
+ std::auto_ptr<SessionState> state;
+ if (i == detached.end()) {
+ state.reset(new SessionState(broker, h, id, config));
for_each(observers.begin(), observers.end(),
- boost::bind(&Observer::opened, _1,boost::ref(*session)));
- return session;
+ boost::bind(&Observer::opened, _1,boost::ref(*state)));
+ }
+ else {
+ state.reset(detached.release(i).release());
+ state->attach(h);
+ }
+ return state;
+ // FIXME aconway 2008-04-29: implement force
}
-void SessionManager::suspend(std::auto_ptr<SessionState> session) {
+void SessionManager::detach(std::auto_ptr<SessionState> session) {
Mutex::ScopedLock l(lock);
- active.erase(session->getId());
- session->suspend();
+ attached.erase(session->getId());
+ session->detach();
+ if (session->getTimeout() > 0) {
session->expiry = AbsTime(now(),session->getTimeout()*TIME_SEC);
if (session->mgmtObject.get() != 0)
session->mgmtObject->set_expireTime ((uint64_t) Duration (session->expiry));
- suspended.push_back(session.release()); // In expiry order
+ detached.push_back(session.release()); // In expiry order
eraseExpired();
}
-
-std::auto_ptr<SessionState> SessionManager::resume(const Uuid& id)
-{
- Mutex::ScopedLock l(lock);
- eraseExpired();
- if (active.find(id) != active.end())
- throw SessionBusyException(
- QPID_MSG("Session already active: " << id));
- Suspended::iterator i = std::find_if(
- suspended.begin(), suspended.end(),
- boost::bind(std::equal_to<Uuid>(), id, boost::bind(&SessionState::getId, _1))
- );
- if (i == suspended.end())
- throw InvalidArgumentException(
- QPID_MSG("No suspended session with id=" << id));
- active.insert(id);
- std::auto_ptr<SessionState> state(suspended.release(i).release());
- return state;
}
-void SessionManager::erase(const framing::Uuid& id)
-{
- Mutex::ScopedLock l(lock);
- active.erase(id);
-}
+void SessionManager::forget(const SessionId& id) { attached.erase(id); }
void SessionManager::eraseExpired() {
// Called with lock held.
- if (!suspended.empty()) {
- Suspended::iterator keep = std::lower_bound(
- suspended.begin(), suspended.end(), now(),
+ if (!detached.empty()) {
+ Detached::iterator keep = std::lower_bound(
+ detached.begin(), detached.end(), now(),
boost::bind(std::less<AbsTime>(), boost::bind(&SessionState::expiry, _1), _2));
- if (suspended.begin() != keep) {
- QPID_LOG(debug, "Expiring sessions: " << log::formatList(suspended.begin(), keep));
- suspended.erase(suspended.begin(), keep);
+ if (detached.begin() != keep) {
+ QPID_LOG(debug, "Expiring sessions: " << log::formatList(detached.begin(), keep));
+ detached.erase(detached.begin(), keep);
}
}
}
diff --git a/cpp/src/qpid/broker/SessionManager.h b/cpp/src/qpid/broker/SessionManager.h
index ad064c69bb..9a4142f613 100644
--- a/cpp/src/qpid/broker/SessionManager.h
+++ b/cpp/src/qpid/broker/SessionManager.h
@@ -22,7 +22,7 @@
*
*/
-#include <qpid/framing/Uuid.h>
+#include <qpid/SessionState.h>
#include <qpid/sys/Time.h>
#include <qpid/sys/Mutex.h>
#include <qpid/RefCounted.h>
@@ -37,7 +37,7 @@
namespace qpid {
namespace broker {
-
+class Broker;
class SessionState;
class SessionHandler;
@@ -50,44 +50,43 @@ class SessionManager : private boost::noncopyable {
* Observer notified of SessionManager events.
*/
struct Observer : public RefCounted {
+ /** Called when a stateless session is attached. */
virtual void opened(SessionState&) {}
};
- SessionManager(uint32_t ack);
+ SessionManager(const qpid::SessionState::Configuration&, Broker&);
~SessionManager();
/** Open a new active session, caller takes ownership */
- std::auto_ptr<SessionState> open(SessionHandler& c, uint32_t timeout_, std::string name);
+ std::auto_ptr<SessionState> attach(SessionHandler& h, const SessionId& id, bool/*force*/);
- /** Suspend a session, start it's timeout counter.
- * The factory takes ownership.
- */
- void suspend(std::auto_ptr<SessionState> session);
+ /** Return a detached session to the manager, start the timeout counter. */
+ void detach(std::auto_ptr<SessionState>);
- /** Resume a suspended session.
- *@throw Exception if timed out or non-existant.
- */
- std::auto_ptr<SessionState> resume(const framing::Uuid&);
+ /** Forget about an attached session. Called by SessionState destructor. */
+ void forget(const SessionId&);
/** Add an Observer. */
void add(const boost::intrusive_ptr<Observer>&);
+ Broker& getBroker() const { return broker; }
+
+ const qpid::SessionState::Configuration& getSessionConfig() const { return config; }
+
private:
- typedef boost::ptr_vector<SessionState> Suspended;
- typedef std::set<framing::Uuid> Active;
+ typedef boost::ptr_vector<SessionState> Detached; // Sorted in expiry order.
+ typedef std::set<SessionId> Attached;
typedef std::vector<boost::intrusive_ptr<Observer> > Observers;
- void erase(const framing::Uuid&);
void eraseExpired();
sys::Mutex lock;
- Suspended suspended;
- Active active;
- uint32_t ack;
+ Detached detached;
+ Attached attached;
+ qpid::SessionState::Configuration config;
Observers observers;
-
- friend class SessionState; // removes deleted sessions from active set.
+ Broker& broker;
};
diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp
index 2ef1ed2de4..c851162046 100644
--- a/cpp/src/qpid/broker/SessionState.cpp
+++ b/cpp/src/qpid/broker/SessionState.cpp
@@ -32,6 +32,7 @@
#include "qpid/log/Statement.h"
#include <boost/bind.hpp>
+#include <boost/lexical_cast.hpp>
namespace qpid {
namespace broker {
@@ -45,59 +46,46 @@ using qpid::management::Manageable;
using qpid::management::Args;
SessionState::SessionState(
- SessionManager* f, SessionHandler* h, uint32_t timeout_, uint32_t ack, string& _name)
- : framing::SessionState(ack, timeout_ > 0), nextOut(0),
- factory(f), handler(h), id(true), timeout(timeout_),
- broker(h->getConnection().broker),
- version(h->getConnection().getVersion()),
- ignoring(false), name(_name),
+ Broker& b, SessionHandler& h, const SessionId& id, const SessionState::Configuration& config)
+ : qpid::SessionState(id, config),
+ broker(b), handler(&h),
+ ignoring(false),
semanticState(*this, *this),
adapter(semanticState),
msgBuilder(&broker.getStore(), broker.getStagingThreshold()),
- ackOp(boost::bind(&SemanticState::completed, &semanticState, _1, _2)),
enqueuedOp(boost::bind(&SessionState::enqueued, this, _1))
{
- getConnection().outputTasks.addOutputTask(&semanticState);
-
Manageable* parent = broker.GetVhostObject ();
-
- if (parent != 0)
- {
+ if (parent != 0) {
ManagementAgent::shared_ptr agent = ManagementAgent::getAgent ();
-
- if (agent.get () != 0)
- {
+ if (agent.get () != 0) {
mgmtObject = management::Session::shared_ptr
- (new management::Session (this, parent, name));
- mgmtObject->set_attached (1);
- mgmtObject->set_clientRef (h->getConnection().GetManagementObject()->getObjectId());
- mgmtObject->set_channelId (h->getChannel());
- mgmtObject->set_detachedLifespan (getTimeout());
+ (new management::Session (this, parent, getId().getName()));
+ mgmtObject->set_attached (0);
agent->addObject (mgmtObject);
}
}
+ attach(h);
}
SessionState::~SessionState() {
// Remove ID from active session list.
- if (factory)
- factory->erase(getId());
+ // FIXME aconway 2008-05-12: Need to distinguish outgoing sessions established by bridge,
+ // they don't belong in the manager. For now rely on uniqueness of UUIDs.
+ //
+ broker.getSessionManager().forget(getId());
if (mgmtObject.get () != 0)
mgmtObject->resourceDestroy ();
}
-SessionHandler* SessionState::getHandler() {
- return handler;
-}
-
AMQP_ClientProxy& SessionState::getProxy() {
assert(isAttached());
- return getHandler()->getProxy();
+ return handler->getProxy();
}
ConnectionState& SessionState::getConnection() {
assert(isAttached());
- return getHandler()->getConnection();
+ return handler->getConnection();
}
bool SessionState::isLocal(const ConnectionToken* t) const
@@ -106,18 +94,19 @@ bool SessionState::isLocal(const ConnectionToken* t) const
}
void SessionState::detach() {
- getConnection().outputTasks.removeOutputTask(&semanticState);
+ // activateOutput can be called in a different thread, lock to protect attached status
Mutex::ScopedLock l(lock);
+ QPID_LOG(debug, getId() << ": detached on broker.");
+ getConnection().outputTasks.removeOutputTask(&semanticState);
handler = 0;
if (mgmtObject.get() != 0)
- {
mgmtObject->set_attached (0);
}
-}
void SessionState::attach(SessionHandler& h) {
- {
+ // activateOutput can be called in a different thread, lock to protect attached status
Mutex::ScopedLock l(lock);
+ QPID_LOG(debug, getId() << ": attached on broker.");
handler = &h;
if (mgmtObject.get() != 0)
{
@@ -126,16 +115,13 @@ void SessionState::attach(SessionHandler& h) {
mgmtObject->set_channelId (h.getChannel());
}
}
- h.getConnection().outputTasks.addOutputTask(&semanticState);
-}
-void SessionState::activateOutput()
-{
+void SessionState::activateOutput() {
+ // activateOutput can be called in a different thread, lock to protect attached status
Mutex::ScopedLock l(lock);
- if (isAttached()) {
+ if (isAttached())
getConnection().outputTasks.activateOutput();
}
-}
//This class could be used as the callback for queue notifications
//if not attached, it can simply ignore the callback, else pass it
//on to the connection
@@ -155,7 +141,7 @@ Manageable::status_t SessionState::ManagementMethod (uint32_t methodId,
case management::Session::METHOD_DETACH :
if (handler != 0)
{
- handler->requestDetach();
+ handler->sendDetach();
}
status = Manageable::STATUS_OK;
break;
@@ -179,35 +165,25 @@ Manageable::status_t SessionState::ManagementMethod (uint32_t methodId,
return status;
}
-void SessionState::handleCommand(framing::AMQMethodBody* method, SequenceNumber& id)
-{
- id = nextIn++;
+void SessionState::handleCommand(framing::AMQMethodBody* method, const SequenceNumber& id) {
Invoker::Result invocation = invoke(adapter, *method);
- completed.add(id);
-
+ receiverCompleted(id);
if (!invocation.wasHandled()) {
throw NotImplementedException(QPID_MSG("Not implemented: " << *method));
} else if (invocation.hasResult()) {
- nextOut++;//execution result is now a command, so the counter must be incremented
getProxy().getExecution().result(id, invocation.getResult());
}
if (method->isSync()) {
incomplete.process(enqueuedOp, true);
sendCompletion();
}
- //TODO: if window gets too large send unsolicited completion
}
-void SessionState::handleContent(AMQFrame& frame, SequenceNumber& id)
+void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id)
{
- intrusive_ptr<Message> msg(msgBuilder.getMessage());
- if (frame.getBof() && frame.getBos()) {//start of frameset
- id = nextIn++;
+ if (frame.getBof() && frame.getBos()) //start of frameset
msgBuilder.start(id);
- msg = msgBuilder.getMessage();
- } else {
- id = msg->getCommandId();
- }
+ intrusive_ptr<Message> msg(msgBuilder.getMessage());
msgBuilder.handle(frame);
if (frame.getEof() && frame.getEos()) {//end of frameset
if (frame.getBof()) {
@@ -240,19 +216,14 @@ void SessionState::handleContent(AMQFrame& frame, SequenceNumber& id)
void SessionState::enqueued(boost::intrusive_ptr<Message> msg)
{
- completed.add(msg->getCommandId());
- if (msg->requiresAccept()) {
- nextOut++;//accept is a command, so the counter must be incremented
+ receiverCompleted(msg->getCommandId());
+ if (msg->requiresAccept())
getProxy().getMessage().accept(SequenceSet(msg->getCommandId()));
}
-}
void SessionState::handle(AMQFrame& frame)
{
- if (ignoring) return;
- received(frame);
-
- SequenceNumber commandId;
+ SequenceNumber commandId = receiverGetCurrent();
try {
//TODO: make command handling more uniform, regardless of whether
//commands carry content.
@@ -277,29 +248,36 @@ void SessionState::handle(AMQFrame& frame)
} else {
getProxy().getExecution().exception(e.code, commandId, 0, 0, 0, e.what(), FieldTable());
}
- timeout = 0;
ignoring = true;
- handler->requestDetach();
+ handler->sendDetach();
}
}
DeliveryId SessionState::deliver(QueuedMessage& msg, DeliveryToken::shared_ptr token)
{
uint32_t maxFrameSize = getConnection().getFrameMax();
- MessageDelivery::deliver(msg, getProxy().getHandler(), nextOut, token, maxFrameSize);
- return nextOut++;
+ assert(senderGetCommandPoint().offset == 0);
+ SequenceNumber commandId = senderGetCommandPoint().command;
+ MessageDelivery::deliver(msg, getProxy().getHandler(), commandId, token, maxFrameSize);
+ assert(senderGetCommandPoint() == SessionPoint(commandId+1, 0)); // Delivery has moved sendPoint.
+ return commandId;
}
-void SessionState::sendCompletion()
-{
- handler->sendCompletion();
+void SessionState::sendCompletion() { handler->sendCompletion(); }
+
+void SessionState::senderCompleted(const SequenceSet& commands) {
+ qpid::SessionState::senderCompleted(commands);
+ commands.for_each(boost::bind(&SemanticState::completed, &semanticState, _1, _2));
}
-void SessionState::complete(const SequenceSet& commands)
-{
- knownCompleted.add(commands);
- commands.for_each(ackOp);
+void SessionState::readyToSend() {
+ QPID_LOG(debug, getId() << ": ready to send, activating output.");
+ assert(handler);
+ sys::AggregateOutput& tasks = handler->getConnection().outputTasks;
+ tasks.addOutputTask(&semanticState);
+ tasks.activateOutput();
}
+Broker& SessionState::getBroker() { return broker; }
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h
index ae860e84c9..7b70789161 100644
--- a/cpp/src/qpid/broker/SessionState.h
+++ b/cpp/src/qpid/broker/SessionState.h
@@ -22,11 +22,9 @@
*
*/
-#include "qpid/framing/Uuid.h"
+#include "qpid/SessionState.h"
#include "qpid/framing/FrameHandler.h"
-#include "qpid/framing/SessionState.h"
#include "qpid/framing/SequenceSet.h"
-#include "qpid/framing/ProtocolVersion.h"
#include "qpid/sys/Mutex.h"
#include "qpid/sys/Time.h"
#include "qpid/management/Manageable.h"
@@ -63,21 +61,20 @@ class SessionManager;
* Broker-side session state includes sessions handler chains, which may
* themselves have state.
*/
-class SessionState : public framing::SessionState,
+class SessionState : public qpid::SessionState,
public SessionContext,
public DeliveryAdapter,
- public management::Manageable
+ public management::Manageable,
+ public framing::FrameHandler
{
public:
+ SessionState(Broker&, SessionHandler&, const SessionId&, const SessionState::Configuration&);
~SessionState();
bool isAttached() const { return handler; }
void detach();
void attach(SessionHandler& handler);
-
- SessionHandler* getHandler();
-
/** @pre isAttached() */
framing::AMQP_ClientProxy& getProxy();
@@ -85,18 +82,15 @@ class SessionState : public framing::SessionState,
ConnectionState& getConnection();
bool isLocal(const ConnectionToken* t) const;
- uint32_t getTimeout() const { return timeout; }
- void setTimeout(uint32_t t) { timeout = t; }
-
- Broker& getBroker() { return broker; }
- framing::ProtocolVersion getVersion() const { return version; }
+ Broker& getBroker();
/** OutputControl **/
void activateOutput();
void handle(framing::AMQFrame& frame);
- void complete(const framing::SequenceSet& ranges);
+ void senderCompleted(const framing::SequenceSet& ranges);
+
void sendCompletion();
//delivery adapter methods:
@@ -107,29 +101,13 @@ class SessionState : public framing::SessionState,
management::Manageable::status_t
ManagementMethod (uint32_t methodId, management::Args& args);
- // Normally SessionManager creates sessions.
- SessionState(SessionManager*,
- SessionHandler* out,
- uint32_t timeout,
- uint32_t ackInterval,
- std::string& name);
-
-
- framing::SequenceSet completed;
- framing::SequenceSet knownCompleted;
- framing::SequenceNumber nextIn;
- framing::SequenceNumber nextOut;
+ void readyToSend();
private:
- typedef boost::function<void(DeliveryId, DeliveryId)> RangedOperation;
- SessionManager* factory;
+ Broker& broker;
SessionHandler* handler;
- framing::Uuid id;
- uint32_t timeout;
sys::AbsTime expiry; // Used by SessionManager.
- Broker& broker;
- framing::ProtocolVersion version;
sys::Mutex lock;
bool ignoring;
std::string name;
@@ -139,12 +117,11 @@ class SessionState : public framing::SessionState,
MessageBuilder msgBuilder;
IncompleteMessageList incomplete;
- RangedOperation ackOp;
IncompleteMessageList::CompletionListener enqueuedOp;
management::Session::shared_ptr mgmtObject;
- void handleCommand(framing::AMQMethodBody* method, framing::SequenceNumber& id);
- void handleContent(framing::AMQFrame& frame, framing::SequenceNumber& id);
+ void handleCommand(framing::AMQMethodBody* method, const framing::SequenceNumber& id);
+ void handleContent(framing::AMQFrame& frame, const framing::SequenceNumber& id);
void enqueued(boost::intrusive_ptr<Message> msg);
friend class SessionManager;