summaryrefslogtreecommitdiff
path: root/cpp/lib/client/ClientChannel.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-02-22 23:23:52 +0000
committerAlan Conway <aconway@apache.org>2007-02-22 23:23:52 +0000
commit067f367d27bef7500410ea27c000d0ca275c748a (patch)
treeb20d2f526860870c22dbcffa3570ed347f8979ba /cpp/lib/client/ClientChannel.cpp
parent20a442ea00c82b7fd9b6b7a560916f69f3155f56 (diff)
downloadqpid-python-067f367d27bef7500410ea27c000d0ca275c748a.tar.gz
* cpp/lib/client/Basic.*, ClientChannel.*: Extracted Basic functionality
from Channel into separate Basic class. * cpp/lib/client/*, cpp/test/*: Adjusted for new Channel::getBasic() API. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@510705 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/lib/client/ClientChannel.cpp')
-rw-r--r--cpp/lib/client/ClientChannel.cpp264
1 files changed, 43 insertions, 221 deletions
diff --git a/cpp/lib/client/ClientChannel.cpp b/cpp/lib/client/ClientChannel.cpp
index a8fa219c16..ec9ce71dd7 100644
--- a/cpp/lib/client/ClientChannel.cpp
+++ b/cpp/lib/client/ClientChannel.cpp
@@ -37,6 +37,7 @@ using namespace qpid::framing;
using namespace qpid::sys;
Channel::Channel(bool _transactional, u_int16_t _prefetch) :
+ basic(*this),
connection(0),
prefetch(_prefetch),
transactional(_transactional)
@@ -113,19 +114,16 @@ void Channel::protocolInit(
bool Channel::isOpen() const { return connection; }
+void Channel::setQos() {
+ basic.setQos();
+ // FIXME aconway 2007-02-22: message
+}
+
void Channel::setPrefetch(u_int16_t _prefetch){
prefetch = _prefetch;
setQos();
}
-void Channel::setQos(){
- sendAndReceive<BasicQosOkBody>(
- new BasicQosBody(version, 0, prefetch, false));
- if(transactional){
- sendAndReceive<TxSelectOkBody>(new TxSelectBody(version));
- }
-}
-
void Channel::declareExchange(Exchange& exchange, bool synch){
string name = exchange.getName();
string type = exchange.getType();
@@ -177,114 +175,6 @@ void Channel::bind(const Exchange& exchange, const Queue& queue, const std::stri
new QueueBindBody(version, 0, q, e, key,!synch, args));
}
-void Channel::consume(
- Queue& queue, std::string& tag, MessageListener* listener,
- int ackMode, bool noLocal, bool synch, const FieldTable* fields)
-{
- sendAndReceiveSync<BasicConsumeOkBody>(
- synch,
- new BasicConsumeBody(
- version, 0, queue.getName(), tag, noLocal,
- ackMode == NO_ACK, false, !synch,
- fields ? *fields : FieldTable()));
- if (synch) {
- BasicConsumeOkBody::shared_ptr response =
- shared_polymorphic_downcast<BasicConsumeOkBody>(
- responses.getResponse());
- tag = response->getConsumerTag();
- }
- // FIXME aconway 2007-02-20: Race condition!
- // We could receive the first message for the consumer
- // before we create the consumer below.
- // Move consumer creation to handler for BasicConsumeOkBody
- {
- Mutex::ScopedLock l(lock);
- ConsumerMap::iterator i = consumers.find(tag);
- if (i != consumers.end())
- THROW_QPID_ERROR(CLIENT_ERROR,
- "Consumer already exists with tag="+tag);
- Consumer& c = consumers[tag];
- c.listener = listener;
- c.ackMode = ackMode;
- c.lastDeliveryTag = 0;
- }
-}
-
-void Channel::cancel(const std::string& tag, bool synch) {
- Consumer c;
- {
- Mutex::ScopedLock l(lock);
- ConsumerMap::iterator i = consumers.find(tag);
- if (i == consumers.end())
- return;
- c = i->second;
- consumers.erase(i);
- }
- if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0)
- send(new BasicAckBody(version, c.lastDeliveryTag, true));
- sendAndReceiveSync<BasicCancelOkBody>(
- synch, new BasicCancelBody(version, tag, !synch));
-}
-
-void Channel::cancelAll(){
- ConsumerMap consumersCopy;
- {
- Mutex::ScopedLock l(lock);
- consumersCopy = consumers;
- consumers.clear();
- }
- for (ConsumerMap::iterator i=consumersCopy.begin();
- i != consumersCopy.end(); ++i)
- {
- Consumer& c = i->second;
- if ((c.ackMode == LAZY_ACK || c.ackMode == AUTO_ACK)
- && c.lastDeliveryTag > 0)
- {
- send(new BasicAckBody(version, c.lastDeliveryTag, true));
- }
- }
-}
-
-bool Channel::get(Message& msg, const Queue& queue, int ackMode) {
- // Expect a message starting with a BasicGetOk
- incoming.startGet();
- send(new BasicGetBody(version, 0, queue.getName(), ackMode));
- return incoming.waitGet(msg);
-}
-
-
-void Channel::publish(
- const Message& msg, const Exchange& exchange,
- const std::string& routingKey, bool mandatory, bool immediate)
-{
- // FIXME aconway 2007-01-30: Rework for 0-9 message class.
- const string e = exchange.getName();
- string key = routingKey;
-
- send(new BasicPublishBody(version, 0, e, key, mandatory, immediate));
- //break msg up into header frame and content frame(s) and send these
- send(msg.header);
- string data = msg.getData();
- u_int64_t data_length = data.length();
- if(data_length > 0){
- u_int32_t frag_size = connection->getMaxFrameSize() - 8;//frame itself uses 8 bytes
- if(data_length < frag_size){
- send(new AMQContentBody(data));
- }else{
- u_int32_t offset = 0;
- u_int32_t remaining = data_length - offset;
- while (remaining > 0) {
- u_int32_t length = remaining > frag_size ? frag_size : remaining;
- string frag(data.substr(offset, length));
- send(new AMQContentBody(frag));
-
- offset += length;
- remaining = data_length - offset;
- }
- }
- }
-}
-
void Channel::commit(){
sendAndReceive<TxCommitOkBody>(new TxCommitBody(version));
}
@@ -294,46 +184,53 @@ void Channel::rollback(){
}
void Channel::handleMethodInContext(
- AMQMethodBody::shared_ptr body, const MethodContext&)
+ AMQMethodBody::shared_ptr method, const MethodContext&)
{
- //channel.flow, channel.close, basic.deliver, basic.return or a
- //response to a synchronous request
if(responses.isWaiting()) {
- responses.signalResponse(body);
+ responses.signalResponse(method);
return;
}
-
- if(body->isA<BasicDeliverBody>()
- || body->isA<BasicReturnBody>()
- || body->isA<BasicGetOkBody>()
- || body->isA<BasicGetEmptyBody>())
-
- {
- incoming.add(body);
- return;
+ try {
+ switch (method->amqpClassId()) {
+ case BasicDeliverBody::CLASS_ID: basic.handle(method); break;
+ case ChannelCloseBody::CLASS_ID: handleChannel(method); break;
+ case ConnectionCloseBody::CLASS_ID: handleConnection(method); break;
+ default: throw UnknownMethod();
+ }
}
- else if(body->isA<ChannelCloseBody>()) {
- peerClose(shared_polymorphic_downcast<ChannelCloseBody>(body));
+ catch (const UnknownMethod&) {
+ connection->close(
+ 504, "Unknown method",
+ method->amqpClassId(), method->amqpMethodId());
}
- else if(body->isA<ChannelFlowBody>()){
- // TODO aconway 2007-01-24: not implemented yet.
+}
+
+void Channel::handleChannel(AMQMethodBody::shared_ptr method) {
+ switch (method->amqpMethodId()) {
+ case ChannelCloseBody::METHOD_ID:
+ peerClose(shared_polymorphic_downcast<ChannelCloseBody>(method));
+ return;
+ case ChannelFlowBody::METHOD_ID:
+ // FIXME aconway 2007-02-22: Not yet implemented.
+ return;
}
- else if(body->isA<ConnectionCloseBody>()){
+ throw UnknownMethod();
+}
+
+void Channel::handleConnection(AMQMethodBody::shared_ptr method) {
+ if (method->amqpMethodId() == ConnectionCloseBody::METHOD_ID) {
connection->close();
- }
- else {
- connection->close(
- 504, "Unrecognised method",
- body->amqpClassId(), body->amqpMethodId());
- }
+ return;
+ }
+ throw UnknownMethod();
}
void Channel::handleHeader(AMQHeaderBody::shared_ptr body){
- incoming.add(body);
+ basic.incoming.add(body);
}
void Channel::handleContent(AMQContentBody::shared_ptr body){
- incoming.add(body);
+ basic.incoming.add(body);
}
void Channel::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){
@@ -341,82 +238,7 @@ void Channel::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){
}
void Channel::start(){
- dispatcher = Thread(this);
-}
-
-void Channel::deliver(Consumer& consumer, Message& msg){
- //record delivery tag:
- consumer.lastDeliveryTag = msg.getDeliveryTag();
-
- //allow registered listener to handle the message
- consumer.listener->received(msg);
-
- if(isOpen()){
- bool multiple(false);
- switch(consumer.ackMode){
- case LAZY_ACK:
- multiple = true;
- if(++(consumer.count) < prefetch) break;
- //else drop-through
- case AUTO_ACK:
- send(new BasicAckBody(version, msg.getDeliveryTag(), multiple));
- consumer.lastDeliveryTag = 0;
- }
- }
-
- //as it stands, transactionality is entirely orthogonal to ack
- //mode, though the acks will not be processed by the broker under
- //a transaction until it commits.
-}
-
-void Channel::run() {
- while(isOpen()) {
- try {
- Message msg = incoming.waitDispatch();
- if(msg.getMethod()->isA<BasicReturnBody>()) {
- ReturnedMessageHandler* handler=0;
- {
- Mutex::ScopedLock l(lock);
- handler=returnsHandler;
- }
- if(handler == 0) {
- // TODO aconway 2007-02-20: proper logging.
- cout << "Message returned: " << msg.getData() << endl;
- }
- else
- handler->returned(msg);
- }
- else {
- BasicDeliverBody::shared_ptr deliverBody =
- boost::shared_polymorphic_downcast<BasicDeliverBody>(
- msg.getMethod());
- std::string tag = deliverBody->getConsumerTag();
- Consumer consumer;
- {
- Mutex::ScopedLock l(lock);
- ConsumerMap::iterator i = consumers.find(tag);
- if(i == consumers.end())
- THROW_QPID_ERROR(PROTOCOL_ERROR+504,
- "Unknown consumer tag=" + tag);
- consumer = i->second;
- }
- deliver(consumer, msg);
- }
- }
- catch (const ShutdownException&) {
- /* Orderly shutdown */
- }
- catch (const Exception& e) {
- // FIXME aconway 2007-02-20: Report exception to user.
- cout << "client::Channel::run() terminated by: " << e.toString()
- << "(" << typeid(e).name() << ")" << endl;
- }
- }
-}
-
-void Channel::setReturnedMessageHandler(ReturnedMessageHandler* handler){
- Mutex::ScopedLock l(lock);
- returnsHandler = handler;
+ basicDispatcher = Thread(basic);
}
// Close called by local application.
@@ -452,13 +274,13 @@ void Channel::peerClose(ChannelCloseBody::shared_ptr) {
void Channel::closeInternal() {
if (isOpen());
{
- cancelAll();
- incoming.shutdown();
+ basic.cancelAll();
+ basic.incoming.shutdown();
connection = 0;
// A 0 response means we are closed.
responses.signalResponse(AMQMethodBody::shared_ptr());
}
- dispatcher.join();
+ basicDispatcher.join();
}
void Channel::sendAndReceive(AMQMethodBody* toSend, ClassId c, MethodId m)