From 80406d0fb680239a0141b81fb0b9f20d20c9b1e1 Mon Sep 17 00:00:00 2001 From: Gordon Sim Date: Fri, 27 Jul 2007 15:44:52 +0000 Subject: Use execution layer to acknowledge messages. Turn off 0-9 framing of requests and responses. Some refactoring around message delivery. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@560285 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/broker/SemanticHandler.cpp | 52 +++++++++++++++++++++++++++++---- 1 file changed, 46 insertions(+), 6 deletions(-) (limited to 'cpp/src/qpid/broker/SemanticHandler.cpp') diff --git a/cpp/src/qpid/broker/SemanticHandler.cpp b/cpp/src/qpid/broker/SemanticHandler.cpp index 2b1de1bbc0..e9ec698400 100644 --- a/cpp/src/qpid/broker/SemanticHandler.cpp +++ b/cpp/src/qpid/broker/SemanticHandler.cpp @@ -25,10 +25,11 @@ using namespace qpid::broker; using namespace qpid::framing; +using namespace qpid::sys; SemanticHandler::SemanticHandler(ChannelId id, Connection& c) : connection(c), - channel(c, id, &c.broker.getStore()) + channel(c, *this, id, &c.broker.getStore()) { init(id, connection.getOutput(), connection.getVersion()); adapter = std::auto_ptr(new BrokerAdapter(channel, connection, connection.broker, *this)); @@ -75,10 +76,24 @@ void SemanticHandler::handleMethodInContext(boost::shared_ptrgetValue(), (uint64_t) (++i)->getValue()); + } + } } void SemanticHandler::flush() @@ -86,8 +101,8 @@ void SemanticHandler::flush() //flush doubles as a sync to begin with - send an execution.complete incoming.lwm = incoming.hwm; if (isOpen()) { - /*use dummy value for range which is not yet encoded correctly*/ - send(make_shared_ptr(new ExecutionCompleteBody(getVersion(), incoming.hwm.getValue(), 0))); + Mutex::ScopedLock l(outLock); + ChannelAdapter::send(make_shared_ptr(new ExecutionCompleteBody(getVersion(), incoming.hwm.getValue(), SequenceNumberSet()))); } } @@ -140,3 +155,28 @@ void SemanticHandler::handleHeartbeat(boost::shared_ptrdeliver(*this, copy.getValue(), token, connection.getFrameMax()); + //std::cout << "[" << this << "] delivered: " << outgoing.hwm.getValue() << std::endl; + return outgoing.hwm.getValue(); +} + +void SemanticHandler::redeliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token, DeliveryId tag) +{ + msg->deliver(*this, tag, token, connection.getFrameMax()); +} + +RequestId SemanticHandler::send(shared_ptr body, Correlator::Action action) +{ + Mutex::ScopedLock l(outLock); + uint8_t type(body->type()); + if (type == REQUEST_BODY || type == RESPONSE_BODY || type == METHOD_BODY) { + ++outgoing.hwm; + //std::cout << "[" << this << "] allocated: " << outgoing.hwm.getValue() << " to " << *body << std::endl; + } + return ChannelAdapter::send(body, action); +} -- cgit v1.2.1