diff options
| author | Alan Conway <aconway@apache.org> | 2007-03-21 02:08:18 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2007-03-21 02:08:18 +0000 |
| commit | d2eb3361494710466280341c98f76c03536d2ebe (patch) | |
| tree | f16ec2eacd8383e388657e54a22fc0214a0ce023 /qpid/cpp-0-9/lib/broker | |
| parent | 732544fe86089ab86c03fcc48d5ca4c72667c275 (diff) | |
| download | qpid-python-d2eb3361494710466280341c98f76c03536d2ebe.tar.gz | |
Renamed cpp-0-9 to cpp
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@520706 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp-0-9/lib/broker')
84 files changed, 0 insertions, 7796 deletions
diff --git a/qpid/cpp-0-9/lib/broker/AccumulatedAck.cpp b/qpid/cpp-0-9/lib/broker/AccumulatedAck.cpp deleted file mode 100644 index ff471b0287..0000000000 --- a/qpid/cpp-0-9/lib/broker/AccumulatedAck.cpp +++ /dev/null @@ -1,57 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "AccumulatedAck.h" - -#include <assert.h> - -using std::less_equal; -using std::bind2nd; -using namespace qpid::broker; - -void AccumulatedAck::update(uint64_t firstTag, uint64_t lastTag){ - assert(firstTag<=lastTag); - if (firstTag <= range + 1) { - if (lastTag > range) range = lastTag; - } else { - for (uint64_t tag = firstTag; tag<=lastTag; tag++) - individual.push_back(tag); - } -} - -void AccumulatedAck::consolidate(){ - individual.sort(); - //remove any individual tags that are covered by range - individual.remove_if(bind2nd(less_equal<uint64_t>(), range)); - //update range if possible (using <= allows for duplicates from overlapping ranges) - while (individual.front() <= range + 1) { - range = individual.front(); - individual.pop_front(); - } -} - -void AccumulatedAck::clear(){ - range = 0; - individual.clear(); -} - -bool AccumulatedAck::covers(uint64_t tag) const{ - return tag <= range || find(individual.begin(), individual.end(), tag) != individual.end(); -} diff --git a/qpid/cpp-0-9/lib/broker/AccumulatedAck.h b/qpid/cpp-0-9/lib/broker/AccumulatedAck.h deleted file mode 100644 index c4a6e3b79b..0000000000 --- a/qpid/cpp-0-9/lib/broker/AccumulatedAck.h +++ /dev/null @@ -1,57 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#ifndef _AccumulatedAck_ -#define _AccumulatedAck_ - -#include <algorithm> -#include <functional> -#include <list> - -namespace qpid { - namespace broker { - /** - * Keeps an accumulated record of acked messages (by delivery - * tag). - */ - class AccumulatedAck { - public: - /** - * If not zero, then everything up to this value has been - * acked. - */ - uint64_t range; - /** - * List of individually acked messages that are not - * included in the range marked by 'range'. - */ - std::list<uint64_t> individual; - - AccumulatedAck(uint64_t r) : range(r) {} - void update(uint64_t firstTag, uint64_t lastTag); - void consolidate(); - void clear(); - bool covers(uint64_t tag) const; - }; - } -} - - -#endif diff --git a/qpid/cpp-0-9/lib/broker/AutoDelete.cpp b/qpid/cpp-0-9/lib/broker/AutoDelete.cpp deleted file mode 100644 index 2037a9c71c..0000000000 --- a/qpid/cpp-0-9/lib/broker/AutoDelete.cpp +++ /dev/null @@ -1,86 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <AutoDelete.h> -#include <sys/Time.h> - -using namespace qpid::broker; -using namespace qpid::sys; - -AutoDelete::AutoDelete(QueueRegistry* const _registry, uint32_t _period) - : registry(_registry), period(_period), stopped(true) { } - -void AutoDelete::add(Queue::shared_ptr const queue){ - Mutex::ScopedLock l(lock); - queues.push(queue); -} - -Queue::shared_ptr const AutoDelete::pop(){ - Queue::shared_ptr next; - Mutex::ScopedLock l(lock); - if(!queues.empty()){ - next = queues.front(); - queues.pop(); - } - return next; -} - -void AutoDelete::process(){ - Queue::shared_ptr seen; - for(Queue::shared_ptr q = pop(); q; q = pop()){ - if(seen == q){ - add(q); - break; - }else if(q->canAutoDelete()){ - std::string name(q->getName()); - registry->destroy(name); - std::cout << "INFO: Auto-deleted queue named " << name << std::endl; - }else{ - add(q); - if(!seen) seen = q; - } - } -} - -void AutoDelete::run(){ - Monitor::ScopedLock l(monitor); - while(!stopped){ - process(); - monitor.wait(period*TIME_MSEC); - } -} - -void AutoDelete::start(){ - Monitor::ScopedLock l(monitor); - if(stopped){ - stopped = false; - runner = Thread(this); - } -} - -void AutoDelete::stop(){ - { - Monitor::ScopedLock l(monitor); - if(stopped) return; - stopped = true; - } - monitor.notify(); - runner.join(); -} diff --git a/qpid/cpp-0-9/lib/broker/AutoDelete.h b/qpid/cpp-0-9/lib/broker/AutoDelete.h deleted file mode 100644 index 9034de1730..0000000000 --- a/qpid/cpp-0-9/lib/broker/AutoDelete.h +++ /dev/null @@ -1,55 +0,0 @@ -#ifndef _AutoDelete_ -#define _AutoDelete_ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <iostream> -#include <queue> -#include <sys/Monitor.h> -#include <BrokerQueue.h> -#include <QueueRegistry.h> -#include <sys/Thread.h> - -namespace qpid { - namespace broker{ - class AutoDelete : private qpid::sys::Runnable { - qpid::sys::Mutex lock; - qpid::sys::Monitor monitor; - std::queue<Queue::shared_ptr> queues; - QueueRegistry* const registry; - uint32_t period; - volatile bool stopped; - qpid::sys::Thread runner; - - Queue::shared_ptr const pop(); - void process(); - virtual void run(); - - public: - AutoDelete(QueueRegistry* const registry, uint32_t period); - void add(Queue::shared_ptr const); - void start(); - void stop(); - }; - } -} - - -#endif diff --git a/qpid/cpp-0-9/lib/broker/Binding.h b/qpid/cpp-0-9/lib/broker/Binding.h deleted file mode 100644 index 16ca223208..0000000000 --- a/qpid/cpp-0-9/lib/broker/Binding.h +++ /dev/null @@ -1,38 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#ifndef _Binding_ -#define _Binding_ - -#include <FieldTable.h> - -namespace qpid { - namespace broker { - class Binding{ - public: - virtual void cancel() = 0; - virtual ~Binding(){} - }; - } -} - - -#endif - diff --git a/qpid/cpp-0-9/lib/broker/Broker.cpp b/qpid/cpp-0-9/lib/broker/Broker.cpp deleted file mode 100644 index f650452e33..0000000000 --- a/qpid/cpp-0-9/lib/broker/Broker.cpp +++ /dev/null @@ -1,118 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <iostream> -#include <memory> - -#include "AMQFrame.h" -#include "DirectExchange.h" -#include "TopicExchange.h" -#include "FanOutExchange.h" -#include "HeadersExchange.h" -#include "MessageStoreModule.h" -#include "NullMessageStore.h" -#include "ProtocolInitiation.h" -#include "Connection.h" -#include "sys/ConnectionInputHandler.h" -#include "sys/ConnectionInputHandlerFactory.h" -#include "sys/TimeoutHandler.h" - -#include "Broker.h" - -namespace qpid { -namespace broker { - -const std::string empty; -const std::string amq_direct("amq.direct"); -const std::string amq_topic("amq.topic"); -const std::string amq_fanout("amq.fanout"); -const std::string amq_match("amq.match"); - -Broker::Broker(const Configuration& conf) : - config(conf), - queues(store.get()), - timeout(30000), - stagingThreshold(0), - cleaner(&queues, timeout/10), - factory(*this) -{ - if (config.getStore().empty()) - store.reset(new NullMessageStore(config.isTrace())); - else - store.reset(new MessageStoreModule(config.getStore())); - - exchanges.declare(empty, DirectExchange::typeName); // Default exchange. - exchanges.declare(amq_direct, DirectExchange::typeName); - exchanges.declare(amq_topic, TopicExchange::typeName); - exchanges.declare(amq_fanout, FanOutExchange::typeName); - exchanges.declare(amq_match, HeadersExchange::typeName); - - if(store.get()) { - RecoveryManager recoverer(queues, exchanges); - MessageStoreSettings storeSettings = { getStagingThreshold() }; - store->recover(recoverer, &storeSettings); - } - - cleaner.start(); -} - - -Broker::shared_ptr Broker::create(int16_t port) -{ - Configuration config; - config.setPort(port); - return create(config); -} - -Broker::shared_ptr Broker::create(const Configuration& config) { - return Broker::shared_ptr(new Broker(config)); -} - -void Broker::run() { - getAcceptor().run(&factory); -} - -void Broker::shutdown() { - if (acceptor) - acceptor->shutdown(); -} - -Broker::~Broker() { - shutdown(); -} - -int16_t Broker::getPort() const { return getAcceptor().getPort(); } - -Acceptor& Broker::getAcceptor() const { - if (!acceptor) - const_cast<Acceptor::shared_ptr&>(acceptor) = - Acceptor::create(config.getPort(), - config.getConnectionBacklog(), - config.getWorkerThreads(), - config.isTrace()); - return *acceptor; -} - - -const int16_t Broker::DEFAULT_PORT(5672); - - -}} // namespace qpid::broker - diff --git a/qpid/cpp-0-9/lib/broker/Broker.h b/qpid/cpp-0-9/lib/broker/Broker.h deleted file mode 100644 index 7c21e90b18..0000000000 --- a/qpid/cpp-0-9/lib/broker/Broker.h +++ /dev/null @@ -1,106 +0,0 @@ -#ifndef _Broker_ -#define _Broker_ - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <Configuration.h> -#include <ConnectionFactory.h> -#include <sys/Runnable.h> -#include <sys/Acceptor.h> -#include <SharedObject.h> -#include <MessageStore.h> -#include <AutoDelete.h> -#include <ExchangeRegistry.h> -#include <ConnectionToken.h> -#include <DirectExchange.h> -#include <OutputHandler.h> -#include <ProtocolInitiation.h> -#include <QueueRegistry.h> - -namespace qpid { -namespace broker { -/** - * A broker instance. - */ -class Broker : public sys::Runnable, - public SharedObject<Broker> -{ - public: - static const int16_t DEFAULT_PORT; - - virtual ~Broker(); - - /** - * Create a broker. - * @param port Port to listen on or 0 to pick a port dynamically. - */ - static shared_ptr create(int16_t port = DEFAULT_PORT); - - /** - * Create a broker using a Configuration. - */ - static shared_ptr create(const Configuration& config); - - /** - * Return listening port. If called before bind this is - * the configured port. If called after it is the actual - * port, which will be different if the configured port is - * 0. - */ - virtual int16_t getPort() const; - - /** - * Run the broker. Implements Runnable::run() so the broker - * can be run in a separate thread. - */ - virtual void run(); - - /** Shut down the broker */ - virtual void shutdown(); - - MessageStore& getStore() { return *store; } - QueueRegistry& getQueues() { return queues; } - ExchangeRegistry& getExchanges() { return exchanges; } - uint32_t getTimeout() { return timeout; } - uint64_t getStagingThreshold() { return stagingThreshold; } - AutoDelete& getCleaner() { return cleaner; } - - private: - Broker(const Configuration& config); - sys::Acceptor& getAcceptor() const; - - Configuration config; - sys::Acceptor::shared_ptr acceptor; - std::auto_ptr<MessageStore> store; - QueueRegistry queues; - ExchangeRegistry exchanges; - uint32_t timeout; - uint64_t stagingThreshold; - AutoDelete cleaner; - ConnectionFactory factory; -}; - -}} - - - -#endif /*!_Broker_*/ diff --git a/qpid/cpp-0-9/lib/broker/BrokerAdapter.cpp b/qpid/cpp-0-9/lib/broker/BrokerAdapter.cpp deleted file mode 100644 index 981801c40e..0000000000 --- a/qpid/cpp-0-9/lib/broker/BrokerAdapter.cpp +++ /dev/null @@ -1,388 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include <boost/format.hpp> - -#include "BrokerAdapter.h" -#include "BrokerChannel.h" -#include "Connection.h" -#include "AMQMethodBody.h" -#include "Exception.h" - -namespace qpid { -namespace broker { - -using boost::format; -using namespace qpid; -using namespace qpid::framing; - -typedef std::vector<Queue::shared_ptr> QueueVector; - - -BrokerAdapter::BrokerAdapter(Channel& ch, Connection& c, Broker& b) : - CoreRefs(ch, c, b), - connection(c), - basicHandler(*this), - channelHandler(*this), - connectionHandler(*this), - exchangeHandler(*this), - messageHandler(*this), - queueHandler(*this), - txHandler(*this) -{} - - -ProtocolVersion BrokerAdapter::getVersion() const { - return connection.getVersion(); -} - -void BrokerAdapter::ConnectionHandlerImpl::startOk( - const MethodContext&, const FieldTable& /*clientProperties*/, - const string& /*mechanism*/, - const string& /*response*/, const string& /*locale*/) -{ - client.tune( - 100, connection.getFrameMax(), connection.getHeartbeat()); -} - -void BrokerAdapter::ConnectionHandlerImpl::secureOk( - const MethodContext&, const string& /*response*/){} - -void BrokerAdapter::ConnectionHandlerImpl::tuneOk( - const MethodContext&, uint16_t /*channelmax*/, - uint32_t framemax, uint16_t heartbeat) -{ - connection.setFrameMax(framemax); - connection.setHeartbeat(heartbeat); -} - -void BrokerAdapter::ConnectionHandlerImpl::open( - const MethodContext& context, const string& /*virtualHost*/, - const string& /*capabilities*/, bool /*insist*/) -{ - string knownhosts; - client.openOk( - knownhosts, context.getRequestId()); -} - -void BrokerAdapter::ConnectionHandlerImpl::close( - const MethodContext& context, uint16_t /*replyCode*/, const string& /*replyText*/, - uint16_t /*classId*/, uint16_t /*methodId*/) -{ - client.closeOk(context.getRequestId()); - connection.getOutput().close(); -} - -void BrokerAdapter::ConnectionHandlerImpl::closeOk(const MethodContext&){ - connection.getOutput().close(); -} - -void BrokerAdapter::ChannelHandlerImpl::open( - const MethodContext& context, const string& /*outOfBand*/){ - channel.open(); - // FIXME aconway 2007-01-04: provide valid ID as per ampq 0-9 - client.openOk( - std::string()/* ID */, context.getRequestId()); -} - -void BrokerAdapter::ChannelHandlerImpl::flow(const MethodContext&, bool /*active*/){} -void BrokerAdapter::ChannelHandlerImpl::flowOk(const MethodContext&, bool /*active*/){} - -void BrokerAdapter::ChannelHandlerImpl::close( - const MethodContext& context, uint16_t /*replyCode*/, - const string& /*replyText*/, - uint16_t /*classId*/, uint16_t /*methodId*/) -{ - client.closeOk(context.getRequestId()); - // FIXME aconway 2007-01-18: Following line will "delete this". Ugly. - connection.closeChannel(channel.getId()); -} - -void BrokerAdapter::ChannelHandlerImpl::closeOk(const MethodContext&){} - - - -void BrokerAdapter::ExchangeHandlerImpl::declare(const MethodContext& context, uint16_t /*ticket*/, const string& exchange, const string& type, - bool passive, bool /*durable*/, bool /*autoDelete*/, bool /*internal*/, bool nowait, - const FieldTable& /*arguments*/){ - - if(passive){ - if(!broker.getExchanges().get(exchange)) { - throw ChannelException(404, "Exchange not found: " + exchange); - } - }else{ - try{ - std::pair<Exchange::shared_ptr, bool> response = broker.getExchanges().declare(exchange, type); - if(!response.second && response.first->getType() != type){ - throw ConnectionException( - 530, - "Exchange already declared to be of type " - + response.first->getType() + ", requested " + type); - } - }catch(UnknownExchangeTypeException& e){ - throw ConnectionException( - 503, "Exchange type not implemented: " + type); - } - } - if(!nowait){ - client.declareOk(context.getRequestId()); - } -} - -void BrokerAdapter::ExchangeHandlerImpl::delete_(const MethodContext& context, uint16_t /*ticket*/, - const string& exchange, bool /*ifUnused*/, bool nowait){ - - //TODO: implement unused - broker.getExchanges().destroy(exchange); - if(!nowait) client.deleteOk(context.getRequestId()); -} - -void BrokerAdapter::QueueHandlerImpl::declare(const MethodContext& context, uint16_t /*ticket*/, const string& name, - bool passive, bool durable, bool exclusive, - bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments){ - Queue::shared_ptr queue; - if (passive && !name.empty()) { - queue = connection.getQueue(name, channel.getId()); - } else { - std::pair<Queue::shared_ptr, bool> queue_created = - broker.getQueues().declare( - name, durable, - autoDelete ? connection.getTimeout() : 0, - exclusive ? &connection : 0); - queue = queue_created.first; - assert(queue); - if (queue_created.second) { // This is a new queue - channel.setDefaultQueue(queue); - - //apply settings & create persistent record if required - queue_created.first->create(arguments); - - //add default binding: - broker.getExchanges().getDefault()->bind(queue, name, 0); - if (exclusive) { - connection.exclusiveQueues.push_back(queue); - } else if(autoDelete){ - broker.getCleaner().add(queue); - } - } - } - if (exclusive && !queue->isExclusiveOwner(&connection)) - throw ChannelException( - 405, - format("Cannot grant exclusive access to queue '%s'") - % queue->getName()); - if (!nowait) { - string queueName = queue->getName(); - client.declareOk( - queueName, queue->getMessageCount(), queue->getConsumerCount(), - context.getRequestId()); - } -} - -void BrokerAdapter::QueueHandlerImpl::bind(const MethodContext& context, uint16_t /*ticket*/, const string& queueName, - const string& exchangeName, const string& routingKey, bool nowait, - const FieldTable& arguments){ - - Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId()); - Exchange::shared_ptr exchange = broker.getExchanges().get(exchangeName); - if(exchange){ - string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey; - exchange->bind(queue, exchangeRoutingKey, &arguments); - if(!nowait) client.bindOk(context.getRequestId()); - }else{ - throw ChannelException( - 404, "Bind failed. No such exchange: " + exchangeName); - } -} - -void -BrokerAdapter::QueueHandlerImpl::unbind( - const MethodContext& context, - uint16_t /*ticket*/, - const string& queueName, - const string& exchangeName, - const string& routingKey, - const qpid::framing::FieldTable& arguments ) -{ - Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId()); - if (!queue.get()) throw ChannelException(404, "Unbind failed. No such exchange: " + exchangeName); - - Exchange::shared_ptr exchange = broker.getExchanges().get(exchangeName); - if (!exchange.get()) throw ChannelException(404, "Unbind failed. No such exchange: " + exchangeName); - - exchange->unbind(queue, routingKey, &arguments); - - client.unbindOk(context.getRequestId()); -} - -void BrokerAdapter::QueueHandlerImpl::purge(const MethodContext& context, uint16_t /*ticket*/, const string& queueName, bool nowait){ - - Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId()); - int count = queue->purge(); - if(!nowait) client.purgeOk( count, context.getRequestId()); -} - -void BrokerAdapter::QueueHandlerImpl::delete_(const MethodContext& context, uint16_t /*ticket*/, const string& queue, - bool ifUnused, bool ifEmpty, bool nowait){ - ChannelException error(0, ""); - int count(0); - Queue::shared_ptr q = connection.getQueue(queue, channel.getId()); - if(ifEmpty && q->getMessageCount() > 0){ - throw ChannelException(406, "Queue not empty."); - }else if(ifUnused && q->getConsumerCount() > 0){ - throw ChannelException(406, "Queue in use."); - }else{ - //remove the queue from the list of exclusive queues if necessary - if(q->isExclusiveOwner(&connection)){ - QueueVector::iterator i = find(connection.exclusiveQueues.begin(), connection.exclusiveQueues.end(), q); - if(i < connection.exclusiveQueues.end()) connection.exclusiveQueues.erase(i); - } - count = q->getMessageCount(); - q->destroy(); - broker.getQueues().destroy(queue); - } - - if(!nowait) - client.deleteOk(count, context.getRequestId()); -} - - - - -void BrokerAdapter::BasicHandlerImpl::qos(const MethodContext& context, uint32_t prefetchSize, uint16_t prefetchCount, bool /*global*/){ - //TODO: handle global - channel.setPrefetchSize(prefetchSize); - channel.setPrefetchCount(prefetchCount); - client.qosOk(context.getRequestId()); -} - -void BrokerAdapter::BasicHandlerImpl::consume( - const MethodContext& context, uint16_t /*ticket*/, - const string& queueName, const string& consumerTag, - bool noLocal, bool noAck, bool exclusive, - bool nowait, const FieldTable& fields) -{ - - Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId()); - if(!consumerTag.empty() && channel.exists(consumerTag)){ - throw ConnectionException(530, "Consumer tags must be unique"); - } - - string newTag = consumerTag; - channel.consume( - newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &fields); - - if(!nowait) client.consumeOk(newTag, context.getRequestId()); - - //allow messages to be dispatched if required as there is now a consumer: - queue->dispatch(); -} - -void BrokerAdapter::BasicHandlerImpl::cancel(const MethodContext& context, const string& consumerTag, bool nowait){ - channel.cancel(consumerTag); - - if(!nowait) client.cancelOk(consumerTag, context.getRequestId()); -} - -void BrokerAdapter::BasicHandlerImpl::publish( - const MethodContext& context, uint16_t /*ticket*/, - const string& exchangeName, const string& routingKey, - bool mandatory, bool immediate) -{ - - Exchange::shared_ptr exchange = exchangeName.empty() ? broker.getExchanges().getDefault() : broker.getExchanges().get(exchangeName); - if(exchange){ - BasicMessage* msg = new BasicMessage( - &connection, exchangeName, routingKey, mandatory, immediate, - context.methodBody); - channel.handlePublish(msg); - }else{ - throw ChannelException( - 404, "Exchange not found '" + exchangeName + "'"); - } -} - -void BrokerAdapter::BasicHandlerImpl::get(const MethodContext& context, uint16_t /*ticket*/, const string& queueName, bool noAck){ - Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId()); - if(!connection.getChannel(channel.getId()).get(queue, "", !noAck)){ - string clusterId;//not used, part of an imatix hack - - client.getEmpty(clusterId, context.getRequestId()); - } -} - -void BrokerAdapter::BasicHandlerImpl::ack(const MethodContext&, uint64_t deliveryTag, bool multiple){ - channel.ack(deliveryTag, multiple); -} - -void BrokerAdapter::BasicHandlerImpl::reject(const MethodContext&, uint64_t /*deliveryTag*/, bool /*requeue*/){} - -void BrokerAdapter::BasicHandlerImpl::recover(const MethodContext&, bool requeue){ - channel.recover(requeue); -} - -void BrokerAdapter::TxHandlerImpl::select(const MethodContext& context){ - channel.begin(); - client.selectOk(context.getRequestId()); -} - -void BrokerAdapter::TxHandlerImpl::commit(const MethodContext& context){ - channel.commit(); - client.commitOk(context.getRequestId()); -} - -void BrokerAdapter::TxHandlerImpl::rollback(const MethodContext& context){ - - channel.rollback(); - client.rollbackOk(context.getRequestId()); - channel.recover(false); -} - -void -BrokerAdapter::ChannelHandlerImpl::ok( const MethodContext& ) -{ - //no specific action required, generic response handling should be sufficient -} - - -// -// Message class method handlers -// -void -BrokerAdapter::ChannelHandlerImpl::ping( const MethodContext& context) -{ - client.ok(context.getRequestId()); - client.pong(); -} - - -void -BrokerAdapter::ChannelHandlerImpl::pong( const MethodContext& context) -{ - client.ok(context.getRequestId()); -} - -void -BrokerAdapter::ChannelHandlerImpl::resume( - const MethodContext&, - const string& /*channel*/ ) -{ - assert(0); // FIXME aconway 2007-01-04: 0-9 feature -} - -}} // namespace qpid::broker - diff --git a/qpid/cpp-0-9/lib/broker/BrokerAdapter.h b/qpid/cpp-0-9/lib/broker/BrokerAdapter.h deleted file mode 100644 index 2fafbcc180..0000000000 --- a/qpid/cpp-0-9/lib/broker/BrokerAdapter.h +++ /dev/null @@ -1,222 +0,0 @@ -#ifndef _broker_BrokerAdapter_h -#define _broker_BrokerAdapter_h - -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "AMQP_ServerOperations.h" -#include "HandlerImpl.h" -#include "MessageHandlerImpl.h" -#include "Exception.h" - -namespace qpid { -namespace broker { - -class Channel; -class Connection; -class Broker; -class ChannelHandler; -class ConnectionHandler; -class BasicHandler; -class ExchangeHandler; -class QueueHandler; -class TxHandler; -class MessageHandler; -class AccessHandler; -class FileHandler; -class StreamHandler; -class DtxHandler; -class TunnelHandler; -class MessageHandlerImpl; - -/** - * Per-channel protocol adapter. - * - * A container for a collection of AMQP-class adapters that translate - * AMQP method bodies into calls on the core Channel, Connection and - * Broker objects. Each adapter class also provides a client proxy - * to send methods to the peer. - * - */ -class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations -{ - public: - BrokerAdapter(Channel& ch, Connection& c, Broker& b); - - framing::ProtocolVersion getVersion() const; - ChannelHandler* getChannelHandler() { return &channelHandler; } - ConnectionHandler* getConnectionHandler() { return &connectionHandler; } - BasicHandler* getBasicHandler() { return &basicHandler; } - ExchangeHandler* getExchangeHandler() { return &exchangeHandler; } - QueueHandler* getQueueHandler() { return &queueHandler; } - TxHandler* getTxHandler() { return &txHandler; } - MessageHandler* getMessageHandler() { return &messageHandler; } - AccessHandler* getAccessHandler() { - throw ConnectionException(540, "Access class not implemented"); } - FileHandler* getFileHandler() { - throw ConnectionException(540, "File class not implemented"); } - StreamHandler* getStreamHandler() { - throw ConnectionException(540, "Stream class not implemented"); } - DtxHandler* getDtxHandler() { - throw ConnectionException(540, "Dtx class not implemented"); } - TunnelHandler* getTunnelHandler() { - throw ConnectionException(540, "Tunnel class not implemented"); } - - framing::AMQP_ClientProxy& getProxy() { return proxy; } - - private: - - class ConnectionHandlerImpl : - public ConnectionHandler, - public HandlerImpl<framing::AMQP_ClientProxy::Connection> - { - public: - ConnectionHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {} - - void startOk(const framing::MethodContext& context, - const qpid::framing::FieldTable& clientProperties, - const std::string& mechanism, const std::string& response, - const std::string& locale); - void secureOk(const framing::MethodContext& context, - const std::string& response); - void tuneOk(const framing::MethodContext& context, - uint16_t channelMax, - uint32_t frameMax, uint16_t heartbeat); - void open(const framing::MethodContext& context, - const std::string& virtualHost, - const std::string& capabilities, bool insist); - void close(const framing::MethodContext& context, uint16_t replyCode, - const std::string& replyText, - uint16_t classId, uint16_t methodId); - void closeOk(const framing::MethodContext& context); - }; - - class ChannelHandlerImpl : - public ChannelHandler, - public HandlerImpl<framing::AMQP_ClientProxy::Channel> - { - public: - ChannelHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {} - - void open(const framing::MethodContext& context, const std::string& outOfBand); - void flow(const framing::MethodContext& context, bool active); - void flowOk(const framing::MethodContext& context, bool active); - void ok( const framing::MethodContext& context ); - void ping( const framing::MethodContext& context ); - void pong( const framing::MethodContext& context ); - void resume( const framing::MethodContext& context, const std::string& channelId ); - void close(const framing::MethodContext& context, uint16_t replyCode, const - std::string& replyText, uint16_t classId, uint16_t methodId); - void closeOk(const framing::MethodContext& context); - }; - - class ExchangeHandlerImpl : - public ExchangeHandler, - public HandlerImpl<framing::AMQP_ClientProxy::Exchange> - { - public: - ExchangeHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {} - - void declare(const framing::MethodContext& context, uint16_t ticket, - const std::string& exchange, const std::string& type, - bool passive, bool durable, bool autoDelete, - bool internal, bool nowait, - const qpid::framing::FieldTable& arguments); - void delete_(const framing::MethodContext& context, uint16_t ticket, - const std::string& exchange, bool ifUnused, bool nowait); - }; - - class QueueHandlerImpl : - public QueueHandler, - public HandlerImpl<framing::AMQP_ClientProxy::Queue> - { - public: - QueueHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {} - - void declare(const framing::MethodContext& context, uint16_t ticket, const std::string& queue, - bool passive, bool durable, bool exclusive, - bool autoDelete, bool nowait, - const qpid::framing::FieldTable& arguments); - void bind(const framing::MethodContext& context, uint16_t ticket, const std::string& queue, - const std::string& exchange, const std::string& routingKey, - bool nowait, const qpid::framing::FieldTable& arguments); - void unbind(const framing::MethodContext& context, - uint16_t ticket, - const std::string& queue, - const std::string& exchange, - const std::string& routingKey, - const qpid::framing::FieldTable& arguments ); - void purge(const framing::MethodContext& context, uint16_t ticket, const std::string& queue, - bool nowait); - void delete_(const framing::MethodContext& context, uint16_t ticket, const std::string& queue, - bool ifUnused, bool ifEmpty, - bool nowait); - }; - - class BasicHandlerImpl : - public BasicHandler, - public HandlerImpl<framing::AMQP_ClientProxy::Basic> - { - public: - BasicHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {} - - void qos(const framing::MethodContext& context, uint32_t prefetchSize, - uint16_t prefetchCount, bool global); - void consume( - const framing::MethodContext& context, uint16_t ticket, const std::string& queue, - const std::string& consumerTag, bool noLocal, bool noAck, - bool exclusive, bool nowait, - const qpid::framing::FieldTable& fields); - void cancel(const framing::MethodContext& context, const std::string& consumerTag, - bool nowait); - void publish(const framing::MethodContext& context, uint16_t ticket, - const std::string& exchange, const std::string& routingKey, - bool mandatory, bool immediate); - void get(const framing::MethodContext& context, uint16_t ticket, const std::string& queue, - bool noAck); - void ack(const framing::MethodContext& context, uint64_t deliveryTag, bool multiple); - void reject(const framing::MethodContext& context, uint64_t deliveryTag, bool requeue); - void recover(const framing::MethodContext& context, bool requeue); - }; - - class TxHandlerImpl : - public TxHandler, - public HandlerImpl<framing::AMQP_ClientProxy::Tx> - { - public: - TxHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {} - - void select(const framing::MethodContext& context); - void commit(const framing::MethodContext& context); - void rollback(const framing::MethodContext& context); - }; - - Connection& connection; - BasicHandlerImpl basicHandler; - ChannelHandlerImpl channelHandler; - ConnectionHandlerImpl connectionHandler; - ExchangeHandlerImpl exchangeHandler; - MessageHandlerImpl messageHandler; - QueueHandlerImpl queueHandler; - TxHandlerImpl txHandler; - -}; -}} // namespace qpid::broker - - - -#endif /*!_broker_BrokerAdapter_h*/ diff --git a/qpid/cpp-0-9/lib/broker/BrokerChannel.cpp b/qpid/cpp-0-9/lib/broker/BrokerChannel.cpp deleted file mode 100644 index 5673a2c42a..0000000000 --- a/qpid/cpp-0-9/lib/broker/BrokerChannel.cpp +++ /dev/null @@ -1,346 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <assert.h> - -#include <iostream> -#include <sstream> -#include <algorithm> -#include <functional> - -#include <boost/bind.hpp> - -#include "BrokerChannel.h" -#include "DeletingTxOp.h" -#include "framing/ChannelAdapter.h" -#include <QpidError.h> -#include <DeliverableMessage.h> -#include <BrokerQueue.h> -#include <BrokerMessage.h> -#include <MessageStore.h> -#include <TxAck.h> -#include <TxPublish.h> -#include "BrokerAdapter.h" -#include "Connection.h" - -using std::mem_fun_ref; -using std::bind2nd; -using namespace qpid::broker; -using namespace qpid::framing; -using namespace qpid::sys; - - -Channel::Channel( - Connection& con, ChannelId id, - uint32_t _framesize, MessageStore* const _store, - uint64_t _stagingThreshold -) : - ChannelAdapter(id, &con.getOutput(), con.getVersion()), - connection(con), - currentDeliveryTag(1), - transactional(false), - prefetchSize(0), - prefetchCount(0), - framesize(_framesize), - tagGenerator("sgen"), - accumulatedAck(0), - store(_store), - messageBuilder(this, _store, _stagingThreshold), - opened(id == 0),//channel 0 is automatically open, other must be explicitly opened - adapter(new BrokerAdapter(*this, con, con.broker)) -{ - outstanding.reset(); -} - -Channel::~Channel(){ - close(); -} - -bool Channel::exists(const string& consumerTag){ - return consumers.find(consumerTag) != consumers.end(); -} - -// TODO aconway 2007-02-12: Why is connection token passed in instead -// of using the channel's parent connection? -void Channel::consume(string& tagInOut, Queue::shared_ptr queue, bool acks, - bool exclusive, ConnectionToken* const connection, - const FieldTable*) -{ - if(tagInOut.empty()) - tagInOut = tagGenerator.generate(); - std::auto_ptr<ConsumerImpl> c( - new ConsumerImpl(this, tagInOut, queue, connection, acks)); - queue->consume(c.get(), exclusive);//may throw exception - consumers.insert(tagInOut, c.release()); -} - -void Channel::cancel(const string& tag){ - // consumers is a ptr_map so erase will delete the consumer - // which will call cancel. - ConsumerImplMap::iterator i = consumers.find(tag); - if (i != consumers.end()) - consumers.erase(i); -} - -void Channel::close(){ - opened = false; - consumers.clear(); - recover(true); -} - -void Channel::begin(){ - transactional = true; -} - -void Channel::commit(){ - TxAck txAck(accumulatedAck, unacked); - txBuffer.enlist(&txAck); - if(txBuffer.prepare(store)){ - txBuffer.commit(); - } - accumulatedAck.clear(); -} - -void Channel::rollback(){ - txBuffer.rollback(); - accumulatedAck.clear(); -} - -void Channel::deliver( - Message::shared_ptr& msg, const string& consumerTag, - Queue::shared_ptr& queue, bool ackExpected) -{ - Mutex::ScopedLock locker(deliveryLock); - - // Key the delivered messages to the id of the request in which they're sent - uint64_t deliveryTag = getNextSendRequestId(); - - if(ackExpected){ - unacked.push_back(DeliveryRecord(msg, queue, consumerTag, deliveryTag)); - outstanding.size += msg->contentSize(); - outstanding.count++; - } - //send deliver method, header and content(s) - msg->deliver(*this, consumerTag, deliveryTag, framesize); -} - -bool Channel::checkPrefetch(Message::shared_ptr& msg){ - Mutex::ScopedLock locker(deliveryLock); - bool countOk = !prefetchCount || prefetchCount > unacked.size(); - bool sizeOk = !prefetchSize || prefetchSize > msg->contentSize() + outstanding.size || unacked.empty(); - return countOk && sizeOk; -} - -Channel::ConsumerImpl::ConsumerImpl(Channel* _parent, const string& _tag, - Queue::shared_ptr _queue, - ConnectionToken* const _connection, bool ack -) : parent(_parent), tag(_tag), queue(_queue), connection(_connection), - ackExpected(ack), blocked(false) {} - -bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg){ - if(!connection || connection != msg->getPublisher()){//check for no_local - if(ackExpected && !parent->checkPrefetch(msg)){ - blocked = true; - }else{ - blocked = false; - parent->deliver(msg, tag, queue, ackExpected); - return true; - } - } - return false; -} - -Channel::ConsumerImpl::~ConsumerImpl() { - cancel(); -} - -void Channel::ConsumerImpl::cancel(){ - if(queue) - queue->cancel(this); -} - -void Channel::ConsumerImpl::requestDispatch(){ - if(blocked) - queue->dispatch(); -} - -void Channel::handleInlineTransfer(Message::shared_ptr msg) -{ - Exchange::shared_ptr exchange = - connection.broker.getExchanges().get(msg->getExchange()); - if(transactional){ - TxPublish* deliverable = new TxPublish(msg); - exchange->route( - *deliverable, msg->getRoutingKey(), - &(msg->getApplicationHeaders())); - txBuffer.enlist(new DeletingTxOp(deliverable)); - }else{ - DeliverableMessage deliverable(msg); - exchange->route( - deliverable, msg->getRoutingKey(), - &(msg->getApplicationHeaders())); - } -} - -void Channel::handlePublish(Message* _message){ - Message::shared_ptr message(_message); - messageBuilder.initialise(message); -} - -void Channel::handleHeader(AMQHeaderBody::shared_ptr header){ - messageBuilder.setHeader(header); - //at this point, decide based on the size of the message whether we want - //to stage it by saving content directly to disk as it arrives -} - -void Channel::handleContent(AMQContentBody::shared_ptr content){ - messageBuilder.addContent(content); -} - -void Channel::handleHeartbeat(boost::shared_ptr<AMQHeartbeatBody>) { - // TODO aconway 2007-01-17: Implement heartbeating. -} - -void Channel::complete(Message::shared_ptr msg) { - Exchange::shared_ptr exchange = - connection.broker.getExchanges().get(msg->getExchange()); - assert(exchange.get()); - if(transactional) { - std::auto_ptr<TxPublish> deliverable(new TxPublish(msg)); - exchange->route(*deliverable, msg->getRoutingKey(), - &(msg->getApplicationHeaders())); - txBuffer.enlist(new DeletingTxOp(deliverable.release())); - } else { - DeliverableMessage deliverable(msg); - exchange->route(deliverable, msg->getRoutingKey(), - &(msg->getApplicationHeaders())); - } -} - -void Channel::ack(){ - ack(getFirstAckRequest(), getLastAckRequest()); -} - -// Used by Basic -void Channel::ack(uint64_t deliveryTag, bool multiple){ - if (multiple) - ack(0, deliveryTag); - else - ack(deliveryTag, deliveryTag); -} - -void Channel::ack(uint64_t firstTag, uint64_t lastTag){ - if(transactional){ - accumulatedAck.update(firstTag, lastTag); - - //TODO: I think the outstanding prefetch size & count should be updated at this point... - //TODO: ...this may then necessitate dispatching to consumers - }else{ - Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery - - ack_iterator i = find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matches), lastTag)); - ack_iterator j = (firstTag == 0) ? - unacked.begin() : - find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matches), firstTag)); - - if(i == unacked.end()){ - throw ConnectionException(530, "Received ack for unrecognised delivery tag"); - }else if(i!=j){ - ack_iterator end = ++i; - for_each(j, end, mem_fun_ref(&DeliveryRecord::discard)); - unacked.erase(unacked.begin(), end); - - //recalculate the prefetch: - outstanding.reset(); - for_each(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::addTo), &outstanding)); - }else{ - i->discard(); - i->subtractFrom(&outstanding); - unacked.erase(i); - } - - //if the prefetch limit had previously been reached, there may - //be messages that can be now be delivered - std::for_each(consumers.begin(), consumers.end(), - boost::bind(&ConsumerImpl::requestDispatch, _1)); - } -} - -void Channel::recover(bool requeue){ - Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery - - if(requeue){ - outstanding.reset(); - std::list<DeliveryRecord> copy = unacked; - unacked.clear(); - for_each(copy.begin(), copy.end(), mem_fun_ref(&DeliveryRecord::requeue)); - }else{ - for_each(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::redeliver), this)); - } -} - -bool Channel::get(Queue::shared_ptr queue, const string& destination, bool ackExpected){ - Message::shared_ptr msg = queue->dequeue(); - if(msg){ - Mutex::ScopedLock locker(deliveryLock); - uint64_t myDeliveryTag = getNextSendRequestId(); - msg->sendGetOk(MethodContext(this, msg->getRespondTo()), - destination, - queue->getMessageCount() + 1, myDeliveryTag, - framesize); - if(ackExpected){ - unacked.push_back(DeliveryRecord(msg, queue, myDeliveryTag)); - } - return true; - }else{ - return false; - } -} - -void Channel::deliver(Message::shared_ptr& msg, const string& consumerTag, - uint64_t deliveryTag) -{ - msg->deliver(*this, consumerTag, deliveryTag, framesize); -} - -void Channel::handleMethodInContext( - boost::shared_ptr<qpid::framing::AMQMethodBody> method, - const MethodContext& context -) -{ - try{ - if(getId() != 0 && !method->isA<ChannelOpenBody>() && !isOpen()) { - std::stringstream out; - out << "Attempt to use unopened channel: " << getId(); - throw ConnectionException(504, out.str()); - } else { - method->invoke(*adapter, context); - } - }catch(ChannelException& e){ - adapter->getProxy().getChannel().close( - e.code, e.toString(), - method->amqpClassId(), method->amqpMethodId()); - connection.closeChannel(getId()); - }catch(ConnectionException& e){ - connection.close(e.code, e.toString(), method->amqpClassId(), method->amqpMethodId()); - }catch(std::exception& e){ - connection.close(541/*internal error*/, e.what(), method->amqpClassId(), method->amqpMethodId()); - } -} diff --git a/qpid/cpp-0-9/lib/broker/BrokerChannel.h b/qpid/cpp-0-9/lib/broker/BrokerChannel.h deleted file mode 100644 index 5085783685..0000000000 --- a/qpid/cpp-0-9/lib/broker/BrokerChannel.h +++ /dev/null @@ -1,159 +0,0 @@ -#ifndef _broker_BrokerChannel_h -#define _broker_BrokerChannel_h - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <list> - -#include <boost/scoped_ptr.hpp> -#include <boost/shared_ptr.hpp> -#include <boost/ptr_container/ptr_map.hpp> - -#include <AccumulatedAck.h> -#include <Consumer.h> -#include <DeliveryRecord.h> -#include <MessageBuilder.h> -#include <NameGenerator.h> -#include <Prefetch.h> -#include <TxBuffer.h> -#include "framing/ChannelAdapter.h" -#include "ChannelOpenBody.h" -#include "CompletionHandler.h" - -namespace qpid { -namespace broker { - -class ConnectionToken; -class Connection; -class Queue; -class BrokerAdapter; - -using framing::string; - -/** - * Maintains state for an AMQP channel. Handles incoming and - * outgoing messages for that channel. - */ -class Channel : public framing::ChannelAdapter, - public CompletionHandler -{ - class ConsumerImpl : public Consumer - { - Channel* parent; - const string tag; - Queue::shared_ptr queue; - ConnectionToken* const connection; - const bool ackExpected; - bool blocked; - - public: - ConsumerImpl(Channel* parent, const string& tag, - Queue::shared_ptr queue, - ConnectionToken* const connection, bool ack); - ~ConsumerImpl(); - virtual bool deliver(Message::shared_ptr& msg); - void cancel(); - void requestDispatch(); - }; - - typedef boost::ptr_map<string,ConsumerImpl> ConsumerImplMap; - - Connection& connection; - uint64_t currentDeliveryTag; - Queue::shared_ptr defaultQueue; - bool transactional; - ConsumerImplMap consumers; - uint32_t prefetchSize; - uint16_t prefetchCount; - Prefetch outstanding; - uint32_t framesize; - NameGenerator tagGenerator; - std::list<DeliveryRecord> unacked; - sys::Mutex deliveryLock; - TxBuffer txBuffer; - AccumulatedAck accumulatedAck; - MessageStore* const store; - MessageBuilder messageBuilder;//builder for in-progress message - bool opened; - boost::scoped_ptr<BrokerAdapter> adapter; - - // completion handler for MessageBuilder - void complete(Message::shared_ptr msg); - - void deliver(Message::shared_ptr& msg, const string& tag, - Queue::shared_ptr& queue, bool ackExpected); - bool checkPrefetch(Message::shared_ptr& msg); - - public: - Channel(Connection& parent, - framing::ChannelId id, - uint32_t framesize, - MessageStore* const _store = 0, - uint64_t stagingThreshold = 0); - - ~Channel(); - - bool isOpen() const { return opened; } - BrokerAdapter& getAdatper() { return *adapter; } - - void open() { opened = true; } - void setDefaultQueue(Queue::shared_ptr queue){ defaultQueue = queue; } - Queue::shared_ptr getDefaultQueue() const { return defaultQueue; } - uint32_t setPrefetchSize(uint32_t size){ return prefetchSize = size; } - uint16_t setPrefetchCount(uint16_t n){ return prefetchCount = n; } - - bool exists(const string& consumerTag); - - /** - *@param tagInOut - if empty it is updated with the generated token. - */ - void consume(string& tagInOut, Queue::shared_ptr queue, bool acks, - bool exclusive, ConnectionToken* const connection = 0, - const framing::FieldTable* = 0); - void cancel(const string& tag); - bool get(Queue::shared_ptr queue, const std::string& destination, bool ackExpected); - void begin(); - void close(); - void commit(); - void rollback(); - void ack(); - void ack(uint64_t deliveryTag, bool multiple); - void ack(uint64_t deliveryTag, uint64_t endTag); - void recover(bool requeue); - void deliver(Message::shared_ptr& msg, const string& consumerTag, uint64_t deliveryTag); - void handlePublish(Message* msg); - void handleHeader(boost::shared_ptr<framing::AMQHeaderBody>); - void handleContent(boost::shared_ptr<framing::AMQContentBody>); - void handleHeartbeat(boost::shared_ptr<framing::AMQHeartbeatBody>); - - void handleInlineTransfer(Message::shared_ptr msg); - - // For ChannelAdapter - void handleMethodInContext( - boost::shared_ptr<framing::AMQMethodBody> method, - const framing::MethodContext& context); -}; - -}} // namespace broker - - -#endif /*!_broker_BrokerChannel_h*/ diff --git a/qpid/cpp-0-9/lib/broker/BrokerExchange.h b/qpid/cpp-0-9/lib/broker/BrokerExchange.h deleted file mode 100644 index 6f4e9e6671..0000000000 --- a/qpid/cpp-0-9/lib/broker/BrokerExchange.h +++ /dev/null @@ -1,51 +0,0 @@ -#ifndef _broker_BrokerExchange_h -#define _broker_BrokerExchange_h - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <boost/shared_ptr.hpp> -#include <Deliverable.h> -#include <BrokerQueue.h> -#include <FieldTable.h> - -namespace qpid { - namespace broker { - using std::string; - - class Exchange{ - const string name; - public: - typedef boost::shared_ptr<Exchange> shared_ptr; - - explicit Exchange(const string& _name) : name(_name){} - virtual ~Exchange(){} - string getName() { return name; } - virtual string getType() = 0; - virtual void bind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args) = 0; - virtual void unbind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args) = 0; - virtual void route(Deliverable& msg, const string& routingKey, const qpid::framing::FieldTable* args) = 0; - }; - } -} - - -#endif /*!_broker_BrokerExchange_h*/ diff --git a/qpid/cpp-0-9/lib/broker/BrokerMessage.cpp b/qpid/cpp-0-9/lib/broker/BrokerMessage.cpp deleted file mode 100644 index 91ba3dfec0..0000000000 --- a/qpid/cpp-0-9/lib/broker/BrokerMessage.cpp +++ /dev/null @@ -1,245 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <boost/cast.hpp> - -#include <BrokerMessage.h> -#include <iostream> - -#include <InMemoryContent.h> -#include <LazyLoadedContent.h> -#include <MessageStore.h> -#include <BasicDeliverBody.h> -#include <BasicGetOkBody.h> -#include <AMQContentBody.h> -#include <AMQHeaderBody.h> -#include "AMQMethodBody.h" -#include "AMQFrame.h" -#include "framing/ChannelAdapter.h" - -using namespace boost; -using namespace qpid::broker; -using namespace qpid::framing; -using namespace qpid::sys; - -BasicMessage::BasicMessage( - const ConnectionToken* const _publisher, - const string& _exchange, const string& _routingKey, - bool _mandatory, bool _immediate, framing::AMQMethodBody::shared_ptr respondTo -) : - Message(_publisher, _exchange, _routingKey, _mandatory, - _immediate, respondTo), - size(0) -{} - -// FIXME aconway 2007-02-01: remove. -// BasicMessage::BasicMessage(Buffer& buffer, bool headersOnly, uint32_t contentChunkSize) : -// publisher(0), size(0) -// { - -// decode(buffer, headersOnly, contentChunkSize); -// } - -// For tests only. -BasicMessage::BasicMessage() : size(0) -{} - -BasicMessage::~BasicMessage(){} - -void BasicMessage::setHeader(AMQHeaderBody::shared_ptr _header){ - this->header = _header; -} - -void BasicMessage::addContent(AMQContentBody::shared_ptr data){ - if (!content.get()) { - content = std::auto_ptr<Content>(new InMemoryContent()); - } - content->add(data); - size += data->size(); -} - -bool BasicMessage::isComplete(){ - return header.get() && (header->getContentSize() == contentSize()); -} - -void BasicMessage::deliver(ChannelAdapter& channel, - const string& consumerTag, uint64_t deliveryTag, - uint32_t framesize) -{ - // CCT -- TODO - Update code generator to take pointer/ not - // instance to avoid extra contruction - channel.send( - new BasicDeliverBody( - channel.getVersion(), consumerTag, deliveryTag, - getRedelivered(), getExchange(), getRoutingKey())); - sendContent(channel, framesize); -} - -void BasicMessage::sendGetOk(const MethodContext& context, - const std::string& /*destination*/, - uint32_t messageCount, - uint64_t deliveryTag, - uint32_t framesize) -{ - // CCT -- TODO - Update code generator to take pointer/ not - // instance to avoid extra contruction - context.channel->send( - new BasicGetOkBody( - context.channel->getVersion(), - context.methodBody->getRequestId(), - deliveryTag, getRedelivered(), getExchange(), - getRoutingKey(), messageCount)); - sendContent(*context.channel, framesize); -} - -void BasicMessage::sendContent( - ChannelAdapter& channel, uint32_t framesize) -{ - channel.send(header); - Mutex::ScopedLock locker(contentLock); - if (content.get()) - content->send(channel, framesize); -} - -BasicHeaderProperties* BasicMessage::getHeaderProperties(){ - return boost::polymorphic_downcast<BasicHeaderProperties*>( - header->getProperties()); -} - -const FieldTable& BasicMessage::getApplicationHeaders(){ - return getHeaderProperties()->getHeaders(); -} - -bool BasicMessage::isPersistent() -{ - if(!header) return false; - BasicHeaderProperties* props = getHeaderProperties(); - return props && props->getDeliveryMode() == PERSISTENT; -} - -void BasicMessage::decode(Buffer& buffer, bool headersOnly, uint32_t contentChunkSize) -{ - decodeHeader(buffer); - if (!headersOnly) decodeContent(buffer, contentChunkSize); -} - -void BasicMessage::decodeHeader(Buffer& buffer) -{ - string exchange; - string routingKey; - - buffer.getShortString(exchange); - buffer.getShortString(routingKey); - setRouting(exchange, routingKey); - - uint32_t headerSize = buffer.getLong(); - AMQHeaderBody::shared_ptr headerBody(new AMQHeaderBody()); - headerBody->decode(buffer, headerSize); - setHeader(headerBody); -} - -void BasicMessage::decodeContent(Buffer& buffer, uint32_t chunkSize) -{ - uint64_t expected = expectedContentSize(); - if (expected != buffer.available()) { - std::cout << "WARN: Expected " << expectedContentSize() << " bytes, got " << buffer.available() << std::endl; - throw Exception("Cannot decode content, buffer not large enough."); - } - - if (!chunkSize || chunkSize > expected) { - chunkSize = expected; - } - - uint64_t total = 0; - while (total < expectedContentSize()) { - uint64_t remaining = expected - total; - AMQContentBody::shared_ptr contentBody(new AMQContentBody()); - contentBody->decode(buffer, remaining < chunkSize ? remaining : chunkSize); - addContent(contentBody); - total += chunkSize; - } -} - -void BasicMessage::encode(Buffer& buffer) -{ - encodeHeader(buffer); - encodeContent(buffer); -} - -void BasicMessage::encodeHeader(Buffer& buffer) -{ - buffer.putShortString(getExchange()); - buffer.putShortString(getRoutingKey()); - buffer.putLong(header->size()); - header->encode(buffer); -} - -void BasicMessage::encodeContent(Buffer& buffer) -{ - Mutex::ScopedLock locker(contentLock); - if (content.get()) content->encode(buffer); -} - -uint32_t BasicMessage::encodedSize() -{ - return encodedHeaderSize() + encodedContentSize(); -} - -uint32_t BasicMessage::encodedContentSize() -{ - Mutex::ScopedLock locker(contentLock); - return content.get() ? content->size() : 0; -} - -uint32_t BasicMessage::encodedHeaderSize() -{ - return getExchange().size() + 1 - + getRoutingKey().size() + 1 - + header->size() + 4;//4 extra bytes for size -} - -uint64_t BasicMessage::expectedContentSize() -{ - return header.get() ? header->getContentSize() : 0; -} - -void BasicMessage::releaseContent(MessageStore* store) -{ - Mutex::ScopedLock locker(contentLock); - if (!isPersistent() && getPersistenceId() == 0) { - store->stage(this); - } - if (!content.get() || content->size() > 0) { - // FIXME aconway 2007-02-07: handle MessageMessage. - //set content to lazy loading mode (but only if there is stored content): - - //Note: the LazyLoadedContent instance contains a raw pointer to the message, however it is - // then set as a member of that message so its lifetime is guaranteed to be no longer than - // that of the message itself - content = std::auto_ptr<Content>( - new LazyLoadedContent(store, this, expectedContentSize())); - } -} - -void BasicMessage::setContent(std::auto_ptr<Content>& _content) -{ - Mutex::ScopedLock locker(contentLock); - content = _content; -} diff --git a/qpid/cpp-0-9/lib/broker/BrokerMessage.h b/qpid/cpp-0-9/lib/broker/BrokerMessage.h deleted file mode 100644 index fcb104edbb..0000000000 --- a/qpid/cpp-0-9/lib/broker/BrokerMessage.h +++ /dev/null @@ -1,137 +0,0 @@ -#ifndef _broker_BrokerMessage_h -#define _broker_BrokerMessage_h - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <memory> -#include <boost/shared_ptr.hpp> - -#include <BrokerMessageBase.h> -#include <BasicHeaderProperties.h> -#include <ConnectionToken.h> -#include <Content.h> -#include <Mutex.h> -#include <TxBuffer.h> - -namespace qpid { - -namespace framing { -class MethodContext; -class ChannelAdapter; -class AMQHeaderBody; -} - -namespace broker { - -class MessageStore; -using framing::string; - -/** - * Represents an AMQP message, i.e. a header body, a list of - * content bodies and some details about the publication - * request. - */ -class BasicMessage : public Message { - boost::shared_ptr<framing::AMQHeaderBody> header; - std::auto_ptr<Content> content; - sys::Mutex contentLock; - uint64_t size; - - void sendContent(framing::ChannelAdapter&, uint32_t framesize); - - public: - typedef boost::shared_ptr<BasicMessage> shared_ptr; - - BasicMessage(const ConnectionToken* const publisher, - const string& exchange, const string& routingKey, - bool mandatory, bool immediate, - boost::shared_ptr<framing::AMQMethodBody> respondTo); - BasicMessage(); - ~BasicMessage(); - void setHeader(boost::shared_ptr<framing::AMQHeaderBody> header); - void addContent(framing::AMQContentBody::shared_ptr data); - bool isComplete(); - - void deliver(framing::ChannelAdapter&, - const string& consumerTag, - uint64_t deliveryTag, - uint32_t framesize); - - void sendGetOk(const framing::MethodContext&, - const std::string& destination, - uint32_t messageCount, - uint64_t deliveryTag, - uint32_t framesize); - - framing::BasicHeaderProperties* getHeaderProperties(); - const framing::FieldTable& getApplicationHeaders(); - bool isPersistent(); - uint64_t contentSize() const { return size; } - - void decode(framing::Buffer& buffer, bool headersOnly = false, - uint32_t contentChunkSize = 0); - void decodeHeader(framing::Buffer& buffer); - void decodeContent(framing::Buffer& buffer, uint32_t contentChunkSize = 0); - - void encode(framing::Buffer& buffer); - void encodeHeader(framing::Buffer& buffer); - void encodeContent(framing::Buffer& buffer); - /** - * @returns the size of the buffer needed to encode this - * message in its entirety - */ - uint32_t encodedSize(); - /** - * @returns the size of the buffer needed to encode the - * 'header' of this message (not just the header frame, - * but other meta data e.g.routing key and exchange) - */ - uint32_t encodedHeaderSize(); - /** - * @returns the size of the buffer needed to encode the - * (possibly partial) content held by this message - */ - uint32_t encodedContentSize(); - /** - * Releases the in-memory content data held by this - * message. Must pass in a store from which the data can - * be reloaded. - */ - void releaseContent(MessageStore* store); - /** - * If headers have been received, returns the expected - * content size else returns 0. - */ - uint64_t expectedContentSize(); - /** - * Sets the 'content' implementation of this message (the - * message controls the lifecycle of the content instance - * it uses). - */ - void setContent(std::auto_ptr<Content>& content); -}; - -} -} - - -#endif /*!_broker_BrokerMessage_h*/ diff --git a/qpid/cpp-0-9/lib/broker/BrokerMessageBase.h b/qpid/cpp-0-9/lib/broker/BrokerMessageBase.h deleted file mode 100644 index 709369ae2f..0000000000 --- a/qpid/cpp-0-9/lib/broker/BrokerMessageBase.h +++ /dev/null @@ -1,184 +0,0 @@ -#ifndef _broker_BrokerMessageBase_h -#define _broker_BrokerMessageBase_h - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <string> -#include <boost/shared_ptr.hpp> -#include "Content.h" -#include "framing/amqp_types.h" - -namespace qpid { - -namespace framing { -class MethodContext; -class ChannelAdapter; -class BasicHeaderProperties; -class FieldTable; -class AMQMethodBody; -class AMQContentBody; -class AMQHeaderBody; -} - - -namespace broker { -class ConnectionToken; -class MessageStore; - -/** - * Base class for all types of internal broker messages - * abstracting away the operations - * TODO; AMS: for the moment this is mostly a placeholder - */ -class Message { - public: - typedef boost::shared_ptr<Message> shared_ptr; - typedef boost::shared_ptr<framing::AMQMethodBody> AMQMethodBodyPtr; - - - Message(const ConnectionToken* publisher_, - const std::string& _exchange, - const std::string& _routingKey, - bool _mandatory, bool _immediate, - AMQMethodBodyPtr respondTo_) : - publisher(publisher_), - exchange(_exchange), - routingKey(_routingKey), - mandatory(_mandatory), - immediate(_immediate), - persistenceId(0), - redelivered(false), - respondTo(respondTo_) - {} - - Message() : - mandatory(false), - immediate(false), - persistenceId(0), - redelivered(false) - {} - - virtual ~Message() {}; - - // Accessors - const std::string& getRoutingKey() const { return routingKey; } - const std::string& getExchange() const { return exchange; } - uint64_t getPersistenceId() const { return persistenceId; } - bool getRedelivered() const { return redelivered; } - AMQMethodBodyPtr getRespondTo() const { return respondTo; } - - void setRouting(const std::string& _exchange, const std::string& _routingKey) - { exchange = _exchange; routingKey = _routingKey; } - void setPersistenceId(uint64_t _persistenceId) { persistenceId = _persistenceId; } // XXXX: Only used in tests? - void redeliver() { redelivered = true; } - - /** - * Used to deliver the message from the queue - */ - virtual void deliver(framing::ChannelAdapter& channel, - const std::string& consumerTag, - uint64_t deliveryTag, - uint32_t framesize) = 0; - /** - * Used to return a message in response to a get from a queue - */ - virtual void sendGetOk(const framing::MethodContext& context, - const std::string& destination, - uint32_t messageCount, - uint64_t deliveryTag, - uint32_t framesize) = 0; - - virtual bool isComplete() = 0; - - virtual uint64_t contentSize() const = 0; - // FIXME aconway 2007-02-06: Get rid of BasicHeaderProperties - // at this level. Expose only generic properties available from both - // message types (e.g. getApplicationHeaders below). - // - virtual framing::BasicHeaderProperties* getHeaderProperties() = 0; - virtual const framing::FieldTable& getApplicationHeaders() = 0; - virtual bool isPersistent() = 0; - virtual const ConnectionToken* getPublisher() const { - return publisher; - } - - virtual void encode(framing::Buffer& /*buffer*/) {}; // XXXX: Only used in tests? - virtual void encodeHeader(framing::Buffer& /*buffer*/) {}; // XXXX: Only used in tests? - - /** - * @returns the size of the buffer needed to encode this - * message in its entirety - * - * XXXX: Only used in tests? - */ - virtual uint32_t encodedSize() = 0; - /** - * @returns the size of the buffer needed to encode the - * 'header' of this message (not just the header frame, - * but other meta data e.g.routing key and exchange) - * - * XXXX: Only used in tests? - */ - virtual uint32_t encodedHeaderSize() = 0; - /** - * @returns the size of the buffer needed to encode the - * (possibly partial) content held by this message - */ - virtual uint32_t encodedContentSize() = 0; - /** - * If headers have been received, returns the expected - * content size else returns 0. - */ - virtual uint64_t expectedContentSize() = 0; - - // TODO: AMS 29/1/2007 Don't think these are really part of base class - - /** - * Sets the 'content' implementation of this message (the - * message controls the lifecycle of the content instance - * it uses). - */ - virtual void setContent(std::auto_ptr<Content>& /*content*/) {}; - virtual void setHeader(boost::shared_ptr<framing::AMQHeaderBody>) {}; - virtual void addContent(boost::shared_ptr<framing::AMQContentBody>) {}; - /** - * Releases the in-memory content data held by this - * message. Must pass in a store from which the data can - * be reloaded. - */ - virtual void releaseContent(MessageStore* /*store*/) {}; - - private: - const ConnectionToken* publisher; - std::string exchange; - std::string routingKey; - const bool mandatory; - const bool immediate; - uint64_t persistenceId; - bool redelivered; - AMQMethodBodyPtr respondTo; -}; - -}} - - -#endif /*!_broker_BrokerMessage_h*/ diff --git a/qpid/cpp-0-9/lib/broker/BrokerMessageMessage.cpp b/qpid/cpp-0-9/lib/broker/BrokerMessageMessage.cpp deleted file mode 100644 index 3449078d70..0000000000 --- a/qpid/cpp-0-9/lib/broker/BrokerMessageMessage.cpp +++ /dev/null @@ -1,239 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "QpidError.h" -#include "BrokerMessageMessage.h" -#include "ChannelAdapter.h" -#include "MessageTransferBody.h" -#include "MessageOpenBody.h" -#include "MessageCloseBody.h" -#include "MessageAppendBody.h" -#include "Reference.h" -#include "framing/FieldTable.h" -#include "framing/BasicHeaderProperties.h" - -#include <algorithm> - -using namespace std; -using namespace qpid::framing; - -namespace qpid { -namespace broker { - -MessageMessage::MessageMessage( - ConnectionToken* publisher, RequestId requestId_, TransferPtr transfer_ -) : Message(publisher, transfer_->getDestination(), - transfer_->getRoutingKey(), - transfer_->getMandatory(), - transfer_->getImmediate(), - transfer_), - requestId(requestId_), - transfer(transfer_) -{} - -MessageMessage::MessageMessage( - ConnectionToken* publisher, RequestId requestId_, TransferPtr transfer_, - ReferencePtr reference_ -) : Message(publisher, transfer_->getDestination(), - transfer_->getRoutingKey(), - transfer_->getMandatory(), - transfer_->getImmediate(), - transfer_), - requestId(requestId_), - transfer(transfer_), - reference(reference_) -{} - -// TODO: astitcher 1-Mar-2007: This code desperately needs better factoring -void MessageMessage::transferMessage( - framing::ChannelAdapter& channel, - const std::string& consumerTag, - uint32_t framesize) -{ - const framing::Content& body = transfer->getBody(); - - // Send any reference data - if (!body.isInline()){ - // Open - channel.send(new MessageOpenBody(channel.getVersion(), reference->getId())); - // Appends - for(Reference::Appends::const_iterator a = reference->getAppends().begin(); - a != reference->getAppends().end(); - ++a) { - uint32_t sizeleft = (*a)->size(); - const string& content = (*a)->getBytes(); - // Calculate overhead bytes - // Assume that the overhead is constant as the reference name doesn't change - uint32_t overhead = sizeleft - content.size(); - string::size_type contentStart = 0; - while (sizeleft) { - string::size_type contentSize = sizeleft <= framesize ? sizeleft : framesize-overhead; - channel.send(new MessageAppendBody(channel.getVersion(), reference->getId(), - string(content, contentStart, contentSize))); - sizeleft -= contentSize; - contentStart += contentSize; - } - } - } - - // The transfer - if ( transfer->size()<=framesize ) { - channel.send( - new MessageTransferBody(channel.getVersion(), - transfer->getTicket(), - consumerTag, - getRedelivered(), - transfer->getImmediate(), - transfer->getTtl(), - transfer->getPriority(), - transfer->getTimestamp(), - transfer->getDeliveryMode(), - transfer->getExpiration(), - getExchange(), - getRoutingKey(), - transfer->getMessageId(), - transfer->getCorrelationId(), - transfer->getReplyTo(), - transfer->getContentType(), - transfer->getContentEncoding(), - transfer->getUserId(), - transfer->getAppId(), - transfer->getTransactionId(), - transfer->getSecurityToken(), - transfer->getApplicationHeaders(), - body, - transfer->getMandatory())); - } else { - // Thing to do here is to construct a simple reference message then deliver that instead - // fragmentation will be taken care of in the delivery if necessary; - string content = body.getValue(); - string refname = "dummy"; - TransferPtr newTransfer( - new MessageTransferBody(channel.getVersion(), - transfer->getTicket(), - consumerTag, - getRedelivered(), - transfer->getImmediate(), - transfer->getTtl(), - transfer->getPriority(), - transfer->getTimestamp(), - transfer->getDeliveryMode(), - transfer->getExpiration(), - getExchange(), - getRoutingKey(), - transfer->getMessageId(), - transfer->getCorrelationId(), - transfer->getReplyTo(), - transfer->getContentType(), - transfer->getContentEncoding(), - transfer->getUserId(), - transfer->getAppId(), - transfer->getTransactionId(), - transfer->getSecurityToken(), - transfer->getApplicationHeaders(), - framing::Content(REFERENCE, refname), - transfer->getMandatory())); - ReferencePtr newRef(new Reference(refname)); - Reference::AppendPtr newAppend(new MessageAppendBody(channel.getVersion(), refname, content)); - newRef->append(newAppend); - MessageMessage newMsg(const_cast<ConnectionToken*>(getPublisher()), 0, newTransfer, newRef); - newMsg.transferMessage(channel, consumerTag, framesize); - return; - } - // Close any reference data - if (!body.isInline()){ - // Close - channel.send(new MessageCloseBody(channel.getVersion(), reference->getId())); - } -} - -void MessageMessage::deliver( - framing::ChannelAdapter& channel, - const std::string& consumerTag, - uint64_t /*deliveryTag*/, - uint32_t framesize) -{ - transferMessage(channel, consumerTag, framesize); -} - -void MessageMessage::sendGetOk( - const framing::MethodContext& context, - const std::string& destination, - uint32_t /*messageCount*/, - uint64_t /*deliveryTag*/, - uint32_t framesize) -{ - framing::ChannelAdapter* channel = context.channel; - transferMessage(*channel, destination, framesize); -} - -bool MessageMessage::isComplete() -{ - return true; -} - -uint64_t MessageMessage::contentSize() const -{ - if (transfer->getBody().isInline()) - return transfer->getBody().getValue().size(); - else - return reference->getSize(); -} - -qpid::framing::BasicHeaderProperties* MessageMessage::getHeaderProperties() -{ - return 0; // FIXME aconway 2007-02-05: -} - -const FieldTable& MessageMessage::getApplicationHeaders() -{ - return transfer->getApplicationHeaders(); -} -bool MessageMessage::isPersistent() -{ - return transfer->getDeliveryMode() == PERSISTENT; -} - -uint32_t MessageMessage::encodedSize() -{ - THROW_QPID_ERROR(INTERNAL_ERROR, "Unfinished"); - return 0; // FIXME aconway 2007-02-05: -} - -uint32_t MessageMessage::encodedHeaderSize() -{ - THROW_QPID_ERROR(INTERNAL_ERROR, "Unfinished"); - return 0; // FIXME aconway 2007-02-05: -} - -uint32_t MessageMessage::encodedContentSize() -{ - THROW_QPID_ERROR(INTERNAL_ERROR, "Unfinished"); - return 0; // FIXME aconway 2007-02-05: -} - -uint64_t MessageMessage::expectedContentSize() -{ - THROW_QPID_ERROR(INTERNAL_ERROR, "Unfinished"); - return 0; // FIXME aconway 2007-02-05: -} - - -}} // namespace qpid::broker diff --git a/qpid/cpp-0-9/lib/broker/BrokerMessageMessage.h b/qpid/cpp-0-9/lib/broker/BrokerMessageMessage.h deleted file mode 100644 index 8a2ff3a063..0000000000 --- a/qpid/cpp-0-9/lib/broker/BrokerMessageMessage.h +++ /dev/null @@ -1,91 +0,0 @@ -#ifndef _broker_BrokerMessageMessage_h -#define _broker_BrokerMessageMessage_h - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "BrokerMessageBase.h" -#include "MessageTransferBody.h" -#include "amqp_types.h" - -#include <vector> - -namespace qpid { - -namespace framing { -class MessageTransferBody; -} - -namespace broker { -class ConnectionToken; -class Reference; - -class MessageMessage: public Message{ - public: - typedef boost::shared_ptr<MessageMessage> shared_ptr; - typedef boost::shared_ptr<framing::MessageTransferBody> TransferPtr; - typedef boost::shared_ptr<Reference> ReferencePtr; - - MessageMessage(ConnectionToken* publisher, framing::RequestId, TransferPtr transfer); - MessageMessage(ConnectionToken* publisher, framing::RequestId, TransferPtr transfer, ReferencePtr reference); - - // Default destructor okay - - framing::RequestId getRequestId() {return requestId; } - TransferPtr getTransfer() { return transfer; } - ReferencePtr getReference() { return reference; } - - void deliver(framing::ChannelAdapter& channel, - const std::string& consumerTag, - uint64_t deliveryTag, - uint32_t framesize); - - void sendGetOk(const framing::MethodContext& context, - const std::string& destination, - uint32_t messageCount, - uint64_t deliveryTag, - uint32_t framesize); - - bool isComplete(); - - uint64_t contentSize() const; - framing::BasicHeaderProperties* getHeaderProperties(); - const framing::FieldTable& getApplicationHeaders(); - bool isPersistent(); - - uint32_t encodedSize(); - uint32_t encodedHeaderSize(); - uint32_t encodedContentSize(); - uint64_t expectedContentSize(); - - private: - void transferMessage(framing::ChannelAdapter& channel, - const std::string& consumerTag, - uint32_t framesize); - - framing::RequestId requestId; - const TransferPtr transfer; - const ReferencePtr reference; -}; - -}} - - -#endif /*!_broker_BrokerMessage_h*/ diff --git a/qpid/cpp-0-9/lib/broker/BrokerQueue.cpp b/qpid/cpp-0-9/lib/broker/BrokerQueue.cpp deleted file mode 100644 index 31309bd6c5..0000000000 --- a/qpid/cpp-0-9/lib/broker/BrokerQueue.cpp +++ /dev/null @@ -1,258 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <boost/format.hpp> - -#include <BrokerQueue.h> -#include <MessageStore.h> -#include <sys/Monitor.h> -#include <sys/Time.h> -#include <iostream> - -using namespace qpid::broker; -using namespace qpid::sys; -using namespace qpid::framing; -using boost::format; - -Queue::Queue(const string& _name, uint32_t _autodelete, - MessageStore* const _store, - const ConnectionToken* const _owner) : - - name(_name), - autodelete(_autodelete), - store(_store), - owner(_owner), - queueing(false), - dispatching(false), - next(0), - lastUsed(0), - exclusive(0), - persistenceId(0) -{ - if(autodelete) lastUsed = now()/TIME_MSEC; -} - -Queue::~Queue(){ - for(Binding* b = bindings.front(); !bindings.empty(); b = bindings.front()){ - b->cancel(); - bindings.pop(); - } -} - -void Queue::bound(Binding* b){ - bindings.push(b); -} - -void Queue::deliver(Message::shared_ptr& msg){ - enqueue(0, msg, 0); - process(msg); -} - -void Queue::recover(Message::shared_ptr& msg){ - push(msg); - if (store && msg->expectedContentSize() != msg->encodedContentSize()) { - //content has not been loaded, need to ensure that lazy loading mode is set: - //TODO: find a nicer way to do this - msg->releaseContent(store); - } -} - -void Queue::process(Message::shared_ptr& msg){ - Mutex::ScopedLock locker(lock); - if(queueing || !dispatch(msg)){ - push(msg); - } -} - -bool Queue::dispatch(Message::shared_ptr& msg){ - if(consumers.empty()){ - return false; - }else if(exclusive){ - if(!exclusive->deliver(msg)){ - std::cout << "WARNING: Dropping undeliverable message from queue with exclusive consumer." << std::endl; - } - return true; - }else{ - //deliver to next consumer - next = next % consumers.size(); - Consumer* c = consumers[next]; - int start = next; - while(c){ - next++; - if(c->deliver(msg)) return true; - - next = next % consumers.size(); - c = next == start ? 0 : consumers[next]; - } - return false; - } -} - -bool Queue::startDispatching(){ - Mutex::ScopedLock locker(lock); - if(queueing && !dispatching){ - dispatching = true; - return true; - }else{ - return false; - } -} - -void Queue::dispatch(){ - bool proceed = startDispatching(); - while(proceed){ - Mutex::ScopedLock locker(lock); - if(!messages.empty() && dispatch(messages.front())){ - pop(); - }else{ - dispatching = false; - proceed = false; - queueing = !messages.empty(); - } - } -} - -void Queue::consume(Consumer* c, bool requestExclusive){ - Mutex::ScopedLock locker(lock); - if(exclusive) - throw ChannelException( - 403, format("Queue '%s' has an exclusive consumer." - " No more consumers allowed.") % getName()); - if(requestExclusive) { - if(!consumers.empty()) - throw ChannelException( - 403, format("Queue '%s' already has conumers." - "Exclusive access denied.") %getName()); - exclusive = c; - } - if(autodelete && consumers.empty()) lastUsed = 0; - consumers.push_back(c); -} - -void Queue::cancel(Consumer* c){ - Mutex::ScopedLock locker(lock); - Consumers::iterator i = std::find(consumers.begin(), consumers.end(), c); - if (i != consumers.end()) - consumers.erase(i); - if(autodelete && consumers.empty()) lastUsed = now()*TIME_MSEC; - if(exclusive == c) exclusive = 0; -} - -Message::shared_ptr Queue::dequeue(){ - Mutex::ScopedLock locker(lock); - Message::shared_ptr msg; - if(!messages.empty()){ - msg = messages.front(); - pop(); - } - return msg; -} - -uint32_t Queue::purge(){ - Mutex::ScopedLock locker(lock); - int count = messages.size(); - while(!messages.empty()) pop(); - return count; -} - -void Queue::pop(){ - if (policy.get()) policy->dequeued(messages.front()->contentSize()); - messages.pop(); -} - -void Queue::push(Message::shared_ptr& msg){ - queueing = true; - messages.push(msg); - if (policy.get()) { - policy->enqueued(msg->contentSize()); - if (policy->limitExceeded()) { - msg->releaseContent(store); - } - } -} - -uint32_t Queue::getMessageCount() const{ - Mutex::ScopedLock locker(lock); - return messages.size(); -} - -uint32_t Queue::getConsumerCount() const{ - Mutex::ScopedLock locker(lock); - return consumers.size(); -} - -bool Queue::canAutoDelete() const{ - Mutex::ScopedLock locker(lock); - return lastUsed && (now()*TIME_MSEC - lastUsed > autodelete); -} - -void Queue::enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const string * const xid) -{ - if (msg->isPersistent() && store) { - store->enqueue(ctxt, msg.get(), *this, xid); - } -} - -void Queue::dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, const string * const xid) -{ - if (msg->isPersistent() && store) { - store->dequeue(ctxt, msg.get(), *this, xid); - } -} - -namespace -{ - const std::string qpidMaxSize("qpid.max_size"); - const std::string qpidMaxCount("qpid.max_count"); -} - -void Queue::create(const FieldTable& settings) -{ - if (store) { - store->create(*this, settings); - } - configure(settings); -} - -void Queue::configure(const FieldTable& settings) -{ - QueuePolicy* _policy = new QueuePolicy(settings); - if (_policy->getMaxCount() || _policy->getMaxSize()) { - setPolicy(std::auto_ptr<QueuePolicy>(_policy)); - } -} - -void Queue::destroy() -{ - if (store) { - store->destroy(*this); - } -} - -void Queue::setPolicy(std::auto_ptr<QueuePolicy> _policy) -{ - policy = _policy; -} - -const QueuePolicy* const Queue::getPolicy() -{ - return policy.get(); -} diff --git a/qpid/cpp-0-9/lib/broker/BrokerQueue.h b/qpid/cpp-0-9/lib/broker/BrokerQueue.h deleted file mode 100644 index 12f5815027..0000000000 --- a/qpid/cpp-0-9/lib/broker/BrokerQueue.h +++ /dev/null @@ -1,151 +0,0 @@ -#ifndef _broker_BrokerQueue_h -#define _broker_BrokerQueue_h - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <vector> -#include <memory> -#include <queue> -#include <boost/shared_ptr.hpp> -#include <amqp_types.h> -#include <Binding.h> -#include <ConnectionToken.h> -#include <Consumer.h> -#include <BrokerMessage.h> -#include <FieldTable.h> -#include <sys/Monitor.h> -#include <QueuePolicy.h> - -// TODO aconway 2007-02-06: Use auto_ptr and boost::ptr_vector to -// enforce ownership of Consumers. - -namespace qpid { - namespace broker { - class MessageStore; - - /** - * Thrown when exclusive access would be violated. - */ - using std::string; - - /** - * The brokers representation of an amqp queue. Messages are - * delivered to a queue from where they can be dispatched to - * registered consumers or be stored until dequeued or until one - * or more consumers registers. - */ - class Queue{ - typedef std::vector<Consumer*> Consumers; - typedef std::queue<Binding*> Bindings; - typedef std::queue<Message::shared_ptr> Messages; - - const string name; - const uint32_t autodelete; - MessageStore* const store; - const ConnectionToken* const owner; - Consumers consumers; - Bindings bindings; - Messages messages; - bool queueing; - bool dispatching; - int next; - mutable qpid::sys::Mutex lock; - int64_t lastUsed; - Consumer* exclusive; - mutable uint64_t persistenceId; - std::auto_ptr<QueuePolicy> policy; - - void pop(); - void push(Message::shared_ptr& msg); - bool startDispatching(); - bool dispatch(Message::shared_ptr& msg); - void setPolicy(std::auto_ptr<QueuePolicy> policy); - - public: - - typedef boost::shared_ptr<Queue> shared_ptr; - - typedef std::vector<shared_ptr> vector; - - Queue(const string& name, uint32_t autodelete = 0, - MessageStore* const store = 0, - const ConnectionToken* const owner = 0); - ~Queue(); - - void create(const qpid::framing::FieldTable& settings); - void configure(const qpid::framing::FieldTable& settings); - void destroy(); - /** - * Informs the queue of a binding that should be cancelled on - * destruction of the queue. - */ - void bound(Binding* b); - /** - * Delivers a message to the queue. Will record it as - * enqueued if persistent then process it. - */ - void deliver(Message::shared_ptr& msg); - /** - * Dispatches the messages immediately to a consumer if - * one is available or stores it for later if not. - */ - void process(Message::shared_ptr& msg); - /** - * Used during recovery to add stored messages back to the queue - */ - void recover(Message::shared_ptr& msg); - /** - * Dispatch any queued messages providing there are - * consumers for them. Only one thread can be dispatching - * at any time, but this method (rather than the caller) - * is responsible for ensuring that. - */ - void dispatch(); - void consume(Consumer* c, bool exclusive = false); - void cancel(Consumer* c); - uint32_t purge(); - uint32_t getMessageCount() const; - uint32_t getConsumerCount() const; - inline const string& getName() const { return name; } - inline const bool isExclusiveOwner(const ConnectionToken* const o) const { return o == owner; } - inline bool hasExclusiveConsumer() const { return exclusive; } - inline uint64_t getPersistenceId() const { return persistenceId; } - inline void setPersistenceId(uint64_t _persistenceId) const { persistenceId = _persistenceId; } - - bool canAutoDelete() const; - - void enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const string * const xid); - /** - * dequeue from store (only done once messages is acknowledged) - */ - void dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, const string * const xid); - /** - * dequeues from memory only - */ - Message::shared_ptr dequeue(); - - const QueuePolicy* const getPolicy(); - }; - } -} - - -#endif /*!_broker_BrokerQueue_h*/ diff --git a/qpid/cpp-0-9/lib/broker/BrokerSingleton.cpp b/qpid/cpp-0-9/lib/broker/BrokerSingleton.cpp deleted file mode 100644 index 4571764850..0000000000 --- a/qpid/cpp-0-9/lib/broker/BrokerSingleton.cpp +++ /dev/null @@ -1,36 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "BrokerSingleton.h" - -namespace qpid { -namespace broker { - -BrokerSingleton::BrokerSingleton() { - if (broker.get() == 0) - broker = Broker::create(); - Broker::shared_ptr::operator=(broker); -} - -BrokerSingleton::~BrokerSingleton() { - broker->shutdown(); -} - -Broker::shared_ptr BrokerSingleton::broker; - -}} // namespace qpid::broker diff --git a/qpid/cpp-0-9/lib/broker/BrokerSingleton.h b/qpid/cpp-0-9/lib/broker/BrokerSingleton.h deleted file mode 100644 index 139e02a5fd..0000000000 --- a/qpid/cpp-0-9/lib/broker/BrokerSingleton.h +++ /dev/null @@ -1,52 +0,0 @@ -#ifndef _broker_BrokerSingleton_h -#define _broker_BrokerSingleton_h - -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "Broker.h" - -namespace qpid { -namespace broker { - -/** - * BrokerSingleton is a smart pointer to a process-wide singleton broker - * started on an os-chosen port. The broker starts the first time - * an instance of BrokerSingleton is created and runs untill the process exits. - * - * Useful for unit tests that want to share a broker between multiple - * tests to reduce overhead of starting/stopping a broker for every test. - * - * Tests that need a new broker can call Broker::create directly. - * - * THREAD UNSAFE. - */ -class BrokerSingleton : public Broker::shared_ptr -{ - public: - BrokerSingleton(); - ~BrokerSingleton(); - private: - static Broker::shared_ptr broker; -}; - -}} // namespace qpid::broker - - - -#endif /*!_broker_BrokerSingleton_h*/ diff --git a/qpid/cpp-0-9/lib/broker/CompletionHandler.h b/qpid/cpp-0-9/lib/broker/CompletionHandler.h deleted file mode 100644 index 9d51656282..0000000000 --- a/qpid/cpp-0-9/lib/broker/CompletionHandler.h +++ /dev/null @@ -1,39 +0,0 @@ -#ifndef _broker_CompletionHandler_h -#define _broker_CompletionHandler_h - -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -namespace qpid { -namespace broker { - -/** - * Callback interface to handle completion of a message. - */ -class CompletionHandler -{ - public: - virtual ~CompletionHandler(){} - virtual void complete(Message::shared_ptr) = 0; -}; - -}} // namespace qpid::broker - - - -#endif /*!_broker_CompletionHandler_h*/ diff --git a/qpid/cpp-0-9/lib/broker/Configuration.cpp b/qpid/cpp-0-9/lib/broker/Configuration.cpp deleted file mode 100644 index e83c359f2d..0000000000 --- a/qpid/cpp-0-9/lib/broker/Configuration.cpp +++ /dev/null @@ -1,252 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <Configuration.h> -#include <string.h> -#include <config.h> - -using namespace qpid::broker; -using namespace std; - -Configuration::Configuration() : - daemon('d', "daemon", "Run as system daemon, detached from terminal.", false), - trace('t', "trace", "Print incoming & outgoing frames to the console", false), - port('p', "port", "Set the port to listen on (default=5672)", 5672), - workerThreads("worker-threads", "Set the number of worker threads to use (default=5).", 5), - maxConnections("max-connections", "Set the maximum number of connections the broker can accept (default=500).", 500), - connectionBacklog("connection-backlog", "Set the connection backlog for the servers socket (default=10)", 10), - store('s', "store", "Set the message store module to use (default='' which implies no store)", ""), - stagingThreshold("staging-threshold", "Set the message size threshold above which messages will be written to disk as they arrive (default=5,000,000)", 5000000), - help("help", "Print usage information", false), - version("version", "Print version information", false) -{ - options.push_back(&daemon); - options.push_back(&trace); - options.push_back(&port); - options.push_back(&workerThreads); - options.push_back(&maxConnections); - options.push_back(&connectionBacklog); - options.push_back(&store); - options.push_back(&stagingThreshold); - options.push_back(&help); - options.push_back(&version); -} - -Configuration::~Configuration(){} - -void Configuration::parse(char const *progName, int argc, char** argv){ - programName = progName; - int position = 1; - while(position < argc){ - bool matched(false); - for(op_iterator i = options.begin(); i < options.end() && !matched; i++){ - matched = (*i)->parse(position, argv, argc); - } - if(!matched) { - throw BadOptionException( - std::string("Unrecognised option: ")+argv[position]); - } - } -} - -void Configuration::usage(){ - std::cout << "Usage: " << programName << " [OPTION]..." << std::endl - << "Start the Qpid AMQP broker daemon." << std::endl << std::endl - << "Options:" << std::endl; - for(op_iterator i = options.begin(); i < options.end(); i++){ - (*i)->print(std::cout); - } - - std::cout << std::endl << "Report bugs to <" << PACKAGE_BUGREPORT << ">." - << std::endl; -} - -bool Configuration::isHelp() const { - return help.getValue(); -} - -bool Configuration::isVersion() const { - return version.getValue(); -} - -bool Configuration::isDaemon() const { - return daemon.getValue(); -} - -bool Configuration::isTrace() const { - return trace.getValue(); -} - -int Configuration::getPort() const { - return port.getValue(); -} - -int Configuration::getWorkerThreads() const { - return workerThreads.getValue(); -} - -int Configuration::getMaxConnections() const { - return maxConnections.getValue(); -} - -int Configuration::getConnectionBacklog() const { - return connectionBacklog.getValue(); -} - -const std::string& Configuration::getStore() const { - return store.getValue(); -} - -long Configuration::getStagingThreshold() const { - return stagingThreshold.getValue(); -} - - -Configuration::Option::Option(const char _flag, const string& _name, const string& _desc) : - flag(string("-") + _flag), name("--" +_name), desc(_desc) {} - -Configuration::Option::Option(const string& _name, const string& _desc) : - flag(""), name("--" + _name), desc(_desc) {} - -Configuration::Option::~Option(){} - -bool Configuration::Option::match(const string& arg){ - return flag == arg || name == arg; -} - -bool Configuration::Option::parse(int& i, char** argv, int argc){ - const string arg(argv[i]); - if(match(arg)){ - if(needsValue()){ - if(++i < argc) setValue(argv[i]); - else throw ParseException("Argument " + arg + " requires a value!"); - }else{ - setValue(""); - } - i++; - return true; - }else{ - return false; - } -} - -void Configuration::Option::print(ostream& out) const { - out << " "; - if(flag.length() > 0){ - out << flag << ", "; - } else { - out << " "; - } - out << name; - if(needsValue()) out << " <value>"; - out << std::endl; - out << " " << desc << std::endl; -} - - -// String Option: - -Configuration::StringOption::StringOption(const char _flag, const string& _name, const string& _desc, const string _value) : - Option(_flag,_name,_desc), defaultValue(_value), value(_value) {} - -Configuration::StringOption::StringOption(const string& _name, const string& _desc, const string _value) : - Option(_name,_desc), defaultValue(_value), value(_value) {} - -Configuration::StringOption::~StringOption(){} - -const string& Configuration::StringOption::getValue() const { - return value; -} - -bool Configuration::StringOption::needsValue() const { - return true; -} - -void Configuration::StringOption::setValue(const std::string& _value){ - value = _value; -} - -// Int Option: - -Configuration::IntOption::IntOption(const char _flag, const string& _name, const string& _desc, const int _value) : - Option(_flag,_name,_desc), defaultValue(_value), value(_value) {} - -Configuration::IntOption::IntOption(const string& _name, const string& _desc, const int _value) : - Option(_name,_desc), defaultValue(_value), value(_value) {} - -Configuration::IntOption::~IntOption(){} - -int Configuration::IntOption::getValue() const { - return value; -} - -bool Configuration::IntOption::needsValue() const { - return true; -} - -void Configuration::IntOption::setValue(const std::string& _value){ - value = atoi(_value.c_str()); -} - -// Long Option: - -Configuration::LongOption::LongOption(const char _flag, const string& _name, const string& _desc, const long _value) : - Option(_flag,_name,_desc), defaultValue(_value), value(_value) {} - -Configuration::LongOption::LongOption(const string& _name, const string& _desc, const long _value) : - Option(_name,_desc), defaultValue(_value), value(_value) {} - -Configuration::LongOption::~LongOption(){} - -long Configuration::LongOption::getValue() const { - return value; -} - -bool Configuration::LongOption::needsValue() const { - return true; -} - -void Configuration::LongOption::setValue(const std::string& _value){ - value = atol(_value.c_str()); -} - -// Bool Option: - -Configuration::BoolOption::BoolOption(const char _flag, const string& _name, const string& _desc, const bool _value) : - Option(_flag,_name,_desc), defaultValue(_value), value(_value) {} - -Configuration::BoolOption::BoolOption(const string& _name, const string& _desc, const bool _value) : - Option(_name,_desc), defaultValue(_value), value(_value) {} - -Configuration::BoolOption::~BoolOption(){} - -bool Configuration::BoolOption::getValue() const { - return value; -} - -bool Configuration::BoolOption::needsValue() const { - return false; -} - -void Configuration::BoolOption::setValue(const std::string& /*not required*/){ - //BoolOptions have no value. The fact that the option is specified - //implies the value is true. - value = true; -} diff --git a/qpid/cpp-0-9/lib/broker/Configuration.h b/qpid/cpp-0-9/lib/broker/Configuration.h deleted file mode 100644 index 27c743c8f0..0000000000 --- a/qpid/cpp-0-9/lib/broker/Configuration.h +++ /dev/null @@ -1,171 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#ifndef _Configuration_ -#define _Configuration_ - -#include <cstdlib> -#include <iostream> -#include <vector> -#include <Exception.h> - -namespace qpid { -namespace broker { -class Configuration{ - - class Option { - const std::string flag; - const std::string name; - const std::string desc; - - bool match(const std::string& arg); - - protected: - virtual bool needsValue() const = 0; - virtual void setValue(const std::string& value) = 0; - - public: - Option(const char flag, const std::string& name, const std::string& desc); - Option(const std::string& name, const std::string& desc); - virtual ~Option(); - - bool parse(int& i, char** argv, int argc); - void print(std::ostream& out) const; - }; - - class IntOption : public Option{ - const int defaultValue; - int value; - public: - IntOption(char flag, const std::string& name, const std::string& desc, const int value = 0); - IntOption(const std::string& name, const std::string& desc, const int value = 0); - virtual ~IntOption(); - - int getValue() const; - virtual bool needsValue() const; - virtual void setValue(const std::string& value); - virtual void setValue(int _value) { value = _value; } - }; - - class LongOption : public Option{ - const long defaultValue; - int value; - public: - LongOption(char flag, const std::string& name, const std::string& desc, const long value = 0); - LongOption(const std::string& name, const std::string& desc, const long value = 0); - virtual ~LongOption(); - - long getValue() const; - virtual bool needsValue() const; - virtual void setValue(const std::string& value); - virtual void setValue(int _value) { value = _value; } - }; - - class StringOption : public Option{ - const std::string defaultValue; - std::string value; - public: - StringOption(char flag, const std::string& name, const std::string& desc, const std::string value = ""); - StringOption(const std::string& name, const std::string& desc, const std::string value = ""); - virtual ~StringOption(); - - const std::string& getValue() const; - virtual bool needsValue() const; - virtual void setValue(const std::string& value); - }; - - class BoolOption : public Option{ - const bool defaultValue; - bool value; - public: - BoolOption(char flag, const std::string& name, const std::string& desc, const bool value = 0); - BoolOption(const std::string& name, const std::string& desc, const bool value = 0); - virtual ~BoolOption(); - - bool getValue() const; - virtual bool needsValue() const; - virtual void setValue(const std::string& value); - virtual void setValue(bool _value) { value = _value; } - }; - - BoolOption daemon; - BoolOption trace; - IntOption port; - IntOption workerThreads; - IntOption maxConnections; - IntOption connectionBacklog; - StringOption store; - LongOption stagingThreshold; - BoolOption help; - BoolOption version; - char const *programName; - - typedef std::vector<Option*>::iterator op_iterator; - std::vector<Option*> options; - - public: - - struct BadOptionException : public Exception { - template<class T> - BadOptionException(const T& msg) : Exception(msg) {} - }; - - - class ParseException : public Exception { - public: - template <class T> - ParseException(const T& msg) : Exception(msg) {} - }; - - - Configuration(); - ~Configuration(); - - void parse(char const*, int argc, char** argv); - - bool isHelp() const; - bool isVersion() const; - bool isDaemon() const; - bool isTrace() const; - int getPort() const; - int getWorkerThreads() const; - int getMaxConnections() const; - int getConnectionBacklog() const; - const std::string& getStore() const; - long getStagingThreshold() const; - - void setHelp(bool b) { help.setValue(b); } - void setVersion(bool b) { version.setValue(b); } - void setDaemon(bool b) { daemon.setValue(b); } - void setTrace(bool b) { trace.setValue(b); } - void setPort(int i) { port.setValue(i); } - void setWorkerThreads(int i) { workerThreads.setValue(i); } - void setMaxConnections(int i) { maxConnections.setValue(i); } - void setConnectionBacklog(int i) { connectionBacklog.setValue(i); } - void setStore(const std::string& s) { store.setValue(s); } - void setStagingThreshold(long l) { stagingThreshold.setValue(l); } - - void usage(); -}; -} -} - - -#endif diff --git a/qpid/cpp-0-9/lib/broker/Connection.cpp b/qpid/cpp-0-9/lib/broker/Connection.cpp deleted file mode 100644 index ae0114cba9..0000000000 --- a/qpid/cpp-0-9/lib/broker/Connection.cpp +++ /dev/null @@ -1,128 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <iostream> -#include <assert.h> - -#include "Connection.h" -#include "BrokerChannel.h" -#include "AMQP_ClientProxy.h" -#include "BrokerAdapter.h" - -using namespace boost; -using namespace qpid::sys; -using namespace qpid::framing; -using namespace qpid::sys; - -namespace qpid { -namespace broker { - -Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_) : - broker(broker_), - out(out_), - framemax(65536), - heartbeat(0), - client(0), - timeout(broker.getTimeout()), - stagingThreshold(broker.getStagingThreshold()) -{} - - -Queue::shared_ptr Connection::getQueue(const string& name, uint16_t channel){ - Queue::shared_ptr queue; - if (name.empty()) { - queue = getChannel(channel).getDefaultQueue(); - if (!queue) throw ConnectionException( 530, "Queue must be specified or previously declared" ); - } else { - queue = broker.getQueues().find(name); - if (queue == 0) { - throw ChannelException( 404, "Queue not found: " + name); - } - } - return queue; -} - - -Exchange::shared_ptr Connection::findExchange(const string& name){ - return broker.getExchanges().get(name); -} - - -void Connection::received(framing::AMQFrame* frame){ - getChannel(frame->getChannel()).handleBody(frame->getBody()); -} - -void Connection::close( - ReplyCode code, const string& text, ClassId classId, MethodId methodId) -{ - client->close(code, text, classId, methodId); - getOutput().close(); -} - -void Connection::initiated(framing::ProtocolInitiation* header) { - version = ProtocolVersion(header->getMajor(), header->getMinor()); - FieldTable properties; - string mechanisms("PLAIN"); - string locales("en_US"); - getChannel(0).init(0, *out, getVersion()); - client = &getChannel(0).getAdatper().getProxy().getConnection(); - client->start( - header->getMajor(), header->getMinor(), - properties, mechanisms, locales); -} - -void Connection::idleOut(){} - -void Connection::idleIn(){} - -void Connection::closed(){ - try { - while (!exclusiveQueues.empty()) { - broker.getQueues().destroy(exclusiveQueues.front()->getName()); - exclusiveQueues.erase(exclusiveQueues.begin()); - } - } catch(std::exception& e) { - std::cout << "Caught unhandled exception while closing session: " << - e.what() << std::endl; - assert(0); - } -} - -void Connection::closeChannel(uint16_t id) { - ChannelMap::iterator i = channels.find(id); - if (i != channels.end()) - i->close(); -} - - -Channel& Connection::getChannel(ChannelId id) { - ChannelMap::iterator i = channels.find(id); - if (i == channels.end()) { - i = channels.insert( - id, new Channel( - *this, id, framemax, broker.getQueues().getStore(), - broker.getStagingThreshold())).first; - } - return *i; -} - - -}} - diff --git a/qpid/cpp-0-9/lib/broker/Connection.h b/qpid/cpp-0-9/lib/broker/Connection.h deleted file mode 100644 index 1314ccbd97..0000000000 --- a/qpid/cpp-0-9/lib/broker/Connection.h +++ /dev/null @@ -1,108 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#ifndef _Connection_ -#define _Connection_ - -#include <sstream> -#include <vector> - -#include <boost/ptr_container/ptr_map.hpp> - -#include <AMQFrame.h> -#include <AMQP_ServerOperations.h> -#include <AMQP_ClientProxy.h> -#include <sys/ConnectionOutputHandler.h> -#include <sys/ConnectionInputHandler.h> -#include <sys/TimeoutHandler.h> -#include "framing/ProtocolVersion.h" -#include "Broker.h" -#include "Exception.h" -#include "BrokerChannel.h" - -namespace qpid { -namespace broker { - -class Channel; - -class Connection : public sys::ConnectionInputHandler, - public ConnectionToken -{ - public: - Connection(sys::ConnectionOutputHandler* out, Broker& broker); - - /** Get a channel. Create if it does not already exist */ - Channel& getChannel(framing::ChannelId channel); - - /** Close a channel */ - void closeChannel(framing::ChannelId channel); - - /** Close the connection */ - void close(framing::ReplyCode code, const string& text, framing::ClassId classId, framing::MethodId methodId); - - sys::ConnectionOutputHandler& getOutput() const { return *out; } - framing::ProtocolVersion getVersion() const { return version; } - - uint32_t getFrameMax() const { return framemax; } - uint16_t getHeartbeat() const { return heartbeat; } - uint32_t getTimeout() const { return timeout; } - uint64_t getStagingThreshold() const { return stagingThreshold; } - - void setFrameMax(uint32_t fm) { framemax = fm; } - void setHeartbeat(uint16_t hb) { heartbeat = hb; } - - /** - * Get named queue, never returns 0. - * @return: named queue or default queue for channel if name="" - * @exception: ChannelException if no queue of that name is found. - * @exception: ConnectionException if name="" and channel has no default. - */ - Queue::shared_ptr getQueue(const string& name, uint16_t channel); - - Broker& broker; - std::vector<Queue::shared_ptr> exclusiveQueues; - - // ConnectionInputHandler methods - void received(framing::AMQFrame* frame); - void initiated(framing::ProtocolInitiation* header); - void idleOut(); - void idleIn(); - void closed(); - - private: - typedef boost::ptr_map<framing::ChannelId, Channel> ChannelMap; - - typedef std::vector<Queue::shared_ptr>::iterator queue_iterator; - Exchange::shared_ptr findExchange(const string& name); - - framing::ProtocolVersion version; - ChannelMap channels; - sys::ConnectionOutputHandler* out; - uint32_t framemax; - uint16_t heartbeat; - framing::AMQP_ClientProxy::Connection* client; - const uint32_t timeout; //timeout for auto-deleted queues (in ms) - const uint64_t stagingThreshold; - -}; - -}} - -#endif diff --git a/qpid/cpp-0-9/lib/broker/ConnectionFactory.cpp b/qpid/cpp-0-9/lib/broker/ConnectionFactory.cpp deleted file mode 100644 index 20485dd0e1..0000000000 --- a/qpid/cpp-0-9/lib/broker/ConnectionFactory.cpp +++ /dev/null @@ -1,43 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <ConnectionFactory.h> -#include <Connection.h> - -namespace qpid { -namespace broker { - - -ConnectionFactory::ConnectionFactory(Broker& b) : broker(b) -{} - - -ConnectionFactory::~ConnectionFactory() -{ - broker.getCleaner().stop(); -} - -qpid::sys::ConnectionInputHandler* -ConnectionFactory::create(qpid::sys::ConnectionOutputHandler* out) -{ - return new Connection(out, broker); -} - -}} // namespace qpid::broker diff --git a/qpid/cpp-0-9/lib/broker/ConnectionFactory.h b/qpid/cpp-0-9/lib/broker/ConnectionFactory.h deleted file mode 100644 index 9147384b2a..0000000000 --- a/qpid/cpp-0-9/lib/broker/ConnectionFactory.h +++ /dev/null @@ -1,47 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#ifndef _ConnectionFactory_ -#define _ConnectionFactory_ - -#include "ConnectionInputHandlerFactory.h" - -namespace qpid { -namespace broker { -class Broker; - -class ConnectionFactory : public qpid::sys::ConnectionInputHandlerFactory -{ - public: - ConnectionFactory(Broker& b); - - virtual qpid::sys::ConnectionInputHandler* create( - qpid::sys::ConnectionOutputHandler* ctxt); - - virtual ~ConnectionFactory(); - - private: - Broker& broker; -}; - -}} - - -#endif diff --git a/qpid/cpp-0-9/lib/broker/ConnectionToken.h b/qpid/cpp-0-9/lib/broker/ConnectionToken.h deleted file mode 100644 index 7e7f813d0e..0000000000 --- a/qpid/cpp-0-9/lib/broker/ConnectionToken.h +++ /dev/null @@ -1,38 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#ifndef _ConnectionToken_ -#define _ConnectionToken_ - -namespace qpid { - namespace broker { - /** - * An empty interface allowing opaque implementations of some - * form of token to identify a connection. - */ - class ConnectionToken{ - public: - virtual ~ConnectionToken(){} - }; - } -} - - -#endif diff --git a/qpid/cpp-0-9/lib/broker/Consumer.h b/qpid/cpp-0-9/lib/broker/Consumer.h deleted file mode 100644 index 26deef4a26..0000000000 --- a/qpid/cpp-0-9/lib/broker/Consumer.h +++ /dev/null @@ -1,37 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#ifndef _Consumer_ -#define _Consumer_ - -#include <BrokerMessage.h> - -namespace qpid { - namespace broker { - class Consumer{ - public: - virtual bool deliver(Message::shared_ptr& msg) = 0; - virtual ~Consumer(){} - }; - } -} - - -#endif diff --git a/qpid/cpp-0-9/lib/broker/Content.h b/qpid/cpp-0-9/lib/broker/Content.h deleted file mode 100644 index b65a454778..0000000000 --- a/qpid/cpp-0-9/lib/broker/Content.h +++ /dev/null @@ -1,64 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#ifndef _Content_ -#define _Content_ - -#include <boost/function.hpp> - -#include <AMQContentBody.h> -#include <Buffer.h> -#include <OutputHandler.h> - -namespace qpid { - -namespace framing { -class ChannelAdapter; -} - -namespace broker { -class Content{ - public: - typedef std::string DataBlock; - typedef boost::function1<void, const DataBlock&> SendFn; - - virtual ~Content(){} - - /** Add a block of data to the content */ - virtual void add(framing::AMQContentBody::shared_ptr data) = 0; - - /** Total size of content in bytes */ - virtual uint32_t size() = 0; - - /** - * Iterate over the content calling SendFn for each block. - * Subdivide blocks if necessary to ensure each block is - * <= framesize bytes long. - */ - virtual void send(framing::ChannelAdapter& channel, uint32_t framesize) = 0; - - //FIXME aconway 2007-02-07: This is inconsistently implemented - //find out what is needed. - virtual void encode(qpid::framing::Buffer& buffer) = 0; -}; -}} - - -#endif diff --git a/qpid/cpp-0-9/lib/broker/DeletingTxOp.cpp b/qpid/cpp-0-9/lib/broker/DeletingTxOp.cpp deleted file mode 100644 index 25fe9c98db..0000000000 --- a/qpid/cpp-0-9/lib/broker/DeletingTxOp.cpp +++ /dev/null @@ -1,45 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <DeletingTxOp.h> - -using namespace qpid::broker; - -DeletingTxOp::DeletingTxOp(TxOp* const _delegate) : delegate(_delegate){} - -bool DeletingTxOp::prepare(TransactionContext* ctxt) throw(){ - return delegate && delegate->prepare(ctxt); -} - -void DeletingTxOp::commit() throw(){ - if(delegate){ - delegate->commit(); - delete delegate; - delegate = 0; - } -} - -void DeletingTxOp::rollback() throw(){ - if(delegate){ - delegate->rollback(); - delete delegate; - delegate = 0; - } -} diff --git a/qpid/cpp-0-9/lib/broker/DeletingTxOp.h b/qpid/cpp-0-9/lib/broker/DeletingTxOp.h deleted file mode 100644 index 3e026cd4ca..0000000000 --- a/qpid/cpp-0-9/lib/broker/DeletingTxOp.h +++ /dev/null @@ -1,45 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#ifndef _DeletingTxOp_ -#define _DeletingTxOp_ - -#include <TxOp.h> - -namespace qpid { - namespace broker { - /** - * TxOp wrapper that will delegate calls & delete the object - * to which it delegates after completion of the transaction. - */ - class DeletingTxOp : public virtual TxOp{ - TxOp* delegate; - public: - DeletingTxOp(TxOp* const delegate); - virtual bool prepare(TransactionContext* ctxt) throw(); - virtual void commit() throw(); - virtual void rollback() throw(); - virtual ~DeletingTxOp(){} - }; - } -} - - -#endif diff --git a/qpid/cpp-0-9/lib/broker/Deliverable.h b/qpid/cpp-0-9/lib/broker/Deliverable.h deleted file mode 100644 index e33443555d..0000000000 --- a/qpid/cpp-0-9/lib/broker/Deliverable.h +++ /dev/null @@ -1,37 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#ifndef _Deliverable_ -#define _Deliverable_ - -#include <BrokerQueue.h> - -namespace qpid { - namespace broker { - class Deliverable{ - public: - virtual void deliverTo(Queue::shared_ptr& queue) = 0; - virtual ~Deliverable(){} - }; - } -} - - -#endif diff --git a/qpid/cpp-0-9/lib/broker/DeliverableMessage.cpp b/qpid/cpp-0-9/lib/broker/DeliverableMessage.cpp deleted file mode 100644 index b9c89da690..0000000000 --- a/qpid/cpp-0-9/lib/broker/DeliverableMessage.cpp +++ /dev/null @@ -1,33 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <DeliverableMessage.h> - -using namespace qpid::broker; - -DeliverableMessage::DeliverableMessage(Message::shared_ptr& _msg) : msg(_msg) -{ -} - -void DeliverableMessage::deliverTo(Queue::shared_ptr& queue) -{ - queue->deliver(msg); -} - diff --git a/qpid/cpp-0-9/lib/broker/DeliverableMessage.h b/qpid/cpp-0-9/lib/broker/DeliverableMessage.h deleted file mode 100644 index 962f0da640..0000000000 --- a/qpid/cpp-0-9/lib/broker/DeliverableMessage.h +++ /dev/null @@ -1,41 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#ifndef _DeliverableMessage_ -#define _DeliverableMessage_ - -#include <Deliverable.h> -#include <BrokerMessage.h> -#include <BrokerQueue.h> - -namespace qpid { - namespace broker { - class DeliverableMessage : public Deliverable{ - Message::shared_ptr msg; - public: - DeliverableMessage(Message::shared_ptr& msg); - virtual void deliverTo(Queue::shared_ptr& queue); - virtual ~DeliverableMessage(){} - }; - } -} - - -#endif diff --git a/qpid/cpp-0-9/lib/broker/DeliveryRecord.cpp b/qpid/cpp-0-9/lib/broker/DeliveryRecord.cpp deleted file mode 100644 index 0d2e5325c5..0000000000 --- a/qpid/cpp-0-9/lib/broker/DeliveryRecord.cpp +++ /dev/null @@ -1,91 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <DeliveryRecord.h> -#include <BrokerChannel.h> - -using namespace qpid::broker; -using std::string; - -DeliveryRecord::DeliveryRecord(Message::shared_ptr _msg, - Queue::shared_ptr _queue, - const string _consumerTag, - const uint64_t _deliveryTag) : msg(_msg), - queue(_queue), - consumerTag(_consumerTag), - deliveryTag(_deliveryTag), - pull(false){} - -DeliveryRecord::DeliveryRecord(Message::shared_ptr _msg, - Queue::shared_ptr _queue, - const uint64_t _deliveryTag) : msg(_msg), - queue(_queue), - consumerTag(""), - deliveryTag(_deliveryTag), - pull(true){} - - -void DeliveryRecord::discard(TransactionContext* ctxt, const std::string* const xid) const{ - queue->dequeue(ctxt, msg, xid); -} - -void DeliveryRecord::discard() const{ - discard(0, 0); -} - -bool DeliveryRecord::matches(uint64_t tag) const{ - return deliveryTag == tag; -} - -bool DeliveryRecord::coveredBy(const AccumulatedAck* const range) const{ - return range->covers(deliveryTag); -} - -void DeliveryRecord::redeliver(Channel* const channel) const{ - if(pull){ - //if message was originally sent as response to get, we must requeue it - requeue(); - }else{ - channel->deliver(msg, consumerTag, deliveryTag); - } -} - -void DeliveryRecord::requeue() const{ - msg->redeliver(); - queue->process(msg); -} - -void DeliveryRecord::addTo(Prefetch* const prefetch) const{ - if(!pull){ - //ignore 'pulled' messages (i.e. those that were sent in - //response to get) when calculating prefetch - prefetch->size += msg->contentSize(); - prefetch->count++; - } -} - -void DeliveryRecord::subtractFrom(Prefetch* const prefetch) const{ - if(!pull){ - //ignore 'pulled' messages (i.e. those that were sent in - //response to get) when calculating prefetch - prefetch->size -= msg->contentSize(); - prefetch->count--; - } -} diff --git a/qpid/cpp-0-9/lib/broker/DeliveryRecord.h b/qpid/cpp-0-9/lib/broker/DeliveryRecord.h deleted file mode 100644 index bda2c2ec90..0000000000 --- a/qpid/cpp-0-9/lib/broker/DeliveryRecord.h +++ /dev/null @@ -1,64 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#ifndef _DeliveryRecord_ -#define _DeliveryRecord_ - -#include <algorithm> -#include <list> -#include <AccumulatedAck.h> -#include <BrokerMessage.h> -#include <Prefetch.h> -#include <BrokerQueue.h> - -namespace qpid { - namespace broker { - class Channel; - - /** - * Record of a delivery for which an ack is outstanding. - */ - class DeliveryRecord{ - mutable Message::shared_ptr msg; - mutable Queue::shared_ptr queue; - std::string consumerTag; - uint64_t deliveryTag; - bool pull; - - public: - DeliveryRecord(Message::shared_ptr msg, Queue::shared_ptr queue, const std::string consumerTag, const uint64_t deliveryTag); - DeliveryRecord(Message::shared_ptr msg, Queue::shared_ptr queue, const uint64_t deliveryTag); - - void discard() const; - void discard(TransactionContext* ctxt, const std::string* const xid) const; - bool matches(uint64_t tag) const; - bool coveredBy(const AccumulatedAck* const range) const; - void requeue() const; - void redeliver(Channel* const) const; - void addTo(Prefetch* const prefetch) const; - void subtractFrom(Prefetch* const prefetch) const; - }; - - typedef std::list<DeliveryRecord>::iterator ack_iterator; - } -} - - -#endif diff --git a/qpid/cpp-0-9/lib/broker/DirectExchange.cpp b/qpid/cpp-0-9/lib/broker/DirectExchange.cpp deleted file mode 100644 index c898ae8d7e..0000000000 --- a/qpid/cpp-0-9/lib/broker/DirectExchange.cpp +++ /dev/null @@ -1,73 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <DirectExchange.h> -#include <ExchangeBinding.h> -#include <iostream> - -using namespace qpid::broker; -using namespace qpid::framing; -using namespace qpid::sys; - -DirectExchange::DirectExchange(const string& _name) : Exchange(_name) { - -} - -void DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* args){ - Mutex::ScopedLock l(lock); - std::vector<Queue::shared_ptr>& queues(bindings[routingKey]); - std::vector<Queue::shared_ptr>::iterator i = find(queues.begin(), queues.end(), queue); - if(i == queues.end()){ - bindings[routingKey].push_back(queue); - queue->bound(new ExchangeBinding(this, queue, routingKey, args)); - } -} - -void DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){ - Mutex::ScopedLock l(lock); - std::vector<Queue::shared_ptr>& queues(bindings[routingKey]); - - std::vector<Queue::shared_ptr>::iterator i = find(queues.begin(), queues.end(), queue); - if(i < queues.end()){ - queues.erase(i); - if(queues.empty()){ - bindings.erase(routingKey); - } - } -} - -void DirectExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/){ - Mutex::ScopedLock l(lock); - std::vector<Queue::shared_ptr>& queues(bindings[routingKey]); - int count(0); - for(std::vector<Queue::shared_ptr>::iterator i = queues.begin(); i != queues.end(); i++, count++){ - msg.deliverTo(*i); - } - if(!count){ - std::cout << "WARNING: DirectExchange " << getName() << " could not route message with key " << routingKey << std::endl; - } -} - -DirectExchange::~DirectExchange(){ - -} - - -const std::string DirectExchange::typeName("direct"); diff --git a/qpid/cpp-0-9/lib/broker/DirectExchange.h b/qpid/cpp-0-9/lib/broker/DirectExchange.h deleted file mode 100644 index a7ef5aca9e..0000000000 --- a/qpid/cpp-0-9/lib/broker/DirectExchange.h +++ /dev/null @@ -1,57 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#ifndef _DirectExchange_ -#define _DirectExchange_ - -#include <map> -#include <vector> -#include <BrokerExchange.h> -#include <FieldTable.h> -#include <BrokerMessage.h> -#include <sys/Monitor.h> -#include <BrokerQueue.h> - -namespace qpid { -namespace broker { - class DirectExchange : public virtual Exchange{ - std::map<string, std::vector<Queue::shared_ptr> > bindings; - qpid::sys::Mutex lock; - - public: - static const std::string typeName; - - DirectExchange(const std::string& name); - - virtual std::string getType(){ return typeName; } - - virtual void bind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args); - - virtual void unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args); - - virtual void route(Deliverable& msg, const std::string& routingKey, const qpid::framing::FieldTable* args); - - virtual ~DirectExchange(); - }; -} -} - - -#endif diff --git a/qpid/cpp-0-9/lib/broker/ExchangeBinding.cpp b/qpid/cpp-0-9/lib/broker/ExchangeBinding.cpp deleted file mode 100644 index bf2102414d..0000000000 --- a/qpid/cpp-0-9/lib/broker/ExchangeBinding.cpp +++ /dev/null @@ -1,35 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <ExchangeBinding.h> -#include <BrokerExchange.h> - -using namespace qpid::broker; -using namespace qpid::framing; - -ExchangeBinding::ExchangeBinding(Exchange* _e, Queue::shared_ptr _q, const string& _key, const FieldTable* _args) : e(_e), q(_q), key(_key), args(_args){} - -void ExchangeBinding::cancel(){ - e->unbind(q, key, args); - delete this; -} - -ExchangeBinding::~ExchangeBinding(){ -} diff --git a/qpid/cpp-0-9/lib/broker/ExchangeBinding.h b/qpid/cpp-0-9/lib/broker/ExchangeBinding.h deleted file mode 100644 index 2afaa89552..0000000000 --- a/qpid/cpp-0-9/lib/broker/ExchangeBinding.h +++ /dev/null @@ -1,48 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#ifndef _ExchangeBinding_ -#define _ExchangeBinding_ - -#include <Binding.h> -#include <FieldTable.h> -#include <BrokerQueue.h> - -namespace qpid { - namespace broker { - class Exchange; - class Queue; - - class ExchangeBinding : public virtual Binding{ - Exchange* e; - Queue::shared_ptr q; - const string key; - const qpid::framing::FieldTable* args; - public: - ExchangeBinding(Exchange* _e, Queue::shared_ptr _q, const string& _key, const qpid::framing::FieldTable* _args); - virtual void cancel(); - virtual ~ExchangeBinding(); - }; - } -} - - -#endif - diff --git a/qpid/cpp-0-9/lib/broker/ExchangeRegistry.cpp b/qpid/cpp-0-9/lib/broker/ExchangeRegistry.cpp deleted file mode 100644 index 3e5ed89b54..0000000000 --- a/qpid/cpp-0-9/lib/broker/ExchangeRegistry.cpp +++ /dev/null @@ -1,76 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <ExchangeRegistry.h> -#include <DirectExchange.h> -#include <FanOutExchange.h> -#include <HeadersExchange.h> -#include <TopicExchange.h> - -using namespace qpid::broker; -using namespace qpid::sys; -using std::pair; - -pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, const string& type) throw(UnknownExchangeTypeException){ - Mutex::ScopedLock locker(lock); - ExchangeMap::iterator i = exchanges.find(name); - if (i == exchanges.end()) { - Exchange::shared_ptr exchange; - - if(type == TopicExchange::typeName){ - exchange = Exchange::shared_ptr(new TopicExchange(name)); - }else if(type == DirectExchange::typeName){ - exchange = Exchange::shared_ptr(new DirectExchange(name)); - }else if(type == FanOutExchange::typeName){ - exchange = Exchange::shared_ptr(new FanOutExchange(name)); - }else if (type == HeadersExchange::typeName) { - exchange = Exchange::shared_ptr(new HeadersExchange(name)); - }else{ - throw UnknownExchangeTypeException(); - } - exchanges[name] = exchange; - return std::pair<Exchange::shared_ptr, bool>(exchange, true); - } else { - return std::pair<Exchange::shared_ptr, bool>(i->second, false); - } -} - -void ExchangeRegistry::destroy(const string& name){ - Mutex::ScopedLock locker(lock); - exchanges.erase(name); -} - -Exchange::shared_ptr ExchangeRegistry::get(const string& name){ - Mutex::ScopedLock locker(lock); - Exchange::shared_ptr exchange =exchanges[name]; - if (!exchange) - throw ChannelException(404, "Exchange not found:" + name); - return exchange; -} - -namespace -{ -const std::string empty; -} - -Exchange::shared_ptr ExchangeRegistry::getDefault() -{ - return get(empty); -} diff --git a/qpid/cpp-0-9/lib/broker/ExchangeRegistry.h b/qpid/cpp-0-9/lib/broker/ExchangeRegistry.h deleted file mode 100644 index aeb32753df..0000000000 --- a/qpid/cpp-0-9/lib/broker/ExchangeRegistry.h +++ /dev/null @@ -1,47 +0,0 @@ -#ifndef _broker_ExchangeRegistry_h -#define _broker_ExchangeRegistry_h - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <map> -#include <BrokerExchange.h> -#include <sys/Monitor.h> - -namespace qpid { -namespace broker { - struct UnknownExchangeTypeException{}; - - class ExchangeRegistry{ - typedef std::map<std::string, Exchange::shared_ptr> ExchangeMap; - ExchangeMap exchanges; - qpid::sys::Mutex lock; - public: - std::pair<Exchange::shared_ptr, bool> declare(const std::string& name, const std::string& type) throw(UnknownExchangeTypeException); - void destroy(const std::string& name); - Exchange::shared_ptr get(const std::string& name); - Exchange::shared_ptr getDefault(); - }; -} -} - - -#endif /*!_broker_ExchangeRegistry_h*/ diff --git a/qpid/cpp-0-9/lib/broker/FanOutExchange.cpp b/qpid/cpp-0-9/lib/broker/FanOutExchange.cpp deleted file mode 100644 index 48afcc20d5..0000000000 --- a/qpid/cpp-0-9/lib/broker/FanOutExchange.cpp +++ /dev/null @@ -1,60 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <FanOutExchange.h> -#include <ExchangeBinding.h> -#include <algorithm> - -using namespace qpid::broker; -using namespace qpid::framing; -using namespace qpid::sys; - -FanOutExchange::FanOutExchange(const std::string& _name) : Exchange(_name) {} - -void FanOutExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* args){ - Mutex::ScopedLock locker(lock); - // Add if not already present. - Queue::vector::iterator i = std::find(bindings.begin(), bindings.end(), queue); - if (i == bindings.end()) { - bindings.push_back(queue); - queue->bound(new ExchangeBinding(this, queue, routingKey, args)); - } -} - -void FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* /*args*/){ - Mutex::ScopedLock locker(lock); - Queue::vector::iterator i = std::find(bindings.begin(), bindings.end(), queue); - if (i != bindings.end()) { - bindings.erase(i); - // TODO aconway 2006-09-14: What about the ExchangeBinding object? - // Don't we have to verify routingKey/args match? - } -} - -void FanOutExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* /*args*/){ - Mutex::ScopedLock locker(lock); - for(Queue::vector::iterator i = bindings.begin(); i != bindings.end(); ++i){ - msg.deliverTo(*i); - } -} - -FanOutExchange::~FanOutExchange() {} - -const std::string FanOutExchange::typeName("fanout"); diff --git a/qpid/cpp-0-9/lib/broker/FanOutExchange.h b/qpid/cpp-0-9/lib/broker/FanOutExchange.h deleted file mode 100644 index 6dc70e69bb..0000000000 --- a/qpid/cpp-0-9/lib/broker/FanOutExchange.h +++ /dev/null @@ -1,60 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#ifndef _FanOutExchange_ -#define _FanOutExchange_ - -#include <map> -#include <vector> -#include <BrokerExchange.h> -#include <FieldTable.h> -#include <BrokerMessage.h> -#include <sys/Monitor.h> -#include <BrokerQueue.h> - -namespace qpid { -namespace broker { - -class FanOutExchange : public virtual Exchange { - std::vector<Queue::shared_ptr> bindings; - qpid::sys::Mutex lock; - - public: - static const std::string typeName; - - FanOutExchange(const std::string& name); - - virtual std::string getType(){ return typeName; } - - virtual void bind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args); - - virtual void unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args); - - virtual void route(Deliverable& msg, const std::string& routingKey, const qpid::framing::FieldTable* args); - - virtual ~FanOutExchange(); -}; - -} -} - - - -#endif diff --git a/qpid/cpp-0-9/lib/broker/HandlerImpl.h b/qpid/cpp-0-9/lib/broker/HandlerImpl.h deleted file mode 100644 index c55a36da45..0000000000 --- a/qpid/cpp-0-9/lib/broker/HandlerImpl.h +++ /dev/null @@ -1,71 +0,0 @@ -#ifndef _broker_HandlerImpl_h -#define _broker_HandlerImpl_h - -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "BrokerChannel.h" -#include "AMQP_ClientProxy.h" - -namespace qpid { - -namespace framing { -class AMQP_ClientProxy; -} - -namespace broker { - -class Broker; -class Channel; -class Connection; - -/** - * A collection of references to the core objects required by an adapter, - * and a client proxy. - */ -struct CoreRefs -{ - CoreRefs(Channel& ch, Connection& c, Broker& b) - : channel(ch), connection(c), broker(b), proxy(ch) {} - - Channel& channel; - Connection& connection; - Broker& broker; - framing::AMQP_ClientProxy proxy; -}; - - -/** - * Base template for protocol handler implementations. - * Provides the core references and appropriate AMQP class proxy. - */ -template <class ProxyType> -struct HandlerImpl : public CoreRefs { - typedef HandlerImpl<ProxyType> HandlerImplType; - HandlerImpl(CoreRefs& parent) - : CoreRefs(parent), client(ProxyType::get(proxy)) {} - ProxyType client; -}; - - - -}} // namespace qpid::broker - - - -#endif /*!_broker_HandlerImpl_h*/ diff --git a/qpid/cpp-0-9/lib/broker/HeadersExchange.cpp b/qpid/cpp-0-9/lib/broker/HeadersExchange.cpp deleted file mode 100644 index acd344725a..0000000000 --- a/qpid/cpp-0-9/lib/broker/HeadersExchange.cpp +++ /dev/null @@ -1,121 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <HeadersExchange.h> -#include <ExchangeBinding.h> -#include <Value.h> -#include <QpidError.h> -#include <algorithm> - - -using namespace qpid::broker; -using namespace qpid::framing; -using namespace qpid::sys; - -// TODO aconway 2006-09-20: More efficient matching algorithm. -// The current search algorithm really sucks. -// Fieldtables are heavy, maybe use shared_ptr to do handle-body. - -using namespace qpid::broker; - -namespace { - const std::string all("all"); - const std::string any("any"); - const std::string x_match("x-match"); -} - -HeadersExchange::HeadersExchange(const string& _name) : Exchange(_name) { } - -void HeadersExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* args){ - Mutex::ScopedLock locker(lock); - std::string what = args->getString("x-match"); - if (what != all && what != any) { - THROW_QPID_ERROR(PROTOCOL_ERROR, "Invalid x-match value binding to headers exchange."); - } - bindings.push_back(Binding(*args, queue)); - queue->bound(new ExchangeBinding(this, queue, routingKey, args)); -} - -void HeadersExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* args){ - Mutex::ScopedLock locker(lock); - Bindings::iterator i = - std::find(bindings.begin(),bindings.end(), Binding(*args, queue)); - if (i != bindings.end()) bindings.erase(i); -} - - -void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* args){ - Mutex::ScopedLock locker(lock);; - for (Bindings::iterator i = bindings.begin(); i != bindings.end(); ++i) { - if (match(i->first, *args)) msg.deliverTo(i->second); - } -} - -HeadersExchange::~HeadersExchange() {} - -const std::string HeadersExchange::typeName("headers"); - -namespace -{ - - bool match_values(const Value& bind, const Value& msg) { - return dynamic_cast<const EmptyValue*>(&bind) || bind == msg; - } - -} - - -bool HeadersExchange::match(const FieldTable& bind, const FieldTable& msg) { - typedef FieldTable::ValueMap Map; - std::string what = bind.getString(x_match); - if (what == all) { - for (Map::const_iterator i = bind.getMap().begin(); - i != bind.getMap().end(); - ++i) - { - if (i->first != x_match) - { - Map::const_iterator j = msg.getMap().find(i->first); - if (j == msg.getMap().end()) return false; - if (!match_values(*(i->second), *(j->second))) return false; - } - } - return true; - } else if (what == any) { - for (Map::const_iterator i = bind.getMap().begin(); - i != bind.getMap().end(); - ++i) - { - if (i->first != x_match) - { - Map::const_iterator j = msg.getMap().find(i->first); - if (j != msg.getMap().end()) { - if (match_values(*(i->second), *(j->second))) return true; - } - } - } - return false; - } else { - return false; - } -} - - - diff --git a/qpid/cpp-0-9/lib/broker/HeadersExchange.h b/qpid/cpp-0-9/lib/broker/HeadersExchange.h deleted file mode 100644 index 5e8da5ad85..0000000000 --- a/qpid/cpp-0-9/lib/broker/HeadersExchange.h +++ /dev/null @@ -1,65 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#ifndef _HeadersExchange_ -#define _HeadersExchange_ - -#include <vector> -#include <BrokerExchange.h> -#include <FieldTable.h> -#include <BrokerMessage.h> -#include <sys/Monitor.h> -#include <BrokerQueue.h> - -namespace qpid { -namespace broker { - - -class HeadersExchange : public virtual Exchange { - typedef std::pair<qpid::framing::FieldTable, Queue::shared_ptr> Binding; - typedef std::vector<Binding> Bindings; - - Bindings bindings; - qpid::sys::Mutex lock; - - public: - static const std::string typeName; - - HeadersExchange(const string& name); - - virtual std::string getType(){ return typeName; } - - virtual void bind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args); - - virtual void unbind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args); - - virtual void route(Deliverable& msg, const string& routingKey, const qpid::framing::FieldTable* args); - - virtual ~HeadersExchange(); - - static bool match(const qpid::framing::FieldTable& bindArgs, const qpid::framing::FieldTable& msgArgs); -}; - - - -} -} - -#endif diff --git a/qpid/cpp-0-9/lib/broker/InMemoryContent.cpp b/qpid/cpp-0-9/lib/broker/InMemoryContent.cpp deleted file mode 100644 index 3e4ac29486..0000000000 --- a/qpid/cpp-0-9/lib/broker/InMemoryContent.cpp +++ /dev/null @@ -1,73 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <InMemoryContent.h> -#include "AMQFrame.h" -#include "framing/ChannelAdapter.h" - -using namespace qpid::broker; -using namespace qpid::framing; -using boost::static_pointer_cast; - -void InMemoryContent::add(AMQContentBody::shared_ptr data) -{ - content.push_back(data); -} - -uint32_t InMemoryContent::size() -{ - int sum(0); - for (content_iterator i = content.begin(); i != content.end(); i++) { - sum += (*i)->size(); - } - return sum; -} - -// FIXME aconway 2007-02-01: Remove version parameter. -void InMemoryContent::send(ChannelAdapter& channel, uint32_t framesize) -{ - for (content_iterator i = content.begin(); i != content.end(); i++) { - if ((*i)->size() > framesize) { - uint32_t offset = 0; - for (int chunk = (*i)->size() / framesize; chunk > 0; chunk--) { - string data = (*i)->getData().substr(offset, framesize); - channel.send(new AMQContentBody(data)); - offset += framesize; - } - uint32_t remainder = (*i)->size() % framesize; - if (remainder) { - string data = (*i)->getData().substr(offset, remainder); - channel.send(new AMQContentBody(data)); - } - } else { - AMQBody::shared_ptr contentBody = - static_pointer_cast<AMQBody, AMQContentBody>(*i); - channel.send(contentBody); - } - } -} - -void InMemoryContent::encode(Buffer& buffer) -{ - for (content_iterator i = content.begin(); i != content.end(); i++) { - (*i)->encode(buffer); - } -} - diff --git a/qpid/cpp-0-9/lib/broker/InMemoryContent.h b/qpid/cpp-0-9/lib/broker/InMemoryContent.h deleted file mode 100644 index 7a58ace3a7..0000000000 --- a/qpid/cpp-0-9/lib/broker/InMemoryContent.h +++ /dev/null @@ -1,45 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#ifndef _InMemoryContent_ -#define _InMemoryContent_ - -#include <Content.h> -#include <vector> - - -namespace qpid { - namespace broker { - class InMemoryContent : public Content{ - typedef std::vector<qpid::framing::AMQContentBody::shared_ptr> content_list; - typedef content_list::iterator content_iterator; - - content_list content; - public: - void add(qpid::framing::AMQContentBody::shared_ptr data); - uint32_t size(); - void send(framing::ChannelAdapter&, uint32_t framesize); - void encode(qpid::framing::Buffer& buffer); - }; - } -} - - -#endif diff --git a/qpid/cpp-0-9/lib/broker/LazyLoadedContent.cpp b/qpid/cpp-0-9/lib/broker/LazyLoadedContent.cpp deleted file mode 100644 index 131943b448..0000000000 --- a/qpid/cpp-0-9/lib/broker/LazyLoadedContent.cpp +++ /dev/null @@ -1,68 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <LazyLoadedContent.h> -#include "AMQFrame.h" -#include "framing/ChannelAdapter.h" - -using namespace qpid::broker; -using namespace qpid::framing; - -LazyLoadedContent::~LazyLoadedContent() -{ - store->destroy(msg); -} - -LazyLoadedContent::LazyLoadedContent(MessageStore* const _store, Message* const _msg, uint64_t _expectedSize) : - store(_store), msg(_msg), expectedSize(_expectedSize) {} - -void LazyLoadedContent::add(AMQContentBody::shared_ptr data) -{ - store->appendContent(msg, data->getData()); -} - -uint32_t LazyLoadedContent::size() -{ - return 0;//all content is written as soon as it is added -} - -void LazyLoadedContent::send(ChannelAdapter& channel, uint32_t framesize) -{ - if (expectedSize > framesize) { - for (uint64_t offset = 0; offset < expectedSize; offset += framesize) - { - uint64_t remaining = expectedSize - offset; - string data; - store->loadContent(msg, data, offset, - remaining > framesize ? framesize : remaining); - channel.send(new AMQContentBody(data)); - } - } else { - string data; - store->loadContent(msg, data, 0, expectedSize); - channel.send(new AMQContentBody(data)); - } -} - -void LazyLoadedContent::encode(Buffer&) -{ - //do nothing as all content is written as soon as it is added -} - diff --git a/qpid/cpp-0-9/lib/broker/LazyLoadedContent.h b/qpid/cpp-0-9/lib/broker/LazyLoadedContent.h deleted file mode 100644 index e000a4ef69..0000000000 --- a/qpid/cpp-0-9/lib/broker/LazyLoadedContent.h +++ /dev/null @@ -1,49 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#ifndef _LazyLoadedContent_ -#define _LazyLoadedContent_ - -#include <Content.h> -#include <MessageStore.h> - -namespace qpid { - namespace broker { - class LazyLoadedContent : public Content{ - MessageStore* const store; - Message* const msg; - const uint64_t expectedSize; - public: - LazyLoadedContent( - MessageStore* const store, Message* const msg, - uint64_t expectedSize); - ~LazyLoadedContent(); - void add(qpid::framing::AMQContentBody::shared_ptr data); - uint32_t size(); - void send( - framing::ChannelAdapter&, - uint32_t framesize); - void encode(qpid::framing::Buffer& buffer); - }; - } -} - - -#endif diff --git a/qpid/cpp-0-9/lib/broker/Makefile.am b/qpid/cpp-0-9/lib/broker/Makefile.am deleted file mode 100644 index 68649c2b28..0000000000 --- a/qpid/cpp-0-9/lib/broker/Makefile.am +++ /dev/null @@ -1,96 +0,0 @@ -AM_CXXFLAGS = $(WARNING_CFLAGS) -INCLUDES = \ - -I$(top_srcdir)/gen \ - -I$(top_srcdir)/lib/common \ - -I$(top_srcdir)/lib/common/sys \ - -I$(top_srcdir)/lib/common/framing \ - $(APR_CXXFLAGS) - -lib_LTLIBRARIES = libqpidbroker.la -libqpidbroker_la_LIBADD = ../common/libqpidcommon.la -libqpidbroker_la_LDFLAGS = -version-info $(LIBTOOL_VERSION_INFO_ARG) -libqpidbroker_la_SOURCES = \ - AccumulatedAck.cpp \ - AccumulatedAck.h \ - AutoDelete.cpp \ - AutoDelete.h \ - Binding.h \ - Broker.cpp \ - Broker.h \ - BrokerSingleton.cpp \ - BrokerSingleton.h \ - BrokerChannel.cpp \ - BrokerChannel.h \ - BrokerExchange.h \ - BrokerMessage.cpp \ - BrokerMessage.h \ - BrokerMessageMessage.cpp \ - BrokerMessageMessage.h \ - BrokerQueue.cpp \ - BrokerQueue.h \ - Configuration.cpp \ - Configuration.h \ - ConnectionToken.h \ - Consumer.h \ - Content.h \ - DeletingTxOp.cpp \ - DeletingTxOp.h \ - Deliverable.h \ - DeliverableMessage.cpp \ - DeliverableMessage.h \ - DeliveryRecord.cpp \ - DeliveryRecord.h \ - DirectExchange.cpp \ - DirectExchange.h \ - ExchangeBinding.cpp \ - ExchangeBinding.h \ - ExchangeRegistry.cpp \ - ExchangeRegistry.h \ - FanOutExchange.cpp \ - FanOutExchange.h \ - HeadersExchange.cpp \ - HeadersExchange.h \ - InMemoryContent.cpp \ - InMemoryContent.h \ - LazyLoadedContent.cpp \ - LazyLoadedContent.h \ - MessageBuilder.cpp \ - MessageBuilder.h \ - MessageStore.h \ - MessageStoreModule.cpp \ - MessageStoreModule.h \ - NameGenerator.cpp \ - NameGenerator.h \ - NullMessageStore.cpp \ - NullMessageStore.h \ - Prefetch.h \ - QueuePolicy.cpp \ - QueuePolicy.h \ - QueueRegistry.cpp \ - QueueRegistry.h \ - RecoveryManager.cpp \ - RecoveryManager.h \ - Reference.cpp \ - Reference.h \ - ConnectionFactory.cpp \ - ConnectionFactory.h \ - Connection.cpp \ - Connection.h \ - BrokerAdapter.cpp \ - BrokerAdapter.h \ - MessageHandlerImpl.cpp \ - MessageHandlerImpl.h \ - TopicExchange.cpp \ - TopicExchange.h \ - TransactionalStore.h \ - TxAck.cpp \ - TxAck.h \ - TxBuffer.cpp \ - TxBuffer.h \ - TxOp.h \ - TxPublish.cpp \ - TxPublish.h - - -# Force build during dist phase so help2man will work. -dist-hook: $(lib_LTLIBRARIES) diff --git a/qpid/cpp-0-9/lib/broker/MessageBuilder.cpp b/qpid/cpp-0-9/lib/broker/MessageBuilder.cpp deleted file mode 100644 index 8bffaef50f..0000000000 --- a/qpid/cpp-0-9/lib/broker/MessageBuilder.cpp +++ /dev/null @@ -1,74 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <MessageBuilder.h> - -#include <InMemoryContent.h> -#include <LazyLoadedContent.h> - -using namespace qpid::broker; -using namespace qpid::framing; -using std::auto_ptr; - -MessageBuilder::MessageBuilder(CompletionHandler* _handler, - MessageStore* const _store, - uint64_t _stagingThreshold -) : - handler(_handler), - store(_store), - stagingThreshold(_stagingThreshold) -{} - -void MessageBuilder::route(){ - if (message->isComplete()) { - if (handler) handler->complete(message); - message.reset(); - } -} - -void MessageBuilder::initialise(Message::shared_ptr& msg){ - if(message.get()){ - THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got publish before previous content was completed."); - } - message = msg; -} - -void MessageBuilder::setHeader(AMQHeaderBody::shared_ptr& header){ - if(!message.get()){ - THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got header before publish."); - } - message->setHeader(header); - if (stagingThreshold && header->getContentSize() >= stagingThreshold) { - store->stage(message.get()); - message->releaseContent(store); - } else { - auto_ptr<Content> content(new InMemoryContent()); - message->setContent(content); - } - route(); -} - -void MessageBuilder::addContent(AMQContentBody::shared_ptr& content){ - if(!message.get()){ - THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got content before publish."); - } - message->addContent(content); - route(); -} diff --git a/qpid/cpp-0-9/lib/broker/MessageBuilder.h b/qpid/cpp-0-9/lib/broker/MessageBuilder.h deleted file mode 100644 index 30834e1075..0000000000 --- a/qpid/cpp-0-9/lib/broker/MessageBuilder.h +++ /dev/null @@ -1,57 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#ifndef _MessageBuilder_ -#define _MessageBuilder_ - -#include <memory> -#include <QpidError.h> -#include <BrokerExchange.h> -#include <BrokerMessage.h> -#include <MessageStore.h> -#include <AMQContentBody.h> -#include <AMQHeaderBody.h> -#include <BasicPublishBody.h> -#include "CompletionHandler.h" - -namespace qpid { - namespace broker { - class MessageBuilder{ - public: - MessageBuilder(CompletionHandler* _handler, - MessageStore* const store = 0, - uint64_t stagingThreshold = 0); - void initialise(Message::shared_ptr& msg); - void setHeader(framing::AMQHeaderBody::shared_ptr& header); - void addContent(framing::AMQContentBody::shared_ptr& content); - Message::shared_ptr getMessage() { return message; } - private: - Message::shared_ptr message; - CompletionHandler* handler; - MessageStore* const store; - const uint64_t stagingThreshold; - - void route(); - }; - } -} - - -#endif diff --git a/qpid/cpp-0-9/lib/broker/MessageHandlerImpl.cpp b/qpid/cpp-0-9/lib/broker/MessageHandlerImpl.cpp deleted file mode 100644 index fa7c10f26c..0000000000 --- a/qpid/cpp-0-9/lib/broker/MessageHandlerImpl.cpp +++ /dev/null @@ -1,243 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "QpidError.h" -#include "MessageHandlerImpl.h" -#include "BrokerChannel.h" -#include "FramingContent.h" -#include "Connection.h" -#include "Broker.h" -#include "BrokerMessageMessage.h" -#include "MessageAppendBody.h" -#include "MessageTransferBody.h" -#include "BrokerAdapter.h" - -namespace qpid { -namespace broker { - -using namespace framing; - -MessageHandlerImpl::MessageHandlerImpl(CoreRefs& parent) - : HandlerImplType(parent) {} - -// -// Message class method handlers -// - -void -MessageHandlerImpl::cancel(const MethodContext& context, - const string& destination ) -{ - channel.cancel(destination); - client.ok(context.getRequestId()); -} - -void -MessageHandlerImpl::open(const MethodContext& context, - const string& reference) -{ - references.open(reference); - client.ok(context.getRequestId()); -} - -void -MessageHandlerImpl::append(const MethodContext& context, - const string& reference, - const string& /*bytes*/ ) -{ - references.get(reference)->append( - boost::shared_polymorphic_downcast<MessageAppendBody>( - context.methodBody)); - client.ok(context.getRequestId()); -} - -void -MessageHandlerImpl::close(const MethodContext& context, - const string& reference) -{ - Reference::shared_ptr ref = references.get(reference); - client.ok(context.getRequestId()); - - // Send any transfer messages to their correct exchanges and okay them - const Reference::Messages& msgs = ref->getMessages(); - for (Reference::Messages::const_iterator m = msgs.begin(); m != msgs.end(); ++m) { - channel.handleInlineTransfer(*m); - client.ok((*m)->getRequestId()); - } - ref->close(); -} - -void -MessageHandlerImpl::checkpoint(const MethodContext& context, - const string& /*reference*/, - const string& /*identifier*/ ) -{ - // Initial implementation (which is conforming) is to do nothing here - // and return offset zero for the resume - client.ok(context.getRequestId()); -} - -void -MessageHandlerImpl::resume(const MethodContext& context, - const string& reference, - const string& /*identifier*/ ) -{ - // Initial (null) implementation - // open reference and return 0 offset - references.open(reference); - client.offset(0, context.getRequestId()); -} - -void -MessageHandlerImpl::offset(const MethodContext&, - uint64_t /*value*/ ) -{ - // Shouldn't ever receive this as it is reponse to resume - // which is never sent - // TODO astitcher 2007-02-16 What is the correct exception to throw here? - THROW_QPID_ERROR(INTERNAL_ERROR, "impossible"); -} - -void -MessageHandlerImpl::consume(const MethodContext& context, - uint16_t /*ticket*/, - const string& queueName, - const string& destination, - bool noLocal, - bool noAck, - bool exclusive, - const framing::FieldTable& filter ) -{ - Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId()); - if(!destination.empty() && channel.exists(destination)) - throw ConnectionException(530, "Consumer tags must be unique"); - string tag = destination; - channel.consume( - tag, queue, !noAck, exclusive, - noLocal ? &connection : 0, &filter); - client.ok(context.getRequestId()); - // Dispatch messages as there is now a consumer. - queue->dispatch(); -} - -void -MessageHandlerImpl::get( const MethodContext& context, - uint16_t /*ticket*/, - const string& queueName, - const string& destination, - bool noAck ) -{ - Queue::shared_ptr queue = - connection.getQueue(queueName, context.channel->getId()); - - if(channel.get(queue, destination, !noAck)) - client.ok(context.getRequestId()); - else - client.empty(context.getRequestId()); -} - -void -MessageHandlerImpl::empty( const MethodContext& ) -{ - // Shouldn't ever receive this as it is a response to get - // which is never sent - // TODO astitcher 2007-02-09 What is the correct exception to throw here? - THROW_QPID_ERROR(INTERNAL_ERROR, "Impossible"); -} - -void -MessageHandlerImpl::ok(const MethodContext& /*context*/) -{ - channel.ack(); -} - -void -MessageHandlerImpl::qos(const MethodContext& context, - uint32_t prefetchSize, - uint16_t prefetchCount, - bool /*global*/ ) -{ - //TODO: handle global - channel.setPrefetchSize(prefetchSize); - channel.setPrefetchCount(prefetchCount); - client.ok(context.getRequestId()); -} - -void -MessageHandlerImpl::recover(const MethodContext& context, - bool requeue) -{ - channel.recover(requeue); - client.ok(context.getRequestId()); -} - -void -MessageHandlerImpl::reject(const MethodContext& /*context*/, - uint16_t /*code*/, - const string& /*text*/ ) -{ - channel.ack(); - // channel.requeue(); -} - -void -MessageHandlerImpl::transfer(const MethodContext& context, - uint16_t /*ticket*/, - const string& /* destination */, - bool /*redelivered*/, - bool /*immediate*/, - uint64_t /*ttl*/, - uint8_t /*priority*/, - uint64_t /*timestamp*/, - uint8_t /*deliveryMode*/, - uint64_t /*expiration*/, - const string& /*exchangeName*/, - const string& /*routingKey*/, - const string& /*messageId*/, - const string& /*correlationId*/, - const string& /*replyTo*/, - const string& /*contentType*/, - const string& /*contentEncoding*/, - const string& /*userId*/, - const string& /*appId*/, - const string& /*transactionId*/, - const string& /*securityToken*/, - const framing::FieldTable& /*applicationHeaders*/, - const framing::Content& body, - bool /*mandatory*/) -{ - MessageTransferBody::shared_ptr transfer( - boost::shared_polymorphic_downcast<MessageTransferBody>( - context.methodBody)); - RequestId requestId = context.getRequestId(); - - if (body.isInline()) { - MessageMessage::shared_ptr message( - new MessageMessage(&connection, requestId, transfer)); - channel.handleInlineTransfer(message); - client.ok(requestId); - } else { - Reference::shared_ptr ref(references.get(body.getValue())); - MessageMessage::shared_ptr message( - new MessageMessage(&connection, requestId, transfer, ref)); - ref->addMessage(message); - } -} - - -}} // namespace qpid::broker diff --git a/qpid/cpp-0-9/lib/broker/MessageHandlerImpl.h b/qpid/cpp-0-9/lib/broker/MessageHandlerImpl.h deleted file mode 100644 index 872d429d5c..0000000000 --- a/qpid/cpp-0-9/lib/broker/MessageHandlerImpl.h +++ /dev/null @@ -1,130 +0,0 @@ -#ifndef _broker_MessageHandlerImpl_h -#define _broker_MessageHandlerImpl_h - -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include <memory> - -#include "AMQP_ServerOperations.h" -#include "AMQP_ClientProxy.h" -#include "Reference.h" -#include "HandlerImpl.h" - -namespace qpid { -namespace broker { - -class Connection; -class Broker; -class MessageMessage; - -class MessageHandlerImpl : - public framing::AMQP_ServerOperations::MessageHandler, - public HandlerImpl<framing::AMQP_ClientProxy::Message> -{ - public: - MessageHandlerImpl(CoreRefs& parent); - - void append(const framing::MethodContext&, - const std::string& reference, - const std::string& bytes ); - - void cancel(const framing::MethodContext&, - const std::string& destination ); - - void checkpoint(const framing::MethodContext&, - const std::string& reference, - const std::string& identifier ); - - void close(const framing::MethodContext&, - const std::string& reference ); - - void consume(const framing::MethodContext&, - uint16_t ticket, - const std::string& queue, - const std::string& destination, - bool noLocal, - bool noAck, - bool exclusive, - const framing::FieldTable& filter ); - - void empty( const framing::MethodContext& ); - - void get(const framing::MethodContext&, - uint16_t ticket, - const std::string& queue, - const std::string& destination, - bool noAck ); - - void offset(const framing::MethodContext&, - uint64_t value ); - - void ok( const framing::MethodContext& ); - - void open(const framing::MethodContext&, - const std::string& reference ); - - void qos(const framing::MethodContext&, - uint32_t prefetchSize, - uint16_t prefetchCount, - bool global ); - - void recover(const framing::MethodContext&, - bool requeue ); - - void reject(const framing::MethodContext&, - uint16_t code, - const std::string& text ); - - void resume(const framing::MethodContext&, - const std::string& reference, - const std::string& identifier ); - - void transfer(const framing::MethodContext&, - uint16_t ticket, - const std::string& destination, - bool redelivered, - bool immediate, - uint64_t ttl, - uint8_t priority, - uint64_t timestamp, - uint8_t deliveryMode, - uint64_t expiration, - const std::string& exchange, - const std::string& routingKey, - const std::string& messageId, - const std::string& correlationId, - const std::string& replyTo, - const std::string& contentType, - const std::string& contentEncoding, - const std::string& userId, - const std::string& appId, - const std::string& transactionId, - const std::string& securityToken, - const framing::FieldTable& applicationHeaders, - const framing::Content& body, - bool mandatory ); - private: - ReferenceRegistry references; -}; - -}} // namespace qpid::broker - - - -#endif /*!_broker_MessageHandlerImpl_h*/ diff --git a/qpid/cpp-0-9/lib/broker/MessageStore.h b/qpid/cpp-0-9/lib/broker/MessageStore.h deleted file mode 100644 index 9e38408886..0000000000 --- a/qpid/cpp-0-9/lib/broker/MessageStore.h +++ /dev/null @@ -1,140 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#ifndef _MessageStore_ -#define _MessageStore_ - -#include <BrokerMessage.h> -#include <FieldTable.h> -#include <RecoveryManager.h> -#include <TransactionalStore.h> - -namespace qpid { - namespace broker { - struct MessageStoreSettings - { - /** - * Messages whose content length is larger than this value - * will be staged (i.e. will have thier data written to - * disk as it arrives) and will load their data lazily. On - * recovery therefore, only the headers should be loaded. - */ - uint64_t stagingThreshold; - }; - /** - * An abstraction of the persistent storage for messages. (In - * all methods, any pointers/references to queues or messages - * are valid only for the duration of the call). - */ - class MessageStore : public TransactionalStore{ - public: - /** - * Record the existance of a durable queue - */ - virtual void create(const Queue& queue, const qpid::framing::FieldTable& settings) = 0; - /** - * Destroy a durable queue - */ - virtual void destroy(const Queue& queue) = 0; - - /** - * Request recovery of queue and message state from store - */ - virtual void recover(RecoveryManager& queues, const MessageStoreSettings* const settings = 0) = 0; - - /** - * Stores a messages before it has been enqueued - * (enqueueing automatically stores the message so this is - * only required if storage is required prior to that - * point). If the message has not yet been stored it will - * store the headers as well as any content passed in. A - * persistence id will be set on the message which can be - * used to load the content or to append to it. - */ - virtual void stage(Message* const msg) = 0; - - /** - * Destroys a previously staged message. This only needs - * to be called if the message is never enqueued. (Once - * enqueued, deletion will be automatic when the message - * is dequeued from all queues it was enqueued onto). - */ - virtual void destroy(Message* const msg) = 0; - - /** - * Appends content to a previously staged message - */ - virtual void appendContent(Message* const msg, const std::string& data) = 0; - - /** - * Loads (a section) of content data for the specified - * message (previously stored through a call to stage or - * enqueue) into data. The offset refers to the content - * only (i.e. an offset of 0 implies that the start of the - * content should be loaded, not the headers or related - * meta-data). - */ - virtual void loadContent(Message* const msg, std::string& data, uint64_t offset, uint32_t length) = 0; - - /** - * Enqueues a message, storing the message if it has not - * been previously stored and recording that the given - * message is on the given queue. - * - * @param msg the message to enqueue - * @param queue the name of the queue onto which it is to be enqueued - * @param xid (a pointer to) an identifier of the - * distributed transaction in which the operation takes - * place or null for 'local' transactions - */ - virtual void enqueue(TransactionContext* ctxt, Message* const msg, const Queue& queue, const std::string * const xid) = 0; - /** - * Dequeues a message, recording that the given message is - * no longer on the given queue and deleting the message - * if it is no longer on any other queue. - * - * @param msg the message to dequeue - * @param queue the name of th queue from which it is to be dequeued - * @param xid (a pointer to) an identifier of the - * distributed transaction in which the operation takes - * place or null for 'local' transactions - */ - virtual void dequeue(TransactionContext* ctxt, Message* const msg, const Queue& queue, const std::string * const xid) = 0; - - /** - * Treat all enqueue/dequeues where this xid was specified as being prepared. - */ - virtual void prepared(const std::string * const xid) = 0; - /** - * Treat all enqueue/dequeues where this xid was specified as being committed. - */ - virtual void committed(const std::string * const xid) = 0; - /** - * Treat all enqueue/dequeues where this xid was specified as being aborted. - */ - virtual void aborted(const std::string * const xid) = 0; - - virtual ~MessageStore(){} - }; - } -} - - -#endif diff --git a/qpid/cpp-0-9/lib/broker/MessageStoreModule.cpp b/qpid/cpp-0-9/lib/broker/MessageStoreModule.cpp deleted file mode 100644 index 676e86f84a..0000000000 --- a/qpid/cpp-0-9/lib/broker/MessageStoreModule.cpp +++ /dev/null @@ -1,104 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <MessageStoreModule.h> -#include <iostream> - -using namespace qpid::broker; - -MessageStoreModule::MessageStoreModule(const std::string& name) : store(name) -{ -} - -void MessageStoreModule::create(const Queue& queue, const qpid::framing::FieldTable& settings) -{ - store->create(queue, settings); -} - -void MessageStoreModule::destroy(const Queue& queue) -{ - store->destroy(queue); -} - -void MessageStoreModule::recover(RecoveryManager& registry, const MessageStoreSettings* const settings) -{ - store->recover(registry, settings); -} - -void MessageStoreModule::stage(Message* const msg) -{ - store->stage(msg); -} - -void MessageStoreModule::destroy(Message* const msg) -{ - store->destroy(msg); -} - -void MessageStoreModule::appendContent(Message* const msg, const std::string& data) -{ - store->appendContent(msg, data); -} - -void MessageStoreModule::loadContent(Message* const msg, string& data, uint64_t offset, uint32_t length) -{ - store->loadContent(msg, data, offset, length); -} - -void MessageStoreModule::enqueue(TransactionContext* ctxt, Message* const msg, const Queue& queue, const string * const xid) -{ - store->enqueue(ctxt, msg, queue, xid); -} - -void MessageStoreModule::dequeue(TransactionContext* ctxt, Message* const msg, const Queue& queue, const string * const xid) -{ - store->dequeue(ctxt, msg, queue, xid); -} - -void MessageStoreModule::prepared(const string * const xid) -{ - store->prepared(xid); -} - -void MessageStoreModule::committed(const string * const xid) -{ - store->committed(xid); -} - -void MessageStoreModule::aborted(const string * const xid) -{ - store->aborted(xid); -} - -std::auto_ptr<TransactionContext> MessageStoreModule::begin() -{ - return store->begin(); -} - -void MessageStoreModule::commit(TransactionContext* ctxt) -{ - store->commit(ctxt); -} - -void MessageStoreModule::abort(TransactionContext* ctxt) -{ - store->abort(ctxt); -} diff --git a/qpid/cpp-0-9/lib/broker/MessageStoreModule.h b/qpid/cpp-0-9/lib/broker/MessageStoreModule.h deleted file mode 100644 index 27fedbf635..0000000000 --- a/qpid/cpp-0-9/lib/broker/MessageStoreModule.h +++ /dev/null @@ -1,60 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#ifndef _MessageStoreModule_ -#define _MessageStoreModule_ - -#include <BrokerMessage.h> -#include <MessageStore.h> -#include <BrokerQueue.h> -#include <RecoveryManager.h> -#include <sys/Module.h> - -namespace qpid { - namespace broker { - /** - * A null implementation of the MessageStore interface - */ - class MessageStoreModule : public MessageStore{ - qpid::sys::Module<MessageStore> store; - public: - MessageStoreModule(const std::string& name); - void create(const Queue& queue, const qpid::framing::FieldTable& settings); - void destroy(const Queue& queue); - void recover(RecoveryManager& queues, const MessageStoreSettings* const settings = 0); - void stage(Message* const msg); - void destroy(Message* const msg); - void appendContent(Message* const msg, const std::string& data); - void loadContent(Message* const msg, std::string& data, uint64_t offset, uint32_t length); - void enqueue(TransactionContext* ctxt, Message* const msg, const Queue& queue, const string * const xid); - void dequeue(TransactionContext* ctxt, Message* const msg, const Queue& queue, const string * const xid); - void prepared(const std::string * const xid); - void committed(const std::string * const xid); - void aborted(const std::string * const xid); - std::auto_ptr<TransactionContext> begin(); - void commit(TransactionContext* ctxt); - void abort(TransactionContext* ctxt); - ~MessageStoreModule(){} - }; - } -} - - -#endif diff --git a/qpid/cpp-0-9/lib/broker/NameGenerator.cpp b/qpid/cpp-0-9/lib/broker/NameGenerator.cpp deleted file mode 100644 index 3f281859fa..0000000000 --- a/qpid/cpp-0-9/lib/broker/NameGenerator.cpp +++ /dev/null @@ -1,32 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <NameGenerator.h> -#include <sstream> - -using namespace qpid::broker; - -NameGenerator::NameGenerator(const std::string& _base) : base(_base), counter(1) {} - -std::string NameGenerator::generate(){ - std::stringstream ss; - ss << base << counter++; - return ss.str(); -} diff --git a/qpid/cpp-0-9/lib/broker/NameGenerator.h b/qpid/cpp-0-9/lib/broker/NameGenerator.h deleted file mode 100644 index b2dbbdfb69..0000000000 --- a/qpid/cpp-0-9/lib/broker/NameGenerator.h +++ /dev/null @@ -1,39 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#ifndef _NameGenerator_ -#define _NameGenerator_ - -#include <BrokerMessage.h> - -namespace qpid { - namespace broker { - class NameGenerator{ - const std::string base; - unsigned int counter; - public: - NameGenerator(const std::string& base); - std::string generate(); - }; - } -} - - -#endif diff --git a/qpid/cpp-0-9/lib/broker/NullMessageStore.cpp b/qpid/cpp-0-9/lib/broker/NullMessageStore.cpp deleted file mode 100644 index bcb15c2ae0..0000000000 --- a/qpid/cpp-0-9/lib/broker/NullMessageStore.cpp +++ /dev/null @@ -1,104 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <NullMessageStore.h> - -#include <BrokerQueue.h> -#include <RecoveryManager.h> - -#include <iostream> - -using namespace qpid::broker; - -NullMessageStore::NullMessageStore(bool _warn) : warn(_warn){} - -void NullMessageStore::create(const Queue& queue, const qpid::framing::FieldTable&) -{ - if (warn) std::cout << "WARNING: Can't create durable queue '" << queue.getName() << "'. Persistence not enabled." << std::endl; -} - -void NullMessageStore::destroy(const Queue& queue) -{ - if (warn) std::cout << "WARNING: Can't destroy durable queue '" << queue.getName() << "'. Persistence not enabled." << std::endl; -} - -void NullMessageStore::recover(RecoveryManager&, const MessageStoreSettings* const) -{ - if (warn) std::cout << "WARNING: Persistence not enabled, no recovery of queues or messages." << std::endl; -} - -void NullMessageStore::stage(Message* const) -{ - if (warn) std::cout << "WARNING: Can't stage message. Persistence not enabled." << std::endl; -} - -void NullMessageStore::destroy(Message* const) -{ - if (warn) std::cout << "WARNING: No need to destroy staged message. Persistence not enabled." << std::endl; -} - -void NullMessageStore::appendContent(Message* const, const string&) -{ - if (warn) std::cout << "WARNING: Can't append content. Persistence not enabled." << std::endl; -} - -void NullMessageStore::loadContent(Message* const, string&, uint64_t, uint32_t) -{ - if (warn) std::cout << "WARNING: Can't load content. Persistence not enabled." << std::endl; -} - -void NullMessageStore::enqueue(TransactionContext*, Message* const, const Queue& queue, const string * const) -{ - if (warn) std::cout << "WARNING: Can't enqueue message onto '" << queue.getName() << "'. Persistence not enabled." << std::endl; -} - -void NullMessageStore::dequeue(TransactionContext*, Message* const, const Queue& queue, const string * const) -{ - if (warn) std::cout << "WARNING: Can't dequeue message from '" << queue.getName() << "'. Persistence not enabled." << std::endl; -} - -void NullMessageStore::prepared(const string * const) -{ - if (warn) std::cout << "WARNING: Persistence not enabled." << std::endl; -} - -void NullMessageStore::committed(const string * const) -{ - if (warn) std::cout << "WARNING: Persistence not enabled." << std::endl; -} - -void NullMessageStore::aborted(const string * const) -{ - if (warn) std::cout << "WARNING: Persistence not enabled." << std::endl; -} - -std::auto_ptr<TransactionContext> NullMessageStore::begin() -{ - return std::auto_ptr<TransactionContext>(); -} - -void NullMessageStore::commit(TransactionContext*) -{ -} - -void NullMessageStore::abort(TransactionContext*) -{ -} diff --git a/qpid/cpp-0-9/lib/broker/NullMessageStore.h b/qpid/cpp-0-9/lib/broker/NullMessageStore.h deleted file mode 100644 index 705f18ab43..0000000000 --- a/qpid/cpp-0-9/lib/broker/NullMessageStore.h +++ /dev/null @@ -1,59 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#ifndef _NullMessageStore_ -#define _NullMessageStore_ - -#include <BrokerMessage.h> -#include <MessageStore.h> -#include <BrokerQueue.h> - -namespace qpid { - namespace broker { - - /** - * A null implementation of the MessageStore interface - */ - class NullMessageStore : public MessageStore{ - const bool warn; - public: - NullMessageStore(bool warn = false); - virtual void create(const Queue& queue, const qpid::framing::FieldTable& settings); - virtual void destroy(const Queue& queue); - virtual void recover(RecoveryManager& queues, const MessageStoreSettings* const settings = 0); - virtual void stage(Message* const msg); - virtual void destroy(Message* const msg); - virtual void appendContent(Message* const msg, const std::string& data); - virtual void loadContent(Message* const msg, std::string& data, uint64_t offset, uint32_t length); - virtual void enqueue(TransactionContext* ctxt, Message* const msg, const Queue& queue, const string * const xid); - virtual void dequeue(TransactionContext* ctxt, Message* const msg, const Queue& queue, const string * const xid); - virtual void prepared(const std::string * const xid); - virtual void committed(const std::string * const xid); - virtual void aborted(const std::string * const xid); - virtual std::auto_ptr<TransactionContext> begin(); - virtual void commit(TransactionContext* ctxt); - virtual void abort(TransactionContext* ctxt); - ~NullMessageStore(){} - }; - } -} - - -#endif diff --git a/qpid/cpp-0-9/lib/broker/Prefetch.h b/qpid/cpp-0-9/lib/broker/Prefetch.h deleted file mode 100644 index b6d4026c3f..0000000000 --- a/qpid/cpp-0-9/lib/broker/Prefetch.h +++ /dev/null @@ -1,42 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#ifndef _Prefetch_ -#define _Prefetch_ - -#include <amqp_types.h> - -namespace qpid { - namespace broker { - /** - * Count and total size of asynchronously delivered - * (i.e. pushed) messages that have acks outstanding. - */ - struct Prefetch{ - uint32_t size; - uint16_t count; - - void reset() { size = 0; count = 0; } - }; - } -} - - -#endif diff --git a/qpid/cpp-0-9/lib/broker/QueuePolicy.cpp b/qpid/cpp-0-9/lib/broker/QueuePolicy.cpp deleted file mode 100644 index 94b86f2bbb..0000000000 --- a/qpid/cpp-0-9/lib/broker/QueuePolicy.cpp +++ /dev/null @@ -1,69 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <QueuePolicy.h> - -using namespace qpid::broker; -using namespace qpid::framing; - -QueuePolicy::QueuePolicy(uint32_t _maxCount, uint64_t _maxSize) : - maxCount(_maxCount), maxSize(_maxSize), count(0), size(0) {} - -QueuePolicy::QueuePolicy(const FieldTable& settings) : - maxCount(getInt(settings, maxCountKey, 0)), - maxSize(getInt(settings, maxSizeKey, 0)), count(0), size(0) {} - -void QueuePolicy::enqueued(uint64_t _size) -{ - if (maxCount) count++; - if (maxSize) size += _size; -} - -void QueuePolicy::dequeued(uint64_t _size) -{ - if (maxCount) count--; - if (maxSize) size -= _size; -} - -bool QueuePolicy::limitExceeded() -{ - return (maxSize && size > maxSize) || (maxCount && count > maxCount); -} - -void QueuePolicy::update(FieldTable& settings) -{ - if (maxCount) settings.setInt(maxCountKey, maxCount); - if (maxSize) settings.setInt(maxSizeKey, maxSize); -} - - -int QueuePolicy::getInt(const FieldTable& settings, const std::string& key, int defaultValue) -{ - //Note: currently field table only contain signed 32 bit ints, which - // restricts the values that can be set on the queue policy. - try { - return settings.getInt(key); - } catch (FieldNotFoundException& ignore) { - return defaultValue; - } -} - -const std::string QueuePolicy::maxCountKey("qpid.max_count"); -const std::string QueuePolicy::maxSizeKey("qpid.max_size"); diff --git a/qpid/cpp-0-9/lib/broker/QueuePolicy.h b/qpid/cpp-0-9/lib/broker/QueuePolicy.h deleted file mode 100644 index e7688f3e67..0000000000 --- a/qpid/cpp-0-9/lib/broker/QueuePolicy.h +++ /dev/null @@ -1,54 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#ifndef _QueuePolicy_ -#define _QueuePolicy_ - -#include <FieldTable.h> - -namespace qpid { - namespace broker { - class QueuePolicy - { - static const std::string maxCountKey; - static const std::string maxSizeKey; - - const uint32_t maxCount; - const uint64_t maxSize; - uint32_t count; - uint64_t size; - - static int getInt(const qpid::framing::FieldTable& settings, const std::string& key, int defaultValue); - - public: - QueuePolicy(uint32_t maxCount, uint64_t maxSize); - QueuePolicy(const qpid::framing::FieldTable& settings); - void enqueued(uint64_t size); - void dequeued(uint64_t size); - void update(qpid::framing::FieldTable& settings); - bool limitExceeded(); - uint32_t getMaxCount() const { return maxCount; } - uint64_t getMaxSize() const { return maxSize; } - }; - } -} - - -#endif diff --git a/qpid/cpp-0-9/lib/broker/QueueRegistry.cpp b/qpid/cpp-0-9/lib/broker/QueueRegistry.cpp deleted file mode 100644 index d33cd09840..0000000000 --- a/qpid/cpp-0-9/lib/broker/QueueRegistry.cpp +++ /dev/null @@ -1,78 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <QueueRegistry.h> -#include <sstream> -#include <assert.h> - -using namespace qpid::broker; -using namespace qpid::sys; - -QueueRegistry::QueueRegistry(MessageStore* const _store) : counter(1), store(_store){} - -QueueRegistry::~QueueRegistry(){} - -std::pair<Queue::shared_ptr, bool> -QueueRegistry::declare(const string& declareName, bool durable, - uint32_t autoDelete, const ConnectionToken* owner) -{ - Mutex::ScopedLock locker(lock); - string name = declareName.empty() ? generateName() : declareName; - assert(!name.empty()); - QueueMap::iterator i = queues.find(name); - if (i == queues.end()) { - Queue::shared_ptr queue(new Queue(name, autoDelete, durable ? store : 0, owner)); - queues[name] = queue; - return std::pair<Queue::shared_ptr, bool>(queue, true); - } else { - return std::pair<Queue::shared_ptr, bool>(i->second, false); - } -} - -void QueueRegistry::destroy(const string& name){ - Mutex::ScopedLock locker(lock); - queues.erase(name); -} - -Queue::shared_ptr QueueRegistry::find(const string& name){ - Mutex::ScopedLock locker(lock); - QueueMap::iterator i = queues.find(name); - if (i == queues.end()) { - return Queue::shared_ptr(); - } else { - return i->second; - } -} - -string QueueRegistry::generateName(){ - string name; - do { - std::stringstream ss; - ss << "tmp_" << counter++; - name = ss.str(); - // Thread safety: Private function, only called with lock held - // so this is OK. - } while(queues.find(name) != queues.end()); - return name; -} - -MessageStore* const QueueRegistry::getStore() const { - return store; -} diff --git a/qpid/cpp-0-9/lib/broker/QueueRegistry.h b/qpid/cpp-0-9/lib/broker/QueueRegistry.h deleted file mode 100644 index 079034359e..0000000000 --- a/qpid/cpp-0-9/lib/broker/QueueRegistry.h +++ /dev/null @@ -1,96 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#ifndef _QueueRegistry_ -#define _QueueRegistry_ - -#include <map> -#include <sys/Monitor.h> -#include <BrokerQueue.h> - -namespace qpid { -namespace broker { - -/** - * A registry of queues indexed by queue name. - * - * Queues are reference counted using shared_ptr to ensure that they - * are deleted when and only when they are no longer in use. - * - */ -class QueueRegistry{ - - public: - QueueRegistry(MessageStore* const store = 0); - ~QueueRegistry(); - - /** - * Declare a queue. - * - * @return The queue and a boolean flag which is true if the queue - * was created by this declare call false if it already existed. - */ - std::pair<Queue::shared_ptr, bool> declare(const string& name, bool durable = false, uint32_t autodelete = 0, - const ConnectionToken* const owner = 0); - - /** - * Destroy the named queue. - * - * Note: if the queue is in use it is not actually destroyed until - * all shared_ptrs to it are destroyed. During that time it is - * possible that a new queue with the same name may be - * created. This should not create any problems as the new and - * old queues exist independently. The registry has - * forgotten the old queue so there can be no confusion for - * subsequent calls to find or declare with the same name. - * - */ - void destroy(const string& name); - - /** - * Find the named queue. Return 0 if not found. - */ - Queue::shared_ptr find(const string& name); - - /** - * Generate unique queue name. - */ - string generateName(); - - /** - * Return the message store used. - */ - MessageStore* const getStore() const; - - - private: - typedef std::map<string, Queue::shared_ptr> QueueMap; - QueueMap queues; - qpid::sys::Mutex lock; - int counter; - MessageStore* const store; -}; - - -} -} - - -#endif diff --git a/qpid/cpp-0-9/lib/broker/RecoveryManager.cpp b/qpid/cpp-0-9/lib/broker/RecoveryManager.cpp deleted file mode 100644 index 6548e6a24f..0000000000 --- a/qpid/cpp-0-9/lib/broker/RecoveryManager.cpp +++ /dev/null @@ -1,46 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <RecoveryManager.h> - -using namespace qpid::broker; - -RecoveryManager::RecoveryManager(QueueRegistry& _queues, ExchangeRegistry& _exchanges) : queues(_queues), exchanges(_exchanges) {} - -RecoveryManager::~RecoveryManager() {} - -Queue::shared_ptr RecoveryManager::recoverQueue(const string& name) -{ - std::pair<Queue::shared_ptr, bool> result = queues.declare(name, true); - try { - Exchange::shared_ptr exchange = exchanges.getDefault(); - if (exchange) { - exchange->bind(result.first, result.first->getName(), 0); - } - } catch (ChannelException& e) { - //assume no default exchange has been declared - } - return result.first; -} - -Exchange::shared_ptr RecoveryManager::recoverExchange(const string& name, const string& type) -{ - return exchanges.declare(name, type).first; -} diff --git a/qpid/cpp-0-9/lib/broker/RecoveryManager.h b/qpid/cpp-0-9/lib/broker/RecoveryManager.h deleted file mode 100644 index d4e4cff3fd..0000000000 --- a/qpid/cpp-0-9/lib/broker/RecoveryManager.h +++ /dev/null @@ -1,45 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#ifndef _RecoveryManager_ -#define _RecoveryManager_ - -#include <ExchangeRegistry.h> -#include <QueueRegistry.h> - -namespace qpid { -namespace broker { - - class RecoveryManager{ - QueueRegistry& queues; - ExchangeRegistry& exchanges; - public: - RecoveryManager(QueueRegistry& queues, ExchangeRegistry& exchanges); - ~RecoveryManager(); - Queue::shared_ptr recoverQueue(const std::string& name); - Exchange::shared_ptr recoverExchange(const std::string& name, const std::string& type); - }; - - -} -} - - -#endif diff --git a/qpid/cpp-0-9/lib/broker/Reference.cpp b/qpid/cpp-0-9/lib/broker/Reference.cpp deleted file mode 100644 index c4c33e6363..0000000000 --- a/qpid/cpp-0-9/lib/broker/Reference.cpp +++ /dev/null @@ -1,54 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include <boost/bind.hpp> -#include "Reference.h" -#include "BrokerMessageMessage.h" -#include "QpidError.h" -#include "MessageAppendBody.h" -#include "CompletionHandler.h" - -namespace qpid { -namespace broker { - -Reference::shared_ptr ReferenceRegistry::open(const Reference::Id& id) { - ReferenceMap::iterator i = references.find(id); - // TODO aconway 2007-02-05: should we throw Channel or Connection - // exceptions here? - if (i != references.end()) - throw ConnectionException(503, "Attempt to re-open reference " +id); - return references[id] = Reference::shared_ptr(new Reference(id, this)); -} - -Reference::shared_ptr ReferenceRegistry::get(const Reference::Id& id) { - ReferenceMap::iterator i = references.find(id); - if (i == references.end()) - throw ConnectionException(503, "Attempt to use non-existent reference "+id); - return i->second; -} - -void Reference::append(AppendPtr ptr) { - appends.push_back(ptr); - size += ptr->getBytes().length(); -} - -void Reference::close() { - registry->references.erase(getId()); -} - -}} // namespace qpid::broker diff --git a/qpid/cpp-0-9/lib/broker/Reference.h b/qpid/cpp-0-9/lib/broker/Reference.h deleted file mode 100644 index e453645a54..0000000000 --- a/qpid/cpp-0-9/lib/broker/Reference.h +++ /dev/null @@ -1,111 +0,0 @@ -#ifndef _broker_Reference_h -#define _broker_Reference_h - -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include <string> -#include <vector> -#include <map> -#include <boost/shared_ptr.hpp> -#include <boost/range.hpp> - -namespace qpid { - -namespace framing { -class MessageAppendBody; -} - -namespace broker { - -class MessageMessage; -class ReferenceRegistry; - -/** - * A reference is an accumulation point for data in a multi-frame - * message. A reference can be used by multiple transfer commands to - * create multiple messages, so the reference tracks which commands - * are using it. When the reference is closed, all the associated - * transfers are completed. - * - * THREAD UNSAFE: per-channel resource, access to channels is - * serialized. - */ -class Reference -{ - public: - typedef std::string Id; - typedef boost::shared_ptr<Reference> shared_ptr; - typedef boost::shared_ptr<MessageMessage> MessagePtr; - typedef std::vector<MessagePtr> Messages; - typedef boost::shared_ptr<framing::MessageAppendBody> AppendPtr; - typedef std::vector<AppendPtr> Appends; - - Reference(const Id& id_=Id(), ReferenceRegistry* reg=0) - : id(id_), size(0), registry(reg) {} - - const std::string& getId() const { return id; } - uint64_t getSize() const { return size; } - - /** Add a message to be completed with this reference */ - void addMessage(MessagePtr message) { messages.push_back(message); } - - /** Append more data to the reference */ - void append(AppendPtr ptr); - - /** Close the reference, complete each associated message */ - void close(); - - const Appends& getAppends() const { return appends; } - const Messages& getMessages() const { return messages; } - - private: - Id id; - uint64_t size; - ReferenceRegistry* registry; - Messages messages; - Appends appends; -}; - - -/** - * A registry/factory for references. - * - * THREAD UNSAFE: per-channel resource, access to channels is - * serialized. - */ -class ReferenceRegistry { - public: - ReferenceRegistry() {}; - Reference::shared_ptr open(const Reference::Id& id); - Reference::shared_ptr get(const Reference::Id& id); - - private: - typedef std::map<Reference::Id, Reference::shared_ptr> ReferenceMap; - ReferenceMap references; - - // Reference calls references.erase(). - friend class Reference; -}; - - -}} // namespace qpid::broker - - - -#endif /*!_broker_Reference_h*/ diff --git a/qpid/cpp-0-9/lib/broker/TopicExchange.cpp b/qpid/cpp-0-9/lib/broker/TopicExchange.cpp deleted file mode 100644 index 3ebb3c8c56..0000000000 --- a/qpid/cpp-0-9/lib/broker/TopicExchange.cpp +++ /dev/null @@ -1,156 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <TopicExchange.h> -#include <ExchangeBinding.h> -#include <algorithm> - -using namespace qpid::broker; -using namespace qpid::framing; -using namespace qpid::sys; - -// TODO aconway 2006-09-20: More efficient matching algorithm. -// Areas for improvement: -// - excessive string copying: should be 0 copy, match from original buffer. -// - match/lookup: use descision tree or other more efficient structure. - -Tokens& Tokens::operator=(const std::string& s) { - clear(); - if (s.empty()) return *this; - std::string::const_iterator i = s.begin(); - while (true) { - // Invariant: i is at the beginning of the next untokenized word. - std::string::const_iterator j = find(i, s.end(), '.'); - push_back(std::string(i, j)); - if (j == s.end()) return *this; - i = j + 1; - } - return *this; -} - -TopicPattern& TopicPattern::operator=(const Tokens& tokens) { - Tokens::operator=(tokens); - normalize(); - return *this; -} - -namespace { -const std::string hashmark("#"); -const std::string star("*"); -} - -void TopicPattern::normalize() { - std::string word; - Tokens::iterator i = begin(); - while (i != end()) { - if (*i == hashmark) { - ++i; - while (i != end()) { - // Invariant: *(i-1)==#, [begin()..i-1] is normalized. - if (*i == star) { // Move * before #. - std::swap(*i, *(i-1)); - ++i; - } else if (*i == hashmark) { - erase(i); // Remove extra # - } else { - break; - } - } - } else { - i ++; - } - } -} - - -namespace { -// TODO aconway 2006-09-20: Ineficient to convert every routingKey to a string. -// Need StringRef class that operates on a string in place witout copy. -// Should be applied everywhere strings are extracted from frames. -// -bool do_match(Tokens::const_iterator pattern_begin, Tokens::const_iterator pattern_end, Tokens::const_iterator target_begin, Tokens::const_iterator target_end) -{ - // Invariant: [pattern_begin..p) matches [target_begin..t) - Tokens::const_iterator p = pattern_begin; - Tokens::const_iterator t = target_begin; - while (p != pattern_end && t != target_end) - { - if (*p == star || *p == *t) { - ++p, ++t; - } else if (*p == hashmark) { - ++p; - if (do_match(p, pattern_end, t, target_end)) return true; - while (t != target_end) { - ++t; - if (do_match(p, pattern_end, t, target_end)) return true; - } - return false; - } else { - return false; - } - } - while (p != pattern_end && *p == hashmark) ++p; // Ignore trailing # - return t == target_end && p == pattern_end; -} -} - -bool TopicPattern::match(const Tokens& target) const -{ - return do_match(begin(), end(), target.begin(), target.end()); -} - -TopicExchange::TopicExchange(const string& _name) : Exchange(_name) { } - -void TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* args){ - Monitor::ScopedLock l(lock); - TopicPattern routingPattern(routingKey); - bindings[routingPattern].push_back(queue); - queue->bound(new ExchangeBinding(this, queue, routingKey, args)); -} - -void TopicExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/){ - Monitor::ScopedLock l(lock); - BindingMap::iterator bi = bindings.find(TopicPattern(routingKey)); - Queue::vector& qv(bi->second); - if (bi == bindings.end()) return; - Queue::vector::iterator q = find(qv.begin(), qv.end(), queue); - if(q == qv.end()) return; - qv.erase(q); - if(qv.empty()) bindings.erase(bi); -} - - -void TopicExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/){ - Monitor::ScopedLock l(lock); - for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) { - if (i->first.match(routingKey)) { - Queue::vector& qv(i->second); - for(Queue::vector::iterator j = qv.begin(); j != qv.end(); j++){ - msg.deliverTo(*j); - } - } - } -} - -TopicExchange::~TopicExchange() {} - -const std::string TopicExchange::typeName("topic"); - - diff --git a/qpid/cpp-0-9/lib/broker/TopicExchange.h b/qpid/cpp-0-9/lib/broker/TopicExchange.h deleted file mode 100644 index fa0c86863a..0000000000 --- a/qpid/cpp-0-9/lib/broker/TopicExchange.h +++ /dev/null @@ -1,100 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#ifndef _TopicExchange_ -#define _TopicExchange_ - -#include <map> -#include <vector> -#include <BrokerExchange.h> -#include <FieldTable.h> -#include <BrokerMessage.h> -#include <sys/Monitor.h> -#include <BrokerQueue.h> - -namespace qpid { -namespace broker { - -/** A vector of string tokens */ -class Tokens : public std::vector<std::string> { - public: - Tokens() {}; - // Default copy, assign, dtor are sufficient. - - /** Tokenize s, provides automatic conversion of string to Tokens */ - Tokens(const std::string& s) { operator=(s); } - /** Tokenizing assignment operator s */ - Tokens & operator=(const std::string& s); - - private: - size_t hash; -}; - - -/** - * Tokens that have been normalized as a pattern and can be matched - * with topic Tokens. Normalized meands all sequences of mixed * and - * # are reduced to a series of * followed by at most one #. - */ -class TopicPattern : public Tokens -{ - public: - TopicPattern() {} - // Default copy, assign, dtor are sufficient. - TopicPattern(const Tokens& tokens) { operator=(tokens); } - TopicPattern(const std::string& str) { operator=(str); } - TopicPattern& operator=(const Tokens&); - TopicPattern& operator=(const std::string& str) { return operator=(Tokens(str)); } - - /** Match a topic */ - bool match(const std::string& topic) { return match(Tokens(topic)); } - bool match(const Tokens& topic) const; - - private: - void normalize(); -}; - -class TopicExchange : public virtual Exchange{ - typedef std::map<TopicPattern, Queue::vector> BindingMap; - BindingMap bindings; - qpid::sys::Mutex lock; - - public: - static const std::string typeName; - - TopicExchange(const string& name); - - virtual std::string getType(){ return typeName; } - - virtual void bind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args); - - virtual void unbind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args); - - virtual void route(Deliverable& msg, const string& routingKey, const qpid::framing::FieldTable* args); - - virtual ~TopicExchange(); -}; - - - -} -} - -#endif diff --git a/qpid/cpp-0-9/lib/broker/TransactionalStore.h b/qpid/cpp-0-9/lib/broker/TransactionalStore.h deleted file mode 100644 index 17bca3878a..0000000000 --- a/qpid/cpp-0-9/lib/broker/TransactionalStore.h +++ /dev/null @@ -1,47 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#ifndef _TransactionalStore_ -#define _TransactionalStore_ - -#include <memory> - -namespace qpid { - namespace broker { - struct InvalidTransactionContextException : public std::exception {}; - - class TransactionContext{ - public: - virtual ~TransactionContext(){} - }; - - class TransactionalStore{ - public: - virtual std::auto_ptr<TransactionContext> begin() = 0; - virtual void commit(TransactionContext*) = 0; - virtual void abort(TransactionContext*) = 0; - - virtual ~TransactionalStore(){} - }; - } -} - - -#endif diff --git a/qpid/cpp-0-9/lib/broker/TxAck.cpp b/qpid/cpp-0-9/lib/broker/TxAck.cpp deleted file mode 100644 index b5211158f3..0000000000 --- a/qpid/cpp-0-9/lib/broker/TxAck.cpp +++ /dev/null @@ -1,54 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <TxAck.h> - -using std::bind1st; -using std::bind2nd; -using std::mem_fun_ref; -using namespace qpid::broker; - -TxAck::TxAck(AccumulatedAck& _acked, std::list<DeliveryRecord>& _unacked, const std::string* const _xid) : - acked(_acked), unacked(_unacked), xid(_xid){ - -} - -bool TxAck::prepare(TransactionContext* ctxt) throw(){ - try{ - //dequeue all acked messages from their queues - for (ack_iterator i = unacked.begin(); i != unacked.end(); i++) { - if (i->coveredBy(&acked)) { - i->discard(ctxt, xid); - } - } - return true; - }catch(...){ - std::cout << "TxAck::prepare() - Failed to prepare" << std::endl; - return false; - } -} - -void TxAck::commit() throw(){ - //remove all acked records from the list - unacked.remove_if(bind2nd(mem_fun_ref(&DeliveryRecord::coveredBy), &acked)); -} - -void TxAck::rollback() throw(){ -} diff --git a/qpid/cpp-0-9/lib/broker/TxAck.h b/qpid/cpp-0-9/lib/broker/TxAck.h deleted file mode 100644 index 88c321c445..0000000000 --- a/qpid/cpp-0-9/lib/broker/TxAck.h +++ /dev/null @@ -1,58 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#ifndef _TxAck_ -#define _TxAck_ - -#include <algorithm> -#include <functional> -#include <list> -#include <AccumulatedAck.h> -#include <DeliveryRecord.h> -#include <TxOp.h> - -namespace qpid { - namespace broker { - /** - * Defines the transactional behaviour for acks received by a - * transactional channel. - */ - class TxAck : public TxOp{ - AccumulatedAck& acked; - std::list<DeliveryRecord>& unacked; - const std::string* const xid; - - public: - /** - * @param acked a representation of the accumulation of - * acks received - * @param unacked the record of delivered messages - */ - TxAck(AccumulatedAck& acked, std::list<DeliveryRecord>& unacked, const std::string* const xid = 0); - virtual bool prepare(TransactionContext* ctxt) throw(); - virtual void commit() throw(); - virtual void rollback() throw(); - virtual ~TxAck(){} - }; - } -} - - -#endif diff --git a/qpid/cpp-0-9/lib/broker/TxBuffer.cpp b/qpid/cpp-0-9/lib/broker/TxBuffer.cpp deleted file mode 100644 index acd3283bb7..0000000000 --- a/qpid/cpp-0-9/lib/broker/TxBuffer.cpp +++ /dev/null @@ -1,55 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <TxBuffer.h> - -using std::mem_fun; -using namespace qpid::broker; - -bool TxBuffer::prepare(TransactionalStore* const store) -{ - std::auto_ptr<TransactionContext> ctxt; - if(store) ctxt = store->begin(); - for(op_iterator i = ops.begin(); i < ops.end(); i++){ - if(!(*i)->prepare(ctxt.get())){ - if(store) store->abort(ctxt.get()); - return false; - } - } - if(store) store->commit(ctxt.get()); - return true; -} - -void TxBuffer::commit() -{ - for_each(ops.begin(), ops.end(), mem_fun(&TxOp::commit)); - ops.clear(); -} - -void TxBuffer::rollback() -{ - for_each(ops.begin(), ops.end(), mem_fun(&TxOp::rollback)); - ops.clear(); -} - -void TxBuffer::enlist(TxOp* const op) -{ - ops.push_back(op); -} diff --git a/qpid/cpp-0-9/lib/broker/TxBuffer.h b/qpid/cpp-0-9/lib/broker/TxBuffer.h deleted file mode 100644 index 2d9a2a3679..0000000000 --- a/qpid/cpp-0-9/lib/broker/TxBuffer.h +++ /dev/null @@ -1,107 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#ifndef _TxBuffer_ -#define _TxBuffer_ - -#include <algorithm> -#include <functional> -#include <vector> -#include <TransactionalStore.h> -#include <TxOp.h> - -/** - * Represents a single transaction. As such, an instance of this class - * will hold a list of operations representing the workload of the - * transaction. This work can be committed or rolled back. Committing - * is a two-stage process: first all the operations should be - * prepared, then if that succeeds they can be committed. - * - * In the 2pc case, a successful prepare may be followed by either a - * commit or a rollback. - * - * Atomicity of prepare is ensured by using a lower level - * transactional facility. This saves explicitly rolling back all the - * successfully prepared ops when one of them fails. i.e. we do not - * use 2pc internally, we instead ensure that prepare is atomic at a - * lower level. This makes individual prepare operations easier to - * code. - * - * Transactions on a messaging broker effect three types of 'action': - * (1) updates to persistent storage (2) updates to transient storage - * or cached data (3) network writes. - * - * Of these, (1) should always occur atomically during prepare to - * ensure that if the broker crashes while a transaction is being - * completed the persistent state (which is all that then remains) is - * consistent. (3) can only be done on commit, after a successful - * prepare. There is a little more flexibility with (2) but any - * changes made during prepare should be subject to the control of the - * TransactionalStore in use. - */ -namespace qpid { - namespace broker { - class TxBuffer{ - typedef std::vector<TxOp*>::iterator op_iterator; - std::vector<TxOp*> ops; - public: - /** - * Requests that all ops are prepared. This should - * primarily involve making sure that a persistent record - * of the operations is stored where necessary. - * - * All ops will be prepared under a transaction on the - * specified store. If any operation fails on prepare, - * this transaction will be rolled back. - * - * Once prepared, a transaction can be committed (or in - * the 2pc case, rolled back). - * - * @returns true if all the operations prepared - * successfully, false if not. - */ - bool prepare(TransactionalStore* const store); - /** - * Signals that the ops all prepared all completed - * successfully and can now commit, i.e. the operation can - * now be fully carried out. - * - * Should only be called after a call to prepare() returns - * true. - */ - void commit(); - /** - * Rolls back all the operations. - * - * Should only be called either after a call to prepare() - * returns true (2pc) or instead of a prepare call - * ('server-local') - */ - void rollback(); - /** - * Adds an operation to the transaction. - */ - void enlist(TxOp* const op); - }; - } -} - - -#endif diff --git a/qpid/cpp-0-9/lib/broker/TxOp.h b/qpid/cpp-0-9/lib/broker/TxOp.h deleted file mode 100644 index abba84a8e8..0000000000 --- a/qpid/cpp-0-9/lib/broker/TxOp.h +++ /dev/null @@ -1,39 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#ifndef _TxOp_ -#define _TxOp_ - -#include <TransactionalStore.h> - -namespace qpid { - namespace broker { - class TxOp{ - public: - virtual bool prepare(TransactionContext*) throw() = 0; - virtual void commit() throw() = 0; - virtual void rollback() throw() = 0; - virtual ~TxOp(){} - }; - } -} - - -#endif diff --git a/qpid/cpp-0-9/lib/broker/TxPublish.cpp b/qpid/cpp-0-9/lib/broker/TxPublish.cpp deleted file mode 100644 index 49dd8abd89..0000000000 --- a/qpid/cpp-0-9/lib/broker/TxPublish.cpp +++ /dev/null @@ -1,60 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <TxPublish.h> - -using namespace qpid::broker; - -TxPublish::TxPublish(Message::shared_ptr _msg, const std::string* const _xid) : msg(_msg), xid(_xid) {} - -bool TxPublish::prepare(TransactionContext* ctxt) throw(){ - try{ - for_each(queues.begin(), queues.end(), Prepare(ctxt, msg, xid)); - return true; - }catch(...){ - std::cout << "TxPublish::prepare() - Failed to prepare" << std::endl; - return false; - } -} - -void TxPublish::commit() throw(){ - for_each(queues.begin(), queues.end(), Commit(msg)); -} - -void TxPublish::rollback() throw(){ -} - -void TxPublish::deliverTo(Queue::shared_ptr& queue){ - queues.push_back(queue); -} - -TxPublish::Prepare::Prepare(TransactionContext* _ctxt, Message::shared_ptr& _msg, const string* const _xid) - : ctxt(_ctxt), msg(_msg), xid(_xid){} - -void TxPublish::Prepare::operator()(Queue::shared_ptr& queue){ - queue->enqueue(ctxt, msg, xid); -} - -TxPublish::Commit::Commit(Message::shared_ptr& _msg) : msg(_msg){} - -void TxPublish::Commit::operator()(Queue::shared_ptr& queue){ - queue->process(msg); -} - diff --git a/qpid/cpp-0-9/lib/broker/TxPublish.h b/qpid/cpp-0-9/lib/broker/TxPublish.h deleted file mode 100644 index 75f201257e..0000000000 --- a/qpid/cpp-0-9/lib/broker/TxPublish.h +++ /dev/null @@ -1,80 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#ifndef _TxPublish_ -#define _TxPublish_ - -#include <algorithm> -#include <functional> -#include <list> -#include <Deliverable.h> -#include <BrokerMessage.h> -#include <MessageStore.h> -#include <BrokerQueue.h> -#include <TxOp.h> - -namespace qpid { - namespace broker { - /** - * Defines the behaviour for publish operations on a - * transactional channel. Messages are routed through - * exchanges when received but are not at that stage delivered - * to the matching queues, rather the queues are held in an - * instance of this class. On prepare() the message is marked - * enqueued to the relevant queues in the MessagesStore. On - * commit() the messages will be passed to the queue for - * dispatch or to be added to the in-memory queue. - */ - class TxPublish : public TxOp, public Deliverable{ - class Prepare{ - TransactionContext* ctxt; - Message::shared_ptr& msg; - const std::string* const xid; - public: - Prepare(TransactionContext* ctxt, Message::shared_ptr& msg, const std::string* const xid); - void operator()(Queue::shared_ptr& queue); - }; - - class Commit{ - Message::shared_ptr& msg; - public: - Commit(Message::shared_ptr& msg); - void operator()(Queue::shared_ptr& queue); - }; - - Message::shared_ptr msg; - const std::string* const xid; - std::list<Queue::shared_ptr> queues; - - public: - TxPublish(Message::shared_ptr msg, const std::string* const xid = 0); - virtual bool prepare(TransactionContext* ctxt) throw(); - virtual void commit() throw(); - virtual void rollback() throw(); - - virtual void deliverTo(Queue::shared_ptr& queue); - - virtual ~TxPublish(){} - }; - } -} - - -#endif |
