summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorAndrew Stitcher <astitcher@apache.org>2007-02-13 21:52:30 +0000
committerAndrew Stitcher <astitcher@apache.org>2007-02-13 21:52:30 +0000
commit5d8e8d39e1e5e13d0753c53a8095f075895d01a1 (patch)
tree727d30e03ae1679827779560a32f11c12f32d4a5 /cpp
parent9517deedff9691dbe3429b0b917dfd4208b0b1b8 (diff)
downloadqpid-python-5d8e8d39e1e5e13d0753c53a8095f075895d01a1.tar.gz
r1111@fuschia: andrew | 2007-02-09 15:51:10 +0000
Removed currently unused request tracking logic r1125@fuschia: andrew | 2007-02-13 21:51:30 +0000 Implemented receiveing batched Message.ok in c++ broker Implemented batched response frames in python client code git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@507249 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r--cpp/lib/broker/AccumulatedAck.cpp12
-rw-r--r--cpp/lib/broker/AccumulatedAck.h2
-rw-r--r--cpp/lib/broker/BrokerAdapter.cpp6
-rw-r--r--cpp/lib/broker/BrokerChannel.cpp29
-rw-r--r--cpp/lib/broker/BrokerChannel.h3
-rw-r--r--cpp/lib/common/framing/ChannelAdapter.cpp3
-rw-r--r--cpp/lib/common/framing/ChannelAdapter.h4
-rw-r--r--cpp/lib/common/framing/Requester.cpp15
-rw-r--r--cpp/lib/common/framing/Requester.h9
9 files changed, 41 insertions, 42 deletions
diff --git a/cpp/lib/broker/AccumulatedAck.cpp b/cpp/lib/broker/AccumulatedAck.cpp
index a9826ba5ea..2f2a7464c1 100644
--- a/cpp/lib/broker/AccumulatedAck.cpp
+++ b/cpp/lib/broker/AccumulatedAck.cpp
@@ -24,12 +24,12 @@ using std::less_equal;
using std::bind2nd;
using namespace qpid::broker;
-void AccumulatedAck::update(u_int64_t tag, bool multiple){
- if(multiple){
- if(tag > range) range = tag;
- //else don't care, it is already counted
- }else if(tag > range){
- individual.push_back(tag);
+void AccumulatedAck::update(u_int64_t firstTag, u_int64_t lastTag){
+ if (firstTag-1 == range) {
+ range = lastTag;
+ } else {
+ for (u_int64_t tag = firstTag; tag<=lastTag; tag++)
+ individual.push_back(tag);
}
}
diff --git a/cpp/lib/broker/AccumulatedAck.h b/cpp/lib/broker/AccumulatedAck.h
index 055c8ea3e0..6ab0cfbe2e 100644
--- a/cpp/lib/broker/AccumulatedAck.h
+++ b/cpp/lib/broker/AccumulatedAck.h
@@ -43,7 +43,7 @@ namespace qpid {
*/
std::list<u_int64_t> individual;
- void update(u_int64_t tag, bool multiple);
+ void update(u_int64_t firstTag, u_int64_t lastTag);
void consolidate();
void clear();
bool covers(u_int64_t tag) const;
diff --git a/cpp/lib/broker/BrokerAdapter.cpp b/cpp/lib/broker/BrokerAdapter.cpp
index ec80241c66..49f0f24407 100644
--- a/cpp/lib/broker/BrokerAdapter.cpp
+++ b/cpp/lib/broker/BrokerAdapter.cpp
@@ -326,11 +326,7 @@ void BrokerAdapter::BasicHandlerImpl::get(const MethodContext& context, u_int16_
}
void BrokerAdapter::BasicHandlerImpl::ack(const MethodContext&, u_int64_t deliveryTag, bool multiple){
- try{
- channel.ack(deliveryTag, multiple);
- }catch(InvalidAckException& e){
- throw ConnectionException(530, "Received ack for unrecognised delivery tag");
- }
+ channel.ack(deliveryTag, multiple);
}
void BrokerAdapter::BasicHandlerImpl::reject(const MethodContext&, u_int64_t /*deliveryTag*/, bool /*requeue*/){}
diff --git a/cpp/lib/broker/BrokerChannel.cpp b/cpp/lib/broker/BrokerChannel.cpp
index 74e5504f17..674d0e9505 100644
--- a/cpp/lib/broker/BrokerChannel.cpp
+++ b/cpp/lib/broker/BrokerChannel.cpp
@@ -235,24 +235,37 @@ void Channel::complete(Message::shared_ptr msg) {
// TODO astitcher 2007-02-08 This only deals correctly with non batched responses
void Channel::ack(){
- ack(getRequestInProgress(), false);
+ ack(getFirstAckRequest(), getLastAckRequest());
}
-void Channel::ack(u_int64_t deliveryTag, bool multiple)
-{
+// Used by Basic
+void Channel::ack(u_int64_t deliveryTag, bool multiple){
+ if (multiple)
+ ack(0, deliveryTag);
+ else
+ ack(deliveryTag, deliveryTag);
+}
+
+void Channel::ack(u_int64_t firstTag, u_int64_t lastTag){
if(transactional){
- accumulatedAck.update(deliveryTag, multiple);
+ //FIXME astitcher This only works for Basic style acks
+ accumulatedAck.update(lastTag, lastTag);
+
//TODO: I think the outstanding prefetch size & count should be updated at this point...
//TODO: ...this may then necessitate dispatching to consumers
}else{
Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery
- ack_iterator i = find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matches), deliveryTag));
+ ack_iterator i = find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matches), lastTag));
+ ack_iterator j = (firstTag == 0) ?
+ unacked.begin() :
+ find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matches), firstTag));
+
if(i == unacked.end()){
- throw InvalidAckException();
- }else if(multiple){
+ throw ConnectionException(530, "Received ack for unrecognised delivery tag");
+ }else if(i!=j){
ack_iterator end = ++i;
- for_each(unacked.begin(), end, mem_fun_ref(&DeliveryRecord::discard));
+ for_each(j, end, mem_fun_ref(&DeliveryRecord::discard));
unacked.erase(unacked.begin(), end);
//recalculate the prefetch:
diff --git a/cpp/lib/broker/BrokerChannel.h b/cpp/lib/broker/BrokerChannel.h
index 538e86b0a8..1a1c4dabba 100644
--- a/cpp/lib/broker/BrokerChannel.h
+++ b/cpp/lib/broker/BrokerChannel.h
@@ -138,6 +138,7 @@ class Channel : public framing::ChannelAdapter,
void rollback();
void ack();
void ack(u_int64_t deliveryTag, bool multiple);
+ void ack(u_int64_t deliveryTag, u_int64_t endTag);
void recover(bool requeue);
void deliver(Message::shared_ptr& msg, const string& consumerTag, u_int64_t deliveryTag);
void handlePublish(Message* msg);
@@ -153,8 +154,6 @@ class Channel : public framing::ChannelAdapter,
const framing::MethodContext& context);
};
-struct InvalidAckException{};
-
}} // namespace broker
diff --git a/cpp/lib/common/framing/ChannelAdapter.cpp b/cpp/lib/common/framing/ChannelAdapter.cpp
index 40241660f2..8a1ff39ee5 100644
--- a/cpp/lib/common/framing/ChannelAdapter.cpp
+++ b/cpp/lib/common/framing/ChannelAdapter.cpp
@@ -61,7 +61,6 @@ void ChannelAdapter::handleRequest(AMQRequestBody::shared_ptr request) {
assertMethodOk(*request);
AMQRequestBody::Data& requestData = request->getData();
responder.received(requestData);
- requestInProgress = requestData.requestId;
handleMethodInContext(request, MethodContext(this, request));
}
@@ -71,8 +70,6 @@ void ChannelAdapter::handleResponse(AMQResponseBody::shared_ptr response) {
// Review - any cases where this is not the case?
AMQResponseBody::Data& responseData = response->getData();
requester.processed(responseData);
- // For a response this is taken to be the request being responded to (for convenience)
- requestInProgress = responseData.requestId;
handleMethod(response);
}
diff --git a/cpp/lib/common/framing/ChannelAdapter.h b/cpp/lib/common/framing/ChannelAdapter.h
index 9f654d9a5b..36362a417a 100644
--- a/cpp/lib/common/framing/ChannelAdapter.h
+++ b/cpp/lib/common/framing/ChannelAdapter.h
@@ -85,7 +85,8 @@ class ChannelAdapter : public BodyHandler {
boost::shared_ptr<qpid::framing::AMQMethodBody> method,
const MethodContext& context) = 0;
- RequestId getRequestInProgress() { return requestInProgress; }
+ RequestId getFirstAckRequest() { return requester.getFirstAckRequest(); }
+ RequestId getLastAckRequest() { return requester.getLastAckRequest(); }
RequestId getNextSendRequestId() { return requester.getNextId(); }
private:
@@ -94,7 +95,6 @@ class ChannelAdapter : public BodyHandler {
ProtocolVersion version;
Requester requester;
Responder responder;
- RequestId requestInProgress;
};
}}
diff --git a/cpp/lib/common/framing/Requester.cpp b/cpp/lib/common/framing/Requester.cpp
index 37b2d37c86..9ee809e2ee 100644
--- a/cpp/lib/common/framing/Requester.cpp
+++ b/cpp/lib/common/framing/Requester.cpp
@@ -29,23 +29,12 @@ Requester::Requester() : lastId(0), responseMark(0) {}
void Requester::sending(AMQRequestBody::Data& request) {
request.requestId = ++lastId;
request.responseMark = responseMark;
- requests.insert(request.requestId);
}
void Requester::processed(const AMQResponseBody::Data& response) {
responseMark = response.responseId;
- RequestId id = response.requestId;
- RequestId end = id + response.batchOffset + 1;
- for ( ; id < end; ++id) {
- std::set<RequestId>::iterator i = requests.find(id);
- if (i != requests.end())
- requests.erase(i);
- else {
- THROW_QPID_ERROR(
- PROTOCOL_ERROR,
- boost::format("Response with non-existent request id=%d")%id);
- }
- }
+ firstAckRequest = response.requestId;
+ lastAckRequest = firstAckRequest + response.batchOffset;
}
}} // namespace qpid::framing
diff --git a/cpp/lib/common/framing/Requester.h b/cpp/lib/common/framing/Requester.h
index dae5b1eaee..dcc4460041 100644
--- a/cpp/lib/common/framing/Requester.h
+++ b/cpp/lib/common/framing/Requester.h
@@ -46,13 +46,18 @@ class Requester
/** Called after processing a response. */
void processed(const AMQResponseBody::Data&);
- /** Get the next id to be used. */
+ /** Get the next request id to be used. */
RequestId getNextId() { return lastId + 1; }
+ /** Get the first request acked by this response */
+ RequestId getFirstAckRequest() { return firstAckRequest; }
+ /** Get the last request acked by this response */
+ RequestId getLastAckRequest() { return lastAckRequest; }
private:
- std::set<RequestId> requests; /** Sent but not responded to */
RequestId lastId;
ResponseId responseMark;
+ ResponseId firstAckRequest;
+ ResponseId lastAckRequest;
};
}} // namespace qpid::framing