diff options
author | Alan Conway <aconway@apache.org> | 2008-05-09 18:46:17 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-05-09 18:46:17 +0000 |
commit | b0f3edfa2a68ccf34aa7a555d0d37c2e17b8421b (patch) | |
tree | a94c5a579769809f4d398f4c65ad579a432b93fd /cpp/src/qpid/SessionState.cpp | |
parent | 2ebccc4f3ab6e7813ac2179c8318163ffdd22cff (diff) | |
download | qpid-python-b0f3edfa2a68ccf34aa7a555d0d37c2e17b8421b.tar.gz |
Support for 0-10 sessions, not yet integrated. Misc minor fixes.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@654913 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/SessionState.cpp')
-rw-r--r-- | cpp/src/qpid/SessionState.cpp | 131 |
1 files changed, 77 insertions, 54 deletions
diff --git a/cpp/src/qpid/SessionState.cpp b/cpp/src/qpid/SessionState.cpp index 64fdd17b8f..8905fb5f9d 100644 --- a/cpp/src/qpid/SessionState.cpp +++ b/cpp/src/qpid/SessionState.cpp @@ -19,21 +19,24 @@ * */ -// FIXME aconway 2008-04-24: Reminders for handler implementation. -// -// - execution.sync results must be communicated to SessionState::peerConfirmed. -// -// - #include "SessionState.h" #include "qpid/amqp_0_10/exceptions.h" #include "qpid/framing/AMQMethodBody.h" +#include "qpid/log/Statement.h" #include <boost/bind.hpp> #include <numeric> namespace qpid { using framing::AMQFrame; using amqp_0_10::NotImplementedException; +using amqp_0_10::InvalidArgumentException; +using amqp_0_10::IllegalStateException; + +namespace { +bool isControl(const AMQFrame& f) { + return f.getMethod() && f.getMethod()->type() == 0; +} +} // namespace /** A point in the session - command id + offset */ void SessionPoint::advance(const AMQFrame& f) { @@ -60,103 +63,123 @@ bool SessionPoint::operator==(const SessionPoint& x) const { return command == x.command && offset == x.offset; } -SendState::SendState(size_t syncSize, size_t killSize) - : replaySyncSize(syncSize), replayKillSize(killSize), unflushedSize() {} +SessionState::SendState::SendState(SessionState& s) : session(&s), unflushedSize(0) {} + +const SessionPoint& SessionState::SendState::getCommandPoint() { + return sendPoint; +} + +bool SessionState::SendState::expected(const SessionPoint& point) { + if (point < replayPoint || sendPoint < point) + throw InvalidArgumentException(QPID_MSG(session->getId() << ": expected command-point out of range.")); + // FIXME aconway 2008-05-06: this is not strictly correct, we should keep + // an intermediate replay pointer into the replay list. + confirmed(point); // Drop commands prior to expected from replay. + return (!replayList.empty()); +} -void SendState::send(const AMQFrame& f) { - if (f.getMethod() && f.getMethod()->type() == 0) - return; // Don't replay control frames. +void SessionState::SendState::record(const AMQFrame& f) { + if (isControl(f)) return; // Ignore control frames. + session->stateful = true; replayList.push_back(f); unflushedSize += f.size(); + incomplete += sendPoint.command; sendPoint.advance(f); } -bool SendState::needFlush() const { return unflushedSize >= replaySyncSize; } +bool SessionState::SendState::needFlush() const { return unflushedSize >= session->config.replaySyncSize; } -void SendState::sendFlush() { +void SessionState::SendState::recordFlush() { assert(flushPoint <= sendPoint); flushPoint = sendPoint; unflushedSize = 0; } -void SendState::peerConfirmed(const SessionPoint& confirmed) { +void SessionState::SendState::confirmed(const SessionPoint& confirmed) { + if (confirmed > sendPoint) + throw InvalidArgumentException(QPID_MSG(session->getId() << "Confirmed commands not yet sent.")); ReplayList::iterator i = replayList.begin(); - // Ignore peerConfirmed.offset, we don't support partial replay. while (i != replayList.end() && replayPoint.command < confirmed.command) { - assert(replayPoint <= flushPoint); replayPoint.advance(*i); assert(replayPoint <= sendPoint); - if (replayPoint > flushPoint) { - flushPoint.advance(*i); - assert(replayPoint <= flushPoint); + if (replayPoint > flushPoint) unflushedSize -= i->size(); - } ++i; } + if (replayPoint > flushPoint) flushPoint = replayPoint; replayList.erase(replayList.begin(), i); assert(replayPoint.offset == 0); } -void SendState::peerCompleted(const SequenceSet& commands) { +void SessionState::SendState::completed(const SequenceSet& commands) { if (commands.empty()) return; - sentCompleted += commands; + incomplete -= commands; // Completion implies confirmation but we don't handle out-of-order // confirmation, so confirm only the first contiguous range of commands. - peerConfirmed(SessionPoint(commands.rangesBegin()->end())); + confirmed(SessionPoint(commands.rangesBegin()->end())); } -bool ReceiveState::hasState() { return stateful; } +SessionState::ReceiveState::ReceiveState(SessionState& s) : session(&s) {} -void ReceiveState::setExpecting(const SessionPoint& point) { - if (!hasState()) // initializing a new session. - expecting = received = point; - else { // setting point in an existing session. - if (point > received) - throw NotImplementedException("command-point out of bounds."); - expecting = point; - } +void SessionState::ReceiveState::setCommandPoint(const SessionPoint& point) { + if (session->hasState() && point > received) + throw InvalidArgumentException(QPID_MSG(session->getId() << ": Command-point out of range.")); + expected = point; + if (expected > received) + received = expected; } -ReceiveState::ReceiveState() : stateful() {} - -bool ReceiveState::receive(const AMQFrame& f) { - stateful = true; - expecting.advance(f); - if (expecting > received) { - received = expecting; +bool SessionState::ReceiveState::record(const AMQFrame& f) { + if (isControl(f)) return true; // Ignore control frames. + session->stateful = true; + expected.advance(f); + if (expected > received) { + received = expected; return true; } + else { + QPID_LOG(debug, "Ignoring duplicate: " << f); return false; } - -void ReceiveState::localCompleted(SequenceNumber command) { - assert(command < received.command); // Can't complete what we haven't received. - receivedCompleted += command; } -void ReceiveState::peerKnownComplete(const SequenceSet& commands) { - receivedCompleted -= commands; +void SessionState::ReceiveState::completed(SequenceNumber command, bool cumulative) { + assert(command <= received.command); // Internal error to complete an unreceived command. + assert(firstIncomplete <= command); + if (cumulative) + unknownCompleted.add(firstIncomplete, command); + else + unknownCompleted += command; + firstIncomplete = unknownCompleted.rangeContaining(firstIncomplete).end(); } -SessionId::SessionId(const std::string& u, const std::string& n) : userId(u), name(n) {} - -bool SessionId::operator<(const SessionId& id) const { - return userId < id.userId || (userId == id.userId && name < id.name); +void SessionState::ReceiveState::knownCompleted(const SequenceSet& commands) { + if (!commands.empty() && commands.back() > received.command) + throw InvalidArgumentException(QPID_MSG(session->getId() << ": Known-completed has invalid commands.")); + unknownCompleted -= commands; } -bool SessionId::operator==(const SessionId& id) const { - return id.name == name && id.userId == userId; +SequenceNumber SessionState::ReceiveState::getCurrent() const { + SequenceNumber current = expected.command; // FIXME aconway 2008-05-08: SequenceNumber arithmetic. + return --current; } +// FIXME aconway 2008-05-02: implement sync & kill limits. SessionState::Configuration::Configuration() : replaySyncSize(std::numeric_limits<size_t>::max()), replayKillSize(std::numeric_limits<size_t>::max()) {} SessionState::SessionState(const SessionId& i, const Configuration& c) - : SendState(c.replaySyncSize, c.replayKillSize), - id(i), timeout(), config(c) {} + : sender(*this), receiver(*this), id(i), timeout(), config(c), stateful() +{ + QPID_LOG(debug, "SessionState::SessionState " << id << ": " << this); +} + +bool SessionState::hasState() const { + return stateful; +} -void SessionState::clear() { *this = SessionState(id, config); } +SessionState::~SessionState() {} std::ostream& operator<<(std::ostream& o, const SessionPoint& p) { return o << "(" << p.command.getValue() << "+" << p.offset << ")"; |