summaryrefslogtreecommitdiff
path: root/cpp/broker
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/broker')
-rw-r--r--cpp/broker/Makefile39
-rw-r--r--cpp/broker/inc/AutoDelete.h54
-rw-r--r--cpp/broker/inc/Binding.h35
-rw-r--r--cpp/broker/inc/Broker.h86
-rw-r--r--cpp/broker/inc/Channel.h199
-rw-r--r--cpp/broker/inc/Configuration.h135
-rw-r--r--cpp/broker/inc/ConnectionToken.h35
-rw-r--r--cpp/broker/inc/Consumer.h34
-rw-r--r--cpp/broker/inc/DirectExchange.h52
-rw-r--r--cpp/broker/inc/Exchange.h41
-rw-r--r--cpp/broker/inc/ExchangeBinding.h45
-rw-r--r--cpp/broker/inc/ExchangeRegistry.h44
-rw-r--r--cpp/broker/inc/FanOutExchange.h55
-rw-r--r--cpp/broker/inc/HeadersExchange.h60
-rw-r--r--cpp/broker/inc/Message.h89
-rw-r--r--cpp/broker/inc/NameGenerator.h36
-rw-r--r--cpp/broker/inc/Queue.h106
-rw-r--r--cpp/broker/inc/QueueRegistry.h88
-rw-r--r--cpp/broker/inc/Router.h39
-rw-r--r--cpp/broker/inc/SessionHandlerFactoryImpl.h49
-rw-r--r--cpp/broker/inc/SessionHandlerImpl.h233
-rw-r--r--cpp/broker/inc/TopicExchange.h94
-rw-r--r--cpp/broker/src/AutoDelete.cpp93
-rw-r--r--cpp/broker/src/Broker.cpp84
-rw-r--r--cpp/broker/src/Channel.cpp256
-rw-r--r--cpp/broker/src/Configuration.cpp196
-rw-r--r--cpp/broker/src/DirectExchange.cpp72
-rw-r--r--cpp/broker/src/ExchangeBinding.cpp32
-rw-r--r--cpp/broker/src/ExchangeRegistry.cpp57
-rw-r--r--cpp/broker/src/FanOutExchange.cpp56
-rw-r--r--cpp/broker/src/HeadersExchange.cpp120
-rw-r--r--cpp/broker/src/Message.cpp100
-rw-r--r--cpp/broker/src/NameGenerator.cpp29
-rw-r--r--cpp/broker/src/Queue.cpp155
-rw-r--r--cpp/broker/src/QueueRegistry.cpp72
-rw-r--r--cpp/broker/src/Router.cpp32
-rw-r--r--cpp/broker/src/SessionHandlerFactoryImpl.cpp50
-rw-r--r--cpp/broker/src/SessionHandlerImpl.cpp405
-rw-r--r--cpp/broker/src/TopicExchange.cpp163
-rw-r--r--cpp/broker/test/ChannelTest.cpp162
-rw-r--r--cpp/broker/test/ExchangeTest.cpp65
-rw-r--r--cpp/broker/test/HeadersExchangeTest.cpp112
-rw-r--r--cpp/broker/test/Makefile20
-rw-r--r--cpp/broker/test/MessageTest.cpp54
-rw-r--r--cpp/broker/test/QueueRegistryTest.cpp76
-rw-r--r--cpp/broker/test/QueueTest.cpp176
-rw-r--r--cpp/broker/test/RouterTest.cpp86
-rw-r--r--cpp/broker/test/TopicExchangeTest.cpp184
-rw-r--r--cpp/broker/test/ValueTest.cpp96
49 files changed, 0 insertions, 4651 deletions
diff --git a/cpp/broker/Makefile b/cpp/broker/Makefile
deleted file mode 100644
index 5c96589d95..0000000000
--- a/cpp/broker/Makefile
+++ /dev/null
@@ -1,39 +0,0 @@
-#
-# Copyright (c) 2006 The Apache Software Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-#
-# Build broker library.
-#
-
-QPID_HOME = ../..
-include ${QPID_HOME}/cpp/options.mk
-TARGET=$(BROKER_LIB)
-SOURCES= $(wildcard src/*.cpp)
-OBJECTS= $(subst .cpp,.o,$(SOURCES))
-
-.PHONY: all clean
-
-all: $(TARGET)
- @$(MAKE) -C test all
-
-clean:
- -@rm -f $(TARGET) ${OBJECTS} src/*.d
- @$(MAKE) -C test clean
-
-$(TARGET): $(OBJECTS)
- $(CXX) -shared -o $@ $(LDFLAGS) $(OBJECTS) -lapr-1 $(COMMON_LIB) $(LIBDIR)
-
--include $(SOURCES:.cpp=.d)
diff --git a/cpp/broker/inc/AutoDelete.h b/cpp/broker/inc/AutoDelete.h
deleted file mode 100644
index 864d68358f..0000000000
--- a/cpp/broker/inc/AutoDelete.h
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#ifndef _AutoDelete_
-#define _AutoDelete_
-
-#include <iostream>
-#include <queue>
-#include "MonitorImpl.h"
-#include "Queue.h"
-#include "QueueRegistry.h"
-#include "ThreadFactoryImpl.h"
-
-namespace qpid {
- namespace broker{
- class AutoDelete : private virtual qpid::concurrent::Runnable{
- qpid::concurrent::ThreadFactoryImpl factory;
- qpid::concurrent::MonitorImpl lock;
- qpid::concurrent::MonitorImpl monitor;
- std::queue<Queue::shared_ptr> queues;
- QueueRegistry* const registry;
- const u_int32_t period;
- volatile bool stopped;
- qpid::concurrent::Thread* runner;
-
- Queue::shared_ptr const pop();
- void process();
- virtual void run();
-
- public:
- AutoDelete(QueueRegistry* const registry, u_int32_t period);
- void add(Queue::shared_ptr const);
- void start();
- void stop();
- };
- }
-}
-
-
-#endif
diff --git a/cpp/broker/inc/Binding.h b/cpp/broker/inc/Binding.h
deleted file mode 100644
index b11419e92c..0000000000
--- a/cpp/broker/inc/Binding.h
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#ifndef _Binding_
-#define _Binding_
-
-#include "FieldTable.h"
-
-namespace qpid {
- namespace broker {
- class Binding{
- public:
- virtual void cancel() = 0;
- virtual ~Binding(){}
- };
- }
-}
-
-
-#endif
-
diff --git a/cpp/broker/inc/Broker.h b/cpp/broker/inc/Broker.h
deleted file mode 100644
index 0cd2bd749e..0000000000
--- a/cpp/broker/inc/Broker.h
+++ /dev/null
@@ -1,86 +0,0 @@
-#ifndef _Broker_
-#define _Broker_
-
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "Acceptor.h"
-#include "Configuration.h"
-#include "Runnable.h"
-#include "SessionHandlerFactoryImpl.h"
-#include <boost/noncopyable.hpp>
-#include <tr1/memory>
-
-namespace qpid {
- namespace broker {
- /**
- * A broker instance.
- */
- class Broker : public qpid::concurrent::Runnable, private boost::noncopyable {
- Broker(const Configuration& config); // Private, use create()
- std::auto_ptr<qpid::io::Acceptor> acceptor;
- SessionHandlerFactoryImpl factory;
- int16_t port;
- bool isBound;
-
- public:
- static const int16_t DEFAULT_PORT;
-
- virtual ~Broker();
- typedef std::tr1::shared_ptr<Broker> shared_ptr;
-
- /**
- * Create a broker.
- * @param port Port to listen on or 0 to pick a port dynamically.
- */
- static shared_ptr create(int port = DEFAULT_PORT);
-
- /**
- * Create a broker from a Configuration.
- */
- static shared_ptr create(const Configuration& config);
-
- /**
- * Bind to the listening port.
- * @return The port number bound.
- */
- virtual int16_t bind();
-
- /**
- * Return listening port. If called before bind this is
- * the configured port. If called after it is the actual
- * port, which will be different if the configured port is
- * 0.
- */
- virtual int16_t getPort() { return port; }
-
- /**
- * Run the broker. Implements Runnable::run() so the broker
- * can be run in a separate thread.
- */
- virtual void run();
-
- /** Shut down the broker */
- virtual void shutdown();
- };
- }
-}
-
-
-
-#endif /*!_Broker_*/
diff --git a/cpp/broker/inc/Channel.h b/cpp/broker/inc/Channel.h
deleted file mode 100644
index 862d249ce1..0000000000
--- a/cpp/broker/inc/Channel.h
+++ /dev/null
@@ -1,199 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#ifndef _Channel_
-#define _Channel_
-
-#include <algorithm>
-#include <map>
-#include "AMQContentBody.h"
-#include "AMQHeaderBody.h"
-#include "BasicPublishBody.h"
-#include "Binding.h"
-#include "Consumer.h"
-#include "Message.h"
-#include "MonitorImpl.h"
-#include "NameGenerator.h"
-#include "OutputHandler.h"
-#include "Queue.h"
-
-namespace qpid {
- namespace broker {
- /**
- * Maintains state for an AMQP channel. Handles incoming and
- * outgoing messages for that channel.
- */
- class Channel{
- private:
- class ConsumerImpl : public virtual Consumer{
- Channel* parent;
- string tag;
- Queue::shared_ptr queue;
- ConnectionToken* const connection;
- const bool ackExpected;
- bool blocked;
- public:
- ConsumerImpl(Channel* parent, string& tag, Queue::shared_ptr queue, ConnectionToken* const connection, bool ack);
- virtual bool deliver(Message::shared_ptr& msg);
- void cancel();
- void requestDispatch();
- };
-
- typedef std::map<string,ConsumerImpl*>::iterator consumer_iterator;
-
- struct AckRecord{
- Message::shared_ptr msg;
- Queue::shared_ptr queue;
- string consumerTag;
- u_int64_t deliveryTag;
- bool pull;
-
- AckRecord(Message::shared_ptr _msg,
- Queue::shared_ptr _queue,
- const string _consumerTag,
- const u_int64_t _deliveryTag) : msg(_msg),
- queue(_queue),
- consumerTag(_consumerTag),
- deliveryTag(_deliveryTag),
- pull(false){}
-
- AckRecord(Message::shared_ptr _msg,
- Queue::shared_ptr _queue,
- const u_int64_t _deliveryTag) : msg(_msg),
- queue(_queue),
- consumerTag(""),
- deliveryTag(_deliveryTag),
- pull(true){}
- };
-
- typedef std::vector<AckRecord>::iterator ack_iterator;
-
- class MatchAck{
- const u_int64_t tag;
- public:
- MatchAck(u_int64_t tag);
- bool operator()(AckRecord& record) const;
- };
-
- class Requeue{
- public:
- void operator()(AckRecord& record) const;
- };
-
- class Redeliver{
- Channel* const channel;
- public:
- Redeliver(Channel* const channel);
- void operator()(AckRecord& record) const;
- };
-
- class CalculatePrefetch{
- u_int32_t size;
- u_int16_t count;
- public:
- CalculatePrefetch();
- void operator()(AckRecord& record);
- u_int32_t getSize();
- u_int16_t getCount();
- };
-
- const int id;
- qpid::framing::OutputHandler* out;
- u_int64_t deliveryTag;
- Queue::shared_ptr defaultQueue;
- bool transactional;
- std::map<string, ConsumerImpl*> consumers;
- u_int32_t prefetchSize;
- u_int16_t prefetchCount;
- u_int32_t outstandingSize;
- u_int16_t outstandingCount;
- u_int32_t framesize;
- Message::shared_ptr message;
- NameGenerator tagGenerator;
- std::vector<AckRecord> unacknowledged;
- qpid::concurrent::MonitorImpl deliveryLock;
-
- void deliver(Message::shared_ptr& msg, string& tag, Queue::shared_ptr& queue, bool ackExpected);
- void checkMessage(const std::string& text);
- bool checkPrefetch(Message::shared_ptr& msg);
- void cancel(consumer_iterator consumer);
-
- template<class Operation> Operation processMessage(Operation route){
- if(message->isComplete()){
- route(message);
- message.reset();
- }
- return route;
- }
-
-
- public:
- Channel(qpid::framing::OutputHandler* out, int id, u_int32_t framesize);
- ~Channel();
- inline void setDefaultQueue(Queue::shared_ptr queue){ defaultQueue = queue; }
- inline Queue::shared_ptr getDefaultQueue(){ return defaultQueue; }
- inline u_int32_t setPrefetchSize(u_int32_t size){ return prefetchSize = size; }
- inline u_int16_t setPrefetchCount(u_int16_t count){ return prefetchCount = count; }
- bool exists(const string& consumerTag);
- void consume(string& tag, Queue::shared_ptr queue, bool acks, bool exclusive, ConnectionToken* const connection = 0);
- void cancel(const string& tag);
- bool get(Queue::shared_ptr queue, bool ackExpected);
- void begin();
- void close();
- void commit();
- void rollback();
- void ack(u_int64_t deliveryTag, bool multiple);
- void recover(bool requeue);
-
- /**
- * Handles the initial publish request though a
- * channel. The header and (if applicable) content will be
- * accumulated through calls to handleHeader() and
- * handleContent()
- */
- void handlePublish(Message* msg);
-
- /**
- * A template method that handles a received header and if
- * there is no content routes it using the functor passed
- * in.
- */
- template<class Operation> Operation handleHeader(qpid::framing::AMQHeaderBody::shared_ptr header, Operation route){
- checkMessage("Invalid message sequence: got header before publish.");
- message->setHeader(header);
- return processMessage(route);
- }
-
- /**
- * A template method that handles a received content and
- * if this completes the message, routes it using the
- * functor passed in.
- */
- template<class Operation> Operation handleContent(qpid::framing::AMQContentBody::shared_ptr content, Operation route){
- checkMessage("Invalid message sequence: got content before publish.");
- message->addContent(content);
- return processMessage(route);
- }
-
- };
-
- struct InvalidAckException{};
- }
-}
-
-
-#endif
diff --git a/cpp/broker/inc/Configuration.h b/cpp/broker/inc/Configuration.h
deleted file mode 100644
index aaabdd23a0..0000000000
--- a/cpp/broker/inc/Configuration.h
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#ifndef _Configuration_
-#define _Configuration_
-
-#include <cstdlib>
-#include <iostream>
-#include <vector>
-#include "Exception.h"
-
-namespace qpid {
- namespace broker {
- class Configuration{
- class Option {
- const std::string flag;
- const std::string name;
- const std::string desc;
-
- bool match(const std::string& arg);
-
- protected:
- virtual bool needsValue() const = 0;
- virtual void setValue(const std::string& value) = 0;
-
- public:
- Option(const char flag, const std::string& name, const std::string& desc);
- Option(const std::string& name, const std::string& desc);
- virtual ~Option();
-
- bool parse(int& i, char** argv, int argc);
- void print(std::ostream& out) const;
- };
-
- class IntOption : public Option{
- const int defaultValue;
- int value;
- public:
- IntOption(char flag, const std::string& name, const std::string& desc, const int value = 0);
- IntOption(const std::string& name, const std::string& desc, const int value = 0);
- virtual ~IntOption();
-
- int getValue() const;
- virtual bool needsValue() const;
- virtual void setValue(const std::string& value);
- virtual void setValue(int _value) { value = _value; }
- };
-
- class StringOption : public Option{
- const std::string defaultValue;
- std::string value;
- public:
- StringOption(char flag, const std::string& name, const std::string& desc, const std::string value = "");
- StringOption(const std::string& name, const std::string& desc, const std::string value = "");
- virtual ~StringOption();
-
- const std::string& getValue() const;
- virtual bool needsValue() const;
- virtual void setValue(const std::string& value);
- };
-
- class BoolOption : public Option{
- const bool defaultValue;
- bool value;
- public:
- BoolOption(char flag, const std::string& name, const std::string& desc, const bool value = 0);
- BoolOption(const std::string& name, const std::string& desc, const bool value = 0);
- virtual ~BoolOption();
-
- bool getValue() const;
- virtual bool needsValue() const;
- virtual void setValue(const std::string& value);
- virtual void setValue(bool _value) { value = _value; }
- };
-
- BoolOption trace;
- IntOption port;
- IntOption workerThreads;
- IntOption maxConnections;
- IntOption connectionBacklog;
- StringOption acceptor;
- BoolOption help;
-
- typedef std::vector<Option*>::iterator op_iterator;
- std::vector<Option*> options;
-
- public:
- class ParseException : public Exception {
- public:
- ParseException(const std::string& msg) : Exception(msg) {}
- };
-
-
- Configuration();
- ~Configuration();
-
- void parse(int argc, char** argv);
-
- bool isHelp() const;
- bool isTrace() const;
- int getPort() const;
- int getWorkerThreads() const;
- int getMaxConnections() const;
- int getConnectionBacklog() const;
- std::string getAcceptor() const;
-
- void setHelp(bool b) { help.setValue(b); }
- void setTrace(bool b) { trace.setValue(b); }
- void setPort(int i) { port.setValue(i); }
- void setWorkerThreads(int i) { workerThreads.setValue(i); }
- void setMaxConnections(int i) { maxConnections.setValue(i); }
- void setConnectionBacklog(int i) { connectionBacklog.setValue(i); }
- void setAcceptor(const std::string& val) { acceptor.setValue(val); }
-
- void usage();
- };
- }
-}
-
-
-#endif
diff --git a/cpp/broker/inc/ConnectionToken.h b/cpp/broker/inc/ConnectionToken.h
deleted file mode 100644
index 1faefec2cc..0000000000
--- a/cpp/broker/inc/ConnectionToken.h
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#ifndef _ConnectionToken_
-#define _ConnectionToken_
-
-namespace qpid {
- namespace broker {
- /**
- * An empty interface allowing opaque implementations of some
- * form of token to identify a connection.
- */
- class ConnectionToken{
- public:
- virtual ~ConnectionToken(){}
- };
- }
-}
-
-
-#endif
diff --git a/cpp/broker/inc/Consumer.h b/cpp/broker/inc/Consumer.h
deleted file mode 100644
index af2d5d7812..0000000000
--- a/cpp/broker/inc/Consumer.h
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#ifndef _Consumer_
-#define _Consumer_
-
-#include "Message.h"
-
-namespace qpid {
- namespace broker {
- class Consumer{
- public:
- virtual bool deliver(Message::shared_ptr& msg) = 0;
- virtual ~Consumer(){}
- };
- }
-}
-
-
-#endif
diff --git a/cpp/broker/inc/DirectExchange.h b/cpp/broker/inc/DirectExchange.h
deleted file mode 100644
index faf5a0b949..0000000000
--- a/cpp/broker/inc/DirectExchange.h
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#ifndef _DirectExchange_
-#define _DirectExchange_
-
-#include <map>
-#include <vector>
-#include "Exchange.h"
-#include "FieldTable.h"
-#include "Message.h"
-#include "MonitorImpl.h"
-#include "Queue.h"
-
-namespace qpid {
-namespace broker {
- class DirectExchange : public virtual Exchange{
- std::map<string, std::vector<Queue::shared_ptr> > bindings;
- qpid::concurrent::MonitorImpl lock;
-
- public:
- static const std::string typeName;
-
- DirectExchange(const std::string& name);
-
- virtual void bind(Queue::shared_ptr queue, const std::string& routingKey, qpid::framing::FieldTable* args);
-
- virtual void unbind(Queue::shared_ptr queue, const std::string& routingKey, qpid::framing::FieldTable* args);
-
- virtual void route(Message::shared_ptr& msg, const std::string& routingKey, qpid::framing::FieldTable* args);
-
- virtual ~DirectExchange();
- };
-}
-}
-
-
-#endif
diff --git a/cpp/broker/inc/Exchange.h b/cpp/broker/inc/Exchange.h
deleted file mode 100644
index 1fdc00fae5..0000000000
--- a/cpp/broker/inc/Exchange.h
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#ifndef _Exchange_
-#define _Exchange_
-
-#include "FieldTable.h"
-#include "Message.h"
-#include "Queue.h"
-
-namespace qpid {
-namespace broker {
- class Exchange{
- const std::string name;
- public:
- explicit Exchange(const std::string& _name) : name(_name) {}
- virtual ~Exchange(){}
- std::string getName() { return name; }
- virtual void bind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args) = 0;
- virtual void unbind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args) = 0;
- virtual void route(Message::shared_ptr& msg, const string& routingKey, qpid::framing::FieldTable* args) = 0;
- };
-}
-}
-
-
-#endif
diff --git a/cpp/broker/inc/ExchangeBinding.h b/cpp/broker/inc/ExchangeBinding.h
deleted file mode 100644
index 4cbb73acbf..0000000000
--- a/cpp/broker/inc/ExchangeBinding.h
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#ifndef _ExchangeBinding_
-#define _ExchangeBinding_
-
-#include "Binding.h"
-#include "FieldTable.h"
-#include "Queue.h"
-
-namespace qpid {
- namespace broker {
- class Exchange;
- class Queue;
-
- class ExchangeBinding : public virtual Binding{
- Exchange* e;
- Queue::shared_ptr q;
- const string key;
- qpid::framing::FieldTable* args;
- public:
- ExchangeBinding(Exchange* _e, Queue::shared_ptr _q, const string& _key, qpid::framing::FieldTable* _args);
- virtual void cancel();
- virtual ~ExchangeBinding();
- };
- }
-}
-
-
-#endif
-
diff --git a/cpp/broker/inc/ExchangeRegistry.h b/cpp/broker/inc/ExchangeRegistry.h
deleted file mode 100644
index a4a778482c..0000000000
--- a/cpp/broker/inc/ExchangeRegistry.h
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#ifndef _ExchangeRegistry_
-#define _ExchangeRegistry_
-
-#include <map>
-#include "Exchange.h"
-#include "Monitor.h"
-
-namespace qpid {
-namespace broker {
- class ExchangeRegistry{
- typedef std::map<string, Exchange*> ExchangeMap;
- ExchangeMap exchanges;
- qpid::concurrent::Monitor* lock;
- public:
- ExchangeRegistry();
- void declare(Exchange* exchange);
- void destroy(const string& name);
- Exchange* get(const string& name);
- Exchange* getDefault();
- inline qpid::concurrent::Monitor* getLock(){ return lock; }
- ~ExchangeRegistry();
- };
-}
-}
-
-
-#endif
diff --git a/cpp/broker/inc/FanOutExchange.h b/cpp/broker/inc/FanOutExchange.h
deleted file mode 100644
index 1932e8429c..0000000000
--- a/cpp/broker/inc/FanOutExchange.h
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#ifndef _FanOutExchange_
-#define _FanOutExchange_
-
-#include <map>
-#include <vector>
-#include "Exchange.h"
-#include "FieldTable.h"
-#include "Message.h"
-#include "MonitorImpl.h"
-#include "Queue.h"
-
-namespace qpid {
-namespace broker {
-
-class FanOutExchange : public virtual Exchange {
- std::vector<Queue::shared_ptr> bindings;
- qpid::concurrent::MonitorImpl lock;
-
- public:
- static const std::string typeName;
-
- FanOutExchange(const std::string& name);
-
- virtual void bind(Queue::shared_ptr queue, const std::string& routingKey, qpid::framing::FieldTable* args);
-
- virtual void unbind(Queue::shared_ptr queue, const std::string& routingKey, qpid::framing::FieldTable* args);
-
- virtual void route(Message::shared_ptr& msg, const std::string& routingKey, qpid::framing::FieldTable* args);
-
- virtual ~FanOutExchange();
-};
-
-}
-}
-
-
-
-#endif
diff --git a/cpp/broker/inc/HeadersExchange.h b/cpp/broker/inc/HeadersExchange.h
deleted file mode 100644
index 08bf0bb735..0000000000
--- a/cpp/broker/inc/HeadersExchange.h
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#ifndef _HeadersExchange_
-#define _HeadersExchange_
-
-#include <vector>
-#include "Exchange.h"
-#include "FieldTable.h"
-#include "Message.h"
-#include "MonitorImpl.h"
-#include "Queue.h"
-
-namespace qpid {
-namespace broker {
-
-
-class HeadersExchange : public virtual Exchange {
- typedef std::pair<qpid::framing::FieldTable, Queue::shared_ptr> Binding;
- typedef std::vector<Binding> Bindings;
-
- Bindings bindings;
- qpid::concurrent::MonitorImpl lock;
-
- public:
- static const std::string typeName;
-
- HeadersExchange(const string& name);
-
- virtual void bind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args);
-
- virtual void unbind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args);
-
- virtual void route(Message::shared_ptr& msg, const string& routingKey, qpid::framing::FieldTable* args);
-
- virtual ~HeadersExchange();
-
- static bool match(const qpid::framing::FieldTable& bindArgs, const qpid::framing::FieldTable& msgArgs);
-};
-
-
-
-}
-}
-
-#endif
diff --git a/cpp/broker/inc/Message.h b/cpp/broker/inc/Message.h
deleted file mode 100644
index 94b9aa5bdd..0000000000
--- a/cpp/broker/inc/Message.h
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#ifndef _Message_
-#define _Message_
-
-#include "memory.h"
-#include "AMQContentBody.h"
-#include "AMQHeaderBody.h"
-#include "BasicHeaderProperties.h"
-#include "BasicPublishBody.h"
-#include "ConnectionToken.h"
-#include "OutputHandler.h"
-
-namespace qpid {
- namespace broker {
- class ExchangeRegistry;
-
- /**
- * Represents an AMQP message, i.e. a header body, a list of
- * content bodies and some details about the publication
- * request.
- */
- class Message{
- typedef std::vector<qpid::framing::AMQContentBody::shared_ptr> content_list;
- typedef content_list::iterator content_iterator;
-
- const ConnectionToken* const publisher;
- const string exchange;
- const string routingKey;
- const bool mandatory;
- const bool immediate;
- bool redelivered;
- qpid::framing::AMQHeaderBody::shared_ptr header;
- content_list content;
- u_int64_t size;
-
- void sendContent(qpid::framing::OutputHandler* out,
- int channel, u_int32_t framesize);
-
- public:
- typedef std::tr1::shared_ptr<Message> shared_ptr;
-
- Message(const ConnectionToken* const publisher,
- const string& exchange, const string& routingKey,
- bool mandatory, bool immediate);
- ~Message();
- void setHeader(qpid::framing::AMQHeaderBody::shared_ptr header);
- void addContent(qpid::framing::AMQContentBody::shared_ptr data);
- bool isComplete();
- const ConnectionToken* const getPublisher();
-
- void deliver(qpid::framing::OutputHandler* out,
- int channel,
- const string& consumerTag,
- u_int64_t deliveryTag,
- u_int32_t framesize);
- void sendGetOk(qpid::framing::OutputHandler* out,
- int channel,
- u_int32_t messageCount,
- u_int64_t deliveryTag,
- u_int32_t framesize);
- void redeliver();
-
- qpid::framing::BasicHeaderProperties* getHeaderProperties();
- const string& getRoutingKey() const { return routingKey; }
- const string& getExchange() const { return exchange; }
- u_int64_t contentSize() const{ return size; }
-
- };
- }
-}
-
-
-#endif
diff --git a/cpp/broker/inc/NameGenerator.h b/cpp/broker/inc/NameGenerator.h
deleted file mode 100644
index 6e6e0acf28..0000000000
--- a/cpp/broker/inc/NameGenerator.h
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#ifndef _NameGenerator_
-#define _NameGenerator_
-
-#include "Message.h"
-
-namespace qpid {
- namespace broker {
- class NameGenerator{
- const std::string base;
- unsigned int counter;
- public:
- NameGenerator(const std::string& base);
- std::string generate();
- };
- }
-}
-
-
-#endif
diff --git a/cpp/broker/inc/Queue.h b/cpp/broker/inc/Queue.h
deleted file mode 100644
index 2229ba6235..0000000000
--- a/cpp/broker/inc/Queue.h
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#ifndef _Queue_
-#define _Queue_
-
-#include <vector>
-#include <queue>
-#include "memory.h"
-#include "apr_time.h"
-#include "amqp_types.h"
-#include "Binding.h"
-#include "ConnectionToken.h"
-#include "Consumer.h"
-#include "Message.h"
-#include "MonitorImpl.h"
-
-namespace qpid {
- namespace broker {
-
- /**
- * Thrown when exclusive access would be violated.
- */
- struct ExclusiveAccessException{};
-
- /**
- * The brokers representation of an amqp queue. Messages are
- * delivered to a queue from where they can be dispatched to
- * registered consumers or be stored until dequeued or until one
- * or more consumers registers.
- */
- class Queue{
- const string name;
- const u_int32_t autodelete;
- const bool durable;
- const ConnectionToken* const owner;
- std::vector<Consumer*> consumers;
- std::queue<Binding*> bindings;
- std::queue<Message::shared_ptr> messages;
- bool queueing;
- bool dispatching;
- int next;
- mutable qpid::concurrent::MonitorImpl lock;
- apr_time_t lastUsed;
- Consumer* exclusive;
-
- bool startDispatching();
- bool dispatch(Message::shared_ptr& msg);
-
- public:
-
- typedef std::tr1::shared_ptr<Queue> shared_ptr;
-
- typedef std::vector<shared_ptr> vector;
-
- Queue(const string& name, bool durable = false, u_int32_t autodelete = 0, const ConnectionToken* const owner = 0);
- ~Queue();
- /**
- * Informs the queue of a binding that should be cancelled on
- * destruction of the queue.
- */
- void bound(Binding* b);
- /**
- * Delivers a message to the queue from where it will be
- * dispatched to immediately to a consumer if one is
- * available or stored for dequeue or later dispatch if
- * not.
- */
- void deliver(Message::shared_ptr& msg);
- /**
- * Dispatch any queued messages providing there are
- * consumers for them. Only one thread can be dispatching
- * at any time, but this method (rather than the caller)
- * is responsible for ensuring that.
- */
- void dispatch();
- void consume(Consumer* c, bool exclusive = false);
- void cancel(Consumer* c);
- Message::shared_ptr dequeue();
- u_int32_t purge();
- u_int32_t getMessageCount() const;
- u_int32_t getConsumerCount() const;
- inline const string& getName() const { return name; }
- inline const bool isExclusiveOwner(const ConnectionToken* const o) const { return o == owner; }
- inline bool hasExclusiveConsumer() const { return exclusive; }
- bool canAutoDelete() const;
- };
- }
-}
-
-
-#endif
diff --git a/cpp/broker/inc/QueueRegistry.h b/cpp/broker/inc/QueueRegistry.h
deleted file mode 100644
index ac12aa8f88..0000000000
--- a/cpp/broker/inc/QueueRegistry.h
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#ifndef _QueueRegistry_
-#define _QueueRegistry_
-
-#include <map>
-#include "MonitorImpl.h"
-#include "Queue.h"
-
-namespace qpid {
-namespace broker {
-
-class SessionHandlerImpl;
-
-/**
- * A registry of queues indexed by queue name.
- *
- * Queues are reference counted using shared_ptr to ensure that they
- * are deleted when and only when they are no longer in use.
- *
- */
-class QueueRegistry{
-
- public:
- QueueRegistry();
- ~QueueRegistry();
-
- /**
- * Declare a queue.
- *
- * @return The queue and a boolean flag which is true if the queue
- * was created by this declare call false if it already existed.
- */
- std::pair<Queue::shared_ptr, bool> declare(const string& name, bool durable = false, u_int32_t autodelete = 0, const ConnectionToken* const owner = 0);
-
- /**
- * Destroy the named queue.
- *
- * Note: if the queue is in use it is not actually destroyed until
- * all shared_ptrs to it are destroyed. During that time it is
- * possible that a new queue with the same name may be
- * created. This should not create any problems as the new and
- * old queues exist independently. The registry has
- * forgotten the old queue so there can be no confusion for
- * subsequent calls to find or declare with the same name.
- *
- */
- void destroy(const string& name);
-
- /**
- * Find the named queue. Return 0 if not found.
- */
- Queue::shared_ptr find(const string& name);
-
- /**
- * Generate unique queue name.
- */
- string generateName();
-
- private:
- typedef std::map<string, Queue::shared_ptr> QueueMap;
- QueueMap queues;
- qpid::concurrent::MonitorImpl lock;
- int counter;
-
-};
-
-
-}
-}
-
-
-#endif
diff --git a/cpp/broker/inc/Router.h b/cpp/broker/inc/Router.h
deleted file mode 100644
index d462b69832..0000000000
--- a/cpp/broker/inc/Router.h
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#ifndef _Router_
-#define _Router_
-
-#include "ExchangeRegistry.h"
-#include "Message.h"
-
-/**
- * A routing functor
- */
-namespace qpid {
- namespace broker {
- class Router{
- ExchangeRegistry& registry;
- public:
- Router(ExchangeRegistry& registry);
- void operator()(Message::shared_ptr& msg);
- };
- }
-}
-
-
-#endif
diff --git a/cpp/broker/inc/SessionHandlerFactoryImpl.h b/cpp/broker/inc/SessionHandlerFactoryImpl.h
deleted file mode 100644
index 2317a6667b..0000000000
--- a/cpp/broker/inc/SessionHandlerFactoryImpl.h
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#ifndef _SessionHandlerFactoryImpl_
-#define _SessionHandlerFactoryImpl_
-
-#include "AMQFrame.h"
-#include "AutoDelete.h"
-#include "DirectExchange.h"
-#include "ExchangeRegistry.h"
-#include "ProtocolInitiation.h"
-#include "QueueRegistry.h"
-#include "SessionHandlerFactory.h"
-#include "TimeoutHandler.h"
-
-namespace qpid {
- namespace broker {
-
- class SessionHandlerFactoryImpl : public virtual qpid::io::SessionHandlerFactory
- {
- QueueRegistry queues;
- ExchangeRegistry exchanges;
- const u_int32_t timeout;//timeout for auto-deleted queues (in ms)
- AutoDelete cleaner;
- public:
- SessionHandlerFactoryImpl(u_int32_t timeout = 30000);
- virtual qpid::io::SessionHandler* create(qpid::io::SessionContext* ctxt);
- virtual ~SessionHandlerFactoryImpl();
- };
-
- }
-}
-
-
-#endif
diff --git a/cpp/broker/inc/SessionHandlerImpl.h b/cpp/broker/inc/SessionHandlerImpl.h
deleted file mode 100644
index 549f51f5a1..0000000000
--- a/cpp/broker/inc/SessionHandlerImpl.h
+++ /dev/null
@@ -1,233 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#ifndef _SessionHandlerImpl_
-#define _SessionHandlerImpl_
-
-#include <map>
-#include <sstream>
-#include <vector>
-#include <exception>
-#include "AMQFrame.h"
-#include "AMQP_ClientProxy.h"
-#include "AMQP_ServerOperations.h"
-#include "AutoDelete.h"
-#include "ExchangeRegistry.h"
-#include "Channel.h"
-#include "ConnectionToken.h"
-#include "DirectExchange.h"
-#include "OutputHandler.h"
-#include "ProtocolInitiation.h"
-#include "QueueRegistry.h"
-#include "SessionContext.h"
-#include "SessionHandler.h"
-#include "TimeoutHandler.h"
-#include "TopicExchange.h"
-
-namespace qpid {
-namespace broker {
-
-struct ChannelException : public std::exception {
- u_int16_t code;
- string text;
- ChannelException(u_int16_t _code, string _text) : code(_code), text(_text) {}
- ~ChannelException() throw() {}
- const char* what() const throw() { return text.c_str(); }
-};
-
-struct ConnectionException : public std::exception {
- u_int16_t code;
- string text;
- ConnectionException(u_int16_t _code, string _text) : code(_code), text(_text) {}
- ~ConnectionException() throw() {}
- const char* what() const throw() { return text.c_str(); }
-};
-
-class SessionHandlerImpl : public virtual qpid::io::SessionHandler,
- public virtual qpid::framing::AMQP_ServerOperations,
- public virtual ConnectionToken
-{
- typedef std::map<u_int16_t, Channel*>::iterator channel_iterator;
- typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
-
- qpid::io::SessionContext* context;
- qpid::framing::AMQP_ClientProxy client;
- QueueRegistry* queues;
- ExchangeRegistry* const exchanges;
- AutoDelete* const cleaner;
- const u_int32_t timeout;//timeout for auto-deleted queues (in ms)
-
- ConnectionHandler* connectionHandler;
- ChannelHandler* channelHandler;
- BasicHandler* basicHandler;
- ExchangeHandler* exchangeHandler;
- QueueHandler* queueHandler;
-
- std::map<u_int16_t, Channel*> channels;
- std::vector<Queue::shared_ptr> exclusiveQueues;
-
- u_int32_t framemax;
- u_int16_t heartbeat;
-
- void handleHeader(u_int16_t channel, qpid::framing::AMQHeaderBody::shared_ptr body);
- void handleContent(u_int16_t channel, qpid::framing::AMQContentBody::shared_ptr body);
- void handleHeartbeat(qpid::framing::AMQHeartbeatBody::shared_ptr body);
-
- Channel* getChannel(u_int16_t channel);
- /**
- * Get named queue, never returns 0.
- * @return: named queue or default queue for channel if name=""
- * @exception: ChannelException if no queue of that name is found.
- * @exception: ConnectionException if no queue specified and channel has not declared one.
- */
- Queue::shared_ptr getQueue(const string& name, u_int16_t channel);
-
- Exchange* findExchange(const string& name);
-
- public:
- SessionHandlerImpl(qpid::io::SessionContext* context, QueueRegistry* queues,
- ExchangeRegistry* exchanges, AutoDelete* cleaner, const u_int32_t timeout);
- virtual void received(qpid::framing::AMQFrame* frame);
- virtual void initiated(qpid::framing::ProtocolInitiation* header);
- virtual void idleOut();
- virtual void idleIn();
- virtual void closed();
- virtual ~SessionHandlerImpl();
-
- class ConnectionHandlerImpl : public virtual ConnectionHandler{
- SessionHandlerImpl* parent;
- public:
- inline ConnectionHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {}
-
- virtual void startOk(u_int16_t channel, qpid::framing::FieldTable& clientProperties, string& mechanism,
- string& response, string& locale);
-
- virtual void secureOk(u_int16_t channel, string& response);
-
- virtual void tuneOk(u_int16_t channel, u_int16_t channelMax, u_int32_t frameMax, u_int16_t heartbeat);
-
- virtual void open(u_int16_t channel, string& virtualHost, string& capabilities, bool insist);
-
- virtual void close(u_int16_t channel, u_int16_t replyCode, string& replyText, u_int16_t classId,
- u_int16_t methodId);
-
- virtual void closeOk(u_int16_t channel);
-
- virtual ~ConnectionHandlerImpl(){}
- };
-
- class ChannelHandlerImpl : public virtual ChannelHandler{
- SessionHandlerImpl* parent;
- public:
- inline ChannelHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {}
-
- virtual void open(u_int16_t channel, string& outOfBand);
-
- virtual void flow(u_int16_t channel, bool active);
-
- virtual void flowOk(u_int16_t channel, bool active);
-
- virtual void close(u_int16_t channel, u_int16_t replyCode, string& replyText,
- u_int16_t classId, u_int16_t methodId);
-
- virtual void closeOk(u_int16_t channel);
-
- virtual ~ChannelHandlerImpl(){}
- };
-
- class ExchangeHandlerImpl : public virtual ExchangeHandler{
- SessionHandlerImpl* parent;
- public:
- inline ExchangeHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {}
-
- virtual void declare(u_int16_t channel, u_int16_t ticket, string& exchange, string& type,
- bool passive, bool durable, bool autoDelete, bool internal, bool nowait,
- qpid::framing::FieldTable& arguments);
-
- virtual void delete_(u_int16_t channel, u_int16_t ticket, string& exchange, bool ifUnused, bool nowait);
-
- virtual ~ExchangeHandlerImpl(){}
- };
-
-
- class QueueHandlerImpl : public virtual QueueHandler{
- SessionHandlerImpl* parent;
- public:
- inline QueueHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {}
-
- virtual void declare(u_int16_t channel, u_int16_t ticket, string& queue,
- bool passive, bool durable, bool exclusive,
- bool autoDelete, bool nowait, qpid::framing::FieldTable& arguments);
-
- virtual void bind(u_int16_t channel, u_int16_t ticket, string& queue,
- string& exchange, string& routingKey, bool nowait,
- qpid::framing::FieldTable& arguments);
-
- virtual void purge(u_int16_t channel, u_int16_t ticket, string& queue,
- bool nowait);
-
- virtual void delete_(u_int16_t channel, u_int16_t ticket, string& queue, bool ifUnused, bool ifEmpty,
- bool nowait);
-
- virtual ~QueueHandlerImpl(){}
- };
-
- class BasicHandlerImpl : public virtual BasicHandler{
- SessionHandlerImpl* parent;
- public:
- inline BasicHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {}
-
- virtual void qos(u_int16_t channel, u_int32_t prefetchSize, u_int16_t prefetchCount, bool global);
-
- virtual void consume(u_int16_t channel, u_int16_t ticket, string& queue, string& consumerTag,
- bool noLocal, bool noAck, bool exclusive, bool nowait);
-
- virtual void cancel(u_int16_t channel, string& consumerTag, bool nowait);
-
- virtual void publish(u_int16_t channel, u_int16_t ticket, string& exchange, string& routingKey,
- bool mandatory, bool immediate);
-
- virtual void get(u_int16_t channel, u_int16_t ticket, string& queue, bool noAck);
-
- virtual void ack(u_int16_t channel, u_int64_t deliveryTag, bool multiple);
-
- virtual void reject(u_int16_t channel, u_int64_t deliveryTag, bool requeue);
-
- virtual void recover(u_int16_t channel, bool requeue);
-
- virtual ~BasicHandlerImpl(){}
- };
-
- inline virtual ChannelHandler* getChannelHandler(){ return channelHandler; }
- inline virtual ConnectionHandler* getConnectionHandler(){ return connectionHandler; }
- inline virtual BasicHandler* getBasicHandler(){ return basicHandler; }
- inline virtual ExchangeHandler* getExchangeHandler(){ return exchangeHandler; }
- inline virtual QueueHandler* getQueueHandler(){ return queueHandler; }
-
- inline virtual AccessHandler* getAccessHandler(){ return 0; }
- inline virtual FileHandler* getFileHandler(){ return 0; }
- inline virtual StreamHandler* getStreamHandler(){ return 0; }
- inline virtual TxHandler* getTxHandler(){ return 0; }
- inline virtual DtxHandler* getDtxHandler(){ return 0; }
- inline virtual TunnelHandler* getTunnelHandler(){ return 0; }
-};
-
-}
-}
-
-
-#endif
diff --git a/cpp/broker/inc/TopicExchange.h b/cpp/broker/inc/TopicExchange.h
deleted file mode 100644
index 227280103f..0000000000
--- a/cpp/broker/inc/TopicExchange.h
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#ifndef _TopicExchange_
-#define _TopicExchange_
-
-#include <tr1/unordered_map>
-#include <vector>
-#include "Exchange.h"
-#include "FieldTable.h"
-#include "Message.h"
-#include "MonitorImpl.h"
-#include "Queue.h"
-
-namespace qpid {
-namespace broker {
-
-/** A vector of string tokens */
-class Tokens : public std::vector<std::string> {
- public:
- Tokens() {};
- // Default copy, assign, dtor are sufficient.
-
- /** Tokenize s, provides automatic conversion of string to Tokens */
- Tokens(const std::string& s) { operator=(s); }
- /** Tokenize s */
- Tokens & operator=(const std::string& s);
-
- struct Hash { size_t operator()(const Tokens&) const; };
- typedef std::equal_to<Tokens> Equal;
-};
-
-/**
- * Tokens that have been normalized as a pattern and can be matched
- * with topic Tokens. Normalized meands all sequences of mixed * and
- * # are reduced to a series of * followed by at most one #.
- */
-class TopicPattern : public Tokens
-{
- public:
- TopicPattern() {}
- // Default copy, assign, dtor are sufficient.
- TopicPattern(const Tokens& tokens) { operator=(tokens); }
- TopicPattern(const std::string& str) { operator=(str); }
- TopicPattern& operator=(const Tokens&);
- TopicPattern& operator=(const std::string& str) { return operator=(Tokens(str)); }
-
- /** Match a topic */
- bool match(const std::string& topic) { return match(Tokens(topic)); }
- bool match(const Tokens& topic) const;
-
- private:
- void normalize();
-};
-
-class TopicExchange : public virtual Exchange{
- typedef std::tr1::unordered_map<TopicPattern, Queue::vector, TopicPattern::Hash> BindingMap;
- BindingMap bindings;
- qpid::concurrent::MonitorImpl lock;
-
- public:
- static const std::string typeName;
-
- TopicExchange(const string& name);
-
- virtual void bind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args);
-
- virtual void unbind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args);
-
- virtual void route(Message::shared_ptr& msg, const string& routingKey, qpid::framing::FieldTable* args);
-
- virtual ~TopicExchange();
-};
-
-
-
-}
-}
-
-#endif
diff --git a/cpp/broker/src/AutoDelete.cpp b/cpp/broker/src/AutoDelete.cpp
deleted file mode 100644
index 6793ec449d..0000000000
--- a/cpp/broker/src/AutoDelete.cpp
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include "AutoDelete.h"
-
-using namespace qpid::broker;
-
-AutoDelete::AutoDelete(QueueRegistry* const _registry, u_int32_t _period) : registry(_registry),
- period(_period),
- stopped(true),
- runner(0){}
-
-void AutoDelete::add(Queue::shared_ptr const queue){
- lock.acquire();
- queues.push(queue);
- lock.release();
-}
-
-Queue::shared_ptr const AutoDelete::pop(){
- Queue::shared_ptr next;
- lock.acquire();
- if(!queues.empty()){
- next = queues.front();
- queues.pop();
- }
- lock.release();
- return next;
-}
-
-void AutoDelete::process(){
- Queue::shared_ptr seen;
- for(Queue::shared_ptr q = pop(); q; q = pop()){
- if(seen == q){
- add(q);
- break;
- }else if(q->canAutoDelete()){
- std::string name(q->getName());
- registry->destroy(name);
- std::cout << "INFO: Auto-deleted queue named " << name << std::endl;
- }else{
- add(q);
- if(!seen) seen = q;
- }
- }
-}
-
-void AutoDelete::run(){
- monitor.acquire();
- while(!stopped){
- process();
- monitor.wait(period);
- }
- monitor.release();
-}
-
-void AutoDelete::start(){
- monitor.acquire();
- if(stopped){
- runner = factory.create(this);
- stopped = false;
- monitor.release();
- runner->start();
- }else{
- monitor.release();
- }
-}
-
-void AutoDelete::stop(){
- monitor.acquire();
- if(!stopped){
- stopped = true;
- monitor.notify();
- monitor.release();
- runner->join();
- delete runner;
- }else{
- monitor.release();
- }
-}
diff --git a/cpp/broker/src/Broker.cpp b/cpp/broker/src/Broker.cpp
deleted file mode 100644
index b6472d1729..0000000000
--- a/cpp/broker/src/Broker.cpp
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include <iostream>
-#include <memory>
-#include "Broker.h"
-#include "Acceptor.h"
-#include "Configuration.h"
-#include "QpidError.h"
-#include "SessionHandlerFactoryImpl.h"
-#include "BlockingAPRAcceptor.h"
-#include "LFAcceptor.h"
-
-
-using namespace qpid::broker;
-using namespace qpid::io;
-
-namespace {
- Acceptor* createAcceptor(const Configuration& config){
- const string type(config.getAcceptor());
- if("blocking" == type){
- std::cout << "Using blocking acceptor " << std::endl;
- return new BlockingAPRAcceptor(config.isTrace(), config.getConnectionBacklog());
- }else if("non-blocking" == type){
- std::cout << "Using non-blocking acceptor " << std::endl;
- return new LFAcceptor(config.isTrace(),
- config.getConnectionBacklog(),
- config.getWorkerThreads(),
- config.getMaxConnections());
- }
- throw Configuration::ParseException("Unrecognised acceptor: " + type);
- }
-}
-
-Broker::Broker(const Configuration& config) :
- acceptor(createAcceptor(config)),
- port(config.getPort()),
- isBound(false) {}
-
-Broker::shared_ptr Broker::create(int port)
-{
- Configuration config;
- config.setPort(port);
- return create(config);
-}
-
-Broker::shared_ptr Broker::create(const Configuration& config) {
- return Broker::shared_ptr(new Broker(config));
-}
-
-int16_t Broker::bind()
-{
- if (!isBound) {
- port = acceptor->bind(port);
- }
- return port;
-}
-
-void Broker::run() {
- bind();
- acceptor->run(&factory);
-}
-
-void Broker::shutdown() {
- acceptor->shutdown();
-}
-
-Broker::~Broker() { }
-
-const int16_t Broker::DEFAULT_PORT(5672);
diff --git a/cpp/broker/src/Channel.cpp b/cpp/broker/src/Channel.cpp
deleted file mode 100644
index 34d69716c4..0000000000
--- a/cpp/broker/src/Channel.cpp
+++ /dev/null
@@ -1,256 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include "Channel.h"
-#include "QpidError.h"
-#include <iostream>
-#include <sstream>
-#include <assert.h>
-
-using namespace qpid::broker;
-using namespace qpid::framing;
-using namespace qpid::concurrent;
-
-
-Channel::Channel(OutputHandler* _out, int _id, u_int32_t _framesize) :
- id(_id),
- out(_out),
- deliveryTag(1),
- transactional(false),
- prefetchSize(0),
- prefetchCount(0),
- outstandingSize(0),
- outstandingCount(0),
- framesize(_framesize),
- tagGenerator("sgen"){}
-
-Channel::~Channel(){
-}
-
-bool Channel::exists(const string& consumerTag){
- return consumers.find(consumerTag) != consumers.end();
-}
-
-void Channel::consume(string& tag, Queue::shared_ptr queue, bool acks, bool exclusive, ConnectionToken* const connection){
- if(tag.empty()) tag = tagGenerator.generate();
-
- ConsumerImpl* c(new ConsumerImpl(this, tag, queue, connection, acks));
- try{
- queue->consume(c, exclusive);//may throw exception
- consumers[tag] = c;
- }catch(ExclusiveAccessException& e){
- delete c;
- throw e;
- }
-}
-
-void Channel::cancel(consumer_iterator i){
- ConsumerImpl* c = i->second;
- consumers.erase(i);
- if(c){
- c->cancel();
- delete c;
- }
-}
-
-void Channel::cancel(const string& tag){
- consumer_iterator i = consumers.find(tag);
- if(i != consumers.end()){
- cancel(i);
- }
-}
-
-void Channel::close(){
- //cancel all consumers
- for(consumer_iterator i = consumers.begin(); i != consumers.end(); i = consumers.begin() ){
- cancel(i);
- }
-}
-
-void Channel::begin(){
- transactional = true;
-}
-
-void Channel::commit(){
-
-}
-
-void Channel::rollback(){
-
-}
-
-void Channel::deliver(Message::shared_ptr& msg, string& consumerTag, Queue::shared_ptr& queue, bool ackExpected){
- Locker locker(deliveryLock);
-
- u_int64_t myDeliveryTag = deliveryTag++;
- if(ackExpected){
- unacknowledged.push_back(AckRecord(msg, queue, consumerTag, myDeliveryTag));
- outstandingSize += msg->contentSize();
- outstandingCount++;
- }
- //send deliver method, header and content(s)
- msg->deliver(out, id, consumerTag, myDeliveryTag, framesize);
-}
-
-bool Channel::checkPrefetch(Message::shared_ptr& msg){
- Locker locker(deliveryLock);
- bool countOk = !prefetchCount || prefetchCount > unacknowledged.size();
- bool sizeOk = !prefetchSize || prefetchSize > msg->contentSize() + outstandingSize || unacknowledged.empty();
- return countOk && sizeOk;
-}
-
-Channel::ConsumerImpl::ConsumerImpl(Channel* _parent, string& _tag,
- Queue::shared_ptr _queue,
- ConnectionToken* const _connection, bool ack) : parent(_parent),
- tag(_tag),
- queue(_queue),
- connection(_connection),
- ackExpected(ack),
- blocked(false){
-}
-
-bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg){
- if(!connection || connection != msg->getPublisher()){//check for no_local
- if(ackExpected && !parent->checkPrefetch(msg)){
- blocked = true;
- }else{
- blocked = false;
- parent->deliver(msg, tag, queue, ackExpected);
- return true;
- }
- }
- return false;
-}
-
-void Channel::ConsumerImpl::cancel(){
- if(queue) queue->cancel(this);
-}
-
-void Channel::ConsumerImpl::requestDispatch(){
- if(blocked) queue->dispatch();
-}
-
-void Channel::checkMessage(const std::string& text){
- if(!message.get()){
- THROW_QPID_ERROR(PROTOCOL_ERROR + 504, text);
- }
-}
-
-void Channel::handlePublish(Message* msg){
- if(message.get()){
- THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got publish before previous content was completed.");
- }
- message = Message::shared_ptr(msg);
-}
-
-void Channel::ack(u_int64_t _deliveryTag, bool multiple){
- Locker locker(deliveryLock);//need to synchronize with possible concurrent delivery
-
- ack_iterator i = find_if(unacknowledged.begin(), unacknowledged.end(), MatchAck(_deliveryTag));
- if(i == unacknowledged.end()){
- throw InvalidAckException();
- }else if(multiple){
- unacknowledged.erase(unacknowledged.begin(), ++i);
- //recompute prefetch outstanding (note: messages delivered through get are ignored)
- CalculatePrefetch calc(for_each(unacknowledged.begin(), unacknowledged.end(), CalculatePrefetch()));
- outstandingSize = calc.getSize();
- outstandingCount = calc.getCount();
- }else{
- if(!i->pull){
- outstandingSize -= i->msg->contentSize();
- outstandingCount--;
- }
- unacknowledged.erase(i);
- }
-
- //if the prefetch limit had previously been reached, there may
- //be messages that can be now be delivered
- for(consumer_iterator j = consumers.begin(); j != consumers.end(); j++){
- j->second->requestDispatch();
- }
-}
-
-void Channel::recover(bool requeue){
- Locker locker(deliveryLock);//need to synchronize with possible concurrent delivery
-
- if(requeue){
- outstandingSize = 0;
- outstandingCount = 0;
- ack_iterator start(unacknowledged.begin());
- ack_iterator end(unacknowledged.end());
- for_each(start, end, Requeue());
- unacknowledged.erase(start, end);
- }else{
- for_each(unacknowledged.begin(), unacknowledged.end(), Redeliver(this));
- }
-}
-
-bool Channel::get(Queue::shared_ptr queue, bool ackExpected){
- Message::shared_ptr msg = queue->dequeue();
- if(msg){
- Locker locker(deliveryLock);
- u_int64_t myDeliveryTag = deliveryTag++;
- msg->sendGetOk(out, id, queue->getMessageCount() + 1, myDeliveryTag, framesize);
- if(ackExpected){
- unacknowledged.push_back(AckRecord(msg, queue, myDeliveryTag));
- }
- return true;
- }else{
- return false;
- }
-}
-
-Channel::MatchAck::MatchAck(u_int64_t _tag) : tag(_tag) {}
-
-bool Channel::MatchAck::operator()(AckRecord& record) const{
- return tag == record.deliveryTag;
-}
-
-void Channel::Requeue::operator()(AckRecord& record) const{
- record.msg->redeliver();
- record.queue->deliver(record.msg);
-}
-
-Channel::Redeliver::Redeliver(Channel* const _channel) : channel(_channel) {}
-
-void Channel::Redeliver::operator()(AckRecord& record) const{
- if(record.pull){
- //if message was originally sent as response to get, we must requeue it
- record.msg->redeliver();
- record.queue->deliver(record.msg);
- }else{
- record.msg->deliver(channel->out, channel->id, record.consumerTag, record.deliveryTag, channel->framesize);
- }
-}
-
-Channel::CalculatePrefetch::CalculatePrefetch() : size(0){}
-
-void Channel::CalculatePrefetch::operator()(AckRecord& record){
- if(!record.pull){
- //ignore messages that were sent in response to get when calculating prefetch
- size += record.msg->contentSize();
- count++;
- }
-}
-
-u_int32_t Channel::CalculatePrefetch::getSize(){
- return size;
-}
-
-u_int16_t Channel::CalculatePrefetch::getCount(){
- return count;
-}
diff --git a/cpp/broker/src/Configuration.cpp b/cpp/broker/src/Configuration.cpp
deleted file mode 100644
index 6e7df7889e..0000000000
--- a/cpp/broker/src/Configuration.cpp
+++ /dev/null
@@ -1,196 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include "Configuration.h"
-#include <string.h>
-
-using namespace qpid::broker;
-using namespace std;
-
-Configuration::Configuration() :
- trace('t', "trace", "Print incoming & outgoing frames to the console (default=false)", false),
- port('p', "port", "Sets the port to listen on (default=5672)", 5672),
- workerThreads("worker-threads", "Sets the number of worker threads to use (default=5). Only valid for non-blocking acceptor.", 5),
- maxConnections("max-connections", "Sets the maximum number of connections the broker can accept (default=500). Only valid for non-blocking acceptor.", 500),
- connectionBacklog("connection-backlog", "Sets the connection backlog for the servers socket (default=10)", 10),
- acceptor('a', "acceptor", "Sets the acceptor to use. Currently only two values are recognised, blocking and non-blocking (which is the default)", "non-blocking"),
- help("help", "Prints usage information", false)
-{
- options.push_back(&trace);
- options.push_back(&port);
- options.push_back(&workerThreads);
- options.push_back(&maxConnections);
- options.push_back(&connectionBacklog);
- options.push_back(&acceptor);
- options.push_back(&help);
-}
-
-Configuration::~Configuration(){}
-
-void Configuration::parse(int argc, char** argv){
- int position = 1;
- while(position < argc){
- bool matched(false);
- for(op_iterator i = options.begin(); i < options.end() && !matched; i++){
- matched = (*i)->parse(position, argv, argc);
- }
- if(!matched){
- std::cout << "Warning: skipping unrecognised option " << argv[position] << std::endl;
- position++;
- }
- }
-}
-
-void Configuration::usage(){
- for(op_iterator i = options.begin(); i < options.end(); i++){
- (*i)->print(std::cout);
- }
-}
-
-bool Configuration::isHelp() const {
- return help.getValue();
-}
-
-bool Configuration::isTrace() const {
- return trace.getValue();
-}
-
-int Configuration::getPort() const {
- return port.getValue();
-}
-
-int Configuration::getWorkerThreads() const {
- return workerThreads.getValue();
-}
-
-int Configuration::getMaxConnections() const {
- return maxConnections.getValue();
-}
-
-int Configuration::getConnectionBacklog() const {
- return connectionBacklog.getValue();
-}
-
-string Configuration::getAcceptor() const {
- return acceptor.getValue();
-}
-
-Configuration::Option::Option(const char _flag, const string& _name, const string& _desc) :
- flag(string("-") + _flag), name("--" +_name), desc(_desc) {}
-
-Configuration::Option::Option(const string& _name, const string& _desc) :
- flag(""), name("--" + _name), desc(_desc) {}
-
-Configuration::Option::~Option(){}
-
-bool Configuration::Option::match(const string& arg){
- return flag == arg || name == arg;
-}
-
-bool Configuration::Option::parse(int& i, char** argv, int argc){
- const string arg(argv[i]);
- if(match(arg)){
- if(needsValue()){
- if(++i < argc) setValue(argv[i]);
- else throw ParseException("Argument " + arg + " requires a value!");
- }else{
- setValue("");
- }
- i++;
- return true;
- }else{
- return false;
- }
-}
-
-void Configuration::Option::print(ostream& out) const {
- out << " ";
- if(flag.length() > 0){
- out << flag << " or ";
- }
- out << name;
- if(needsValue()) out << "<value>";
- out << std::endl;
- out << " " << desc << std::endl;
-}
-
-
-// String Option:
-
-Configuration::StringOption::StringOption(const char _flag, const string& _name, const string& _desc, const string _value) :
- Option(_flag,_name,_desc), defaultValue(_value), value(_value) {}
-
-Configuration::StringOption::StringOption(const string& _name, const string& _desc, const string _value) :
- Option(_name,_desc), defaultValue(_value), value(_value) {}
-
-Configuration::StringOption::~StringOption(){}
-
-const string& Configuration::StringOption::getValue() const {
- return value;
-}
-
-bool Configuration::StringOption::needsValue() const {
- return true;
-}
-
-void Configuration::StringOption::setValue(const std::string& _value){
- value = _value;
-}
-
-// Int Option:
-
-Configuration::IntOption::IntOption(const char _flag, const string& _name, const string& _desc, const int _value) :
- Option(_flag,_name,_desc), defaultValue(_value), value(_value) {}
-
-Configuration::IntOption::IntOption(const string& _name, const string& _desc, const int _value) :
- Option(_name,_desc), defaultValue(_value), value(_value) {}
-
-Configuration::IntOption::~IntOption(){}
-
-int Configuration::IntOption::getValue() const {
- return value;
-}
-
-bool Configuration::IntOption::needsValue() const {
- return true;
-}
-
-void Configuration::IntOption::setValue(const std::string& _value){
- value = atoi(_value.c_str());
-}
-
-// Bool Option:
-
-Configuration::BoolOption::BoolOption(const char _flag, const string& _name, const string& _desc, const bool _value) :
- Option(_flag,_name,_desc), defaultValue(_value), value(_value) {}
-
-Configuration::BoolOption::BoolOption(const string& _name, const string& _desc, const bool _value) :
- Option(_name,_desc), defaultValue(_value), value(_value) {}
-
-Configuration::BoolOption::~BoolOption(){}
-
-bool Configuration::BoolOption::getValue() const {
- return value;
-}
-
-bool Configuration::BoolOption::needsValue() const {
- return false;
-}
-
-void Configuration::BoolOption::setValue(const std::string& _value){
- value = strcasecmp(_value.c_str(), "true") == 0;
-}
diff --git a/cpp/broker/src/DirectExchange.cpp b/cpp/broker/src/DirectExchange.cpp
deleted file mode 100644
index 94cfbc766d..0000000000
--- a/cpp/broker/src/DirectExchange.cpp
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include "DirectExchange.h"
-#include "ExchangeBinding.h"
-#include <iostream>
-
-using namespace qpid::broker;
-using namespace qpid::framing;
-
-DirectExchange::DirectExchange(const string& _name) : Exchange(_name) {
-
-}
-
-void DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, FieldTable* args){
- lock.acquire();
- std::vector<Queue::shared_ptr>& queues(bindings[routingKey]);
- std::vector<Queue::shared_ptr>::iterator i = find(queues.begin(), queues.end(), queue);
- if(i == queues.end()){
- bindings[routingKey].push_back(queue);
- queue->bound(new ExchangeBinding(this, queue, routingKey, args));
- }
- lock.release();
-}
-
-void DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, FieldTable* /*args*/){
- lock.acquire();
- std::vector<Queue::shared_ptr>& queues(bindings[routingKey]);
-
- std::vector<Queue::shared_ptr>::iterator i = find(queues.begin(), queues.end(), queue);
- if(i < queues.end()){
- queues.erase(i);
- if(queues.empty()){
- bindings.erase(routingKey);
- }
- }
- lock.release();
-}
-
-void DirectExchange::route(Message::shared_ptr& msg, const string& routingKey, FieldTable* /*args*/){
- lock.acquire();
- std::vector<Queue::shared_ptr>& queues(bindings[routingKey]);
- int count(0);
- for(std::vector<Queue::shared_ptr>::iterator i = queues.begin(); i != queues.end(); i++, count++){
- (*i)->deliver(msg);
- }
- if(!count){
- std::cout << "WARNING: DirectExchange " << getName() << " could not route message with key " << routingKey << std::endl;
- }
- lock.release();
-}
-
-DirectExchange::~DirectExchange(){
-
-}
-
-
-const std::string DirectExchange::typeName("direct");
diff --git a/cpp/broker/src/ExchangeBinding.cpp b/cpp/broker/src/ExchangeBinding.cpp
deleted file mode 100644
index 6160a67fd3..0000000000
--- a/cpp/broker/src/ExchangeBinding.cpp
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include "ExchangeBinding.h"
-#include "Exchange.h"
-
-using namespace qpid::broker;
-using namespace qpid::framing;
-
-ExchangeBinding::ExchangeBinding(Exchange* _e, Queue::shared_ptr _q, const string& _key, FieldTable* _args) : e(_e), q(_q), key(_key), args(_args){}
-
-void ExchangeBinding::cancel(){
- e->unbind(q, key, args);
- delete this;
-}
-
-ExchangeBinding::~ExchangeBinding(){
-}
diff --git a/cpp/broker/src/ExchangeRegistry.cpp b/cpp/broker/src/ExchangeRegistry.cpp
deleted file mode 100644
index 05396382a7..0000000000
--- a/cpp/broker/src/ExchangeRegistry.cpp
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include "ExchangeRegistry.h"
-#include "MonitorImpl.h"
-
-using namespace qpid::broker;
-using namespace qpid::concurrent;
-
-ExchangeRegistry::ExchangeRegistry() : lock(new MonitorImpl()){}
-
-ExchangeRegistry::~ExchangeRegistry(){
- for (ExchangeMap::iterator i = exchanges.begin(); i != exchanges.end(); ++i)
- {
- delete i->second;
- }
- delete lock;
-}
-
-void ExchangeRegistry::declare(Exchange* exchange){
- exchanges[exchange->getName()] = exchange;
-}
-
-void ExchangeRegistry::destroy(const string& name){
- if(exchanges[name]){
- delete exchanges[name];
- exchanges.erase(name);
- }
-}
-
-Exchange* ExchangeRegistry::get(const string& name){
- return exchanges[name];
-}
-
-namespace
-{
-const std::string empty;
-}
-
-Exchange* ExchangeRegistry::getDefault()
-{
- return get(empty);
-}
diff --git a/cpp/broker/src/FanOutExchange.cpp b/cpp/broker/src/FanOutExchange.cpp
deleted file mode 100644
index e8cb8f6315..0000000000
--- a/cpp/broker/src/FanOutExchange.cpp
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include "FanOutExchange.h"
-#include "ExchangeBinding.h"
-#include <algorithm>
-
-using namespace qpid::broker;
-using namespace qpid::framing;
-using namespace qpid::concurrent;
-
-FanOutExchange::FanOutExchange(const std::string& _name) : Exchange(_name) {}
-
-void FanOutExchange::bind(Queue::shared_ptr queue, const string& routingKey, FieldTable* args){
- Locker locker(lock);
- // Add if not already present.
- Queue::vector::iterator i = std::find(bindings.begin(), bindings.end(), queue);
- if (i == bindings.end()) {
- bindings.push_back(queue);
- queue->bound(new ExchangeBinding(this, queue, routingKey, args));
- }
-}
-
-void FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey*/, FieldTable* /*args*/){
- Locker locker(lock);
- Queue::vector::iterator i = std::find(bindings.begin(), bindings.end(), queue);
- if (i != bindings.end()) {
- bindings.erase(i);
- // TODO aconway 2006-09-14: What about the ExchangeBinding object? Don't we have to verify routingKey/args match?
- }
-}
-
-void FanOutExchange::route(Message::shared_ptr& msg, const string& /*routingKey*/, FieldTable* /*args*/){
- Locker locker(lock);
- for(Queue::vector::iterator i = bindings.begin(); i != bindings.end(); ++i){
- (*i)->deliver(msg);
- }
-}
-
-FanOutExchange::~FanOutExchange() {}
-
-const std::string FanOutExchange::typeName("fanout");
diff --git a/cpp/broker/src/HeadersExchange.cpp b/cpp/broker/src/HeadersExchange.cpp
deleted file mode 100644
index 65204cdb85..0000000000
--- a/cpp/broker/src/HeadersExchange.cpp
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include "HeadersExchange.h"
-#include "ExchangeBinding.h"
-#include "Value.h"
-#include "QpidError.h"
-#include <algorithm>
-
-
-using namespace qpid::broker;
-using namespace qpid::framing;
-using namespace qpid::concurrent;
-
-// TODO aconway 2006-09-20: More efficient matching algorithm.
-// The current search algorithm really sucks.
-// Fieldtables are heavy, maybe use shared_ptr to do handle-body.
-
-using namespace qpid::broker;
-
-namespace {
- const std::string all("all");
- const std::string any("any");
- const std::string x_match("x-match");
-}
-
-HeadersExchange::HeadersExchange(const string& _name) : Exchange(_name) { }
-
-void HeadersExchange::bind(Queue::shared_ptr queue, const string& routingKey, FieldTable* args){
- std::cout << "HeadersExchange::bind" << std::endl;
- Locker locker(lock);
- std::string what = args->getString("x-match");
- if (what != all && what != any) {
- THROW_QPID_ERROR(PROTOCOL_ERROR, "Invalid x-match value binding to headers exchange.");
- }
- bindings.push_back(Binding(*args, queue));
- queue->bound(new ExchangeBinding(this, queue, routingKey, args));
-}
-
-void HeadersExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey*/, FieldTable* args){
- Locker locker(lock);
- Bindings::iterator i =
- std::find(bindings.begin(),bindings.end(), Binding(*args, queue));
- if (i != bindings.end()) bindings.erase(i);
-}
-
-
-void HeadersExchange::route(Message::shared_ptr& msg, const string& /*routingKey*/, FieldTable* args){
- std::cout << "route: " << *args << std::endl;
- Locker locker(lock);;
- for (Bindings::iterator i = bindings.begin(); i != bindings.end(); ++i) {
- if (match(i->first, *args)) i->second->deliver(msg);
- }
-}
-
-HeadersExchange::~HeadersExchange() {}
-
-const std::string HeadersExchange::typeName("headers");
-
-namespace
-{
-
- bool match_values(const Value& bind, const Value& msg) {
- return dynamic_cast<const EmptyValue*>(&bind) || bind == msg;
- }
-
-}
-
-
-bool HeadersExchange::match(const FieldTable& bind, const FieldTable& msg) {
- typedef FieldTable::ValueMap Map;
- std::string what = bind.getString(x_match);
- if (what == all) {
- for (Map::const_iterator i = bind.getMap().begin();
- i != bind.getMap().end();
- ++i)
- {
- if (i->first != x_match)
- {
- Map::const_iterator j = msg.getMap().find(i->first);
- if (j == msg.getMap().end()) return false;
- if (!match_values(*(i->second), *(j->second))) return false;
- }
- }
- return true;
- } else if (what == any) {
- for (Map::const_iterator i = bind.getMap().begin();
- i != bind.getMap().end();
- ++i)
- {
- if (i->first != x_match)
- {
- Map::const_iterator j = msg.getMap().find(i->first);
- if (j != msg.getMap().end()) {
- if (match_values(*(i->second), *(j->second))) return true;
- }
- }
- }
- return false;
- } else {
- return false;
- }
-}
-
-
-
diff --git a/cpp/broker/src/Message.cpp b/cpp/broker/src/Message.cpp
deleted file mode 100644
index 0a8a5f7a4d..0000000000
--- a/cpp/broker/src/Message.cpp
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include "MonitorImpl.h"
-#include "Message.h"
-#include "ExchangeRegistry.h"
-#include <iostream>
-
-using namespace std::tr1;//for *_pointer_cast methods
-using namespace qpid::broker;
-using namespace qpid::framing;
-using namespace qpid::concurrent;
-
-
-Message::Message(const ConnectionToken* const _publisher,
- const string& _exchange, const string& _routingKey,
- bool _mandatory, bool _immediate) : publisher(_publisher),
- exchange(_exchange),
- routingKey(_routingKey),
- mandatory(_mandatory),
- immediate(_immediate),
- redelivered(false),
- size(0){
-
-}
-
-Message::~Message(){
-}
-
-void Message::setHeader(AMQHeaderBody::shared_ptr _header){
- this->header = _header;
-}
-
-void Message::addContent(AMQContentBody::shared_ptr data){
- content.push_back(data);
- size += data->size();
-}
-
-bool Message::isComplete(){
- return header.get() && (header->getContentSize() == contentSize());
-}
-
-void Message::redeliver(){
- redelivered = true;
-}
-
-void Message::deliver(OutputHandler* out, int channel,
- const string& consumerTag, u_int64_t deliveryTag,
- u_int32_t framesize){
-
- out->send(new AMQFrame(channel, new BasicDeliverBody(consumerTag, deliveryTag, redelivered, exchange, routingKey)));
- sendContent(out, channel, framesize);
-}
-
-void Message::sendGetOk(OutputHandler* out,
- int channel,
- u_int32_t messageCount,
- u_int64_t deliveryTag,
- u_int32_t framesize){
-
- out->send(new AMQFrame(channel, new BasicGetOkBody(deliveryTag, redelivered, exchange, routingKey, messageCount)));
- sendContent(out, channel, framesize);
-}
-
-void Message::sendContent(OutputHandler* out, int channel, u_int32_t framesize){
- AMQBody::shared_ptr headerBody = static_pointer_cast<AMQBody, AMQHeaderBody>(header);
- out->send(new AMQFrame(channel, headerBody));
- for(content_iterator i = content.begin(); i != content.end(); i++){
- if((*i)->size() > framesize){
- //TODO: need to split it
- std::cout << "WARNING: Dropped message. Re-fragmentation not yet implemented." << std::endl;
- }else{
- AMQBody::shared_ptr contentBody = static_pointer_cast<AMQBody, AMQContentBody>(*i);
- out->send(new AMQFrame(channel, contentBody));
- }
- }
-}
-
-BasicHeaderProperties* Message::getHeaderProperties(){
- return dynamic_cast<BasicHeaderProperties*>(header->getProperties());
-}
-
-const ConnectionToken* const Message::getPublisher(){
- return publisher;
-}
-
diff --git a/cpp/broker/src/NameGenerator.cpp b/cpp/broker/src/NameGenerator.cpp
deleted file mode 100644
index 46aa385a7e..0000000000
--- a/cpp/broker/src/NameGenerator.cpp
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include "NameGenerator.h"
-#include <sstream>
-
-using namespace qpid::broker;
-
-NameGenerator::NameGenerator(const std::string& _base) : base(_base), counter(1) {}
-
-std::string NameGenerator::generate(){
- std::stringstream ss;
- ss << base << counter++;
- return ss.str();
-}
diff --git a/cpp/broker/src/Queue.cpp b/cpp/broker/src/Queue.cpp
deleted file mode 100644
index eaaa3ffa31..0000000000
--- a/cpp/broker/src/Queue.cpp
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include "Queue.h"
-#include "MonitorImpl.h"
-#include <iostream>
-
-using namespace qpid::broker;
-using namespace qpid::concurrent;
-
-Queue::Queue(const string& _name, bool _durable, u_int32_t _autodelete, const ConnectionToken* const _owner) :
- name(_name),
- autodelete(_autodelete),
- durable(_durable),
- owner(_owner),
- queueing(false),
- dispatching(false),
- next(0),
- lastUsed(0),
- exclusive(0)
-{
- if(autodelete) lastUsed = apr_time_as_msec(apr_time_now());
-}
-
-Queue::~Queue(){
- for(Binding* b = bindings.front(); !bindings.empty(); b = bindings.front()){
- b->cancel();
- bindings.pop();
- }
-}
-
-void Queue::bound(Binding* b){
- bindings.push(b);
-}
-
-void Queue::deliver(Message::shared_ptr& msg){
- Locker locker(lock);
- if(queueing || !dispatch(msg)){
- queueing = true;
- messages.push(msg);
- }
-}
-
-bool Queue::dispatch(Message::shared_ptr& msg){
- if(consumers.empty()){
- return false;
- }else if(exclusive){
- if(!exclusive->deliver(msg)){
- std::cout << "WARNING: Dropping undeliverable message from queue with exclusive consumer." << std::endl;
- }
- return true;
- }else{
- //deliver to next consumer
- next = next % consumers.size();
- Consumer* c = consumers[next];
- int start = next;
- while(c){
- next++;
- if(c->deliver(msg)) return true;
-
- next = next % consumers.size();
- c = next == start ? 0 : consumers[next];
- }
- return false;
- }
-}
-
-bool Queue::startDispatching(){
- Locker locker(lock);
- if(queueing && !dispatching){
- dispatching = true;
- return true;
- }else{
- return false;
- }
-}
-
-void Queue::dispatch(){
- bool proceed = startDispatching();
- while(proceed){
- Locker locker(lock);
- if(!messages.empty() && dispatch(messages.front())){
- messages.pop();
- }else{
- dispatching = false;
- proceed = false;
- queueing = !messages.empty();
- }
- }
-}
-
-void Queue::consume(Consumer* c, bool requestExclusive){
- Locker locker(lock);
- if(exclusive) throw ExclusiveAccessException();
- if(requestExclusive){
- if(!consumers.empty()) throw ExclusiveAccessException();
- exclusive = c;
- }
-
- if(autodelete && consumers.empty()) lastUsed = 0;
- consumers.push_back(c);
-}
-
-void Queue::cancel(Consumer* c){
- Locker locker(lock);
- consumers.erase(find(consumers.begin(), consumers.end(), c));
- if(autodelete && consumers.empty()) lastUsed = apr_time_as_msec(apr_time_now());
- if(exclusive == c) exclusive = 0;
-}
-
-Message::shared_ptr Queue::dequeue(){
- Locker locker(lock);
- Message::shared_ptr msg;
- if(!messages.empty()){
- msg = messages.front();
- messages.pop();
- }
- return msg;
-}
-
-u_int32_t Queue::purge(){
- Locker locker(lock);
- int count = messages.size();
- while(!messages.empty()) messages.pop();
- return count;
-}
-
-u_int32_t Queue::getMessageCount() const{
- Locker locker(lock);
- return messages.size();
-}
-
-u_int32_t Queue::getConsumerCount() const{
- Locker locker(lock);
- return consumers.size();
-}
-
-bool Queue::canAutoDelete() const{
- Locker locker(lock);
- return lastUsed && ((apr_time_as_msec(apr_time_now()) - lastUsed) > autodelete);
-}
diff --git a/cpp/broker/src/QueueRegistry.cpp b/cpp/broker/src/QueueRegistry.cpp
deleted file mode 100644
index f807415314..0000000000
--- a/cpp/broker/src/QueueRegistry.cpp
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include "QueueRegistry.h"
-#include "MonitorImpl.h"
-#include "SessionHandlerImpl.h"
-#include <sstream>
-#include <assert.h>
-
-using namespace qpid::broker;
-using namespace qpid::concurrent;
-
-QueueRegistry::QueueRegistry() : counter(1){}
-
-QueueRegistry::~QueueRegistry(){}
-
-std::pair<Queue::shared_ptr, bool>
-QueueRegistry::declare(const string& declareName, bool durable, u_int32_t autoDelete, const ConnectionToken* owner)
-{
- Locker locker(lock);
- string name = declareName.empty() ? generateName() : declareName;
- assert(!name.empty());
- QueueMap::iterator i = queues.find(name);
- if (i == queues.end()) {
- Queue::shared_ptr queue(new Queue(name, durable, autoDelete, owner));
- queues[name] = queue;
- return std::pair<Queue::shared_ptr, bool>(queue, true);
- } else {
- return std::pair<Queue::shared_ptr, bool>(i->second, false);
- }
-}
-
-void QueueRegistry::destroy(const string& name){
- Locker locker(lock);
- queues.erase(name);
-}
-
-Queue::shared_ptr QueueRegistry::find(const string& name){
- Locker locker(lock);
- QueueMap::iterator i = queues.find(name);
- if (i == queues.end()) {
- return Queue::shared_ptr();
- } else {
- return i->second;
- }
-}
-
-string QueueRegistry::generateName(){
- string name;
- do {
- std::stringstream ss;
- ss << "tmp_" << counter++;
- name = ss.str();
- // Thread safety: Private function, only called with lock held
- // so this is OK.
- } while(queues.find(name) != queues.end());
- return name;
-}
diff --git a/cpp/broker/src/Router.cpp b/cpp/broker/src/Router.cpp
deleted file mode 100644
index c2dd74bf7d..0000000000
--- a/cpp/broker/src/Router.cpp
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include "Router.h"
-
-using namespace qpid::broker;
-
-Router::Router(ExchangeRegistry& _registry) : registry(_registry){}
-
-void Router::operator()(Message::shared_ptr& msg){
- Exchange* exchange = registry.get(msg->getExchange());
- if(exchange){
- exchange->route(msg, msg->getRoutingKey(), &(msg->getHeaderProperties()->getHeaders()));
- }else{
- std::cout << "WARNING: Could not route message, unknown exchange: " << msg->getExchange() << std::endl;
- }
-
-}
diff --git a/cpp/broker/src/SessionHandlerFactoryImpl.cpp b/cpp/broker/src/SessionHandlerFactoryImpl.cpp
deleted file mode 100644
index 39c627afef..0000000000
--- a/cpp/broker/src/SessionHandlerFactoryImpl.cpp
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include "SessionHandlerFactoryImpl.h"
-#include "SessionHandlerImpl.h"
-#include "FanOutExchange.h"
-#include "HeadersExchange.h"
-
-using namespace qpid::broker;
-using namespace qpid::io;
-
-namespace
-{
-const std::string empty;
-const std::string amq_direct("amq.direct");
-const std::string amq_topic("amq.topic");
-const std::string amq_fanout("amq.fanout");
-const std::string amq_match("amq.match");
-}
-
-SessionHandlerFactoryImpl::SessionHandlerFactoryImpl(u_int32_t _timeout) : timeout(_timeout), cleaner(&queues, timeout/10){
- exchanges.declare(new DirectExchange(empty)); // Default exchange.
- exchanges.declare(new DirectExchange(amq_direct));
- exchanges.declare(new TopicExchange(amq_topic));
- exchanges.declare(new FanOutExchange(amq_fanout));
- exchanges.declare(new HeadersExchange(amq_match));
- cleaner.start();
-}
-
-SessionHandler* SessionHandlerFactoryImpl::create(SessionContext* ctxt){
- return new SessionHandlerImpl(ctxt, &queues, &exchanges, &cleaner, timeout);
-}
-
-SessionHandlerFactoryImpl::~SessionHandlerFactoryImpl(){
- cleaner.stop();
-}
diff --git a/cpp/broker/src/SessionHandlerImpl.cpp b/cpp/broker/src/SessionHandlerImpl.cpp
deleted file mode 100644
index 0d8539332c..0000000000
--- a/cpp/broker/src/SessionHandlerImpl.cpp
+++ /dev/null
@@ -1,405 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include <iostream>
-#include "SessionHandlerImpl.h"
-#include "FanOutExchange.h"
-#include "HeadersExchange.h"
-#include "Router.h"
-#include "TopicExchange.h"
-#include "assert.h"
-
-using namespace std::tr1;
-using namespace qpid::broker;
-using namespace qpid::io;
-using namespace qpid::framing;
-using namespace qpid::concurrent;
-
-SessionHandlerImpl::SessionHandlerImpl(SessionContext* _context,
- QueueRegistry* _queues,
- ExchangeRegistry* _exchanges,
- AutoDelete* _cleaner,
- const u_int32_t _timeout) :
- context(_context),
- client(context),
- queues(_queues),
- exchanges(_exchanges),
- cleaner(_cleaner),
- timeout(_timeout),
- connectionHandler(new ConnectionHandlerImpl(this)),
- channelHandler(new ChannelHandlerImpl(this)),
- basicHandler(new BasicHandlerImpl(this)),
- exchangeHandler(new ExchangeHandlerImpl(this)),
- queueHandler(new QueueHandlerImpl(this)),
- framemax(65536),
- heartbeat(0) {}
-
-SessionHandlerImpl::~SessionHandlerImpl(){
- // TODO aconway 2006-09-07: Should be auto_ptr or plain members.
- delete channelHandler;
- delete connectionHandler;
- delete basicHandler;
- delete exchangeHandler;
- delete queueHandler;
-}
-
-Channel* SessionHandlerImpl::getChannel(u_int16_t channel){
- channel_iterator i = channels.find(channel);
- if(i == channels.end()){
- throw ConnectionException(504, "Unknown channel: " + channel);
- }
- return i->second;
-}
-
-Queue::shared_ptr SessionHandlerImpl::getQueue(const string& name, u_int16_t channel){
- Queue::shared_ptr queue;
- if (name.empty()) {
- queue = getChannel(channel)->getDefaultQueue();
- if (!queue) throw ConnectionException( 530, "Queue must be specified or previously declared" );
- } else {
- queue = queues->find(name);
- if (queue == 0) {
- throw ChannelException( 404, "Queue not found: " + name);
- }
- }
- return queue;
-}
-
-
-Exchange* SessionHandlerImpl::findExchange(const string& name){
- exchanges->getLock()->acquire();
- Exchange* exchange(exchanges->get(name));
- exchanges->getLock()->release();
- return exchange;
-}
-
-void SessionHandlerImpl::received(qpid::framing::AMQFrame* frame){
- u_int16_t channel = frame->getChannel();
- AMQBody::shared_ptr body = frame->getBody();
- AMQMethodBody::shared_ptr method;
-
- switch(body->type())
- {
- case METHOD_BODY:
- method = dynamic_pointer_cast<AMQMethodBody, AMQBody>(body);
- try{
- method->invoke(*this, channel);
- }catch(ChannelException& e){
- channels[channel]->close();
- channels.erase(channel);
- client.getChannel().close(channel, e.code, e.text, method->amqpClassId(), method->amqpMethodId());
- }catch(ConnectionException& e){
- client.getConnection().close(0, e.code, e.text, method->amqpClassId(), method->amqpMethodId());
- }
- break;
-
- case HEADER_BODY:
- this->handleHeader(channel, dynamic_pointer_cast<AMQHeaderBody, AMQBody>(body));
- break;
-
- case CONTENT_BODY:
- this->handleContent(channel, dynamic_pointer_cast<AMQContentBody, AMQBody>(body));
- break;
-
- case HEARTBEAT_BODY:
- //channel must be 0
- this->handleHeartbeat(dynamic_pointer_cast<AMQHeartbeatBody, AMQBody>(body));
- break;
- }
-}
-
-void SessionHandlerImpl::initiated(qpid::framing::ProtocolInitiation* /*header*/){
- //send connection start
- FieldTable properties;
- string mechanisms("PLAIN");
- string locales("en_US");
- client.getConnection().start(0, 8, 0, properties, mechanisms, locales);
-}
-
-void SessionHandlerImpl::idleOut(){
-
-}
-
-void SessionHandlerImpl::idleIn(){
-
-}
-
-void SessionHandlerImpl::closed(){
- for(channel_iterator i = channels.begin(); i != channels.end(); i = channels.begin()){
- Channel* c = i->second;
- channels.erase(i);
- c->close();
- delete c;
- }
- for(queue_iterator i = exclusiveQueues.begin(); i < exclusiveQueues.end(); i = exclusiveQueues.begin()){
- string name = (*i)->getName();
- queues->destroy(name);
- exclusiveQueues.erase(i);
- }
-}
-
-void SessionHandlerImpl::handleHeader(u_int16_t channel, AMQHeaderBody::shared_ptr body){
- getChannel(channel)->handleHeader(body, Router(*exchanges));
-}
-
-void SessionHandlerImpl::handleContent(u_int16_t channel, AMQContentBody::shared_ptr body){
- getChannel(channel)->handleContent(body, Router(*exchanges));
-}
-
-void SessionHandlerImpl::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){
- std::cout << "SessionHandlerImpl::handleHeartbeat()" << std::endl;
-}
-
-void SessionHandlerImpl::ConnectionHandlerImpl::startOk(
- u_int16_t /*channel*/, FieldTable& /*clientProperties*/, string& /*mechanism*/,
- string& /*response*/, string& /*locale*/){
-
- parent->client.getConnection().tune(0, 100, parent->framemax, parent->heartbeat);
-}
-
-void SessionHandlerImpl::ConnectionHandlerImpl::secureOk(u_int16_t /*channel*/, string& /*response*/){}
-
-void SessionHandlerImpl::ConnectionHandlerImpl::tuneOk(u_int16_t /*channel*/, u_int16_t /*channelmax*/, u_int32_t framemax, u_int16_t heartbeat){
- parent->framemax = framemax;
- parent->heartbeat = heartbeat;
-}
-
-void SessionHandlerImpl::ConnectionHandlerImpl::open(u_int16_t /*channel*/, string& /*virtualHost*/, string& /*capabilities*/, bool /*insist*/){
- string knownhosts;
- parent->client.getConnection().openOk(0, knownhosts);
-}
-
-void SessionHandlerImpl::ConnectionHandlerImpl::close(
- u_int16_t /*channel*/, u_int16_t /*replyCode*/, string& /*replyText*/,
- u_int16_t /*classId*/, u_int16_t /*methodId*/)
-{
- parent->client.getConnection().closeOk(0);
- parent->context->close();
-}
-
-void SessionHandlerImpl::ConnectionHandlerImpl::closeOk(u_int16_t /*channel*/){
- parent->context->close();
-}
-
-
-
-void SessionHandlerImpl::ChannelHandlerImpl::open(u_int16_t channel, string& /*outOfBand*/){
- parent->channels[channel] = new Channel(parent->context, channel, parent->framemax);
- parent->client.getChannel().openOk(channel);
-}
-
-void SessionHandlerImpl::ChannelHandlerImpl::flow(u_int16_t /*channel*/, bool /*active*/){}
-void SessionHandlerImpl::ChannelHandlerImpl::flowOk(u_int16_t /*channel*/, bool /*active*/){}
-
-void SessionHandlerImpl::ChannelHandlerImpl::close(u_int16_t channel, u_int16_t /*replyCode*/, string& /*replyText*/,
- u_int16_t /*classId*/, u_int16_t /*methodId*/){
- Channel* c = parent->getChannel(channel);
- if(c){
- parent->channels.erase(channel);
- c->close();
- delete c;
- parent->client.getChannel().closeOk(channel);
- }
-}
-
-void SessionHandlerImpl::ChannelHandlerImpl::closeOk(u_int16_t /*channel*/){}
-
-
-
-void SessionHandlerImpl::ExchangeHandlerImpl::declare(u_int16_t channel, u_int16_t /*ticket*/, string& exchange, string& type,
- bool passive, bool /*durable*/, bool /*autoDelete*/, bool /*internal*/, bool nowait,
- FieldTable& /*arguments*/){
-
- if(!passive && (
- type != TopicExchange::typeName &&
- type != DirectExchange::typeName &&
- type != FanOutExchange::typeName &&
- type != HeadersExchange::typeName
- )
- )
- {
- throw ChannelException(540, "Exchange type not implemented: " + type);
- }
-
- parent->exchanges->getLock()->acquire();
- if(!parent->exchanges->get(exchange)){
- if(type == TopicExchange::typeName){
- parent->exchanges->declare(new TopicExchange(exchange));
- }else if(type == DirectExchange::typeName){
- parent->exchanges->declare(new DirectExchange(exchange));
- }else if(type == FanOutExchange::typeName){
- parent->exchanges->declare(new DirectExchange(exchange));
- }else if (type == HeadersExchange::typeName) {
- parent->exchanges->declare(new HeadersExchange(exchange));
- }
- }
- parent->exchanges->getLock()->release();
- if(!nowait){
- parent->client.getExchange().declareOk(channel);
- }
-}
-
-void SessionHandlerImpl::ExchangeHandlerImpl::delete_(u_int16_t channel, u_int16_t /*ticket*/, string& exchange, bool /*ifUnused*/, bool nowait){
- //TODO: implement unused
- parent->exchanges->getLock()->acquire();
- parent->exchanges->destroy(exchange);
- parent->exchanges->getLock()->release();
- if(!nowait) parent->client.getExchange().deleteOk(channel);
-}
-
-void SessionHandlerImpl::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t /*ticket*/, string& name,
- bool passive, bool durable, bool exclusive,
- bool autoDelete, bool nowait, FieldTable& /*arguments*/){
- Queue::shared_ptr queue;
- if (passive && !name.empty()) {
- queue = parent->getQueue(name, channel);
- } else {
- std::pair<Queue::shared_ptr, bool> queue_created = parent->queues->declare(name, durable, autoDelete ? parent->timeout : 0, exclusive ? parent : 0);
- queue = queue_created.first;
- assert(queue);
- if (queue_created.second) { // This is a new queue
- parent->getChannel(channel)->setDefaultQueue(queue);
- //add default binding:
- parent->exchanges->getDefault()->bind(queue, name, 0);
- if(exclusive){
- parent->exclusiveQueues.push_back(queue);
- } else if(autoDelete){
- parent->cleaner->add(queue);
- }
- }
- }
- if(exclusive && !queue->isExclusiveOwner(parent)){
- throw ChannelException(405, "Cannot grant exclusive access to queue");
- }
- if(!nowait){
- name = queue->getName();
- parent->client.getQueue().declareOk(channel, name, queue->getMessageCount(), queue->getConsumerCount());
- }
-}
-
-void SessionHandlerImpl::QueueHandlerImpl::bind(u_int16_t channel, u_int16_t /*ticket*/, string& queueName,
- string& exchangeName, string& routingKey, bool nowait,
- FieldTable& arguments){
-
- Queue::shared_ptr queue = parent->getQueue(queueName, channel);
- Exchange* exchange = parent->exchanges->get(exchangeName);
- if(exchange){
- if(routingKey.empty() && queueName.empty()) routingKey = queue->getName();
- exchange->bind(queue, routingKey, &arguments);
- if(!nowait) parent->client.getQueue().bindOk(channel);
- }else{
- throw ChannelException(404, "Bind failed. No such exchange: " + exchangeName);
- }
-}
-
-void SessionHandlerImpl::QueueHandlerImpl::purge(u_int16_t channel, u_int16_t /*ticket*/, string& queueName, bool nowait){
-
- Queue::shared_ptr queue = parent->getQueue(queueName, channel);
- int count = queue->purge();
- if(!nowait) parent->client.getQueue().purgeOk(channel, count);
-}
-
-void SessionHandlerImpl::QueueHandlerImpl::delete_(u_int16_t channel, u_int16_t /*ticket*/, string& queue,
- bool ifUnused, bool ifEmpty, bool nowait){
- ChannelException error(0, "");
- int count(0);
- Queue::shared_ptr q = parent->getQueue(queue, channel);
- if(ifEmpty && q->getMessageCount() > 0){
- throw ChannelException(406, "Queue not empty.");
- }else if(ifUnused && q->getConsumerCount() > 0){
- throw ChannelException(406, "Queue in use.");
- }else{
- //remove the queue from the list of exclusive queues if necessary
- if(q->isExclusiveOwner(parent)){
- queue_iterator i = find(parent->exclusiveQueues.begin(), parent->exclusiveQueues.end(), q);
- if(i < parent->exclusiveQueues.end()) parent->exclusiveQueues.erase(i);
- }
- count = q->getMessageCount();
- parent->queues->destroy(queue);
- }
- if(!nowait) parent->client.getQueue().deleteOk(channel, count);
-}
-
-
-
-
-void SessionHandlerImpl::BasicHandlerImpl::qos(u_int16_t channel, u_int32_t prefetchSize, u_int16_t prefetchCount, bool /*global*/){
- //TODO: handle global
- parent->getChannel(channel)->setPrefetchSize(prefetchSize);
- parent->getChannel(channel)->setPrefetchCount(prefetchCount);
- parent->client.getBasic().qosOk(channel);
-}
-
-void SessionHandlerImpl::BasicHandlerImpl::consume(u_int16_t channelId, u_int16_t /*ticket*/,
- string& queueName, string& consumerTag,
- bool noLocal, bool noAck, bool exclusive,
- bool nowait){
-
- Queue::shared_ptr queue = parent->getQueue(queueName, channelId);
- Channel* channel = parent->channels[channelId];
- if(!consumerTag.empty() && channel->exists(consumerTag)){
- throw ConnectionException(530, "Consumer tags must be unique");
- }
-
- try{
- channel->consume(consumerTag, queue, !noAck, exclusive, noLocal ? parent : 0);
- if(!nowait) parent->client.getBasic().consumeOk(channelId, consumerTag);
-
- //allow messages to be dispatched if required as there is now a consumer:
- queue->dispatch();
- }catch(ExclusiveAccessException& e){
- if(exclusive) throw ChannelException(403, "Exclusive access cannot be granted");
- else throw ChannelException(403, "Access would violate previously granted exclusivity");
- }
-
-}
-
-void SessionHandlerImpl::BasicHandlerImpl::cancel(u_int16_t channel, string& consumerTag, bool nowait){
- parent->getChannel(channel)->cancel(consumerTag);
- if(!nowait) parent->client.getBasic().cancelOk(channel, consumerTag);
-}
-
-void SessionHandlerImpl::BasicHandlerImpl::publish(u_int16_t channel, u_int16_t /*ticket*/,
- string& exchange, string& routingKey,
- bool mandatory, bool immediate){
-
- Message* msg = new Message(parent, exchange, routingKey, mandatory, immediate);
- parent->getChannel(channel)->handlePublish(msg);
-}
-
-void SessionHandlerImpl::BasicHandlerImpl::get(u_int16_t channelId, u_int16_t /*ticket*/, string& queueName, bool noAck){
- Queue::shared_ptr queue = parent->getQueue(queueName, channelId);
- if(!parent->getChannel(channelId)->get(queue, !noAck)){
- string clusterId;//not used, part of an imatix hack
- parent->client.getBasic().getEmpty(channelId, clusterId);
- }
-}
-
-void SessionHandlerImpl::BasicHandlerImpl::ack(u_int16_t channel, u_int64_t deliveryTag, bool multiple){
- try{
- parent->getChannel(channel)->ack(deliveryTag, multiple);
- }catch(InvalidAckException& e){
- throw ConnectionException(530, "Received ack for unrecognised delivery tag");
- }
-}
-
-void SessionHandlerImpl::BasicHandlerImpl::reject(u_int16_t /*channel*/, u_int64_t /*deliveryTag*/, bool /*requeue*/){}
-
-void SessionHandlerImpl::BasicHandlerImpl::recover(u_int16_t channel, bool requeue){
- parent->getChannel(channel)->recover(requeue);
-}
-
diff --git a/cpp/broker/src/TopicExchange.cpp b/cpp/broker/src/TopicExchange.cpp
deleted file mode 100644
index 53977747c4..0000000000
--- a/cpp/broker/src/TopicExchange.cpp
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include "TopicExchange.h"
-#include "ExchangeBinding.h"
-#include <algorithm>
-
-using namespace qpid::broker;
-using namespace qpid::framing;
-
-
-// TODO aconway 2006-09-20: More efficient matching algorithm.
-// Areas for improvement:
-// - excessive string copying: should be 0 copy, match from original buffer.
-// - match/lookup: use descision tree or other more efficient structure.
-
-Tokens& Tokens::operator=(const std::string& s) {
- clear();
- if (s.empty()) return *this;
- std::string::const_iterator i = s.begin();
- while (true) {
- // Invariant: i is at the beginning of the next untokenized word.
- std::string::const_iterator j = find(i, s.end(), '.');
- push_back(std::string(i, j));
- if (j == s.end()) return *this;
- i = j + 1;
- }
- return *this;
-}
-
-size_t Tokens::Hash::operator()(const Tokens& p) const {
- size_t hash = 0;
- for (Tokens::const_iterator i = p.begin(); i != p.end(); ++i) {
- hash += std::tr1::hash<std::string>()(*i);
- }
- return hash;
-}
-
-TopicPattern& TopicPattern::operator=(const Tokens& tokens) {
- Tokens::operator=(tokens);
- normalize();
- return *this;
-}
-
-namespace {
-const std::string hashmark("#");
-const std::string star("*");
-}
-
-void TopicPattern::normalize() {
- std::string word;
- Tokens::iterator i = begin();
- while (i != end()) {
- if (*i == hashmark) {
- ++i;
- while (i != end()) {
- // Invariant: *(i-1)==#, [begin()..i-1] is normalized.
- if (*i == star) { // Move * before #.
- std::swap(*i, *(i-1));
- ++i;
- } else if (*i == hashmark) {
- erase(i); // Remove extra #
- } else {
- break;
- }
- }
- } else {
- i ++;
- }
- }
-}
-
-
-namespace {
-// TODO aconway 2006-09-20: Ineficient to convert every routingKey to a string.
-// Need more efficient Tokens impl that can operate on a string in place.
-//
-bool do_match(Tokens::const_iterator pattern_begin, Tokens::const_iterator pattern_end, Tokens::const_iterator target_begin, Tokens::const_iterator target_end)
-{
- // Invariant: [pattern_begin..p) matches [target_begin..t)
- Tokens::const_iterator p = pattern_begin;
- Tokens::const_iterator t = target_begin;
- while (p != pattern_end && t != target_end)
- {
- if (*p == star || *p == *t) {
- ++p, ++t;
- } else if (*p == hashmark) {
- ++p;
- if (do_match(p, pattern_end, t, target_end)) return true;
- while (t != target_end) {
- ++t;
- if (do_match(p, pattern_end, t, target_end)) return true;
- }
- return false;
- } else {
- return false;
- }
- }
- while (p != pattern_end && *p == hashmark) ++p; // Ignore trailing #
- return t == target_end && p == pattern_end;
-}
-}
-
-bool TopicPattern::match(const Tokens& target) const
-{
- return do_match(begin(), end(), target.begin(), target.end());
-}
-
-TopicExchange::TopicExchange(const string& _name) : Exchange(_name) { }
-
-void TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, FieldTable* args){
- lock.acquire();
- TopicPattern routingPattern(routingKey);
- bindings[routingPattern].push_back(queue);
- queue->bound(new ExchangeBinding(this, queue, routingKey, args));
- lock.release();
-}
-
-void TopicExchange::unbind(Queue::shared_ptr queue, const string& routingKey, FieldTable* /*args*/){
- lock.acquire();
- BindingMap::iterator bi = bindings.find(TopicPattern(routingKey));
- Queue::vector& qv(bi->second);
- if (bi == bindings.end()) return;
- Queue::vector::iterator q = find(qv.begin(), qv.end(), queue);
- if(q == qv.end()) return;
- qv.erase(q);
- if(qv.empty()) bindings.erase(bi);
- lock.release();
-}
-
-
-void TopicExchange::route(Message::shared_ptr& msg, const string& routingKey, FieldTable* /*args*/){
- lock.acquire();
- for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) {
- if (i->first.match(routingKey)) {
- Queue::vector& qv(i->second);
- for(Queue::vector::iterator j = qv.begin(); j != qv.end(); j++){
- (*j)->deliver(msg);
- }
- }
- }
- lock.release();
-}
-
-TopicExchange::~TopicExchange() {}
-
-const std::string TopicExchange::typeName("topic");
-
-
diff --git a/cpp/broker/test/ChannelTest.cpp b/cpp/broker/test/ChannelTest.cpp
deleted file mode 100644
index 001a0d9c00..0000000000
--- a/cpp/broker/test/ChannelTest.cpp
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include "Channel.h"
-#include "Message.h"
-#include <qpid_test_plugin.h>
-#include <iostream>
-#include <memory>
-
-using namespace std::tr1;
-using namespace qpid::broker;
-using namespace qpid::framing;
-using namespace qpid::concurrent;
-
-struct DummyRouter{
- Message::shared_ptr last;
-
- void operator()(Message::shared_ptr& msg){
- last = msg;
- }
-};
-
-struct DummyHandler : OutputHandler{
- std::vector<AMQFrame*> frames;
-
- virtual void send(AMQFrame* frame){
- frames.push_back(frame);
- }
-};
-
-
-class ChannelTest : public CppUnit::TestCase
-{
- CPPUNIT_TEST_SUITE(ChannelTest);
- CPPUNIT_TEST(testIncoming);
- CPPUNIT_TEST(testConsumerMgmt);
- CPPUNIT_TEST(testDeliveryNoAck);
- CPPUNIT_TEST_SUITE_END();
-
- public:
-
- void testIncoming(){
- Channel channel(0, 0, 10000);
- string routingKey("my_routing_key");
- channel.handlePublish(new Message(0, "test", routingKey, false, false));
- AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
- header->setContentSize(14);
- string data1("abcdefg");
- string data2("hijklmn");
- AMQContentBody::shared_ptr part1(new AMQContentBody(data1));
- AMQContentBody::shared_ptr part2(new AMQContentBody(data2));
-
- CPPUNIT_ASSERT(!channel.handleHeader(header, DummyRouter()).last);
- CPPUNIT_ASSERT(!channel.handleContent(part1, DummyRouter()).last);
- DummyRouter router = channel.handleContent(part2, DummyRouter());
- CPPUNIT_ASSERT(router.last);
- CPPUNIT_ASSERT_EQUAL(routingKey, router.last->getRoutingKey());
- }
-
- void testConsumerMgmt(){
- Queue::shared_ptr queue(new Queue("my_queue"));
- Channel channel(0, 0, 0);
- CPPUNIT_ASSERT(!channel.exists("my_consumer"));
-
- ConnectionToken* owner;
- string tag("my_consumer");
- channel.consume(tag, queue, false, false, owner);
- string tagA;
- string tagB;
- channel.consume(tagA, queue, false, false, owner);
- channel.consume(tagB, queue, false, false, owner);
- CPPUNIT_ASSERT_EQUAL((u_int32_t) 3, queue->getConsumerCount());
- CPPUNIT_ASSERT(channel.exists("my_consumer"));
- CPPUNIT_ASSERT(channel.exists(tagA));
- CPPUNIT_ASSERT(channel.exists(tagB));
- channel.cancel(tagA);
- CPPUNIT_ASSERT_EQUAL((u_int32_t) 2, queue->getConsumerCount());
- CPPUNIT_ASSERT(channel.exists("my_consumer"));
- CPPUNIT_ASSERT(!channel.exists(tagA));
- CPPUNIT_ASSERT(channel.exists(tagB));
- channel.close();
- CPPUNIT_ASSERT_EQUAL((u_int32_t) 0, queue->getConsumerCount());
- }
-
- void testDeliveryNoAck(){
- DummyHandler handler;
- Channel channel(&handler, 7, 10000);
-
- Message::shared_ptr msg(new Message(0, "test", "my_routing_key", false, false));
- AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
- header->setContentSize(14);
- msg->setHeader(header);
- AMQContentBody::shared_ptr body(new AMQContentBody("abcdefghijklmn"));
- msg->addContent(body);
-
- Queue::shared_ptr queue(new Queue("my_queue"));
- ConnectionToken* owner(0);
- string tag("no_ack");
- channel.consume(tag, queue, false, false, owner);
-
- queue->deliver(msg);
- CPPUNIT_ASSERT_EQUAL((size_t) 3, handler.frames.size());
- CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[0]->getChannel());
- CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[1]->getChannel());
- CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[2]->getChannel());
- BasicDeliverBody::shared_ptr deliver(dynamic_pointer_cast<BasicDeliverBody, AMQBody>(handler.frames[0]->getBody()));
- AMQHeaderBody::shared_ptr contentHeader(dynamic_pointer_cast<AMQHeaderBody, AMQBody>(handler.frames[1]->getBody()));
- AMQContentBody::shared_ptr contentBody(dynamic_pointer_cast<AMQContentBody, AMQBody>(handler.frames[2]->getBody()));
- CPPUNIT_ASSERT(deliver);
- CPPUNIT_ASSERT(contentHeader);
- CPPUNIT_ASSERT(contentBody);
- CPPUNIT_ASSERT_EQUAL(string("abcdefghijklmn"), contentBody->getData());
- }
-
- void testDeliveryAndRecovery(){
- DummyHandler handler;
- Channel channel(&handler, 7, 10000);
-
- Message::shared_ptr msg(new Message(0, "test", "my_routing_key", false, false));
- AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
- header->setContentSize(14);
- msg->setHeader(header);
- AMQContentBody::shared_ptr body(new AMQContentBody("abcdefghijklmn"));
- msg->addContent(body);
-
- Queue::shared_ptr queue(new Queue("my_queue"));
- ConnectionToken* owner;
- string tag("ack");
- channel.consume(tag, queue, true, false, owner);
-
- queue->deliver(msg);
- CPPUNIT_ASSERT_EQUAL((size_t) 3, handler.frames.size());
- CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[0]->getChannel());
- CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[1]->getChannel());
- CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[2]->getChannel());
- BasicDeliverBody::shared_ptr deliver(dynamic_pointer_cast<BasicDeliverBody, AMQBody>(handler.frames[0]->getBody()));
- AMQHeaderBody::shared_ptr contentHeader(dynamic_pointer_cast<AMQHeaderBody, AMQBody>(handler.frames[1]->getBody()));
- AMQContentBody::shared_ptr contentBody(dynamic_pointer_cast<AMQContentBody, AMQBody>(handler.frames[2]->getBody()));
- CPPUNIT_ASSERT(deliver);
- CPPUNIT_ASSERT(contentHeader);
- CPPUNIT_ASSERT(contentBody);
- CPPUNIT_ASSERT_EQUAL(string("abcdefghijklmn"), contentBody->getData());
- }
-};
-
-// Make this test suite a plugin.
-CPPUNIT_PLUGIN_IMPLEMENT();
-CPPUNIT_TEST_SUITE_REGISTRATION(ChannelTest);
diff --git a/cpp/broker/test/ExchangeTest.cpp b/cpp/broker/test/ExchangeTest.cpp
deleted file mode 100644
index 8c702ff836..0000000000
--- a/cpp/broker/test/ExchangeTest.cpp
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "DirectExchange.h"
-#include "Exchange.h"
-#include "Queue.h"
-#include "TopicExchange.h"
-#include <qpid_test_plugin.h>
-#include <iostream>
-
-using namespace qpid::broker;
-using namespace qpid::concurrent;
-
-class ExchangeTest : public CppUnit::TestCase
-{
- CPPUNIT_TEST_SUITE(ExchangeTest);
- CPPUNIT_TEST(testMe);
- CPPUNIT_TEST_SUITE_END();
-
- public:
-
- // TODO aconway 2006-09-12: Need more detailed tests.
-
- void testMe()
- {
- Queue::shared_ptr queue(new Queue("queue", true, true));
- Queue::shared_ptr queue2(new Queue("queue2", true, true));
-
- TopicExchange topic("topic");
- topic.bind(queue, "abc", 0);
- topic.bind(queue2, "abc", 0);
-
- DirectExchange direct("direct");
- direct.bind(queue, "abc", 0);
- direct.bind(queue2, "abc", 0);
-
- queue.reset();
- queue2.reset();
-
- Message::shared_ptr msg = Message::shared_ptr(new Message(0, "e", "A", true, true));
- topic.route(msg, "abc", 0);
- direct.route(msg, "abc", 0);
-
- // TODO aconway 2006-09-12: TODO Why no assertions?
- }
-};
-
-// Make this test suite a plugin.
-CPPUNIT_PLUGIN_IMPLEMENT();
-CPPUNIT_TEST_SUITE_REGISTRATION(ExchangeTest);
diff --git a/cpp/broker/test/HeadersExchangeTest.cpp b/cpp/broker/test/HeadersExchangeTest.cpp
deleted file mode 100644
index d56e00543d..0000000000
--- a/cpp/broker/test/HeadersExchangeTest.cpp
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "HeadersExchange.h"
-#include "FieldTable.h"
-#include "Value.h"
-#include <qpid_test_plugin.h>
-
-using namespace qpid::broker;
-using namespace qpid::framing;
-
-class HeadersExchangeTest : public CppUnit::TestCase
-{
- CPPUNIT_TEST_SUITE(HeadersExchangeTest);
- CPPUNIT_TEST(testMatchAll);
- CPPUNIT_TEST(testMatchAny);
- CPPUNIT_TEST(testMatchEmptyValue);
- CPPUNIT_TEST(testMatchEmptyArgs);
- CPPUNIT_TEST(testMatchNoXMatch);
- CPPUNIT_TEST_SUITE_END();
-
- public:
-
- void testMatchAll()
- {
- FieldTable b, m;
- b.setString("x-match", "all");
- b.setString("foo", "FOO");
- b.setInt("n", 42);
- m.setString("foo", "FOO");
- m.setInt("n", 42);
- CPPUNIT_ASSERT(HeadersExchange::match(b, m));
-
- // Ignore extras.
- m.setString("extra", "x");
- CPPUNIT_ASSERT(HeadersExchange::match(b, m));
-
- // Fail mismatch, wrong value.
- m.setString("foo", "NotFoo");
- CPPUNIT_ASSERT(!HeadersExchange::match(b, m));
-
- // Fail mismatch, missing value
- m.erase("foo");
- CPPUNIT_ASSERT(!HeadersExchange::match(b, m));
- }
-
- void testMatchAny()
- {
- FieldTable b, m;
- b.setString("x-match", "any");
- b.setString("foo", "FOO");
- b.setInt("n", 42);
- m.setString("foo", "FOO");
- CPPUNIT_ASSERT(HeadersExchange::match(b, m));
- m.erase("foo");
- CPPUNIT_ASSERT(!HeadersExchange::match(b, m));
- m.setInt("n", 42);
- CPPUNIT_ASSERT(HeadersExchange::match(b, m));
- }
-
- void testMatchEmptyValue()
- {
- FieldTable b, m;
- b.setString("x-match", "all");
- b.getMap()["foo"] = FieldTable::ValuePtr(new EmptyValue());
- b.getMap()["n"] = FieldTable::ValuePtr(new EmptyValue());
- CPPUNIT_ASSERT(!HeadersExchange::match(b, m));
- m.setString("foo", "blah");
- m.setInt("n", 123);
- }
-
- void testMatchEmptyArgs()
- {
- FieldTable b, m;
- m.setString("foo", "FOO");
-
- b.setString("x-match", "all");
- CPPUNIT_ASSERT(HeadersExchange::match(b, m));
- b.setString("x-match", "any");
- CPPUNIT_ASSERT(!HeadersExchange::match(b, m));
- }
-
-
- void testMatchNoXMatch()
- {
- FieldTable b, m;
- b.setString("foo", "FOO");
- m.setString("foo", "FOO");
- CPPUNIT_ASSERT(!HeadersExchange::match(b, m));
- }
-
-
-};
-
-// make this test suite a plugin.
-CPPUNIT_PLUGIN_IMPLEMENT();
-CPPUNIT_TEST_SUITE_REGISTRATION(HeadersExchangeTest);
diff --git a/cpp/broker/test/Makefile b/cpp/broker/test/Makefile
deleted file mode 100644
index 172ce564bf..0000000000
--- a/cpp/broker/test/Makefile
+++ /dev/null
@@ -1,20 +0,0 @@
-#
-# Copyright (c) 2006 The Apache Software Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-QPID_HOME = ../../..
-LDLIBS=-lapr-1 -lcppunit $(COMMON_LIB) $(BROKER_LIB)
-include ${QPID_HOME}/cpp/test_plugins.mk
-
diff --git a/cpp/broker/test/MessageTest.cpp b/cpp/broker/test/MessageTest.cpp
deleted file mode 100644
index fc2c6e01bb..0000000000
--- a/cpp/broker/test/MessageTest.cpp
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include "APRBase.h"
-#include "Message.h"
-#include <qpid_test_plugin.h>
-#include <iostream>
-
-using namespace qpid::broker;
-using namespace qpid::framing;
-using namespace qpid::concurrent;
-
-class MessageTest : public CppUnit::TestCase
-{
- CPPUNIT_TEST_SUITE(MessageTest);
- CPPUNIT_TEST(testMe);
- CPPUNIT_TEST_SUITE_END();
-
- public:
-
- // TODO aconway 2006-09-12: Need more detailed tests,
- // need tests to assert something!
- //
- void testMe()
- {
- APRBase::increment();
- const int size(10);
- for(int i = 0; i < size; i++){
- Message::shared_ptr msg = Message::shared_ptr(new Message(0, "A", "B", true, true));
- msg->setHeader(AMQHeaderBody::shared_ptr(new AMQHeaderBody()));
- msg->addContent(AMQContentBody::shared_ptr(new AMQContentBody()));
- msg.reset();
- }
- }
-};
-
-// Make this test suite a plugin.
-CPPUNIT_PLUGIN_IMPLEMENT();
-CPPUNIT_TEST_SUITE_REGISTRATION(MessageTest);
-
diff --git a/cpp/broker/test/QueueRegistryTest.cpp b/cpp/broker/test/QueueRegistryTest.cpp
deleted file mode 100644
index bd739aaad5..0000000000
--- a/cpp/broker/test/QueueRegistryTest.cpp
+++ /dev/null
@@ -1,76 +0,0 @@
-#include "QueueRegistry.h"
-#include <qpid_test_plugin.h>
-#include <string>
-
-using namespace qpid::broker;
-
-class QueueRegistryTest : public CppUnit::TestCase
-{
- CPPUNIT_TEST_SUITE(QueueRegistryTest);
- CPPUNIT_TEST(testDeclare);
- CPPUNIT_TEST(testDeclareTmp);
- CPPUNIT_TEST(testFind);
- CPPUNIT_TEST(testDestroy);
- CPPUNIT_TEST_SUITE_END();
-
- private:
- std::string foo, bar;
- QueueRegistry reg;
- std::pair<Queue::shared_ptr, bool> qc;
-
- public:
- void setUp() {
- foo = "foo";
- bar = "bar";
- }
-
- void testDeclare() {
- qc = reg.declare(foo, false, 0, 0);
- Queue::shared_ptr q = qc.first;
- CPPUNIT_ASSERT(q);
- CPPUNIT_ASSERT(qc.second); // New queue
- CPPUNIT_ASSERT_EQUAL(foo, q->getName());
-
- qc = reg.declare(foo, false, 0, 0);
- CPPUNIT_ASSERT_EQUAL(q, qc.first);
- CPPUNIT_ASSERT(!qc.second);
-
- qc = reg.declare(bar, false, 0, 0);
- q = qc.first;
- CPPUNIT_ASSERT(q);
- CPPUNIT_ASSERT_EQUAL(true, qc.second);
- CPPUNIT_ASSERT_EQUAL(bar, q->getName());
- }
-
- void testDeclareTmp()
- {
- qc = reg.declare(std::string(), false, 0, 0);
- CPPUNIT_ASSERT(qc.second);
- CPPUNIT_ASSERT_EQUAL(std::string("tmp_1"), qc.first->getName());
- }
-
- void testFind() {
- CPPUNIT_ASSERT(reg.find(foo) == 0);
-
- reg.declare(foo, false, 0, 0);
- reg.declare(bar, false, 0, 0);
- Queue::shared_ptr q = reg.find(bar);
- CPPUNIT_ASSERT(q);
- CPPUNIT_ASSERT_EQUAL(bar, q->getName());
- }
-
- void testDestroy() {
- qc = reg.declare(foo, false, 0, 0);
- reg.destroy(foo);
- // Queue is gone from the registry.
- CPPUNIT_ASSERT(reg.find(foo) == 0);
- // Queue is not actually destroyed till we drop our reference.
- CPPUNIT_ASSERT_EQUAL(foo, qc.first->getName());
- // We shoud be the only reference.
- CPPUNIT_ASSERT_EQUAL(1L, qc.first.use_count());
- }
-};
-
-// Make this test suite a plugin.
-CPPUNIT_PLUGIN_IMPLEMENT();
-CPPUNIT_TEST_SUITE_REGISTRATION(QueueRegistryTest);
diff --git a/cpp/broker/test/QueueTest.cpp b/cpp/broker/test/QueueTest.cpp
deleted file mode 100644
index 1b4eb814cb..0000000000
--- a/cpp/broker/test/QueueTest.cpp
+++ /dev/null
@@ -1,176 +0,0 @@
- /*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include "Queue.h"
-#include "QueueRegistry.h"
-#include <qpid_test_plugin.h>
-#include <iostream>
-
-using namespace qpid::broker;
-using namespace qpid::concurrent;
-
-
-class TestBinding : public virtual Binding{
- bool cancelled;
-
-public:
- TestBinding();
- virtual void cancel();
- bool isCancelled();
-};
-
-class TestConsumer : public virtual Consumer{
-public:
- Message::shared_ptr last;
-
- virtual bool deliver(Message::shared_ptr& msg);
-};
-
-
-class QueueTest : public CppUnit::TestCase
-{
- CPPUNIT_TEST_SUITE(QueueTest);
- CPPUNIT_TEST(testConsumers);
- CPPUNIT_TEST(testBinding);
- CPPUNIT_TEST(testRegistry);
- CPPUNIT_TEST(testDequeue);
- CPPUNIT_TEST_SUITE_END();
-
- public:
- void testConsumers(){
- Queue::shared_ptr queue(new Queue("my_queue", true, true));
-
- //Test adding consumers:
- TestConsumer c1;
- TestConsumer c2;
- queue->consume(&c1);
- queue->consume(&c2);
-
- CPPUNIT_ASSERT_EQUAL(u_int32_t(2), queue->getConsumerCount());
-
- //Test basic delivery:
- Message::shared_ptr msg1 = Message::shared_ptr(new Message(0, "e", "A", true, true));
- Message::shared_ptr msg2 = Message::shared_ptr(new Message(0, "e", "B", true, true));
- Message::shared_ptr msg3 = Message::shared_ptr(new Message(0, "e", "C", true, true));
-
- queue->deliver(msg1);
- CPPUNIT_ASSERT_EQUAL(msg1.get(), c1.last.get());
-
- queue->deliver(msg2);
- CPPUNIT_ASSERT_EQUAL(msg2.get(), c2.last.get());
-
- queue->deliver(msg3);
- CPPUNIT_ASSERT_EQUAL(msg3.get(), c1.last.get());
-
- //Test cancellation:
- queue->cancel(&c1);
- CPPUNIT_ASSERT_EQUAL(u_int32_t(1), queue->getConsumerCount());
- queue->cancel(&c2);
- CPPUNIT_ASSERT_EQUAL(u_int32_t(0), queue->getConsumerCount());
- }
-
- void testBinding(){
- Queue::shared_ptr queue(new Queue("my_queue", true, true));
- //Test bindings:
- TestBinding a;
- TestBinding b;
- queue->bound(&a);
- queue->bound(&b);
-
- queue.reset();
-
- CPPUNIT_ASSERT(a.isCancelled());
- CPPUNIT_ASSERT(b.isCancelled());
- }
-
- void testRegistry(){
- //Test use of queues in registry:
- QueueRegistry registry;
- registry.declare("queue1", true, true);
- registry.declare("queue2", true, true);
- registry.declare("queue3", true, true);
-
- CPPUNIT_ASSERT(registry.find("queue1"));
- CPPUNIT_ASSERT(registry.find("queue2"));
- CPPUNIT_ASSERT(registry.find("queue3"));
-
- registry.destroy("queue1");
- registry.destroy("queue2");
- registry.destroy("queue3");
-
- CPPUNIT_ASSERT(!registry.find("queue1"));
- CPPUNIT_ASSERT(!registry.find("queue2"));
- CPPUNIT_ASSERT(!registry.find("queue3"));
- }
-
- void testDequeue(){
- Queue::shared_ptr queue(new Queue("my_queue", true, true));
-
- Message::shared_ptr msg1 = Message::shared_ptr(new Message(0, "e", "A", true, true));
- Message::shared_ptr msg2 = Message::shared_ptr(new Message(0, "e", "B", true, true));
- Message::shared_ptr msg3 = Message::shared_ptr(new Message(0, "e", "C", true, true));
- Message::shared_ptr received;
-
- queue->deliver(msg1);
- queue->deliver(msg2);
- queue->deliver(msg3);
-
- CPPUNIT_ASSERT_EQUAL(u_int32_t(3), queue->getMessageCount());
-
- received = queue->dequeue();
- CPPUNIT_ASSERT_EQUAL(msg1.get(), received.get());
- CPPUNIT_ASSERT_EQUAL(u_int32_t(2), queue->getMessageCount());
-
- received = queue->dequeue();
- CPPUNIT_ASSERT_EQUAL(msg2.get(), received.get());
- CPPUNIT_ASSERT_EQUAL(u_int32_t(1), queue->getMessageCount());
-
- TestConsumer consumer;
- queue->consume(&consumer);
- queue->dispatch();
- CPPUNIT_ASSERT_EQUAL(msg3.get(), consumer.last.get());
- CPPUNIT_ASSERT_EQUAL(u_int32_t(0), queue->getMessageCount());
-
- received = queue->dequeue();
- CPPUNIT_ASSERT(!received);
- CPPUNIT_ASSERT_EQUAL(u_int32_t(0), queue->getMessageCount());
-
- }
-};
-
-// Make this test suite a plugin.
-CPPUNIT_PLUGIN_IMPLEMENT();
-CPPUNIT_TEST_SUITE_REGISTRATION(QueueTest);
-
-//TestBinding
-TestBinding::TestBinding() : cancelled(false) {}
-
-void TestBinding::cancel(){
- CPPUNIT_ASSERT(!cancelled);
- cancelled = true;
-}
-
-bool TestBinding::isCancelled(){
- return cancelled;
-}
-
-//TestConsumer
-bool TestConsumer::deliver(Message::shared_ptr& msg){
- last = msg;
- return true;
-}
-
diff --git a/cpp/broker/test/RouterTest.cpp b/cpp/broker/test/RouterTest.cpp
deleted file mode 100644
index b1d4b9739f..0000000000
--- a/cpp/broker/test/RouterTest.cpp
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include "Channel.h"
-#include "Exchange.h"
-#include "ExchangeRegistry.h"
-#include "Message.h"
-#include "Router.h"
-#include <qpid_test_plugin.h>
-#include <iostream>
-#include <memory>
-
-using namespace qpid::broker;
-using namespace qpid::framing;
-using namespace qpid::concurrent;
-
-struct TestExchange : public Exchange{
- Message::shared_ptr msg;
- string routingKey;
- FieldTable* args;
-
- TestExchange() : Exchange("test"), args(0) {}
-
- void bind(Queue::shared_ptr /*queue*/, const string& /*routingKey*/, FieldTable* /*args*/){}
-
- void unbind(Queue::shared_ptr /*queue*/, const string& /*routingKey*/, FieldTable* /*args*/){ }
-
- void route(Message::shared_ptr& _msg, const string& _routingKey, FieldTable* _args){
- msg = _msg;
- routingKey = _routingKey;
- args = _args;
- }
-};
-
-class RouterTest : public CppUnit::TestCase
-{
- CPPUNIT_TEST_SUITE(RouterTest);
- CPPUNIT_TEST(test);
- CPPUNIT_TEST_SUITE_END();
-
- public:
-
- void test()
- {
- ExchangeRegistry registry;
- TestExchange* exchange = new TestExchange();
- registry.declare(exchange);
-
- string routingKey("my_routing_key");
- string name("name");
- string value("value");
- Message::shared_ptr msg(new Message(0, "test", routingKey, false, false));
- AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
-
- dynamic_cast<BasicHeaderProperties*>(header->getProperties())->getHeaders().setString(name, value);
- msg->setHeader(header);
-
- Router router(registry);
- router(msg);
-
- CPPUNIT_ASSERT(exchange->msg);
- CPPUNIT_ASSERT_EQUAL(msg, exchange->msg);
- CPPUNIT_ASSERT_EQUAL(routingKey, exchange->msg->getRoutingKey());
- CPPUNIT_ASSERT_EQUAL(routingKey, exchange->routingKey);
- CPPUNIT_ASSERT_EQUAL(value, exchange->args->getString(name));
- }
-};
-
-// Make this test suite a plugin.
-CPPUNIT_PLUGIN_IMPLEMENT();
-CPPUNIT_TEST_SUITE_REGISTRATION(RouterTest);
-
diff --git a/cpp/broker/test/TopicExchangeTest.cpp b/cpp/broker/test/TopicExchangeTest.cpp
deleted file mode 100644
index d9b49fc603..0000000000
--- a/cpp/broker/test/TopicExchangeTest.cpp
+++ /dev/null
@@ -1,184 +0,0 @@
-#include "TopicExchange.h"
-#include <qpid_test_plugin.h>
-
-using namespace qpid::broker;
-
-Tokens makeTokens(char** begin, char** end)
-{
- Tokens t;
- t.insert(t.end(), begin, end);
- return t;
-}
-
-// Calculate size of an array.
-#define LEN(a) (sizeof(a)/sizeof(a[0]))
-
-// Convert array to token vector
-#define TOKENS(a) makeTokens(a, a + LEN(a))
-
-// Allow CPPUNIT_EQUALS to print a Tokens.
-// TODO aconway 2006-09-19: Make it a template and put it in a shared test lib.
-//
-CppUnit::OStringStream& operator <<(CppUnit::OStringStream& out, const Tokens& v)
-{
- out << "[ ";
- for (Tokens::const_iterator i = v.begin();
- i != v.end(); ++i)
- {
- out << '"' << *i << '"' << (i+1 == v.end() ? "]" : ", ");
- }
- return out;
-}
-
-
-class TokensTest : public CppUnit::TestCase
-{
- CPPUNIT_TEST_SUITE(TokensTest);
- CPPUNIT_TEST(testTokens);
- CPPUNIT_TEST_SUITE_END();
-
- public:
- void testTokens()
- {
- Tokens tokens("hello.world");
- char* expect[] = {"hello", "world"};
- CPPUNIT_ASSERT_EQUAL(TOKENS(expect), tokens);
-
- tokens = "a.b.c";
- char* expect2[] = { "a", "b", "c" };
- CPPUNIT_ASSERT_EQUAL(TOKENS(expect2), tokens);
-
- tokens = "";
- CPPUNIT_ASSERT(tokens.empty());
-
- tokens = "x";
- char* expect3[] = { "x" };
- CPPUNIT_ASSERT_EQUAL(TOKENS(expect3), tokens);
-
- tokens = (".x");
- char* expect4[] = { "", "x" };
- CPPUNIT_ASSERT_EQUAL(TOKENS(expect4), tokens);
-
- tokens = ("x.");
- char* expect5[] = { "x", "" };
- CPPUNIT_ASSERT_EQUAL(TOKENS(expect5), tokens);
-
- tokens = (".");
- char* expect6[] = { "", "" };
- CPPUNIT_ASSERT_EQUAL(TOKENS(expect6), tokens);
-
- tokens = ("..");
- char* expect7[] = { "", "", "" };
- CPPUNIT_ASSERT_EQUAL(TOKENS(expect7), tokens);
- }
-
-};
-
-#define ASSERT_NORMALIZED(expect, pattern) \
- CPPUNIT_ASSERT_EQUAL(Tokens(expect), static_cast<Tokens>(TopicPattern(pattern)))
-class TopicPatternTest : public CppUnit::TestCase
-{
- CPPUNIT_TEST_SUITE(TopicPatternTest);
- CPPUNIT_TEST(testNormalize);
- CPPUNIT_TEST(testPlain);
- CPPUNIT_TEST(testStar);
- CPPUNIT_TEST(testHash);
- CPPUNIT_TEST(testMixed);
- CPPUNIT_TEST(testCombo);
- CPPUNIT_TEST_SUITE_END();
-
- public:
-
- void testNormalize()
- {
- CPPUNIT_ASSERT(TopicPattern("").empty());
- ASSERT_NORMALIZED("a.b.c", "a.b.c");
- ASSERT_NORMALIZED("a.*.c", "a.*.c");
- ASSERT_NORMALIZED("#", "#");
- ASSERT_NORMALIZED("#", "#.#.#.#");
- ASSERT_NORMALIZED("*.*.*.#", "#.*.#.*.#.#.*");
- ASSERT_NORMALIZED("a.*.*.*.#", "a.*.#.*.#.*.#");
- ASSERT_NORMALIZED("a.*.*.*.#", "a.*.#.*.#.*");
- }
-
- void testPlain() {
- TopicPattern p("ab.cd.e");
- CPPUNIT_ASSERT(p.match("ab.cd.e"));
- CPPUNIT_ASSERT(!p.match("abx.cd.e"));
- CPPUNIT_ASSERT(!p.match("ab.cd"));
- CPPUNIT_ASSERT(!p.match("ab.cd..e."));
- CPPUNIT_ASSERT(!p.match("ab.cd.e."));
- CPPUNIT_ASSERT(!p.match(".ab.cd.e"));
-
- p = "";
- CPPUNIT_ASSERT(p.match(""));
-
- p = ".";
- CPPUNIT_ASSERT(p.match("."));
- }
-
-
- void testStar()
- {
- TopicPattern p("a.*.b");
- CPPUNIT_ASSERT(p.match("a.xx.b"));
- CPPUNIT_ASSERT(!p.match("a.b"));
-
- p = "*.x";
- CPPUNIT_ASSERT(p.match("y.x"));
- CPPUNIT_ASSERT(p.match(".x"));
- CPPUNIT_ASSERT(!p.match("x"));
-
- p = "x.x.*";
- CPPUNIT_ASSERT(p.match("x.x.y"));
- CPPUNIT_ASSERT(p.match("x.x."));
- CPPUNIT_ASSERT(!p.match("x.x"));
- CPPUNIT_ASSERT(!p.match("q.x.y"));
- }
-
- void testHash()
- {
- TopicPattern p("a.#.b");
- CPPUNIT_ASSERT(p.match("a.b"));
- CPPUNIT_ASSERT(p.match("a.x.b"));
- CPPUNIT_ASSERT(p.match("a..x.y.zz.b"));
- CPPUNIT_ASSERT(!p.match("a.b."));
- CPPUNIT_ASSERT(!p.match("q.x.b"));
-
- p = "a.#";
- CPPUNIT_ASSERT(p.match("a"));
- CPPUNIT_ASSERT(p.match("a.b"));
- CPPUNIT_ASSERT(p.match("a.b.c"));
-
- p = "#.a";
- CPPUNIT_ASSERT(p.match("a"));
- CPPUNIT_ASSERT(p.match("x.y.a"));
- }
-
- void testMixed()
- {
- TopicPattern p("*.x.#.y");
- CPPUNIT_ASSERT(p.match("a.x.y"));
- CPPUNIT_ASSERT(p.match("a.x.p.qq.y"));
- CPPUNIT_ASSERT(!p.match("a.a.x.y"));
- CPPUNIT_ASSERT(!p.match("aa.x.b.c"));
-
- p = "a.#.b.*";
- CPPUNIT_ASSERT(p.match("a.b.x"));
- CPPUNIT_ASSERT(p.match("a.x.x.x.b.x"));
- }
-
- void testCombo() {
- TopicPattern p("*.#.#.*.*.#");
- CPPUNIT_ASSERT(p.match("x.y.z"));
- CPPUNIT_ASSERT(p.match("x.y.z.a.b.c"));
- CPPUNIT_ASSERT(!p.match("x.y"));
- CPPUNIT_ASSERT(!p.match("x"));
- }
-};
-
-
-// Make this test suite a plugin.
-CPPUNIT_PLUGIN_IMPLEMENT();
-CPPUNIT_TEST_SUITE_REGISTRATION(TopicPatternTest);
-CPPUNIT_TEST_SUITE_REGISTRATION(TokensTest);
diff --git a/cpp/broker/test/ValueTest.cpp b/cpp/broker/test/ValueTest.cpp
deleted file mode 100644
index ec9659e603..0000000000
--- a/cpp/broker/test/ValueTest.cpp
+++ /dev/null
@@ -1,96 +0,0 @@
-#include "Value.h"
-#include <qpid_test_plugin.h>
-
-using namespace qpid::framing;
-
-
-// Allow CPPUNIT_EQUALS to print a Tokens.
-// TODO aconway 2006-09-19: Make it a template and put it in a shared test lib.
-//
-template <class T>
-CppUnit::OStringStream& operator <<(CppUnit::OStringStream& out,
- const ValueOps<T>& v)
-{
- out << v.getValue();
- return out;
-}
-
-
-class ValueTest : public CppUnit::TestCase
-{
- CPPUNIT_TEST_SUITE(ValueTest);
- CPPUNIT_TEST(testStringValueEquals);
- CPPUNIT_TEST(testIntegerValueEquals);
- CPPUNIT_TEST(testDecimalValueEquals);
- CPPUNIT_TEST(testFieldTableValueEquals);
- CPPUNIT_TEST_SUITE_END();
-
- StringValue s;
- IntegerValue i;
- DecimalValue d;
- FieldTableValue ft;
- EmptyValue e;
-
- public:
- ValueTest() :
- s("abc"),
- i(42),
- d(1234,2)
-
- {
- ft.getValue().setString("foo", "FOO");
- ft.getValue().setInt("magic", 7);
- }
-
- void testStringValueEquals()
- {
-
- CPPUNIT_ASSERT(StringValue("abc") == s);
- CPPUNIT_ASSERT(s != StringValue("foo"));
- CPPUNIT_ASSERT(s != e);
- CPPUNIT_ASSERT(e != d);
- CPPUNIT_ASSERT(e != ft);
- }
-
- void testIntegerValueEquals()
- {
- CPPUNIT_ASSERT(IntegerValue(42) == i);
- CPPUNIT_ASSERT(IntegerValue(5) != i);
- CPPUNIT_ASSERT(i != e);
- CPPUNIT_ASSERT(i != d);
- }
-
- void testDecimalValueEquals()
- {
- CPPUNIT_ASSERT(DecimalValue(1234, 2) == d);
- CPPUNIT_ASSERT(DecimalValue(12345, 2) != d);
- CPPUNIT_ASSERT(DecimalValue(1234, 3) != d);
- CPPUNIT_ASSERT(d != s);
- }
-
-
- void testFieldTableValueEquals()
- {
- CPPUNIT_ASSERT_EQUAL(std::string("FOO"),
- ft.getValue().getString("foo"));
- CPPUNIT_ASSERT_EQUAL(7, ft.getValue().getInt("magic"));
-
- FieldTableValue f2;
- CPPUNIT_ASSERT(ft != f2);
- f2.getValue().setString("foo", "FOO");
- CPPUNIT_ASSERT(ft != f2);
- f2.getValue().setInt("magic", 7);
- CPPUNIT_ASSERT_EQUAL(ft,f2);
- CPPUNIT_ASSERT(ft == f2);
- f2.getValue().setString("foo", "BAR");
- CPPUNIT_ASSERT(ft != f2);
- CPPUNIT_ASSERT(ft != i);
- }
-
-};
-
-
-// Make this test suite a plugin.
-CPPUNIT_PLUGIN_IMPLEMENT();
-CPPUNIT_TEST_SUITE_REGISTRATION(ValueTest);
-