summaryrefslogtreecommitdiff
path: root/qpid/cpp/lib/broker
diff options
context:
space:
mode:
authorAndrew Stitcher <astitcher@apache.org>2007-04-02 11:40:48 +0000
committerAndrew Stitcher <astitcher@apache.org>2007-04-02 11:40:48 +0000
commit9ecd69ebc88fb5d82a693e51eef0475c1a6b282e (patch)
tree841ab9ff2ebf92ad57bc9189eefc7448260577d1 /qpid/cpp/lib/broker
parent4ee7e8cbd677bd2ddf3f49d535a547e99c0aa150 (diff)
downloadqpid-python-9ecd69ebc88fb5d82a693e51eef0475c1a6b282e.tar.gz
Fix for the most disruptive items in QPID-243.
* All #include lines now use '""' rather than '<>' where appropriate. * #include lines within the qpid project use relative includes so that the same path will work in /usr/include when installed as part of the client libraries. * All the source code has now been rearranged to be under src in a directory analogous to the namespace of the classes in it. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@524769 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/lib/broker')
-rw-r--r--qpid/cpp/lib/broker/AccumulatedAck.cpp57
-rw-r--r--qpid/cpp/lib/broker/AccumulatedAck.h57
-rw-r--r--qpid/cpp/lib/broker/AutoDelete.cpp86
-rw-r--r--qpid/cpp/lib/broker/AutoDelete.h55
-rw-r--r--qpid/cpp/lib/broker/Broker.cpp121
-rw-r--r--qpid/cpp/lib/broker/Broker.h108
-rw-r--r--qpid/cpp/lib/broker/BrokerAdapter.cpp388
-rw-r--r--qpid/cpp/lib/broker/BrokerAdapter.h222
-rw-r--r--qpid/cpp/lib/broker/BrokerChannel.cpp346
-rw-r--r--qpid/cpp/lib/broker/BrokerChannel.h159
-rw-r--r--qpid/cpp/lib/broker/BrokerExchange.h51
-rw-r--r--qpid/cpp/lib/broker/BrokerMessage.cpp244
-rw-r--r--qpid/cpp/lib/broker/BrokerMessage.h137
-rw-r--r--qpid/cpp/lib/broker/BrokerMessageBase.h182
-rw-r--r--qpid/cpp/lib/broker/BrokerMessageMessage.cpp305
-rw-r--r--qpid/cpp/lib/broker/BrokerMessageMessage.h99
-rw-r--r--qpid/cpp/lib/broker/BrokerQueue.cpp282
-rw-r--r--qpid/cpp/lib/broker/BrokerQueue.h151
-rw-r--r--qpid/cpp/lib/broker/BrokerSingleton.cpp36
-rw-r--r--qpid/cpp/lib/broker/BrokerSingleton.h52
-rw-r--r--qpid/cpp/lib/broker/CompletionHandler.h39
-rw-r--r--qpid/cpp/lib/broker/Configuration.cpp252
-rw-r--r--qpid/cpp/lib/broker/Configuration.h171
-rw-r--r--qpid/cpp/lib/broker/Connection.cpp128
-rw-r--r--qpid/cpp/lib/broker/Connection.h108
-rw-r--r--qpid/cpp/lib/broker/ConnectionFactory.cpp43
-rw-r--r--qpid/cpp/lib/broker/ConnectionFactory.h47
-rw-r--r--qpid/cpp/lib/broker/ConnectionToken.h38
-rw-r--r--qpid/cpp/lib/broker/Consumer.h37
-rw-r--r--qpid/cpp/lib/broker/Content.h64
-rw-r--r--qpid/cpp/lib/broker/DeletingTxOp.cpp45
-rw-r--r--qpid/cpp/lib/broker/DeletingTxOp.h45
-rw-r--r--qpid/cpp/lib/broker/Deliverable.h37
-rw-r--r--qpid/cpp/lib/broker/DeliverableMessage.cpp33
-rw-r--r--qpid/cpp/lib/broker/DeliverableMessage.h41
-rw-r--r--qpid/cpp/lib/broker/DeliveryRecord.cpp87
-rw-r--r--qpid/cpp/lib/broker/DeliveryRecord.h63
-rw-r--r--qpid/cpp/lib/broker/DirectExchange.cpp71
-rw-r--r--qpid/cpp/lib/broker/DirectExchange.h57
-rw-r--r--qpid/cpp/lib/broker/ExchangeRegistry.cpp76
-rw-r--r--qpid/cpp/lib/broker/ExchangeRegistry.h47
-rw-r--r--qpid/cpp/lib/broker/FanOutExchange.cpp56
-rw-r--r--qpid/cpp/lib/broker/FanOutExchange.h60
-rw-r--r--qpid/cpp/lib/broker/HandlerImpl.h71
-rw-r--r--qpid/cpp/lib/broker/HeadersExchange.cpp119
-rw-r--r--qpid/cpp/lib/broker/HeadersExchange.h65
-rw-r--r--qpid/cpp/lib/broker/InMemoryContent.cpp72
-rw-r--r--qpid/cpp/lib/broker/InMemoryContent.h45
-rw-r--r--qpid/cpp/lib/broker/LazyLoadedContent.cpp68
-rw-r--r--qpid/cpp/lib/broker/LazyLoadedContent.h50
-rw-r--r--qpid/cpp/lib/broker/Makefile.am101
-rw-r--r--qpid/cpp/lib/broker/MessageBuilder.cpp74
-rw-r--r--qpid/cpp/lib/broker/MessageBuilder.h57
-rw-r--r--qpid/cpp/lib/broker/MessageHandlerImpl.cpp243
-rw-r--r--qpid/cpp/lib/broker/MessageHandlerImpl.h130
-rw-r--r--qpid/cpp/lib/broker/MessageStore.h129
-rw-r--r--qpid/cpp/lib/broker/MessageStoreModule.cpp109
-rw-r--r--qpid/cpp/lib/broker/MessageStoreModule.h67
-rw-r--r--qpid/cpp/lib/broker/NameGenerator.cpp32
-rw-r--r--qpid/cpp/lib/broker/NameGenerator.h39
-rw-r--r--qpid/cpp/lib/broker/NullMessageStore.cpp105
-rw-r--r--qpid/cpp/lib/broker/NullMessageStore.h64
-rw-r--r--qpid/cpp/lib/broker/Persistable.h62
-rw-r--r--qpid/cpp/lib/broker/PersistableExchange.h44
-rw-r--r--qpid/cpp/lib/broker/PersistableMessage.h53
-rw-r--r--qpid/cpp/lib/broker/PersistableQueue.h45
-rw-r--r--qpid/cpp/lib/broker/Prefetch.h42
-rw-r--r--qpid/cpp/lib/broker/QueuePolicy.cpp69
-rw-r--r--qpid/cpp/lib/broker/QueuePolicy.h54
-rw-r--r--qpid/cpp/lib/broker/QueueRegistry.cpp78
-rw-r--r--qpid/cpp/lib/broker/QueueRegistry.h96
-rw-r--r--qpid/cpp/lib/broker/RecoverableMessage.h57
-rw-r--r--qpid/cpp/lib/broker/RecoverableQueue.h49
-rw-r--r--qpid/cpp/lib/broker/RecoveryManager.h45
-rw-r--r--qpid/cpp/lib/broker/RecoveryManagerImpl.cpp131
-rw-r--r--qpid/cpp/lib/broker/RecoveryManagerImpl.h55
-rw-r--r--qpid/cpp/lib/broker/Reference.cpp52
-rw-r--r--qpid/cpp/lib/broker/Reference.h114
-rw-r--r--qpid/cpp/lib/broker/TopicExchange.cpp154
-rw-r--r--qpid/cpp/lib/broker/TopicExchange.h100
-rw-r--r--qpid/cpp/lib/broker/TransactionalStore.h57
-rw-r--r--qpid/cpp/lib/broker/TxAck.cpp54
-rw-r--r--qpid/cpp/lib/broker/TxAck.h57
-rw-r--r--qpid/cpp/lib/broker/TxBuffer.cpp55
-rw-r--r--qpid/cpp/lib/broker/TxBuffer.h107
-rw-r--r--qpid/cpp/lib/broker/TxOp.h39
-rw-r--r--qpid/cpp/lib/broker/TxPublish.cpp60
-rw-r--r--qpid/cpp/lib/broker/TxPublish.h78
88 files changed, 0 insertions, 8230 deletions
diff --git a/qpid/cpp/lib/broker/AccumulatedAck.cpp b/qpid/cpp/lib/broker/AccumulatedAck.cpp
deleted file mode 100644
index ff471b0287..0000000000
--- a/qpid/cpp/lib/broker/AccumulatedAck.cpp
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "AccumulatedAck.h"
-
-#include <assert.h>
-
-using std::less_equal;
-using std::bind2nd;
-using namespace qpid::broker;
-
-void AccumulatedAck::update(uint64_t firstTag, uint64_t lastTag){
- assert(firstTag<=lastTag);
- if (firstTag <= range + 1) {
- if (lastTag > range) range = lastTag;
- } else {
- for (uint64_t tag = firstTag; tag<=lastTag; tag++)
- individual.push_back(tag);
- }
-}
-
-void AccumulatedAck::consolidate(){
- individual.sort();
- //remove any individual tags that are covered by range
- individual.remove_if(bind2nd(less_equal<uint64_t>(), range));
- //update range if possible (using <= allows for duplicates from overlapping ranges)
- while (individual.front() <= range + 1) {
- range = individual.front();
- individual.pop_front();
- }
-}
-
-void AccumulatedAck::clear(){
- range = 0;
- individual.clear();
-}
-
-bool AccumulatedAck::covers(uint64_t tag) const{
- return tag <= range || find(individual.begin(), individual.end(), tag) != individual.end();
-}
diff --git a/qpid/cpp/lib/broker/AccumulatedAck.h b/qpid/cpp/lib/broker/AccumulatedAck.h
deleted file mode 100644
index c4a6e3b79b..0000000000
--- a/qpid/cpp/lib/broker/AccumulatedAck.h
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#ifndef _AccumulatedAck_
-#define _AccumulatedAck_
-
-#include <algorithm>
-#include <functional>
-#include <list>
-
-namespace qpid {
- namespace broker {
- /**
- * Keeps an accumulated record of acked messages (by delivery
- * tag).
- */
- class AccumulatedAck {
- public:
- /**
- * If not zero, then everything up to this value has been
- * acked.
- */
- uint64_t range;
- /**
- * List of individually acked messages that are not
- * included in the range marked by 'range'.
- */
- std::list<uint64_t> individual;
-
- AccumulatedAck(uint64_t r) : range(r) {}
- void update(uint64_t firstTag, uint64_t lastTag);
- void consolidate();
- void clear();
- bool covers(uint64_t tag) const;
- };
- }
-}
-
-
-#endif
diff --git a/qpid/cpp/lib/broker/AutoDelete.cpp b/qpid/cpp/lib/broker/AutoDelete.cpp
deleted file mode 100644
index 2037a9c71c..0000000000
--- a/qpid/cpp/lib/broker/AutoDelete.cpp
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <AutoDelete.h>
-#include <sys/Time.h>
-
-using namespace qpid::broker;
-using namespace qpid::sys;
-
-AutoDelete::AutoDelete(QueueRegistry* const _registry, uint32_t _period)
- : registry(_registry), period(_period), stopped(true) { }
-
-void AutoDelete::add(Queue::shared_ptr const queue){
- Mutex::ScopedLock l(lock);
- queues.push(queue);
-}
-
-Queue::shared_ptr const AutoDelete::pop(){
- Queue::shared_ptr next;
- Mutex::ScopedLock l(lock);
- if(!queues.empty()){
- next = queues.front();
- queues.pop();
- }
- return next;
-}
-
-void AutoDelete::process(){
- Queue::shared_ptr seen;
- for(Queue::shared_ptr q = pop(); q; q = pop()){
- if(seen == q){
- add(q);
- break;
- }else if(q->canAutoDelete()){
- std::string name(q->getName());
- registry->destroy(name);
- std::cout << "INFO: Auto-deleted queue named " << name << std::endl;
- }else{
- add(q);
- if(!seen) seen = q;
- }
- }
-}
-
-void AutoDelete::run(){
- Monitor::ScopedLock l(monitor);
- while(!stopped){
- process();
- monitor.wait(period*TIME_MSEC);
- }
-}
-
-void AutoDelete::start(){
- Monitor::ScopedLock l(monitor);
- if(stopped){
- stopped = false;
- runner = Thread(this);
- }
-}
-
-void AutoDelete::stop(){
- {
- Monitor::ScopedLock l(monitor);
- if(stopped) return;
- stopped = true;
- }
- monitor.notify();
- runner.join();
-}
diff --git a/qpid/cpp/lib/broker/AutoDelete.h b/qpid/cpp/lib/broker/AutoDelete.h
deleted file mode 100644
index 9034de1730..0000000000
--- a/qpid/cpp/lib/broker/AutoDelete.h
+++ /dev/null
@@ -1,55 +0,0 @@
-#ifndef _AutoDelete_
-#define _AutoDelete_
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <iostream>
-#include <queue>
-#include <sys/Monitor.h>
-#include <BrokerQueue.h>
-#include <QueueRegistry.h>
-#include <sys/Thread.h>
-
-namespace qpid {
- namespace broker{
- class AutoDelete : private qpid::sys::Runnable {
- qpid::sys::Mutex lock;
- qpid::sys::Monitor monitor;
- std::queue<Queue::shared_ptr> queues;
- QueueRegistry* const registry;
- uint32_t period;
- volatile bool stopped;
- qpid::sys::Thread runner;
-
- Queue::shared_ptr const pop();
- void process();
- virtual void run();
-
- public:
- AutoDelete(QueueRegistry* const registry, uint32_t period);
- void add(Queue::shared_ptr const);
- void start();
- void stop();
- };
- }
-}
-
-
-#endif
diff --git a/qpid/cpp/lib/broker/Broker.cpp b/qpid/cpp/lib/broker/Broker.cpp
deleted file mode 100644
index fa80867b69..0000000000
--- a/qpid/cpp/lib/broker/Broker.cpp
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <iostream>
-#include <memory>
-
-#include "AMQFrame.h"
-#include "DirectExchange.h"
-#include "TopicExchange.h"
-#include "FanOutExchange.h"
-#include "HeadersExchange.h"
-#include "MessageStoreModule.h"
-#include "NullMessageStore.h"
-#include "ProtocolInitiation.h"
-#include "RecoveryManagerImpl.h"
-#include "Connection.h"
-#include "sys/ConnectionInputHandler.h"
-#include "sys/ConnectionInputHandlerFactory.h"
-#include "sys/TimeoutHandler.h"
-
-#include "Broker.h"
-
-namespace qpid {
-namespace broker {
-
-const std::string empty;
-const std::string amq_direct("amq.direct");
-const std::string amq_topic("amq.topic");
-const std::string amq_fanout("amq.fanout");
-const std::string amq_match("amq.match");
-
-Broker::Broker(const Configuration& conf) :
- config(conf),
- store(createStore(conf)),
- queues(store.get()),
- timeout(30000),
- stagingThreshold(0),
- cleaner(&queues, timeout/10),
- factory(*this)
-{
- exchanges.declare(empty, DirectExchange::typeName); // Default exchange.
- exchanges.declare(amq_direct, DirectExchange::typeName);
- exchanges.declare(amq_topic, TopicExchange::typeName);
- exchanges.declare(amq_fanout, FanOutExchange::typeName);
- exchanges.declare(amq_match, HeadersExchange::typeName);
-
- if(store.get()) {
- RecoveryManagerImpl recoverer(queues, exchanges, conf.getStagingThreshold());
- store->recover(recoverer);
- }
-
- cleaner.start();
-}
-
-
-Broker::shared_ptr Broker::create(int16_t port)
-{
- Configuration config;
- config.setPort(port);
- return create(config);
-}
-
-Broker::shared_ptr Broker::create(const Configuration& config) {
- return Broker::shared_ptr(new Broker(config));
-}
-
-MessageStore* Broker::createStore(const Configuration& config) {
- if (config.getStore().empty())
- return new NullMessageStore(config.isTrace());
- else
- return new MessageStoreModule(config.getStore());
-}
-
-void Broker::run() {
- getAcceptor().run(&factory);
-}
-
-void Broker::shutdown() {
- if (acceptor)
- acceptor->shutdown();
-}
-
-Broker::~Broker() {
- shutdown();
-}
-
-int16_t Broker::getPort() const { return getAcceptor().getPort(); }
-
-Acceptor& Broker::getAcceptor() const {
- if (!acceptor)
- const_cast<Acceptor::shared_ptr&>(acceptor) =
- Acceptor::create(config.getPort(),
- config.getConnectionBacklog(),
- config.getWorkerThreads(),
- config.isTrace());
- return *acceptor;
-}
-
-
-const int16_t Broker::DEFAULT_PORT(5672);
-
-
-}} // namespace qpid::broker
-
diff --git a/qpid/cpp/lib/broker/Broker.h b/qpid/cpp/lib/broker/Broker.h
deleted file mode 100644
index 68c04336d8..0000000000
--- a/qpid/cpp/lib/broker/Broker.h
+++ /dev/null
@@ -1,108 +0,0 @@
-#ifndef _Broker_
-#define _Broker_
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include <Configuration.h>
-#include <ConnectionFactory.h>
-#include <sys/Runnable.h>
-#include <sys/Acceptor.h>
-#include <SharedObject.h>
-#include <MessageStore.h>
-#include <AutoDelete.h>
-#include <ExchangeRegistry.h>
-#include <ConnectionToken.h>
-#include <DirectExchange.h>
-#include <OutputHandler.h>
-#include <ProtocolInitiation.h>
-#include <QueueRegistry.h>
-
-namespace qpid {
-namespace broker {
-/**
- * A broker instance.
- */
-class Broker : public sys::Runnable,
- public SharedObject<Broker>
-{
- public:
- static const int16_t DEFAULT_PORT;
-
- virtual ~Broker();
-
- /**
- * Create a broker.
- * @param port Port to listen on or 0 to pick a port dynamically.
- */
- static shared_ptr create(int16_t port = DEFAULT_PORT);
-
- /**
- * Create a broker using a Configuration.
- */
- static shared_ptr create(const Configuration& config);
-
- /**
- * Return listening port. If called before bind this is
- * the configured port. If called after it is the actual
- * port, which will be different if the configured port is
- * 0.
- */
- virtual int16_t getPort() const;
-
- /**
- * Run the broker. Implements Runnable::run() so the broker
- * can be run in a separate thread.
- */
- virtual void run();
-
- /** Shut down the broker */
- virtual void shutdown();
-
- MessageStore& getStore() { return *store; }
- QueueRegistry& getQueues() { return queues; }
- ExchangeRegistry& getExchanges() { return exchanges; }
- uint32_t getTimeout() { return timeout; }
- uint64_t getStagingThreshold() { return stagingThreshold; }
- AutoDelete& getCleaner() { return cleaner; }
-
- private:
- Broker(const Configuration& config);
- sys::Acceptor& getAcceptor() const;
-
- Configuration config;
- sys::Acceptor::shared_ptr acceptor;
- const std::auto_ptr<MessageStore> store;
- QueueRegistry queues;
- ExchangeRegistry exchanges;
- uint32_t timeout;
- uint64_t stagingThreshold;
- AutoDelete cleaner;
- ConnectionFactory factory;
-
- static MessageStore* createStore(const Configuration& config);
-};
-
-}}
-
-
-
-#endif /*!_Broker_*/
diff --git a/qpid/cpp/lib/broker/BrokerAdapter.cpp b/qpid/cpp/lib/broker/BrokerAdapter.cpp
deleted file mode 100644
index 981801c40e..0000000000
--- a/qpid/cpp/lib/broker/BrokerAdapter.cpp
+++ /dev/null
@@ -1,388 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include <boost/format.hpp>
-
-#include "BrokerAdapter.h"
-#include "BrokerChannel.h"
-#include "Connection.h"
-#include "AMQMethodBody.h"
-#include "Exception.h"
-
-namespace qpid {
-namespace broker {
-
-using boost::format;
-using namespace qpid;
-using namespace qpid::framing;
-
-typedef std::vector<Queue::shared_ptr> QueueVector;
-
-
-BrokerAdapter::BrokerAdapter(Channel& ch, Connection& c, Broker& b) :
- CoreRefs(ch, c, b),
- connection(c),
- basicHandler(*this),
- channelHandler(*this),
- connectionHandler(*this),
- exchangeHandler(*this),
- messageHandler(*this),
- queueHandler(*this),
- txHandler(*this)
-{}
-
-
-ProtocolVersion BrokerAdapter::getVersion() const {
- return connection.getVersion();
-}
-
-void BrokerAdapter::ConnectionHandlerImpl::startOk(
- const MethodContext&, const FieldTable& /*clientProperties*/,
- const string& /*mechanism*/,
- const string& /*response*/, const string& /*locale*/)
-{
- client.tune(
- 100, connection.getFrameMax(), connection.getHeartbeat());
-}
-
-void BrokerAdapter::ConnectionHandlerImpl::secureOk(
- const MethodContext&, const string& /*response*/){}
-
-void BrokerAdapter::ConnectionHandlerImpl::tuneOk(
- const MethodContext&, uint16_t /*channelmax*/,
- uint32_t framemax, uint16_t heartbeat)
-{
- connection.setFrameMax(framemax);
- connection.setHeartbeat(heartbeat);
-}
-
-void BrokerAdapter::ConnectionHandlerImpl::open(
- const MethodContext& context, const string& /*virtualHost*/,
- const string& /*capabilities*/, bool /*insist*/)
-{
- string knownhosts;
- client.openOk(
- knownhosts, context.getRequestId());
-}
-
-void BrokerAdapter::ConnectionHandlerImpl::close(
- const MethodContext& context, uint16_t /*replyCode*/, const string& /*replyText*/,
- uint16_t /*classId*/, uint16_t /*methodId*/)
-{
- client.closeOk(context.getRequestId());
- connection.getOutput().close();
-}
-
-void BrokerAdapter::ConnectionHandlerImpl::closeOk(const MethodContext&){
- connection.getOutput().close();
-}
-
-void BrokerAdapter::ChannelHandlerImpl::open(
- const MethodContext& context, const string& /*outOfBand*/){
- channel.open();
- // FIXME aconway 2007-01-04: provide valid ID as per ampq 0-9
- client.openOk(
- std::string()/* ID */, context.getRequestId());
-}
-
-void BrokerAdapter::ChannelHandlerImpl::flow(const MethodContext&, bool /*active*/){}
-void BrokerAdapter::ChannelHandlerImpl::flowOk(const MethodContext&, bool /*active*/){}
-
-void BrokerAdapter::ChannelHandlerImpl::close(
- const MethodContext& context, uint16_t /*replyCode*/,
- const string& /*replyText*/,
- uint16_t /*classId*/, uint16_t /*methodId*/)
-{
- client.closeOk(context.getRequestId());
- // FIXME aconway 2007-01-18: Following line will "delete this". Ugly.
- connection.closeChannel(channel.getId());
-}
-
-void BrokerAdapter::ChannelHandlerImpl::closeOk(const MethodContext&){}
-
-
-
-void BrokerAdapter::ExchangeHandlerImpl::declare(const MethodContext& context, uint16_t /*ticket*/, const string& exchange, const string& type,
- bool passive, bool /*durable*/, bool /*autoDelete*/, bool /*internal*/, bool nowait,
- const FieldTable& /*arguments*/){
-
- if(passive){
- if(!broker.getExchanges().get(exchange)) {
- throw ChannelException(404, "Exchange not found: " + exchange);
- }
- }else{
- try{
- std::pair<Exchange::shared_ptr, bool> response = broker.getExchanges().declare(exchange, type);
- if(!response.second && response.first->getType() != type){
- throw ConnectionException(
- 530,
- "Exchange already declared to be of type "
- + response.first->getType() + ", requested " + type);
- }
- }catch(UnknownExchangeTypeException& e){
- throw ConnectionException(
- 503, "Exchange type not implemented: " + type);
- }
- }
- if(!nowait){
- client.declareOk(context.getRequestId());
- }
-}
-
-void BrokerAdapter::ExchangeHandlerImpl::delete_(const MethodContext& context, uint16_t /*ticket*/,
- const string& exchange, bool /*ifUnused*/, bool nowait){
-
- //TODO: implement unused
- broker.getExchanges().destroy(exchange);
- if(!nowait) client.deleteOk(context.getRequestId());
-}
-
-void BrokerAdapter::QueueHandlerImpl::declare(const MethodContext& context, uint16_t /*ticket*/, const string& name,
- bool passive, bool durable, bool exclusive,
- bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments){
- Queue::shared_ptr queue;
- if (passive && !name.empty()) {
- queue = connection.getQueue(name, channel.getId());
- } else {
- std::pair<Queue::shared_ptr, bool> queue_created =
- broker.getQueues().declare(
- name, durable,
- autoDelete ? connection.getTimeout() : 0,
- exclusive ? &connection : 0);
- queue = queue_created.first;
- assert(queue);
- if (queue_created.second) { // This is a new queue
- channel.setDefaultQueue(queue);
-
- //apply settings & create persistent record if required
- queue_created.first->create(arguments);
-
- //add default binding:
- broker.getExchanges().getDefault()->bind(queue, name, 0);
- if (exclusive) {
- connection.exclusiveQueues.push_back(queue);
- } else if(autoDelete){
- broker.getCleaner().add(queue);
- }
- }
- }
- if (exclusive && !queue->isExclusiveOwner(&connection))
- throw ChannelException(
- 405,
- format("Cannot grant exclusive access to queue '%s'")
- % queue->getName());
- if (!nowait) {
- string queueName = queue->getName();
- client.declareOk(
- queueName, queue->getMessageCount(), queue->getConsumerCount(),
- context.getRequestId());
- }
-}
-
-void BrokerAdapter::QueueHandlerImpl::bind(const MethodContext& context, uint16_t /*ticket*/, const string& queueName,
- const string& exchangeName, const string& routingKey, bool nowait,
- const FieldTable& arguments){
-
- Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
- Exchange::shared_ptr exchange = broker.getExchanges().get(exchangeName);
- if(exchange){
- string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey;
- exchange->bind(queue, exchangeRoutingKey, &arguments);
- if(!nowait) client.bindOk(context.getRequestId());
- }else{
- throw ChannelException(
- 404, "Bind failed. No such exchange: " + exchangeName);
- }
-}
-
-void
-BrokerAdapter::QueueHandlerImpl::unbind(
- const MethodContext& context,
- uint16_t /*ticket*/,
- const string& queueName,
- const string& exchangeName,
- const string& routingKey,
- const qpid::framing::FieldTable& arguments )
-{
- Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
- if (!queue.get()) throw ChannelException(404, "Unbind failed. No such exchange: " + exchangeName);
-
- Exchange::shared_ptr exchange = broker.getExchanges().get(exchangeName);
- if (!exchange.get()) throw ChannelException(404, "Unbind failed. No such exchange: " + exchangeName);
-
- exchange->unbind(queue, routingKey, &arguments);
-
- client.unbindOk(context.getRequestId());
-}
-
-void BrokerAdapter::QueueHandlerImpl::purge(const MethodContext& context, uint16_t /*ticket*/, const string& queueName, bool nowait){
-
- Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
- int count = queue->purge();
- if(!nowait) client.purgeOk( count, context.getRequestId());
-}
-
-void BrokerAdapter::QueueHandlerImpl::delete_(const MethodContext& context, uint16_t /*ticket*/, const string& queue,
- bool ifUnused, bool ifEmpty, bool nowait){
- ChannelException error(0, "");
- int count(0);
- Queue::shared_ptr q = connection.getQueue(queue, channel.getId());
- if(ifEmpty && q->getMessageCount() > 0){
- throw ChannelException(406, "Queue not empty.");
- }else if(ifUnused && q->getConsumerCount() > 0){
- throw ChannelException(406, "Queue in use.");
- }else{
- //remove the queue from the list of exclusive queues if necessary
- if(q->isExclusiveOwner(&connection)){
- QueueVector::iterator i = find(connection.exclusiveQueues.begin(), connection.exclusiveQueues.end(), q);
- if(i < connection.exclusiveQueues.end()) connection.exclusiveQueues.erase(i);
- }
- count = q->getMessageCount();
- q->destroy();
- broker.getQueues().destroy(queue);
- }
-
- if(!nowait)
- client.deleteOk(count, context.getRequestId());
-}
-
-
-
-
-void BrokerAdapter::BasicHandlerImpl::qos(const MethodContext& context, uint32_t prefetchSize, uint16_t prefetchCount, bool /*global*/){
- //TODO: handle global
- channel.setPrefetchSize(prefetchSize);
- channel.setPrefetchCount(prefetchCount);
- client.qosOk(context.getRequestId());
-}
-
-void BrokerAdapter::BasicHandlerImpl::consume(
- const MethodContext& context, uint16_t /*ticket*/,
- const string& queueName, const string& consumerTag,
- bool noLocal, bool noAck, bool exclusive,
- bool nowait, const FieldTable& fields)
-{
-
- Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
- if(!consumerTag.empty() && channel.exists(consumerTag)){
- throw ConnectionException(530, "Consumer tags must be unique");
- }
-
- string newTag = consumerTag;
- channel.consume(
- newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &fields);
-
- if(!nowait) client.consumeOk(newTag, context.getRequestId());
-
- //allow messages to be dispatched if required as there is now a consumer:
- queue->dispatch();
-}
-
-void BrokerAdapter::BasicHandlerImpl::cancel(const MethodContext& context, const string& consumerTag, bool nowait){
- channel.cancel(consumerTag);
-
- if(!nowait) client.cancelOk(consumerTag, context.getRequestId());
-}
-
-void BrokerAdapter::BasicHandlerImpl::publish(
- const MethodContext& context, uint16_t /*ticket*/,
- const string& exchangeName, const string& routingKey,
- bool mandatory, bool immediate)
-{
-
- Exchange::shared_ptr exchange = exchangeName.empty() ? broker.getExchanges().getDefault() : broker.getExchanges().get(exchangeName);
- if(exchange){
- BasicMessage* msg = new BasicMessage(
- &connection, exchangeName, routingKey, mandatory, immediate,
- context.methodBody);
- channel.handlePublish(msg);
- }else{
- throw ChannelException(
- 404, "Exchange not found '" + exchangeName + "'");
- }
-}
-
-void BrokerAdapter::BasicHandlerImpl::get(const MethodContext& context, uint16_t /*ticket*/, const string& queueName, bool noAck){
- Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
- if(!connection.getChannel(channel.getId()).get(queue, "", !noAck)){
- string clusterId;//not used, part of an imatix hack
-
- client.getEmpty(clusterId, context.getRequestId());
- }
-}
-
-void BrokerAdapter::BasicHandlerImpl::ack(const MethodContext&, uint64_t deliveryTag, bool multiple){
- channel.ack(deliveryTag, multiple);
-}
-
-void BrokerAdapter::BasicHandlerImpl::reject(const MethodContext&, uint64_t /*deliveryTag*/, bool /*requeue*/){}
-
-void BrokerAdapter::BasicHandlerImpl::recover(const MethodContext&, bool requeue){
- channel.recover(requeue);
-}
-
-void BrokerAdapter::TxHandlerImpl::select(const MethodContext& context){
- channel.begin();
- client.selectOk(context.getRequestId());
-}
-
-void BrokerAdapter::TxHandlerImpl::commit(const MethodContext& context){
- channel.commit();
- client.commitOk(context.getRequestId());
-}
-
-void BrokerAdapter::TxHandlerImpl::rollback(const MethodContext& context){
-
- channel.rollback();
- client.rollbackOk(context.getRequestId());
- channel.recover(false);
-}
-
-void
-BrokerAdapter::ChannelHandlerImpl::ok( const MethodContext& )
-{
- //no specific action required, generic response handling should be sufficient
-}
-
-
-//
-// Message class method handlers
-//
-void
-BrokerAdapter::ChannelHandlerImpl::ping( const MethodContext& context)
-{
- client.ok(context.getRequestId());
- client.pong();
-}
-
-
-void
-BrokerAdapter::ChannelHandlerImpl::pong( const MethodContext& context)
-{
- client.ok(context.getRequestId());
-}
-
-void
-BrokerAdapter::ChannelHandlerImpl::resume(
- const MethodContext&,
- const string& /*channel*/ )
-{
- assert(0); // FIXME aconway 2007-01-04: 0-9 feature
-}
-
-}} // namespace qpid::broker
-
diff --git a/qpid/cpp/lib/broker/BrokerAdapter.h b/qpid/cpp/lib/broker/BrokerAdapter.h
deleted file mode 100644
index 2fafbcc180..0000000000
--- a/qpid/cpp/lib/broker/BrokerAdapter.h
+++ /dev/null
@@ -1,222 +0,0 @@
-#ifndef _broker_BrokerAdapter_h
-#define _broker_BrokerAdapter_h
-
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include "AMQP_ServerOperations.h"
-#include "HandlerImpl.h"
-#include "MessageHandlerImpl.h"
-#include "Exception.h"
-
-namespace qpid {
-namespace broker {
-
-class Channel;
-class Connection;
-class Broker;
-class ChannelHandler;
-class ConnectionHandler;
-class BasicHandler;
-class ExchangeHandler;
-class QueueHandler;
-class TxHandler;
-class MessageHandler;
-class AccessHandler;
-class FileHandler;
-class StreamHandler;
-class DtxHandler;
-class TunnelHandler;
-class MessageHandlerImpl;
-
-/**
- * Per-channel protocol adapter.
- *
- * A container for a collection of AMQP-class adapters that translate
- * AMQP method bodies into calls on the core Channel, Connection and
- * Broker objects. Each adapter class also provides a client proxy
- * to send methods to the peer.
- *
- */
-class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations
-{
- public:
- BrokerAdapter(Channel& ch, Connection& c, Broker& b);
-
- framing::ProtocolVersion getVersion() const;
- ChannelHandler* getChannelHandler() { return &channelHandler; }
- ConnectionHandler* getConnectionHandler() { return &connectionHandler; }
- BasicHandler* getBasicHandler() { return &basicHandler; }
- ExchangeHandler* getExchangeHandler() { return &exchangeHandler; }
- QueueHandler* getQueueHandler() { return &queueHandler; }
- TxHandler* getTxHandler() { return &txHandler; }
- MessageHandler* getMessageHandler() { return &messageHandler; }
- AccessHandler* getAccessHandler() {
- throw ConnectionException(540, "Access class not implemented"); }
- FileHandler* getFileHandler() {
- throw ConnectionException(540, "File class not implemented"); }
- StreamHandler* getStreamHandler() {
- throw ConnectionException(540, "Stream class not implemented"); }
- DtxHandler* getDtxHandler() {
- throw ConnectionException(540, "Dtx class not implemented"); }
- TunnelHandler* getTunnelHandler() {
- throw ConnectionException(540, "Tunnel class not implemented"); }
-
- framing::AMQP_ClientProxy& getProxy() { return proxy; }
-
- private:
-
- class ConnectionHandlerImpl :
- public ConnectionHandler,
- public HandlerImpl<framing::AMQP_ClientProxy::Connection>
- {
- public:
- ConnectionHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {}
-
- void startOk(const framing::MethodContext& context,
- const qpid::framing::FieldTable& clientProperties,
- const std::string& mechanism, const std::string& response,
- const std::string& locale);
- void secureOk(const framing::MethodContext& context,
- const std::string& response);
- void tuneOk(const framing::MethodContext& context,
- uint16_t channelMax,
- uint32_t frameMax, uint16_t heartbeat);
- void open(const framing::MethodContext& context,
- const std::string& virtualHost,
- const std::string& capabilities, bool insist);
- void close(const framing::MethodContext& context, uint16_t replyCode,
- const std::string& replyText,
- uint16_t classId, uint16_t methodId);
- void closeOk(const framing::MethodContext& context);
- };
-
- class ChannelHandlerImpl :
- public ChannelHandler,
- public HandlerImpl<framing::AMQP_ClientProxy::Channel>
- {
- public:
- ChannelHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {}
-
- void open(const framing::MethodContext& context, const std::string& outOfBand);
- void flow(const framing::MethodContext& context, bool active);
- void flowOk(const framing::MethodContext& context, bool active);
- void ok( const framing::MethodContext& context );
- void ping( const framing::MethodContext& context );
- void pong( const framing::MethodContext& context );
- void resume( const framing::MethodContext& context, const std::string& channelId );
- void close(const framing::MethodContext& context, uint16_t replyCode, const
- std::string& replyText, uint16_t classId, uint16_t methodId);
- void closeOk(const framing::MethodContext& context);
- };
-
- class ExchangeHandlerImpl :
- public ExchangeHandler,
- public HandlerImpl<framing::AMQP_ClientProxy::Exchange>
- {
- public:
- ExchangeHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {}
-
- void declare(const framing::MethodContext& context, uint16_t ticket,
- const std::string& exchange, const std::string& type,
- bool passive, bool durable, bool autoDelete,
- bool internal, bool nowait,
- const qpid::framing::FieldTable& arguments);
- void delete_(const framing::MethodContext& context, uint16_t ticket,
- const std::string& exchange, bool ifUnused, bool nowait);
- };
-
- class QueueHandlerImpl :
- public QueueHandler,
- public HandlerImpl<framing::AMQP_ClientProxy::Queue>
- {
- public:
- QueueHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {}
-
- void declare(const framing::MethodContext& context, uint16_t ticket, const std::string& queue,
- bool passive, bool durable, bool exclusive,
- bool autoDelete, bool nowait,
- const qpid::framing::FieldTable& arguments);
- void bind(const framing::MethodContext& context, uint16_t ticket, const std::string& queue,
- const std::string& exchange, const std::string& routingKey,
- bool nowait, const qpid::framing::FieldTable& arguments);
- void unbind(const framing::MethodContext& context,
- uint16_t ticket,
- const std::string& queue,
- const std::string& exchange,
- const std::string& routingKey,
- const qpid::framing::FieldTable& arguments );
- void purge(const framing::MethodContext& context, uint16_t ticket, const std::string& queue,
- bool nowait);
- void delete_(const framing::MethodContext& context, uint16_t ticket, const std::string& queue,
- bool ifUnused, bool ifEmpty,
- bool nowait);
- };
-
- class BasicHandlerImpl :
- public BasicHandler,
- public HandlerImpl<framing::AMQP_ClientProxy::Basic>
- {
- public:
- BasicHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {}
-
- void qos(const framing::MethodContext& context, uint32_t prefetchSize,
- uint16_t prefetchCount, bool global);
- void consume(
- const framing::MethodContext& context, uint16_t ticket, const std::string& queue,
- const std::string& consumerTag, bool noLocal, bool noAck,
- bool exclusive, bool nowait,
- const qpid::framing::FieldTable& fields);
- void cancel(const framing::MethodContext& context, const std::string& consumerTag,
- bool nowait);
- void publish(const framing::MethodContext& context, uint16_t ticket,
- const std::string& exchange, const std::string& routingKey,
- bool mandatory, bool immediate);
- void get(const framing::MethodContext& context, uint16_t ticket, const std::string& queue,
- bool noAck);
- void ack(const framing::MethodContext& context, uint64_t deliveryTag, bool multiple);
- void reject(const framing::MethodContext& context, uint64_t deliveryTag, bool requeue);
- void recover(const framing::MethodContext& context, bool requeue);
- };
-
- class TxHandlerImpl :
- public TxHandler,
- public HandlerImpl<framing::AMQP_ClientProxy::Tx>
- {
- public:
- TxHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {}
-
- void select(const framing::MethodContext& context);
- void commit(const framing::MethodContext& context);
- void rollback(const framing::MethodContext& context);
- };
-
- Connection& connection;
- BasicHandlerImpl basicHandler;
- ChannelHandlerImpl channelHandler;
- ConnectionHandlerImpl connectionHandler;
- ExchangeHandlerImpl exchangeHandler;
- MessageHandlerImpl messageHandler;
- QueueHandlerImpl queueHandler;
- TxHandlerImpl txHandler;
-
-};
-}} // namespace qpid::broker
-
-
-
-#endif /*!_broker_BrokerAdapter_h*/
diff --git a/qpid/cpp/lib/broker/BrokerChannel.cpp b/qpid/cpp/lib/broker/BrokerChannel.cpp
deleted file mode 100644
index 5897914f26..0000000000
--- a/qpid/cpp/lib/broker/BrokerChannel.cpp
+++ /dev/null
@@ -1,346 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <assert.h>
-
-#include <iostream>
-#include <sstream>
-#include <algorithm>
-#include <functional>
-
-#include <boost/bind.hpp>
-
-#include "BrokerChannel.h"
-#include "DeletingTxOp.h"
-#include "framing/ChannelAdapter.h"
-#include <QpidError.h>
-#include <DeliverableMessage.h>
-#include <BrokerQueue.h>
-#include <BrokerMessage.h>
-#include <MessageStore.h>
-#include <TxAck.h>
-#include <TxPublish.h>
-#include "BrokerAdapter.h"
-#include "Connection.h"
-
-using std::mem_fun_ref;
-using std::bind2nd;
-using namespace qpid::broker;
-using namespace qpid::framing;
-using namespace qpid::sys;
-
-
-Channel::Channel(
- Connection& con, ChannelId id,
- uint32_t _framesize, MessageStore* const _store,
- uint64_t _stagingThreshold
-) :
- ChannelAdapter(id, &con.getOutput(), con.getVersion()),
- connection(con),
- currentDeliveryTag(1),
- transactional(false),
- prefetchSize(0),
- prefetchCount(0),
- framesize(_framesize),
- tagGenerator("sgen"),
- accumulatedAck(0),
- store(_store),
- messageBuilder(this, _store, _stagingThreshold),
- opened(id == 0),//channel 0 is automatically open, other must be explicitly opened
- adapter(new BrokerAdapter(*this, con, con.broker))
-{
- outstanding.reset();
-}
-
-Channel::~Channel(){
- close();
-}
-
-bool Channel::exists(const string& consumerTag){
- return consumers.find(consumerTag) != consumers.end();
-}
-
-// TODO aconway 2007-02-12: Why is connection token passed in instead
-// of using the channel's parent connection?
-void Channel::consume(string& tagInOut, Queue::shared_ptr queue, bool acks,
- bool exclusive, ConnectionToken* const connection,
- const FieldTable*)
-{
- if(tagInOut.empty())
- tagInOut = tagGenerator.generate();
- std::auto_ptr<ConsumerImpl> c(
- new ConsumerImpl(this, tagInOut, queue, connection, acks));
- queue->consume(c.get(), exclusive);//may throw exception
- consumers.insert(tagInOut, c.release());
-}
-
-void Channel::cancel(const string& tag){
- // consumers is a ptr_map so erase will delete the consumer
- // which will call cancel.
- ConsumerImplMap::iterator i = consumers.find(tag);
- if (i != consumers.end())
- consumers.erase(i);
-}
-
-void Channel::close(){
- opened = false;
- consumers.clear();
- recover(true);
-}
-
-void Channel::begin(){
- transactional = true;
-}
-
-void Channel::commit(){
- TxAck txAck(accumulatedAck, unacked);
- txBuffer.enlist(&txAck);
- if(txBuffer.prepare(store)){
- txBuffer.commit();
- }
- accumulatedAck.clear();
-}
-
-void Channel::rollback(){
- txBuffer.rollback();
- accumulatedAck.clear();
-}
-
-void Channel::deliver(
- Message::shared_ptr& msg, const string& consumerTag,
- Queue::shared_ptr& queue, bool ackExpected)
-{
- Mutex::ScopedLock locker(deliveryLock);
-
- // Key the delivered messages to the id of the request in which they're sent
- uint64_t deliveryTag = getNextSendRequestId();
-
- if(ackExpected){
- unacked.push_back(DeliveryRecord(msg, queue, consumerTag, deliveryTag));
- outstanding.size += msg->contentSize();
- outstanding.count++;
- }
- //send deliver method, header and content(s)
- msg->deliver(*this, consumerTag, deliveryTag, framesize);
-}
-
-bool Channel::checkPrefetch(Message::shared_ptr& msg){
- Mutex::ScopedLock locker(deliveryLock);
- bool countOk = !prefetchCount || prefetchCount > unacked.size();
- bool sizeOk = !prefetchSize || prefetchSize > msg->contentSize() + outstanding.size || unacked.empty();
- return countOk && sizeOk;
-}
-
-Channel::ConsumerImpl::ConsumerImpl(Channel* _parent, const string& _tag,
- Queue::shared_ptr _queue,
- ConnectionToken* const _connection, bool ack
-) : parent(_parent), tag(_tag), queue(_queue), connection(_connection),
- ackExpected(ack), blocked(false) {}
-
-bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg){
- if(!connection || connection != msg->getPublisher()){//check for no_local
- if(ackExpected && !parent->checkPrefetch(msg)){
- blocked = true;
- }else{
- blocked = false;
- parent->deliver(msg, tag, queue, ackExpected);
- return true;
- }
- }
- return false;
-}
-
-Channel::ConsumerImpl::~ConsumerImpl() {
- cancel();
-}
-
-void Channel::ConsumerImpl::cancel(){
- if(queue)
- queue->cancel(this);
-}
-
-void Channel::ConsumerImpl::requestDispatch(){
- if(blocked)
- queue->dispatch();
-}
-
-void Channel::handleInlineTransfer(Message::shared_ptr msg)
-{
- Exchange::shared_ptr exchange =
- connection.broker.getExchanges().get(msg->getExchange());
- if(transactional){
- TxPublish* deliverable = new TxPublish(msg);
- exchange->route(
- *deliverable, msg->getRoutingKey(),
- &(msg->getApplicationHeaders()));
- txBuffer.enlist(new DeletingTxOp(deliverable));
- }else{
- DeliverableMessage deliverable(msg);
- exchange->route(
- deliverable, msg->getRoutingKey(),
- &(msg->getApplicationHeaders()));
- }
-}
-
-void Channel::handlePublish(Message* _message){
- Message::shared_ptr message(_message);
- messageBuilder.initialise(message);
-}
-
-void Channel::handleHeader(AMQHeaderBody::shared_ptr header){
- messageBuilder.setHeader(header);
- //at this point, decide based on the size of the message whether we want
- //to stage it by saving content directly to disk as it arrives
-}
-
-void Channel::handleContent(AMQContentBody::shared_ptr content){
- messageBuilder.addContent(content);
-}
-
-void Channel::handleHeartbeat(boost::shared_ptr<AMQHeartbeatBody>) {
- // TODO aconway 2007-01-17: Implement heartbeating.
-}
-
-void Channel::complete(Message::shared_ptr msg) {
- Exchange::shared_ptr exchange =
- connection.broker.getExchanges().get(msg->getExchange());
- assert(exchange.get());
- if(transactional) {
- std::auto_ptr<TxPublish> deliverable(new TxPublish(msg));
- exchange->route(*deliverable, msg->getRoutingKey(),
- &(msg->getApplicationHeaders()));
- txBuffer.enlist(new DeletingTxOp(deliverable.release()));
- } else {
- DeliverableMessage deliverable(msg);
- exchange->route(deliverable, msg->getRoutingKey(),
- &(msg->getApplicationHeaders()));
- }
-}
-
-void Channel::ack(){
- ack(getFirstAckRequest(), getLastAckRequest());
-}
-
-// Used by Basic
-void Channel::ack(uint64_t deliveryTag, bool multiple){
- if (multiple)
- ack(0, deliveryTag);
- else
- ack(deliveryTag, deliveryTag);
-}
-
-void Channel::ack(uint64_t firstTag, uint64_t lastTag){
- if(transactional){
- accumulatedAck.update(firstTag, lastTag);
-
- //TODO: I think the outstanding prefetch size & count should be updated at this point...
- //TODO: ...this may then necessitate dispatching to consumers
- }else{
- Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery
-
- ack_iterator i = find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matches), lastTag));
- ack_iterator j = (firstTag == 0) ?
- unacked.begin() :
- find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matches), firstTag));
-
- if(i == unacked.end()){
- throw ConnectionException(530, "Received ack for unrecognised delivery tag");
- }else if(i!=j){
- ack_iterator end = ++i;
- for_each(j, end, bind2nd(mem_fun_ref(&DeliveryRecord::discard), 0));
- unacked.erase(unacked.begin(), end);
-
- //recalculate the prefetch:
- outstanding.reset();
- for_each(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::addTo), &outstanding));
- }else{
- i->discard();
- i->subtractFrom(&outstanding);
- unacked.erase(i);
- }
-
- //if the prefetch limit had previously been reached, there may
- //be messages that can be now be delivered
- std::for_each(consumers.begin(), consumers.end(),
- boost::bind(&ConsumerImpl::requestDispatch, _1));
- }
-}
-
-void Channel::recover(bool requeue){
- Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery
-
- if(requeue){
- outstanding.reset();
- std::list<DeliveryRecord> copy = unacked;
- unacked.clear();
- for_each(copy.begin(), copy.end(), mem_fun_ref(&DeliveryRecord::requeue));
- }else{
- for_each(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::redeliver), this));
- }
-}
-
-bool Channel::get(Queue::shared_ptr queue, const string& destination, bool ackExpected){
- Message::shared_ptr msg = queue->dequeue();
- if(msg){
- Mutex::ScopedLock locker(deliveryLock);
- uint64_t myDeliveryTag = getNextSendRequestId();
- msg->sendGetOk(MethodContext(this, msg->getRespondTo()),
- destination,
- queue->getMessageCount() + 1, myDeliveryTag,
- framesize);
- if(ackExpected){
- unacked.push_back(DeliveryRecord(msg, queue, myDeliveryTag));
- }
- return true;
- }else{
- return false;
- }
-}
-
-void Channel::deliver(Message::shared_ptr& msg, const string& consumerTag,
- uint64_t deliveryTag)
-{
- msg->deliver(*this, consumerTag, deliveryTag, framesize);
-}
-
-void Channel::handleMethodInContext(
- boost::shared_ptr<qpid::framing::AMQMethodBody> method,
- const MethodContext& context
-)
-{
- try{
- if(getId() != 0 && !method->isA<ChannelOpenBody>() && !isOpen()) {
- std::stringstream out;
- out << "Attempt to use unopened channel: " << getId();
- throw ConnectionException(504, out.str());
- } else {
- method->invoke(*adapter, context);
- }
- }catch(ChannelException& e){
- adapter->getProxy().getChannel().close(
- e.code, e.toString(),
- method->amqpClassId(), method->amqpMethodId());
- connection.closeChannel(getId());
- }catch(ConnectionException& e){
- connection.close(e.code, e.toString(), method->amqpClassId(), method->amqpMethodId());
- }catch(std::exception& e){
- connection.close(541/*internal error*/, e.what(), method->amqpClassId(), method->amqpMethodId());
- }
-}
diff --git a/qpid/cpp/lib/broker/BrokerChannel.h b/qpid/cpp/lib/broker/BrokerChannel.h
deleted file mode 100644
index 5085783685..0000000000
--- a/qpid/cpp/lib/broker/BrokerChannel.h
+++ /dev/null
@@ -1,159 +0,0 @@
-#ifndef _broker_BrokerChannel_h
-#define _broker_BrokerChannel_h
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include <list>
-
-#include <boost/scoped_ptr.hpp>
-#include <boost/shared_ptr.hpp>
-#include <boost/ptr_container/ptr_map.hpp>
-
-#include <AccumulatedAck.h>
-#include <Consumer.h>
-#include <DeliveryRecord.h>
-#include <MessageBuilder.h>
-#include <NameGenerator.h>
-#include <Prefetch.h>
-#include <TxBuffer.h>
-#include "framing/ChannelAdapter.h"
-#include "ChannelOpenBody.h"
-#include "CompletionHandler.h"
-
-namespace qpid {
-namespace broker {
-
-class ConnectionToken;
-class Connection;
-class Queue;
-class BrokerAdapter;
-
-using framing::string;
-
-/**
- * Maintains state for an AMQP channel. Handles incoming and
- * outgoing messages for that channel.
- */
-class Channel : public framing::ChannelAdapter,
- public CompletionHandler
-{
- class ConsumerImpl : public Consumer
- {
- Channel* parent;
- const string tag;
- Queue::shared_ptr queue;
- ConnectionToken* const connection;
- const bool ackExpected;
- bool blocked;
-
- public:
- ConsumerImpl(Channel* parent, const string& tag,
- Queue::shared_ptr queue,
- ConnectionToken* const connection, bool ack);
- ~ConsumerImpl();
- virtual bool deliver(Message::shared_ptr& msg);
- void cancel();
- void requestDispatch();
- };
-
- typedef boost::ptr_map<string,ConsumerImpl> ConsumerImplMap;
-
- Connection& connection;
- uint64_t currentDeliveryTag;
- Queue::shared_ptr defaultQueue;
- bool transactional;
- ConsumerImplMap consumers;
- uint32_t prefetchSize;
- uint16_t prefetchCount;
- Prefetch outstanding;
- uint32_t framesize;
- NameGenerator tagGenerator;
- std::list<DeliveryRecord> unacked;
- sys::Mutex deliveryLock;
- TxBuffer txBuffer;
- AccumulatedAck accumulatedAck;
- MessageStore* const store;
- MessageBuilder messageBuilder;//builder for in-progress message
- bool opened;
- boost::scoped_ptr<BrokerAdapter> adapter;
-
- // completion handler for MessageBuilder
- void complete(Message::shared_ptr msg);
-
- void deliver(Message::shared_ptr& msg, const string& tag,
- Queue::shared_ptr& queue, bool ackExpected);
- bool checkPrefetch(Message::shared_ptr& msg);
-
- public:
- Channel(Connection& parent,
- framing::ChannelId id,
- uint32_t framesize,
- MessageStore* const _store = 0,
- uint64_t stagingThreshold = 0);
-
- ~Channel();
-
- bool isOpen() const { return opened; }
- BrokerAdapter& getAdatper() { return *adapter; }
-
- void open() { opened = true; }
- void setDefaultQueue(Queue::shared_ptr queue){ defaultQueue = queue; }
- Queue::shared_ptr getDefaultQueue() const { return defaultQueue; }
- uint32_t setPrefetchSize(uint32_t size){ return prefetchSize = size; }
- uint16_t setPrefetchCount(uint16_t n){ return prefetchCount = n; }
-
- bool exists(const string& consumerTag);
-
- /**
- *@param tagInOut - if empty it is updated with the generated token.
- */
- void consume(string& tagInOut, Queue::shared_ptr queue, bool acks,
- bool exclusive, ConnectionToken* const connection = 0,
- const framing::FieldTable* = 0);
- void cancel(const string& tag);
- bool get(Queue::shared_ptr queue, const std::string& destination, bool ackExpected);
- void begin();
- void close();
- void commit();
- void rollback();
- void ack();
- void ack(uint64_t deliveryTag, bool multiple);
- void ack(uint64_t deliveryTag, uint64_t endTag);
- void recover(bool requeue);
- void deliver(Message::shared_ptr& msg, const string& consumerTag, uint64_t deliveryTag);
- void handlePublish(Message* msg);
- void handleHeader(boost::shared_ptr<framing::AMQHeaderBody>);
- void handleContent(boost::shared_ptr<framing::AMQContentBody>);
- void handleHeartbeat(boost::shared_ptr<framing::AMQHeartbeatBody>);
-
- void handleInlineTransfer(Message::shared_ptr msg);
-
- // For ChannelAdapter
- void handleMethodInContext(
- boost::shared_ptr<framing::AMQMethodBody> method,
- const framing::MethodContext& context);
-};
-
-}} // namespace broker
-
-
-#endif /*!_broker_BrokerChannel_h*/
diff --git a/qpid/cpp/lib/broker/BrokerExchange.h b/qpid/cpp/lib/broker/BrokerExchange.h
deleted file mode 100644
index 6f4e9e6671..0000000000
--- a/qpid/cpp/lib/broker/BrokerExchange.h
+++ /dev/null
@@ -1,51 +0,0 @@
-#ifndef _broker_BrokerExchange_h
-#define _broker_BrokerExchange_h
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include <boost/shared_ptr.hpp>
-#include <Deliverable.h>
-#include <BrokerQueue.h>
-#include <FieldTable.h>
-
-namespace qpid {
- namespace broker {
- using std::string;
-
- class Exchange{
- const string name;
- public:
- typedef boost::shared_ptr<Exchange> shared_ptr;
-
- explicit Exchange(const string& _name) : name(_name){}
- virtual ~Exchange(){}
- string getName() { return name; }
- virtual string getType() = 0;
- virtual void bind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args) = 0;
- virtual void unbind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args) = 0;
- virtual void route(Deliverable& msg, const string& routingKey, const qpid::framing::FieldTable* args) = 0;
- };
- }
-}
-
-
-#endif /*!_broker_BrokerExchange_h*/
diff --git a/qpid/cpp/lib/broker/BrokerMessage.cpp b/qpid/cpp/lib/broker/BrokerMessage.cpp
deleted file mode 100644
index b14efb966e..0000000000
--- a/qpid/cpp/lib/broker/BrokerMessage.cpp
+++ /dev/null
@@ -1,244 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <boost/cast.hpp>
-
-#include <BrokerMessage.h>
-#include <iostream>
-
-#include <InMemoryContent.h>
-#include <LazyLoadedContent.h>
-#include <MessageStore.h>
-#include <BasicDeliverBody.h>
-#include <BasicGetOkBody.h>
-#include <AMQContentBody.h>
-#include <AMQHeaderBody.h>
-#include "AMQMethodBody.h"
-#include "AMQFrame.h"
-#include "framing/ChannelAdapter.h"
-#include "RecoveryManagerImpl.h"
-
-using namespace boost;
-using namespace qpid::broker;
-using namespace qpid::framing;
-using namespace qpid::sys;
-
-BasicMessage::BasicMessage(
- const ConnectionToken* const _publisher,
- const string& _exchange, const string& _routingKey,
- bool _mandatory, bool _immediate, framing::AMQMethodBody::shared_ptr respondTo
-) :
- Message(_publisher, _exchange, _routingKey, _mandatory,
- _immediate, respondTo),
- size(0)
-{}
-
-// For tests only.
-BasicMessage::BasicMessage() : size(0)
-{}
-
-BasicMessage::~BasicMessage(){}
-
-void BasicMessage::setHeader(AMQHeaderBody::shared_ptr _header){
- this->header = _header;
-}
-
-void BasicMessage::addContent(AMQContentBody::shared_ptr data){
- if (!content.get()) {
- content = std::auto_ptr<Content>(new InMemoryContent());
- }
- content->add(data);
- size += data->size();
-}
-
-bool BasicMessage::isComplete(){
- return header.get() && (header->getContentSize() == contentSize());
-}
-
-void BasicMessage::deliver(ChannelAdapter& channel,
- const string& consumerTag, uint64_t deliveryTag,
- uint32_t framesize)
-{
- // CCT -- TODO - Update code generator to take pointer/ not
- // instance to avoid extra contruction
- channel.send(
- new BasicDeliverBody(
- channel.getVersion(), consumerTag, deliveryTag,
- getRedelivered(), getExchange(), getRoutingKey()));
- sendContent(channel, framesize);
-}
-
-void BasicMessage::sendGetOk(const MethodContext& context,
- const std::string& /*destination*/,
- uint32_t messageCount,
- uint64_t deliveryTag,
- uint32_t framesize)
-{
- // CCT -- TODO - Update code generator to take pointer/ not
- // instance to avoid extra contruction
- context.channel->send(
- new BasicGetOkBody(
- context.channel->getVersion(),
- context.methodBody->getRequestId(),
- deliveryTag, getRedelivered(), getExchange(),
- getRoutingKey(), messageCount));
- sendContent(*context.channel, framesize);
-}
-
-void BasicMessage::sendContent(
- ChannelAdapter& channel, uint32_t framesize)
-{
- channel.send(header);
- Mutex::ScopedLock locker(contentLock);
- if (content.get())
- content->send(channel, framesize);
-}
-
-BasicHeaderProperties* BasicMessage::getHeaderProperties(){
- return boost::polymorphic_downcast<BasicHeaderProperties*>(
- header->getProperties());
-}
-
-const FieldTable& BasicMessage::getApplicationHeaders(){
- return getHeaderProperties()->getHeaders();
-}
-
-bool BasicMessage::isPersistent()
-{
- if(!header) return false;
- BasicHeaderProperties* props = getHeaderProperties();
- return props && props->getDeliveryMode() == PERSISTENT;
-}
-
-void BasicMessage::decode(Buffer& buffer, bool headersOnly, uint32_t contentChunkSize)
-{
- decodeHeader(buffer);
- if (!headersOnly) decodeContent(buffer, contentChunkSize);
-}
-
-void BasicMessage::decodeHeader(Buffer& buffer)
-{
- //don't care about the type here, but want encode/decode to be symmetric
- RecoveryManagerImpl::decodeMessageType(buffer);
-
- string exchange;
- string routingKey;
-
- buffer.getShortString(exchange);
- buffer.getShortString(routingKey);
- setRouting(exchange, routingKey);
-
- uint32_t headerSize = buffer.getLong();
- AMQHeaderBody::shared_ptr headerBody(new AMQHeaderBody());
- headerBody->decode(buffer, headerSize);
- setHeader(headerBody);
-}
-
-void BasicMessage::decodeContent(Buffer& buffer, uint32_t chunkSize)
-{
- uint64_t expected = expectedContentSize();
- if (expected != buffer.available()) {
- std::cout << "WARN: Expected " << expectedContentSize() << " bytes, got " << buffer.available() << std::endl;
- throw Exception("Cannot decode content, buffer not large enough.");
- }
-
- if (!chunkSize || chunkSize > expected) {
- chunkSize = expected;
- }
-
- uint64_t total = 0;
- while (total < expectedContentSize()) {
- uint64_t remaining = expected - total;
- AMQContentBody::shared_ptr contentBody(new AMQContentBody());
- contentBody->decode(buffer, remaining < chunkSize ? remaining : chunkSize);
- addContent(contentBody);
- total += chunkSize;
- }
-}
-
-void BasicMessage::encode(Buffer& buffer) const
-{
- encodeHeader(buffer);
- encodeContent(buffer);
-}
-
-void BasicMessage::encodeHeader(Buffer& buffer) const
-{
- RecoveryManagerImpl::encodeMessageType(*this, buffer);
- buffer.putShortString(getExchange());
- buffer.putShortString(getRoutingKey());
- buffer.putLong(header->size());
- header->encode(buffer);
-}
-
-void BasicMessage::encodeContent(Buffer& buffer) const
-{
- Mutex::ScopedLock locker(contentLock);
- if (content.get()) content->encode(buffer);
-}
-
-uint32_t BasicMessage::encodedSize() const
-{
- return encodedHeaderSize() + encodedContentSize();
-}
-
-uint32_t BasicMessage::encodedContentSize() const
-{
- Mutex::ScopedLock locker(contentLock);
- return content.get() ? content->size() : 0;
-}
-
-uint32_t BasicMessage::encodedHeaderSize() const
-{
- return RecoveryManagerImpl::encodedMessageTypeSize()
- +getExchange().size() + 1
- + getRoutingKey().size() + 1
- + header->size() + 4;//4 extra bytes for size
-}
-
-uint64_t BasicMessage::expectedContentSize()
-{
- return header.get() ? header->getContentSize() : 0;
-}
-
-void BasicMessage::releaseContent(MessageStore* store)
-{
- Mutex::ScopedLock locker(contentLock);
- if (!isPersistent() && getPersistenceId() == 0) {
- store->stage(*this);
- }
- if (!content.get() || content->size() > 0) {
- //set content to lazy loading mode (but only if there is
- //stored content):
-
- //Note: the LazyLoadedContent instance contains a raw pointer
- //to the message, however it is then set as a member of that
- //message so its lifetime is guaranteed to be no longer than
- //that of the message itself
- content = std::auto_ptr<Content>(
- new LazyLoadedContent(store, this, expectedContentSize()));
- }
-}
-
-void BasicMessage::setContent(std::auto_ptr<Content>& _content)
-{
- Mutex::ScopedLock locker(contentLock);
- content = _content;
-}
diff --git a/qpid/cpp/lib/broker/BrokerMessage.h b/qpid/cpp/lib/broker/BrokerMessage.h
deleted file mode 100644
index 8b408ae669..0000000000
--- a/qpid/cpp/lib/broker/BrokerMessage.h
+++ /dev/null
@@ -1,137 +0,0 @@
-#ifndef _broker_BrokerMessage_h
-#define _broker_BrokerMessage_h
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include <memory>
-#include <boost/shared_ptr.hpp>
-
-#include <BrokerMessageBase.h>
-#include <BasicHeaderProperties.h>
-#include <ConnectionToken.h>
-#include <Content.h>
-#include <Mutex.h>
-#include <TxBuffer.h>
-
-namespace qpid {
-
-namespace framing {
-class MethodContext;
-class ChannelAdapter;
-class AMQHeaderBody;
-}
-
-namespace broker {
-
-class MessageStore;
-using framing::string;
-
-/**
- * Represents an AMQP message, i.e. a header body, a list of
- * content bodies and some details about the publication
- * request.
- */
-class BasicMessage : public Message {
- boost::shared_ptr<framing::AMQHeaderBody> header;
- std::auto_ptr<Content> content;
- mutable sys::Mutex contentLock;
- uint64_t size;
-
- void sendContent(framing::ChannelAdapter&, uint32_t framesize);
-
- public:
- typedef boost::shared_ptr<BasicMessage> shared_ptr;
-
- BasicMessage(const ConnectionToken* const publisher,
- const string& exchange, const string& routingKey,
- bool mandatory, bool immediate,
- boost::shared_ptr<framing::AMQMethodBody> respondTo);
- BasicMessage();
- ~BasicMessage();
- void setHeader(boost::shared_ptr<framing::AMQHeaderBody> header);
- void addContent(framing::AMQContentBody::shared_ptr data);
- bool isComplete();
-
- void deliver(framing::ChannelAdapter&,
- const string& consumerTag,
- uint64_t deliveryTag,
- uint32_t framesize);
-
- void sendGetOk(const framing::MethodContext&,
- const std::string& destination,
- uint32_t messageCount,
- uint64_t deliveryTag,
- uint32_t framesize);
-
- framing::BasicHeaderProperties* getHeaderProperties();
- const framing::FieldTable& getApplicationHeaders();
- bool isPersistent();
- uint64_t contentSize() const { return size; }
-
- void decode(framing::Buffer& buffer, bool headersOnly = false,
- uint32_t contentChunkSize = 0);
- void decodeHeader(framing::Buffer& buffer);
- void decodeContent(framing::Buffer& buffer, uint32_t contentChunkSize = 0);
-
- void encode(framing::Buffer& buffer) const;
- void encodeHeader(framing::Buffer& buffer) const;
- void encodeContent(framing::Buffer& buffer) const;
- /**
- * @returns the size of the buffer needed to encode this
- * message in its entirety
- */
- uint32_t encodedSize() const;
- /**
- * @returns the size of the buffer needed to encode the
- * 'header' of this message (not just the header frame,
- * but other meta data e.g.routing key and exchange)
- */
- uint32_t encodedHeaderSize() const;
- /**
- * @returns the size of the buffer needed to encode the
- * (possibly partial) content held by this message
- */
- uint32_t encodedContentSize() const;
- /**
- * Releases the in-memory content data held by this
- * message. Must pass in a store from which the data can
- * be reloaded.
- */
- void releaseContent(MessageStore* store);
- /**
- * If headers have been received, returns the expected
- * content size else returns 0.
- */
- uint64_t expectedContentSize();
- /**
- * Sets the 'content' implementation of this message (the
- * message controls the lifecycle of the content instance
- * it uses).
- */
- void setContent(std::auto_ptr<Content>& content);
-};
-
-}
-}
-
-
-#endif /*!_broker_BrokerMessage_h*/
diff --git a/qpid/cpp/lib/broker/BrokerMessageBase.h b/qpid/cpp/lib/broker/BrokerMessageBase.h
deleted file mode 100644
index da0cc57756..0000000000
--- a/qpid/cpp/lib/broker/BrokerMessageBase.h
+++ /dev/null
@@ -1,182 +0,0 @@
-#ifndef _broker_BrokerMessageBase_h
-#define _broker_BrokerMessageBase_h
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include <string>
-#include <boost/shared_ptr.hpp>
-#include "Content.h"
-#include "PersistableMessage.h"
-#include "framing/amqp_types.h"
-
-namespace qpid {
-
-namespace framing {
-class MethodContext;
-class ChannelAdapter;
-class BasicHeaderProperties;
-class FieldTable;
-class AMQMethodBody;
-class AMQContentBody;
-class AMQHeaderBody;
-}
-
-
-namespace broker {
-class ConnectionToken;
-class MessageStore;
-
-/**
- * Base class for all types of internal broker messages
- * abstracting away the operations
- * TODO; AMS: for the moment this is mostly a placeholder
- */
-class Message : public PersistableMessage{
- public:
- typedef boost::shared_ptr<Message> shared_ptr;
- typedef boost::shared_ptr<framing::AMQMethodBody> AMQMethodBodyPtr;
-
-
- Message(const ConnectionToken* publisher_,
- const std::string& _exchange,
- const std::string& _routingKey,
- bool _mandatory, bool _immediate,
- AMQMethodBodyPtr respondTo_) :
- publisher(publisher_),
- exchange(_exchange),
- routingKey(_routingKey),
- mandatory(_mandatory),
- immediate(_immediate),
- persistenceId(0),
- redelivered(false),
- respondTo(respondTo_)
- {}
-
- Message() :
- mandatory(false),
- immediate(false),
- persistenceId(0),
- redelivered(false)
- {}
-
- virtual ~Message() {};
-
- // Accessors
- const std::string& getRoutingKey() const { return routingKey; }
- const std::string& getExchange() const { return exchange; }
- uint64_t getPersistenceId() const { return persistenceId; }
- bool getRedelivered() const { return redelivered; }
- AMQMethodBodyPtr getRespondTo() const { return respondTo; }
-
- void setRouting(const std::string& _exchange, const std::string& _routingKey)
- { exchange = _exchange; routingKey = _routingKey; }
- void setPersistenceId(uint64_t _persistenceId) { persistenceId = _persistenceId; } // XXXX: Only used in tests?
- void redeliver() { redelivered = true; }
-
- /**
- * Used to deliver the message from the queue
- */
- virtual void deliver(framing::ChannelAdapter& channel,
- const std::string& consumerTag,
- uint64_t deliveryTag,
- uint32_t framesize) = 0;
- /**
- * Used to return a message in response to a get from a queue
- */
- virtual void sendGetOk(const framing::MethodContext& context,
- const std::string& destination,
- uint32_t messageCount,
- uint64_t deliveryTag,
- uint32_t framesize) = 0;
-
- virtual bool isComplete() = 0;
-
- virtual uint64_t contentSize() const = 0;
- virtual framing::BasicHeaderProperties* getHeaderProperties() = 0;
- virtual const framing::FieldTable& getApplicationHeaders() = 0;
- virtual bool isPersistent() = 0;
- virtual const ConnectionToken* getPublisher() const {
- return publisher;
- }
-
- virtual void encode(framing::Buffer& buffer) const = 0;
- virtual void encodeHeader(framing::Buffer& buffer) const = 0;
-
- /**
- * @returns the size of the buffer needed to encode this
- * message in its entirety
- */
- virtual uint32_t encodedSize() const = 0;
- /**
- * @returns the size of the buffer needed to encode the
- * 'header' of this message (not just the header frame,
- * but other meta data e.g.routing key and exchange)
- */
- virtual uint32_t encodedHeaderSize() const = 0;
- /**
- * @returns the size of the buffer needed to encode the
- * (possibly partial) content held by this message
- */
- virtual uint32_t encodedContentSize() const = 0;
- /**
- * If headers have been received, returns the expected
- * content size else returns 0.
- */
- virtual uint64_t expectedContentSize() = 0;
-
- virtual void decodeHeader(framing::Buffer& buffer) = 0;
- virtual void decodeContent(framing::Buffer& buffer, uint32_t contentChunkSize = 0) = 0;
-
- static shared_ptr decode(framing::Buffer& buffer);
-
- // TODO: AMS 29/1/2007 Don't think these are really part of base class
-
- /**
- * Sets the 'content' implementation of this message (the
- * message controls the lifecycle of the content instance
- * it uses).
- */
- virtual void setContent(std::auto_ptr<Content>& /*content*/) {};
- virtual void setHeader(boost::shared_ptr<framing::AMQHeaderBody>) {};
- virtual void addContent(boost::shared_ptr<framing::AMQContentBody>) {};
- /**
- * Releases the in-memory content data held by this
- * message. Must pass in a store from which the data can
- * be reloaded.
- */
- virtual void releaseContent(MessageStore* /*store*/) {};
-
- private:
- const ConnectionToken* publisher;
- std::string exchange;
- std::string routingKey;
- const bool mandatory;
- const bool immediate;
- uint64_t persistenceId;
- bool redelivered;
- AMQMethodBodyPtr respondTo;
-};
-
-}}
-
-
-#endif /*!_broker_BrokerMessage_h*/
diff --git a/qpid/cpp/lib/broker/BrokerMessageMessage.cpp b/qpid/cpp/lib/broker/BrokerMessageMessage.cpp
deleted file mode 100644
index 73ad961938..0000000000
--- a/qpid/cpp/lib/broker/BrokerMessageMessage.cpp
+++ /dev/null
@@ -1,305 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "QpidError.h"
-#include "BrokerMessageMessage.h"
-#include "ChannelAdapter.h"
-#include "MessageTransferBody.h"
-#include "MessageOpenBody.h"
-#include "MessageCloseBody.h"
-#include "MessageAppendBody.h"
-#include "Reference.h"
-#include "framing/AMQFrame.h"
-#include "framing/FieldTable.h"
-#include "framing/BasicHeaderProperties.h"
-#include "RecoveryManagerImpl.h"
-
-#include <algorithm>
-
-using namespace std;
-using namespace qpid::framing;
-
-namespace qpid {
-namespace broker {
-
-MessageMessage::MessageMessage(
- ConnectionToken* publisher, RequestId requestId_, TransferPtr transfer_
-) : Message(publisher, transfer_->getDestination(),
- transfer_->getRoutingKey(),
- transfer_->getMandatory(),
- transfer_->getImmediate(),
- transfer_),
- requestId(requestId_),
- transfer(transfer_)
-{}
-
-MessageMessage::MessageMessage(
- ConnectionToken* publisher, RequestId requestId_, TransferPtr transfer_,
- ReferencePtr reference_
-) : Message(publisher, transfer_->getDestination(),
- transfer_->getRoutingKey(),
- transfer_->getMandatory(),
- transfer_->getImmediate(),
- transfer_),
- requestId(requestId_),
- transfer(transfer_),
- reference(reference_)
-{}
-
-/**
- * Currently used by message store impls to recover messages
- */
-MessageMessage::MessageMessage() : transfer(new MessageTransferBody(qpid::framing::highestProtocolVersion)) {}
-
-// TODO: astitcher 1-Mar-2007: This code desperately needs better factoring
-void MessageMessage::transferMessage(
- framing::ChannelAdapter& channel,
- const std::string& consumerTag,
- uint32_t framesize)
-{
- const framing::Content& body = transfer->getBody();
-
- // Send any reference data
- if (!body.isInline()){
- // Open
- channel.send(new MessageOpenBody(channel.getVersion(), reference->getId()));
- // Appends
- for(Reference::Appends::const_iterator a = reference->getAppends().begin();
- a != reference->getAppends().end();
- ++a) {
- uint32_t sizeleft = (*a)->size();
- const string& content = (*a)->getBytes();
- // Calculate overhead bytes
- // Assume that the overhead is constant as the reference name doesn't change
- uint32_t overhead = sizeleft - content.size();
- string::size_type contentStart = 0;
- while (sizeleft) {
- string::size_type contentSize = sizeleft <= framesize ? sizeleft : framesize-overhead;
- channel.send(new MessageAppendBody(channel.getVersion(), reference->getId(),
- string(content, contentStart, contentSize)));
- sizeleft -= contentSize;
- contentStart += contentSize;
- }
- }
- }
-
- // The transfer
- if ( transfer->size()<=framesize ) {
- channel.send(
- new MessageTransferBody(channel.getVersion(),
- transfer->getTicket(),
- consumerTag,
- getRedelivered(),
- transfer->getImmediate(),
- transfer->getTtl(),
- transfer->getPriority(),
- transfer->getTimestamp(),
- transfer->getDeliveryMode(),
- transfer->getExpiration(),
- getExchange(),
- getRoutingKey(),
- transfer->getMessageId(),
- transfer->getCorrelationId(),
- transfer->getReplyTo(),
- transfer->getContentType(),
- transfer->getContentEncoding(),
- transfer->getUserId(),
- transfer->getAppId(),
- transfer->getTransactionId(),
- transfer->getSecurityToken(),
- transfer->getApplicationHeaders(),
- body,
- transfer->getMandatory()));
- } else {
- // Thing to do here is to construct a simple reference message then deliver that instead
- // fragmentation will be taken care of in the delivery if necessary;
- string content = body.getValue();
- string refname = "dummy";
- TransferPtr newTransfer(
- new MessageTransferBody(channel.getVersion(),
- transfer->getTicket(),
- consumerTag,
- getRedelivered(),
- transfer->getImmediate(),
- transfer->getTtl(),
- transfer->getPriority(),
- transfer->getTimestamp(),
- transfer->getDeliveryMode(),
- transfer->getExpiration(),
- getExchange(),
- getRoutingKey(),
- transfer->getMessageId(),
- transfer->getCorrelationId(),
- transfer->getReplyTo(),
- transfer->getContentType(),
- transfer->getContentEncoding(),
- transfer->getUserId(),
- transfer->getAppId(),
- transfer->getTransactionId(),
- transfer->getSecurityToken(),
- transfer->getApplicationHeaders(),
- framing::Content(REFERENCE, refname),
- transfer->getMandatory()));
- ReferencePtr newRef(new Reference(refname));
- Reference::AppendPtr newAppend(new MessageAppendBody(channel.getVersion(), refname, content));
- newRef->append(newAppend);
- MessageMessage newMsg(const_cast<ConnectionToken*>(getPublisher()), 0, newTransfer, newRef);
- newMsg.transferMessage(channel, consumerTag, framesize);
- return;
- }
- // Close any reference data
- if (!body.isInline()){
- // Close
- channel.send(new MessageCloseBody(channel.getVersion(), reference->getId()));
- }
-}
-
-void MessageMessage::deliver(
- framing::ChannelAdapter& channel,
- const std::string& consumerTag,
- uint64_t /*deliveryTag*/,
- uint32_t framesize)
-{
- transferMessage(channel, consumerTag, framesize);
-}
-
-void MessageMessage::sendGetOk(
- const framing::MethodContext& context,
- const std::string& destination,
- uint32_t /*messageCount*/,
- uint64_t /*deliveryTag*/,
- uint32_t framesize)
-{
- framing::ChannelAdapter* channel = context.channel;
- transferMessage(*channel, destination, framesize);
-}
-
-bool MessageMessage::isComplete()
-{
- return true;
-}
-
-uint64_t MessageMessage::contentSize() const
-{
- if (transfer->getBody().isInline())
- return transfer->getBody().getValue().size();
- else
- return reference->getSize();
-}
-
-qpid::framing::BasicHeaderProperties* MessageMessage::getHeaderProperties()
-{
- return 0; // FIXME aconway 2007-02-05:
-}
-
-const FieldTable& MessageMessage::getApplicationHeaders()
-{
- return transfer->getApplicationHeaders();
-}
-bool MessageMessage::isPersistent()
-{
- return transfer->getDeliveryMode() == PERSISTENT;
-}
-
-uint32_t MessageMessage::encodedSize() const
-{
- return encodedHeaderSize() + encodedContentSize();
-}
-
-uint32_t MessageMessage::encodedHeaderSize() const
-{
- return RecoveryManagerImpl::encodedMessageTypeSize() + transfer->size() - transfer->baseSize();
-}
-
-uint32_t MessageMessage::encodedContentSize() const
-{
- return 0;
-}
-
-uint64_t MessageMessage::expectedContentSize()
-{
- return 0;
-}
-
-void MessageMessage::encode(Buffer& buffer) const
-{
- encodeHeader(buffer);
-}
-
-void MessageMessage::encodeHeader(Buffer& buffer) const
-{
- RecoveryManagerImpl::encodeMessageType(*this, buffer);
- if (transfer->getBody().isInline()) {
- transfer->encodeContent(buffer);
- } else {
- string data;
- for(Reference::Appends::const_iterator a = reference->getAppends().begin(); a != reference->getAppends().end(); ++a) {
- data += (*a)->getBytes();
- }
- framing::Content body(INLINE, data);
- std::auto_ptr<MessageTransferBody> copy(copyTransfer(transfer->version, transfer->getDestination(), body));
- copy->encodeContent(buffer);
- }
-}
-
-void MessageMessage::decodeHeader(Buffer& buffer)
-{
- //don't care about the type here, but want encode/decode to be symmetric
- RecoveryManagerImpl::decodeMessageType(buffer);
-
- transfer->decodeContent(buffer);
-}
-
-void MessageMessage::decodeContent(Buffer& /*buffer*/, uint32_t /*chunkSize*/)
-{
-}
-
-
-MessageTransferBody* MessageMessage::copyTransfer(const ProtocolVersion& version,
- const string& destination,
- const framing::Content& body) const
-{
- return new MessageTransferBody(version,
- transfer->getTicket(),
- destination,
- getRedelivered(),
- transfer->getImmediate(),
- transfer->getTtl(),
- transfer->getPriority(),
- transfer->getTimestamp(),
- transfer->getDeliveryMode(),
- transfer->getExpiration(),
- getExchange(),
- getRoutingKey(),
- transfer->getMessageId(),
- transfer->getCorrelationId(),
- transfer->getReplyTo(),
- transfer->getContentType(),
- transfer->getContentEncoding(),
- transfer->getUserId(),
- transfer->getAppId(),
- transfer->getTransactionId(),
- transfer->getSecurityToken(),
- transfer->getApplicationHeaders(),
- body,
- transfer->getMandatory());
-
-}
-}} // namespace qpid::broker
diff --git a/qpid/cpp/lib/broker/BrokerMessageMessage.h b/qpid/cpp/lib/broker/BrokerMessageMessage.h
deleted file mode 100644
index 1da171fba8..0000000000
--- a/qpid/cpp/lib/broker/BrokerMessageMessage.h
+++ /dev/null
@@ -1,99 +0,0 @@
-#ifndef _broker_BrokerMessageMessage_h
-#define _broker_BrokerMessageMessage_h
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "BrokerMessageBase.h"
-#include "MessageTransferBody.h"
-#include "amqp_types.h"
-
-#include <vector>
-
-namespace qpid {
-
-namespace framing {
-class MessageTransferBody;
-}
-
-namespace broker {
-class ConnectionToken;
-class Reference;
-
-class MessageMessage: public Message{
- public:
- typedef boost::shared_ptr<MessageMessage> shared_ptr;
- typedef boost::shared_ptr<framing::MessageTransferBody> TransferPtr;
- typedef boost::shared_ptr<Reference> ReferencePtr;
-
- MessageMessage(ConnectionToken* publisher, framing::RequestId, TransferPtr transfer);
- MessageMessage(ConnectionToken* publisher, framing::RequestId, TransferPtr transfer, ReferencePtr reference);
- MessageMessage();
-
- // Default destructor okay
-
- framing::RequestId getRequestId() {return requestId; }
- TransferPtr getTransfer() { return transfer; }
- ReferencePtr getReference() { return reference; }
-
- void deliver(framing::ChannelAdapter& channel,
- const std::string& consumerTag,
- uint64_t deliveryTag,
- uint32_t framesize);
-
- void sendGetOk(const framing::MethodContext& context,
- const std::string& destination,
- uint32_t messageCount,
- uint64_t deliveryTag,
- uint32_t framesize);
-
- bool isComplete();
-
- uint64_t contentSize() const;
- framing::BasicHeaderProperties* getHeaderProperties();
- const framing::FieldTable& getApplicationHeaders();
- bool isPersistent();
-
- void encode(framing::Buffer& buffer) const;
- void encodeHeader(framing::Buffer& buffer) const;
- uint32_t encodedSize() const;
- uint32_t encodedHeaderSize() const;
- uint32_t encodedContentSize() const;
- uint64_t expectedContentSize();
- void decodeHeader(framing::Buffer& buffer);
- void decodeContent(framing::Buffer& buffer, uint32_t contentChunkSize = 0);
-
- private:
- void transferMessage(framing::ChannelAdapter& channel,
- const std::string& consumerTag,
- uint32_t framesize);
- framing::MessageTransferBody* copyTransfer(const framing::ProtocolVersion& version,
- const std::string& destination,
- const framing::Content& body) const;
-
- framing::RequestId requestId;
- const TransferPtr transfer;
- const ReferencePtr reference;
-};
-
-}}
-
-
-#endif /*!_broker_BrokerMessage_h*/
diff --git a/qpid/cpp/lib/broker/BrokerQueue.cpp b/qpid/cpp/lib/broker/BrokerQueue.cpp
deleted file mode 100644
index e2b59aa766..0000000000
--- a/qpid/cpp/lib/broker/BrokerQueue.cpp
+++ /dev/null
@@ -1,282 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include <boost/format.hpp>
-
-#include <BrokerQueue.h>
-#include <MessageStore.h>
-#include <sys/Monitor.h>
-#include <sys/Time.h>
-#include <iostream>
-#include "QueueRegistry.h"
-
-using namespace qpid::broker;
-using namespace qpid::sys;
-using namespace qpid::framing;
-using boost::format;
-
-Queue::Queue(const string& _name, uint32_t _autodelete,
- MessageStore* const _store,
- const ConnectionToken* const _owner) :
-
- name(_name),
- autodelete(_autodelete),
- store(_store),
- owner(_owner),
- queueing(false),
- dispatching(false),
- next(0),
- lastUsed(0),
- exclusive(0),
- persistenceId(0)
-{
- if(autodelete) lastUsed = now()/TIME_MSEC;
-}
-
-Queue::~Queue(){}
-
-void Queue::deliver(Message::shared_ptr& msg){
- enqueue(0, msg);
- process(msg);
-}
-
-void Queue::recover(Message::shared_ptr& msg){
- push(msg);
- if (store && msg->expectedContentSize() != msg->encodedContentSize()) {
- //content has not been loaded, need to ensure that lazy loading mode is set:
- //TODO: find a nicer way to do this
- msg->releaseContent(store);
- }
-}
-
-void Queue::process(Message::shared_ptr& msg){
- Mutex::ScopedLock locker(lock);
- if(queueing || !dispatch(msg)){
- push(msg);
- }
-}
-
-bool Queue::dispatch(Message::shared_ptr& msg){
- if(consumers.empty()){
- return false;
- }else if(exclusive){
- if(!exclusive->deliver(msg)){
- std::cout << "WARNING: Dropping undeliverable message from queue with exclusive consumer." << std::endl;
- }
- return true;
- }else{
- //deliver to next consumer
- next = next % consumers.size();
- Consumer* c = consumers[next];
- int start = next;
- while(c){
- next++;
- if(c->deliver(msg)) return true;
-
- next = next % consumers.size();
- c = next == start ? 0 : consumers[next];
- }
- return false;
- }
-}
-
-bool Queue::startDispatching(){
- Mutex::ScopedLock locker(lock);
- if(queueing && !dispatching){
- dispatching = true;
- return true;
- }else{
- return false;
- }
-}
-
-void Queue::dispatch(){
- bool proceed = startDispatching();
- while(proceed){
- Mutex::ScopedLock locker(lock);
- if(!messages.empty() && dispatch(messages.front())){
- pop();
- }else{
- dispatching = false;
- proceed = false;
- queueing = !messages.empty();
- }
- }
-}
-
-void Queue::consume(Consumer* c, bool requestExclusive){
- Mutex::ScopedLock locker(lock);
- if(exclusive)
- throw ChannelException(
- 403, format("Queue '%s' has an exclusive consumer."
- " No more consumers allowed.") % getName());
- if(requestExclusive) {
- if(!consumers.empty())
- throw ChannelException(
- 403, format("Queue '%s' already has conumers."
- "Exclusive access denied.") %getName());
- exclusive = c;
- }
- if(autodelete && consumers.empty()) lastUsed = 0;
- consumers.push_back(c);
-}
-
-void Queue::cancel(Consumer* c){
- Mutex::ScopedLock locker(lock);
- Consumers::iterator i = std::find(consumers.begin(), consumers.end(), c);
- if (i != consumers.end())
- consumers.erase(i);
- if(autodelete && consumers.empty()) lastUsed = now()*TIME_MSEC;
- if(exclusive == c) exclusive = 0;
-}
-
-Message::shared_ptr Queue::dequeue(){
- Mutex::ScopedLock locker(lock);
- Message::shared_ptr msg;
- if(!messages.empty()){
- msg = messages.front();
- pop();
- }
- return msg;
-}
-
-uint32_t Queue::purge(){
- Mutex::ScopedLock locker(lock);
- int count = messages.size();
- while(!messages.empty()) pop();
- return count;
-}
-
-void Queue::pop(){
- if (policy.get()) policy->dequeued(messages.front()->contentSize());
- messages.pop();
-}
-
-void Queue::push(Message::shared_ptr& msg){
- queueing = true;
- messages.push(msg);
- if (policy.get()) {
- policy->enqueued(msg->contentSize());
- if (policy->limitExceeded()) {
- msg->releaseContent(store);
- }
- }
-}
-
-uint32_t Queue::getMessageCount() const{
- Mutex::ScopedLock locker(lock);
- return messages.size();
-}
-
-uint32_t Queue::getConsumerCount() const{
- Mutex::ScopedLock locker(lock);
- return consumers.size();
-}
-
-bool Queue::canAutoDelete() const{
- Mutex::ScopedLock locker(lock);
- return lastUsed && (now()*TIME_MSEC - lastUsed > autodelete);
-}
-
-void Queue::enqueue(TransactionContext* ctxt, Message::shared_ptr& msg)
-{
- if (msg->isPersistent() && store) {
- store->enqueue(ctxt, *msg.get(), *this);
- }
-}
-
-void Queue::dequeue(TransactionContext* ctxt, Message::shared_ptr& msg)
-{
- if (msg->isPersistent() && store) {
- store->dequeue(ctxt, *msg.get(), *this);
- }
-}
-
-namespace
-{
- const std::string qpidMaxSize("qpid.max_size");
- const std::string qpidMaxCount("qpid.max_count");
-}
-
-void Queue::create(const FieldTable& settings)
-{
- //TODO: hold onto settings and persist them as part of encode
- // in fact settings should be passed in on construction
- if (store) {
- store->create(*this);
- }
- configure(settings);
-}
-
-void Queue::configure(const FieldTable& settings)
-{
- std::auto_ptr<QueuePolicy> _policy(new QueuePolicy(settings));
- if (_policy->getMaxCount() || _policy->getMaxSize())
- setPolicy(_policy);
-}
-
-void Queue::destroy()
-{
- if (store) {
- store->destroy(*this);
- }
-}
-
-void Queue::setPolicy(std::auto_ptr<QueuePolicy> _policy)
-{
- policy = _policy;
-}
-
-const QueuePolicy* const Queue::getPolicy()
-{
- return policy.get();
-}
-
-uint64_t Queue::getPersistenceId() const
-{
- return persistenceId;
-}
-
-void Queue::setPersistenceId(uint64_t _persistenceId)
-{
- persistenceId = _persistenceId;
-}
-
-void Queue::encode(framing::Buffer& buffer) const
-{
- buffer.putShortString(name);
- //TODO store all required properties
-}
-
-uint32_t Queue::encodedSize() const
-{
- //TODO, revise when storing full set of queue properties
- return name.size() + 1/*short string size octet*/;
-}
-
-Queue::shared_ptr Queue::decode(QueueRegistry& queues, framing::Buffer& buffer)
-{
- string name;
- buffer.getShortString(name);
- std::pair<Queue::shared_ptr, bool> result = queues.declare(name, true);
- return result.first;
-}
-
diff --git a/qpid/cpp/lib/broker/BrokerQueue.h b/qpid/cpp/lib/broker/BrokerQueue.h
deleted file mode 100644
index 20d81e4e87..0000000000
--- a/qpid/cpp/lib/broker/BrokerQueue.h
+++ /dev/null
@@ -1,151 +0,0 @@
-#ifndef _broker_BrokerQueue_h
-#define _broker_BrokerQueue_h
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <vector>
-#include <memory>
-#include <queue>
-#include <boost/shared_ptr.hpp>
-#include <amqp_types.h>
-#include <ConnectionToken.h>
-#include <Consumer.h>
-#include <BrokerMessage.h>
-#include <FieldTable.h>
-#include <sys/Monitor.h>
-#include "PersistableQueue.h"
-#include <QueuePolicy.h>
-
-// TODO aconway 2007-02-06: Use auto_ptr and boost::ptr_vector to
-// enforce ownership of Consumers.
-
-namespace qpid {
- namespace broker {
- class MessageStore;
- class QueueRegistry;
-
- /**
- * Thrown when exclusive access would be violated.
- */
- using std::string;
-
- /**
- * The brokers representation of an amqp queue. Messages are
- * delivered to a queue from where they can be dispatched to
- * registered consumers or be stored until dequeued or until one
- * or more consumers registers.
- */
- class Queue : public PersistableQueue{
- typedef std::vector<Consumer*> Consumers;
- typedef std::queue<Message::shared_ptr> Messages;
-
- const string name;
- const uint32_t autodelete;
- MessageStore* const store;
- const ConnectionToken* const owner;
- Consumers consumers;
- Messages messages;
- bool queueing;
- bool dispatching;
- int next;
- mutable qpid::sys::Mutex lock;
- int64_t lastUsed;
- Consumer* exclusive;
- mutable uint64_t persistenceId;
- std::auto_ptr<QueuePolicy> policy;
-
- void pop();
- void push(Message::shared_ptr& msg);
- bool startDispatching();
- bool dispatch(Message::shared_ptr& msg);
- void setPolicy(std::auto_ptr<QueuePolicy> policy);
-
- public:
-
- typedef boost::shared_ptr<Queue> shared_ptr;
-
- typedef std::vector<shared_ptr> vector;
-
- Queue(const string& name, uint32_t autodelete = 0,
- MessageStore* const store = 0,
- const ConnectionToken* const owner = 0);
- ~Queue();
-
- void create(const qpid::framing::FieldTable& settings);
- void configure(const qpid::framing::FieldTable& settings);
- void destroy();
- /**
- * Delivers a message to the queue. Will record it as
- * enqueued if persistent then process it.
- */
- void deliver(Message::shared_ptr& msg);
- /**
- * Dispatches the messages immediately to a consumer if
- * one is available or stores it for later if not.
- */
- void process(Message::shared_ptr& msg);
- /**
- * Used during recovery to add stored messages back to the queue
- */
- void recover(Message::shared_ptr& msg);
- /**
- * Dispatch any queued messages providing there are
- * consumers for them. Only one thread can be dispatching
- * at any time, but this method (rather than the caller)
- * is responsible for ensuring that.
- */
- void dispatch();
- void consume(Consumer* c, bool exclusive = false);
- void cancel(Consumer* c);
- uint32_t purge();
- uint32_t getMessageCount() const;
- uint32_t getConsumerCount() const;
- inline const string& getName() const { return name; }
- inline const bool isExclusiveOwner(const ConnectionToken* const o) const { return o == owner; }
- inline bool hasExclusiveConsumer() const { return exclusive; }
-
- bool canAutoDelete() const;
-
- void enqueue(TransactionContext* ctxt, Message::shared_ptr& msg);
- /**
- * dequeue from store (only done once messages is acknowledged)
- */
- void dequeue(TransactionContext* ctxt, Message::shared_ptr& msg);
- /**
- * dequeues from memory only
- */
- Message::shared_ptr dequeue();
-
- const QueuePolicy* const getPolicy();
-
- //PersistableQueue support:
- uint64_t getPersistenceId() const;
- void setPersistenceId(uint64_t persistenceId);
- void encode(framing::Buffer& buffer) const;
- uint32_t encodedSize() const;
-
- static Queue::shared_ptr decode(QueueRegistry& queues, framing::Buffer& buffer);
- };
- }
-}
-
-
-#endif /*!_broker_BrokerQueue_h*/
diff --git a/qpid/cpp/lib/broker/BrokerSingleton.cpp b/qpid/cpp/lib/broker/BrokerSingleton.cpp
deleted file mode 100644
index 4571764850..0000000000
--- a/qpid/cpp/lib/broker/BrokerSingleton.cpp
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "BrokerSingleton.h"
-
-namespace qpid {
-namespace broker {
-
-BrokerSingleton::BrokerSingleton() {
- if (broker.get() == 0)
- broker = Broker::create();
- Broker::shared_ptr::operator=(broker);
-}
-
-BrokerSingleton::~BrokerSingleton() {
- broker->shutdown();
-}
-
-Broker::shared_ptr BrokerSingleton::broker;
-
-}} // namespace qpid::broker
diff --git a/qpid/cpp/lib/broker/BrokerSingleton.h b/qpid/cpp/lib/broker/BrokerSingleton.h
deleted file mode 100644
index 139e02a5fd..0000000000
--- a/qpid/cpp/lib/broker/BrokerSingleton.h
+++ /dev/null
@@ -1,52 +0,0 @@
-#ifndef _broker_BrokerSingleton_h
-#define _broker_BrokerSingleton_h
-
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "Broker.h"
-
-namespace qpid {
-namespace broker {
-
-/**
- * BrokerSingleton is a smart pointer to a process-wide singleton broker
- * started on an os-chosen port. The broker starts the first time
- * an instance of BrokerSingleton is created and runs untill the process exits.
- *
- * Useful for unit tests that want to share a broker between multiple
- * tests to reduce overhead of starting/stopping a broker for every test.
- *
- * Tests that need a new broker can call Broker::create directly.
- *
- * THREAD UNSAFE.
- */
-class BrokerSingleton : public Broker::shared_ptr
-{
- public:
- BrokerSingleton();
- ~BrokerSingleton();
- private:
- static Broker::shared_ptr broker;
-};
-
-}} // namespace qpid::broker
-
-
-
-#endif /*!_broker_BrokerSingleton_h*/
diff --git a/qpid/cpp/lib/broker/CompletionHandler.h b/qpid/cpp/lib/broker/CompletionHandler.h
deleted file mode 100644
index 9d51656282..0000000000
--- a/qpid/cpp/lib/broker/CompletionHandler.h
+++ /dev/null
@@ -1,39 +0,0 @@
-#ifndef _broker_CompletionHandler_h
-#define _broker_CompletionHandler_h
-
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-namespace qpid {
-namespace broker {
-
-/**
- * Callback interface to handle completion of a message.
- */
-class CompletionHandler
-{
- public:
- virtual ~CompletionHandler(){}
- virtual void complete(Message::shared_ptr) = 0;
-};
-
-}} // namespace qpid::broker
-
-
-
-#endif /*!_broker_CompletionHandler_h*/
diff --git a/qpid/cpp/lib/broker/Configuration.cpp b/qpid/cpp/lib/broker/Configuration.cpp
deleted file mode 100644
index e83c359f2d..0000000000
--- a/qpid/cpp/lib/broker/Configuration.cpp
+++ /dev/null
@@ -1,252 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <Configuration.h>
-#include <string.h>
-#include <config.h>
-
-using namespace qpid::broker;
-using namespace std;
-
-Configuration::Configuration() :
- daemon('d', "daemon", "Run as system daemon, detached from terminal.", false),
- trace('t', "trace", "Print incoming & outgoing frames to the console", false),
- port('p', "port", "Set the port to listen on (default=5672)", 5672),
- workerThreads("worker-threads", "Set the number of worker threads to use (default=5).", 5),
- maxConnections("max-connections", "Set the maximum number of connections the broker can accept (default=500).", 500),
- connectionBacklog("connection-backlog", "Set the connection backlog for the servers socket (default=10)", 10),
- store('s', "store", "Set the message store module to use (default='' which implies no store)", ""),
- stagingThreshold("staging-threshold", "Set the message size threshold above which messages will be written to disk as they arrive (default=5,000,000)", 5000000),
- help("help", "Print usage information", false),
- version("version", "Print version information", false)
-{
- options.push_back(&daemon);
- options.push_back(&trace);
- options.push_back(&port);
- options.push_back(&workerThreads);
- options.push_back(&maxConnections);
- options.push_back(&connectionBacklog);
- options.push_back(&store);
- options.push_back(&stagingThreshold);
- options.push_back(&help);
- options.push_back(&version);
-}
-
-Configuration::~Configuration(){}
-
-void Configuration::parse(char const *progName, int argc, char** argv){
- programName = progName;
- int position = 1;
- while(position < argc){
- bool matched(false);
- for(op_iterator i = options.begin(); i < options.end() && !matched; i++){
- matched = (*i)->parse(position, argv, argc);
- }
- if(!matched) {
- throw BadOptionException(
- std::string("Unrecognised option: ")+argv[position]);
- }
- }
-}
-
-void Configuration::usage(){
- std::cout << "Usage: " << programName << " [OPTION]..." << std::endl
- << "Start the Qpid AMQP broker daemon." << std::endl << std::endl
- << "Options:" << std::endl;
- for(op_iterator i = options.begin(); i < options.end(); i++){
- (*i)->print(std::cout);
- }
-
- std::cout << std::endl << "Report bugs to <" << PACKAGE_BUGREPORT << ">."
- << std::endl;
-}
-
-bool Configuration::isHelp() const {
- return help.getValue();
-}
-
-bool Configuration::isVersion() const {
- return version.getValue();
-}
-
-bool Configuration::isDaemon() const {
- return daemon.getValue();
-}
-
-bool Configuration::isTrace() const {
- return trace.getValue();
-}
-
-int Configuration::getPort() const {
- return port.getValue();
-}
-
-int Configuration::getWorkerThreads() const {
- return workerThreads.getValue();
-}
-
-int Configuration::getMaxConnections() const {
- return maxConnections.getValue();
-}
-
-int Configuration::getConnectionBacklog() const {
- return connectionBacklog.getValue();
-}
-
-const std::string& Configuration::getStore() const {
- return store.getValue();
-}
-
-long Configuration::getStagingThreshold() const {
- return stagingThreshold.getValue();
-}
-
-
-Configuration::Option::Option(const char _flag, const string& _name, const string& _desc) :
- flag(string("-") + _flag), name("--" +_name), desc(_desc) {}
-
-Configuration::Option::Option(const string& _name, const string& _desc) :
- flag(""), name("--" + _name), desc(_desc) {}
-
-Configuration::Option::~Option(){}
-
-bool Configuration::Option::match(const string& arg){
- return flag == arg || name == arg;
-}
-
-bool Configuration::Option::parse(int& i, char** argv, int argc){
- const string arg(argv[i]);
- if(match(arg)){
- if(needsValue()){
- if(++i < argc) setValue(argv[i]);
- else throw ParseException("Argument " + arg + " requires a value!");
- }else{
- setValue("");
- }
- i++;
- return true;
- }else{
- return false;
- }
-}
-
-void Configuration::Option::print(ostream& out) const {
- out << " ";
- if(flag.length() > 0){
- out << flag << ", ";
- } else {
- out << " ";
- }
- out << name;
- if(needsValue()) out << " <value>";
- out << std::endl;
- out << " " << desc << std::endl;
-}
-
-
-// String Option:
-
-Configuration::StringOption::StringOption(const char _flag, const string& _name, const string& _desc, const string _value) :
- Option(_flag,_name,_desc), defaultValue(_value), value(_value) {}
-
-Configuration::StringOption::StringOption(const string& _name, const string& _desc, const string _value) :
- Option(_name,_desc), defaultValue(_value), value(_value) {}
-
-Configuration::StringOption::~StringOption(){}
-
-const string& Configuration::StringOption::getValue() const {
- return value;
-}
-
-bool Configuration::StringOption::needsValue() const {
- return true;
-}
-
-void Configuration::StringOption::setValue(const std::string& _value){
- value = _value;
-}
-
-// Int Option:
-
-Configuration::IntOption::IntOption(const char _flag, const string& _name, const string& _desc, const int _value) :
- Option(_flag,_name,_desc), defaultValue(_value), value(_value) {}
-
-Configuration::IntOption::IntOption(const string& _name, const string& _desc, const int _value) :
- Option(_name,_desc), defaultValue(_value), value(_value) {}
-
-Configuration::IntOption::~IntOption(){}
-
-int Configuration::IntOption::getValue() const {
- return value;
-}
-
-bool Configuration::IntOption::needsValue() const {
- return true;
-}
-
-void Configuration::IntOption::setValue(const std::string& _value){
- value = atoi(_value.c_str());
-}
-
-// Long Option:
-
-Configuration::LongOption::LongOption(const char _flag, const string& _name, const string& _desc, const long _value) :
- Option(_flag,_name,_desc), defaultValue(_value), value(_value) {}
-
-Configuration::LongOption::LongOption(const string& _name, const string& _desc, const long _value) :
- Option(_name,_desc), defaultValue(_value), value(_value) {}
-
-Configuration::LongOption::~LongOption(){}
-
-long Configuration::LongOption::getValue() const {
- return value;
-}
-
-bool Configuration::LongOption::needsValue() const {
- return true;
-}
-
-void Configuration::LongOption::setValue(const std::string& _value){
- value = atol(_value.c_str());
-}
-
-// Bool Option:
-
-Configuration::BoolOption::BoolOption(const char _flag, const string& _name, const string& _desc, const bool _value) :
- Option(_flag,_name,_desc), defaultValue(_value), value(_value) {}
-
-Configuration::BoolOption::BoolOption(const string& _name, const string& _desc, const bool _value) :
- Option(_name,_desc), defaultValue(_value), value(_value) {}
-
-Configuration::BoolOption::~BoolOption(){}
-
-bool Configuration::BoolOption::getValue() const {
- return value;
-}
-
-bool Configuration::BoolOption::needsValue() const {
- return false;
-}
-
-void Configuration::BoolOption::setValue(const std::string& /*not required*/){
- //BoolOptions have no value. The fact that the option is specified
- //implies the value is true.
- value = true;
-}
diff --git a/qpid/cpp/lib/broker/Configuration.h b/qpid/cpp/lib/broker/Configuration.h
deleted file mode 100644
index 27c743c8f0..0000000000
--- a/qpid/cpp/lib/broker/Configuration.h
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#ifndef _Configuration_
-#define _Configuration_
-
-#include <cstdlib>
-#include <iostream>
-#include <vector>
-#include <Exception.h>
-
-namespace qpid {
-namespace broker {
-class Configuration{
-
- class Option {
- const std::string flag;
- const std::string name;
- const std::string desc;
-
- bool match(const std::string& arg);
-
- protected:
- virtual bool needsValue() const = 0;
- virtual void setValue(const std::string& value) = 0;
-
- public:
- Option(const char flag, const std::string& name, const std::string& desc);
- Option(const std::string& name, const std::string& desc);
- virtual ~Option();
-
- bool parse(int& i, char** argv, int argc);
- void print(std::ostream& out) const;
- };
-
- class IntOption : public Option{
- const int defaultValue;
- int value;
- public:
- IntOption(char flag, const std::string& name, const std::string& desc, const int value = 0);
- IntOption(const std::string& name, const std::string& desc, const int value = 0);
- virtual ~IntOption();
-
- int getValue() const;
- virtual bool needsValue() const;
- virtual void setValue(const std::string& value);
- virtual void setValue(int _value) { value = _value; }
- };
-
- class LongOption : public Option{
- const long defaultValue;
- int value;
- public:
- LongOption(char flag, const std::string& name, const std::string& desc, const long value = 0);
- LongOption(const std::string& name, const std::string& desc, const long value = 0);
- virtual ~LongOption();
-
- long getValue() const;
- virtual bool needsValue() const;
- virtual void setValue(const std::string& value);
- virtual void setValue(int _value) { value = _value; }
- };
-
- class StringOption : public Option{
- const std::string defaultValue;
- std::string value;
- public:
- StringOption(char flag, const std::string& name, const std::string& desc, const std::string value = "");
- StringOption(const std::string& name, const std::string& desc, const std::string value = "");
- virtual ~StringOption();
-
- const std::string& getValue() const;
- virtual bool needsValue() const;
- virtual void setValue(const std::string& value);
- };
-
- class BoolOption : public Option{
- const bool defaultValue;
- bool value;
- public:
- BoolOption(char flag, const std::string& name, const std::string& desc, const bool value = 0);
- BoolOption(const std::string& name, const std::string& desc, const bool value = 0);
- virtual ~BoolOption();
-
- bool getValue() const;
- virtual bool needsValue() const;
- virtual void setValue(const std::string& value);
- virtual void setValue(bool _value) { value = _value; }
- };
-
- BoolOption daemon;
- BoolOption trace;
- IntOption port;
- IntOption workerThreads;
- IntOption maxConnections;
- IntOption connectionBacklog;
- StringOption store;
- LongOption stagingThreshold;
- BoolOption help;
- BoolOption version;
- char const *programName;
-
- typedef std::vector<Option*>::iterator op_iterator;
- std::vector<Option*> options;
-
- public:
-
- struct BadOptionException : public Exception {
- template<class T>
- BadOptionException(const T& msg) : Exception(msg) {}
- };
-
-
- class ParseException : public Exception {
- public:
- template <class T>
- ParseException(const T& msg) : Exception(msg) {}
- };
-
-
- Configuration();
- ~Configuration();
-
- void parse(char const*, int argc, char** argv);
-
- bool isHelp() const;
- bool isVersion() const;
- bool isDaemon() const;
- bool isTrace() const;
- int getPort() const;
- int getWorkerThreads() const;
- int getMaxConnections() const;
- int getConnectionBacklog() const;
- const std::string& getStore() const;
- long getStagingThreshold() const;
-
- void setHelp(bool b) { help.setValue(b); }
- void setVersion(bool b) { version.setValue(b); }
- void setDaemon(bool b) { daemon.setValue(b); }
- void setTrace(bool b) { trace.setValue(b); }
- void setPort(int i) { port.setValue(i); }
- void setWorkerThreads(int i) { workerThreads.setValue(i); }
- void setMaxConnections(int i) { maxConnections.setValue(i); }
- void setConnectionBacklog(int i) { connectionBacklog.setValue(i); }
- void setStore(const std::string& s) { store.setValue(s); }
- void setStagingThreshold(long l) { stagingThreshold.setValue(l); }
-
- void usage();
-};
-}
-}
-
-
-#endif
diff --git a/qpid/cpp/lib/broker/Connection.cpp b/qpid/cpp/lib/broker/Connection.cpp
deleted file mode 100644
index dbc8149cb5..0000000000
--- a/qpid/cpp/lib/broker/Connection.cpp
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <iostream>
-#include <assert.h>
-
-#include "Connection.h"
-#include "BrokerChannel.h"
-#include "AMQP_ClientProxy.h"
-#include "BrokerAdapter.h"
-
-using namespace boost;
-using namespace qpid::sys;
-using namespace qpid::framing;
-using namespace qpid::sys;
-
-namespace qpid {
-namespace broker {
-
-Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_) :
- broker(broker_),
- out(out_),
- framemax(65536),
- heartbeat(0),
- client(0),
- timeout(broker.getTimeout()),
- stagingThreshold(broker.getStagingThreshold())
-{}
-
-
-Queue::shared_ptr Connection::getQueue(const string& name, uint16_t channel){
- Queue::shared_ptr queue;
- if (name.empty()) {
- queue = getChannel(channel).getDefaultQueue();
- if (!queue) throw ConnectionException( 530, "Queue must be specified or previously declared" );
- } else {
- queue = broker.getQueues().find(name);
- if (queue == 0) {
- throw ChannelException( 404, "Queue not found: " + name);
- }
- }
- return queue;
-}
-
-
-Exchange::shared_ptr Connection::findExchange(const string& name){
- return broker.getExchanges().get(name);
-}
-
-
-void Connection::received(framing::AMQFrame* frame){
- getChannel(frame->getChannel()).handleBody(frame->getBody());
-}
-
-void Connection::close(
- ReplyCode code, const string& text, ClassId classId, MethodId methodId)
-{
- client->close(code, text, classId, methodId);
- getOutput().close();
-}
-
-void Connection::initiated(const framing::ProtocolInitiation& header) {
- version = ProtocolVersion(header.getMajor(), header.getMinor());
- FieldTable properties;
- string mechanisms("PLAIN");
- string locales("en_US");
- getChannel(0).init(0, *out, getVersion());
- client = &getChannel(0).getAdatper().getProxy().getConnection();
- client->start(
- header.getMajor(), header.getMinor(),
- properties, mechanisms, locales);
-}
-
-void Connection::idleOut(){}
-
-void Connection::idleIn(){}
-
-void Connection::closed(){
- try {
- while (!exclusiveQueues.empty()) {
- broker.getQueues().destroy(exclusiveQueues.front()->getName());
- exclusiveQueues.erase(exclusiveQueues.begin());
- }
- } catch(std::exception& e) {
- std::cout << "Caught unhandled exception while closing session: " <<
- e.what() << std::endl;
- assert(0);
- }
-}
-
-void Connection::closeChannel(uint16_t id) {
- ChannelMap::iterator i = channels.find(id);
- if (i != channels.end())
- i->close();
-}
-
-
-Channel& Connection::getChannel(ChannelId id) {
- ChannelMap::iterator i = channels.find(id);
- if (i == channels.end()) {
- i = channels.insert(
- id, new Channel(
- *this, id, framemax, broker.getQueues().getStore(),
- broker.getStagingThreshold())).first;
- }
- return *i;
-}
-
-
-}}
-
diff --git a/qpid/cpp/lib/broker/Connection.h b/qpid/cpp/lib/broker/Connection.h
deleted file mode 100644
index 5c6f40ca54..0000000000
--- a/qpid/cpp/lib/broker/Connection.h
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#ifndef _Connection_
-#define _Connection_
-
-#include <sstream>
-#include <vector>
-
-#include <boost/ptr_container/ptr_map.hpp>
-
-#include <AMQFrame.h>
-#include <AMQP_ServerOperations.h>
-#include <AMQP_ClientProxy.h>
-#include <sys/ConnectionOutputHandler.h>
-#include <sys/ConnectionInputHandler.h>
-#include <sys/TimeoutHandler.h>
-#include "framing/ProtocolVersion.h"
-#include "Broker.h"
-#include "Exception.h"
-#include "BrokerChannel.h"
-
-namespace qpid {
-namespace broker {
-
-class Channel;
-
-class Connection : public sys::ConnectionInputHandler,
- public ConnectionToken
-{
- public:
- Connection(sys::ConnectionOutputHandler* out, Broker& broker);
-
- /** Get a channel. Create if it does not already exist */
- Channel& getChannel(framing::ChannelId channel);
-
- /** Close a channel */
- void closeChannel(framing::ChannelId channel);
-
- /** Close the connection */
- void close(framing::ReplyCode code, const string& text, framing::ClassId classId, framing::MethodId methodId);
-
- sys::ConnectionOutputHandler& getOutput() const { return *out; }
- framing::ProtocolVersion getVersion() const { return version; }
-
- uint32_t getFrameMax() const { return framemax; }
- uint16_t getHeartbeat() const { return heartbeat; }
- uint32_t getTimeout() const { return timeout; }
- uint64_t getStagingThreshold() const { return stagingThreshold; }
-
- void setFrameMax(uint32_t fm) { framemax = fm; }
- void setHeartbeat(uint16_t hb) { heartbeat = hb; }
-
- /**
- * Get named queue, never returns 0.
- * @return: named queue or default queue for channel if name=""
- * @exception: ChannelException if no queue of that name is found.
- * @exception: ConnectionException if name="" and channel has no default.
- */
- Queue::shared_ptr getQueue(const string& name, uint16_t channel);
-
- Broker& broker;
- std::vector<Queue::shared_ptr> exclusiveQueues;
-
- // ConnectionInputHandler methods
- void received(framing::AMQFrame* frame);
- void initiated(const framing::ProtocolInitiation& header);
- void idleOut();
- void idleIn();
- void closed();
-
- private:
- typedef boost::ptr_map<framing::ChannelId, Channel> ChannelMap;
-
- typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
- Exchange::shared_ptr findExchange(const string& name);
-
- framing::ProtocolVersion version;
- ChannelMap channels;
- sys::ConnectionOutputHandler* out;
- uint32_t framemax;
- uint16_t heartbeat;
- framing::AMQP_ClientProxy::Connection* client;
- const uint32_t timeout; //timeout for auto-deleted queues (in ms)
- const uint64_t stagingThreshold;
-
-};
-
-}}
-
-#endif
diff --git a/qpid/cpp/lib/broker/ConnectionFactory.cpp b/qpid/cpp/lib/broker/ConnectionFactory.cpp
deleted file mode 100644
index 0208e32b4d..0000000000
--- a/qpid/cpp/lib/broker/ConnectionFactory.cpp
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <ConnectionFactory.h>
-#include "Connection.h"
-
-namespace qpid {
-namespace broker {
-
-
-ConnectionFactory::ConnectionFactory(Broker& b) : broker(b)
-{}
-
-
-ConnectionFactory::~ConnectionFactory()
-{
- broker.getCleaner().stop();
-}
-
-qpid::sys::ConnectionInputHandler*
-ConnectionFactory::create(qpid::sys::ConnectionOutputHandler* out)
-{
- return new Connection(out, broker);
-}
-
-}} // namespace qpid::broker
diff --git a/qpid/cpp/lib/broker/ConnectionFactory.h b/qpid/cpp/lib/broker/ConnectionFactory.h
deleted file mode 100644
index 9147384b2a..0000000000
--- a/qpid/cpp/lib/broker/ConnectionFactory.h
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#ifndef _ConnectionFactory_
-#define _ConnectionFactory_
-
-#include "ConnectionInputHandlerFactory.h"
-
-namespace qpid {
-namespace broker {
-class Broker;
-
-class ConnectionFactory : public qpid::sys::ConnectionInputHandlerFactory
-{
- public:
- ConnectionFactory(Broker& b);
-
- virtual qpid::sys::ConnectionInputHandler* create(
- qpid::sys::ConnectionOutputHandler* ctxt);
-
- virtual ~ConnectionFactory();
-
- private:
- Broker& broker;
-};
-
-}}
-
-
-#endif
diff --git a/qpid/cpp/lib/broker/ConnectionToken.h b/qpid/cpp/lib/broker/ConnectionToken.h
deleted file mode 100644
index 7e7f813d0e..0000000000
--- a/qpid/cpp/lib/broker/ConnectionToken.h
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#ifndef _ConnectionToken_
-#define _ConnectionToken_
-
-namespace qpid {
- namespace broker {
- /**
- * An empty interface allowing opaque implementations of some
- * form of token to identify a connection.
- */
- class ConnectionToken{
- public:
- virtual ~ConnectionToken(){}
- };
- }
-}
-
-
-#endif
diff --git a/qpid/cpp/lib/broker/Consumer.h b/qpid/cpp/lib/broker/Consumer.h
deleted file mode 100644
index 26deef4a26..0000000000
--- a/qpid/cpp/lib/broker/Consumer.h
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#ifndef _Consumer_
-#define _Consumer_
-
-#include <BrokerMessage.h>
-
-namespace qpid {
- namespace broker {
- class Consumer{
- public:
- virtual bool deliver(Message::shared_ptr& msg) = 0;
- virtual ~Consumer(){}
- };
- }
-}
-
-
-#endif
diff --git a/qpid/cpp/lib/broker/Content.h b/qpid/cpp/lib/broker/Content.h
deleted file mode 100644
index b65a454778..0000000000
--- a/qpid/cpp/lib/broker/Content.h
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#ifndef _Content_
-#define _Content_
-
-#include <boost/function.hpp>
-
-#include <AMQContentBody.h>
-#include <Buffer.h>
-#include <OutputHandler.h>
-
-namespace qpid {
-
-namespace framing {
-class ChannelAdapter;
-}
-
-namespace broker {
-class Content{
- public:
- typedef std::string DataBlock;
- typedef boost::function1<void, const DataBlock&> SendFn;
-
- virtual ~Content(){}
-
- /** Add a block of data to the content */
- virtual void add(framing::AMQContentBody::shared_ptr data) = 0;
-
- /** Total size of content in bytes */
- virtual uint32_t size() = 0;
-
- /**
- * Iterate over the content calling SendFn for each block.
- * Subdivide blocks if necessary to ensure each block is
- * <= framesize bytes long.
- */
- virtual void send(framing::ChannelAdapter& channel, uint32_t framesize) = 0;
-
- //FIXME aconway 2007-02-07: This is inconsistently implemented
- //find out what is needed.
- virtual void encode(qpid::framing::Buffer& buffer) = 0;
-};
-}}
-
-
-#endif
diff --git a/qpid/cpp/lib/broker/DeletingTxOp.cpp b/qpid/cpp/lib/broker/DeletingTxOp.cpp
deleted file mode 100644
index 25fe9c98db..0000000000
--- a/qpid/cpp/lib/broker/DeletingTxOp.cpp
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <DeletingTxOp.h>
-
-using namespace qpid::broker;
-
-DeletingTxOp::DeletingTxOp(TxOp* const _delegate) : delegate(_delegate){}
-
-bool DeletingTxOp::prepare(TransactionContext* ctxt) throw(){
- return delegate && delegate->prepare(ctxt);
-}
-
-void DeletingTxOp::commit() throw(){
- if(delegate){
- delegate->commit();
- delete delegate;
- delegate = 0;
- }
-}
-
-void DeletingTxOp::rollback() throw(){
- if(delegate){
- delegate->rollback();
- delete delegate;
- delegate = 0;
- }
-}
diff --git a/qpid/cpp/lib/broker/DeletingTxOp.h b/qpid/cpp/lib/broker/DeletingTxOp.h
deleted file mode 100644
index 3e026cd4ca..0000000000
--- a/qpid/cpp/lib/broker/DeletingTxOp.h
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#ifndef _DeletingTxOp_
-#define _DeletingTxOp_
-
-#include <TxOp.h>
-
-namespace qpid {
- namespace broker {
- /**
- * TxOp wrapper that will delegate calls & delete the object
- * to which it delegates after completion of the transaction.
- */
- class DeletingTxOp : public virtual TxOp{
- TxOp* delegate;
- public:
- DeletingTxOp(TxOp* const delegate);
- virtual bool prepare(TransactionContext* ctxt) throw();
- virtual void commit() throw();
- virtual void rollback() throw();
- virtual ~DeletingTxOp(){}
- };
- }
-}
-
-
-#endif
diff --git a/qpid/cpp/lib/broker/Deliverable.h b/qpid/cpp/lib/broker/Deliverable.h
deleted file mode 100644
index e33443555d..0000000000
--- a/qpid/cpp/lib/broker/Deliverable.h
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#ifndef _Deliverable_
-#define _Deliverable_
-
-#include <BrokerQueue.h>
-
-namespace qpid {
- namespace broker {
- class Deliverable{
- public:
- virtual void deliverTo(Queue::shared_ptr& queue) = 0;
- virtual ~Deliverable(){}
- };
- }
-}
-
-
-#endif
diff --git a/qpid/cpp/lib/broker/DeliverableMessage.cpp b/qpid/cpp/lib/broker/DeliverableMessage.cpp
deleted file mode 100644
index b9c89da690..0000000000
--- a/qpid/cpp/lib/broker/DeliverableMessage.cpp
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <DeliverableMessage.h>
-
-using namespace qpid::broker;
-
-DeliverableMessage::DeliverableMessage(Message::shared_ptr& _msg) : msg(_msg)
-{
-}
-
-void DeliverableMessage::deliverTo(Queue::shared_ptr& queue)
-{
- queue->deliver(msg);
-}
-
diff --git a/qpid/cpp/lib/broker/DeliverableMessage.h b/qpid/cpp/lib/broker/DeliverableMessage.h
deleted file mode 100644
index 962f0da640..0000000000
--- a/qpid/cpp/lib/broker/DeliverableMessage.h
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#ifndef _DeliverableMessage_
-#define _DeliverableMessage_
-
-#include <Deliverable.h>
-#include <BrokerMessage.h>
-#include <BrokerQueue.h>
-
-namespace qpid {
- namespace broker {
- class DeliverableMessage : public Deliverable{
- Message::shared_ptr msg;
- public:
- DeliverableMessage(Message::shared_ptr& msg);
- virtual void deliverTo(Queue::shared_ptr& queue);
- virtual ~DeliverableMessage(){}
- };
- }
-}
-
-
-#endif
diff --git a/qpid/cpp/lib/broker/DeliveryRecord.cpp b/qpid/cpp/lib/broker/DeliveryRecord.cpp
deleted file mode 100644
index e0b5bcfeb1..0000000000
--- a/qpid/cpp/lib/broker/DeliveryRecord.cpp
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <DeliveryRecord.h>
-#include <BrokerChannel.h>
-
-using namespace qpid::broker;
-using std::string;
-
-DeliveryRecord::DeliveryRecord(Message::shared_ptr _msg,
- Queue::shared_ptr _queue,
- const string _consumerTag,
- const uint64_t _deliveryTag) : msg(_msg),
- queue(_queue),
- consumerTag(_consumerTag),
- deliveryTag(_deliveryTag),
- pull(false){}
-
-DeliveryRecord::DeliveryRecord(Message::shared_ptr _msg,
- Queue::shared_ptr _queue,
- const uint64_t _deliveryTag) : msg(_msg),
- queue(_queue),
- consumerTag(""),
- deliveryTag(_deliveryTag),
- pull(true){}
-
-
-void DeliveryRecord::discard(TransactionContext* ctxt) const{
- queue->dequeue(ctxt, msg);
-}
-
-bool DeliveryRecord::matches(uint64_t tag) const{
- return deliveryTag == tag;
-}
-
-bool DeliveryRecord::coveredBy(const AccumulatedAck* const range) const{
- return range->covers(deliveryTag);
-}
-
-void DeliveryRecord::redeliver(Channel* const channel) const{
- if(pull){
- //if message was originally sent as response to get, we must requeue it
- requeue();
- }else{
- channel->deliver(msg, consumerTag, deliveryTag);
- }
-}
-
-void DeliveryRecord::requeue() const{
- msg->redeliver();
- queue->process(msg);
-}
-
-void DeliveryRecord::addTo(Prefetch* const prefetch) const{
- if(!pull){
- //ignore 'pulled' messages (i.e. those that were sent in
- //response to get) when calculating prefetch
- prefetch->size += msg->contentSize();
- prefetch->count++;
- }
-}
-
-void DeliveryRecord::subtractFrom(Prefetch* const prefetch) const{
- if(!pull){
- //ignore 'pulled' messages (i.e. those that were sent in
- //response to get) when calculating prefetch
- prefetch->size -= msg->contentSize();
- prefetch->count--;
- }
-}
diff --git a/qpid/cpp/lib/broker/DeliveryRecord.h b/qpid/cpp/lib/broker/DeliveryRecord.h
deleted file mode 100644
index 9423dd2062..0000000000
--- a/qpid/cpp/lib/broker/DeliveryRecord.h
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#ifndef _DeliveryRecord_
-#define _DeliveryRecord_
-
-#include <algorithm>
-#include <list>
-#include <AccumulatedAck.h>
-#include <BrokerMessage.h>
-#include <Prefetch.h>
-#include <BrokerQueue.h>
-
-namespace qpid {
- namespace broker {
- class Channel;
-
- /**
- * Record of a delivery for which an ack is outstanding.
- */
- class DeliveryRecord{
- mutable Message::shared_ptr msg;
- mutable Queue::shared_ptr queue;
- std::string consumerTag;
- uint64_t deliveryTag;
- bool pull;
-
- public:
- DeliveryRecord(Message::shared_ptr msg, Queue::shared_ptr queue, const std::string consumerTag, const uint64_t deliveryTag);
- DeliveryRecord(Message::shared_ptr msg, Queue::shared_ptr queue, const uint64_t deliveryTag);
-
- void discard(TransactionContext* ctxt = 0) const;
- bool matches(uint64_t tag) const;
- bool coveredBy(const AccumulatedAck* const range) const;
- void requeue() const;
- void redeliver(Channel* const) const;
- void addTo(Prefetch* const prefetch) const;
- void subtractFrom(Prefetch* const prefetch) const;
- };
-
- typedef std::list<DeliveryRecord>::iterator ack_iterator;
- }
-}
-
-
-#endif
diff --git a/qpid/cpp/lib/broker/DirectExchange.cpp b/qpid/cpp/lib/broker/DirectExchange.cpp
deleted file mode 100644
index 0661e8c365..0000000000
--- a/qpid/cpp/lib/broker/DirectExchange.cpp
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <DirectExchange.h>
-#include <iostream>
-
-using namespace qpid::broker;
-using namespace qpid::framing;
-using namespace qpid::sys;
-
-DirectExchange::DirectExchange(const string& _name) : Exchange(_name) {
-
-}
-
-void DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable*){
- Mutex::ScopedLock l(lock);
- std::vector<Queue::shared_ptr>& queues(bindings[routingKey]);
- std::vector<Queue::shared_ptr>::iterator i = find(queues.begin(), queues.end(), queue);
- if(i == queues.end()){
- bindings[routingKey].push_back(queue);
- }
-}
-
-void DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){
- Mutex::ScopedLock l(lock);
- std::vector<Queue::shared_ptr>& queues(bindings[routingKey]);
-
- std::vector<Queue::shared_ptr>::iterator i = find(queues.begin(), queues.end(), queue);
- if(i < queues.end()){
- queues.erase(i);
- if(queues.empty()){
- bindings.erase(routingKey);
- }
- }
-}
-
-void DirectExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/){
- Mutex::ScopedLock l(lock);
- std::vector<Queue::shared_ptr>& queues(bindings[routingKey]);
- int count(0);
- for(std::vector<Queue::shared_ptr>::iterator i = queues.begin(); i != queues.end(); i++, count++){
- msg.deliverTo(*i);
- }
- if(!count){
- std::cout << "WARNING: DirectExchange " << getName() << " could not route message with key " << routingKey << std::endl;
- }
-}
-
-DirectExchange::~DirectExchange(){
-
-}
-
-
-const std::string DirectExchange::typeName("direct");
diff --git a/qpid/cpp/lib/broker/DirectExchange.h b/qpid/cpp/lib/broker/DirectExchange.h
deleted file mode 100644
index a7ef5aca9e..0000000000
--- a/qpid/cpp/lib/broker/DirectExchange.h
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#ifndef _DirectExchange_
-#define _DirectExchange_
-
-#include <map>
-#include <vector>
-#include <BrokerExchange.h>
-#include <FieldTable.h>
-#include <BrokerMessage.h>
-#include <sys/Monitor.h>
-#include <BrokerQueue.h>
-
-namespace qpid {
-namespace broker {
- class DirectExchange : public virtual Exchange{
- std::map<string, std::vector<Queue::shared_ptr> > bindings;
- qpid::sys::Mutex lock;
-
- public:
- static const std::string typeName;
-
- DirectExchange(const std::string& name);
-
- virtual std::string getType(){ return typeName; }
-
- virtual void bind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
-
- virtual void unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
-
- virtual void route(Deliverable& msg, const std::string& routingKey, const qpid::framing::FieldTable* args);
-
- virtual ~DirectExchange();
- };
-}
-}
-
-
-#endif
diff --git a/qpid/cpp/lib/broker/ExchangeRegistry.cpp b/qpid/cpp/lib/broker/ExchangeRegistry.cpp
deleted file mode 100644
index 3e5ed89b54..0000000000
--- a/qpid/cpp/lib/broker/ExchangeRegistry.cpp
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <ExchangeRegistry.h>
-#include <DirectExchange.h>
-#include <FanOutExchange.h>
-#include <HeadersExchange.h>
-#include <TopicExchange.h>
-
-using namespace qpid::broker;
-using namespace qpid::sys;
-using std::pair;
-
-pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, const string& type) throw(UnknownExchangeTypeException){
- Mutex::ScopedLock locker(lock);
- ExchangeMap::iterator i = exchanges.find(name);
- if (i == exchanges.end()) {
- Exchange::shared_ptr exchange;
-
- if(type == TopicExchange::typeName){
- exchange = Exchange::shared_ptr(new TopicExchange(name));
- }else if(type == DirectExchange::typeName){
- exchange = Exchange::shared_ptr(new DirectExchange(name));
- }else if(type == FanOutExchange::typeName){
- exchange = Exchange::shared_ptr(new FanOutExchange(name));
- }else if (type == HeadersExchange::typeName) {
- exchange = Exchange::shared_ptr(new HeadersExchange(name));
- }else{
- throw UnknownExchangeTypeException();
- }
- exchanges[name] = exchange;
- return std::pair<Exchange::shared_ptr, bool>(exchange, true);
- } else {
- return std::pair<Exchange::shared_ptr, bool>(i->second, false);
- }
-}
-
-void ExchangeRegistry::destroy(const string& name){
- Mutex::ScopedLock locker(lock);
- exchanges.erase(name);
-}
-
-Exchange::shared_ptr ExchangeRegistry::get(const string& name){
- Mutex::ScopedLock locker(lock);
- Exchange::shared_ptr exchange =exchanges[name];
- if (!exchange)
- throw ChannelException(404, "Exchange not found:" + name);
- return exchange;
-}
-
-namespace
-{
-const std::string empty;
-}
-
-Exchange::shared_ptr ExchangeRegistry::getDefault()
-{
- return get(empty);
-}
diff --git a/qpid/cpp/lib/broker/ExchangeRegistry.h b/qpid/cpp/lib/broker/ExchangeRegistry.h
deleted file mode 100644
index aeb32753df..0000000000
--- a/qpid/cpp/lib/broker/ExchangeRegistry.h
+++ /dev/null
@@ -1,47 +0,0 @@
-#ifndef _broker_ExchangeRegistry_h
-#define _broker_ExchangeRegistry_h
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include <map>
-#include <BrokerExchange.h>
-#include <sys/Monitor.h>
-
-namespace qpid {
-namespace broker {
- struct UnknownExchangeTypeException{};
-
- class ExchangeRegistry{
- typedef std::map<std::string, Exchange::shared_ptr> ExchangeMap;
- ExchangeMap exchanges;
- qpid::sys::Mutex lock;
- public:
- std::pair<Exchange::shared_ptr, bool> declare(const std::string& name, const std::string& type) throw(UnknownExchangeTypeException);
- void destroy(const std::string& name);
- Exchange::shared_ptr get(const std::string& name);
- Exchange::shared_ptr getDefault();
- };
-}
-}
-
-
-#endif /*!_broker_ExchangeRegistry_h*/
diff --git a/qpid/cpp/lib/broker/FanOutExchange.cpp b/qpid/cpp/lib/broker/FanOutExchange.cpp
deleted file mode 100644
index b487593efd..0000000000
--- a/qpid/cpp/lib/broker/FanOutExchange.cpp
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <FanOutExchange.h>
-#include <algorithm>
-
-using namespace qpid::broker;
-using namespace qpid::framing;
-using namespace qpid::sys;
-
-FanOutExchange::FanOutExchange(const std::string& _name) : Exchange(_name) {}
-
-void FanOutExchange::bind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* /*args*/){
- Mutex::ScopedLock locker(lock);
- // Add if not already present.
- Queue::vector::iterator i = std::find(bindings.begin(), bindings.end(), queue);
- if (i == bindings.end()) {
- bindings.push_back(queue);
- }
-}
-
-void FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* /*args*/){
- Mutex::ScopedLock locker(lock);
- Queue::vector::iterator i = std::find(bindings.begin(), bindings.end(), queue);
- if (i != bindings.end()) {
- bindings.erase(i);
- }
-}
-
-void FanOutExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* /*args*/){
- Mutex::ScopedLock locker(lock);
- for(Queue::vector::iterator i = bindings.begin(); i != bindings.end(); ++i){
- msg.deliverTo(*i);
- }
-}
-
-FanOutExchange::~FanOutExchange() {}
-
-const std::string FanOutExchange::typeName("fanout");
diff --git a/qpid/cpp/lib/broker/FanOutExchange.h b/qpid/cpp/lib/broker/FanOutExchange.h
deleted file mode 100644
index 6dc70e69bb..0000000000
--- a/qpid/cpp/lib/broker/FanOutExchange.h
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#ifndef _FanOutExchange_
-#define _FanOutExchange_
-
-#include <map>
-#include <vector>
-#include <BrokerExchange.h>
-#include <FieldTable.h>
-#include <BrokerMessage.h>
-#include <sys/Monitor.h>
-#include <BrokerQueue.h>
-
-namespace qpid {
-namespace broker {
-
-class FanOutExchange : public virtual Exchange {
- std::vector<Queue::shared_ptr> bindings;
- qpid::sys::Mutex lock;
-
- public:
- static const std::string typeName;
-
- FanOutExchange(const std::string& name);
-
- virtual std::string getType(){ return typeName; }
-
- virtual void bind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
-
- virtual void unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
-
- virtual void route(Deliverable& msg, const std::string& routingKey, const qpid::framing::FieldTable* args);
-
- virtual ~FanOutExchange();
-};
-
-}
-}
-
-
-
-#endif
diff --git a/qpid/cpp/lib/broker/HandlerImpl.h b/qpid/cpp/lib/broker/HandlerImpl.h
deleted file mode 100644
index c55a36da45..0000000000
--- a/qpid/cpp/lib/broker/HandlerImpl.h
+++ /dev/null
@@ -1,71 +0,0 @@
-#ifndef _broker_HandlerImpl_h
-#define _broker_HandlerImpl_h
-
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "BrokerChannel.h"
-#include "AMQP_ClientProxy.h"
-
-namespace qpid {
-
-namespace framing {
-class AMQP_ClientProxy;
-}
-
-namespace broker {
-
-class Broker;
-class Channel;
-class Connection;
-
-/**
- * A collection of references to the core objects required by an adapter,
- * and a client proxy.
- */
-struct CoreRefs
-{
- CoreRefs(Channel& ch, Connection& c, Broker& b)
- : channel(ch), connection(c), broker(b), proxy(ch) {}
-
- Channel& channel;
- Connection& connection;
- Broker& broker;
- framing::AMQP_ClientProxy proxy;
-};
-
-
-/**
- * Base template for protocol handler implementations.
- * Provides the core references and appropriate AMQP class proxy.
- */
-template <class ProxyType>
-struct HandlerImpl : public CoreRefs {
- typedef HandlerImpl<ProxyType> HandlerImplType;
- HandlerImpl(CoreRefs& parent)
- : CoreRefs(parent), client(ProxyType::get(proxy)) {}
- ProxyType client;
-};
-
-
-
-}} // namespace qpid::broker
-
-
-
-#endif /*!_broker_HandlerImpl_h*/
diff --git a/qpid/cpp/lib/broker/HeadersExchange.cpp b/qpid/cpp/lib/broker/HeadersExchange.cpp
deleted file mode 100644
index 3ef0cc0446..0000000000
--- a/qpid/cpp/lib/broker/HeadersExchange.cpp
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <HeadersExchange.h>
-#include <Value.h>
-#include <QpidError.h>
-#include <algorithm>
-
-
-using namespace qpid::broker;
-using namespace qpid::framing;
-using namespace qpid::sys;
-
-// TODO aconway 2006-09-20: More efficient matching algorithm.
-// The current search algorithm really sucks.
-// Fieldtables are heavy, maybe use shared_ptr to do handle-body.
-
-using namespace qpid::broker;
-
-namespace {
- const std::string all("all");
- const std::string any("any");
- const std::string x_match("x-match");
-}
-
-HeadersExchange::HeadersExchange(const string& _name) : Exchange(_name) { }
-
-void HeadersExchange::bind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* args){
- Mutex::ScopedLock locker(lock);
- std::string what = args->getString("x-match");
- if (what != all && what != any) {
- THROW_QPID_ERROR(PROTOCOL_ERROR, "Invalid x-match value binding to headers exchange.");
- }
- bindings.push_back(Binding(*args, queue));
-}
-
-void HeadersExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* args){
- Mutex::ScopedLock locker(lock);
- Bindings::iterator i =
- std::find(bindings.begin(),bindings.end(), Binding(*args, queue));
- if (i != bindings.end()) bindings.erase(i);
-}
-
-
-void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* args){
- Mutex::ScopedLock locker(lock);;
- for (Bindings::iterator i = bindings.begin(); i != bindings.end(); ++i) {
- if (match(i->first, *args)) msg.deliverTo(i->second);
- }
-}
-
-HeadersExchange::~HeadersExchange() {}
-
-const std::string HeadersExchange::typeName("headers");
-
-namespace
-{
-
- bool match_values(const Value& bind, const Value& msg) {
- return dynamic_cast<const EmptyValue*>(&bind) || bind == msg;
- }
-
-}
-
-
-bool HeadersExchange::match(const FieldTable& bind, const FieldTable& msg) {
- typedef FieldTable::ValueMap Map;
- std::string what = bind.getString(x_match);
- if (what == all) {
- for (Map::const_iterator i = bind.getMap().begin();
- i != bind.getMap().end();
- ++i)
- {
- if (i->first != x_match)
- {
- Map::const_iterator j = msg.getMap().find(i->first);
- if (j == msg.getMap().end()) return false;
- if (!match_values(*(i->second), *(j->second))) return false;
- }
- }
- return true;
- } else if (what == any) {
- for (Map::const_iterator i = bind.getMap().begin();
- i != bind.getMap().end();
- ++i)
- {
- if (i->first != x_match)
- {
- Map::const_iterator j = msg.getMap().find(i->first);
- if (j != msg.getMap().end()) {
- if (match_values(*(i->second), *(j->second))) return true;
- }
- }
- }
- return false;
- } else {
- return false;
- }
-}
-
-
-
diff --git a/qpid/cpp/lib/broker/HeadersExchange.h b/qpid/cpp/lib/broker/HeadersExchange.h
deleted file mode 100644
index 5e8da5ad85..0000000000
--- a/qpid/cpp/lib/broker/HeadersExchange.h
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#ifndef _HeadersExchange_
-#define _HeadersExchange_
-
-#include <vector>
-#include <BrokerExchange.h>
-#include <FieldTable.h>
-#include <BrokerMessage.h>
-#include <sys/Monitor.h>
-#include <BrokerQueue.h>
-
-namespace qpid {
-namespace broker {
-
-
-class HeadersExchange : public virtual Exchange {
- typedef std::pair<qpid::framing::FieldTable, Queue::shared_ptr> Binding;
- typedef std::vector<Binding> Bindings;
-
- Bindings bindings;
- qpid::sys::Mutex lock;
-
- public:
- static const std::string typeName;
-
- HeadersExchange(const string& name);
-
- virtual std::string getType(){ return typeName; }
-
- virtual void bind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args);
-
- virtual void unbind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args);
-
- virtual void route(Deliverable& msg, const string& routingKey, const qpid::framing::FieldTable* args);
-
- virtual ~HeadersExchange();
-
- static bool match(const qpid::framing::FieldTable& bindArgs, const qpid::framing::FieldTable& msgArgs);
-};
-
-
-
-}
-}
-
-#endif
diff --git a/qpid/cpp/lib/broker/InMemoryContent.cpp b/qpid/cpp/lib/broker/InMemoryContent.cpp
deleted file mode 100644
index 237375e860..0000000000
--- a/qpid/cpp/lib/broker/InMemoryContent.cpp
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <InMemoryContent.h>
-#include "AMQFrame.h"
-#include "framing/ChannelAdapter.h"
-
-using namespace qpid::broker;
-using namespace qpid::framing;
-using boost::static_pointer_cast;
-
-void InMemoryContent::add(AMQContentBody::shared_ptr data)
-{
- content.push_back(data);
-}
-
-uint32_t InMemoryContent::size()
-{
- int sum(0);
- for (content_iterator i = content.begin(); i != content.end(); i++) {
- sum += (*i)->size();
- }
- return sum;
-}
-
-void InMemoryContent::send(ChannelAdapter& channel, uint32_t framesize)
-{
- for (content_iterator i = content.begin(); i != content.end(); i++) {
- if ((*i)->size() > framesize) {
- uint32_t offset = 0;
- for (int chunk = (*i)->size() / framesize; chunk > 0; chunk--) {
- string data = (*i)->getData().substr(offset, framesize);
- channel.send(new AMQContentBody(data));
- offset += framesize;
- }
- uint32_t remainder = (*i)->size() % framesize;
- if (remainder) {
- string data = (*i)->getData().substr(offset, remainder);
- channel.send(new AMQContentBody(data));
- }
- } else {
- AMQBody::shared_ptr contentBody =
- static_pointer_cast<AMQBody, AMQContentBody>(*i);
- channel.send(contentBody);
- }
- }
-}
-
-void InMemoryContent::encode(Buffer& buffer)
-{
- for (content_iterator i = content.begin(); i != content.end(); i++) {
- (*i)->encode(buffer);
- }
-}
-
diff --git a/qpid/cpp/lib/broker/InMemoryContent.h b/qpid/cpp/lib/broker/InMemoryContent.h
deleted file mode 100644
index 7a58ace3a7..0000000000
--- a/qpid/cpp/lib/broker/InMemoryContent.h
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#ifndef _InMemoryContent_
-#define _InMemoryContent_
-
-#include <Content.h>
-#include <vector>
-
-
-namespace qpid {
- namespace broker {
- class InMemoryContent : public Content{
- typedef std::vector<qpid::framing::AMQContentBody::shared_ptr> content_list;
- typedef content_list::iterator content_iterator;
-
- content_list content;
- public:
- void add(qpid::framing::AMQContentBody::shared_ptr data);
- uint32_t size();
- void send(framing::ChannelAdapter&, uint32_t framesize);
- void encode(qpid::framing::Buffer& buffer);
- };
- }
-}
-
-
-#endif
diff --git a/qpid/cpp/lib/broker/LazyLoadedContent.cpp b/qpid/cpp/lib/broker/LazyLoadedContent.cpp
deleted file mode 100644
index 9810ee671c..0000000000
--- a/qpid/cpp/lib/broker/LazyLoadedContent.cpp
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <LazyLoadedContent.h>
-#include "AMQFrame.h"
-#include "framing/ChannelAdapter.h"
-
-using namespace qpid::broker;
-using namespace qpid::framing;
-
-LazyLoadedContent::~LazyLoadedContent()
-{
- store->destroy(*msg);
-}
-
-LazyLoadedContent::LazyLoadedContent(MessageStore* const _store, Message* const _msg, uint64_t _expectedSize) :
- store(_store), msg(_msg), expectedSize(_expectedSize) {}
-
-void LazyLoadedContent::add(AMQContentBody::shared_ptr data)
-{
- store->appendContent(*msg, data->getData());
-}
-
-uint32_t LazyLoadedContent::size()
-{
- return 0;//all content is written as soon as it is added
-}
-
-void LazyLoadedContent::send(ChannelAdapter& channel, uint32_t framesize)
-{
- if (expectedSize > framesize) {
- for (uint64_t offset = 0; offset < expectedSize; offset += framesize)
- {
- uint64_t remaining = expectedSize - offset;
- string data;
- store->loadContent(*msg, data, offset,
- remaining > framesize ? framesize : remaining);
- channel.send(new AMQContentBody(data));
- }
- } else {
- string data;
- store->loadContent(*msg, data, 0, expectedSize);
- channel.send(new AMQContentBody(data));
- }
-}
-
-void LazyLoadedContent::encode(Buffer&)
-{
- //do nothing as all content is written as soon as it is added
-}
-
diff --git a/qpid/cpp/lib/broker/LazyLoadedContent.h b/qpid/cpp/lib/broker/LazyLoadedContent.h
deleted file mode 100644
index 3306f6e3ba..0000000000
--- a/qpid/cpp/lib/broker/LazyLoadedContent.h
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#ifndef _LazyLoadedContent_
-#define _LazyLoadedContent_
-
-#include <Content.h>
-#include <MessageStore.h>
-#include "BrokerMessageBase.h"
-
-namespace qpid {
- namespace broker {
- class LazyLoadedContent : public Content{
- MessageStore* const store;
- Message* const msg;
- const uint64_t expectedSize;
- public:
- LazyLoadedContent(
- MessageStore* const store, Message* const msg,
- uint64_t expectedSize);
- ~LazyLoadedContent();
- void add(qpid::framing::AMQContentBody::shared_ptr data);
- uint32_t size();
- void send(
- framing::ChannelAdapter&,
- uint32_t framesize);
- void encode(qpid::framing::Buffer& buffer);
- };
- }
-}
-
-
-#endif
diff --git a/qpid/cpp/lib/broker/Makefile.am b/qpid/cpp/lib/broker/Makefile.am
deleted file mode 100644
index 29cfb0f347..0000000000
--- a/qpid/cpp/lib/broker/Makefile.am
+++ /dev/null
@@ -1,101 +0,0 @@
-AM_CXXFLAGS = $(WARNING_CFLAGS)
-INCLUDES = \
- -I$(top_srcdir)/gen \
- -I$(top_srcdir)/lib/common \
- -I$(top_srcdir)/lib/common/sys \
- -I$(top_srcdir)/lib/common/framing \
- $(APR_CXXFLAGS)
-
-lib_LTLIBRARIES = libqpidbroker.la
-libqpidbroker_la_LIBADD = ../common/libqpidcommon.la
-libqpidbroker_la_LDFLAGS = -version-info $(LIBTOOL_VERSION_INFO_ARG)
-libqpidbroker_la_SOURCES = \
- AccumulatedAck.cpp \
- AccumulatedAck.h \
- AutoDelete.cpp \
- AutoDelete.h \
- Binding.h \
- Broker.cpp \
- Broker.h \
- BrokerSingleton.cpp \
- BrokerSingleton.h \
- BrokerChannel.cpp \
- BrokerChannel.h \
- BrokerExchange.h \
- BrokerMessage.cpp \
- BrokerMessage.h \
- BrokerMessageMessage.cpp \
- BrokerMessageMessage.h \
- BrokerQueue.cpp \
- BrokerQueue.h \
- Configuration.cpp \
- Configuration.h \
- ConnectionToken.h \
- Consumer.h \
- Content.h \
- DeletingTxOp.cpp \
- DeletingTxOp.h \
- Deliverable.h \
- DeliverableMessage.cpp \
- DeliverableMessage.h \
- DeliveryRecord.cpp \
- DeliveryRecord.h \
- DirectExchange.cpp \
- DirectExchange.h \
- ExchangeRegistry.cpp \
- ExchangeRegistry.h \
- FanOutExchange.cpp \
- FanOutExchange.h \
- HeadersExchange.cpp \
- HeadersExchange.h \
- InMemoryContent.cpp \
- InMemoryContent.h \
- LazyLoadedContent.cpp \
- LazyLoadedContent.h \
- MessageBuilder.cpp \
- MessageBuilder.h \
- MessageStore.h \
- MessageStoreModule.cpp \
- MessageStoreModule.h \
- NameGenerator.cpp \
- NameGenerator.h \
- NullMessageStore.cpp \
- NullMessageStore.h \
- Persistable.h \
- PersistableExchange.h \
- PersistableMessage.h \
- PersistableQueue.h \
- Prefetch.h \
- QueuePolicy.cpp \
- QueuePolicy.h \
- QueueRegistry.cpp \
- QueueRegistry.h \
- RecoverableMessage.h \
- RecoverableQueue.h \
- RecoveryManager.h \
- RecoveryManagerImpl.cpp \
- RecoveryManagerImpl.h \
- Reference.cpp \
- Reference.h \
- ConnectionFactory.cpp \
- ConnectionFactory.h \
- Connection.cpp \
- Connection.h \
- BrokerAdapter.cpp \
- BrokerAdapter.h \
- MessageHandlerImpl.cpp \
- MessageHandlerImpl.h \
- TopicExchange.cpp \
- TopicExchange.h \
- TransactionalStore.h \
- TxAck.cpp \
- TxAck.h \
- TxBuffer.cpp \
- TxBuffer.h \
- TxOp.h \
- TxPublish.cpp \
- TxPublish.h
-
-
-# Force build during dist phase so help2man will work.
-dist-hook: $(lib_LTLIBRARIES)
diff --git a/qpid/cpp/lib/broker/MessageBuilder.cpp b/qpid/cpp/lib/broker/MessageBuilder.cpp
deleted file mode 100644
index e99dcad7d6..0000000000
--- a/qpid/cpp/lib/broker/MessageBuilder.cpp
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <MessageBuilder.h>
-
-#include <InMemoryContent.h>
-#include <LazyLoadedContent.h>
-
-using namespace qpid::broker;
-using namespace qpid::framing;
-using std::auto_ptr;
-
-MessageBuilder::MessageBuilder(CompletionHandler* _handler,
- MessageStore* const _store,
- uint64_t _stagingThreshold
-) :
- handler(_handler),
- store(_store),
- stagingThreshold(_stagingThreshold)
-{}
-
-void MessageBuilder::route(){
- if (message->isComplete()) {
- if (handler) handler->complete(message);
- message.reset();
- }
-}
-
-void MessageBuilder::initialise(Message::shared_ptr& msg){
- if(message.get()){
- THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got publish before previous content was completed.");
- }
- message = msg;
-}
-
-void MessageBuilder::setHeader(AMQHeaderBody::shared_ptr& header){
- if(!message.get()){
- THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got header before publish.");
- }
- message->setHeader(header);
- if (stagingThreshold && header->getContentSize() >= stagingThreshold) {
- store->stage(*message);
- message->releaseContent(store);
- } else {
- auto_ptr<Content> content(new InMemoryContent());
- message->setContent(content);
- }
- route();
-}
-
-void MessageBuilder::addContent(AMQContentBody::shared_ptr& content){
- if(!message.get()){
- THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got content before publish.");
- }
- message->addContent(content);
- route();
-}
diff --git a/qpid/cpp/lib/broker/MessageBuilder.h b/qpid/cpp/lib/broker/MessageBuilder.h
deleted file mode 100644
index 30834e1075..0000000000
--- a/qpid/cpp/lib/broker/MessageBuilder.h
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#ifndef _MessageBuilder_
-#define _MessageBuilder_
-
-#include <memory>
-#include <QpidError.h>
-#include <BrokerExchange.h>
-#include <BrokerMessage.h>
-#include <MessageStore.h>
-#include <AMQContentBody.h>
-#include <AMQHeaderBody.h>
-#include <BasicPublishBody.h>
-#include "CompletionHandler.h"
-
-namespace qpid {
- namespace broker {
- class MessageBuilder{
- public:
- MessageBuilder(CompletionHandler* _handler,
- MessageStore* const store = 0,
- uint64_t stagingThreshold = 0);
- void initialise(Message::shared_ptr& msg);
- void setHeader(framing::AMQHeaderBody::shared_ptr& header);
- void addContent(framing::AMQContentBody::shared_ptr& content);
- Message::shared_ptr getMessage() { return message; }
- private:
- Message::shared_ptr message;
- CompletionHandler* handler;
- MessageStore* const store;
- const uint64_t stagingThreshold;
-
- void route();
- };
- }
-}
-
-
-#endif
diff --git a/qpid/cpp/lib/broker/MessageHandlerImpl.cpp b/qpid/cpp/lib/broker/MessageHandlerImpl.cpp
deleted file mode 100644
index fa7c10f26c..0000000000
--- a/qpid/cpp/lib/broker/MessageHandlerImpl.cpp
+++ /dev/null
@@ -1,243 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "QpidError.h"
-#include "MessageHandlerImpl.h"
-#include "BrokerChannel.h"
-#include "FramingContent.h"
-#include "Connection.h"
-#include "Broker.h"
-#include "BrokerMessageMessage.h"
-#include "MessageAppendBody.h"
-#include "MessageTransferBody.h"
-#include "BrokerAdapter.h"
-
-namespace qpid {
-namespace broker {
-
-using namespace framing;
-
-MessageHandlerImpl::MessageHandlerImpl(CoreRefs& parent)
- : HandlerImplType(parent) {}
-
-//
-// Message class method handlers
-//
-
-void
-MessageHandlerImpl::cancel(const MethodContext& context,
- const string& destination )
-{
- channel.cancel(destination);
- client.ok(context.getRequestId());
-}
-
-void
-MessageHandlerImpl::open(const MethodContext& context,
- const string& reference)
-{
- references.open(reference);
- client.ok(context.getRequestId());
-}
-
-void
-MessageHandlerImpl::append(const MethodContext& context,
- const string& reference,
- const string& /*bytes*/ )
-{
- references.get(reference)->append(
- boost::shared_polymorphic_downcast<MessageAppendBody>(
- context.methodBody));
- client.ok(context.getRequestId());
-}
-
-void
-MessageHandlerImpl::close(const MethodContext& context,
- const string& reference)
-{
- Reference::shared_ptr ref = references.get(reference);
- client.ok(context.getRequestId());
-
- // Send any transfer messages to their correct exchanges and okay them
- const Reference::Messages& msgs = ref->getMessages();
- for (Reference::Messages::const_iterator m = msgs.begin(); m != msgs.end(); ++m) {
- channel.handleInlineTransfer(*m);
- client.ok((*m)->getRequestId());
- }
- ref->close();
-}
-
-void
-MessageHandlerImpl::checkpoint(const MethodContext& context,
- const string& /*reference*/,
- const string& /*identifier*/ )
-{
- // Initial implementation (which is conforming) is to do nothing here
- // and return offset zero for the resume
- client.ok(context.getRequestId());
-}
-
-void
-MessageHandlerImpl::resume(const MethodContext& context,
- const string& reference,
- const string& /*identifier*/ )
-{
- // Initial (null) implementation
- // open reference and return 0 offset
- references.open(reference);
- client.offset(0, context.getRequestId());
-}
-
-void
-MessageHandlerImpl::offset(const MethodContext&,
- uint64_t /*value*/ )
-{
- // Shouldn't ever receive this as it is reponse to resume
- // which is never sent
- // TODO astitcher 2007-02-16 What is the correct exception to throw here?
- THROW_QPID_ERROR(INTERNAL_ERROR, "impossible");
-}
-
-void
-MessageHandlerImpl::consume(const MethodContext& context,
- uint16_t /*ticket*/,
- const string& queueName,
- const string& destination,
- bool noLocal,
- bool noAck,
- bool exclusive,
- const framing::FieldTable& filter )
-{
- Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
- if(!destination.empty() && channel.exists(destination))
- throw ConnectionException(530, "Consumer tags must be unique");
- string tag = destination;
- channel.consume(
- tag, queue, !noAck, exclusive,
- noLocal ? &connection : 0, &filter);
- client.ok(context.getRequestId());
- // Dispatch messages as there is now a consumer.
- queue->dispatch();
-}
-
-void
-MessageHandlerImpl::get( const MethodContext& context,
- uint16_t /*ticket*/,
- const string& queueName,
- const string& destination,
- bool noAck )
-{
- Queue::shared_ptr queue =
- connection.getQueue(queueName, context.channel->getId());
-
- if(channel.get(queue, destination, !noAck))
- client.ok(context.getRequestId());
- else
- client.empty(context.getRequestId());
-}
-
-void
-MessageHandlerImpl::empty( const MethodContext& )
-{
- // Shouldn't ever receive this as it is a response to get
- // which is never sent
- // TODO astitcher 2007-02-09 What is the correct exception to throw here?
- THROW_QPID_ERROR(INTERNAL_ERROR, "Impossible");
-}
-
-void
-MessageHandlerImpl::ok(const MethodContext& /*context*/)
-{
- channel.ack();
-}
-
-void
-MessageHandlerImpl::qos(const MethodContext& context,
- uint32_t prefetchSize,
- uint16_t prefetchCount,
- bool /*global*/ )
-{
- //TODO: handle global
- channel.setPrefetchSize(prefetchSize);
- channel.setPrefetchCount(prefetchCount);
- client.ok(context.getRequestId());
-}
-
-void
-MessageHandlerImpl::recover(const MethodContext& context,
- bool requeue)
-{
- channel.recover(requeue);
- client.ok(context.getRequestId());
-}
-
-void
-MessageHandlerImpl::reject(const MethodContext& /*context*/,
- uint16_t /*code*/,
- const string& /*text*/ )
-{
- channel.ack();
- // channel.requeue();
-}
-
-void
-MessageHandlerImpl::transfer(const MethodContext& context,
- uint16_t /*ticket*/,
- const string& /* destination */,
- bool /*redelivered*/,
- bool /*immediate*/,
- uint64_t /*ttl*/,
- uint8_t /*priority*/,
- uint64_t /*timestamp*/,
- uint8_t /*deliveryMode*/,
- uint64_t /*expiration*/,
- const string& /*exchangeName*/,
- const string& /*routingKey*/,
- const string& /*messageId*/,
- const string& /*correlationId*/,
- const string& /*replyTo*/,
- const string& /*contentType*/,
- const string& /*contentEncoding*/,
- const string& /*userId*/,
- const string& /*appId*/,
- const string& /*transactionId*/,
- const string& /*securityToken*/,
- const framing::FieldTable& /*applicationHeaders*/,
- const framing::Content& body,
- bool /*mandatory*/)
-{
- MessageTransferBody::shared_ptr transfer(
- boost::shared_polymorphic_downcast<MessageTransferBody>(
- context.methodBody));
- RequestId requestId = context.getRequestId();
-
- if (body.isInline()) {
- MessageMessage::shared_ptr message(
- new MessageMessage(&connection, requestId, transfer));
- channel.handleInlineTransfer(message);
- client.ok(requestId);
- } else {
- Reference::shared_ptr ref(references.get(body.getValue()));
- MessageMessage::shared_ptr message(
- new MessageMessage(&connection, requestId, transfer, ref));
- ref->addMessage(message);
- }
-}
-
-
-}} // namespace qpid::broker
diff --git a/qpid/cpp/lib/broker/MessageHandlerImpl.h b/qpid/cpp/lib/broker/MessageHandlerImpl.h
deleted file mode 100644
index 872d429d5c..0000000000
--- a/qpid/cpp/lib/broker/MessageHandlerImpl.h
+++ /dev/null
@@ -1,130 +0,0 @@
-#ifndef _broker_MessageHandlerImpl_h
-#define _broker_MessageHandlerImpl_h
-
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include <memory>
-
-#include "AMQP_ServerOperations.h"
-#include "AMQP_ClientProxy.h"
-#include "Reference.h"
-#include "HandlerImpl.h"
-
-namespace qpid {
-namespace broker {
-
-class Connection;
-class Broker;
-class MessageMessage;
-
-class MessageHandlerImpl :
- public framing::AMQP_ServerOperations::MessageHandler,
- public HandlerImpl<framing::AMQP_ClientProxy::Message>
-{
- public:
- MessageHandlerImpl(CoreRefs& parent);
-
- void append(const framing::MethodContext&,
- const std::string& reference,
- const std::string& bytes );
-
- void cancel(const framing::MethodContext&,
- const std::string& destination );
-
- void checkpoint(const framing::MethodContext&,
- const std::string& reference,
- const std::string& identifier );
-
- void close(const framing::MethodContext&,
- const std::string& reference );
-
- void consume(const framing::MethodContext&,
- uint16_t ticket,
- const std::string& queue,
- const std::string& destination,
- bool noLocal,
- bool noAck,
- bool exclusive,
- const framing::FieldTable& filter );
-
- void empty( const framing::MethodContext& );
-
- void get(const framing::MethodContext&,
- uint16_t ticket,
- const std::string& queue,
- const std::string& destination,
- bool noAck );
-
- void offset(const framing::MethodContext&,
- uint64_t value );
-
- void ok( const framing::MethodContext& );
-
- void open(const framing::MethodContext&,
- const std::string& reference );
-
- void qos(const framing::MethodContext&,
- uint32_t prefetchSize,
- uint16_t prefetchCount,
- bool global );
-
- void recover(const framing::MethodContext&,
- bool requeue );
-
- void reject(const framing::MethodContext&,
- uint16_t code,
- const std::string& text );
-
- void resume(const framing::MethodContext&,
- const std::string& reference,
- const std::string& identifier );
-
- void transfer(const framing::MethodContext&,
- uint16_t ticket,
- const std::string& destination,
- bool redelivered,
- bool immediate,
- uint64_t ttl,
- uint8_t priority,
- uint64_t timestamp,
- uint8_t deliveryMode,
- uint64_t expiration,
- const std::string& exchange,
- const std::string& routingKey,
- const std::string& messageId,
- const std::string& correlationId,
- const std::string& replyTo,
- const std::string& contentType,
- const std::string& contentEncoding,
- const std::string& userId,
- const std::string& appId,
- const std::string& transactionId,
- const std::string& securityToken,
- const framing::FieldTable& applicationHeaders,
- const framing::Content& body,
- bool mandatory );
- private:
- ReferenceRegistry references;
-};
-
-}} // namespace qpid::broker
-
-
-
-#endif /*!_broker_MessageHandlerImpl_h*/
diff --git a/qpid/cpp/lib/broker/MessageStore.h b/qpid/cpp/lib/broker/MessageStore.h
deleted file mode 100644
index 1d9ee86e48..0000000000
--- a/qpid/cpp/lib/broker/MessageStore.h
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#ifndef _MessageStore_
-#define _MessageStore_
-
-#include "PersistableExchange.h"
-#include "PersistableMessage.h"
-#include "PersistableQueue.h"
-#include "RecoveryManager.h"
-#include "TransactionalStore.h"
-
-namespace qpid {
-namespace broker {
-
-/**
- * An abstraction of the persistent storage for messages. (In
- * all methods, any pointers/references to queues or messages
- * are valid only for the duration of the call).
- */
-class MessageStore : public TransactionalStore{
-public:
- /**
- * Record the existence of a durable queue
- */
- virtual void create(const PersistableQueue& queue) = 0;
- /**
- * Destroy a durable queue
- */
- virtual void destroy(const PersistableQueue& queue) = 0;
-
- /**
- * Record the existence of a durable exchange
- */
- virtual void create(const PersistableExchange& exchange) = 0;
- /**
- * Destroy a durable exchange
- */
- virtual void destroy(const PersistableExchange& exchange) = 0;
-
- /**
- * Request recovery of queue and message state from store
- */
- virtual void recover(RecoveryManager& queues) = 0;
-
- /**
- * Stores a messages before it has been enqueued
- * (enqueueing automatically stores the message so this is
- * only required if storage is required prior to that
- * point). If the message has not yet been stored it will
- * store the headers as well as any content passed in. A
- * persistence id will be set on the message which can be
- * used to load the content or to append to it.
- */
- virtual void stage(PersistableMessage& msg) = 0;
-
- /**
- * Destroys a previously staged message. This only needs
- * to be called if the message is never enqueued. (Once
- * enqueued, deletion will be automatic when the message
- * is dequeued from all queues it was enqueued onto).
- */
- virtual void destroy(PersistableMessage& msg) = 0;
-
- /**
- * Appends content to a previously staged message
- */
- virtual void appendContent(PersistableMessage& msg, const std::string& data) = 0;
-
- /**
- * Loads (a section) of content data for the specified
- * message (previously stored through a call to stage or
- * enqueue) into data. The offset refers to the content
- * only (i.e. an offset of 0 implies that the start of the
- * content should be loaded, not the headers or related
- * meta-data).
- */
- virtual void loadContent(PersistableMessage& msg, std::string& data, uint64_t offset, uint32_t length) = 0;
-
- /**
- * Enqueues a message, storing the message if it has not
- * been previously stored and recording that the given
- * message is on the given queue.
- *
- * @param msg the message to enqueue
- * @param queue the name of the queue onto which it is to be enqueued
- * @param xid (a pointer to) an identifier of the
- * distributed transaction in which the operation takes
- * place or null for 'local' transactions
- */
- virtual void enqueue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue) = 0;
- /**
- * Dequeues a message, recording that the given message is
- * no longer on the given queue and deleting the message
- * if it is no longer on any other queue.
- *
- * @param msg the message to dequeue
- * @param queue the name of th queue from which it is to be dequeued
- * @param xid (a pointer to) an identifier of the
- * distributed transaction in which the operation takes
- * place or null for 'local' transactions
- */
- virtual void dequeue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue) = 0;
-
- virtual ~MessageStore(){}
-};
-
-}
-}
-
-
-#endif
diff --git a/qpid/cpp/lib/broker/MessageStoreModule.cpp b/qpid/cpp/lib/broker/MessageStoreModule.cpp
deleted file mode 100644
index 9939440ecb..0000000000
--- a/qpid/cpp/lib/broker/MessageStoreModule.cpp
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include <MessageStoreModule.h>
-#include <iostream>
-
-using namespace qpid::broker;
-
-MessageStoreModule::MessageStoreModule(const std::string& name) : store(name)
-{
-}
-
-void MessageStoreModule::create(const PersistableQueue& queue)
-{
- store->create(queue);
-}
-
-void MessageStoreModule::destroy(const PersistableQueue& queue)
-{
- store->destroy(queue);
-}
-
-void MessageStoreModule::create(const PersistableExchange& exchange)
-{
- store->create(exchange);
-}
-
-void MessageStoreModule::destroy(const PersistableExchange& exchange)
-{
- store->destroy(exchange);
-}
-
-void MessageStoreModule::recover(RecoveryManager& registry)
-{
- store->recover(registry);
-}
-
-void MessageStoreModule::stage(PersistableMessage& msg)
-{
- store->stage(msg);
-}
-
-void MessageStoreModule::destroy(PersistableMessage& msg)
-{
- store->destroy(msg);
-}
-
-void MessageStoreModule::appendContent(PersistableMessage& msg, const std::string& data)
-{
- store->appendContent(msg, data);
-}
-
-void MessageStoreModule::loadContent(PersistableMessage& msg, string& data, uint64_t offset, uint32_t length)
-{
- store->loadContent(msg, data, offset, length);
-}
-
-void MessageStoreModule::enqueue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue)
-{
- store->enqueue(ctxt, msg, queue);
-}
-
-void MessageStoreModule::dequeue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue)
-{
- store->dequeue(ctxt, msg, queue);
-}
-
-std::auto_ptr<TransactionContext> MessageStoreModule::begin()
-{
- return store->begin();
-}
-
-std::auto_ptr<TPCTransactionContext> MessageStoreModule::begin(const std::string& xid)
-{
- return store->begin(xid);
-}
-
-void MessageStoreModule::prepare(TPCTransactionContext& txn)
-{
- store->prepare(txn);
-}
-
-void MessageStoreModule::commit(TransactionContext& ctxt)
-{
- store->commit(ctxt);
-}
-
-void MessageStoreModule::abort(TransactionContext& ctxt)
-{
- store->abort(ctxt);
-}
diff --git a/qpid/cpp/lib/broker/MessageStoreModule.h b/qpid/cpp/lib/broker/MessageStoreModule.h
deleted file mode 100644
index 1787a4f361..0000000000
--- a/qpid/cpp/lib/broker/MessageStoreModule.h
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#ifndef _MessageStoreModule_
-#define _MessageStoreModule_
-
-#include <BrokerMessage.h>
-#include <MessageStore.h>
-#include <BrokerQueue.h>
-#include <RecoveryManager.h>
-#include <sys/Module.h>
-
-namespace qpid {
-namespace broker {
-
-/**
- * A null implementation of the MessageStore interface
- */
-class MessageStoreModule : public MessageStore
-{
- qpid::sys::Module<MessageStore> store;
-public:
- MessageStoreModule(const std::string& name);
-
- std::auto_ptr<TransactionContext> begin();
- std::auto_ptr<TPCTransactionContext> begin(const std::string& xid);
- void prepare(TPCTransactionContext& txn);
- void commit(TransactionContext& txn);
- void abort(TransactionContext& txn);
-
- void create(const PersistableQueue& queue);
- void destroy(const PersistableQueue& queue);
- void create(const PersistableExchange& exchange);
- void destroy(const PersistableExchange& exchange);
- void recover(RecoveryManager& queues);
- void stage(PersistableMessage& msg);
- void destroy(PersistableMessage& msg);
- void appendContent(PersistableMessage& msg, const std::string& data);
- void loadContent(PersistableMessage& msg, std::string& data, uint64_t offset, uint32_t length);
- void enqueue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue);
- void dequeue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue);
-
- ~MessageStoreModule(){}
-};
-
-}
-}
-
-
-#endif
diff --git a/qpid/cpp/lib/broker/NameGenerator.cpp b/qpid/cpp/lib/broker/NameGenerator.cpp
deleted file mode 100644
index 3f281859fa..0000000000
--- a/qpid/cpp/lib/broker/NameGenerator.cpp
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <NameGenerator.h>
-#include <sstream>
-
-using namespace qpid::broker;
-
-NameGenerator::NameGenerator(const std::string& _base) : base(_base), counter(1) {}
-
-std::string NameGenerator::generate(){
- std::stringstream ss;
- ss << base << counter++;
- return ss.str();
-}
diff --git a/qpid/cpp/lib/broker/NameGenerator.h b/qpid/cpp/lib/broker/NameGenerator.h
deleted file mode 100644
index b2dbbdfb69..0000000000
--- a/qpid/cpp/lib/broker/NameGenerator.h
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#ifndef _NameGenerator_
-#define _NameGenerator_
-
-#include <BrokerMessage.h>
-
-namespace qpid {
- namespace broker {
- class NameGenerator{
- const std::string base;
- unsigned int counter;
- public:
- NameGenerator(const std::string& base);
- std::string generate();
- };
- }
-}
-
-
-#endif
diff --git a/qpid/cpp/lib/broker/NullMessageStore.cpp b/qpid/cpp/lib/broker/NullMessageStore.cpp
deleted file mode 100644
index 0d53a31069..0000000000
--- a/qpid/cpp/lib/broker/NullMessageStore.cpp
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include <NullMessageStore.h>
-
-#include <RecoveryManager.h>
-
-#include <iostream>
-
-using namespace qpid::broker;
-
-NullMessageStore::NullMessageStore(bool _warn) : warn(_warn){}
-
-void NullMessageStore::create(const PersistableQueue& queue)
-{
- if (warn) std::cout << "WARNING: Can't create durable queue '" << queue.getName() << "'. Persistence not enabled." << std::endl;
-}
-
-void NullMessageStore::destroy(const PersistableQueue& queue)
-{
- if (warn) std::cout << "WARNING: Can't destroy durable queue '" << queue.getName() << "'. Persistence not enabled." << std::endl;
-}
-
-void NullMessageStore::create(const PersistableExchange&)
-{
-}
-
-void NullMessageStore::destroy(const PersistableExchange&)
-{
-}
-
-void NullMessageStore::recover(RecoveryManager&)
-{
- if (warn) std::cout << "WARNING: Persistence not enabled, no recovery of queues or messages." << std::endl;
-}
-
-void NullMessageStore::stage(PersistableMessage&)
-{
- if (warn) std::cout << "WARNING: Can't stage message. Persistence not enabled." << std::endl;
-}
-
-void NullMessageStore::destroy(PersistableMessage&)
-{
- if (warn) std::cout << "WARNING: No need to destroy staged message. Persistence not enabled." << std::endl;
-}
-
-void NullMessageStore::appendContent(PersistableMessage&, const string&)
-{
- if (warn) std::cout << "WARNING: Can't append content. Persistence not enabled." << std::endl;
-}
-
-void NullMessageStore::loadContent(PersistableMessage&, string&, uint64_t, uint32_t)
-{
- if (warn) std::cout << "WARNING: Can't load content. Persistence not enabled." << std::endl;
-}
-
-void NullMessageStore::enqueue(TransactionContext*, PersistableMessage&, const PersistableQueue& queue)
-{
- if (warn) std::cout << "WARNING: Can't enqueue message onto '" << queue.getName() << "'. Persistence not enabled." << std::endl;
-}
-
-void NullMessageStore::dequeue(TransactionContext*, PersistableMessage&, const PersistableQueue& queue)
-{
- if (warn) std::cout << "WARNING: Can't dequeue message from '" << queue.getName() << "'. Persistence not enabled." << std::endl;
-}
-
-std::auto_ptr<TransactionContext> NullMessageStore::begin()
-{
- return std::auto_ptr<TransactionContext>();
-}
-
-std::auto_ptr<TPCTransactionContext> NullMessageStore::begin(const std::string&)
-{
- return std::auto_ptr<TPCTransactionContext>();
-}
-
-void NullMessageStore::prepare(TPCTransactionContext&)
-{
-}
-
-void NullMessageStore::commit(TransactionContext&)
-{
-}
-
-void NullMessageStore::abort(TransactionContext&)
-{
-}
diff --git a/qpid/cpp/lib/broker/NullMessageStore.h b/qpid/cpp/lib/broker/NullMessageStore.h
deleted file mode 100644
index f1a321cff4..0000000000
--- a/qpid/cpp/lib/broker/NullMessageStore.h
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#ifndef _NullMessageStore_
-#define _NullMessageStore_
-
-#include <BrokerMessage.h>
-#include <MessageStore.h>
-#include <BrokerQueue.h>
-
-namespace qpid {
-namespace broker {
-
-/**
- * A null implementation of the MessageStore interface
- */
-class NullMessageStore : public MessageStore
-{
- const bool warn;
-public:
- NullMessageStore(bool warn = false);
-
- virtual std::auto_ptr<TransactionContext> begin();
- virtual std::auto_ptr<TPCTransactionContext> begin(const std::string& xid);
- virtual void prepare(TPCTransactionContext& txn);
- virtual void commit(TransactionContext& txn);
- virtual void abort(TransactionContext& txn);
-
- virtual void create(const PersistableQueue& queue);
- virtual void destroy(const PersistableQueue& queue);
- virtual void create(const PersistableExchange& exchange);
- virtual void destroy(const PersistableExchange& exchange);
- virtual void recover(RecoveryManager& queues);
- virtual void stage(PersistableMessage& msg);
- virtual void destroy(PersistableMessage& msg);
- virtual void appendContent(PersistableMessage& msg, const std::string& data);
- virtual void loadContent(PersistableMessage& msg, std::string& data, uint64_t offset, uint32_t length);
- virtual void enqueue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue);
- virtual void dequeue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue);
- ~NullMessageStore(){}
-};
-
-}
-}
-
-
-#endif
diff --git a/qpid/cpp/lib/broker/Persistable.h b/qpid/cpp/lib/broker/Persistable.h
deleted file mode 100644
index 9f48643c9e..0000000000
--- a/qpid/cpp/lib/broker/Persistable.h
+++ /dev/null
@@ -1,62 +0,0 @@
-#ifndef _broker_Persistable_h
-#define _broker_Persistable_h
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "framing/amqp_types.h"
-#include "framing/Buffer.h"
-
-namespace qpid {
-namespace broker {
-
-/**
- * Base class for all persistable objects
- */
-class Persistable
-{
-public:
- /**
- * Allows the store to attach its own identifier to this object
- */
- virtual void setPersistenceId(uint64_t id) = 0;
- /**
- * Returns any identifier the store may have attached to this
- * object
- */
- virtual uint64_t getPersistenceId() const = 0;
- /**
- * Encodes the persistable state of this object into the supplied
- * buffer
- */
- virtual void encode(framing::Buffer& buffer) const = 0;
- /**
- * @returns the size of the buffer needed to encode this object
- */
- virtual uint32_t encodedSize() const = 0;
-
- virtual ~Persistable() {};
-};
-
-}}
-
-
-#endif
diff --git a/qpid/cpp/lib/broker/PersistableExchange.h b/qpid/cpp/lib/broker/PersistableExchange.h
deleted file mode 100644
index 9badf5f609..0000000000
--- a/qpid/cpp/lib/broker/PersistableExchange.h
+++ /dev/null
@@ -1,44 +0,0 @@
-#ifndef _broker_PersistableExchange_h
-#define _broker_PersistableExchange_h
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include <string>
-#include "Persistable.h"
-
-namespace qpid {
-namespace broker {
-
-/**
- * The interface exchanges must expose to the MessageStore in order to be
- * persistable.
- */
-class PersistableExchange : public Persistable
-{
-public:
- virtual ~PersistableExchange() {};
-};
-
-}}
-
-
-#endif
diff --git a/qpid/cpp/lib/broker/PersistableMessage.h b/qpid/cpp/lib/broker/PersistableMessage.h
deleted file mode 100644
index f598e48709..0000000000
--- a/qpid/cpp/lib/broker/PersistableMessage.h
+++ /dev/null
@@ -1,53 +0,0 @@
-#ifndef _broker_PersistableMessage_h
-#define _broker_PersistableMessage_h
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include <string>
-#include <boost/shared_ptr.hpp>
-#include "Persistable.h"
-#include "framing/amqp_types.h"
-
-namespace qpid {
-namespace broker {
-
-/**
- * The interface messages must expose to the MessageStore in order to
- * be persistable.
- */
- class PersistableMessage : public Persistable
-{
-public:
- typedef boost::shared_ptr<PersistableMessage> shared_ptr;
-
- /**
- * @returns the size of the headers when encoded
- */
- virtual uint32_t encodedHeaderSize() const = 0;
-
- virtual ~PersistableMessage() {};
-};
-
-}}
-
-
-#endif
diff --git a/qpid/cpp/lib/broker/PersistableQueue.h b/qpid/cpp/lib/broker/PersistableQueue.h
deleted file mode 100644
index 5dd91dde9b..0000000000
--- a/qpid/cpp/lib/broker/PersistableQueue.h
+++ /dev/null
@@ -1,45 +0,0 @@
-#ifndef _broker_PersistableQueue_h
-#define _broker_PersistableQueue_h
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include <string>
-#include "Persistable.h"
-
-namespace qpid {
-namespace broker {
-
-/**
- * The interface queues must expose to the MessageStore in order to be
- * persistable.
- */
-class PersistableQueue : public Persistable
-{
-public:
- virtual const std::string& getName() const = 0;
- virtual ~PersistableQueue() {};
-};
-
-}}
-
-
-#endif
diff --git a/qpid/cpp/lib/broker/Prefetch.h b/qpid/cpp/lib/broker/Prefetch.h
deleted file mode 100644
index b6d4026c3f..0000000000
--- a/qpid/cpp/lib/broker/Prefetch.h
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#ifndef _Prefetch_
-#define _Prefetch_
-
-#include <amqp_types.h>
-
-namespace qpid {
- namespace broker {
- /**
- * Count and total size of asynchronously delivered
- * (i.e. pushed) messages that have acks outstanding.
- */
- struct Prefetch{
- uint32_t size;
- uint16_t count;
-
- void reset() { size = 0; count = 0; }
- };
- }
-}
-
-
-#endif
diff --git a/qpid/cpp/lib/broker/QueuePolicy.cpp b/qpid/cpp/lib/broker/QueuePolicy.cpp
deleted file mode 100644
index 94b86f2bbb..0000000000
--- a/qpid/cpp/lib/broker/QueuePolicy.cpp
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <QueuePolicy.h>
-
-using namespace qpid::broker;
-using namespace qpid::framing;
-
-QueuePolicy::QueuePolicy(uint32_t _maxCount, uint64_t _maxSize) :
- maxCount(_maxCount), maxSize(_maxSize), count(0), size(0) {}
-
-QueuePolicy::QueuePolicy(const FieldTable& settings) :
- maxCount(getInt(settings, maxCountKey, 0)),
- maxSize(getInt(settings, maxSizeKey, 0)), count(0), size(0) {}
-
-void QueuePolicy::enqueued(uint64_t _size)
-{
- if (maxCount) count++;
- if (maxSize) size += _size;
-}
-
-void QueuePolicy::dequeued(uint64_t _size)
-{
- if (maxCount) count--;
- if (maxSize) size -= _size;
-}
-
-bool QueuePolicy::limitExceeded()
-{
- return (maxSize && size > maxSize) || (maxCount && count > maxCount);
-}
-
-void QueuePolicy::update(FieldTable& settings)
-{
- if (maxCount) settings.setInt(maxCountKey, maxCount);
- if (maxSize) settings.setInt(maxSizeKey, maxSize);
-}
-
-
-int QueuePolicy::getInt(const FieldTable& settings, const std::string& key, int defaultValue)
-{
- //Note: currently field table only contain signed 32 bit ints, which
- // restricts the values that can be set on the queue policy.
- try {
- return settings.getInt(key);
- } catch (FieldNotFoundException& ignore) {
- return defaultValue;
- }
-}
-
-const std::string QueuePolicy::maxCountKey("qpid.max_count");
-const std::string QueuePolicy::maxSizeKey("qpid.max_size");
diff --git a/qpid/cpp/lib/broker/QueuePolicy.h b/qpid/cpp/lib/broker/QueuePolicy.h
deleted file mode 100644
index e7688f3e67..0000000000
--- a/qpid/cpp/lib/broker/QueuePolicy.h
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#ifndef _QueuePolicy_
-#define _QueuePolicy_
-
-#include <FieldTable.h>
-
-namespace qpid {
- namespace broker {
- class QueuePolicy
- {
- static const std::string maxCountKey;
- static const std::string maxSizeKey;
-
- const uint32_t maxCount;
- const uint64_t maxSize;
- uint32_t count;
- uint64_t size;
-
- static int getInt(const qpid::framing::FieldTable& settings, const std::string& key, int defaultValue);
-
- public:
- QueuePolicy(uint32_t maxCount, uint64_t maxSize);
- QueuePolicy(const qpid::framing::FieldTable& settings);
- void enqueued(uint64_t size);
- void dequeued(uint64_t size);
- void update(qpid::framing::FieldTable& settings);
- bool limitExceeded();
- uint32_t getMaxCount() const { return maxCount; }
- uint64_t getMaxSize() const { return maxSize; }
- };
- }
-}
-
-
-#endif
diff --git a/qpid/cpp/lib/broker/QueueRegistry.cpp b/qpid/cpp/lib/broker/QueueRegistry.cpp
deleted file mode 100644
index d33cd09840..0000000000
--- a/qpid/cpp/lib/broker/QueueRegistry.cpp
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <QueueRegistry.h>
-#include <sstream>
-#include <assert.h>
-
-using namespace qpid::broker;
-using namespace qpid::sys;
-
-QueueRegistry::QueueRegistry(MessageStore* const _store) : counter(1), store(_store){}
-
-QueueRegistry::~QueueRegistry(){}
-
-std::pair<Queue::shared_ptr, bool>
-QueueRegistry::declare(const string& declareName, bool durable,
- uint32_t autoDelete, const ConnectionToken* owner)
-{
- Mutex::ScopedLock locker(lock);
- string name = declareName.empty() ? generateName() : declareName;
- assert(!name.empty());
- QueueMap::iterator i = queues.find(name);
- if (i == queues.end()) {
- Queue::shared_ptr queue(new Queue(name, autoDelete, durable ? store : 0, owner));
- queues[name] = queue;
- return std::pair<Queue::shared_ptr, bool>(queue, true);
- } else {
- return std::pair<Queue::shared_ptr, bool>(i->second, false);
- }
-}
-
-void QueueRegistry::destroy(const string& name){
- Mutex::ScopedLock locker(lock);
- queues.erase(name);
-}
-
-Queue::shared_ptr QueueRegistry::find(const string& name){
- Mutex::ScopedLock locker(lock);
- QueueMap::iterator i = queues.find(name);
- if (i == queues.end()) {
- return Queue::shared_ptr();
- } else {
- return i->second;
- }
-}
-
-string QueueRegistry::generateName(){
- string name;
- do {
- std::stringstream ss;
- ss << "tmp_" << counter++;
- name = ss.str();
- // Thread safety: Private function, only called with lock held
- // so this is OK.
- } while(queues.find(name) != queues.end());
- return name;
-}
-
-MessageStore* const QueueRegistry::getStore() const {
- return store;
-}
diff --git a/qpid/cpp/lib/broker/QueueRegistry.h b/qpid/cpp/lib/broker/QueueRegistry.h
deleted file mode 100644
index 079034359e..0000000000
--- a/qpid/cpp/lib/broker/QueueRegistry.h
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#ifndef _QueueRegistry_
-#define _QueueRegistry_
-
-#include <map>
-#include <sys/Monitor.h>
-#include <BrokerQueue.h>
-
-namespace qpid {
-namespace broker {
-
-/**
- * A registry of queues indexed by queue name.
- *
- * Queues are reference counted using shared_ptr to ensure that they
- * are deleted when and only when they are no longer in use.
- *
- */
-class QueueRegistry{
-
- public:
- QueueRegistry(MessageStore* const store = 0);
- ~QueueRegistry();
-
- /**
- * Declare a queue.
- *
- * @return The queue and a boolean flag which is true if the queue
- * was created by this declare call false if it already existed.
- */
- std::pair<Queue::shared_ptr, bool> declare(const string& name, bool durable = false, uint32_t autodelete = 0,
- const ConnectionToken* const owner = 0);
-
- /**
- * Destroy the named queue.
- *
- * Note: if the queue is in use it is not actually destroyed until
- * all shared_ptrs to it are destroyed. During that time it is
- * possible that a new queue with the same name may be
- * created. This should not create any problems as the new and
- * old queues exist independently. The registry has
- * forgotten the old queue so there can be no confusion for
- * subsequent calls to find or declare with the same name.
- *
- */
- void destroy(const string& name);
-
- /**
- * Find the named queue. Return 0 if not found.
- */
- Queue::shared_ptr find(const string& name);
-
- /**
- * Generate unique queue name.
- */
- string generateName();
-
- /**
- * Return the message store used.
- */
- MessageStore* const getStore() const;
-
-
- private:
- typedef std::map<string, Queue::shared_ptr> QueueMap;
- QueueMap queues;
- qpid::sys::Mutex lock;
- int counter;
- MessageStore* const store;
-};
-
-
-}
-}
-
-
-#endif
diff --git a/qpid/cpp/lib/broker/RecoverableMessage.h b/qpid/cpp/lib/broker/RecoverableMessage.h
deleted file mode 100644
index 4bb0d2c4a1..0000000000
--- a/qpid/cpp/lib/broker/RecoverableMessage.h
+++ /dev/null
@@ -1,57 +0,0 @@
-#ifndef _broker_RecoverableMessage_h
-#define _broker_RecoverableMessage_h
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include <boost/shared_ptr.hpp>
-#include "framing/amqp_types.h"
-#include "framing/Buffer.h"
-
-namespace qpid {
-namespace broker {
-
-/**
- * The interface through which messages are reloaded on recovery.
- */
-class RecoverableMessage
-{
-public:
- typedef boost::shared_ptr<RecoverableMessage> shared_ptr;
- /**
- * Used by store to determine whether to load content on recovery
- * or let message load its own content as and when it requires it.
- *
- * @returns true if the content of the message should be loaded
- */
- virtual bool loadContent(uint64_t available) = 0;
- /**
- * Loads the content held in the supplied buffer (may do checking
- * of length as necessary)
- */
- virtual void decodeContent(framing::Buffer& buffer) = 0;
- virtual ~RecoverableMessage() {};
-};
-
-}}
-
-
-#endif
diff --git a/qpid/cpp/lib/broker/RecoverableQueue.h b/qpid/cpp/lib/broker/RecoverableQueue.h
deleted file mode 100644
index a5c564b947..0000000000
--- a/qpid/cpp/lib/broker/RecoverableQueue.h
+++ /dev/null
@@ -1,49 +0,0 @@
-#ifndef _broker_RecoverableQueue_h
-#define _broker_RecoverableQueue_h
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "RecoverableMessage.h"
-#include <boost/shared_ptr.hpp>
-
-namespace qpid {
-namespace broker {
-
-/**
- * The interface through which messages are added back to queues on
- * recovery.
- */
-class RecoverableQueue
-{
-public:
- typedef boost::shared_ptr<RecoverableQueue> shared_ptr;
- /**
- * Used during recovery to add stored messages back to the queue
- */
- virtual void recover(RecoverableMessage::shared_ptr msg) = 0;
- virtual ~RecoverableQueue() {};
-};
-
-}}
-
-
-#endif
diff --git a/qpid/cpp/lib/broker/RecoveryManager.h b/qpid/cpp/lib/broker/RecoveryManager.h
deleted file mode 100644
index 700bbdcf80..0000000000
--- a/qpid/cpp/lib/broker/RecoveryManager.h
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#ifndef _RecoveryManager_
-#define _RecoveryManager_
-
-#include "RecoverableQueue.h"
-#include "RecoverableMessage.h"
-#include "framing/Buffer.h"
-
-namespace qpid {
-namespace broker {
-
- class RecoveryManager{
- public:
- virtual ~RecoveryManager(){}
- virtual void recoverExchange(framing::Buffer& buffer) = 0;
- virtual RecoverableQueue::shared_ptr recoverQueue(framing::Buffer& buffer) = 0;
- virtual RecoverableMessage::shared_ptr recoverMessage(framing::Buffer& buffer) = 0;
- virtual void recoveryComplete() = 0;
- };
-
-
-}
-}
-
-
-#endif
diff --git a/qpid/cpp/lib/broker/RecoveryManagerImpl.cpp b/qpid/cpp/lib/broker/RecoveryManagerImpl.cpp
deleted file mode 100644
index c14f9c52cc..0000000000
--- a/qpid/cpp/lib/broker/RecoveryManagerImpl.cpp
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <RecoveryManagerImpl.h>
-
-#include "BrokerMessage.h"
-#include "BrokerMessageMessage.h"
-#include "BrokerQueue.h"
-
-using namespace qpid;
-using namespace qpid::broker;
-using boost::dynamic_pointer_cast;
-
-
-static const uint8_t BASIC = 1;
-static const uint8_t MESSAGE = 2;
-
-RecoveryManagerImpl::RecoveryManagerImpl(QueueRegistry& _queues, ExchangeRegistry& _exchanges, uint64_t _stagingThreshold)
- : queues(_queues), exchanges(_exchanges), stagingThreshold(_stagingThreshold) {}
-
-RecoveryManagerImpl::~RecoveryManagerImpl() {}
-
-class RecoverableMessageImpl : public RecoverableMessage
-{
- Message::shared_ptr msg;
- const uint64_t stagingThreshold;
-public:
- RecoverableMessageImpl(Message::shared_ptr& _msg, uint64_t _stagingThreshold)
- : msg(_msg), stagingThreshold(_stagingThreshold) {}
- ~RecoverableMessageImpl() {};
- bool loadContent(uint64_t available);
- void decodeContent(framing::Buffer& buffer);
- void recover(Queue::shared_ptr queue);
-};
-
-class RecoverableQueueImpl : public RecoverableQueue
-{
- Queue::shared_ptr queue;
-public:
- RecoverableQueueImpl(Queue::shared_ptr& _queue) : queue(_queue) {}
- ~RecoverableQueueImpl() {};
- void recover(RecoverableMessage::shared_ptr msg);
-};
-
-void RecoveryManagerImpl::recoverExchange(framing::Buffer&)
-{
- //TODO
-}
-
-RecoverableQueue::shared_ptr RecoveryManagerImpl::recoverQueue(framing::Buffer& buffer)
-{
- Queue::shared_ptr queue = Queue::decode(queues, buffer);
- try {
- Exchange::shared_ptr exchange = exchanges.getDefault();
- if (exchange) {
- exchange->bind(queue, queue->getName(), 0);
- }
- } catch (ChannelException& e) {
- //assume no default exchange has been declared
- }
- return RecoverableQueue::shared_ptr(new RecoverableQueueImpl(queue));
-}
-
-RecoverableMessage::shared_ptr RecoveryManagerImpl::recoverMessage(framing::Buffer& buffer)
-{
- buffer.record();
- //peek at type:
- Message::shared_ptr message(decodeMessageType(buffer) == MESSAGE ?
- ((Message*) new MessageMessage()) :
- ((Message*) new BasicMessage()));
- buffer.restore();
- message->decodeHeader(buffer);
- return RecoverableMessage::shared_ptr(new RecoverableMessageImpl(message, stagingThreshold));
-}
-
-void RecoveryManagerImpl::recoveryComplete()
-{
- //TODO (finalise binding setup etc)
-}
-
-uint8_t RecoveryManagerImpl::decodeMessageType(framing::Buffer& buffer)
-{
- return buffer.getOctet();
-}
-
-void RecoveryManagerImpl::encodeMessageType(const Message& msg, framing::Buffer& buffer)
-{
- buffer.putOctet(dynamic_cast<const MessageMessage*>(&msg) ? MESSAGE : BASIC);
-}
-
-uint32_t RecoveryManagerImpl::encodedMessageTypeSize()
-{
- return 1;
-}
-
-bool RecoverableMessageImpl::loadContent(uint64_t available)
-{
- return !stagingThreshold || available < stagingThreshold;
-}
-
-void RecoverableMessageImpl::decodeContent(framing::Buffer& buffer)
-{
- msg->decodeContent(buffer);
-}
-
-void RecoverableMessageImpl::recover(Queue::shared_ptr queue)
-{
- queue->recover(msg);
-}
-
-void RecoverableQueueImpl::recover(RecoverableMessage::shared_ptr msg)
-{
- dynamic_pointer_cast<RecoverableMessageImpl>(msg)->recover(queue);
-}
diff --git a/qpid/cpp/lib/broker/RecoveryManagerImpl.h b/qpid/cpp/lib/broker/RecoveryManagerImpl.h
deleted file mode 100644
index c40de7895f..0000000000
--- a/qpid/cpp/lib/broker/RecoveryManagerImpl.h
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#ifndef _RecoveryManagerImpl_
-#define _RecoveryManagerImpl_
-
-#include <list>
-#include "ExchangeRegistry.h"
-#include "QueueRegistry.h"
-#include "RecoveryManager.h"
-
-namespace qpid {
-namespace broker {
-
- class RecoveryManagerImpl : public RecoveryManager{
- QueueRegistry& queues;
- ExchangeRegistry& exchanges;
- const uint64_t stagingThreshold;
- public:
- RecoveryManagerImpl(QueueRegistry& queues, ExchangeRegistry& exchanges, uint64_t stagingThreshold);
- ~RecoveryManagerImpl();
-
- void recoverExchange(framing::Buffer& buffer);
- RecoverableQueue::shared_ptr recoverQueue(framing::Buffer& buffer);
- RecoverableMessage::shared_ptr recoverMessage(framing::Buffer& buffer);
- void recoveryComplete();
-
- static uint8_t decodeMessageType(framing::Buffer& buffer);
- static void encodeMessageType(const Message& msg, framing::Buffer& buffer);
- static uint32_t encodedMessageTypeSize();
- };
-
-
-}
-}
-
-
-#endif
diff --git a/qpid/cpp/lib/broker/Reference.cpp b/qpid/cpp/lib/broker/Reference.cpp
deleted file mode 100644
index bd1bdcb007..0000000000
--- a/qpid/cpp/lib/broker/Reference.cpp
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include <boost/bind.hpp>
-#include "Reference.h"
-#include "BrokerMessageMessage.h"
-#include "QpidError.h"
-#include "MessageAppendBody.h"
-#include "CompletionHandler.h"
-
-namespace qpid {
-namespace broker {
-
-Reference::shared_ptr ReferenceRegistry::open(const Reference::Id& id) {
- ReferenceMap::iterator i = references.find(id);
- if (i != references.end())
- throw ConnectionException(503, "Attempt to re-open reference " +id);
- return references[id] = Reference::shared_ptr(new Reference(id, this));
-}
-
-Reference::shared_ptr ReferenceRegistry::get(const Reference::Id& id) {
- ReferenceMap::iterator i = references.find(id);
- if (i == references.end())
- throw ConnectionException(503, "Attempt to use non-existent reference "+id);
- return i->second;
-}
-
-void Reference::append(AppendPtr ptr) {
- appends.push_back(ptr);
- size += ptr->getBytes().length();
-}
-
-void Reference::close() {
- registry->references.erase(getId());
-}
-
-}} // namespace qpid::broker
diff --git a/qpid/cpp/lib/broker/Reference.h b/qpid/cpp/lib/broker/Reference.h
deleted file mode 100644
index 277eb7b917..0000000000
--- a/qpid/cpp/lib/broker/Reference.h
+++ /dev/null
@@ -1,114 +0,0 @@
-#ifndef _broker_Reference_h
-#define _broker_Reference_h
-
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include <string>
-#include <vector>
-#include <map>
-#include <boost/shared_ptr.hpp>
-#include <boost/range.hpp>
-
-namespace qpid {
-
-namespace framing {
-class MessageAppendBody;
-}
-
-namespace broker {
-
-class MessageMessage;
-class ReferenceRegistry;
-
-// FIXME aconway 2007-03-27: Merge with client::IncomingMessage
-// to common reference handling code.
-
-/**
- * A reference is an accumulation point for data in a multi-frame
- * message. A reference can be used by multiple transfer commands to
- * create multiple messages, so the reference tracks which commands
- * are using it. When the reference is closed, all the associated
- * transfers are completed.
- *
- * THREAD UNSAFE: per-channel resource, access to channels is
- * serialized.
- */
-class Reference
-{
- public:
- typedef std::string Id;
- typedef boost::shared_ptr<Reference> shared_ptr;
- typedef boost::shared_ptr<MessageMessage> MessagePtr;
- typedef std::vector<MessagePtr> Messages;
- typedef boost::shared_ptr<framing::MessageAppendBody> AppendPtr;
- typedef std::vector<AppendPtr> Appends;
-
- Reference(const Id& id_=Id(), ReferenceRegistry* reg=0)
- : id(id_), size(0), registry(reg) {}
-
- const std::string& getId() const { return id; }
- uint64_t getSize() const { return size; }
-
- /** Add a message to be completed with this reference */
- void addMessage(MessagePtr message) { messages.push_back(message); }
-
- /** Append more data to the reference */
- void append(AppendPtr ptr);
-
- /** Close the reference, complete each associated message */
- void close();
-
- const Appends& getAppends() const { return appends; }
- const Messages& getMessages() const { return messages; }
-
- private:
- Id id;
- uint64_t size;
- ReferenceRegistry* registry;
- Messages messages;
- Appends appends;
-};
-
-
-/**
- * A registry/factory for references.
- *
- * THREAD UNSAFE: per-channel resource, access to channels is
- * serialized.
- */
-class ReferenceRegistry {
- public:
- ReferenceRegistry() {};
- Reference::shared_ptr open(const Reference::Id& id);
- Reference::shared_ptr get(const Reference::Id& id);
-
- private:
- typedef std::map<Reference::Id, Reference::shared_ptr> ReferenceMap;
- ReferenceMap references;
-
- // Reference calls references.erase().
- friend class Reference;
-};
-
-
-}} // namespace qpid::broker
-
-
-
-#endif /*!_broker_Reference_h*/
diff --git a/qpid/cpp/lib/broker/TopicExchange.cpp b/qpid/cpp/lib/broker/TopicExchange.cpp
deleted file mode 100644
index 796d3cea02..0000000000
--- a/qpid/cpp/lib/broker/TopicExchange.cpp
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <TopicExchange.h>
-#include <algorithm>
-
-using namespace qpid::broker;
-using namespace qpid::framing;
-using namespace qpid::sys;
-
-// TODO aconway 2006-09-20: More efficient matching algorithm.
-// Areas for improvement:
-// - excessive string copying: should be 0 copy, match from original buffer.
-// - match/lookup: use descision tree or other more efficient structure.
-
-Tokens& Tokens::operator=(const std::string& s) {
- clear();
- if (s.empty()) return *this;
- std::string::const_iterator i = s.begin();
- while (true) {
- // Invariant: i is at the beginning of the next untokenized word.
- std::string::const_iterator j = find(i, s.end(), '.');
- push_back(std::string(i, j));
- if (j == s.end()) return *this;
- i = j + 1;
- }
- return *this;
-}
-
-TopicPattern& TopicPattern::operator=(const Tokens& tokens) {
- Tokens::operator=(tokens);
- normalize();
- return *this;
-}
-
-namespace {
-const std::string hashmark("#");
-const std::string star("*");
-}
-
-void TopicPattern::normalize() {
- std::string word;
- Tokens::iterator i = begin();
- while (i != end()) {
- if (*i == hashmark) {
- ++i;
- while (i != end()) {
- // Invariant: *(i-1)==#, [begin()..i-1] is normalized.
- if (*i == star) { // Move * before #.
- std::swap(*i, *(i-1));
- ++i;
- } else if (*i == hashmark) {
- erase(i); // Remove extra #
- } else {
- break;
- }
- }
- } else {
- i ++;
- }
- }
-}
-
-
-namespace {
-// TODO aconway 2006-09-20: Ineficient to convert every routingKey to a string.
-// Need StringRef class that operates on a string in place witout copy.
-// Should be applied everywhere strings are extracted from frames.
-//
-bool do_match(Tokens::const_iterator pattern_begin, Tokens::const_iterator pattern_end, Tokens::const_iterator target_begin, Tokens::const_iterator target_end)
-{
- // Invariant: [pattern_begin..p) matches [target_begin..t)
- Tokens::const_iterator p = pattern_begin;
- Tokens::const_iterator t = target_begin;
- while (p != pattern_end && t != target_end)
- {
- if (*p == star || *p == *t) {
- ++p, ++t;
- } else if (*p == hashmark) {
- ++p;
- if (do_match(p, pattern_end, t, target_end)) return true;
- while (t != target_end) {
- ++t;
- if (do_match(p, pattern_end, t, target_end)) return true;
- }
- return false;
- } else {
- return false;
- }
- }
- while (p != pattern_end && *p == hashmark) ++p; // Ignore trailing #
- return t == target_end && p == pattern_end;
-}
-}
-
-bool TopicPattern::match(const Tokens& target) const
-{
- return do_match(begin(), end(), target.begin(), target.end());
-}
-
-TopicExchange::TopicExchange(const string& _name) : Exchange(_name) { }
-
-void TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){
- Monitor::ScopedLock l(lock);
- TopicPattern routingPattern(routingKey);
- bindings[routingPattern].push_back(queue);
-}
-
-void TopicExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){
- Monitor::ScopedLock l(lock);
- BindingMap::iterator bi = bindings.find(TopicPattern(routingKey));
- Queue::vector& qv(bi->second);
- if (bi == bindings.end()) return;
- Queue::vector::iterator q = find(qv.begin(), qv.end(), queue);
- if(q == qv.end()) return;
- qv.erase(q);
- if(qv.empty()) bindings.erase(bi);
-}
-
-
-void TopicExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/){
- Monitor::ScopedLock l(lock);
- for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) {
- if (i->first.match(routingKey)) {
- Queue::vector& qv(i->second);
- for(Queue::vector::iterator j = qv.begin(); j != qv.end(); j++){
- msg.deliverTo(*j);
- }
- }
- }
-}
-
-TopicExchange::~TopicExchange() {}
-
-const std::string TopicExchange::typeName("topic");
-
-
diff --git a/qpid/cpp/lib/broker/TopicExchange.h b/qpid/cpp/lib/broker/TopicExchange.h
deleted file mode 100644
index fa0c86863a..0000000000
--- a/qpid/cpp/lib/broker/TopicExchange.h
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#ifndef _TopicExchange_
-#define _TopicExchange_
-
-#include <map>
-#include <vector>
-#include <BrokerExchange.h>
-#include <FieldTable.h>
-#include <BrokerMessage.h>
-#include <sys/Monitor.h>
-#include <BrokerQueue.h>
-
-namespace qpid {
-namespace broker {
-
-/** A vector of string tokens */
-class Tokens : public std::vector<std::string> {
- public:
- Tokens() {};
- // Default copy, assign, dtor are sufficient.
-
- /** Tokenize s, provides automatic conversion of string to Tokens */
- Tokens(const std::string& s) { operator=(s); }
- /** Tokenizing assignment operator s */
- Tokens & operator=(const std::string& s);
-
- private:
- size_t hash;
-};
-
-
-/**
- * Tokens that have been normalized as a pattern and can be matched
- * with topic Tokens. Normalized meands all sequences of mixed * and
- * # are reduced to a series of * followed by at most one #.
- */
-class TopicPattern : public Tokens
-{
- public:
- TopicPattern() {}
- // Default copy, assign, dtor are sufficient.
- TopicPattern(const Tokens& tokens) { operator=(tokens); }
- TopicPattern(const std::string& str) { operator=(str); }
- TopicPattern& operator=(const Tokens&);
- TopicPattern& operator=(const std::string& str) { return operator=(Tokens(str)); }
-
- /** Match a topic */
- bool match(const std::string& topic) { return match(Tokens(topic)); }
- bool match(const Tokens& topic) const;
-
- private:
- void normalize();
-};
-
-class TopicExchange : public virtual Exchange{
- typedef std::map<TopicPattern, Queue::vector> BindingMap;
- BindingMap bindings;
- qpid::sys::Mutex lock;
-
- public:
- static const std::string typeName;
-
- TopicExchange(const string& name);
-
- virtual std::string getType(){ return typeName; }
-
- virtual void bind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args);
-
- virtual void unbind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args);
-
- virtual void route(Deliverable& msg, const string& routingKey, const qpid::framing::FieldTable* args);
-
- virtual ~TopicExchange();
-};
-
-
-
-}
-}
-
-#endif
diff --git a/qpid/cpp/lib/broker/TransactionalStore.h b/qpid/cpp/lib/broker/TransactionalStore.h
deleted file mode 100644
index 9347edf0ad..0000000000
--- a/qpid/cpp/lib/broker/TransactionalStore.h
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#ifndef _TransactionalStore_
-#define _TransactionalStore_
-
-#include <memory>
-#include <string>
-
-namespace qpid {
-namespace broker {
-
-struct InvalidTransactionContextException : public std::exception {};
-
-class TransactionContext {
-public:
- virtual ~TransactionContext(){}
-};
-
-class TPCTransactionContext : public TransactionContext {
-public:
- virtual ~TPCTransactionContext(){}
-};
-
-class TransactionalStore {
-public:
- virtual std::auto_ptr<TransactionContext> begin() = 0;
- virtual std::auto_ptr<TPCTransactionContext> begin(const std::string& xid) = 0;
- virtual void prepare(TPCTransactionContext& txn) = 0;
- virtual void commit(TransactionContext& txn) = 0;
- virtual void abort(TransactionContext& txn) = 0;
-
- virtual ~TransactionalStore(){}
-};
-
-}
-}
-
-
-#endif
diff --git a/qpid/cpp/lib/broker/TxAck.cpp b/qpid/cpp/lib/broker/TxAck.cpp
deleted file mode 100644
index a2f3283f91..0000000000
--- a/qpid/cpp/lib/broker/TxAck.cpp
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <TxAck.h>
-
-using std::bind1st;
-using std::bind2nd;
-using std::mem_fun_ref;
-using namespace qpid::broker;
-
-TxAck::TxAck(AccumulatedAck& _acked, std::list<DeliveryRecord>& _unacked) :
- acked(_acked), unacked(_unacked){
-
-}
-
-bool TxAck::prepare(TransactionContext* ctxt) throw(){
- try{
- //dequeue all acked messages from their queues
- for (ack_iterator i = unacked.begin(); i != unacked.end(); i++) {
- if (i->coveredBy(&acked)) {
- i->discard(ctxt);
- }
- }
- return true;
- }catch(...){
- std::cout << "TxAck::prepare() - Failed to prepare" << std::endl;
- return false;
- }
-}
-
-void TxAck::commit() throw(){
- //remove all acked records from the list
- unacked.remove_if(bind2nd(mem_fun_ref(&DeliveryRecord::coveredBy), &acked));
-}
-
-void TxAck::rollback() throw(){
-}
diff --git a/qpid/cpp/lib/broker/TxAck.h b/qpid/cpp/lib/broker/TxAck.h
deleted file mode 100644
index d023cfae0d..0000000000
--- a/qpid/cpp/lib/broker/TxAck.h
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#ifndef _TxAck_
-#define _TxAck_
-
-#include <algorithm>
-#include <functional>
-#include <list>
-#include <AccumulatedAck.h>
-#include <DeliveryRecord.h>
-#include <TxOp.h>
-
-namespace qpid {
- namespace broker {
- /**
- * Defines the transactional behaviour for acks received by a
- * transactional channel.
- */
- class TxAck : public TxOp{
- AccumulatedAck& acked;
- std::list<DeliveryRecord>& unacked;
-
- public:
- /**
- * @param acked a representation of the accumulation of
- * acks received
- * @param unacked the record of delivered messages
- */
- TxAck(AccumulatedAck& acked, std::list<DeliveryRecord>& unacked);
- virtual bool prepare(TransactionContext* ctxt) throw();
- virtual void commit() throw();
- virtual void rollback() throw();
- virtual ~TxAck(){}
- };
- }
-}
-
-
-#endif
diff --git a/qpid/cpp/lib/broker/TxBuffer.cpp b/qpid/cpp/lib/broker/TxBuffer.cpp
deleted file mode 100644
index e5701c3d46..0000000000
--- a/qpid/cpp/lib/broker/TxBuffer.cpp
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <TxBuffer.h>
-
-using std::mem_fun;
-using namespace qpid::broker;
-
-bool TxBuffer::prepare(TransactionalStore* const store)
-{
- std::auto_ptr<TransactionContext> ctxt;
- if(store) ctxt = store->begin();
- for(op_iterator i = ops.begin(); i < ops.end(); i++){
- if(!(*i)->prepare(ctxt.get())){
- if(store) store->abort(*ctxt);
- return false;
- }
- }
- if(store) store->commit(*ctxt);
- return true;
-}
-
-void TxBuffer::commit()
-{
- for_each(ops.begin(), ops.end(), mem_fun(&TxOp::commit));
- ops.clear();
-}
-
-void TxBuffer::rollback()
-{
- for_each(ops.begin(), ops.end(), mem_fun(&TxOp::rollback));
- ops.clear();
-}
-
-void TxBuffer::enlist(TxOp* const op)
-{
- ops.push_back(op);
-}
diff --git a/qpid/cpp/lib/broker/TxBuffer.h b/qpid/cpp/lib/broker/TxBuffer.h
deleted file mode 100644
index 2d9a2a3679..0000000000
--- a/qpid/cpp/lib/broker/TxBuffer.h
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#ifndef _TxBuffer_
-#define _TxBuffer_
-
-#include <algorithm>
-#include <functional>
-#include <vector>
-#include <TransactionalStore.h>
-#include <TxOp.h>
-
-/**
- * Represents a single transaction. As such, an instance of this class
- * will hold a list of operations representing the workload of the
- * transaction. This work can be committed or rolled back. Committing
- * is a two-stage process: first all the operations should be
- * prepared, then if that succeeds they can be committed.
- *
- * In the 2pc case, a successful prepare may be followed by either a
- * commit or a rollback.
- *
- * Atomicity of prepare is ensured by using a lower level
- * transactional facility. This saves explicitly rolling back all the
- * successfully prepared ops when one of them fails. i.e. we do not
- * use 2pc internally, we instead ensure that prepare is atomic at a
- * lower level. This makes individual prepare operations easier to
- * code.
- *
- * Transactions on a messaging broker effect three types of 'action':
- * (1) updates to persistent storage (2) updates to transient storage
- * or cached data (3) network writes.
- *
- * Of these, (1) should always occur atomically during prepare to
- * ensure that if the broker crashes while a transaction is being
- * completed the persistent state (which is all that then remains) is
- * consistent. (3) can only be done on commit, after a successful
- * prepare. There is a little more flexibility with (2) but any
- * changes made during prepare should be subject to the control of the
- * TransactionalStore in use.
- */
-namespace qpid {
- namespace broker {
- class TxBuffer{
- typedef std::vector<TxOp*>::iterator op_iterator;
- std::vector<TxOp*> ops;
- public:
- /**
- * Requests that all ops are prepared. This should
- * primarily involve making sure that a persistent record
- * of the operations is stored where necessary.
- *
- * All ops will be prepared under a transaction on the
- * specified store. If any operation fails on prepare,
- * this transaction will be rolled back.
- *
- * Once prepared, a transaction can be committed (or in
- * the 2pc case, rolled back).
- *
- * @returns true if all the operations prepared
- * successfully, false if not.
- */
- bool prepare(TransactionalStore* const store);
- /**
- * Signals that the ops all prepared all completed
- * successfully and can now commit, i.e. the operation can
- * now be fully carried out.
- *
- * Should only be called after a call to prepare() returns
- * true.
- */
- void commit();
- /**
- * Rolls back all the operations.
- *
- * Should only be called either after a call to prepare()
- * returns true (2pc) or instead of a prepare call
- * ('server-local')
- */
- void rollback();
- /**
- * Adds an operation to the transaction.
- */
- void enlist(TxOp* const op);
- };
- }
-}
-
-
-#endif
diff --git a/qpid/cpp/lib/broker/TxOp.h b/qpid/cpp/lib/broker/TxOp.h
deleted file mode 100644
index abba84a8e8..0000000000
--- a/qpid/cpp/lib/broker/TxOp.h
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#ifndef _TxOp_
-#define _TxOp_
-
-#include <TransactionalStore.h>
-
-namespace qpid {
- namespace broker {
- class TxOp{
- public:
- virtual bool prepare(TransactionContext*) throw() = 0;
- virtual void commit() throw() = 0;
- virtual void rollback() throw() = 0;
- virtual ~TxOp(){}
- };
- }
-}
-
-
-#endif
diff --git a/qpid/cpp/lib/broker/TxPublish.cpp b/qpid/cpp/lib/broker/TxPublish.cpp
deleted file mode 100644
index 57993782d0..0000000000
--- a/qpid/cpp/lib/broker/TxPublish.cpp
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <TxPublish.h>
-
-using namespace qpid::broker;
-
-TxPublish::TxPublish(Message::shared_ptr _msg) : msg(_msg) {}
-
-bool TxPublish::prepare(TransactionContext* ctxt) throw(){
- try{
- for_each(queues.begin(), queues.end(), Prepare(ctxt, msg));
- return true;
- }catch(...){
- std::cout << "TxPublish::prepare() - Failed to prepare" << std::endl;
- return false;
- }
-}
-
-void TxPublish::commit() throw(){
- for_each(queues.begin(), queues.end(), Commit(msg));
-}
-
-void TxPublish::rollback() throw(){
-}
-
-void TxPublish::deliverTo(Queue::shared_ptr& queue){
- queues.push_back(queue);
-}
-
-TxPublish::Prepare::Prepare(TransactionContext* _ctxt, Message::shared_ptr& _msg)
- : ctxt(_ctxt), msg(_msg){}
-
-void TxPublish::Prepare::operator()(Queue::shared_ptr& queue){
- queue->enqueue(ctxt, msg);
-}
-
-TxPublish::Commit::Commit(Message::shared_ptr& _msg) : msg(_msg){}
-
-void TxPublish::Commit::operator()(Queue::shared_ptr& queue){
- queue->process(msg);
-}
-
diff --git a/qpid/cpp/lib/broker/TxPublish.h b/qpid/cpp/lib/broker/TxPublish.h
deleted file mode 100644
index 0c7596086a..0000000000
--- a/qpid/cpp/lib/broker/TxPublish.h
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#ifndef _TxPublish_
-#define _TxPublish_
-
-#include <algorithm>
-#include <functional>
-#include <list>
-#include <Deliverable.h>
-#include <BrokerMessage.h>
-#include <MessageStore.h>
-#include <BrokerQueue.h>
-#include <TxOp.h>
-
-namespace qpid {
- namespace broker {
- /**
- * Defines the behaviour for publish operations on a
- * transactional channel. Messages are routed through
- * exchanges when received but are not at that stage delivered
- * to the matching queues, rather the queues are held in an
- * instance of this class. On prepare() the message is marked
- * enqueued to the relevant queues in the MessagesStore. On
- * commit() the messages will be passed to the queue for
- * dispatch or to be added to the in-memory queue.
- */
- class TxPublish : public TxOp, public Deliverable{
- class Prepare{
- TransactionContext* ctxt;
- Message::shared_ptr& msg;
- public:
- Prepare(TransactionContext* ctxt, Message::shared_ptr& msg);
- void operator()(Queue::shared_ptr& queue);
- };
-
- class Commit{
- Message::shared_ptr& msg;
- public:
- Commit(Message::shared_ptr& msg);
- void operator()(Queue::shared_ptr& queue);
- };
-
- Message::shared_ptr msg;
- std::list<Queue::shared_ptr> queues;
-
- public:
- TxPublish(Message::shared_ptr msg);
- virtual bool prepare(TransactionContext* ctxt) throw();
- virtual void commit() throw();
- virtual void rollback() throw();
-
- virtual void deliverTo(Queue::shared_ptr& queue);
-
- virtual ~TxPublish(){}
- };
- }
-}
-
-
-#endif