summaryrefslogtreecommitdiff
path: root/cpp/lib/client/Basic.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-02-22 23:23:52 +0000
committerAlan Conway <aconway@apache.org>2007-02-22 23:23:52 +0000
commit067f367d27bef7500410ea27c000d0ca275c748a (patch)
treeb20d2f526860870c22dbcffa3570ed347f8979ba /cpp/lib/client/Basic.cpp
parent20a442ea00c82b7fd9b6b7a560916f69f3155f56 (diff)
downloadqpid-python-067f367d27bef7500410ea27c000d0ca275c748a.tar.gz
* cpp/lib/client/Basic.*, ClientChannel.*: Extracted Basic functionality
from Channel into separate Basic class. * cpp/lib/client/*, cpp/test/*: Adjusted for new Channel::getBasic() API. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@510705 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/lib/client/Basic.cpp')
-rw-r--r--cpp/lib/client/Basic.cpp255
1 files changed, 255 insertions, 0 deletions
diff --git a/cpp/lib/client/Basic.cpp b/cpp/lib/client/Basic.cpp
new file mode 100644
index 0000000000..88d46541f4
--- /dev/null
+++ b/cpp/lib/client/Basic.cpp
@@ -0,0 +1,255 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include <iostream>
+#include "Basic.h"
+#include "AMQMethodBody.h"
+#include "ClientChannel.h"
+#include "ReturnedMessageHandler.h"
+#include "MessageListener.h"
+#include "framing/FieldTable.h"
+#include "Connection.h"
+
+using namespace std;
+
+namespace qpid {
+namespace client {
+
+using namespace sys;
+using namespace framing;
+
+Basic::Basic(Channel& ch) : channel(ch), returnsHandler(0) {}
+
+void Basic::consume(
+ Queue& queue, std::string& tag, MessageListener* listener,
+ AckMode ackMode, bool noLocal, bool synch, const FieldTable* fields)
+{
+ channel.sendAndReceiveSync<BasicConsumeOkBody>(
+ synch,
+ new BasicConsumeBody(
+ channel.version, 0, queue.getName(), tag, noLocal,
+ ackMode == NO_ACK, false, !synch,
+ fields ? *fields : FieldTable()));
+ if (synch) {
+ BasicConsumeOkBody::shared_ptr response =
+ boost::shared_polymorphic_downcast<BasicConsumeOkBody>(
+ channel.responses.getResponse());
+ tag = response->getConsumerTag();
+ }
+ // FIXME aconway 2007-02-20: Race condition!
+ // We could receive the first message for the consumer
+ // before we create the consumer below.
+ // Move consumer creation to handler for BasicConsumeOkBody
+ {
+ Mutex::ScopedLock l(lock);
+ ConsumerMap::iterator i = consumers.find(tag);
+ if (i != consumers.end())
+ THROW_QPID_ERROR(CLIENT_ERROR,
+ "Consumer already exists with tag="+tag);
+ Consumer& c = consumers[tag];
+ c.listener = listener;
+ c.ackMode = ackMode;
+ c.lastDeliveryTag = 0;
+ }
+}
+
+
+void Basic::cancel(const std::string& tag, bool synch) {
+ Consumer c;
+ {
+ Mutex::ScopedLock l(lock);
+ ConsumerMap::iterator i = consumers.find(tag);
+ if (i == consumers.end())
+ return;
+ c = i->second;
+ consumers.erase(i);
+ }
+ if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0)
+ channel.send(new BasicAckBody(channel.version, c.lastDeliveryTag, true));
+ channel.sendAndReceiveSync<BasicCancelOkBody>(
+ synch, new BasicCancelBody(channel.version, tag, !synch));
+}
+
+void Basic::cancelAll(){
+ ConsumerMap consumersCopy;
+ {
+ Mutex::ScopedLock l(lock);
+ consumersCopy = consumers;
+ consumers.clear();
+ }
+ for (ConsumerMap::iterator i=consumersCopy.begin();
+ i != consumersCopy.end(); ++i)
+ {
+ Consumer& c = i->second;
+ if ((c.ackMode == LAZY_ACK || c.ackMode == AUTO_ACK)
+ && c.lastDeliveryTag > 0)
+ {
+ channel.send(new BasicAckBody(channel.version, c.lastDeliveryTag, true));
+ }
+ }
+}
+
+
+
+bool Basic::get(Message& msg, const Queue& queue, AckMode ackMode) {
+ // Expect a message starting with a BasicGetOk
+ incoming.startGet();
+ channel.send(new BasicGetBody(channel.version, 0, queue.getName(), ackMode));
+ return incoming.waitGet(msg);
+}
+
+
+void Basic::publish(
+ const Message& msg, const Exchange& exchange,
+ const std::string& routingKey, bool mandatory, bool immediate)
+{
+ const string e = exchange.getName();
+ string key = routingKey;
+ channel.send(new BasicPublishBody(channel.version, 0, e, key, mandatory, immediate));
+ //break msg up into header frame and content frame(s) and send these
+ channel.send(msg.getHeader());
+ string data = msg.getData();
+ u_int64_t data_length = data.length();
+ if(data_length > 0){
+ //frame itself uses 8 bytes
+ u_int32_t frag_size = channel.connection->getMaxFrameSize() - 8;
+ if(data_length < frag_size){
+ channel.send(new AMQContentBody(data));
+ }else{
+ u_int32_t offset = 0;
+ u_int32_t remaining = data_length - offset;
+ while (remaining > 0) {
+ u_int32_t length = remaining > frag_size ? frag_size : remaining;
+ string frag(data.substr(offset, length));
+ channel.send(new AMQContentBody(frag));
+
+ offset += length;
+ remaining = data_length - offset;
+ }
+ }
+ }
+}
+
+void Basic::handle(boost::shared_ptr<AMQMethodBody> method) {
+ assert(method->amqpClassId() ==BasicGetBody::CLASS_ID);
+ switch(method->amqpMethodId()) {
+ case BasicDeliverBody::METHOD_ID:
+ case BasicReturnBody::METHOD_ID:
+ case BasicGetOkBody::METHOD_ID:
+ case BasicGetEmptyBody::METHOD_ID:
+ incoming.add(method);
+ return;
+ }
+ throw Channel::UnknownMethod();
+}
+
+void Basic::deliver(Consumer& consumer, Message& msg){
+ //record delivery tag:
+ consumer.lastDeliveryTag = msg.getDeliveryTag();
+
+ //allow registered listener to handle the message
+ consumer.listener->received(msg);
+
+ if(channel.isOpen()){
+ bool multiple(false);
+ switch(consumer.ackMode){
+ case LAZY_ACK:
+ multiple = true;
+ if(++(consumer.count) < channel.getPrefetch())
+ break;
+ //else drop-through
+ case AUTO_ACK:
+ consumer.lastDeliveryTag = 0;
+ channel.send(
+ new BasicAckBody(
+ channel.version, msg.getDeliveryTag(), multiple));
+ case NO_ACK: // Nothing to do
+ case CLIENT_ACK: // User code must ack.
+ break;
+ // TODO aconway 2007-02-22: Provide a way for user
+ // to ack!
+ }
+ }
+
+ //as it stands, transactionality is entirely orthogonal to ack
+ //mode, though the acks will not be processed by the broker under
+ //a transaction until it commits.
+}
+
+
+void Basic::run() {
+ while(channel.isOpen()) {
+ try {
+ Message msg = incoming.waitDispatch();
+ if(msg.getMethod()->isA<BasicReturnBody>()) {
+ ReturnedMessageHandler* handler=0;
+ {
+ Mutex::ScopedLock l(lock);
+ handler=returnsHandler;
+ }
+ if(handler == 0) {
+ // TODO aconway 2007-02-20: proper logging.
+ cout << "Message returned: " << msg.getData() << endl;
+ }
+ else
+ handler->returned(msg);
+ }
+ else {
+ BasicDeliverBody::shared_ptr deliverBody =
+ boost::shared_polymorphic_downcast<BasicDeliverBody>(
+ msg.getMethod());
+ std::string tag = deliverBody->getConsumerTag();
+ Consumer consumer;
+ {
+ Mutex::ScopedLock l(lock);
+ ConsumerMap::iterator i = consumers.find(tag);
+ if(i == consumers.end())
+ THROW_QPID_ERROR(PROTOCOL_ERROR+504,
+ "Unknown consumer tag=" + tag);
+ consumer = i->second;
+ }
+ deliver(consumer, msg);
+ }
+ }
+ catch (const ShutdownException&) {
+ /* Orderly shutdown */
+ }
+ catch (const Exception& e) {
+ // FIXME aconway 2007-02-20: Report exception to user.
+ cout << "client::Basic::run() terminated by: " << e.toString()
+ << "(" << typeid(e).name() << ")" << endl;
+ }
+ }
+}
+
+void Basic::setReturnedMessageHandler(ReturnedMessageHandler* handler){
+ Mutex::ScopedLock l(lock);
+ returnsHandler = handler;
+}
+
+void Basic::setQos(){
+ channel.sendAndReceive<BasicQosOkBody>(
+ new BasicQosBody(channel.version, 0, channel.getPrefetch(), false));
+ if(channel.isTransactional())
+ channel.sendAndReceive<TxSelectOkBody>(new TxSelectBody(channel.version));
+}
+
+
+// TODO aconway 2007-02-22: NOTES:
+// Move incoming to BasicChannel - check for uses.
+
+}} // namespace qpid::client