summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client/ClientChannel.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-09-25 18:16:02 +0000
committerAlan Conway <aconway@apache.org>2007-09-25 18:16:02 +0000
commit00b761b3b6d80ee2bb3e538face881748efb2b09 (patch)
tree59c1b38093bb0cd993863f8c72cd8d22a3aa7bb9 /cpp/src/qpid/client/ClientChannel.cpp
parentbbdaa6ec54ad9d04baa5ae1cb4d99c0387aa7d9d (diff)
downloadqpid-python-00b761b3b6d80ee2bb3e538face881748efb2b09.tar.gz
Renamed the following files for consistency:
broker/BrokerExchange.cpp -> Exchange.cpp broker/BrokerExchange.h -> Exchange.h broker/BrokerQueue.cpp -> Queue.cpp broker/BrokerQueue.h -> Queue.h client/ClientChannel.cpp -> Channel.cpp client/ClientChannel.h -> Channel.h client/ClientConnection.cpp -> Connection.cpp client/ClientExchange.cpp -> Exchange.cpp client/ClientExchange.h -> Exchange.h client/ClientMessage.h -> Message.h client/ClientQueue.cpp -> Queue.cpp client/ClientQueue.h -> Queue.h git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@579340 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client/ClientChannel.cpp')
-rw-r--r--cpp/src/qpid/client/ClientChannel.cpp271
1 files changed, 0 insertions, 271 deletions
diff --git a/cpp/src/qpid/client/ClientChannel.cpp b/cpp/src/qpid/client/ClientChannel.cpp
deleted file mode 100644
index f5362bf688..0000000000
--- a/cpp/src/qpid/client/ClientChannel.cpp
+++ /dev/null
@@ -1,271 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "qpid/log/Statement.h"
-#include <iostream>
-#include <sstream>
-#include "ClientChannel.h"
-#include "qpid/sys/Monitor.h"
-#include "ClientMessage.h"
-#include "qpid/QpidError.h"
-#include "Connection.h"
-#include "Demux.h"
-#include "FutureResponse.h"
-#include "MessageListener.h"
-#include "MessageQueue.h"
-#include <boost/format.hpp>
-#include <boost/bind.hpp>
-#include "qpid/framing/all_method_bodies.h"
-
-// FIXME aconway 2007-01-26: Evaluate all throws, ensure consistent
-// handling of errors that should close the connection or the channel.
-// Make sure the user thread receives a connection in each case.
-//
-using namespace std;
-using namespace boost;
-using namespace qpid::framing;
-using namespace qpid::sys;
-
-namespace qpid{
-namespace client{
-
-const std::string empty;
-
-class ScopedSync
-{
- Session& session;
- public:
- ScopedSync(Session& s, bool enabled = true) : session(s) { session.setSynchronous(enabled); }
- ~ScopedSync() { session.setSynchronous(false); }
-};
-
-Channel::Channel(bool _transactional, u_int16_t _prefetch) :
- prefetch(_prefetch), transactional(_transactional), running(false),
- uniqueId(true)/*could eventually be the session id*/, nameCounter(0), active(false)
-{
-}
-
-Channel::~Channel()
-{
- join();
-}
-
-void Channel::open(const Session& s)
-{
- Mutex::ScopedLock l(stopLock);
- if (isOpen())
- THROW_QPID_ERROR(INTERNAL_ERROR, "Attempt to re-open channel");
- active = true;
- session = s;
- if(isTransactional()) {
- session.txSelect();
- }
-}
-
-bool Channel::isOpen() const {
- Mutex::ScopedLock l(stopLock);
- return active;
-}
-
-void Channel::setPrefetch(uint32_t _prefetch){
- prefetch = _prefetch;
-}
-
-void Channel::declareExchange(Exchange& _exchange, bool synch){
- ScopedSync s(session, synch);
- session.exchangeDeclare_(exchange=_exchange.getName(), type=_exchange.getType());
-}
-
-void Channel::deleteExchange(Exchange& _exchange, bool synch){
- ScopedSync s(session, synch);
- session.exchangeDelete_(exchange=_exchange.getName(), ifUnused=false);
-}
-
-void Channel::declareQueue(Queue& _queue, bool synch){
- if (_queue.getName().empty()) {
- stringstream uniqueName;
- uniqueName << uniqueId << "-queue-" << ++nameCounter;
- _queue.setName(uniqueName.str());
- }
-
- ScopedSync s(session, synch);
- session.queueDeclare_(queue=_queue.getName(), passive=false/*passive*/, durable=_queue.isDurable(),
- exclusive=_queue.isExclusive(), autoDelete=_queue.isAutoDelete());
-
-}
-
-void Channel::deleteQueue(Queue& _queue, bool ifunused, bool ifempty, bool synch){
- ScopedSync s(session, synch);
- session.queueDelete_(queue=_queue.getName(), ifUnused=ifunused, ifEmpty=ifempty);
-}
-
-void Channel::bind(const Exchange& exchange, const Queue& queue, const std::string& key, const FieldTable& args, bool synch){
- string e = exchange.getName();
- string q = queue.getName();
- ScopedSync s(session, synch);
- session.queueBind(0, q, e, key, args);
-}
-
-void Channel::commit(){
- session.txCommit();
-}
-
-void Channel::rollback(){
- session.txRollback();
-}
-
-void Channel::consume(
- Queue& _queue, const std::string& tag, MessageListener* listener,
- AckMode ackMode, bool noLocal, bool synch, const FieldTable* fields) {
-
- if (tag.empty()) {
- throw Exception("A tag must be specified for a consumer.");
- }
- {
- Mutex::ScopedLock l(lock);
- ConsumerMap::iterator i = consumers.find(tag);
- if (i != consumers.end())
- throw Exception(boost::format("Consumer already exists with tag: '%1%'") % tag);
- Consumer& c = consumers[tag];
- c.listener = listener;
- c.ackMode = ackMode;
- c.count = 0;
- }
- uint8_t confirmMode = ackMode == NO_ACK ? 0 : 1;
- ScopedSync s(session, synch);
- session.messageSubscribe(0, _queue.getName(), tag, noLocal,
- confirmMode, 0/*pre-acquire*/,
- false, fields ? *fields : FieldTable());
- if (!prefetch) {
- session.messageFlowMode(tag, 0/*credit based*/);
- }
-
- //allocate some credit:
- session.messageFlow(tag, 1/*BYTES*/, 0xFFFFFFFF);
- session.messageFlow(tag, 0/*MESSAGES*/, prefetch ? prefetch : 0xFFFFFFFF);
-}
-
-void Channel::cancel(const std::string& tag, bool synch) {
- Consumer c;
- {
- Mutex::ScopedLock l(lock);
- ConsumerMap::iterator i = consumers.find(tag);
- if (i == consumers.end())
- return;
- c = i->second;
- consumers.erase(i);
- }
- ScopedSync s(session, synch);
- session.messageCancel(tag);
-}
-
-bool Channel::get(Message& msg, const Queue& _queue, AckMode ackMode) {
- string tag = "get-handler";
- ScopedDivert handler(tag, session.execution().getDemux());
- Demux::Queue& incoming = handler.getQueue();
-
- session.messageSubscribe_(destination=tag, queue=_queue.getName(), confirmMode=(ackMode == NO_ACK ? 0 : 1));
- session.messageFlow(tag, 1/*BYTES*/, 0xFFFFFFFF);
- session.messageFlow(tag, 0/*MESSAGES*/, 1);
- Completion status = session.messageFlush(tag);
- status.sync();
- session.messageCancel(tag);
-
- if (incoming.empty()) {
- return false;
- } else {
- msg.populate(*(incoming.pop()));
- if (ackMode == AUTO_ACK) msg.acknowledge(session, false, true);
- return true;
- }
-}
-
-void Channel::publish(Message& msg, const Exchange& exchange,
- const std::string& routingKey,
- bool mandatory, bool /*?TODO-restore immediate?*/) {
-
- msg.getDeliveryProperties().setRoutingKey(routingKey);
- msg.getDeliveryProperties().setDiscardUnroutable(!mandatory);
- session.messageTransfer_(destination=exchange.getName(), content=msg);
-}
-
-void Channel::close()
-{
- session.close();
- {
- Mutex::ScopedLock l(stopLock);
- active = false;
- }
- stop();
-}
-
-void Channel::start(){
- running = true;
- dispatcher = Thread(*this);
-}
-
-void Channel::stop() {
- gets.close();
- join();
-}
-
-void Channel::join() {
- Mutex::ScopedLock l(stopLock);
- if(running && dispatcher.id()) {
- dispatcher.join();
- running = false;
- }
-}
-
-void Channel::dispatch(FrameSet& content, const std::string& destination)
-{
- ConsumerMap::iterator i = consumers.find(destination);
- if (i != consumers.end()) {
- Message msg;
- msg.populate(content);
- MessageListener* listener = i->second.listener;
- listener->received(msg);
- if (isOpen() && i->second.ackMode != CLIENT_ACK) {
- bool send = i->second.ackMode == AUTO_ACK
- || (prefetch && ++(i->second.count) > (prefetch / 2));
- if (send) i->second.count = 0;
- session.execution().completed(content.getId(), true, send);
- }
- } else {
- QPID_LOG(warning, "Dropping message for unrecognised consumer: " << destination);
- }
-}
-
-void Channel::run() {
- try {
- while (true) {
- FrameSet::shared_ptr content = session.get();
- //need to dispatch this to the relevant listener:
- if (content->isA<MessageTransferBody>()) {
- dispatch(*content, content->as<MessageTransferBody>()->getDestination());
- } else {
- QPID_LOG(warning, "Dropping unsupported message type: " << content->getMethod());
- }
- }
- } catch (const QueueClosed&) {}
-}
-
-}}
-