summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/SessionState.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-05-09 18:46:17 +0000
committerAlan Conway <aconway@apache.org>2008-05-09 18:46:17 +0000
commitb0f3edfa2a68ccf34aa7a555d0d37c2e17b8421b (patch)
treea94c5a579769809f4d398f4c65ad579a432b93fd /cpp/src/qpid/SessionState.cpp
parent2ebccc4f3ab6e7813ac2179c8318163ffdd22cff (diff)
downloadqpid-python-b0f3edfa2a68ccf34aa7a555d0d37c2e17b8421b.tar.gz
Support for 0-10 sessions, not yet integrated. Misc minor fixes.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@654913 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/SessionState.cpp')
-rw-r--r--cpp/src/qpid/SessionState.cpp131
1 files changed, 77 insertions, 54 deletions
diff --git a/cpp/src/qpid/SessionState.cpp b/cpp/src/qpid/SessionState.cpp
index 64fdd17b8f..8905fb5f9d 100644
--- a/cpp/src/qpid/SessionState.cpp
+++ b/cpp/src/qpid/SessionState.cpp
@@ -19,21 +19,24 @@
*
*/
-// FIXME aconway 2008-04-24: Reminders for handler implementation.
-//
-// - execution.sync results must be communicated to SessionState::peerConfirmed.
-//
-//
-
#include "SessionState.h"
#include "qpid/amqp_0_10/exceptions.h"
#include "qpid/framing/AMQMethodBody.h"
+#include "qpid/log/Statement.h"
#include <boost/bind.hpp>
#include <numeric>
namespace qpid {
using framing::AMQFrame;
using amqp_0_10::NotImplementedException;
+using amqp_0_10::InvalidArgumentException;
+using amqp_0_10::IllegalStateException;
+
+namespace {
+bool isControl(const AMQFrame& f) {
+ return f.getMethod() && f.getMethod()->type() == 0;
+}
+} // namespace
/** A point in the session - command id + offset */
void SessionPoint::advance(const AMQFrame& f) {
@@ -60,103 +63,123 @@ bool SessionPoint::operator==(const SessionPoint& x) const {
return command == x.command && offset == x.offset;
}
-SendState::SendState(size_t syncSize, size_t killSize)
- : replaySyncSize(syncSize), replayKillSize(killSize), unflushedSize() {}
+SessionState::SendState::SendState(SessionState& s) : session(&s), unflushedSize(0) {}
+
+const SessionPoint& SessionState::SendState::getCommandPoint() {
+ return sendPoint;
+}
+
+bool SessionState::SendState::expected(const SessionPoint& point) {
+ if (point < replayPoint || sendPoint < point)
+ throw InvalidArgumentException(QPID_MSG(session->getId() << ": expected command-point out of range."));
+ // FIXME aconway 2008-05-06: this is not strictly correct, we should keep
+ // an intermediate replay pointer into the replay list.
+ confirmed(point); // Drop commands prior to expected from replay.
+ return (!replayList.empty());
+}
-void SendState::send(const AMQFrame& f) {
- if (f.getMethod() && f.getMethod()->type() == 0)
- return; // Don't replay control frames.
+void SessionState::SendState::record(const AMQFrame& f) {
+ if (isControl(f)) return; // Ignore control frames.
+ session->stateful = true;
replayList.push_back(f);
unflushedSize += f.size();
+ incomplete += sendPoint.command;
sendPoint.advance(f);
}
-bool SendState::needFlush() const { return unflushedSize >= replaySyncSize; }
+bool SessionState::SendState::needFlush() const { return unflushedSize >= session->config.replaySyncSize; }
-void SendState::sendFlush() {
+void SessionState::SendState::recordFlush() {
assert(flushPoint <= sendPoint);
flushPoint = sendPoint;
unflushedSize = 0;
}
-void SendState::peerConfirmed(const SessionPoint& confirmed) {
+void SessionState::SendState::confirmed(const SessionPoint& confirmed) {
+ if (confirmed > sendPoint)
+ throw InvalidArgumentException(QPID_MSG(session->getId() << "Confirmed commands not yet sent."));
ReplayList::iterator i = replayList.begin();
- // Ignore peerConfirmed.offset, we don't support partial replay.
while (i != replayList.end() && replayPoint.command < confirmed.command) {
- assert(replayPoint <= flushPoint);
replayPoint.advance(*i);
assert(replayPoint <= sendPoint);
- if (replayPoint > flushPoint) {
- flushPoint.advance(*i);
- assert(replayPoint <= flushPoint);
+ if (replayPoint > flushPoint)
unflushedSize -= i->size();
- }
++i;
}
+ if (replayPoint > flushPoint) flushPoint = replayPoint;
replayList.erase(replayList.begin(), i);
assert(replayPoint.offset == 0);
}
-void SendState::peerCompleted(const SequenceSet& commands) {
+void SessionState::SendState::completed(const SequenceSet& commands) {
if (commands.empty()) return;
- sentCompleted += commands;
+ incomplete -= commands;
// Completion implies confirmation but we don't handle out-of-order
// confirmation, so confirm only the first contiguous range of commands.
- peerConfirmed(SessionPoint(commands.rangesBegin()->end()));
+ confirmed(SessionPoint(commands.rangesBegin()->end()));
}
-bool ReceiveState::hasState() { return stateful; }
+SessionState::ReceiveState::ReceiveState(SessionState& s) : session(&s) {}
-void ReceiveState::setExpecting(const SessionPoint& point) {
- if (!hasState()) // initializing a new session.
- expecting = received = point;
- else { // setting point in an existing session.
- if (point > received)
- throw NotImplementedException("command-point out of bounds.");
- expecting = point;
- }
+void SessionState::ReceiveState::setCommandPoint(const SessionPoint& point) {
+ if (session->hasState() && point > received)
+ throw InvalidArgumentException(QPID_MSG(session->getId() << ": Command-point out of range."));
+ expected = point;
+ if (expected > received)
+ received = expected;
}
-ReceiveState::ReceiveState() : stateful() {}
-
-bool ReceiveState::receive(const AMQFrame& f) {
- stateful = true;
- expecting.advance(f);
- if (expecting > received) {
- received = expecting;
+bool SessionState::ReceiveState::record(const AMQFrame& f) {
+ if (isControl(f)) return true; // Ignore control frames.
+ session->stateful = true;
+ expected.advance(f);
+ if (expected > received) {
+ received = expected;
return true;
}
+ else {
+ QPID_LOG(debug, "Ignoring duplicate: " << f);
return false;
}
-
-void ReceiveState::localCompleted(SequenceNumber command) {
- assert(command < received.command); // Can't complete what we haven't received.
- receivedCompleted += command;
}
-void ReceiveState::peerKnownComplete(const SequenceSet& commands) {
- receivedCompleted -= commands;
+void SessionState::ReceiveState::completed(SequenceNumber command, bool cumulative) {
+ assert(command <= received.command); // Internal error to complete an unreceived command.
+ assert(firstIncomplete <= command);
+ if (cumulative)
+ unknownCompleted.add(firstIncomplete, command);
+ else
+ unknownCompleted += command;
+ firstIncomplete = unknownCompleted.rangeContaining(firstIncomplete).end();
}
-SessionId::SessionId(const std::string& u, const std::string& n) : userId(u), name(n) {}
-
-bool SessionId::operator<(const SessionId& id) const {
- return userId < id.userId || (userId == id.userId && name < id.name);
+void SessionState::ReceiveState::knownCompleted(const SequenceSet& commands) {
+ if (!commands.empty() && commands.back() > received.command)
+ throw InvalidArgumentException(QPID_MSG(session->getId() << ": Known-completed has invalid commands."));
+ unknownCompleted -= commands;
}
-bool SessionId::operator==(const SessionId& id) const {
- return id.name == name && id.userId == userId;
+SequenceNumber SessionState::ReceiveState::getCurrent() const {
+ SequenceNumber current = expected.command; // FIXME aconway 2008-05-08: SequenceNumber arithmetic.
+ return --current;
}
+// FIXME aconway 2008-05-02: implement sync & kill limits.
SessionState::Configuration::Configuration()
: replaySyncSize(std::numeric_limits<size_t>::max()),
replayKillSize(std::numeric_limits<size_t>::max()) {}
SessionState::SessionState(const SessionId& i, const Configuration& c)
- : SendState(c.replaySyncSize, c.replayKillSize),
- id(i), timeout(), config(c) {}
+ : sender(*this), receiver(*this), id(i), timeout(), config(c), stateful()
+{
+ QPID_LOG(debug, "SessionState::SessionState " << id << ": " << this);
+}
+
+bool SessionState::hasState() const {
+ return stateful;
+}
-void SessionState::clear() { *this = SessionState(id, config); }
+SessionState::~SessionState() {}
std::ostream& operator<<(std::ostream& o, const SessionPoint& p) {
return o << "(" << p.command.getValue() << "+" << p.offset << ")";