diff options
| author | Alan Conway <aconway@apache.org> | 2007-02-22 23:23:52 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2007-02-22 23:23:52 +0000 |
| commit | 067f367d27bef7500410ea27c000d0ca275c748a (patch) | |
| tree | b20d2f526860870c22dbcffa3570ed347f8979ba /cpp/lib/client/ClientChannel.cpp | |
| parent | 20a442ea00c82b7fd9b6b7a560916f69f3155f56 (diff) | |
| download | qpid-python-067f367d27bef7500410ea27c000d0ca275c748a.tar.gz | |
* cpp/lib/client/Basic.*, ClientChannel.*: Extracted Basic functionality
from Channel into separate Basic class.
* cpp/lib/client/*, cpp/test/*: Adjusted for new Channel::getBasic() API.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@510705 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/lib/client/ClientChannel.cpp')
| -rw-r--r-- | cpp/lib/client/ClientChannel.cpp | 264 |
1 files changed, 43 insertions, 221 deletions
diff --git a/cpp/lib/client/ClientChannel.cpp b/cpp/lib/client/ClientChannel.cpp index a8fa219c16..ec9ce71dd7 100644 --- a/cpp/lib/client/ClientChannel.cpp +++ b/cpp/lib/client/ClientChannel.cpp @@ -37,6 +37,7 @@ using namespace qpid::framing; using namespace qpid::sys; Channel::Channel(bool _transactional, u_int16_t _prefetch) : + basic(*this), connection(0), prefetch(_prefetch), transactional(_transactional) @@ -113,19 +114,16 @@ void Channel::protocolInit( bool Channel::isOpen() const { return connection; } +void Channel::setQos() { + basic.setQos(); + // FIXME aconway 2007-02-22: message +} + void Channel::setPrefetch(u_int16_t _prefetch){ prefetch = _prefetch; setQos(); } -void Channel::setQos(){ - sendAndReceive<BasicQosOkBody>( - new BasicQosBody(version, 0, prefetch, false)); - if(transactional){ - sendAndReceive<TxSelectOkBody>(new TxSelectBody(version)); - } -} - void Channel::declareExchange(Exchange& exchange, bool synch){ string name = exchange.getName(); string type = exchange.getType(); @@ -177,114 +175,6 @@ void Channel::bind(const Exchange& exchange, const Queue& queue, const std::stri new QueueBindBody(version, 0, q, e, key,!synch, args)); } -void Channel::consume( - Queue& queue, std::string& tag, MessageListener* listener, - int ackMode, bool noLocal, bool synch, const FieldTable* fields) -{ - sendAndReceiveSync<BasicConsumeOkBody>( - synch, - new BasicConsumeBody( - version, 0, queue.getName(), tag, noLocal, - ackMode == NO_ACK, false, !synch, - fields ? *fields : FieldTable())); - if (synch) { - BasicConsumeOkBody::shared_ptr response = - shared_polymorphic_downcast<BasicConsumeOkBody>( - responses.getResponse()); - tag = response->getConsumerTag(); - } - // FIXME aconway 2007-02-20: Race condition! - // We could receive the first message for the consumer - // before we create the consumer below. - // Move consumer creation to handler for BasicConsumeOkBody - { - Mutex::ScopedLock l(lock); - ConsumerMap::iterator i = consumers.find(tag); - if (i != consumers.end()) - THROW_QPID_ERROR(CLIENT_ERROR, - "Consumer already exists with tag="+tag); - Consumer& c = consumers[tag]; - c.listener = listener; - c.ackMode = ackMode; - c.lastDeliveryTag = 0; - } -} - -void Channel::cancel(const std::string& tag, bool synch) { - Consumer c; - { - Mutex::ScopedLock l(lock); - ConsumerMap::iterator i = consumers.find(tag); - if (i == consumers.end()) - return; - c = i->second; - consumers.erase(i); - } - if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0) - send(new BasicAckBody(version, c.lastDeliveryTag, true)); - sendAndReceiveSync<BasicCancelOkBody>( - synch, new BasicCancelBody(version, tag, !synch)); -} - -void Channel::cancelAll(){ - ConsumerMap consumersCopy; - { - Mutex::ScopedLock l(lock); - consumersCopy = consumers; - consumers.clear(); - } - for (ConsumerMap::iterator i=consumersCopy.begin(); - i != consumersCopy.end(); ++i) - { - Consumer& c = i->second; - if ((c.ackMode == LAZY_ACK || c.ackMode == AUTO_ACK) - && c.lastDeliveryTag > 0) - { - send(new BasicAckBody(version, c.lastDeliveryTag, true)); - } - } -} - -bool Channel::get(Message& msg, const Queue& queue, int ackMode) { - // Expect a message starting with a BasicGetOk - incoming.startGet(); - send(new BasicGetBody(version, 0, queue.getName(), ackMode)); - return incoming.waitGet(msg); -} - - -void Channel::publish( - const Message& msg, const Exchange& exchange, - const std::string& routingKey, bool mandatory, bool immediate) -{ - // FIXME aconway 2007-01-30: Rework for 0-9 message class. - const string e = exchange.getName(); - string key = routingKey; - - send(new BasicPublishBody(version, 0, e, key, mandatory, immediate)); - //break msg up into header frame and content frame(s) and send these - send(msg.header); - string data = msg.getData(); - u_int64_t data_length = data.length(); - if(data_length > 0){ - u_int32_t frag_size = connection->getMaxFrameSize() - 8;//frame itself uses 8 bytes - if(data_length < frag_size){ - send(new AMQContentBody(data)); - }else{ - u_int32_t offset = 0; - u_int32_t remaining = data_length - offset; - while (remaining > 0) { - u_int32_t length = remaining > frag_size ? frag_size : remaining; - string frag(data.substr(offset, length)); - send(new AMQContentBody(frag)); - - offset += length; - remaining = data_length - offset; - } - } - } -} - void Channel::commit(){ sendAndReceive<TxCommitOkBody>(new TxCommitBody(version)); } @@ -294,46 +184,53 @@ void Channel::rollback(){ } void Channel::handleMethodInContext( - AMQMethodBody::shared_ptr body, const MethodContext&) + AMQMethodBody::shared_ptr method, const MethodContext&) { - //channel.flow, channel.close, basic.deliver, basic.return or a - //response to a synchronous request if(responses.isWaiting()) { - responses.signalResponse(body); + responses.signalResponse(method); return; } - - if(body->isA<BasicDeliverBody>() - || body->isA<BasicReturnBody>() - || body->isA<BasicGetOkBody>() - || body->isA<BasicGetEmptyBody>()) - - { - incoming.add(body); - return; + try { + switch (method->amqpClassId()) { + case BasicDeliverBody::CLASS_ID: basic.handle(method); break; + case ChannelCloseBody::CLASS_ID: handleChannel(method); break; + case ConnectionCloseBody::CLASS_ID: handleConnection(method); break; + default: throw UnknownMethod(); + } } - else if(body->isA<ChannelCloseBody>()) { - peerClose(shared_polymorphic_downcast<ChannelCloseBody>(body)); + catch (const UnknownMethod&) { + connection->close( + 504, "Unknown method", + method->amqpClassId(), method->amqpMethodId()); } - else if(body->isA<ChannelFlowBody>()){ - // TODO aconway 2007-01-24: not implemented yet. +} + +void Channel::handleChannel(AMQMethodBody::shared_ptr method) { + switch (method->amqpMethodId()) { + case ChannelCloseBody::METHOD_ID: + peerClose(shared_polymorphic_downcast<ChannelCloseBody>(method)); + return; + case ChannelFlowBody::METHOD_ID: + // FIXME aconway 2007-02-22: Not yet implemented. + return; } - else if(body->isA<ConnectionCloseBody>()){ + throw UnknownMethod(); +} + +void Channel::handleConnection(AMQMethodBody::shared_ptr method) { + if (method->amqpMethodId() == ConnectionCloseBody::METHOD_ID) { connection->close(); - } - else { - connection->close( - 504, "Unrecognised method", - body->amqpClassId(), body->amqpMethodId()); - } + return; + } + throw UnknownMethod(); } void Channel::handleHeader(AMQHeaderBody::shared_ptr body){ - incoming.add(body); + basic.incoming.add(body); } void Channel::handleContent(AMQContentBody::shared_ptr body){ - incoming.add(body); + basic.incoming.add(body); } void Channel::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){ @@ -341,82 +238,7 @@ void Channel::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){ } void Channel::start(){ - dispatcher = Thread(this); -} - -void Channel::deliver(Consumer& consumer, Message& msg){ - //record delivery tag: - consumer.lastDeliveryTag = msg.getDeliveryTag(); - - //allow registered listener to handle the message - consumer.listener->received(msg); - - if(isOpen()){ - bool multiple(false); - switch(consumer.ackMode){ - case LAZY_ACK: - multiple = true; - if(++(consumer.count) < prefetch) break; - //else drop-through - case AUTO_ACK: - send(new BasicAckBody(version, msg.getDeliveryTag(), multiple)); - consumer.lastDeliveryTag = 0; - } - } - - //as it stands, transactionality is entirely orthogonal to ack - //mode, though the acks will not be processed by the broker under - //a transaction until it commits. -} - -void Channel::run() { - while(isOpen()) { - try { - Message msg = incoming.waitDispatch(); - if(msg.getMethod()->isA<BasicReturnBody>()) { - ReturnedMessageHandler* handler=0; - { - Mutex::ScopedLock l(lock); - handler=returnsHandler; - } - if(handler == 0) { - // TODO aconway 2007-02-20: proper logging. - cout << "Message returned: " << msg.getData() << endl; - } - else - handler->returned(msg); - } - else { - BasicDeliverBody::shared_ptr deliverBody = - boost::shared_polymorphic_downcast<BasicDeliverBody>( - msg.getMethod()); - std::string tag = deliverBody->getConsumerTag(); - Consumer consumer; - { - Mutex::ScopedLock l(lock); - ConsumerMap::iterator i = consumers.find(tag); - if(i == consumers.end()) - THROW_QPID_ERROR(PROTOCOL_ERROR+504, - "Unknown consumer tag=" + tag); - consumer = i->second; - } - deliver(consumer, msg); - } - } - catch (const ShutdownException&) { - /* Orderly shutdown */ - } - catch (const Exception& e) { - // FIXME aconway 2007-02-20: Report exception to user. - cout << "client::Channel::run() terminated by: " << e.toString() - << "(" << typeid(e).name() << ")" << endl; - } - } -} - -void Channel::setReturnedMessageHandler(ReturnedMessageHandler* handler){ - Mutex::ScopedLock l(lock); - returnsHandler = handler; + basicDispatcher = Thread(basic); } // Close called by local application. @@ -452,13 +274,13 @@ void Channel::peerClose(ChannelCloseBody::shared_ptr) { void Channel::closeInternal() { if (isOpen()); { - cancelAll(); - incoming.shutdown(); + basic.cancelAll(); + basic.incoming.shutdown(); connection = 0; // A 0 response means we are closed. responses.signalResponse(AMQMethodBody::shared_ptr()); } - dispatcher.join(); + basicDispatcher.join(); } void Channel::sendAndReceive(AMQMethodBody* toSend, ClassId c, MethodId m) |
