diff options
Diffstat (limited to 'qpid/cpp-0-9/lib/client')
26 files changed, 0 insertions, 3088 deletions
diff --git a/qpid/cpp-0-9/lib/client/Basic.cpp b/qpid/cpp-0-9/lib/client/Basic.cpp deleted file mode 100644 index 4a1cf249a8..0000000000 --- a/qpid/cpp-0-9/lib/client/Basic.cpp +++ /dev/null @@ -1,255 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include <iostream> -#include "Basic.h" -#include "AMQMethodBody.h" -#include "ClientChannel.h" -#include "ReturnedMessageHandler.h" -#include "MessageListener.h" -#include "framing/FieldTable.h" -#include "Connection.h" - -using namespace std; - -namespace qpid { -namespace client { - -using namespace sys; -using namespace framing; - -Basic::Basic(Channel& ch) : channel(ch), returnsHandler(0) {} - -void Basic::consume( - Queue& queue, std::string& tag, MessageListener* listener, - AckMode ackMode, bool noLocal, bool synch, const FieldTable* fields) -{ - channel.sendAndReceiveSync<BasicConsumeOkBody>( - synch, - new BasicConsumeBody( - channel.version, 0, queue.getName(), tag, noLocal, - ackMode == NO_ACK, false, !synch, - fields ? *fields : FieldTable())); - if (synch) { - BasicConsumeOkBody::shared_ptr response = - boost::shared_polymorphic_downcast<BasicConsumeOkBody>( - channel.responses.getResponse()); - tag = response->getConsumerTag(); - } - // FIXME aconway 2007-02-20: Race condition! - // We could receive the first message for the consumer - // before we create the consumer below. - // Move consumer creation to handler for BasicConsumeOkBody - { - Mutex::ScopedLock l(lock); - ConsumerMap::iterator i = consumers.find(tag); - if (i != consumers.end()) - THROW_QPID_ERROR(CLIENT_ERROR, - "Consumer already exists with tag="+tag); - Consumer& c = consumers[tag]; - c.listener = listener; - c.ackMode = ackMode; - c.lastDeliveryTag = 0; - } -} - - -void Basic::cancel(const std::string& tag, bool synch) { - Consumer c; - { - Mutex::ScopedLock l(lock); - ConsumerMap::iterator i = consumers.find(tag); - if (i == consumers.end()) - return; - c = i->second; - consumers.erase(i); - } - if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0) - channel.send(new BasicAckBody(channel.version, c.lastDeliveryTag, true)); - channel.sendAndReceiveSync<BasicCancelOkBody>( - synch, new BasicCancelBody(channel.version, tag, !synch)); -} - -void Basic::cancelAll(){ - ConsumerMap consumersCopy; - { - Mutex::ScopedLock l(lock); - consumersCopy = consumers; - consumers.clear(); - } - for (ConsumerMap::iterator i=consumersCopy.begin(); - i != consumersCopy.end(); ++i) - { - Consumer& c = i->second; - if ((c.ackMode == LAZY_ACK || c.ackMode == AUTO_ACK) - && c.lastDeliveryTag > 0) - { - channel.send(new BasicAckBody(channel.version, c.lastDeliveryTag, true)); - } - } -} - - - -bool Basic::get(Message& msg, const Queue& queue, AckMode ackMode) { - // Expect a message starting with a BasicGetOk - incoming.startGet(); - channel.send(new BasicGetBody(channel.version, 0, queue.getName(), ackMode)); - return incoming.waitGet(msg); -} - - -void Basic::publish( - const Message& msg, const Exchange& exchange, - const std::string& routingKey, bool mandatory, bool immediate) -{ - const string e = exchange.getName(); - string key = routingKey; - channel.send(new BasicPublishBody(channel.version, 0, e, key, mandatory, immediate)); - //break msg up into header frame and content frame(s) and send these - channel.send(msg.getHeader()); - string data = msg.getData(); - uint64_t data_length = data.length(); - if(data_length > 0){ - //frame itself uses 8 bytes - uint32_t frag_size = channel.connection->getMaxFrameSize() - 8; - if(data_length < frag_size){ - channel.send(new AMQContentBody(data)); - }else{ - uint32_t offset = 0; - uint32_t remaining = data_length - offset; - while (remaining > 0) { - uint32_t length = remaining > frag_size ? frag_size : remaining; - string frag(data.substr(offset, length)); - channel.send(new AMQContentBody(frag)); - - offset += length; - remaining = data_length - offset; - } - } - } -} - -void Basic::handle(boost::shared_ptr<AMQMethodBody> method) { - assert(method->amqpClassId() ==BasicGetBody::CLASS_ID); - switch(method->amqpMethodId()) { - case BasicDeliverBody::METHOD_ID: - case BasicReturnBody::METHOD_ID: - case BasicGetOkBody::METHOD_ID: - case BasicGetEmptyBody::METHOD_ID: - incoming.add(method); - return; - } - throw Channel::UnknownMethod(); -} - -void Basic::deliver(Consumer& consumer, Message& msg){ - //record delivery tag: - consumer.lastDeliveryTag = msg.getDeliveryTag(); - - //allow registered listener to handle the message - consumer.listener->received(msg); - - if(channel.isOpen()){ - bool multiple(false); - switch(consumer.ackMode){ - case LAZY_ACK: - multiple = true; - if(++(consumer.count) < channel.getPrefetch()) - break; - //else drop-through - case AUTO_ACK: - consumer.lastDeliveryTag = 0; - channel.send( - new BasicAckBody( - channel.version, msg.getDeliveryTag(), multiple)); - case NO_ACK: // Nothing to do - case CLIENT_ACK: // User code must ack. - break; - // TODO aconway 2007-02-22: Provide a way for user - // to ack! - } - } - - //as it stands, transactionality is entirely orthogonal to ack - //mode, though the acks will not be processed by the broker under - //a transaction until it commits. -} - - -void Basic::run() { - while(channel.isOpen()) { - try { - Message msg = incoming.waitDispatch(); - if(msg.getMethod()->isA<BasicReturnBody>()) { - ReturnedMessageHandler* handler=0; - { - Mutex::ScopedLock l(lock); - handler=returnsHandler; - } - if(handler == 0) { - // TODO aconway 2007-02-20: proper logging. - cout << "Message returned: " << msg.getData() << endl; - } - else - handler->returned(msg); - } - else { - BasicDeliverBody::shared_ptr deliverBody = - boost::shared_polymorphic_downcast<BasicDeliverBody>( - msg.getMethod()); - std::string tag = deliverBody->getConsumerTag(); - Consumer consumer; - { - Mutex::ScopedLock l(lock); - ConsumerMap::iterator i = consumers.find(tag); - if(i == consumers.end()) - THROW_QPID_ERROR(PROTOCOL_ERROR+504, - "Unknown consumer tag=" + tag); - consumer = i->second; - } - deliver(consumer, msg); - } - } - catch (const ShutdownException&) { - /* Orderly shutdown */ - } - catch (const Exception& e) { - // FIXME aconway 2007-02-20: Report exception to user. - cout << "client::Basic::run() terminated by: " << e.toString() - << "(" << typeid(e).name() << ")" << endl; - } - } -} - -void Basic::setReturnedMessageHandler(ReturnedMessageHandler* handler){ - Mutex::ScopedLock l(lock); - returnsHandler = handler; -} - -void Basic::setQos(){ - channel.sendAndReceive<BasicQosOkBody>( - new BasicQosBody(channel.version, 0, channel.getPrefetch(), false)); - if(channel.isTransactional()) - channel.sendAndReceive<TxSelectOkBody>(new TxSelectBody(channel.version)); -} - - -// TODO aconway 2007-02-22: NOTES: -// Move incoming to BasicChannel - check for uses. - -}} // namespace qpid::client diff --git a/qpid/cpp-0-9/lib/client/Basic.h b/qpid/cpp-0-9/lib/client/Basic.h deleted file mode 100644 index f6ae633ab8..0000000000 --- a/qpid/cpp-0-9/lib/client/Basic.h +++ /dev/null @@ -1,195 +0,0 @@ -#ifndef _client_Basic_h -#define _client_Basic_h - -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "IncomingMessage.h" -#include "sys/Runnable.h" - -namespace qpid { - -namespace framing { -class AMQMethodBody; -class FieldTable; -} - -namespace client { - -class Channel; -class Message; -class Queue; -class Exchange; -class MessageListener; -class ReturnedMessageHandler; - -/** - * The available acknowledgements modes. - * - * \ingroup clientapi - */ -enum AckMode { - /** No acknowledgement will be sent, broker can - discard messages as soon as they are delivered - to a consumer using this mode. **/ - NO_ACK = 0, - /** Each message will be automatically - acknowledged as soon as it is delivered to the - application **/ - AUTO_ACK = 1, - /** Acknowledgements will be sent automatically, - but not for each message. **/ - LAZY_ACK = 2, - /** The application is responsible for explicitly - acknowledging messages. **/ - CLIENT_ACK = 3 -}; - - -/** - * Represents the AMQP Basic class for sending and receiving messages. - */ -class Basic : public sys::Runnable -{ - public: - Basic(Channel& parent); - - /** - * Creates a 'consumer' for a queue. Messages in (or arriving - * at) that queue will be delivered to consumers - * asynchronously. - * - * @param queue a Queue instance representing the queue to - * consume from - * - * @param tag an identifier to associate with the consumer - * that can be used to cancel its subscription (if empty, this - * will be assigned by the broker) - * - * @param listener a pointer to an instance of an - * implementation of the MessageListener interface. Messages - * received from this queue for this consumer will result in - * invocation of the received() method on the listener, with - * the message itself passed in. - * - * @param ackMode the mode of acknowledgement that the broker - * should assume for this consumer. @see AckMode - * - * @param noLocal if true, this consumer will not be sent any - * message published by this connection - * - * @param synch if true this call will block until a response - * is received from the broker - */ - void consume( - Queue& queue, std::string& tag, MessageListener* listener, - AckMode ackMode = NO_ACK, bool noLocal = false, bool synch = true, - const framing::FieldTable* fields = 0); - - /** - * Cancels a subscription previously set up through a call to consume(). - * - * @param tag the identifier used (or assigned) in the consume - * request that set up the subscription to be cancelled. - * - * @param synch if true this call will block until a response - * is received from the broker - */ - void cancel(const std::string& tag, bool synch = true); - /** - * Synchronous pull of a message from a queue. - * - * @param msg a message object that will contain the message - * headers and content if the call completes. - * - * @param queue the queue to consume from - * - * @param ackMode the acknowledgement mode to use (@see - * AckMode) - * - * @return true if a message was succcessfully dequeued from - * the queue, false if the queue was empty. - */ - bool get(Message& msg, const Queue& queue, AckMode ackMode = NO_ACK); - - /** - * Publishes (i.e. sends a message to the broker). - * - * @param msg the message to publish - * - * @param exchange the exchange to publish the message to - * - * @param routingKey the routing key to publish with - * - * @param mandatory if true and the exchange to which this - * publish is directed has no matching bindings, the message - * will be returned (see setReturnedMessageHandler()). - * - * @param immediate if true and there is no consumer to - * receive this message on publication, the message will be - * returned (see setReturnedMessageHandler()). - */ - void publish(const Message& msg, const Exchange& exchange, - const std::string& routingKey, - bool mandatory = false, bool immediate = false); - - /** - * Set a handler for this channel that will process any - * returned messages - * - * @see publish() - */ - void setReturnedMessageHandler(ReturnedMessageHandler* handler); - - /** - * Deliver messages from the broker to the appropriate MessageListener. - */ - void run(); - - - private: - - struct Consumer{ - MessageListener* listener; - AckMode ackMode; - int count; - uint64_t lastDeliveryTag; - }; - - typedef std::map<std::string, Consumer> ConsumerMap; - - void handle(boost::shared_ptr<framing::AMQMethodBody>); - void setQos(); - void cancelAll(); - void deliver(Consumer& consumer, Message& msg); - - sys::Mutex lock; - Channel& channel; - IncomingMessage incoming; - ConsumerMap consumers; - ReturnedMessageHandler* returnsHandler; - - // FIXME aconway 2007-02-22: Remove friendship. - friend class Channel; -}; - -}} // namespace qpid::client - - - -#endif /*!_client_Basic_h*/ diff --git a/qpid/cpp-0-9/lib/client/ClientAdapter.cpp b/qpid/cpp-0-9/lib/client/ClientAdapter.cpp deleted file mode 100644 index c77f049c96..0000000000 --- a/qpid/cpp-0-9/lib/client/ClientAdapter.cpp +++ /dev/null @@ -1,70 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "AMQP_ClientOperations.h" -#include "ClientAdapter.h" -#include "Connection.h" -#include "Exception.h" -#include "AMQMethodBody.h" - -namespace qpid { -namespace client { - -using namespace qpid; -using namespace qpid::framing; - -typedef std::vector<Queue::shared_ptr>::iterator queue_iterator; - -void ClientAdapter::handleMethodInContext( - boost::shared_ptr<qpid::framing::AMQMethodBody> method, - const MethodContext& context -) -{ - try{ - method->invoke(*clientOps, context); - }catch(ChannelException& e){ - connection.client->getChannel().close( - context, e.code, e.toString(), - method->amqpClassId(), method->amqpMethodId()); - connection.closeChannel(getId()); - }catch(ConnectionException& e){ - connection.client->getConnection().close( - context, e.code, e.toString(), - method->amqpClassId(), method->amqpMethodId()); - }catch(std::exception& e){ - connection.client->getConnection().close( - context, 541/*internal error*/, e.what(), - method->amqpClassId(), method->amqpMethodId()); - } -} - -void ClientAdapter::handleHeader(AMQHeaderBody::shared_ptr body) { - channel->handleHeader(body); -} - -void ClientAdapter::handleContent(AMQContentBody::shared_ptr body) { - channel->handleContent(body); -} - -void ClientAdapter::handleHeartbeat(AMQHeartbeatBody::shared_ptr) { - // TODO aconway 2007-01-17: Implement heartbeats. -} - - - -}} // namespace qpid::client - diff --git a/qpid/cpp-0-9/lib/client/ClientAdapter.h b/qpid/cpp-0-9/lib/client/ClientAdapter.h deleted file mode 100644 index d5e16fc6ad..0000000000 --- a/qpid/cpp-0-9/lib/client/ClientAdapter.h +++ /dev/null @@ -1,66 +0,0 @@ -#ifndef _client_ClientAdapter_h -#define _client_ClientAdapter_h - -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "ChannelAdapter.h" -#include "ClientChannel.h" - -namespace qpid { -namespace client { - -class AMQMethodBody; -class Connection; - -/** - * Per-channel protocol adapter. - * - * Translates protocol bodies into calls on the core Channel, - * Connection and Client objects. - * - * Owns a channel, has references to Connection and Client. - */ -class ClientAdapter : public framing::ChannelAdapter -{ - public: - ClientAdapter(std::auto_ptr<Channel> ch, Connection&, Client&); - Channel& getChannel() { return *channel; } - - void handleHeader(boost::shared_ptr<qpid::framing::AMQHeaderBody>); - void handleContent(boost::shared_ptr<qpid::framing::AMQContentBody>); - void handleHeartbeat(boost::shared_ptr<qpid::framing::AMQHeartbeatBody>); - - private: - void handleMethodInContext( - boost::shared_ptr<qpid::framing::AMQMethodBody> method, - const framing::MethodContext& context); - - class ClientOps; - - std::auto_ptr<Channel> channel; - Connection& connection; - Client& client; - boost::shared_ptr<ClientOps> clientOps; -}; - -}} // namespace qpid::client - - - -#endif /*!_client_ClientAdapter_h*/ diff --git a/qpid/cpp-0-9/lib/client/ClientChannel.cpp b/qpid/cpp-0-9/lib/client/ClientChannel.cpp deleted file mode 100644 index 84aa73e6bc..0000000000 --- a/qpid/cpp-0-9/lib/client/ClientChannel.cpp +++ /dev/null @@ -1,302 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <iostream> -#include <ClientChannel.h> -#include <sys/Monitor.h> -#include <ClientMessage.h> -#include <QpidError.h> -#include <MethodBodyInstances.h> -#include "Connection.h" - -// FIXME aconway 2007-01-26: Evaluate all throws, ensure consistent -// handling of errors that should close the connection or the channel. -// Make sure the user thread receives a connection in each case. -// -using namespace std; -using namespace boost; -using namespace qpid::client; -using namespace qpid::framing; -using namespace qpid::sys; - -Channel::Channel(bool _transactional, uint16_t _prefetch) : - basic(*this), - connection(0), - prefetch(_prefetch), - transactional(_transactional) -{ } - -Channel::~Channel(){ - close(); -} - -void Channel::open(ChannelId id, Connection& con) -{ - if (isOpen()) - THROW_QPID_ERROR(INTERNAL_ERROR, "Attempt to re-open channel "+id); - connection = &con; - init(id, con, con.getVersion()); // ChannelAdapter initialization. - string oob; - if (id != 0) - sendAndReceive<ChannelOpenOkBody>(new ChannelOpenBody(version, oob)); -} - -void Channel::protocolInit( - const std::string& uid, const std::string& pwd, const std::string& vhost) { - assert(connection); - responses.expect(); - connection->connector->init(); // Send ProtocolInit block. - responses.receive<ConnectionStartBody>(); - - FieldTable props; - string mechanism("PLAIN"); - string response = ((char)0) + uid + ((char)0) + pwd; - string locale("en_US"); - ConnectionTuneBody::shared_ptr proposal = - sendAndReceive<ConnectionTuneBody>( - new ConnectionStartOkBody( - version, responses.getRequestId(), props, mechanism, - response, locale)); - - /** - * Assume for now that further challenges will not be required - //receive connection.secure - responses.receive(connection_secure)); - //send connection.secure-ok - connection->send(new AMQFrame(0, new ConnectionSecureOkBody(response))); - **/ - - send(new ConnectionTuneOkBody( - version, responses.getRequestId(), proposal->getChannelMax(), connection->getMaxFrameSize(), - proposal->getHeartbeat())); - - uint16_t heartbeat = proposal->getHeartbeat(); - connection->connector->setReadTimeout(heartbeat * 2); - connection->connector->setWriteTimeout(heartbeat); - - // Send connection open. - std::string capabilities; - responses.expect(); - send(new ConnectionOpenBody(version, vhost, capabilities, true)); - //receive connection.open-ok (or redirect, but ignore that for now - //esp. as using force=true). - responses.waitForResponse(); - if(responses.validate<ConnectionOpenOkBody>()) { - //ok - }else if(responses.validate<ConnectionRedirectBody>()){ - //ignore for now - ConnectionRedirectBody::shared_ptr redirect( - shared_polymorphic_downcast<ConnectionRedirectBody>( - responses.getResponse())); - cout << "Received redirection to " << redirect->getHost() - << endl; - } else { - THROW_QPID_ERROR(PROTOCOL_ERROR, "Bad response"); - } -} - -bool Channel::isOpen() const { return connection; } - -void Channel::setQos() { - basic.setQos(); - // FIXME aconway 2007-02-22: message -} - -void Channel::setPrefetch(uint16_t _prefetch){ - prefetch = _prefetch; - setQos(); -} - -void Channel::declareExchange(Exchange& exchange, bool synch){ - string name = exchange.getName(); - string type = exchange.getType(); - FieldTable args; - sendAndReceiveSync<ExchangeDeclareOkBody>( - synch, - new ExchangeDeclareBody( - version, 0, name, type, false, false, false, false, !synch, args)); -} - -void Channel::deleteExchange(Exchange& exchange, bool synch){ - string name = exchange.getName(); - sendAndReceiveSync<ExchangeDeleteOkBody>( - synch, - new ExchangeDeleteBody(version, 0, name, false, !synch)); -} - -void Channel::declareQueue(Queue& queue, bool synch){ - string name = queue.getName(); - FieldTable args; - sendAndReceiveSync<QueueDeclareOkBody>( - synch, - new QueueDeclareBody( - version, 0, name, false/*passive*/, queue.isDurable(), - queue.isExclusive(), queue.isAutoDelete(), !synch, args)); - if(synch){ - if(queue.getName().length() == 0){ - QueueDeclareOkBody::shared_ptr response = - shared_polymorphic_downcast<QueueDeclareOkBody>( - responses.getResponse()); - queue.setName(response->getQueue()); - } - } -} - -void Channel::deleteQueue(Queue& queue, bool ifunused, bool ifempty, bool synch){ - //ticket, queue, ifunused, ifempty, nowait - string name = queue.getName(); - sendAndReceiveSync<QueueDeleteOkBody>( - synch, - new QueueDeleteBody(version, 0, name, ifunused, ifempty, !synch)); -} - -void Channel::bind(const Exchange& exchange, const Queue& queue, const std::string& key, const FieldTable& args, bool synch){ - string e = exchange.getName(); - string q = queue.getName(); - sendAndReceiveSync<QueueBindOkBody>( - synch, - new QueueBindBody(version, 0, q, e, key,!synch, args)); -} - -void Channel::commit(){ - sendAndReceive<TxCommitOkBody>(new TxCommitBody(version)); -} - -void Channel::rollback(){ - sendAndReceive<TxRollbackOkBody>(new TxRollbackBody(version)); -} - -void Channel::handleMethodInContext( - AMQMethodBody::shared_ptr method, const MethodContext&) -{ - if(responses.isWaiting()) { - responses.signalResponse(method); - return; - } - try { - switch (method->amqpClassId()) { - case BasicDeliverBody::CLASS_ID: basic.handle(method); break; - case ChannelCloseBody::CLASS_ID: handleChannel(method); break; - case ConnectionCloseBody::CLASS_ID: handleConnection(method); break; - default: throw UnknownMethod(); - } - } - catch (const UnknownMethod&) { - connection->close( - 504, "Unknown method", - method->amqpClassId(), method->amqpMethodId()); - } -} - -void Channel::handleChannel(AMQMethodBody::shared_ptr method) { - switch (method->amqpMethodId()) { - case ChannelCloseBody::METHOD_ID: - peerClose(shared_polymorphic_downcast<ChannelCloseBody>(method)); - return; - case ChannelFlowBody::METHOD_ID: - // FIXME aconway 2007-02-22: Not yet implemented. - return; - } - throw UnknownMethod(); -} - -void Channel::handleConnection(AMQMethodBody::shared_ptr method) { - if (method->amqpMethodId() == ConnectionCloseBody::METHOD_ID) { - connection->close(); - return; - } - throw UnknownMethod(); -} - -void Channel::handleHeader(AMQHeaderBody::shared_ptr body){ - basic.incoming.add(body); -} - -void Channel::handleContent(AMQContentBody::shared_ptr body){ - basic.incoming.add(body); -} - -void Channel::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){ - THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Channel received heartbeat"); -} - -void Channel::start(){ - basicDispatcher = Thread(basic); -} - -// Close called by local application. -void Channel::close( - uint16_t code, const std::string& text, - ClassId classId, MethodId methodId) -{ - if (isOpen()) { - try { - if (getId() != 0) { - sendAndReceive<ChannelCloseOkBody>( - new ChannelCloseBody( - version, code, text, classId, methodId)); - } - static_cast<ConnectionForChannel*>(connection)->erase(getId()); - closeInternal(); - } catch (...) { - static_cast<ConnectionForChannel*>(connection)->erase(getId()); - closeInternal(); - throw; - } - } -} - -// Channel closed by peer. -void Channel::peerClose(ChannelCloseBody::shared_ptr) { - assert(isOpen()); - closeInternal(); - // FIXME aconway 2007-01-26: How to throw the proper exception - // to the application thread? -} - -void Channel::closeInternal() { - if (isOpen()); - { - basic.cancelAll(); - basic.incoming.shutdown(); - connection = 0; - // A 0 response means we are closed. - responses.signalResponse(AMQMethodBody::shared_ptr()); - } - basicDispatcher.join(); -} - -void Channel::sendAndReceive(AMQMethodBody* toSend, ClassId c, MethodId m) -{ - responses.expect(); - send(toSend); - responses.receive(c, m); -} - -void Channel::sendAndReceiveSync( - bool sync, AMQMethodBody* body, ClassId c, MethodId m) -{ - if(sync) - sendAndReceive(body, c, m); - else - send(body); -} - - diff --git a/qpid/cpp-0-9/lib/client/ClientChannel.h b/qpid/cpp-0-9/lib/client/ClientChannel.h deleted file mode 100644 index 3ecab05d0b..0000000000 --- a/qpid/cpp-0-9/lib/client/ClientChannel.h +++ /dev/null @@ -1,250 +0,0 @@ -#ifndef _client_ClientChannel_h -#define _client_ClientChannel_h - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "sys/types.h" -#include <framing/amqp_framing.h> -#include <ClientExchange.h> -#include <ClientMessage.h> -#include <ClientQueue.h> -#include <ResponseHandler.h> -#include "ChannelAdapter.h" -#include "Thread.h" -#include "Basic.h" - -namespace qpid { - -namespace framing { -class ChannelCloseBody; -class AMQMethodBody; -} - -namespace client { - -class Connection; - - -/** - * Represents an AMQP channel, i.e. loosely a session of work. It - * is through a channel that most of the AMQP 'methods' are - * exposed. - * - * \ingroup clientapi - */ -class Channel : public framing::ChannelAdapter -{ - private: - // TODO aconway 2007-02-22: Remove friendship. - friend class Basic; - // FIXME aconway 2007-02-22: friend class Message; - - struct UnknownMethod {}; - - sys::Mutex lock; - Basic basic; - Connection* connection; - sys::Thread basicDispatcher; - ResponseHandler responses; - - uint16_t prefetch; - const bool transactional; - framing::ProtocolVersion version; - - void handleHeader(framing::AMQHeaderBody::shared_ptr body); - void handleContent(framing::AMQContentBody::shared_ptr body); - void handleHeartbeat(framing::AMQHeartbeatBody::shared_ptr body); - void handleMethodInContext( - framing::AMQMethodBody::shared_ptr, const framing::MethodContext&); - void handleChannel(framing::AMQMethodBody::shared_ptr method); - void handleConnection(framing::AMQMethodBody::shared_ptr method); - - void setQos(); - - void protocolInit( - const std::string& uid, const std::string& pwd, - const std::string& vhost); - - void sendAndReceive( - framing::AMQMethodBody*, framing::ClassId, framing::MethodId); - - void sendAndReceiveSync( - bool sync, - framing::AMQMethodBody*, framing::ClassId, framing::MethodId); - - template <class BodyType> - boost::shared_ptr<BodyType> sendAndReceive(framing::AMQMethodBody* body) { - sendAndReceive(body, BodyType::CLASS_ID, BodyType::METHOD_ID); - return boost::shared_polymorphic_downcast<BodyType>( - responses.getResponse()); - } - - template <class BodyType> void sendAndReceiveSync( - bool sync, framing::AMQMethodBody* body) { - sendAndReceiveSync( - sync, body, BodyType::CLASS_ID, BodyType::METHOD_ID); - } - - void open(framing::ChannelId, Connection&); - void closeInternal(); - void peerClose(boost::shared_ptr<framing::ChannelCloseBody>); - - friend class Connection; - - public: - - /** - * Creates a channel object. - * - * @param transactional if true, the publishing and acknowledgement - * of messages will be transactional and can be committed or - * aborted in atomic units (@see commit(), @see rollback()) - * - * @param prefetch specifies the number of unacknowledged - * messages the channel is willing to have sent to it - * asynchronously - */ - Channel(bool transactional = false, uint16_t prefetch = 500); - ~Channel(); - - /** - * Declares an exchange. - * - * In AMQP Exchanges are the destinations to which messages - * are published. They have Queues bound to them and route - * messages they receive to those queues. The routing rules - * depend on the type of the exchange. - * - * @param exchange an Exchange object representing the - * exchange to declare - * - * @param synch if true this call will block until a response - * is received from the broker - */ - void declareExchange(Exchange& exchange, bool synch = true); - /** - * Deletes an exchange - * - * @param exchange an Exchange object representing the exchange to delete - * - * @param synch if true this call will block until a response - * is received from the broker - */ - void deleteExchange(Exchange& exchange, bool synch = true); - /** - * Declares a Queue - * - * @param queue a Queue object representing the queue to declare - * - * @param synch if true this call will block until a response - * is received from the broker - */ - void declareQueue(Queue& queue, bool synch = true); - /** - * Deletes a Queue - * - * @param queue a Queue object representing the queue to delete - * - * @param synch if true this call will block until a response - * is received from the broker - */ - void deleteQueue(Queue& queue, bool ifunused = false, bool ifempty = false, bool synch = true); - /** - * Binds a queue to an exchange. The exact semantics of this - * (in particular how 'routing keys' and 'binding arguments' - * are used) depends on the type of the exchange. - * - * @param exchange an Exchange object representing the - * exchange to bind to - * - * @param queue a Queue object representing the queue to be - * bound - * - * @param key the 'routing key' for the binding - * - * @param args the 'binding arguments' for the binding - * - * @param synch if true this call will block until a response - * is received from the broker - */ - void bind(const Exchange& exchange, const Queue& queue, - const std::string& key, const framing::FieldTable& args, - bool synch = true); - - /** - * Get a Basic object which provides functions to send and - * receive messages using the AMQP 0-8 Basic class methods. - *@see Basic - */ - Basic& getBasic() { return basic; } - - /** - * For a transactional channel this will commit all - * publications and acknowledgements since the last commit (or - * the channel was opened if there has been no previous - * commit). This will cause published messages to become - * available to consumers and acknowledged messages to be - * consumed and removed from the queues they were dispatched - * from. - * - * Transactionailty of a channel is specified when the channel - * object is created (@see Channel()). - */ - void commit(); - - /** - * For a transactional channel, this will rollback any - * publications or acknowledgements. It will be as if the - * ppblished messages were never sent and the acknowledged - * messages were never consumed. - */ - void rollback(); - - /** - * Change the prefetch in use. - */ - void setPrefetch(uint16_t prefetch); - - uint16_t getPrefetch() { return prefetch; } - - /** - * Start message dispatching on a new thread - */ - void start(); - - /** - * Close the channel with optional error information. - * Closing a channel that is not open has no effect. - */ - void close( - framing::ReplyCode = 200, const std::string& ="OK", - framing::ClassId = 0, framing::MethodId = 0); - - /** True if the channel is transactional */ - bool isTransactional() { return transactional; } - - /** True if the channel is open */ - bool isOpen() const; -}; - -}} - -#endif /*!_client_ClientChannel_h*/ diff --git a/qpid/cpp-0-9/lib/client/ClientExchange.cpp b/qpid/cpp-0-9/lib/client/ClientExchange.cpp deleted file mode 100644 index 5e5f3f14c6..0000000000 --- a/qpid/cpp-0-9/lib/client/ClientExchange.cpp +++ /dev/null @@ -1,34 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <ClientExchange.h> - -qpid::client::Exchange::Exchange(std::string _name, std::string _type) : name(_name), type(_type){} -const std::string& qpid::client::Exchange::getName() const { return name; } -const std::string& qpid::client::Exchange::getType() const { return type; } - -const std::string qpid::client::Exchange::DIRECT_EXCHANGE = "direct"; -const std::string qpid::client::Exchange::TOPIC_EXCHANGE = "topic"; -const std::string qpid::client::Exchange::HEADERS_EXCHANGE = "headers"; - -const qpid::client::Exchange qpid::client::Exchange::DEFAULT_EXCHANGE("", DIRECT_EXCHANGE); -const qpid::client::Exchange qpid::client::Exchange::STANDARD_DIRECT_EXCHANGE("amq.direct", DIRECT_EXCHANGE); -const qpid::client::Exchange qpid::client::Exchange::STANDARD_TOPIC_EXCHANGE("amq.topic", TOPIC_EXCHANGE); -const qpid::client::Exchange qpid::client::Exchange::STANDARD_HEADERS_EXCHANGE("amq.headers", HEADERS_EXCHANGE); diff --git a/qpid/cpp-0-9/lib/client/ClientExchange.h b/qpid/cpp-0-9/lib/client/ClientExchange.h deleted file mode 100644 index a8ac21fa9b..0000000000 --- a/qpid/cpp-0-9/lib/client/ClientExchange.h +++ /dev/null @@ -1,106 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <string> - -#ifndef _Exchange_ -#define _Exchange_ - -namespace qpid { -namespace client { - - /** - * A 'handle' used to represent an AMQP exchange in the Channel - * methods. Exchanges are the destinations to which messages are - * published. - * - * There are different types of exchange (the standard types are - * available as static constants, see DIRECT_EXCHANGE, - * TOPIC_EXCHANGE and HEADERS_EXCHANGE). A Queue can be bound to - * an exchange using Channel::bind() and messages published to - * that exchange are then routed to the queue based on the details - * of the binding and the type of exchange. - * - * There are some standard exchange instances that are predeclared - * on all AMQP brokers. These are defined as static members - * STANDARD_DIRECT_EXCHANGE, STANDARD_TOPIC_EXCHANGE and - * STANDARD_HEADERS_EXCHANGE. There is also the 'default' exchange - * (member DEFAULT_EXCHANGE) which is nameless and of type - * 'direct' and has every declared queue bound to it by queue - * name. - * - * \ingroup clientapi - */ - class Exchange{ - const std::string name; - const std::string type; - - public: - /** - * A direct exchange routes messages published with routing - * key X to any queue bound with key X (i.e. an exact match is - * used). - */ - static const std::string DIRECT_EXCHANGE; - /** - * A topic exchange treat the key with which a queue is bound - * as a pattern and routes all messages whose routing keys - * match that pattern to the bound queue. The routing key for - * a message must consist of zero or more alpha-numeric words - * delimited by dots. The pattern is of a similar form but * - * can be used to match excatly one word and # can be used to - * match zero or more words. - */ - static const std::string TOPIC_EXCHANGE; - /** - * The headers exchange routes messages based on whether their - * headers match the binding arguments specified when - * binding. (see the AMQP spec for more details). - */ - static const std::string HEADERS_EXCHANGE; - - /** - * The 'default' exchange, nameless and of type 'direct'. Has - * every declared queue bound to it by name. - */ - static const Exchange DEFAULT_EXCHANGE; - /** - * The standard direct exchange, named amq.direct. - */ - static const Exchange STANDARD_DIRECT_EXCHANGE; - /** - * The standard topic exchange, named amq.topic. - */ - static const Exchange STANDARD_TOPIC_EXCHANGE; - /** - * The standard headers exchange, named amq.header. - */ - static const Exchange STANDARD_HEADERS_EXCHANGE; - - Exchange(std::string name, std::string type = DIRECT_EXCHANGE); - const std::string& getName() const; - const std::string& getType() const; - }; - -} -} - - -#endif diff --git a/qpid/cpp-0-9/lib/client/ClientMessage.cpp b/qpid/cpp-0-9/lib/client/ClientMessage.cpp deleted file mode 100644 index f55c4abfe6..0000000000 --- a/qpid/cpp-0-9/lib/client/ClientMessage.cpp +++ /dev/null @@ -1,162 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <ClientMessage.h> -using namespace qpid::client; -using namespace qpid::framing; - -Message::Message(const std::string& d) - : header(new AMQHeaderBody(BASIC)) -{ - setData(d); -} - -void Message::setData(const std::string& d) { - data = d; - header->setContentSize(d.size()); -} - -Message::Message(AMQHeaderBody::shared_ptr& _header) : header(_header){ -} - -Message::~Message(){ -} - -BasicHeaderProperties* Message::getHeaderProperties() const { - return dynamic_cast<BasicHeaderProperties*>(header->getProperties()); -} - -const std::string& Message::getContentType() const { - return getHeaderProperties()->getContentType(); -} - -const std::string& Message::getContentEncoding() const { - return getHeaderProperties()->getContentEncoding(); -} - -FieldTable& Message::getHeaders() const { - return getHeaderProperties()->getHeaders(); -} - -uint8_t Message::getDeliveryMode() const { - return getHeaderProperties()->getDeliveryMode(); -} - -uint8_t Message::getPriority() const { - return getHeaderProperties()->getPriority(); -} - -const std::string& Message::getCorrelationId() const { - return getHeaderProperties()->getCorrelationId(); -} - -const std::string& Message::getReplyTo() const { - return getHeaderProperties()->getReplyTo(); -} - -const std::string& Message::getExpiration() const { - return getHeaderProperties()->getExpiration(); -} - -const std::string& Message::getMessageId() const { - return getHeaderProperties()->getMessageId(); -} - -uint64_t Message::getTimestamp() const { - return getHeaderProperties()->getTimestamp(); -} - -const std::string& Message::getType() const { - return getHeaderProperties()->getType(); -} - -const std::string& Message::getUserId() const { - return getHeaderProperties()->getUserId(); -} - -const std::string& Message::getAppId() const { - return getHeaderProperties()->getAppId(); -} - -const std::string& Message::getClusterId() const { - return getHeaderProperties()->getClusterId(); -} - -void Message::setContentType(const std::string& type){ - getHeaderProperties()->setContentType(type); -} - -void Message::setContentEncoding(const std::string& encoding){ - getHeaderProperties()->setContentEncoding(encoding); -} - -void Message::setHeaders(const FieldTable& headers){ - getHeaderProperties()->setHeaders(headers); -} - -void Message::setDeliveryMode(uint8_t mode){ - getHeaderProperties()->setDeliveryMode(mode); -} - -void Message::setPriority(uint8_t priority){ - getHeaderProperties()->setPriority(priority); -} - -void Message::setCorrelationId(const std::string& correlationId){ - getHeaderProperties()->setCorrelationId(correlationId); -} - -void Message::setReplyTo(const std::string& replyTo){ - getHeaderProperties()->setReplyTo(replyTo); -} - -void Message::setExpiration(const std::string& expiration){ - getHeaderProperties()->setExpiration(expiration); -} - -void Message::setMessageId(const std::string& messageId){ - getHeaderProperties()->setMessageId(messageId); -} - -void Message::setTimestamp(uint64_t timestamp){ - getHeaderProperties()->setTimestamp(timestamp); -} - -void Message::setType(const std::string& type){ - getHeaderProperties()->setType(type); -} - -void Message::setUserId(const std::string& userId){ - getHeaderProperties()->setUserId(userId); -} - -void Message::setAppId(const std::string& appId){ - getHeaderProperties()->setAppId(appId); -} - -void Message::setClusterId(const std::string& clusterId){ - getHeaderProperties()->setClusterId(clusterId); -} - - -uint64_t Message::getDeliveryTag() const { - BasicDeliverBody* deliver=dynamic_cast<BasicDeliverBody*>(method.get()); - return deliver ? deliver->getDeliveryTag() : 0; -} diff --git a/qpid/cpp-0-9/lib/client/ClientMessage.h b/qpid/cpp-0-9/lib/client/ClientMessage.h deleted file mode 100644 index cb239ed4d6..0000000000 --- a/qpid/cpp-0-9/lib/client/ClientMessage.h +++ /dev/null @@ -1,125 +0,0 @@ -#ifndef _client_ClientMessage_h -#define _client_ClientMessage_h - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <string> -#include <framing/amqp_framing.h> - -namespace qpid { - -namespace client { -class IncomingMessage; - -/** - * A representation of messages for sent or recived through the - * client api. - * - * \ingroup clientapi - */ -class Message { - framing::AMQMethodBody::shared_ptr method; - framing::AMQHeaderBody::shared_ptr header; - std::string data; - bool redelivered; - - // FIXME aconway 2007-02-20: const incorrect, needs const return type. - framing::BasicHeaderProperties* getHeaderProperties() const; - Message(qpid::framing::AMQHeaderBody::shared_ptr& header); - - public: - Message(const std::string& data=std::string()); - ~Message(); - - /** - * Allows the application to access the content of messages - * received. - * - * @return a string representing the data of the message - */ - std::string getData() const { return data; } - - /** - * Allows the application to set the content of messages to be - * sent. - * - * @param data a string representing the data of the message - */ - void setData(const std::string& _data); - - /** - * @return true if this message was delivered previously (to - * any consumer) but was not acknowledged. - */ - bool isRedelivered(){ return redelivered; } - void setRedelivered(bool _redelivered){ redelivered = _redelivered; } - - uint64_t getDeliveryTag() const; - - const std::string& getContentType() const; - const std::string& getContentEncoding() const; - qpid::framing::FieldTable& getHeaders() const; - uint8_t getDeliveryMode() const; - uint8_t getPriority() const; - const std::string& getCorrelationId() const; - const std::string& getReplyTo() const; - const std::string& getExpiration() const; - const std::string& getMessageId() const; - uint64_t getTimestamp() const; - const std::string& getType() const; - const std::string& getUserId() const; - const std::string& getAppId() const; - const std::string& getClusterId() const; - - void setContentType(const std::string& type); - void setContentEncoding(const std::string& encoding); - void setHeaders(const qpid::framing::FieldTable& headers); - /** - * Sets the delivery mode. 1 = non-durable, 2 = durable. - */ - void setDeliveryMode(uint8_t mode); - void setPriority(uint8_t priority); - void setCorrelationId(const std::string& correlationId); - void setReplyTo(const std::string& replyTo); - void setExpiration(const std::string& expiration); - void setMessageId(const std::string& messageId); - void setTimestamp(uint64_t timestamp); - void setType(const std::string& type); - void setUserId(const std::string& userId); - void setAppId(const std::string& appId); - void setClusterId(const std::string& clusterId); - - /** Get the method used to deliver this message */ - boost::shared_ptr<framing::AMQMethodBody> getMethod() const - { return method; } - - void setMethod(framing::AMQMethodBody::shared_ptr m) { method=m; } - boost::shared_ptr<framing::AMQHeaderBody> getHeader() const - { return header; } - - // TODO aconway 2007-02-15: remove friendships. - friend class IncomingMessage; - friend class Channel; -}; - -}} - -#endif /*!_client_ClientMessage_h*/ diff --git a/qpid/cpp-0-9/lib/client/ClientQueue.cpp b/qpid/cpp-0-9/lib/client/ClientQueue.cpp deleted file mode 100644 index 773be504d8..0000000000 --- a/qpid/cpp-0-9/lib/client/ClientQueue.cpp +++ /dev/null @@ -1,58 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <ClientQueue.h> - -qpid::client::Queue::Queue() : name(""), autodelete(true), exclusive(true), durable(false){} - -qpid::client::Queue::Queue(std::string _name) : name(_name), autodelete(false), exclusive(false), durable(false){} - -qpid::client::Queue::Queue(std::string _name, bool temp) : name(_name), autodelete(temp), exclusive(temp), durable(false){} - -qpid::client::Queue::Queue(std::string _name, bool _autodelete, bool _exclusive, bool _durable) - : name(_name), autodelete(_autodelete), exclusive(_exclusive), durable(_durable){} - -const std::string& qpid::client::Queue::getName() const{ - return name; -} - -void qpid::client::Queue::setName(const std::string& _name){ - name = _name; -} - -bool qpid::client::Queue::isAutoDelete() const{ - return autodelete; -} - -bool qpid::client::Queue::isExclusive() const{ - return exclusive; -} - -bool qpid::client::Queue::isDurable() const{ - return durable; -} - -void qpid::client::Queue::setDurable(bool _durable){ - durable = _durable; -} - - - - diff --git a/qpid/cpp-0-9/lib/client/ClientQueue.h b/qpid/cpp-0-9/lib/client/ClientQueue.h deleted file mode 100644 index b37a44b004..0000000000 --- a/qpid/cpp-0-9/lib/client/ClientQueue.h +++ /dev/null @@ -1,103 +0,0 @@ -#ifndef _client_ClientQueue_h -#define _client_ClientQueue_h - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <string> - -namespace qpid { -namespace client { - - /** - * A 'handle' used to represent an AMQP queue in the Channel - * methods. Creating an instance of this class does not cause the - * queue to be created on the broker. Rather, an instance of this - * class should be passed to Channel::declareQueue() to ensure - * that the queue exists or is created. - * - * Queues hold messages and allow clients to consume - * (see Channel::consume()) or get (see Channel::get()) those messags. A - * queue receives messages by being bound to one or more Exchange; - * messages published to that exchange may then be routed to the - * queue based on the details of the binding and the type of the - * exchange (see Channel::bind()). - * - * Queues are identified by a name. They can be exclusive (in which - * case they can only be used in the context of the connection - * over which they were declared, and are deleted when then - * connection closes), or they can be shared. Shared queues can be - * auto deleted when they have no consumers. - * - * We use the term 'temporary queue' to refer to an exclusive - * queue. - * - * \ingroup clientapi - */ - class Queue{ - std::string name; - const bool autodelete; - const bool exclusive; - bool durable; - - public: - - /** - * Creates an unnamed, non-durable, temporary queue. A name - * will be assigned to this queue instance by a call to - * Channel::declareQueue(). - */ - Queue(); - /** - * Creates a shared, non-durable, queue with a given name, - * that will not be autodeleted. - * - * @param name the name of the queue - */ - Queue(std::string name); - /** - * Creates a non-durable queue with a given name. - * - * @param name the name of the queue - * - * @param temp if true the queue will be a temporary queue, if - * false it will be shared and not autodeleted. - */ - Queue(std::string name, bool temp); - /** - * This constructor allows the autodelete, exclusive and - * durable propeties to be explictly set. Note however that if - * exclusive is true, autodelete has no meaning as exclusive - * queues are always destroyed when the connection that - * created them is closed. - */ - Queue(std::string name, bool autodelete, bool exclusive, bool durable); - const std::string& getName() const; - void setName(const std::string&); - bool isAutoDelete() const; - bool isExclusive() const; - bool isDurable() const; - void setDurable(bool durable); - }; - -} -} - -#endif /*!_client_ClientQueue_h*/ diff --git a/qpid/cpp-0-9/lib/client/Connection.cpp b/qpid/cpp-0-9/lib/client/Connection.cpp deleted file mode 100644 index 618f3cbc92..0000000000 --- a/qpid/cpp-0-9/lib/client/Connection.cpp +++ /dev/null @@ -1,156 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <algorithm> -#include <boost/format.hpp> -#include <boost/bind.hpp> - -#include <Connection.h> -#include <ClientChannel.h> -#include <ClientMessage.h> -#include <QpidError.h> -#include <iostream> -#include <sstream> -#include <MethodBodyInstances.h> -#include <functional> - -using namespace qpid::framing; -using namespace qpid::sys; - - -namespace qpid { -namespace client { - -const std::string Connection::OK("OK"); - -Connection::Connection( - bool _debug, uint32_t _max_frame_size, - framing::ProtocolVersion _version -) : channelIdCounter(0), version(_version), max_frame_size(_max_frame_size), - defaultConnector(version, _debug, _max_frame_size), - isOpen(false), debug(_debug) -{ - setConnector(defaultConnector); -} - -Connection::~Connection(){} - -void Connection::setConnector(Connector& con) -{ - connector = &con; - connector->setInputHandler(this); - connector->setTimeoutHandler(this); - connector->setShutdownHandler(this); - out = connector->getOutputHandler(); -} - -void Connection::open( - const std::string& host, int port, - const std::string& uid, const std::string& pwd, const std::string& vhost) -{ - if (isOpen) - THROW_QPID_ERROR(INTERNAL_ERROR, "Channel object is already open"); - connector->connect(host, port); - channels[0] = &channel0; - channel0.open(0, *this); - channel0.protocolInit(uid, pwd, vhost); - isOpen = true; -} - -void Connection::shutdown() { - close(); -} - -void Connection::close( - ReplyCode code, const string& msg, ClassId classId, MethodId methodId -) -{ - if(isOpen) { - // TODO aconway 2007-01-29: Exception handling - could end up - // partly closed with threads left unjoined. - isOpen = false; - channel0.sendAndReceive<ConnectionCloseOkBody>( - new ConnectionCloseBody( - getVersion(), code, msg, classId, methodId)); - - using boost::bind; - for_each(channels.begin(), channels.end(), - bind(&Channel::closeInternal, - bind(&ChannelMap::value_type::second, _1))); - channels.clear(); - connector->close(); - } -} - -void Connection::openChannel(Channel& channel) { - ChannelId id = ++channelIdCounter; - assert (channels.find(id) == channels.end()); - assert(out); - channels[id] = &channel; - channel.open(id, *this); -} - -void Connection::erase(ChannelId id) { - channels.erase(id); -} - -void Connection::received(AMQFrame* frame){ - // FIXME aconway 2007-01-25: Mutex - ChannelId id = frame->getChannel(); - Channel* channel = channels[id]; - // FIXME aconway 2007-01-26: Exception thrown here is hanging the - // client. Need to review use of exceptions. - if (channel == 0) - THROW_QPID_ERROR( - PROTOCOL_ERROR+504, - (boost::format("Invalid channel number %g") % id).str()); - try{ - channel->handleBody(frame->getBody()); - }catch(const qpid::QpidError& e){ - channelException( - *channel, dynamic_cast<AMQMethodBody*>(frame->getBody().get()), e); - } -} - -void Connection::send(AMQFrame* frame) { - out->send(frame); -} - -void Connection::channelException( - Channel& channel, AMQMethodBody* method, const QpidError& e) -{ - int code = (e.code >= PROTOCOL_ERROR) ? e.code - PROTOCOL_ERROR : 500; - string msg = e.msg; - if(method == 0) - channel.close(code, msg); - else - channel.close( - code, msg, method->amqpClassId(), method->amqpMethodId()); -} - -void Connection::idleIn(){ - connector->close(); -} - -void Connection::idleOut(){ - out->send(new AMQFrame(version, 0, new AMQHeartbeatBody())); -} - -}} // namespace qpid::client diff --git a/qpid/cpp-0-9/lib/client/Connection.h b/qpid/cpp-0-9/lib/client/Connection.h deleted file mode 100644 index b4bd311511..0000000000 --- a/qpid/cpp-0-9/lib/client/Connection.h +++ /dev/null @@ -1,194 +0,0 @@ -#ifndef _client_Connection_ -#define _client_Connection_ - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <map> -#include <string> -#include <boost/shared_ptr.hpp> - -#include "amqp_types.h" -#include <QpidError.h> -#include <Connector.h> -#include <sys/ShutdownHandler.h> -#include <sys/TimeoutHandler.h> - - -#include "framing/amqp_types.h" -#include <framing/amqp_framing.h> -#include <ClientExchange.h> -#include <IncomingMessage.h> -#include <ClientMessage.h> -#include <MessageListener.h> -#include <ClientQueue.h> -#include <ResponseHandler.h> -#include <AMQP_HighestVersion.h> -#include "ClientChannel.h" - -namespace qpid { - -/** - * The client namespace contains all classes that make up a client - * implementation of the AMQP protocol. The key classes that form - * the basis of the client API to be used by applications are - * Connection and Channel. - */ -namespace client { - -class Channel; - -/** - * \internal provide access to selected private channel functions - * for the Connection without making it a friend of the entire channel. - */ -class ConnectionForChannel : - public framing::InputHandler, - public framing::OutputHandler, - public sys::TimeoutHandler, - public sys::ShutdownHandler - -{ - private: - friend class Channel; - virtual void erase(framing::ChannelId) = 0; -}; - - -/** - * \defgroup clientapi Application API for an AMQP client - */ - -/** - * Represents a connection to an AMQP broker. All communication is - * initiated by establishing a connection, then opening one or - * more Channels over that connection. - * - * \ingroup clientapi - */ -class Connection : public ConnectionForChannel -{ - typedef std::map<framing::ChannelId, Channel*> ChannelMap; - - framing::ChannelId channelIdCounter; - static const std::string OK; - - framing::ProtocolVersion version; - const uint32_t max_frame_size; - ChannelMap channels; - Connector defaultConnector; - Connector* connector; - framing::OutputHandler* out; - volatile bool isOpen; - Channel channel0; - bool debug; - - void erase(framing::ChannelId); - void channelException( - Channel&, framing::AMQMethodBody*, const QpidError&); - - // TODO aconway 2007-01-26: too many friendships, untagle these classes. - friend class Channel; - - public: - /** - * Creates a connection object, but does not open the - * connection. - * - * @param _version the version of the protocol to connect with - * - * @param debug turns on tracing for the connection - * (i.e. prints details of the frames sent and received to std - * out). Optional and defaults to false. - * - * @param max_frame_size the maximum frame size that the - * client will accept. Optional and defaults to 65536. - */ - Connection(bool debug = false, uint32_t max_frame_size = 65536, - framing::ProtocolVersion=framing::highestProtocolVersion); - ~Connection(); - - /** - * Opens a connection to a broker. - * - * @param host the host on which the broker is running - * - * @param port the port on the which the broker is listening - * - * @param uid the userid to connect with - * - * @param pwd the password to connect with (currently SASL - * PLAIN is the only authentication method supported so this - * is sent in clear text) - * - * @param virtualhost the AMQP virtual host to use (virtual - * hosts, where implemented(!), provide namespace partitioning - * within a single broker). - */ - void open(const std::string& host, int port = 5672, - const std::string& uid = "guest", - const std::string& pwd = "guest", - const std::string& virtualhost = "/"); - - /** - * Close the connection with optional error information for the peer. - * - * Any further use of this connection (without reopening it) will - * not succeed. - */ - void close(framing::ReplyCode=200, const std::string& msg=OK, - framing::ClassId = 0, framing::MethodId = 0); - - /** - * Associate a Channel with this connection and open it for use. - * - * In AMQP channels are like multi-plexed 'sessions' of work over - * a connection. Almost all the interaction with AMQP is done over - * a channel. - * - * @param connection the connection object to be associated with - * the channel. Call Channel::close() to close the channel. - */ - void openChannel(Channel&); - - - // TODO aconway 2007-01-26: can these be private? - void send(framing::AMQFrame*); - void received(framing::AMQFrame*); - void idleOut(); - void idleIn(); - void shutdown(); - - /**\internal used for testing */ - void setConnector(Connector& connector); - - /** - * @return the maximum frame size in use on this connection - */ - inline uint32_t getMaxFrameSize(){ return max_frame_size; } - - /** @return protocol version in use on this connection. */ - framing::ProtocolVersion getVersion() const { return version; } -}; - -}} // namespace qpid::client - - -#endif diff --git a/qpid/cpp-0-9/lib/client/Connector.cpp b/qpid/cpp-0-9/lib/client/Connector.cpp deleted file mode 100644 index 7b73cc016a..0000000000 --- a/qpid/cpp-0-9/lib/client/Connector.cpp +++ /dev/null @@ -1,188 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <iostream> -#include <QpidError.h> -#include <sys/Time.h> -#include "Connector.h" - -namespace qpid { -namespace client { - -using namespace qpid::sys; -using namespace qpid::framing; -using qpid::QpidError; - -Connector::Connector( - ProtocolVersion ver, bool _debug, uint32_t buffer_size -) : debug(_debug), - receive_buffer_size(buffer_size), - send_buffer_size(buffer_size), - version(ver), - closed(true), - lastIn(0), lastOut(0), - timeout(0), - idleIn(0), idleOut(0), - timeoutHandler(0), - shutdownHandler(0), - inbuf(receive_buffer_size), - outbuf(send_buffer_size) -{ } - -Connector::~Connector(){ } - -void Connector::connect(const std::string& host, int port){ - socket = Socket::createTcp(); - socket.connect(host, port); - closed = false; - receiver = Thread(this); -} - -void Connector::init(){ - ProtocolInitiation init(version); - writeBlock(&init); -} - -void Connector::close(){ - closed = true; - socket.close(); - receiver.join(); -} - -void Connector::setInputHandler(InputHandler* handler){ - input = handler; -} - -void Connector::setShutdownHandler(ShutdownHandler* handler){ - shutdownHandler = handler; -} - -OutputHandler* Connector::getOutputHandler(){ - return this; -} - -void Connector::send(AMQFrame* f){ - std::auto_ptr<AMQFrame> frame(f); - AMQBody::shared_ptr body = frame->getBody(); - writeBlock(frame.get()); - if(debug) std::cout << "SENT: " << *frame << std::endl; -} - -void Connector::writeBlock(AMQDataBlock* data){ - Mutex::ScopedLock l(writeLock); - data->encode(outbuf); - //transfer data to wire - outbuf.flip(); - writeToSocket(outbuf.start(), outbuf.available()); - outbuf.clear(); -} - -void Connector::writeToSocket(char* data, size_t available){ - size_t written = 0; - while(written < available && !closed){ - ssize_t sent = socket.send(data + written, available-written); - if(sent > 0) { - lastOut = now() * TIME_MSEC; - written += sent; - } - } -} - -void Connector::handleClosed(){ - closed = true; - socket.close(); - if(shutdownHandler) shutdownHandler->shutdown(); -} - -void Connector::checkIdle(ssize_t status){ - if(timeoutHandler){ - Time t = now() * TIME_MSEC; - if(status == Socket::SOCKET_TIMEOUT) { - if(idleIn && (t - lastIn > idleIn)){ - timeoutHandler->idleIn(); - } - } - else if(status == 0 || status == Socket::SOCKET_EOF) { - handleClosed(); - } - else { - lastIn = t; - } - if(idleOut && (t - lastOut > idleOut)){ - timeoutHandler->idleOut(); - } - } -} - -void Connector::setReadTimeout(uint16_t t){ - idleIn = t * 1000;//t is in secs - if(idleIn && (!timeout || idleIn < timeout)){ - timeout = idleIn; - setSocketTimeout(); - } - -} - -void Connector::setWriteTimeout(uint16_t t){ - idleOut = t * 1000;//t is in secs - if(idleOut && (!timeout || idleOut < timeout)){ - timeout = idleOut; - setSocketTimeout(); - } -} - -void Connector::setSocketTimeout(){ - socket.setTimeout(timeout*TIME_MSEC); -} - -void Connector::setTimeoutHandler(TimeoutHandler* handler){ - timeoutHandler = handler; -} - -void Connector::run(){ - try{ - while(!closed){ - ssize_t available = inbuf.available(); - if(available < 1){ - THROW_QPID_ERROR(INTERNAL_ERROR, "Frame exceeds buffer size."); - } - ssize_t received = socket.recv(inbuf.start(), available); - checkIdle(received); - - if(!closed && received > 0){ - inbuf.move(received); - inbuf.flip();//position = 0, limit = total data read - - AMQFrame frame(version); - while(frame.decode(inbuf)){ - if(debug) std::cout << "RECV: " << frame << std::endl; - input->received(&frame); - } - //need to compact buffer to preserve any 'extra' data - inbuf.compact(); - } - } - } catch (const std::exception& e) { - std::cout << e.what() << std::endl; - handleClosed(); - } -} - -}} // namespace qpid::client diff --git a/qpid/cpp-0-9/lib/client/Connector.h b/qpid/cpp-0-9/lib/client/Connector.h deleted file mode 100644 index 91234d2321..0000000000 --- a/qpid/cpp-0-9/lib/client/Connector.h +++ /dev/null @@ -1,98 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#ifndef _Connector_ -#define _Connector_ - - -#include <framing/InputHandler.h> -#include <framing/OutputHandler.h> -#include <framing/InitiationHandler.h> -#include <framing/ProtocolInitiation.h> -#include <ProtocolVersion.h> -#include <sys/ShutdownHandler.h> -#include <sys/TimeoutHandler.h> -#include <sys/Thread.h> -#include <sys/Monitor.h> -#include <sys/Socket.h> - -namespace qpid { - -namespace client { - -class Connector : public framing::OutputHandler, - private sys::Runnable -{ - const bool debug; - const int receive_buffer_size; - const int send_buffer_size; - framing::ProtocolVersion version; - - bool closed; - - int64_t lastIn; - int64_t lastOut; - int64_t timeout; - uint32_t idleIn; - uint32_t idleOut; - - sys::TimeoutHandler* timeoutHandler; - sys::ShutdownHandler* shutdownHandler; - framing::InputHandler* input; - framing::InitiationHandler* initialiser; - framing::OutputHandler* output; - - framing::Buffer inbuf; - framing::Buffer outbuf; - - sys::Mutex writeLock; - sys::Thread receiver; - - sys::Socket socket; - - void checkIdle(ssize_t status); - void writeBlock(framing::AMQDataBlock* data); - void writeToSocket(char* data, size_t available); - void setSocketTimeout(); - - void run(); - void handleClosed(); - - friend class Channel; - public: - Connector(framing::ProtocolVersion pVersion, - bool debug = false, uint32_t buffer_size = 1024); - virtual ~Connector(); - virtual void connect(const std::string& host, int port); - virtual void init(); - virtual void close(); - virtual void setInputHandler(framing::InputHandler* handler); - virtual void setTimeoutHandler(sys::TimeoutHandler* handler); - virtual void setShutdownHandler(sys::ShutdownHandler* handler); - virtual framing::OutputHandler* getOutputHandler(); - virtual void send(framing::AMQFrame* frame); - virtual void setReadTimeout(uint16_t timeout); - virtual void setWriteTimeout(uint16_t timeout); -}; - -}} - - -#endif diff --git a/qpid/cpp-0-9/lib/client/IncomingMessage.cpp b/qpid/cpp-0-9/lib/client/IncomingMessage.cpp deleted file mode 100644 index 07f94ceb64..0000000000 --- a/qpid/cpp-0-9/lib/client/IncomingMessage.cpp +++ /dev/null @@ -1,172 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <IncomingMessage.h> -#include "framing/AMQHeaderBody.h" -#include "framing/AMQContentBody.h" -#include "BasicGetOkBody.h" -#include "BasicReturnBody.h" -#include "BasicDeliverBody.h" -#include <QpidError.h> -#include <iostream> - -namespace qpid { -namespace client { - -using namespace sys; -using namespace framing; - -struct IncomingMessage::Guard: public Mutex::ScopedLock { - Guard(IncomingMessage* im) : Mutex::ScopedLock(im->lock) { - im->shutdownError.throwIf(); - } -}; - -IncomingMessage::IncomingMessage() { reset(); } - -void IncomingMessage::reset() { - state = &IncomingMessage::expectRequest; - endFn= &IncomingMessage::endRequest; - buildMessage = Message(); -} - -void IncomingMessage::startGet() { - Guard g(this); - if (state != &IncomingMessage::expectRequest) { - endGet(new QPID_ERROR(CLIENT_ERROR, "Message already in progress.")); - } - else { - state = &IncomingMessage::expectGetOk; - endFn = &IncomingMessage::endGet; - getError.reset(); - getState = GETTING; - } -} - -bool IncomingMessage::waitGet(Message& msg) { - Guard g(this); - while (getState == GETTING && !shutdownError && !getError) - getReady.wait(lock); - shutdownError.throwIf(); - getError.throwIf(); - msg = getMessage; - return getState==GOT; -} - -Message IncomingMessage::waitDispatch() { - Guard g(this); - while(dispatchQueue.empty() && !shutdownError) - dispatchReady.wait(lock); - shutdownError.throwIf(); - - Message msg(dispatchQueue.front()); - dispatchQueue.pop(); - return msg; -} - -void IncomingMessage::add(BodyPtr body) { - Guard g(this); - shutdownError.throwIf(); - // Call the current state function. - (this->*state)(body); -} - -void IncomingMessage::shutdown() { - Mutex::ScopedLock l(lock); - shutdownError.reset(new ShutdownException()); - getReady.notify(); - dispatchReady.notify(); -} - -bool IncomingMessage::isShutdown() const { - Mutex::ScopedLock l(lock); - return shutdownError; -} - -// Common check for all the expect functions. Called in network thread. -template<class T> -boost::shared_ptr<T> IncomingMessage::expectCheck(BodyPtr body) { - boost::shared_ptr<T> ptr = boost::dynamic_pointer_cast<T>(body); - if (!ptr) - throw QPID_ERROR(PROTOCOL_ERROR+504, "Unexpected frame type"); - return ptr; -} - -void IncomingMessage::expectGetOk(BodyPtr body) { - if (dynamic_cast<BasicGetOkBody*>(body.get())) - state = &IncomingMessage::expectHeader; - else if (dynamic_cast<BasicGetEmptyBody*>(body.get())) { - getState = EMPTY; - endGet(); - } - else - throw QPID_ERROR(PROTOCOL_ERROR+504, "Unexpected frame type"); -} - -void IncomingMessage::expectHeader(BodyPtr body) { - AMQHeaderBody::shared_ptr header = expectCheck<AMQHeaderBody>(body); - buildMessage.header = header; - state = &IncomingMessage::expectContent; - checkComplete(); -} - -void IncomingMessage::expectContent(BodyPtr body) { - AMQContentBody::shared_ptr content = expectCheck<AMQContentBody>(body); - buildMessage.setData(buildMessage.getData() + content->getData()); - checkComplete(); -} - -void IncomingMessage::checkComplete() { - size_t declaredSize = buildMessage.header->getContentSize(); - size_t currentSize = buildMessage.getData().size(); - if (declaredSize == currentSize) - (this->*endFn)(0); - else if (declaredSize < currentSize) - (this->*endFn)(new QPID_ERROR( - PROTOCOL_ERROR, "Message content exceeds declared size.")); -} - -void IncomingMessage::expectRequest(BodyPtr body) { - AMQMethodBody::shared_ptr method = expectCheck<AMQMethodBody>(body); - buildMessage.setMethod(method); - state = &IncomingMessage::expectHeader; -} - -void IncomingMessage::endGet(Exception* ex) { - getError.reset(ex); - if (getState == GETTING) { - getMessage = buildMessage; - getState = GOT; - } - reset(); - getReady.notify(); -} - -void IncomingMessage::endRequest(Exception* ex) { - ExceptionHolder eh(ex); - if (!eh) { - dispatchQueue.push(buildMessage); - reset(); - dispatchReady.notify(); - } - eh.throwIf(); -} - -}} // namespace qpid::client diff --git a/qpid/cpp-0-9/lib/client/IncomingMessage.h b/qpid/cpp-0-9/lib/client/IncomingMessage.h deleted file mode 100644 index 2d7c8723c5..0000000000 --- a/qpid/cpp-0-9/lib/client/IncomingMessage.h +++ /dev/null @@ -1,118 +0,0 @@ -#ifndef _IncomingMessage_ -#define _IncomingMessage_ - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <string> -#include <queue> -#include <framing/amqp_framing.h> -#include "ExceptionHolder.h" -#include "ClientMessage.h" -#include "sys/Mutex.h" -#include "sys/Condition.h" - -namespace qpid { - -namespace framing { -class AMQBody; -} - -namespace client { -/** - * Accumulates incoming message frames into messages. - * Client-initiated messages (basic.get) are initiated and made - * available to the user thread one at a time. - * - * Broker initiated messages (basic.return, basic.deliver) are - * queued for handling by the user dispatch thread. - */ -class IncomingMessage { - public: - typedef boost::shared_ptr<framing::AMQBody> BodyPtr; - IncomingMessage(); - - /** Expect a new message starting with getOk. Called in user thread.*/ - void startGet(); - - /** Wait for the message to complete, return the message. - * Called in user thread. - *@raises QpidError if there was an error. - */ - bool waitGet(Message&); - - /** Wait for the next broker-initiated message. */ - Message waitDispatch(); - - /** Add a frame body to the message. Called in network thread. */ - void add(BodyPtr); - - /** Shut down: all further calls to any function throw ex. */ - void shutdown(); - - /** Check if shutdown */ - bool isShutdown() const; - - private: - - typedef void (IncomingMessage::* ExpectFn)(BodyPtr); - typedef void (IncomingMessage::* EndFn)(Exception*); - typedef std::queue<Message> MessageQueue; - struct Guard; - friend struct Guard; - - void reset(); - template <class T> boost::shared_ptr<T> expectCheck(BodyPtr); - - // State functions - a state machine where each state is - // a member function that processes a frame body. - void expectGetOk(BodyPtr); - void expectHeader(BodyPtr); - void expectContent(BodyPtr); - void expectRequest(BodyPtr); - - // End functions. - void endGet(Exception* ex = 0); - void endRequest(Exception* ex); - - // Check for complete message. - void checkComplete(); - - mutable sys::Mutex lock; - ExpectFn state; - EndFn endFn; - Message buildMessage; - ExceptionHolder shutdownError; - - // For basic.get messages. - sys::Condition getReady; - ExceptionHolder getError; - Message getMessage; - enum { GETTING, GOT, EMPTY } getState; - - // For broker-initiated messages - sys::Condition dispatchReady; - MessageQueue dispatchQueue; -}; - -}} - - -#endif diff --git a/qpid/cpp-0-9/lib/client/Makefile.am b/qpid/cpp-0-9/lib/client/Makefile.am deleted file mode 100644 index 1e10a2a244..0000000000 --- a/qpid/cpp-0-9/lib/client/Makefile.am +++ /dev/null @@ -1,36 +0,0 @@ -AM_CXXFLAGS = $(WARNING_CFLAGS) -INCLUDES = \ - -I$(top_srcdir)/gen \ - -I$(top_srcdir)/lib/common \ - -I$(top_srcdir)/lib/common/sys \ - -I$(top_srcdir)/lib/common/framing \ - $(APR_CXXFLAGS) - -lib_LTLIBRARIES = libqpidclient.la -libqpidclient_la_LIBADD = ../common/libqpidcommon.la -libqpidclient_la_LDFLAGS = -version-info $(LIBTOOL_VERSION_INFO_ARG) -libqpidclient_la_SOURCES = \ - ClientChannel.cpp \ - ClientExchange.cpp \ - ClientMessage.cpp \ - ClientQueue.cpp \ - Basic.cpp \ - Connection.cpp \ - Connector.cpp \ - IncomingMessage.cpp \ - MessageListener.cpp \ - ResponseHandler.cpp \ - ReturnedMessageHandler.cpp -pkginclude_HEADERS = \ - ClientChannel.h \ - ClientExchange.h \ - ClientMessage.h \ - ClientQueue.h \ - Basic.h \ - Connection.h \ - Connector.h \ - IncomingMessage.h \ - MessageListener.h \ - MethodBodyInstances.h \ - ResponseHandler.h \ - ReturnedMessageHandler.h diff --git a/qpid/cpp-0-9/lib/client/MessageListener.cpp b/qpid/cpp-0-9/lib/client/MessageListener.cpp deleted file mode 100644 index 70d44e7040..0000000000 --- a/qpid/cpp-0-9/lib/client/MessageListener.cpp +++ /dev/null @@ -1,24 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <MessageListener.h> - -qpid::client::MessageListener::~MessageListener() {} diff --git a/qpid/cpp-0-9/lib/client/MessageListener.h b/qpid/cpp-0-9/lib/client/MessageListener.h deleted file mode 100644 index cfb917b4f8..0000000000 --- a/qpid/cpp-0-9/lib/client/MessageListener.h +++ /dev/null @@ -1,49 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <string> - -#ifndef _MessageListener_ -#define _MessageListener_ - -#include <ClientMessage.h> - -namespace qpid { -namespace client { - - /** - * An interface through which asynchronously delivered messages - * can be received by an application. - * - * @see Channel::consume() - * - * \ingroup clientapi - */ - class MessageListener{ - public: - virtual ~MessageListener(); - virtual void received(Message& msg) = 0; - }; - -} -} - - -#endif diff --git a/qpid/cpp-0-9/lib/client/MethodBodyInstances.h b/qpid/cpp-0-9/lib/client/MethodBodyInstances.h deleted file mode 100644 index acbeeb1158..0000000000 --- a/qpid/cpp-0-9/lib/client/MethodBodyInstances.h +++ /dev/null @@ -1,100 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <framing/amqp_framing.h> - -#ifndef _MethodBodyInstances_h_ -#define _MethodBodyInstances_h_ - -namespace qpid { -namespace client { - -/** - * A list of method body instances that can be used to compare against - * incoming bodies. - */ -class MethodBodyInstances -{ -private: - qpid::framing::ProtocolVersion version; -public: - const qpid::framing::BasicCancelOkBody basic_cancel_ok; - const qpid::framing::BasicConsumeOkBody basic_consume_ok; - const qpid::framing::BasicDeliverBody basic_deliver; - const qpid::framing::BasicGetEmptyBody basic_get_empty; - const qpid::framing::BasicGetOkBody basic_get_ok; - const qpid::framing::BasicQosOkBody basic_qos_ok; - const qpid::framing::BasicReturnBody basic_return; - const qpid::framing::ChannelCloseBody channel_close; - const qpid::framing::ChannelCloseOkBody channel_close_ok; - const qpid::framing::ChannelFlowBody channel_flow; - const qpid::framing::ChannelOpenOkBody channel_open_ok; - const qpid::framing::ConnectionCloseBody connection_close; - const qpid::framing::ConnectionCloseOkBody connection_close_ok; - const qpid::framing::ConnectionOpenOkBody connection_open_ok; - const qpid::framing::ConnectionRedirectBody connection_redirect; - const qpid::framing::ConnectionStartBody connection_start; - const qpid::framing::ConnectionTuneBody connection_tune; - const qpid::framing::ExchangeDeclareOkBody exchange_declare_ok; - const qpid::framing::ExchangeDeleteOkBody exchange_delete_ok; - const qpid::framing::QueueDeclareOkBody queue_declare_ok; - const qpid::framing::QueueDeleteOkBody queue_delete_ok; - const qpid::framing::QueueBindOkBody queue_bind_ok; - const qpid::framing::TxCommitOkBody tx_commit_ok; - const qpid::framing::TxRollbackOkBody tx_rollback_ok; - const qpid::framing::TxSelectOkBody tx_select_ok; - - MethodBodyInstances(uint8_t major, uint8_t minor) : - version(major, minor), - basic_cancel_ok(version), - basic_consume_ok(version), - basic_deliver(version), - basic_get_empty(version), - basic_get_ok(version), - basic_qos_ok(version), - basic_return(version), - channel_close(version), - channel_close_ok(version), - channel_flow(version), - channel_open_ok(version), - connection_close(version), - connection_close_ok(version), - connection_open_ok(version), - connection_redirect(version), - connection_start(version), - connection_tune(version), - exchange_declare_ok(version), - exchange_delete_ok(version), - queue_declare_ok(version), - queue_delete_ok(version), - queue_bind_ok(version), - tx_commit_ok(version), - tx_rollback_ok(version), - tx_select_ok(version) - {} - -}; - -static MethodBodyInstances method_bodies(8, 0); - -} // namespace client -} // namespace qpid - -#endif diff --git a/qpid/cpp-0-9/lib/client/ResponseHandler.cpp b/qpid/cpp-0-9/lib/client/ResponseHandler.cpp deleted file mode 100644 index 4498de41ae..0000000000 --- a/qpid/cpp-0-9/lib/client/ResponseHandler.cpp +++ /dev/null @@ -1,86 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <boost/format.hpp> - -#include <ResponseHandler.h> -#include <sys/Monitor.h> -#include <QpidError.h> -#include "amqp_types.h" - -using namespace qpid::sys; -using namespace qpid::framing; - -namespace qpid { -namespace client { - -ResponseHandler::ResponseHandler() : waiting(false){} - -ResponseHandler::~ResponseHandler(){} - -bool ResponseHandler::validate(ClassId c, MethodId m) { - return response != 0 && - response->amqpClassId() ==c && response->amqpMethodId() == m; -} - -void ResponseHandler::waitForResponse(){ - Monitor::ScopedLock l(monitor); - while (waiting) - monitor.wait(); -} - -void ResponseHandler::signalResponse( - qpid::framing::AMQMethodBody::shared_ptr _response) -{ - Monitor::ScopedLock l(monitor); - response = _response; - waiting = false; - monitor.notify(); -} - -void ResponseHandler::receive(ClassId c, MethodId m) { - Monitor::ScopedLock l(monitor); - while (waiting) - monitor.wait(); - getResponse(); // Check for closed. - if(!validate(response->amqpClassId(), response->amqpMethodId())) { - THROW_QPID_ERROR( - PROTOCOL_ERROR, - boost::format("Expected class:method %d:%d, got %d:%d") - % c % m % response->amqpClassId() % response->amqpMethodId()); - } -} - -framing::AMQMethodBody::shared_ptr ResponseHandler::getResponse() { - if (!response) - THROW_QPID_ERROR( - PROTOCOL_ERROR, "Channel closed unexpectedly."); - return response; -} - -RequestId ResponseHandler::getRequestId() { - assert(response->getRequestId()); - return response->getRequestId(); -} -void ResponseHandler::expect(){ - waiting = true; -} - -}} // namespace qpid::client diff --git a/qpid/cpp-0-9/lib/client/ResponseHandler.h b/qpid/cpp-0-9/lib/client/ResponseHandler.h deleted file mode 100644 index d28048c3d3..0000000000 --- a/qpid/cpp-0-9/lib/client/ResponseHandler.h +++ /dev/null @@ -1,68 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <string> - -#include <framing/amqp_framing.h> // FIXME aconway 2007-02-01: #include cleanup. -#include <sys/Monitor.h> - -#ifndef _ResponseHandler_ -#define _ResponseHandler_ - -namespace qpid { -namespace client { - -/** - * Holds a response from the broker peer for the client. - */ -class ResponseHandler{ - bool waiting; - qpid::framing::AMQMethodBody::shared_ptr response; - qpid::sys::Monitor monitor; - - public: - ResponseHandler(); - ~ResponseHandler(); - - bool isWaiting(){ return waiting; } - framing::AMQMethodBody::shared_ptr getResponse(); - void waitForResponse(); - - void signalResponse(framing::AMQMethodBody::shared_ptr response); - - void expect();//must be called before calling receive - bool validate(framing::ClassId, framing::MethodId); - void receive(framing::ClassId, framing::MethodId); - - framing::RequestId getRequestId(); - - template <class BodyType> bool validate() { - return validate(BodyType::CLASS_ID, BodyType::METHOD_ID); - } - template <class BodyType> void receive() { - receive(BodyType::CLASS_ID, BodyType::METHOD_ID); - } -}; - -} -} - - -#endif diff --git a/qpid/cpp-0-9/lib/client/ReturnedMessageHandler.cpp b/qpid/cpp-0-9/lib/client/ReturnedMessageHandler.cpp deleted file mode 100644 index ee9f7462ef..0000000000 --- a/qpid/cpp-0-9/lib/client/ReturnedMessageHandler.cpp +++ /dev/null @@ -1,24 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <ReturnedMessageHandler.h> - -qpid::client::ReturnedMessageHandler::~ReturnedMessageHandler() {} diff --git a/qpid/cpp-0-9/lib/client/ReturnedMessageHandler.h b/qpid/cpp-0-9/lib/client/ReturnedMessageHandler.h deleted file mode 100644 index 137f0b2e17..0000000000 --- a/qpid/cpp-0-9/lib/client/ReturnedMessageHandler.h +++ /dev/null @@ -1,49 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <string> - -#ifndef _ReturnedMessageHandler_ -#define _ReturnedMessageHandler_ - -#include <ClientMessage.h> - -namespace qpid { -namespace client { - - /** - * An interface through which returned messages can be received by - * an application. - * - * @see Channel::setReturnedMessageHandler() - * - * \ingroup clientapi - */ - class ReturnedMessageHandler{ - public: - virtual ~ReturnedMessageHandler(); - virtual void returned(Message& msg) = 0; - }; - -} -} - - -#endif |
