summaryrefslogtreecommitdiff
path: root/cpp/lib/client/ClientChannel.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2006-12-01 05:11:45 +0000
committerAlan Conway <aconway@apache.org>2006-12-01 05:11:45 +0000
commitfb9ad93a3d422c1e83c998f44c4782f7bf1d1a66 (patch)
treea2ebf932750bf13bf3db271f92df390335b0e844 /cpp/lib/client/ClientChannel.cpp
parent33c04c7e619a65e2d92ac231805e8ad27f4a29c2 (diff)
downloadqpid-python-fb9ad93a3d422c1e83c998f44c4782f7bf1d1a66.tar.gz
2006-12-01 Jim Meyering <meyering@redhat.com>
This delta imposes two major changes on the C++ hierarchy: - adds autoconf, automake, libtool support - makes the hierarchy flatter and renames a few files (e.g., Queue.h, Queue.cpp) that appeared twice, once under client/ and again under broker/. In the process, I've changed many #include directives, mostly to remove a qpid/ or qpid/framing/ prefix from the file name argument. Although most changes were to .cpp and .h files under qpid/cpp/, there were also several to template files under qpid/gentools, and even one to CppGenerator.java. Nearly all files are moved to a new position in the hierarchy. The new hierarchy looks like this: src # this is the new home of qpidd.cpp tests # all tests are here. See Makefile.am. gen # As before, all generated files go here. lib # This is just a container for the 3 lib dirs: lib/client lib/broker lib/common lib/common/framing lib/common/sys lib/common/sys/posix lib/common/sys/apr build-aux m4 git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@481159 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/lib/client/ClientChannel.cpp')
-rw-r--r--cpp/lib/client/ClientChannel.cpp428
1 files changed, 428 insertions, 0 deletions
diff --git a/cpp/lib/client/ClientChannel.cpp b/cpp/lib/client/ClientChannel.cpp
new file mode 100644
index 0000000000..ba21199732
--- /dev/null
+++ b/cpp/lib/client/ClientChannel.cpp
@@ -0,0 +1,428 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <ClientChannel.h>
+#include <sys/Monitor.h>
+#include <ClientMessage.h>
+#include <QpidError.h>
+#include <MethodBodyInstances.h>
+
+using namespace boost; //to use dynamic_pointer_cast
+using namespace qpid::client;
+using namespace qpid::framing;
+using namespace qpid::sys;
+
+Channel::Channel(bool _transactional, u_int16_t _prefetch) :
+ id(0),
+ con(0),
+ out(0),
+ incoming(0),
+ closed(true),
+ prefetch(_prefetch),
+ transactional(_transactional),
+// AMQP version management change - kpvdr 2006-11-20
+// TODO: Make this class version-aware and link these hard-wired numbers to that version
+ version(8, 0)
+{ }
+
+Channel::~Channel(){
+ stop();
+}
+
+void Channel::setPrefetch(u_int16_t _prefetch){
+ prefetch = _prefetch;
+ if(con != 0 && out != 0){
+ setQos();
+ }
+}
+
+void Channel::setQos(){
+// AMQP version management change - kpvdr 2006-11-20
+// TODO: Make this class version-aware and link these hard-wired numbers to that version
+ sendAndReceive(new AMQFrame(id, new BasicQosBody(version, 0, prefetch, false)), method_bodies.basic_qos_ok);
+ if(transactional){
+ sendAndReceive(new AMQFrame(id, new TxSelectBody(version)), method_bodies.tx_select_ok);
+ }
+}
+
+void Channel::declareExchange(Exchange& exchange, bool synch){
+ string name = exchange.getName();
+ string type = exchange.getType();
+ FieldTable args;
+ AMQFrame* frame = new AMQFrame(id, new ExchangeDeclareBody(version, 0, name, type, false, false, false, false, !synch, args));
+ if(synch){
+ sendAndReceive(frame, method_bodies.exchange_declare_ok);
+ }else{
+ out->send(frame);
+ }
+}
+
+void Channel::deleteExchange(Exchange& exchange, bool synch){
+ string name = exchange.getName();
+ AMQFrame* frame = new AMQFrame(id, new ExchangeDeleteBody(version, 0, name, false, !synch));
+ if(synch){
+ sendAndReceive(frame, method_bodies.exchange_delete_ok);
+ }else{
+ out->send(frame);
+ }
+}
+
+void Channel::declareQueue(Queue& queue, bool synch){
+ string name = queue.getName();
+ FieldTable args;
+ AMQFrame* frame = new AMQFrame(id, new QueueDeclareBody(version, 0, name, false, false,
+ queue.isExclusive(),
+ queue.isAutoDelete(), !synch, args));
+ if(synch){
+ sendAndReceive(frame, method_bodies.queue_declare_ok);
+ if(queue.getName().length() == 0){
+ QueueDeclareOkBody::shared_ptr response =
+ dynamic_pointer_cast<QueueDeclareOkBody, AMQMethodBody>(responses.getResponse());
+ queue.setName(response->getQueue());
+ }
+ }else{
+ out->send(frame);
+ }
+}
+
+void Channel::deleteQueue(Queue& queue, bool ifunused, bool ifempty, bool synch){
+ //ticket, queue, ifunused, ifempty, nowait
+ string name = queue.getName();
+ AMQFrame* frame = new AMQFrame(id, new QueueDeleteBody(version, 0, name, ifunused, ifempty, !synch));
+ if(synch){
+ sendAndReceive(frame, method_bodies.queue_delete_ok);
+ }else{
+ out->send(frame);
+ }
+}
+
+void Channel::bind(const Exchange& exchange, const Queue& queue, const std::string& key, const FieldTable& args, bool synch){
+ string e = exchange.getName();
+ string q = queue.getName();
+ AMQFrame* frame = new AMQFrame(id, new QueueBindBody(version, 0, q, e, key,!synch, args));
+ if(synch){
+ sendAndReceive(frame, method_bodies.queue_bind_ok);
+ }else{
+ out->send(frame);
+ }
+}
+
+void Channel::consume(Queue& queue, std::string& tag, MessageListener* listener,
+ int ackMode, bool noLocal, bool synch){
+
+ string q = queue.getName();
+ AMQFrame* frame = new AMQFrame(id, new BasicConsumeBody(version, 0, q, (string&) tag, noLocal, ackMode == NO_ACK, false, !synch));
+ if(synch){
+ sendAndReceive(frame, method_bodies.basic_consume_ok);
+ BasicConsumeOkBody::shared_ptr response = dynamic_pointer_cast<BasicConsumeOkBody, AMQMethodBody>(responses.getResponse());
+ tag = response->getConsumerTag();
+ }else{
+ out->send(frame);
+ }
+ Consumer* c = new Consumer();
+ c->listener = listener;
+ c->ackMode = ackMode;
+ c->lastDeliveryTag = 0;
+ consumers[tag] = c;
+}
+
+void Channel::cancel(std::string& tag, bool synch){
+ Consumer* c = consumers[tag];
+ if(c->ackMode == LAZY_ACK && c->lastDeliveryTag > 0){
+ out->send(new AMQFrame(id, new BasicAckBody(version, c->lastDeliveryTag, true)));
+ }
+
+ AMQFrame* frame = new AMQFrame(id, new BasicCancelBody(version, (string&) tag, !synch));
+ if(synch){
+ sendAndReceive(frame, method_bodies.basic_cancel_ok);
+ }else{
+ out->send(frame);
+ }
+ consumers.erase(tag);
+ if(c != 0){
+ delete c;
+ }
+}
+
+void Channel::cancelAll(){
+ for(consumer_iterator i = consumers.begin(); i != consumers.end(); i = consumers.begin()){
+ Consumer* c = i->second;
+ if((c->ackMode == LAZY_ACK || c->ackMode == AUTO_ACK) && c->lastDeliveryTag > 0){
+ out->send(new AMQFrame(id, new BasicAckBody(c->lastDeliveryTag, true)));
+ }
+ consumers.erase(i);
+ delete c;
+ }
+}
+
+void Channel::retrieve(Message& msg){
+ Monitor::ScopedLock l(retrievalMonitor);
+ while(retrieved == 0){
+ retrievalMonitor.wait();
+ }
+
+ msg.header = retrieved->getHeader();
+ msg.deliveryTag = retrieved->getDeliveryTag();
+ retrieved->getData(msg.data);
+ delete retrieved;
+ retrieved = 0;
+}
+
+bool Channel::get(Message& msg, const Queue& queue, int ackMode){
+ string name = queue.getName();
+ AMQFrame* frame = new AMQFrame(id, new BasicGetBody(version, 0, name, ackMode));
+ responses.expect();
+ out->send(frame);
+ responses.waitForResponse();
+ AMQMethodBody::shared_ptr response = responses.getResponse();
+ if(method_bodies.basic_get_ok.match(response.get())){
+ if(incoming != 0){
+ std::cout << "Existing message not complete" << std::endl;
+ THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete");
+ }else{
+ incoming = new IncomingMessage(dynamic_pointer_cast<BasicGetOkBody, AMQMethodBody>(response));
+ }
+ retrieve(msg);
+ return true;
+ }if(method_bodies.basic_get_empty.match(response.get())){
+ return false;
+ }else{
+ THROW_QPID_ERROR(PROTOCOL_ERROR + 500, "Unexpected response to basic.get.");
+ }
+}
+
+
+void Channel::publish(Message& msg, const Exchange& exchange, const std::string& routingKey, bool mandatory, bool immediate){
+ string e = exchange.getName();
+ string key = routingKey;
+
+ out->send(new AMQFrame(id, new BasicPublishBody(version, 0, e, key, mandatory, immediate)));
+ //break msg up into header frame and content frame(s) and send these
+ string data = msg.getData();
+ msg.header->setContentSize(data.length());
+ AMQBody::shared_ptr body(static_pointer_cast<AMQBody, AMQHeaderBody>(msg.header));
+ out->send(new AMQFrame(id, body));
+
+ u_int64_t data_length = data.length();
+ if(data_length > 0){
+ u_int32_t frag_size = con->getMaxFrameSize() - 8;//frame itself uses 8 bytes
+ if(data_length < frag_size){
+ out->send(new AMQFrame(id, new AMQContentBody(data)));
+ }else{
+ u_int32_t offset = 0;
+ u_int32_t remaining = data_length - offset;
+ while (remaining > 0) {
+ u_int32_t length = remaining > frag_size ? frag_size : remaining;
+ string frag(data.substr(offset, length));
+ out->send(new AMQFrame(id, new AMQContentBody(frag)));
+
+ offset += length;
+ remaining = data_length - offset;
+ }
+ }
+ }
+}
+
+void Channel::commit(){
+ AMQFrame* frame = new AMQFrame(id, new TxCommitBody(version));
+ sendAndReceive(frame, method_bodies.tx_commit_ok);
+}
+
+void Channel::rollback(){
+ AMQFrame* frame = new AMQFrame(id, new TxRollbackBody(version));
+ sendAndReceive(frame, method_bodies.tx_rollback_ok);
+}
+
+void Channel::handleMethod(AMQMethodBody::shared_ptr body){
+ //channel.flow, channel.close, basic.deliver, basic.return or a response to a synchronous request
+ if(responses.isWaiting()){
+ responses.signalResponse(body);
+ }else if(method_bodies.basic_deliver.match(body.get())){
+ if(incoming != 0){
+ std::cout << "Existing message not complete [deliveryTag=" << incoming->getDeliveryTag() << "]" << std::endl;
+ THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete");
+ }else{
+ incoming = new IncomingMessage(dynamic_pointer_cast<BasicDeliverBody, AMQMethodBody>(body));
+ }
+ }else if(method_bodies.basic_return.match(body.get())){
+ if(incoming != 0){
+ std::cout << "Existing message not complete" << std::endl;
+ THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Existing message not complete");
+ }else{
+ incoming = new IncomingMessage(dynamic_pointer_cast<BasicReturnBody, AMQMethodBody>(body));
+ }
+ }else if(method_bodies.channel_close.match(body.get())){
+ con->removeChannel(this);
+ //need to signal application that channel has been closed through exception
+
+ }else if(method_bodies.channel_flow.match(body.get())){
+
+ }else{
+ //signal error
+ std::cout << "Unhandled method: " << *body << std::endl;
+ THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Unhandled method");
+ }
+}
+
+void Channel::handleHeader(AMQHeaderBody::shared_ptr body){
+ if(incoming == 0){
+ //handle invalid frame sequence
+ std::cout << "Invalid message sequence: got header before return or deliver." << std::endl;
+ THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got header before return or deliver.");
+ }else{
+ incoming->setHeader(body);
+ if(incoming->isComplete()){
+ enqueue();
+ }
+ }
+}
+
+void Channel::handleContent(AMQContentBody::shared_ptr body){
+ if(incoming == 0){
+ //handle invalid frame sequence
+ std::cout << "Invalid message sequence: got content before return or deliver." << std::endl;
+ THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got content before return or deliver.");
+ }else{
+ incoming->addContent(body);
+ if(incoming->isComplete()){
+ enqueue();
+ }
+ }
+}
+
+void Channel::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){
+ THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Channel received heartbeat");
+}
+
+void Channel::start(){
+ dispatcher = Thread(this);
+}
+
+void Channel::stop(){
+ {
+ Monitor::ScopedLock l(dispatchMonitor);
+ closed = true;
+ dispatchMonitor.notify();
+ }
+ dispatcher.join();
+}
+
+void Channel::run(){
+ dispatch();
+}
+
+void Channel::enqueue(){
+ if(incoming->isResponse()){
+ Monitor::ScopedLock l(retrievalMonitor);
+ retrieved = incoming;
+ retrievalMonitor.notify();
+ }else{
+ Monitor::ScopedLock l(dispatchMonitor);
+ messages.push(incoming);
+ dispatchMonitor.notify();
+ }
+ incoming = 0;
+}
+
+IncomingMessage* Channel::dequeue(){
+ Monitor::ScopedLock l(dispatchMonitor);
+ while(messages.empty() && !closed){
+ dispatchMonitor.wait();
+ }
+ IncomingMessage* msg = 0;
+ if(!messages.empty()){
+ msg = messages.front();
+ messages.pop();
+ }
+ return msg;
+}
+
+void Channel::deliver(Consumer* consumer, Message& msg){
+ //record delivery tag:
+ consumer->lastDeliveryTag = msg.getDeliveryTag();
+
+ //allow registered listener to handle the message
+ consumer->listener->received(msg);
+
+ //if the handler calls close on the channel or connection while
+ //handling this message, then consumer will now have been deleted.
+ if(!closed){
+ bool multiple(false);
+ switch(consumer->ackMode){
+ case LAZY_ACK:
+ multiple = true;
+ if(++(consumer->count) < prefetch) break;
+ //else drop-through
+ case AUTO_ACK:
+ out->send(new AMQFrame(id, new BasicAckBody(msg.getDeliveryTag(), multiple)));
+ consumer->lastDeliveryTag = 0;
+ }
+ }
+
+ //as it stands, transactionality is entirely orthogonal to ack
+ //mode, though the acks will not be processed by the broker under
+ //a transaction until it commits.
+}
+
+void Channel::dispatch(){
+ while(!closed){
+ IncomingMessage* incomingMsg = dequeue();
+ if(incomingMsg){
+ //Note: msg is currently only valid for duration of this call
+ Message msg(incomingMsg->getHeader());
+ incomingMsg->getData(msg.data);
+ if(incomingMsg->isReturn()){
+ if(returnsHandler == 0){
+ //print warning to log/console
+ std::cout << "Message returned: " << msg.getData() << std::endl;
+ }else{
+ returnsHandler->returned(msg);
+ }
+ }else{
+ msg.deliveryTag = incomingMsg->getDeliveryTag();
+ std::string tag = incomingMsg->getConsumerTag();
+
+ if(consumers[tag] == 0){
+ //signal error
+ std::cout << "Unknown consumer: " << tag << std::endl;
+ }else{
+ deliver(consumers[tag], msg);
+ }
+ }
+ delete incomingMsg;
+ }
+ }
+}
+
+void Channel::setReturnedMessageHandler(ReturnedMessageHandler* handler){
+ returnsHandler = handler;
+}
+
+void Channel::sendAndReceive(AMQFrame* frame, const AMQMethodBody& body){
+ responses.expect();
+ out->send(frame);
+ responses.receive(body);
+}
+
+void Channel::close(){
+ if(con != 0){
+ con->closeChannel(this);
+ }
+}