diff options
Diffstat (limited to 'cpp/broker')
49 files changed, 0 insertions, 4651 deletions
diff --git a/cpp/broker/Makefile b/cpp/broker/Makefile deleted file mode 100644 index 5c96589d95..0000000000 --- a/cpp/broker/Makefile +++ /dev/null @@ -1,39 +0,0 @@ -# -# Copyright (c) 2006 The Apache Software Foundation -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -# -# Build broker library. -# - -QPID_HOME = ../.. -include ${QPID_HOME}/cpp/options.mk -TARGET=$(BROKER_LIB) -SOURCES= $(wildcard src/*.cpp) -OBJECTS= $(subst .cpp,.o,$(SOURCES)) - -.PHONY: all clean - -all: $(TARGET) - @$(MAKE) -C test all - -clean: - -@rm -f $(TARGET) ${OBJECTS} src/*.d - @$(MAKE) -C test clean - -$(TARGET): $(OBJECTS) - $(CXX) -shared -o $@ $(LDFLAGS) $(OBJECTS) -lapr-1 $(COMMON_LIB) $(LIBDIR) - --include $(SOURCES:.cpp=.d) diff --git a/cpp/broker/inc/AutoDelete.h b/cpp/broker/inc/AutoDelete.h deleted file mode 100644 index 864d68358f..0000000000 --- a/cpp/broker/inc/AutoDelete.h +++ /dev/null @@ -1,54 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _AutoDelete_ -#define _AutoDelete_ - -#include <iostream> -#include <queue> -#include "MonitorImpl.h" -#include "Queue.h" -#include "QueueRegistry.h" -#include "ThreadFactoryImpl.h" - -namespace qpid { - namespace broker{ - class AutoDelete : private virtual qpid::concurrent::Runnable{ - qpid::concurrent::ThreadFactoryImpl factory; - qpid::concurrent::MonitorImpl lock; - qpid::concurrent::MonitorImpl monitor; - std::queue<Queue::shared_ptr> queues; - QueueRegistry* const registry; - const u_int32_t period; - volatile bool stopped; - qpid::concurrent::Thread* runner; - - Queue::shared_ptr const pop(); - void process(); - virtual void run(); - - public: - AutoDelete(QueueRegistry* const registry, u_int32_t period); - void add(Queue::shared_ptr const); - void start(); - void stop(); - }; - } -} - - -#endif diff --git a/cpp/broker/inc/Binding.h b/cpp/broker/inc/Binding.h deleted file mode 100644 index b11419e92c..0000000000 --- a/cpp/broker/inc/Binding.h +++ /dev/null @@ -1,35 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _Binding_ -#define _Binding_ - -#include "FieldTable.h" - -namespace qpid { - namespace broker { - class Binding{ - public: - virtual void cancel() = 0; - virtual ~Binding(){} - }; - } -} - - -#endif - diff --git a/cpp/broker/inc/Broker.h b/cpp/broker/inc/Broker.h deleted file mode 100644 index 0cd2bd749e..0000000000 --- a/cpp/broker/inc/Broker.h +++ /dev/null @@ -1,86 +0,0 @@ -#ifndef _Broker_ -#define _Broker_ - -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "Acceptor.h" -#include "Configuration.h" -#include "Runnable.h" -#include "SessionHandlerFactoryImpl.h" -#include <boost/noncopyable.hpp> -#include <tr1/memory> - -namespace qpid { - namespace broker { - /** - * A broker instance. - */ - class Broker : public qpid::concurrent::Runnable, private boost::noncopyable { - Broker(const Configuration& config); // Private, use create() - std::auto_ptr<qpid::io::Acceptor> acceptor; - SessionHandlerFactoryImpl factory; - int16_t port; - bool isBound; - - public: - static const int16_t DEFAULT_PORT; - - virtual ~Broker(); - typedef std::tr1::shared_ptr<Broker> shared_ptr; - - /** - * Create a broker. - * @param port Port to listen on or 0 to pick a port dynamically. - */ - static shared_ptr create(int port = DEFAULT_PORT); - - /** - * Create a broker from a Configuration. - */ - static shared_ptr create(const Configuration& config); - - /** - * Bind to the listening port. - * @return The port number bound. - */ - virtual int16_t bind(); - - /** - * Return listening port. If called before bind this is - * the configured port. If called after it is the actual - * port, which will be different if the configured port is - * 0. - */ - virtual int16_t getPort() { return port; } - - /** - * Run the broker. Implements Runnable::run() so the broker - * can be run in a separate thread. - */ - virtual void run(); - - /** Shut down the broker */ - virtual void shutdown(); - }; - } -} - - - -#endif /*!_Broker_*/ diff --git a/cpp/broker/inc/Channel.h b/cpp/broker/inc/Channel.h deleted file mode 100644 index 862d249ce1..0000000000 --- a/cpp/broker/inc/Channel.h +++ /dev/null @@ -1,199 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _Channel_ -#define _Channel_ - -#include <algorithm> -#include <map> -#include "AMQContentBody.h" -#include "AMQHeaderBody.h" -#include "BasicPublishBody.h" -#include "Binding.h" -#include "Consumer.h" -#include "Message.h" -#include "MonitorImpl.h" -#include "NameGenerator.h" -#include "OutputHandler.h" -#include "Queue.h" - -namespace qpid { - namespace broker { - /** - * Maintains state for an AMQP channel. Handles incoming and - * outgoing messages for that channel. - */ - class Channel{ - private: - class ConsumerImpl : public virtual Consumer{ - Channel* parent; - string tag; - Queue::shared_ptr queue; - ConnectionToken* const connection; - const bool ackExpected; - bool blocked; - public: - ConsumerImpl(Channel* parent, string& tag, Queue::shared_ptr queue, ConnectionToken* const connection, bool ack); - virtual bool deliver(Message::shared_ptr& msg); - void cancel(); - void requestDispatch(); - }; - - typedef std::map<string,ConsumerImpl*>::iterator consumer_iterator; - - struct AckRecord{ - Message::shared_ptr msg; - Queue::shared_ptr queue; - string consumerTag; - u_int64_t deliveryTag; - bool pull; - - AckRecord(Message::shared_ptr _msg, - Queue::shared_ptr _queue, - const string _consumerTag, - const u_int64_t _deliveryTag) : msg(_msg), - queue(_queue), - consumerTag(_consumerTag), - deliveryTag(_deliveryTag), - pull(false){} - - AckRecord(Message::shared_ptr _msg, - Queue::shared_ptr _queue, - const u_int64_t _deliveryTag) : msg(_msg), - queue(_queue), - consumerTag(""), - deliveryTag(_deliveryTag), - pull(true){} - }; - - typedef std::vector<AckRecord>::iterator ack_iterator; - - class MatchAck{ - const u_int64_t tag; - public: - MatchAck(u_int64_t tag); - bool operator()(AckRecord& record) const; - }; - - class Requeue{ - public: - void operator()(AckRecord& record) const; - }; - - class Redeliver{ - Channel* const channel; - public: - Redeliver(Channel* const channel); - void operator()(AckRecord& record) const; - }; - - class CalculatePrefetch{ - u_int32_t size; - u_int16_t count; - public: - CalculatePrefetch(); - void operator()(AckRecord& record); - u_int32_t getSize(); - u_int16_t getCount(); - }; - - const int id; - qpid::framing::OutputHandler* out; - u_int64_t deliveryTag; - Queue::shared_ptr defaultQueue; - bool transactional; - std::map<string, ConsumerImpl*> consumers; - u_int32_t prefetchSize; - u_int16_t prefetchCount; - u_int32_t outstandingSize; - u_int16_t outstandingCount; - u_int32_t framesize; - Message::shared_ptr message; - NameGenerator tagGenerator; - std::vector<AckRecord> unacknowledged; - qpid::concurrent::MonitorImpl deliveryLock; - - void deliver(Message::shared_ptr& msg, string& tag, Queue::shared_ptr& queue, bool ackExpected); - void checkMessage(const std::string& text); - bool checkPrefetch(Message::shared_ptr& msg); - void cancel(consumer_iterator consumer); - - template<class Operation> Operation processMessage(Operation route){ - if(message->isComplete()){ - route(message); - message.reset(); - } - return route; - } - - - public: - Channel(qpid::framing::OutputHandler* out, int id, u_int32_t framesize); - ~Channel(); - inline void setDefaultQueue(Queue::shared_ptr queue){ defaultQueue = queue; } - inline Queue::shared_ptr getDefaultQueue(){ return defaultQueue; } - inline u_int32_t setPrefetchSize(u_int32_t size){ return prefetchSize = size; } - inline u_int16_t setPrefetchCount(u_int16_t count){ return prefetchCount = count; } - bool exists(const string& consumerTag); - void consume(string& tag, Queue::shared_ptr queue, bool acks, bool exclusive, ConnectionToken* const connection = 0); - void cancel(const string& tag); - bool get(Queue::shared_ptr queue, bool ackExpected); - void begin(); - void close(); - void commit(); - void rollback(); - void ack(u_int64_t deliveryTag, bool multiple); - void recover(bool requeue); - - /** - * Handles the initial publish request though a - * channel. The header and (if applicable) content will be - * accumulated through calls to handleHeader() and - * handleContent() - */ - void handlePublish(Message* msg); - - /** - * A template method that handles a received header and if - * there is no content routes it using the functor passed - * in. - */ - template<class Operation> Operation handleHeader(qpid::framing::AMQHeaderBody::shared_ptr header, Operation route){ - checkMessage("Invalid message sequence: got header before publish."); - message->setHeader(header); - return processMessage(route); - } - - /** - * A template method that handles a received content and - * if this completes the message, routes it using the - * functor passed in. - */ - template<class Operation> Operation handleContent(qpid::framing::AMQContentBody::shared_ptr content, Operation route){ - checkMessage("Invalid message sequence: got content before publish."); - message->addContent(content); - return processMessage(route); - } - - }; - - struct InvalidAckException{}; - } -} - - -#endif diff --git a/cpp/broker/inc/Configuration.h b/cpp/broker/inc/Configuration.h deleted file mode 100644 index aaabdd23a0..0000000000 --- a/cpp/broker/inc/Configuration.h +++ /dev/null @@ -1,135 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _Configuration_ -#define _Configuration_ - -#include <cstdlib> -#include <iostream> -#include <vector> -#include "Exception.h" - -namespace qpid { - namespace broker { - class Configuration{ - class Option { - const std::string flag; - const std::string name; - const std::string desc; - - bool match(const std::string& arg); - - protected: - virtual bool needsValue() const = 0; - virtual void setValue(const std::string& value) = 0; - - public: - Option(const char flag, const std::string& name, const std::string& desc); - Option(const std::string& name, const std::string& desc); - virtual ~Option(); - - bool parse(int& i, char** argv, int argc); - void print(std::ostream& out) const; - }; - - class IntOption : public Option{ - const int defaultValue; - int value; - public: - IntOption(char flag, const std::string& name, const std::string& desc, const int value = 0); - IntOption(const std::string& name, const std::string& desc, const int value = 0); - virtual ~IntOption(); - - int getValue() const; - virtual bool needsValue() const; - virtual void setValue(const std::string& value); - virtual void setValue(int _value) { value = _value; } - }; - - class StringOption : public Option{ - const std::string defaultValue; - std::string value; - public: - StringOption(char flag, const std::string& name, const std::string& desc, const std::string value = ""); - StringOption(const std::string& name, const std::string& desc, const std::string value = ""); - virtual ~StringOption(); - - const std::string& getValue() const; - virtual bool needsValue() const; - virtual void setValue(const std::string& value); - }; - - class BoolOption : public Option{ - const bool defaultValue; - bool value; - public: - BoolOption(char flag, const std::string& name, const std::string& desc, const bool value = 0); - BoolOption(const std::string& name, const std::string& desc, const bool value = 0); - virtual ~BoolOption(); - - bool getValue() const; - virtual bool needsValue() const; - virtual void setValue(const std::string& value); - virtual void setValue(bool _value) { value = _value; } - }; - - BoolOption trace; - IntOption port; - IntOption workerThreads; - IntOption maxConnections; - IntOption connectionBacklog; - StringOption acceptor; - BoolOption help; - - typedef std::vector<Option*>::iterator op_iterator; - std::vector<Option*> options; - - public: - class ParseException : public Exception { - public: - ParseException(const std::string& msg) : Exception(msg) {} - }; - - - Configuration(); - ~Configuration(); - - void parse(int argc, char** argv); - - bool isHelp() const; - bool isTrace() const; - int getPort() const; - int getWorkerThreads() const; - int getMaxConnections() const; - int getConnectionBacklog() const; - std::string getAcceptor() const; - - void setHelp(bool b) { help.setValue(b); } - void setTrace(bool b) { trace.setValue(b); } - void setPort(int i) { port.setValue(i); } - void setWorkerThreads(int i) { workerThreads.setValue(i); } - void setMaxConnections(int i) { maxConnections.setValue(i); } - void setConnectionBacklog(int i) { connectionBacklog.setValue(i); } - void setAcceptor(const std::string& val) { acceptor.setValue(val); } - - void usage(); - }; - } -} - - -#endif diff --git a/cpp/broker/inc/ConnectionToken.h b/cpp/broker/inc/ConnectionToken.h deleted file mode 100644 index 1faefec2cc..0000000000 --- a/cpp/broker/inc/ConnectionToken.h +++ /dev/null @@ -1,35 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _ConnectionToken_ -#define _ConnectionToken_ - -namespace qpid { - namespace broker { - /** - * An empty interface allowing opaque implementations of some - * form of token to identify a connection. - */ - class ConnectionToken{ - public: - virtual ~ConnectionToken(){} - }; - } -} - - -#endif diff --git a/cpp/broker/inc/Consumer.h b/cpp/broker/inc/Consumer.h deleted file mode 100644 index af2d5d7812..0000000000 --- a/cpp/broker/inc/Consumer.h +++ /dev/null @@ -1,34 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _Consumer_ -#define _Consumer_ - -#include "Message.h" - -namespace qpid { - namespace broker { - class Consumer{ - public: - virtual bool deliver(Message::shared_ptr& msg) = 0; - virtual ~Consumer(){} - }; - } -} - - -#endif diff --git a/cpp/broker/inc/DirectExchange.h b/cpp/broker/inc/DirectExchange.h deleted file mode 100644 index faf5a0b949..0000000000 --- a/cpp/broker/inc/DirectExchange.h +++ /dev/null @@ -1,52 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _DirectExchange_ -#define _DirectExchange_ - -#include <map> -#include <vector> -#include "Exchange.h" -#include "FieldTable.h" -#include "Message.h" -#include "MonitorImpl.h" -#include "Queue.h" - -namespace qpid { -namespace broker { - class DirectExchange : public virtual Exchange{ - std::map<string, std::vector<Queue::shared_ptr> > bindings; - qpid::concurrent::MonitorImpl lock; - - public: - static const std::string typeName; - - DirectExchange(const std::string& name); - - virtual void bind(Queue::shared_ptr queue, const std::string& routingKey, qpid::framing::FieldTable* args); - - virtual void unbind(Queue::shared_ptr queue, const std::string& routingKey, qpid::framing::FieldTable* args); - - virtual void route(Message::shared_ptr& msg, const std::string& routingKey, qpid::framing::FieldTable* args); - - virtual ~DirectExchange(); - }; -} -} - - -#endif diff --git a/cpp/broker/inc/Exchange.h b/cpp/broker/inc/Exchange.h deleted file mode 100644 index 1fdc00fae5..0000000000 --- a/cpp/broker/inc/Exchange.h +++ /dev/null @@ -1,41 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _Exchange_ -#define _Exchange_ - -#include "FieldTable.h" -#include "Message.h" -#include "Queue.h" - -namespace qpid { -namespace broker { - class Exchange{ - const std::string name; - public: - explicit Exchange(const std::string& _name) : name(_name) {} - virtual ~Exchange(){} - std::string getName() { return name; } - virtual void bind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args) = 0; - virtual void unbind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args) = 0; - virtual void route(Message::shared_ptr& msg, const string& routingKey, qpid::framing::FieldTable* args) = 0; - }; -} -} - - -#endif diff --git a/cpp/broker/inc/ExchangeBinding.h b/cpp/broker/inc/ExchangeBinding.h deleted file mode 100644 index 4cbb73acbf..0000000000 --- a/cpp/broker/inc/ExchangeBinding.h +++ /dev/null @@ -1,45 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _ExchangeBinding_ -#define _ExchangeBinding_ - -#include "Binding.h" -#include "FieldTable.h" -#include "Queue.h" - -namespace qpid { - namespace broker { - class Exchange; - class Queue; - - class ExchangeBinding : public virtual Binding{ - Exchange* e; - Queue::shared_ptr q; - const string key; - qpid::framing::FieldTable* args; - public: - ExchangeBinding(Exchange* _e, Queue::shared_ptr _q, const string& _key, qpid::framing::FieldTable* _args); - virtual void cancel(); - virtual ~ExchangeBinding(); - }; - } -} - - -#endif - diff --git a/cpp/broker/inc/ExchangeRegistry.h b/cpp/broker/inc/ExchangeRegistry.h deleted file mode 100644 index a4a778482c..0000000000 --- a/cpp/broker/inc/ExchangeRegistry.h +++ /dev/null @@ -1,44 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _ExchangeRegistry_ -#define _ExchangeRegistry_ - -#include <map> -#include "Exchange.h" -#include "Monitor.h" - -namespace qpid { -namespace broker { - class ExchangeRegistry{ - typedef std::map<string, Exchange*> ExchangeMap; - ExchangeMap exchanges; - qpid::concurrent::Monitor* lock; - public: - ExchangeRegistry(); - void declare(Exchange* exchange); - void destroy(const string& name); - Exchange* get(const string& name); - Exchange* getDefault(); - inline qpid::concurrent::Monitor* getLock(){ return lock; } - ~ExchangeRegistry(); - }; -} -} - - -#endif diff --git a/cpp/broker/inc/FanOutExchange.h b/cpp/broker/inc/FanOutExchange.h deleted file mode 100644 index 1932e8429c..0000000000 --- a/cpp/broker/inc/FanOutExchange.h +++ /dev/null @@ -1,55 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _FanOutExchange_ -#define _FanOutExchange_ - -#include <map> -#include <vector> -#include "Exchange.h" -#include "FieldTable.h" -#include "Message.h" -#include "MonitorImpl.h" -#include "Queue.h" - -namespace qpid { -namespace broker { - -class FanOutExchange : public virtual Exchange { - std::vector<Queue::shared_ptr> bindings; - qpid::concurrent::MonitorImpl lock; - - public: - static const std::string typeName; - - FanOutExchange(const std::string& name); - - virtual void bind(Queue::shared_ptr queue, const std::string& routingKey, qpid::framing::FieldTable* args); - - virtual void unbind(Queue::shared_ptr queue, const std::string& routingKey, qpid::framing::FieldTable* args); - - virtual void route(Message::shared_ptr& msg, const std::string& routingKey, qpid::framing::FieldTable* args); - - virtual ~FanOutExchange(); -}; - -} -} - - - -#endif diff --git a/cpp/broker/inc/HeadersExchange.h b/cpp/broker/inc/HeadersExchange.h deleted file mode 100644 index 08bf0bb735..0000000000 --- a/cpp/broker/inc/HeadersExchange.h +++ /dev/null @@ -1,60 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _HeadersExchange_ -#define _HeadersExchange_ - -#include <vector> -#include "Exchange.h" -#include "FieldTable.h" -#include "Message.h" -#include "MonitorImpl.h" -#include "Queue.h" - -namespace qpid { -namespace broker { - - -class HeadersExchange : public virtual Exchange { - typedef std::pair<qpid::framing::FieldTable, Queue::shared_ptr> Binding; - typedef std::vector<Binding> Bindings; - - Bindings bindings; - qpid::concurrent::MonitorImpl lock; - - public: - static const std::string typeName; - - HeadersExchange(const string& name); - - virtual void bind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args); - - virtual void unbind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args); - - virtual void route(Message::shared_ptr& msg, const string& routingKey, qpid::framing::FieldTable* args); - - virtual ~HeadersExchange(); - - static bool match(const qpid::framing::FieldTable& bindArgs, const qpid::framing::FieldTable& msgArgs); -}; - - - -} -} - -#endif diff --git a/cpp/broker/inc/Message.h b/cpp/broker/inc/Message.h deleted file mode 100644 index 94b9aa5bdd..0000000000 --- a/cpp/broker/inc/Message.h +++ /dev/null @@ -1,89 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _Message_ -#define _Message_ - -#include "memory.h" -#include "AMQContentBody.h" -#include "AMQHeaderBody.h" -#include "BasicHeaderProperties.h" -#include "BasicPublishBody.h" -#include "ConnectionToken.h" -#include "OutputHandler.h" - -namespace qpid { - namespace broker { - class ExchangeRegistry; - - /** - * Represents an AMQP message, i.e. a header body, a list of - * content bodies and some details about the publication - * request. - */ - class Message{ - typedef std::vector<qpid::framing::AMQContentBody::shared_ptr> content_list; - typedef content_list::iterator content_iterator; - - const ConnectionToken* const publisher; - const string exchange; - const string routingKey; - const bool mandatory; - const bool immediate; - bool redelivered; - qpid::framing::AMQHeaderBody::shared_ptr header; - content_list content; - u_int64_t size; - - void sendContent(qpid::framing::OutputHandler* out, - int channel, u_int32_t framesize); - - public: - typedef std::tr1::shared_ptr<Message> shared_ptr; - - Message(const ConnectionToken* const publisher, - const string& exchange, const string& routingKey, - bool mandatory, bool immediate); - ~Message(); - void setHeader(qpid::framing::AMQHeaderBody::shared_ptr header); - void addContent(qpid::framing::AMQContentBody::shared_ptr data); - bool isComplete(); - const ConnectionToken* const getPublisher(); - - void deliver(qpid::framing::OutputHandler* out, - int channel, - const string& consumerTag, - u_int64_t deliveryTag, - u_int32_t framesize); - void sendGetOk(qpid::framing::OutputHandler* out, - int channel, - u_int32_t messageCount, - u_int64_t deliveryTag, - u_int32_t framesize); - void redeliver(); - - qpid::framing::BasicHeaderProperties* getHeaderProperties(); - const string& getRoutingKey() const { return routingKey; } - const string& getExchange() const { return exchange; } - u_int64_t contentSize() const{ return size; } - - }; - } -} - - -#endif diff --git a/cpp/broker/inc/NameGenerator.h b/cpp/broker/inc/NameGenerator.h deleted file mode 100644 index 6e6e0acf28..0000000000 --- a/cpp/broker/inc/NameGenerator.h +++ /dev/null @@ -1,36 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _NameGenerator_ -#define _NameGenerator_ - -#include "Message.h" - -namespace qpid { - namespace broker { - class NameGenerator{ - const std::string base; - unsigned int counter; - public: - NameGenerator(const std::string& base); - std::string generate(); - }; - } -} - - -#endif diff --git a/cpp/broker/inc/Queue.h b/cpp/broker/inc/Queue.h deleted file mode 100644 index 2229ba6235..0000000000 --- a/cpp/broker/inc/Queue.h +++ /dev/null @@ -1,106 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _Queue_ -#define _Queue_ - -#include <vector> -#include <queue> -#include "memory.h" -#include "apr_time.h" -#include "amqp_types.h" -#include "Binding.h" -#include "ConnectionToken.h" -#include "Consumer.h" -#include "Message.h" -#include "MonitorImpl.h" - -namespace qpid { - namespace broker { - - /** - * Thrown when exclusive access would be violated. - */ - struct ExclusiveAccessException{}; - - /** - * The brokers representation of an amqp queue. Messages are - * delivered to a queue from where they can be dispatched to - * registered consumers or be stored until dequeued or until one - * or more consumers registers. - */ - class Queue{ - const string name; - const u_int32_t autodelete; - const bool durable; - const ConnectionToken* const owner; - std::vector<Consumer*> consumers; - std::queue<Binding*> bindings; - std::queue<Message::shared_ptr> messages; - bool queueing; - bool dispatching; - int next; - mutable qpid::concurrent::MonitorImpl lock; - apr_time_t lastUsed; - Consumer* exclusive; - - bool startDispatching(); - bool dispatch(Message::shared_ptr& msg); - - public: - - typedef std::tr1::shared_ptr<Queue> shared_ptr; - - typedef std::vector<shared_ptr> vector; - - Queue(const string& name, bool durable = false, u_int32_t autodelete = 0, const ConnectionToken* const owner = 0); - ~Queue(); - /** - * Informs the queue of a binding that should be cancelled on - * destruction of the queue. - */ - void bound(Binding* b); - /** - * Delivers a message to the queue from where it will be - * dispatched to immediately to a consumer if one is - * available or stored for dequeue or later dispatch if - * not. - */ - void deliver(Message::shared_ptr& msg); - /** - * Dispatch any queued messages providing there are - * consumers for them. Only one thread can be dispatching - * at any time, but this method (rather than the caller) - * is responsible for ensuring that. - */ - void dispatch(); - void consume(Consumer* c, bool exclusive = false); - void cancel(Consumer* c); - Message::shared_ptr dequeue(); - u_int32_t purge(); - u_int32_t getMessageCount() const; - u_int32_t getConsumerCount() const; - inline const string& getName() const { return name; } - inline const bool isExclusiveOwner(const ConnectionToken* const o) const { return o == owner; } - inline bool hasExclusiveConsumer() const { return exclusive; } - bool canAutoDelete() const; - }; - } -} - - -#endif diff --git a/cpp/broker/inc/QueueRegistry.h b/cpp/broker/inc/QueueRegistry.h deleted file mode 100644 index ac12aa8f88..0000000000 --- a/cpp/broker/inc/QueueRegistry.h +++ /dev/null @@ -1,88 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _QueueRegistry_ -#define _QueueRegistry_ - -#include <map> -#include "MonitorImpl.h" -#include "Queue.h" - -namespace qpid { -namespace broker { - -class SessionHandlerImpl; - -/** - * A registry of queues indexed by queue name. - * - * Queues are reference counted using shared_ptr to ensure that they - * are deleted when and only when they are no longer in use. - * - */ -class QueueRegistry{ - - public: - QueueRegistry(); - ~QueueRegistry(); - - /** - * Declare a queue. - * - * @return The queue and a boolean flag which is true if the queue - * was created by this declare call false if it already existed. - */ - std::pair<Queue::shared_ptr, bool> declare(const string& name, bool durable = false, u_int32_t autodelete = 0, const ConnectionToken* const owner = 0); - - /** - * Destroy the named queue. - * - * Note: if the queue is in use it is not actually destroyed until - * all shared_ptrs to it are destroyed. During that time it is - * possible that a new queue with the same name may be - * created. This should not create any problems as the new and - * old queues exist independently. The registry has - * forgotten the old queue so there can be no confusion for - * subsequent calls to find or declare with the same name. - * - */ - void destroy(const string& name); - - /** - * Find the named queue. Return 0 if not found. - */ - Queue::shared_ptr find(const string& name); - - /** - * Generate unique queue name. - */ - string generateName(); - - private: - typedef std::map<string, Queue::shared_ptr> QueueMap; - QueueMap queues; - qpid::concurrent::MonitorImpl lock; - int counter; - -}; - - -} -} - - -#endif diff --git a/cpp/broker/inc/Router.h b/cpp/broker/inc/Router.h deleted file mode 100644 index d462b69832..0000000000 --- a/cpp/broker/inc/Router.h +++ /dev/null @@ -1,39 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _Router_ -#define _Router_ - -#include "ExchangeRegistry.h" -#include "Message.h" - -/** - * A routing functor - */ -namespace qpid { - namespace broker { - class Router{ - ExchangeRegistry& registry; - public: - Router(ExchangeRegistry& registry); - void operator()(Message::shared_ptr& msg); - }; - } -} - - -#endif diff --git a/cpp/broker/inc/SessionHandlerFactoryImpl.h b/cpp/broker/inc/SessionHandlerFactoryImpl.h deleted file mode 100644 index 2317a6667b..0000000000 --- a/cpp/broker/inc/SessionHandlerFactoryImpl.h +++ /dev/null @@ -1,49 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _SessionHandlerFactoryImpl_ -#define _SessionHandlerFactoryImpl_ - -#include "AMQFrame.h" -#include "AutoDelete.h" -#include "DirectExchange.h" -#include "ExchangeRegistry.h" -#include "ProtocolInitiation.h" -#include "QueueRegistry.h" -#include "SessionHandlerFactory.h" -#include "TimeoutHandler.h" - -namespace qpid { - namespace broker { - - class SessionHandlerFactoryImpl : public virtual qpid::io::SessionHandlerFactory - { - QueueRegistry queues; - ExchangeRegistry exchanges; - const u_int32_t timeout;//timeout for auto-deleted queues (in ms) - AutoDelete cleaner; - public: - SessionHandlerFactoryImpl(u_int32_t timeout = 30000); - virtual qpid::io::SessionHandler* create(qpid::io::SessionContext* ctxt); - virtual ~SessionHandlerFactoryImpl(); - }; - - } -} - - -#endif diff --git a/cpp/broker/inc/SessionHandlerImpl.h b/cpp/broker/inc/SessionHandlerImpl.h deleted file mode 100644 index 549f51f5a1..0000000000 --- a/cpp/broker/inc/SessionHandlerImpl.h +++ /dev/null @@ -1,233 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _SessionHandlerImpl_ -#define _SessionHandlerImpl_ - -#include <map> -#include <sstream> -#include <vector> -#include <exception> -#include "AMQFrame.h" -#include "AMQP_ClientProxy.h" -#include "AMQP_ServerOperations.h" -#include "AutoDelete.h" -#include "ExchangeRegistry.h" -#include "Channel.h" -#include "ConnectionToken.h" -#include "DirectExchange.h" -#include "OutputHandler.h" -#include "ProtocolInitiation.h" -#include "QueueRegistry.h" -#include "SessionContext.h" -#include "SessionHandler.h" -#include "TimeoutHandler.h" -#include "TopicExchange.h" - -namespace qpid { -namespace broker { - -struct ChannelException : public std::exception { - u_int16_t code; - string text; - ChannelException(u_int16_t _code, string _text) : code(_code), text(_text) {} - ~ChannelException() throw() {} - const char* what() const throw() { return text.c_str(); } -}; - -struct ConnectionException : public std::exception { - u_int16_t code; - string text; - ConnectionException(u_int16_t _code, string _text) : code(_code), text(_text) {} - ~ConnectionException() throw() {} - const char* what() const throw() { return text.c_str(); } -}; - -class SessionHandlerImpl : public virtual qpid::io::SessionHandler, - public virtual qpid::framing::AMQP_ServerOperations, - public virtual ConnectionToken -{ - typedef std::map<u_int16_t, Channel*>::iterator channel_iterator; - typedef std::vector<Queue::shared_ptr>::iterator queue_iterator; - - qpid::io::SessionContext* context; - qpid::framing::AMQP_ClientProxy client; - QueueRegistry* queues; - ExchangeRegistry* const exchanges; - AutoDelete* const cleaner; - const u_int32_t timeout;//timeout for auto-deleted queues (in ms) - - ConnectionHandler* connectionHandler; - ChannelHandler* channelHandler; - BasicHandler* basicHandler; - ExchangeHandler* exchangeHandler; - QueueHandler* queueHandler; - - std::map<u_int16_t, Channel*> channels; - std::vector<Queue::shared_ptr> exclusiveQueues; - - u_int32_t framemax; - u_int16_t heartbeat; - - void handleHeader(u_int16_t channel, qpid::framing::AMQHeaderBody::shared_ptr body); - void handleContent(u_int16_t channel, qpid::framing::AMQContentBody::shared_ptr body); - void handleHeartbeat(qpid::framing::AMQHeartbeatBody::shared_ptr body); - - Channel* getChannel(u_int16_t channel); - /** - * Get named queue, never returns 0. - * @return: named queue or default queue for channel if name="" - * @exception: ChannelException if no queue of that name is found. - * @exception: ConnectionException if no queue specified and channel has not declared one. - */ - Queue::shared_ptr getQueue(const string& name, u_int16_t channel); - - Exchange* findExchange(const string& name); - - public: - SessionHandlerImpl(qpid::io::SessionContext* context, QueueRegistry* queues, - ExchangeRegistry* exchanges, AutoDelete* cleaner, const u_int32_t timeout); - virtual void received(qpid::framing::AMQFrame* frame); - virtual void initiated(qpid::framing::ProtocolInitiation* header); - virtual void idleOut(); - virtual void idleIn(); - virtual void closed(); - virtual ~SessionHandlerImpl(); - - class ConnectionHandlerImpl : public virtual ConnectionHandler{ - SessionHandlerImpl* parent; - public: - inline ConnectionHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {} - - virtual void startOk(u_int16_t channel, qpid::framing::FieldTable& clientProperties, string& mechanism, - string& response, string& locale); - - virtual void secureOk(u_int16_t channel, string& response); - - virtual void tuneOk(u_int16_t channel, u_int16_t channelMax, u_int32_t frameMax, u_int16_t heartbeat); - - virtual void open(u_int16_t channel, string& virtualHost, string& capabilities, bool insist); - - virtual void close(u_int16_t channel, u_int16_t replyCode, string& replyText, u_int16_t classId, - u_int16_t methodId); - - virtual void closeOk(u_int16_t channel); - - virtual ~ConnectionHandlerImpl(){} - }; - - class ChannelHandlerImpl : public virtual ChannelHandler{ - SessionHandlerImpl* parent; - public: - inline ChannelHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {} - - virtual void open(u_int16_t channel, string& outOfBand); - - virtual void flow(u_int16_t channel, bool active); - - virtual void flowOk(u_int16_t channel, bool active); - - virtual void close(u_int16_t channel, u_int16_t replyCode, string& replyText, - u_int16_t classId, u_int16_t methodId); - - virtual void closeOk(u_int16_t channel); - - virtual ~ChannelHandlerImpl(){} - }; - - class ExchangeHandlerImpl : public virtual ExchangeHandler{ - SessionHandlerImpl* parent; - public: - inline ExchangeHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {} - - virtual void declare(u_int16_t channel, u_int16_t ticket, string& exchange, string& type, - bool passive, bool durable, bool autoDelete, bool internal, bool nowait, - qpid::framing::FieldTable& arguments); - - virtual void delete_(u_int16_t channel, u_int16_t ticket, string& exchange, bool ifUnused, bool nowait); - - virtual ~ExchangeHandlerImpl(){} - }; - - - class QueueHandlerImpl : public virtual QueueHandler{ - SessionHandlerImpl* parent; - public: - inline QueueHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {} - - virtual void declare(u_int16_t channel, u_int16_t ticket, string& queue, - bool passive, bool durable, bool exclusive, - bool autoDelete, bool nowait, qpid::framing::FieldTable& arguments); - - virtual void bind(u_int16_t channel, u_int16_t ticket, string& queue, - string& exchange, string& routingKey, bool nowait, - qpid::framing::FieldTable& arguments); - - virtual void purge(u_int16_t channel, u_int16_t ticket, string& queue, - bool nowait); - - virtual void delete_(u_int16_t channel, u_int16_t ticket, string& queue, bool ifUnused, bool ifEmpty, - bool nowait); - - virtual ~QueueHandlerImpl(){} - }; - - class BasicHandlerImpl : public virtual BasicHandler{ - SessionHandlerImpl* parent; - public: - inline BasicHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {} - - virtual void qos(u_int16_t channel, u_int32_t prefetchSize, u_int16_t prefetchCount, bool global); - - virtual void consume(u_int16_t channel, u_int16_t ticket, string& queue, string& consumerTag, - bool noLocal, bool noAck, bool exclusive, bool nowait); - - virtual void cancel(u_int16_t channel, string& consumerTag, bool nowait); - - virtual void publish(u_int16_t channel, u_int16_t ticket, string& exchange, string& routingKey, - bool mandatory, bool immediate); - - virtual void get(u_int16_t channel, u_int16_t ticket, string& queue, bool noAck); - - virtual void ack(u_int16_t channel, u_int64_t deliveryTag, bool multiple); - - virtual void reject(u_int16_t channel, u_int64_t deliveryTag, bool requeue); - - virtual void recover(u_int16_t channel, bool requeue); - - virtual ~BasicHandlerImpl(){} - }; - - inline virtual ChannelHandler* getChannelHandler(){ return channelHandler; } - inline virtual ConnectionHandler* getConnectionHandler(){ return connectionHandler; } - inline virtual BasicHandler* getBasicHandler(){ return basicHandler; } - inline virtual ExchangeHandler* getExchangeHandler(){ return exchangeHandler; } - inline virtual QueueHandler* getQueueHandler(){ return queueHandler; } - - inline virtual AccessHandler* getAccessHandler(){ return 0; } - inline virtual FileHandler* getFileHandler(){ return 0; } - inline virtual StreamHandler* getStreamHandler(){ return 0; } - inline virtual TxHandler* getTxHandler(){ return 0; } - inline virtual DtxHandler* getDtxHandler(){ return 0; } - inline virtual TunnelHandler* getTunnelHandler(){ return 0; } -}; - -} -} - - -#endif diff --git a/cpp/broker/inc/TopicExchange.h b/cpp/broker/inc/TopicExchange.h deleted file mode 100644 index 227280103f..0000000000 --- a/cpp/broker/inc/TopicExchange.h +++ /dev/null @@ -1,94 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _TopicExchange_ -#define _TopicExchange_ - -#include <tr1/unordered_map> -#include <vector> -#include "Exchange.h" -#include "FieldTable.h" -#include "Message.h" -#include "MonitorImpl.h" -#include "Queue.h" - -namespace qpid { -namespace broker { - -/** A vector of string tokens */ -class Tokens : public std::vector<std::string> { - public: - Tokens() {}; - // Default copy, assign, dtor are sufficient. - - /** Tokenize s, provides automatic conversion of string to Tokens */ - Tokens(const std::string& s) { operator=(s); } - /** Tokenize s */ - Tokens & operator=(const std::string& s); - - struct Hash { size_t operator()(const Tokens&) const; }; - typedef std::equal_to<Tokens> Equal; -}; - -/** - * Tokens that have been normalized as a pattern and can be matched - * with topic Tokens. Normalized meands all sequences of mixed * and - * # are reduced to a series of * followed by at most one #. - */ -class TopicPattern : public Tokens -{ - public: - TopicPattern() {} - // Default copy, assign, dtor are sufficient. - TopicPattern(const Tokens& tokens) { operator=(tokens); } - TopicPattern(const std::string& str) { operator=(str); } - TopicPattern& operator=(const Tokens&); - TopicPattern& operator=(const std::string& str) { return operator=(Tokens(str)); } - - /** Match a topic */ - bool match(const std::string& topic) { return match(Tokens(topic)); } - bool match(const Tokens& topic) const; - - private: - void normalize(); -}; - -class TopicExchange : public virtual Exchange{ - typedef std::tr1::unordered_map<TopicPattern, Queue::vector, TopicPattern::Hash> BindingMap; - BindingMap bindings; - qpid::concurrent::MonitorImpl lock; - - public: - static const std::string typeName; - - TopicExchange(const string& name); - - virtual void bind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args); - - virtual void unbind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args); - - virtual void route(Message::shared_ptr& msg, const string& routingKey, qpid::framing::FieldTable* args); - - virtual ~TopicExchange(); -}; - - - -} -} - -#endif diff --git a/cpp/broker/src/AutoDelete.cpp b/cpp/broker/src/AutoDelete.cpp deleted file mode 100644 index 6793ec449d..0000000000 --- a/cpp/broker/src/AutoDelete.cpp +++ /dev/null @@ -1,93 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "AutoDelete.h" - -using namespace qpid::broker; - -AutoDelete::AutoDelete(QueueRegistry* const _registry, u_int32_t _period) : registry(_registry), - period(_period), - stopped(true), - runner(0){} - -void AutoDelete::add(Queue::shared_ptr const queue){ - lock.acquire(); - queues.push(queue); - lock.release(); -} - -Queue::shared_ptr const AutoDelete::pop(){ - Queue::shared_ptr next; - lock.acquire(); - if(!queues.empty()){ - next = queues.front(); - queues.pop(); - } - lock.release(); - return next; -} - -void AutoDelete::process(){ - Queue::shared_ptr seen; - for(Queue::shared_ptr q = pop(); q; q = pop()){ - if(seen == q){ - add(q); - break; - }else if(q->canAutoDelete()){ - std::string name(q->getName()); - registry->destroy(name); - std::cout << "INFO: Auto-deleted queue named " << name << std::endl; - }else{ - add(q); - if(!seen) seen = q; - } - } -} - -void AutoDelete::run(){ - monitor.acquire(); - while(!stopped){ - process(); - monitor.wait(period); - } - monitor.release(); -} - -void AutoDelete::start(){ - monitor.acquire(); - if(stopped){ - runner = factory.create(this); - stopped = false; - monitor.release(); - runner->start(); - }else{ - monitor.release(); - } -} - -void AutoDelete::stop(){ - monitor.acquire(); - if(!stopped){ - stopped = true; - monitor.notify(); - monitor.release(); - runner->join(); - delete runner; - }else{ - monitor.release(); - } -} diff --git a/cpp/broker/src/Broker.cpp b/cpp/broker/src/Broker.cpp deleted file mode 100644 index b6472d1729..0000000000 --- a/cpp/broker/src/Broker.cpp +++ /dev/null @@ -1,84 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include <iostream> -#include <memory> -#include "Broker.h" -#include "Acceptor.h" -#include "Configuration.h" -#include "QpidError.h" -#include "SessionHandlerFactoryImpl.h" -#include "BlockingAPRAcceptor.h" -#include "LFAcceptor.h" - - -using namespace qpid::broker; -using namespace qpid::io; - -namespace { - Acceptor* createAcceptor(const Configuration& config){ - const string type(config.getAcceptor()); - if("blocking" == type){ - std::cout << "Using blocking acceptor " << std::endl; - return new BlockingAPRAcceptor(config.isTrace(), config.getConnectionBacklog()); - }else if("non-blocking" == type){ - std::cout << "Using non-blocking acceptor " << std::endl; - return new LFAcceptor(config.isTrace(), - config.getConnectionBacklog(), - config.getWorkerThreads(), - config.getMaxConnections()); - } - throw Configuration::ParseException("Unrecognised acceptor: " + type); - } -} - -Broker::Broker(const Configuration& config) : - acceptor(createAcceptor(config)), - port(config.getPort()), - isBound(false) {} - -Broker::shared_ptr Broker::create(int port) -{ - Configuration config; - config.setPort(port); - return create(config); -} - -Broker::shared_ptr Broker::create(const Configuration& config) { - return Broker::shared_ptr(new Broker(config)); -} - -int16_t Broker::bind() -{ - if (!isBound) { - port = acceptor->bind(port); - } - return port; -} - -void Broker::run() { - bind(); - acceptor->run(&factory); -} - -void Broker::shutdown() { - acceptor->shutdown(); -} - -Broker::~Broker() { } - -const int16_t Broker::DEFAULT_PORT(5672); diff --git a/cpp/broker/src/Channel.cpp b/cpp/broker/src/Channel.cpp deleted file mode 100644 index 34d69716c4..0000000000 --- a/cpp/broker/src/Channel.cpp +++ /dev/null @@ -1,256 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "Channel.h" -#include "QpidError.h" -#include <iostream> -#include <sstream> -#include <assert.h> - -using namespace qpid::broker; -using namespace qpid::framing; -using namespace qpid::concurrent; - - -Channel::Channel(OutputHandler* _out, int _id, u_int32_t _framesize) : - id(_id), - out(_out), - deliveryTag(1), - transactional(false), - prefetchSize(0), - prefetchCount(0), - outstandingSize(0), - outstandingCount(0), - framesize(_framesize), - tagGenerator("sgen"){} - -Channel::~Channel(){ -} - -bool Channel::exists(const string& consumerTag){ - return consumers.find(consumerTag) != consumers.end(); -} - -void Channel::consume(string& tag, Queue::shared_ptr queue, bool acks, bool exclusive, ConnectionToken* const connection){ - if(tag.empty()) tag = tagGenerator.generate(); - - ConsumerImpl* c(new ConsumerImpl(this, tag, queue, connection, acks)); - try{ - queue->consume(c, exclusive);//may throw exception - consumers[tag] = c; - }catch(ExclusiveAccessException& e){ - delete c; - throw e; - } -} - -void Channel::cancel(consumer_iterator i){ - ConsumerImpl* c = i->second; - consumers.erase(i); - if(c){ - c->cancel(); - delete c; - } -} - -void Channel::cancel(const string& tag){ - consumer_iterator i = consumers.find(tag); - if(i != consumers.end()){ - cancel(i); - } -} - -void Channel::close(){ - //cancel all consumers - for(consumer_iterator i = consumers.begin(); i != consumers.end(); i = consumers.begin() ){ - cancel(i); - } -} - -void Channel::begin(){ - transactional = true; -} - -void Channel::commit(){ - -} - -void Channel::rollback(){ - -} - -void Channel::deliver(Message::shared_ptr& msg, string& consumerTag, Queue::shared_ptr& queue, bool ackExpected){ - Locker locker(deliveryLock); - - u_int64_t myDeliveryTag = deliveryTag++; - if(ackExpected){ - unacknowledged.push_back(AckRecord(msg, queue, consumerTag, myDeliveryTag)); - outstandingSize += msg->contentSize(); - outstandingCount++; - } - //send deliver method, header and content(s) - msg->deliver(out, id, consumerTag, myDeliveryTag, framesize); -} - -bool Channel::checkPrefetch(Message::shared_ptr& msg){ - Locker locker(deliveryLock); - bool countOk = !prefetchCount || prefetchCount > unacknowledged.size(); - bool sizeOk = !prefetchSize || prefetchSize > msg->contentSize() + outstandingSize || unacknowledged.empty(); - return countOk && sizeOk; -} - -Channel::ConsumerImpl::ConsumerImpl(Channel* _parent, string& _tag, - Queue::shared_ptr _queue, - ConnectionToken* const _connection, bool ack) : parent(_parent), - tag(_tag), - queue(_queue), - connection(_connection), - ackExpected(ack), - blocked(false){ -} - -bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg){ - if(!connection || connection != msg->getPublisher()){//check for no_local - if(ackExpected && !parent->checkPrefetch(msg)){ - blocked = true; - }else{ - blocked = false; - parent->deliver(msg, tag, queue, ackExpected); - return true; - } - } - return false; -} - -void Channel::ConsumerImpl::cancel(){ - if(queue) queue->cancel(this); -} - -void Channel::ConsumerImpl::requestDispatch(){ - if(blocked) queue->dispatch(); -} - -void Channel::checkMessage(const std::string& text){ - if(!message.get()){ - THROW_QPID_ERROR(PROTOCOL_ERROR + 504, text); - } -} - -void Channel::handlePublish(Message* msg){ - if(message.get()){ - THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got publish before previous content was completed."); - } - message = Message::shared_ptr(msg); -} - -void Channel::ack(u_int64_t _deliveryTag, bool multiple){ - Locker locker(deliveryLock);//need to synchronize with possible concurrent delivery - - ack_iterator i = find_if(unacknowledged.begin(), unacknowledged.end(), MatchAck(_deliveryTag)); - if(i == unacknowledged.end()){ - throw InvalidAckException(); - }else if(multiple){ - unacknowledged.erase(unacknowledged.begin(), ++i); - //recompute prefetch outstanding (note: messages delivered through get are ignored) - CalculatePrefetch calc(for_each(unacknowledged.begin(), unacknowledged.end(), CalculatePrefetch())); - outstandingSize = calc.getSize(); - outstandingCount = calc.getCount(); - }else{ - if(!i->pull){ - outstandingSize -= i->msg->contentSize(); - outstandingCount--; - } - unacknowledged.erase(i); - } - - //if the prefetch limit had previously been reached, there may - //be messages that can be now be delivered - for(consumer_iterator j = consumers.begin(); j != consumers.end(); j++){ - j->second->requestDispatch(); - } -} - -void Channel::recover(bool requeue){ - Locker locker(deliveryLock);//need to synchronize with possible concurrent delivery - - if(requeue){ - outstandingSize = 0; - outstandingCount = 0; - ack_iterator start(unacknowledged.begin()); - ack_iterator end(unacknowledged.end()); - for_each(start, end, Requeue()); - unacknowledged.erase(start, end); - }else{ - for_each(unacknowledged.begin(), unacknowledged.end(), Redeliver(this)); - } -} - -bool Channel::get(Queue::shared_ptr queue, bool ackExpected){ - Message::shared_ptr msg = queue->dequeue(); - if(msg){ - Locker locker(deliveryLock); - u_int64_t myDeliveryTag = deliveryTag++; - msg->sendGetOk(out, id, queue->getMessageCount() + 1, myDeliveryTag, framesize); - if(ackExpected){ - unacknowledged.push_back(AckRecord(msg, queue, myDeliveryTag)); - } - return true; - }else{ - return false; - } -} - -Channel::MatchAck::MatchAck(u_int64_t _tag) : tag(_tag) {} - -bool Channel::MatchAck::operator()(AckRecord& record) const{ - return tag == record.deliveryTag; -} - -void Channel::Requeue::operator()(AckRecord& record) const{ - record.msg->redeliver(); - record.queue->deliver(record.msg); -} - -Channel::Redeliver::Redeliver(Channel* const _channel) : channel(_channel) {} - -void Channel::Redeliver::operator()(AckRecord& record) const{ - if(record.pull){ - //if message was originally sent as response to get, we must requeue it - record.msg->redeliver(); - record.queue->deliver(record.msg); - }else{ - record.msg->deliver(channel->out, channel->id, record.consumerTag, record.deliveryTag, channel->framesize); - } -} - -Channel::CalculatePrefetch::CalculatePrefetch() : size(0){} - -void Channel::CalculatePrefetch::operator()(AckRecord& record){ - if(!record.pull){ - //ignore messages that were sent in response to get when calculating prefetch - size += record.msg->contentSize(); - count++; - } -} - -u_int32_t Channel::CalculatePrefetch::getSize(){ - return size; -} - -u_int16_t Channel::CalculatePrefetch::getCount(){ - return count; -} diff --git a/cpp/broker/src/Configuration.cpp b/cpp/broker/src/Configuration.cpp deleted file mode 100644 index 6e7df7889e..0000000000 --- a/cpp/broker/src/Configuration.cpp +++ /dev/null @@ -1,196 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "Configuration.h" -#include <string.h> - -using namespace qpid::broker; -using namespace std; - -Configuration::Configuration() : - trace('t', "trace", "Print incoming & outgoing frames to the console (default=false)", false), - port('p', "port", "Sets the port to listen on (default=5672)", 5672), - workerThreads("worker-threads", "Sets the number of worker threads to use (default=5). Only valid for non-blocking acceptor.", 5), - maxConnections("max-connections", "Sets the maximum number of connections the broker can accept (default=500). Only valid for non-blocking acceptor.", 500), - connectionBacklog("connection-backlog", "Sets the connection backlog for the servers socket (default=10)", 10), - acceptor('a', "acceptor", "Sets the acceptor to use. Currently only two values are recognised, blocking and non-blocking (which is the default)", "non-blocking"), - help("help", "Prints usage information", false) -{ - options.push_back(&trace); - options.push_back(&port); - options.push_back(&workerThreads); - options.push_back(&maxConnections); - options.push_back(&connectionBacklog); - options.push_back(&acceptor); - options.push_back(&help); -} - -Configuration::~Configuration(){} - -void Configuration::parse(int argc, char** argv){ - int position = 1; - while(position < argc){ - bool matched(false); - for(op_iterator i = options.begin(); i < options.end() && !matched; i++){ - matched = (*i)->parse(position, argv, argc); - } - if(!matched){ - std::cout << "Warning: skipping unrecognised option " << argv[position] << std::endl; - position++; - } - } -} - -void Configuration::usage(){ - for(op_iterator i = options.begin(); i < options.end(); i++){ - (*i)->print(std::cout); - } -} - -bool Configuration::isHelp() const { - return help.getValue(); -} - -bool Configuration::isTrace() const { - return trace.getValue(); -} - -int Configuration::getPort() const { - return port.getValue(); -} - -int Configuration::getWorkerThreads() const { - return workerThreads.getValue(); -} - -int Configuration::getMaxConnections() const { - return maxConnections.getValue(); -} - -int Configuration::getConnectionBacklog() const { - return connectionBacklog.getValue(); -} - -string Configuration::getAcceptor() const { - return acceptor.getValue(); -} - -Configuration::Option::Option(const char _flag, const string& _name, const string& _desc) : - flag(string("-") + _flag), name("--" +_name), desc(_desc) {} - -Configuration::Option::Option(const string& _name, const string& _desc) : - flag(""), name("--" + _name), desc(_desc) {} - -Configuration::Option::~Option(){} - -bool Configuration::Option::match(const string& arg){ - return flag == arg || name == arg; -} - -bool Configuration::Option::parse(int& i, char** argv, int argc){ - const string arg(argv[i]); - if(match(arg)){ - if(needsValue()){ - if(++i < argc) setValue(argv[i]); - else throw ParseException("Argument " + arg + " requires a value!"); - }else{ - setValue(""); - } - i++; - return true; - }else{ - return false; - } -} - -void Configuration::Option::print(ostream& out) const { - out << " "; - if(flag.length() > 0){ - out << flag << " or "; - } - out << name; - if(needsValue()) out << "<value>"; - out << std::endl; - out << " " << desc << std::endl; -} - - -// String Option: - -Configuration::StringOption::StringOption(const char _flag, const string& _name, const string& _desc, const string _value) : - Option(_flag,_name,_desc), defaultValue(_value), value(_value) {} - -Configuration::StringOption::StringOption(const string& _name, const string& _desc, const string _value) : - Option(_name,_desc), defaultValue(_value), value(_value) {} - -Configuration::StringOption::~StringOption(){} - -const string& Configuration::StringOption::getValue() const { - return value; -} - -bool Configuration::StringOption::needsValue() const { - return true; -} - -void Configuration::StringOption::setValue(const std::string& _value){ - value = _value; -} - -// Int Option: - -Configuration::IntOption::IntOption(const char _flag, const string& _name, const string& _desc, const int _value) : - Option(_flag,_name,_desc), defaultValue(_value), value(_value) {} - -Configuration::IntOption::IntOption(const string& _name, const string& _desc, const int _value) : - Option(_name,_desc), defaultValue(_value), value(_value) {} - -Configuration::IntOption::~IntOption(){} - -int Configuration::IntOption::getValue() const { - return value; -} - -bool Configuration::IntOption::needsValue() const { - return true; -} - -void Configuration::IntOption::setValue(const std::string& _value){ - value = atoi(_value.c_str()); -} - -// Bool Option: - -Configuration::BoolOption::BoolOption(const char _flag, const string& _name, const string& _desc, const bool _value) : - Option(_flag,_name,_desc), defaultValue(_value), value(_value) {} - -Configuration::BoolOption::BoolOption(const string& _name, const string& _desc, const bool _value) : - Option(_name,_desc), defaultValue(_value), value(_value) {} - -Configuration::BoolOption::~BoolOption(){} - -bool Configuration::BoolOption::getValue() const { - return value; -} - -bool Configuration::BoolOption::needsValue() const { - return false; -} - -void Configuration::BoolOption::setValue(const std::string& _value){ - value = strcasecmp(_value.c_str(), "true") == 0; -} diff --git a/cpp/broker/src/DirectExchange.cpp b/cpp/broker/src/DirectExchange.cpp deleted file mode 100644 index 94cfbc766d..0000000000 --- a/cpp/broker/src/DirectExchange.cpp +++ /dev/null @@ -1,72 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "DirectExchange.h" -#include "ExchangeBinding.h" -#include <iostream> - -using namespace qpid::broker; -using namespace qpid::framing; - -DirectExchange::DirectExchange(const string& _name) : Exchange(_name) { - -} - -void DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, FieldTable* args){ - lock.acquire(); - std::vector<Queue::shared_ptr>& queues(bindings[routingKey]); - std::vector<Queue::shared_ptr>::iterator i = find(queues.begin(), queues.end(), queue); - if(i == queues.end()){ - bindings[routingKey].push_back(queue); - queue->bound(new ExchangeBinding(this, queue, routingKey, args)); - } - lock.release(); -} - -void DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, FieldTable* /*args*/){ - lock.acquire(); - std::vector<Queue::shared_ptr>& queues(bindings[routingKey]); - - std::vector<Queue::shared_ptr>::iterator i = find(queues.begin(), queues.end(), queue); - if(i < queues.end()){ - queues.erase(i); - if(queues.empty()){ - bindings.erase(routingKey); - } - } - lock.release(); -} - -void DirectExchange::route(Message::shared_ptr& msg, const string& routingKey, FieldTable* /*args*/){ - lock.acquire(); - std::vector<Queue::shared_ptr>& queues(bindings[routingKey]); - int count(0); - for(std::vector<Queue::shared_ptr>::iterator i = queues.begin(); i != queues.end(); i++, count++){ - (*i)->deliver(msg); - } - if(!count){ - std::cout << "WARNING: DirectExchange " << getName() << " could not route message with key " << routingKey << std::endl; - } - lock.release(); -} - -DirectExchange::~DirectExchange(){ - -} - - -const std::string DirectExchange::typeName("direct"); diff --git a/cpp/broker/src/ExchangeBinding.cpp b/cpp/broker/src/ExchangeBinding.cpp deleted file mode 100644 index 6160a67fd3..0000000000 --- a/cpp/broker/src/ExchangeBinding.cpp +++ /dev/null @@ -1,32 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "ExchangeBinding.h" -#include "Exchange.h" - -using namespace qpid::broker; -using namespace qpid::framing; - -ExchangeBinding::ExchangeBinding(Exchange* _e, Queue::shared_ptr _q, const string& _key, FieldTable* _args) : e(_e), q(_q), key(_key), args(_args){} - -void ExchangeBinding::cancel(){ - e->unbind(q, key, args); - delete this; -} - -ExchangeBinding::~ExchangeBinding(){ -} diff --git a/cpp/broker/src/ExchangeRegistry.cpp b/cpp/broker/src/ExchangeRegistry.cpp deleted file mode 100644 index 05396382a7..0000000000 --- a/cpp/broker/src/ExchangeRegistry.cpp +++ /dev/null @@ -1,57 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "ExchangeRegistry.h" -#include "MonitorImpl.h" - -using namespace qpid::broker; -using namespace qpid::concurrent; - -ExchangeRegistry::ExchangeRegistry() : lock(new MonitorImpl()){} - -ExchangeRegistry::~ExchangeRegistry(){ - for (ExchangeMap::iterator i = exchanges.begin(); i != exchanges.end(); ++i) - { - delete i->second; - } - delete lock; -} - -void ExchangeRegistry::declare(Exchange* exchange){ - exchanges[exchange->getName()] = exchange; -} - -void ExchangeRegistry::destroy(const string& name){ - if(exchanges[name]){ - delete exchanges[name]; - exchanges.erase(name); - } -} - -Exchange* ExchangeRegistry::get(const string& name){ - return exchanges[name]; -} - -namespace -{ -const std::string empty; -} - -Exchange* ExchangeRegistry::getDefault() -{ - return get(empty); -} diff --git a/cpp/broker/src/FanOutExchange.cpp b/cpp/broker/src/FanOutExchange.cpp deleted file mode 100644 index e8cb8f6315..0000000000 --- a/cpp/broker/src/FanOutExchange.cpp +++ /dev/null @@ -1,56 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "FanOutExchange.h" -#include "ExchangeBinding.h" -#include <algorithm> - -using namespace qpid::broker; -using namespace qpid::framing; -using namespace qpid::concurrent; - -FanOutExchange::FanOutExchange(const std::string& _name) : Exchange(_name) {} - -void FanOutExchange::bind(Queue::shared_ptr queue, const string& routingKey, FieldTable* args){ - Locker locker(lock); - // Add if not already present. - Queue::vector::iterator i = std::find(bindings.begin(), bindings.end(), queue); - if (i == bindings.end()) { - bindings.push_back(queue); - queue->bound(new ExchangeBinding(this, queue, routingKey, args)); - } -} - -void FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey*/, FieldTable* /*args*/){ - Locker locker(lock); - Queue::vector::iterator i = std::find(bindings.begin(), bindings.end(), queue); - if (i != bindings.end()) { - bindings.erase(i); - // TODO aconway 2006-09-14: What about the ExchangeBinding object? Don't we have to verify routingKey/args match? - } -} - -void FanOutExchange::route(Message::shared_ptr& msg, const string& /*routingKey*/, FieldTable* /*args*/){ - Locker locker(lock); - for(Queue::vector::iterator i = bindings.begin(); i != bindings.end(); ++i){ - (*i)->deliver(msg); - } -} - -FanOutExchange::~FanOutExchange() {} - -const std::string FanOutExchange::typeName("fanout"); diff --git a/cpp/broker/src/HeadersExchange.cpp b/cpp/broker/src/HeadersExchange.cpp deleted file mode 100644 index 65204cdb85..0000000000 --- a/cpp/broker/src/HeadersExchange.cpp +++ /dev/null @@ -1,120 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "HeadersExchange.h" -#include "ExchangeBinding.h" -#include "Value.h" -#include "QpidError.h" -#include <algorithm> - - -using namespace qpid::broker; -using namespace qpid::framing; -using namespace qpid::concurrent; - -// TODO aconway 2006-09-20: More efficient matching algorithm. -// The current search algorithm really sucks. -// Fieldtables are heavy, maybe use shared_ptr to do handle-body. - -using namespace qpid::broker; - -namespace { - const std::string all("all"); - const std::string any("any"); - const std::string x_match("x-match"); -} - -HeadersExchange::HeadersExchange(const string& _name) : Exchange(_name) { } - -void HeadersExchange::bind(Queue::shared_ptr queue, const string& routingKey, FieldTable* args){ - std::cout << "HeadersExchange::bind" << std::endl; - Locker locker(lock); - std::string what = args->getString("x-match"); - if (what != all && what != any) { - THROW_QPID_ERROR(PROTOCOL_ERROR, "Invalid x-match value binding to headers exchange."); - } - bindings.push_back(Binding(*args, queue)); - queue->bound(new ExchangeBinding(this, queue, routingKey, args)); -} - -void HeadersExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey*/, FieldTable* args){ - Locker locker(lock); - Bindings::iterator i = - std::find(bindings.begin(),bindings.end(), Binding(*args, queue)); - if (i != bindings.end()) bindings.erase(i); -} - - -void HeadersExchange::route(Message::shared_ptr& msg, const string& /*routingKey*/, FieldTable* args){ - std::cout << "route: " << *args << std::endl; - Locker locker(lock);; - for (Bindings::iterator i = bindings.begin(); i != bindings.end(); ++i) { - if (match(i->first, *args)) i->second->deliver(msg); - } -} - -HeadersExchange::~HeadersExchange() {} - -const std::string HeadersExchange::typeName("headers"); - -namespace -{ - - bool match_values(const Value& bind, const Value& msg) { - return dynamic_cast<const EmptyValue*>(&bind) || bind == msg; - } - -} - - -bool HeadersExchange::match(const FieldTable& bind, const FieldTable& msg) { - typedef FieldTable::ValueMap Map; - std::string what = bind.getString(x_match); - if (what == all) { - for (Map::const_iterator i = bind.getMap().begin(); - i != bind.getMap().end(); - ++i) - { - if (i->first != x_match) - { - Map::const_iterator j = msg.getMap().find(i->first); - if (j == msg.getMap().end()) return false; - if (!match_values(*(i->second), *(j->second))) return false; - } - } - return true; - } else if (what == any) { - for (Map::const_iterator i = bind.getMap().begin(); - i != bind.getMap().end(); - ++i) - { - if (i->first != x_match) - { - Map::const_iterator j = msg.getMap().find(i->first); - if (j != msg.getMap().end()) { - if (match_values(*(i->second), *(j->second))) return true; - } - } - } - return false; - } else { - return false; - } -} - - - diff --git a/cpp/broker/src/Message.cpp b/cpp/broker/src/Message.cpp deleted file mode 100644 index 0a8a5f7a4d..0000000000 --- a/cpp/broker/src/Message.cpp +++ /dev/null @@ -1,100 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "MonitorImpl.h" -#include "Message.h" -#include "ExchangeRegistry.h" -#include <iostream> - -using namespace std::tr1;//for *_pointer_cast methods -using namespace qpid::broker; -using namespace qpid::framing; -using namespace qpid::concurrent; - - -Message::Message(const ConnectionToken* const _publisher, - const string& _exchange, const string& _routingKey, - bool _mandatory, bool _immediate) : publisher(_publisher), - exchange(_exchange), - routingKey(_routingKey), - mandatory(_mandatory), - immediate(_immediate), - redelivered(false), - size(0){ - -} - -Message::~Message(){ -} - -void Message::setHeader(AMQHeaderBody::shared_ptr _header){ - this->header = _header; -} - -void Message::addContent(AMQContentBody::shared_ptr data){ - content.push_back(data); - size += data->size(); -} - -bool Message::isComplete(){ - return header.get() && (header->getContentSize() == contentSize()); -} - -void Message::redeliver(){ - redelivered = true; -} - -void Message::deliver(OutputHandler* out, int channel, - const string& consumerTag, u_int64_t deliveryTag, - u_int32_t framesize){ - - out->send(new AMQFrame(channel, new BasicDeliverBody(consumerTag, deliveryTag, redelivered, exchange, routingKey))); - sendContent(out, channel, framesize); -} - -void Message::sendGetOk(OutputHandler* out, - int channel, - u_int32_t messageCount, - u_int64_t deliveryTag, - u_int32_t framesize){ - - out->send(new AMQFrame(channel, new BasicGetOkBody(deliveryTag, redelivered, exchange, routingKey, messageCount))); - sendContent(out, channel, framesize); -} - -void Message::sendContent(OutputHandler* out, int channel, u_int32_t framesize){ - AMQBody::shared_ptr headerBody = static_pointer_cast<AMQBody, AMQHeaderBody>(header); - out->send(new AMQFrame(channel, headerBody)); - for(content_iterator i = content.begin(); i != content.end(); i++){ - if((*i)->size() > framesize){ - //TODO: need to split it - std::cout << "WARNING: Dropped message. Re-fragmentation not yet implemented." << std::endl; - }else{ - AMQBody::shared_ptr contentBody = static_pointer_cast<AMQBody, AMQContentBody>(*i); - out->send(new AMQFrame(channel, contentBody)); - } - } -} - -BasicHeaderProperties* Message::getHeaderProperties(){ - return dynamic_cast<BasicHeaderProperties*>(header->getProperties()); -} - -const ConnectionToken* const Message::getPublisher(){ - return publisher; -} - diff --git a/cpp/broker/src/NameGenerator.cpp b/cpp/broker/src/NameGenerator.cpp deleted file mode 100644 index 46aa385a7e..0000000000 --- a/cpp/broker/src/NameGenerator.cpp +++ /dev/null @@ -1,29 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "NameGenerator.h" -#include <sstream> - -using namespace qpid::broker; - -NameGenerator::NameGenerator(const std::string& _base) : base(_base), counter(1) {} - -std::string NameGenerator::generate(){ - std::stringstream ss; - ss << base << counter++; - return ss.str(); -} diff --git a/cpp/broker/src/Queue.cpp b/cpp/broker/src/Queue.cpp deleted file mode 100644 index eaaa3ffa31..0000000000 --- a/cpp/broker/src/Queue.cpp +++ /dev/null @@ -1,155 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "Queue.h" -#include "MonitorImpl.h" -#include <iostream> - -using namespace qpid::broker; -using namespace qpid::concurrent; - -Queue::Queue(const string& _name, bool _durable, u_int32_t _autodelete, const ConnectionToken* const _owner) : - name(_name), - autodelete(_autodelete), - durable(_durable), - owner(_owner), - queueing(false), - dispatching(false), - next(0), - lastUsed(0), - exclusive(0) -{ - if(autodelete) lastUsed = apr_time_as_msec(apr_time_now()); -} - -Queue::~Queue(){ - for(Binding* b = bindings.front(); !bindings.empty(); b = bindings.front()){ - b->cancel(); - bindings.pop(); - } -} - -void Queue::bound(Binding* b){ - bindings.push(b); -} - -void Queue::deliver(Message::shared_ptr& msg){ - Locker locker(lock); - if(queueing || !dispatch(msg)){ - queueing = true; - messages.push(msg); - } -} - -bool Queue::dispatch(Message::shared_ptr& msg){ - if(consumers.empty()){ - return false; - }else if(exclusive){ - if(!exclusive->deliver(msg)){ - std::cout << "WARNING: Dropping undeliverable message from queue with exclusive consumer." << std::endl; - } - return true; - }else{ - //deliver to next consumer - next = next % consumers.size(); - Consumer* c = consumers[next]; - int start = next; - while(c){ - next++; - if(c->deliver(msg)) return true; - - next = next % consumers.size(); - c = next == start ? 0 : consumers[next]; - } - return false; - } -} - -bool Queue::startDispatching(){ - Locker locker(lock); - if(queueing && !dispatching){ - dispatching = true; - return true; - }else{ - return false; - } -} - -void Queue::dispatch(){ - bool proceed = startDispatching(); - while(proceed){ - Locker locker(lock); - if(!messages.empty() && dispatch(messages.front())){ - messages.pop(); - }else{ - dispatching = false; - proceed = false; - queueing = !messages.empty(); - } - } -} - -void Queue::consume(Consumer* c, bool requestExclusive){ - Locker locker(lock); - if(exclusive) throw ExclusiveAccessException(); - if(requestExclusive){ - if(!consumers.empty()) throw ExclusiveAccessException(); - exclusive = c; - } - - if(autodelete && consumers.empty()) lastUsed = 0; - consumers.push_back(c); -} - -void Queue::cancel(Consumer* c){ - Locker locker(lock); - consumers.erase(find(consumers.begin(), consumers.end(), c)); - if(autodelete && consumers.empty()) lastUsed = apr_time_as_msec(apr_time_now()); - if(exclusive == c) exclusive = 0; -} - -Message::shared_ptr Queue::dequeue(){ - Locker locker(lock); - Message::shared_ptr msg; - if(!messages.empty()){ - msg = messages.front(); - messages.pop(); - } - return msg; -} - -u_int32_t Queue::purge(){ - Locker locker(lock); - int count = messages.size(); - while(!messages.empty()) messages.pop(); - return count; -} - -u_int32_t Queue::getMessageCount() const{ - Locker locker(lock); - return messages.size(); -} - -u_int32_t Queue::getConsumerCount() const{ - Locker locker(lock); - return consumers.size(); -} - -bool Queue::canAutoDelete() const{ - Locker locker(lock); - return lastUsed && ((apr_time_as_msec(apr_time_now()) - lastUsed) > autodelete); -} diff --git a/cpp/broker/src/QueueRegistry.cpp b/cpp/broker/src/QueueRegistry.cpp deleted file mode 100644 index f807415314..0000000000 --- a/cpp/broker/src/QueueRegistry.cpp +++ /dev/null @@ -1,72 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "QueueRegistry.h" -#include "MonitorImpl.h" -#include "SessionHandlerImpl.h" -#include <sstream> -#include <assert.h> - -using namespace qpid::broker; -using namespace qpid::concurrent; - -QueueRegistry::QueueRegistry() : counter(1){} - -QueueRegistry::~QueueRegistry(){} - -std::pair<Queue::shared_ptr, bool> -QueueRegistry::declare(const string& declareName, bool durable, u_int32_t autoDelete, const ConnectionToken* owner) -{ - Locker locker(lock); - string name = declareName.empty() ? generateName() : declareName; - assert(!name.empty()); - QueueMap::iterator i = queues.find(name); - if (i == queues.end()) { - Queue::shared_ptr queue(new Queue(name, durable, autoDelete, owner)); - queues[name] = queue; - return std::pair<Queue::shared_ptr, bool>(queue, true); - } else { - return std::pair<Queue::shared_ptr, bool>(i->second, false); - } -} - -void QueueRegistry::destroy(const string& name){ - Locker locker(lock); - queues.erase(name); -} - -Queue::shared_ptr QueueRegistry::find(const string& name){ - Locker locker(lock); - QueueMap::iterator i = queues.find(name); - if (i == queues.end()) { - return Queue::shared_ptr(); - } else { - return i->second; - } -} - -string QueueRegistry::generateName(){ - string name; - do { - std::stringstream ss; - ss << "tmp_" << counter++; - name = ss.str(); - // Thread safety: Private function, only called with lock held - // so this is OK. - } while(queues.find(name) != queues.end()); - return name; -} diff --git a/cpp/broker/src/Router.cpp b/cpp/broker/src/Router.cpp deleted file mode 100644 index c2dd74bf7d..0000000000 --- a/cpp/broker/src/Router.cpp +++ /dev/null @@ -1,32 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "Router.h" - -using namespace qpid::broker; - -Router::Router(ExchangeRegistry& _registry) : registry(_registry){} - -void Router::operator()(Message::shared_ptr& msg){ - Exchange* exchange = registry.get(msg->getExchange()); - if(exchange){ - exchange->route(msg, msg->getRoutingKey(), &(msg->getHeaderProperties()->getHeaders())); - }else{ - std::cout << "WARNING: Could not route message, unknown exchange: " << msg->getExchange() << std::endl; - } - -} diff --git a/cpp/broker/src/SessionHandlerFactoryImpl.cpp b/cpp/broker/src/SessionHandlerFactoryImpl.cpp deleted file mode 100644 index 39c627afef..0000000000 --- a/cpp/broker/src/SessionHandlerFactoryImpl.cpp +++ /dev/null @@ -1,50 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "SessionHandlerFactoryImpl.h" -#include "SessionHandlerImpl.h" -#include "FanOutExchange.h" -#include "HeadersExchange.h" - -using namespace qpid::broker; -using namespace qpid::io; - -namespace -{ -const std::string empty; -const std::string amq_direct("amq.direct"); -const std::string amq_topic("amq.topic"); -const std::string amq_fanout("amq.fanout"); -const std::string amq_match("amq.match"); -} - -SessionHandlerFactoryImpl::SessionHandlerFactoryImpl(u_int32_t _timeout) : timeout(_timeout), cleaner(&queues, timeout/10){ - exchanges.declare(new DirectExchange(empty)); // Default exchange. - exchanges.declare(new DirectExchange(amq_direct)); - exchanges.declare(new TopicExchange(amq_topic)); - exchanges.declare(new FanOutExchange(amq_fanout)); - exchanges.declare(new HeadersExchange(amq_match)); - cleaner.start(); -} - -SessionHandler* SessionHandlerFactoryImpl::create(SessionContext* ctxt){ - return new SessionHandlerImpl(ctxt, &queues, &exchanges, &cleaner, timeout); -} - -SessionHandlerFactoryImpl::~SessionHandlerFactoryImpl(){ - cleaner.stop(); -} diff --git a/cpp/broker/src/SessionHandlerImpl.cpp b/cpp/broker/src/SessionHandlerImpl.cpp deleted file mode 100644 index 0d8539332c..0000000000 --- a/cpp/broker/src/SessionHandlerImpl.cpp +++ /dev/null @@ -1,405 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include <iostream> -#include "SessionHandlerImpl.h" -#include "FanOutExchange.h" -#include "HeadersExchange.h" -#include "Router.h" -#include "TopicExchange.h" -#include "assert.h" - -using namespace std::tr1; -using namespace qpid::broker; -using namespace qpid::io; -using namespace qpid::framing; -using namespace qpid::concurrent; - -SessionHandlerImpl::SessionHandlerImpl(SessionContext* _context, - QueueRegistry* _queues, - ExchangeRegistry* _exchanges, - AutoDelete* _cleaner, - const u_int32_t _timeout) : - context(_context), - client(context), - queues(_queues), - exchanges(_exchanges), - cleaner(_cleaner), - timeout(_timeout), - connectionHandler(new ConnectionHandlerImpl(this)), - channelHandler(new ChannelHandlerImpl(this)), - basicHandler(new BasicHandlerImpl(this)), - exchangeHandler(new ExchangeHandlerImpl(this)), - queueHandler(new QueueHandlerImpl(this)), - framemax(65536), - heartbeat(0) {} - -SessionHandlerImpl::~SessionHandlerImpl(){ - // TODO aconway 2006-09-07: Should be auto_ptr or plain members. - delete channelHandler; - delete connectionHandler; - delete basicHandler; - delete exchangeHandler; - delete queueHandler; -} - -Channel* SessionHandlerImpl::getChannel(u_int16_t channel){ - channel_iterator i = channels.find(channel); - if(i == channels.end()){ - throw ConnectionException(504, "Unknown channel: " + channel); - } - return i->second; -} - -Queue::shared_ptr SessionHandlerImpl::getQueue(const string& name, u_int16_t channel){ - Queue::shared_ptr queue; - if (name.empty()) { - queue = getChannel(channel)->getDefaultQueue(); - if (!queue) throw ConnectionException( 530, "Queue must be specified or previously declared" ); - } else { - queue = queues->find(name); - if (queue == 0) { - throw ChannelException( 404, "Queue not found: " + name); - } - } - return queue; -} - - -Exchange* SessionHandlerImpl::findExchange(const string& name){ - exchanges->getLock()->acquire(); - Exchange* exchange(exchanges->get(name)); - exchanges->getLock()->release(); - return exchange; -} - -void SessionHandlerImpl::received(qpid::framing::AMQFrame* frame){ - u_int16_t channel = frame->getChannel(); - AMQBody::shared_ptr body = frame->getBody(); - AMQMethodBody::shared_ptr method; - - switch(body->type()) - { - case METHOD_BODY: - method = dynamic_pointer_cast<AMQMethodBody, AMQBody>(body); - try{ - method->invoke(*this, channel); - }catch(ChannelException& e){ - channels[channel]->close(); - channels.erase(channel); - client.getChannel().close(channel, e.code, e.text, method->amqpClassId(), method->amqpMethodId()); - }catch(ConnectionException& e){ - client.getConnection().close(0, e.code, e.text, method->amqpClassId(), method->amqpMethodId()); - } - break; - - case HEADER_BODY: - this->handleHeader(channel, dynamic_pointer_cast<AMQHeaderBody, AMQBody>(body)); - break; - - case CONTENT_BODY: - this->handleContent(channel, dynamic_pointer_cast<AMQContentBody, AMQBody>(body)); - break; - - case HEARTBEAT_BODY: - //channel must be 0 - this->handleHeartbeat(dynamic_pointer_cast<AMQHeartbeatBody, AMQBody>(body)); - break; - } -} - -void SessionHandlerImpl::initiated(qpid::framing::ProtocolInitiation* /*header*/){ - //send connection start - FieldTable properties; - string mechanisms("PLAIN"); - string locales("en_US"); - client.getConnection().start(0, 8, 0, properties, mechanisms, locales); -} - -void SessionHandlerImpl::idleOut(){ - -} - -void SessionHandlerImpl::idleIn(){ - -} - -void SessionHandlerImpl::closed(){ - for(channel_iterator i = channels.begin(); i != channels.end(); i = channels.begin()){ - Channel* c = i->second; - channels.erase(i); - c->close(); - delete c; - } - for(queue_iterator i = exclusiveQueues.begin(); i < exclusiveQueues.end(); i = exclusiveQueues.begin()){ - string name = (*i)->getName(); - queues->destroy(name); - exclusiveQueues.erase(i); - } -} - -void SessionHandlerImpl::handleHeader(u_int16_t channel, AMQHeaderBody::shared_ptr body){ - getChannel(channel)->handleHeader(body, Router(*exchanges)); -} - -void SessionHandlerImpl::handleContent(u_int16_t channel, AMQContentBody::shared_ptr body){ - getChannel(channel)->handleContent(body, Router(*exchanges)); -} - -void SessionHandlerImpl::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){ - std::cout << "SessionHandlerImpl::handleHeartbeat()" << std::endl; -} - -void SessionHandlerImpl::ConnectionHandlerImpl::startOk( - u_int16_t /*channel*/, FieldTable& /*clientProperties*/, string& /*mechanism*/, - string& /*response*/, string& /*locale*/){ - - parent->client.getConnection().tune(0, 100, parent->framemax, parent->heartbeat); -} - -void SessionHandlerImpl::ConnectionHandlerImpl::secureOk(u_int16_t /*channel*/, string& /*response*/){} - -void SessionHandlerImpl::ConnectionHandlerImpl::tuneOk(u_int16_t /*channel*/, u_int16_t /*channelmax*/, u_int32_t framemax, u_int16_t heartbeat){ - parent->framemax = framemax; - parent->heartbeat = heartbeat; -} - -void SessionHandlerImpl::ConnectionHandlerImpl::open(u_int16_t /*channel*/, string& /*virtualHost*/, string& /*capabilities*/, bool /*insist*/){ - string knownhosts; - parent->client.getConnection().openOk(0, knownhosts); -} - -void SessionHandlerImpl::ConnectionHandlerImpl::close( - u_int16_t /*channel*/, u_int16_t /*replyCode*/, string& /*replyText*/, - u_int16_t /*classId*/, u_int16_t /*methodId*/) -{ - parent->client.getConnection().closeOk(0); - parent->context->close(); -} - -void SessionHandlerImpl::ConnectionHandlerImpl::closeOk(u_int16_t /*channel*/){ - parent->context->close(); -} - - - -void SessionHandlerImpl::ChannelHandlerImpl::open(u_int16_t channel, string& /*outOfBand*/){ - parent->channels[channel] = new Channel(parent->context, channel, parent->framemax); - parent->client.getChannel().openOk(channel); -} - -void SessionHandlerImpl::ChannelHandlerImpl::flow(u_int16_t /*channel*/, bool /*active*/){} -void SessionHandlerImpl::ChannelHandlerImpl::flowOk(u_int16_t /*channel*/, bool /*active*/){} - -void SessionHandlerImpl::ChannelHandlerImpl::close(u_int16_t channel, u_int16_t /*replyCode*/, string& /*replyText*/, - u_int16_t /*classId*/, u_int16_t /*methodId*/){ - Channel* c = parent->getChannel(channel); - if(c){ - parent->channels.erase(channel); - c->close(); - delete c; - parent->client.getChannel().closeOk(channel); - } -} - -void SessionHandlerImpl::ChannelHandlerImpl::closeOk(u_int16_t /*channel*/){} - - - -void SessionHandlerImpl::ExchangeHandlerImpl::declare(u_int16_t channel, u_int16_t /*ticket*/, string& exchange, string& type, - bool passive, bool /*durable*/, bool /*autoDelete*/, bool /*internal*/, bool nowait, - FieldTable& /*arguments*/){ - - if(!passive && ( - type != TopicExchange::typeName && - type != DirectExchange::typeName && - type != FanOutExchange::typeName && - type != HeadersExchange::typeName - ) - ) - { - throw ChannelException(540, "Exchange type not implemented: " + type); - } - - parent->exchanges->getLock()->acquire(); - if(!parent->exchanges->get(exchange)){ - if(type == TopicExchange::typeName){ - parent->exchanges->declare(new TopicExchange(exchange)); - }else if(type == DirectExchange::typeName){ - parent->exchanges->declare(new DirectExchange(exchange)); - }else if(type == FanOutExchange::typeName){ - parent->exchanges->declare(new DirectExchange(exchange)); - }else if (type == HeadersExchange::typeName) { - parent->exchanges->declare(new HeadersExchange(exchange)); - } - } - parent->exchanges->getLock()->release(); - if(!nowait){ - parent->client.getExchange().declareOk(channel); - } -} - -void SessionHandlerImpl::ExchangeHandlerImpl::delete_(u_int16_t channel, u_int16_t /*ticket*/, string& exchange, bool /*ifUnused*/, bool nowait){ - //TODO: implement unused - parent->exchanges->getLock()->acquire(); - parent->exchanges->destroy(exchange); - parent->exchanges->getLock()->release(); - if(!nowait) parent->client.getExchange().deleteOk(channel); -} - -void SessionHandlerImpl::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t /*ticket*/, string& name, - bool passive, bool durable, bool exclusive, - bool autoDelete, bool nowait, FieldTable& /*arguments*/){ - Queue::shared_ptr queue; - if (passive && !name.empty()) { - queue = parent->getQueue(name, channel); - } else { - std::pair<Queue::shared_ptr, bool> queue_created = parent->queues->declare(name, durable, autoDelete ? parent->timeout : 0, exclusive ? parent : 0); - queue = queue_created.first; - assert(queue); - if (queue_created.second) { // This is a new queue - parent->getChannel(channel)->setDefaultQueue(queue); - //add default binding: - parent->exchanges->getDefault()->bind(queue, name, 0); - if(exclusive){ - parent->exclusiveQueues.push_back(queue); - } else if(autoDelete){ - parent->cleaner->add(queue); - } - } - } - if(exclusive && !queue->isExclusiveOwner(parent)){ - throw ChannelException(405, "Cannot grant exclusive access to queue"); - } - if(!nowait){ - name = queue->getName(); - parent->client.getQueue().declareOk(channel, name, queue->getMessageCount(), queue->getConsumerCount()); - } -} - -void SessionHandlerImpl::QueueHandlerImpl::bind(u_int16_t channel, u_int16_t /*ticket*/, string& queueName, - string& exchangeName, string& routingKey, bool nowait, - FieldTable& arguments){ - - Queue::shared_ptr queue = parent->getQueue(queueName, channel); - Exchange* exchange = parent->exchanges->get(exchangeName); - if(exchange){ - if(routingKey.empty() && queueName.empty()) routingKey = queue->getName(); - exchange->bind(queue, routingKey, &arguments); - if(!nowait) parent->client.getQueue().bindOk(channel); - }else{ - throw ChannelException(404, "Bind failed. No such exchange: " + exchangeName); - } -} - -void SessionHandlerImpl::QueueHandlerImpl::purge(u_int16_t channel, u_int16_t /*ticket*/, string& queueName, bool nowait){ - - Queue::shared_ptr queue = parent->getQueue(queueName, channel); - int count = queue->purge(); - if(!nowait) parent->client.getQueue().purgeOk(channel, count); -} - -void SessionHandlerImpl::QueueHandlerImpl::delete_(u_int16_t channel, u_int16_t /*ticket*/, string& queue, - bool ifUnused, bool ifEmpty, bool nowait){ - ChannelException error(0, ""); - int count(0); - Queue::shared_ptr q = parent->getQueue(queue, channel); - if(ifEmpty && q->getMessageCount() > 0){ - throw ChannelException(406, "Queue not empty."); - }else if(ifUnused && q->getConsumerCount() > 0){ - throw ChannelException(406, "Queue in use."); - }else{ - //remove the queue from the list of exclusive queues if necessary - if(q->isExclusiveOwner(parent)){ - queue_iterator i = find(parent->exclusiveQueues.begin(), parent->exclusiveQueues.end(), q); - if(i < parent->exclusiveQueues.end()) parent->exclusiveQueues.erase(i); - } - count = q->getMessageCount(); - parent->queues->destroy(queue); - } - if(!nowait) parent->client.getQueue().deleteOk(channel, count); -} - - - - -void SessionHandlerImpl::BasicHandlerImpl::qos(u_int16_t channel, u_int32_t prefetchSize, u_int16_t prefetchCount, bool /*global*/){ - //TODO: handle global - parent->getChannel(channel)->setPrefetchSize(prefetchSize); - parent->getChannel(channel)->setPrefetchCount(prefetchCount); - parent->client.getBasic().qosOk(channel); -} - -void SessionHandlerImpl::BasicHandlerImpl::consume(u_int16_t channelId, u_int16_t /*ticket*/, - string& queueName, string& consumerTag, - bool noLocal, bool noAck, bool exclusive, - bool nowait){ - - Queue::shared_ptr queue = parent->getQueue(queueName, channelId); - Channel* channel = parent->channels[channelId]; - if(!consumerTag.empty() && channel->exists(consumerTag)){ - throw ConnectionException(530, "Consumer tags must be unique"); - } - - try{ - channel->consume(consumerTag, queue, !noAck, exclusive, noLocal ? parent : 0); - if(!nowait) parent->client.getBasic().consumeOk(channelId, consumerTag); - - //allow messages to be dispatched if required as there is now a consumer: - queue->dispatch(); - }catch(ExclusiveAccessException& e){ - if(exclusive) throw ChannelException(403, "Exclusive access cannot be granted"); - else throw ChannelException(403, "Access would violate previously granted exclusivity"); - } - -} - -void SessionHandlerImpl::BasicHandlerImpl::cancel(u_int16_t channel, string& consumerTag, bool nowait){ - parent->getChannel(channel)->cancel(consumerTag); - if(!nowait) parent->client.getBasic().cancelOk(channel, consumerTag); -} - -void SessionHandlerImpl::BasicHandlerImpl::publish(u_int16_t channel, u_int16_t /*ticket*/, - string& exchange, string& routingKey, - bool mandatory, bool immediate){ - - Message* msg = new Message(parent, exchange, routingKey, mandatory, immediate); - parent->getChannel(channel)->handlePublish(msg); -} - -void SessionHandlerImpl::BasicHandlerImpl::get(u_int16_t channelId, u_int16_t /*ticket*/, string& queueName, bool noAck){ - Queue::shared_ptr queue = parent->getQueue(queueName, channelId); - if(!parent->getChannel(channelId)->get(queue, !noAck)){ - string clusterId;//not used, part of an imatix hack - parent->client.getBasic().getEmpty(channelId, clusterId); - } -} - -void SessionHandlerImpl::BasicHandlerImpl::ack(u_int16_t channel, u_int64_t deliveryTag, bool multiple){ - try{ - parent->getChannel(channel)->ack(deliveryTag, multiple); - }catch(InvalidAckException& e){ - throw ConnectionException(530, "Received ack for unrecognised delivery tag"); - } -} - -void SessionHandlerImpl::BasicHandlerImpl::reject(u_int16_t /*channel*/, u_int64_t /*deliveryTag*/, bool /*requeue*/){} - -void SessionHandlerImpl::BasicHandlerImpl::recover(u_int16_t channel, bool requeue){ - parent->getChannel(channel)->recover(requeue); -} - diff --git a/cpp/broker/src/TopicExchange.cpp b/cpp/broker/src/TopicExchange.cpp deleted file mode 100644 index 53977747c4..0000000000 --- a/cpp/broker/src/TopicExchange.cpp +++ /dev/null @@ -1,163 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "TopicExchange.h" -#include "ExchangeBinding.h" -#include <algorithm> - -using namespace qpid::broker; -using namespace qpid::framing; - - -// TODO aconway 2006-09-20: More efficient matching algorithm. -// Areas for improvement: -// - excessive string copying: should be 0 copy, match from original buffer. -// - match/lookup: use descision tree or other more efficient structure. - -Tokens& Tokens::operator=(const std::string& s) { - clear(); - if (s.empty()) return *this; - std::string::const_iterator i = s.begin(); - while (true) { - // Invariant: i is at the beginning of the next untokenized word. - std::string::const_iterator j = find(i, s.end(), '.'); - push_back(std::string(i, j)); - if (j == s.end()) return *this; - i = j + 1; - } - return *this; -} - -size_t Tokens::Hash::operator()(const Tokens& p) const { - size_t hash = 0; - for (Tokens::const_iterator i = p.begin(); i != p.end(); ++i) { - hash += std::tr1::hash<std::string>()(*i); - } - return hash; -} - -TopicPattern& TopicPattern::operator=(const Tokens& tokens) { - Tokens::operator=(tokens); - normalize(); - return *this; -} - -namespace { -const std::string hashmark("#"); -const std::string star("*"); -} - -void TopicPattern::normalize() { - std::string word; - Tokens::iterator i = begin(); - while (i != end()) { - if (*i == hashmark) { - ++i; - while (i != end()) { - // Invariant: *(i-1)==#, [begin()..i-1] is normalized. - if (*i == star) { // Move * before #. - std::swap(*i, *(i-1)); - ++i; - } else if (*i == hashmark) { - erase(i); // Remove extra # - } else { - break; - } - } - } else { - i ++; - } - } -} - - -namespace { -// TODO aconway 2006-09-20: Ineficient to convert every routingKey to a string. -// Need more efficient Tokens impl that can operate on a string in place. -// -bool do_match(Tokens::const_iterator pattern_begin, Tokens::const_iterator pattern_end, Tokens::const_iterator target_begin, Tokens::const_iterator target_end) -{ - // Invariant: [pattern_begin..p) matches [target_begin..t) - Tokens::const_iterator p = pattern_begin; - Tokens::const_iterator t = target_begin; - while (p != pattern_end && t != target_end) - { - if (*p == star || *p == *t) { - ++p, ++t; - } else if (*p == hashmark) { - ++p; - if (do_match(p, pattern_end, t, target_end)) return true; - while (t != target_end) { - ++t; - if (do_match(p, pattern_end, t, target_end)) return true; - } - return false; - } else { - return false; - } - } - while (p != pattern_end && *p == hashmark) ++p; // Ignore trailing # - return t == target_end && p == pattern_end; -} -} - -bool TopicPattern::match(const Tokens& target) const -{ - return do_match(begin(), end(), target.begin(), target.end()); -} - -TopicExchange::TopicExchange(const string& _name) : Exchange(_name) { } - -void TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, FieldTable* args){ - lock.acquire(); - TopicPattern routingPattern(routingKey); - bindings[routingPattern].push_back(queue); - queue->bound(new ExchangeBinding(this, queue, routingKey, args)); - lock.release(); -} - -void TopicExchange::unbind(Queue::shared_ptr queue, const string& routingKey, FieldTable* /*args*/){ - lock.acquire(); - BindingMap::iterator bi = bindings.find(TopicPattern(routingKey)); - Queue::vector& qv(bi->second); - if (bi == bindings.end()) return; - Queue::vector::iterator q = find(qv.begin(), qv.end(), queue); - if(q == qv.end()) return; - qv.erase(q); - if(qv.empty()) bindings.erase(bi); - lock.release(); -} - - -void TopicExchange::route(Message::shared_ptr& msg, const string& routingKey, FieldTable* /*args*/){ - lock.acquire(); - for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) { - if (i->first.match(routingKey)) { - Queue::vector& qv(i->second); - for(Queue::vector::iterator j = qv.begin(); j != qv.end(); j++){ - (*j)->deliver(msg); - } - } - } - lock.release(); -} - -TopicExchange::~TopicExchange() {} - -const std::string TopicExchange::typeName("topic"); - - diff --git a/cpp/broker/test/ChannelTest.cpp b/cpp/broker/test/ChannelTest.cpp deleted file mode 100644 index 001a0d9c00..0000000000 --- a/cpp/broker/test/ChannelTest.cpp +++ /dev/null @@ -1,162 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "Channel.h" -#include "Message.h" -#include <qpid_test_plugin.h> -#include <iostream> -#include <memory> - -using namespace std::tr1; -using namespace qpid::broker; -using namespace qpid::framing; -using namespace qpid::concurrent; - -struct DummyRouter{ - Message::shared_ptr last; - - void operator()(Message::shared_ptr& msg){ - last = msg; - } -}; - -struct DummyHandler : OutputHandler{ - std::vector<AMQFrame*> frames; - - virtual void send(AMQFrame* frame){ - frames.push_back(frame); - } -}; - - -class ChannelTest : public CppUnit::TestCase -{ - CPPUNIT_TEST_SUITE(ChannelTest); - CPPUNIT_TEST(testIncoming); - CPPUNIT_TEST(testConsumerMgmt); - CPPUNIT_TEST(testDeliveryNoAck); - CPPUNIT_TEST_SUITE_END(); - - public: - - void testIncoming(){ - Channel channel(0, 0, 10000); - string routingKey("my_routing_key"); - channel.handlePublish(new Message(0, "test", routingKey, false, false)); - AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); - header->setContentSize(14); - string data1("abcdefg"); - string data2("hijklmn"); - AMQContentBody::shared_ptr part1(new AMQContentBody(data1)); - AMQContentBody::shared_ptr part2(new AMQContentBody(data2)); - - CPPUNIT_ASSERT(!channel.handleHeader(header, DummyRouter()).last); - CPPUNIT_ASSERT(!channel.handleContent(part1, DummyRouter()).last); - DummyRouter router = channel.handleContent(part2, DummyRouter()); - CPPUNIT_ASSERT(router.last); - CPPUNIT_ASSERT_EQUAL(routingKey, router.last->getRoutingKey()); - } - - void testConsumerMgmt(){ - Queue::shared_ptr queue(new Queue("my_queue")); - Channel channel(0, 0, 0); - CPPUNIT_ASSERT(!channel.exists("my_consumer")); - - ConnectionToken* owner; - string tag("my_consumer"); - channel.consume(tag, queue, false, false, owner); - string tagA; - string tagB; - channel.consume(tagA, queue, false, false, owner); - channel.consume(tagB, queue, false, false, owner); - CPPUNIT_ASSERT_EQUAL((u_int32_t) 3, queue->getConsumerCount()); - CPPUNIT_ASSERT(channel.exists("my_consumer")); - CPPUNIT_ASSERT(channel.exists(tagA)); - CPPUNIT_ASSERT(channel.exists(tagB)); - channel.cancel(tagA); - CPPUNIT_ASSERT_EQUAL((u_int32_t) 2, queue->getConsumerCount()); - CPPUNIT_ASSERT(channel.exists("my_consumer")); - CPPUNIT_ASSERT(!channel.exists(tagA)); - CPPUNIT_ASSERT(channel.exists(tagB)); - channel.close(); - CPPUNIT_ASSERT_EQUAL((u_int32_t) 0, queue->getConsumerCount()); - } - - void testDeliveryNoAck(){ - DummyHandler handler; - Channel channel(&handler, 7, 10000); - - Message::shared_ptr msg(new Message(0, "test", "my_routing_key", false, false)); - AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); - header->setContentSize(14); - msg->setHeader(header); - AMQContentBody::shared_ptr body(new AMQContentBody("abcdefghijklmn")); - msg->addContent(body); - - Queue::shared_ptr queue(new Queue("my_queue")); - ConnectionToken* owner(0); - string tag("no_ack"); - channel.consume(tag, queue, false, false, owner); - - queue->deliver(msg); - CPPUNIT_ASSERT_EQUAL((size_t) 3, handler.frames.size()); - CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[0]->getChannel()); - CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[1]->getChannel()); - CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[2]->getChannel()); - BasicDeliverBody::shared_ptr deliver(dynamic_pointer_cast<BasicDeliverBody, AMQBody>(handler.frames[0]->getBody())); - AMQHeaderBody::shared_ptr contentHeader(dynamic_pointer_cast<AMQHeaderBody, AMQBody>(handler.frames[1]->getBody())); - AMQContentBody::shared_ptr contentBody(dynamic_pointer_cast<AMQContentBody, AMQBody>(handler.frames[2]->getBody())); - CPPUNIT_ASSERT(deliver); - CPPUNIT_ASSERT(contentHeader); - CPPUNIT_ASSERT(contentBody); - CPPUNIT_ASSERT_EQUAL(string("abcdefghijklmn"), contentBody->getData()); - } - - void testDeliveryAndRecovery(){ - DummyHandler handler; - Channel channel(&handler, 7, 10000); - - Message::shared_ptr msg(new Message(0, "test", "my_routing_key", false, false)); - AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); - header->setContentSize(14); - msg->setHeader(header); - AMQContentBody::shared_ptr body(new AMQContentBody("abcdefghijklmn")); - msg->addContent(body); - - Queue::shared_ptr queue(new Queue("my_queue")); - ConnectionToken* owner; - string tag("ack"); - channel.consume(tag, queue, true, false, owner); - - queue->deliver(msg); - CPPUNIT_ASSERT_EQUAL((size_t) 3, handler.frames.size()); - CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[0]->getChannel()); - CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[1]->getChannel()); - CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[2]->getChannel()); - BasicDeliverBody::shared_ptr deliver(dynamic_pointer_cast<BasicDeliverBody, AMQBody>(handler.frames[0]->getBody())); - AMQHeaderBody::shared_ptr contentHeader(dynamic_pointer_cast<AMQHeaderBody, AMQBody>(handler.frames[1]->getBody())); - AMQContentBody::shared_ptr contentBody(dynamic_pointer_cast<AMQContentBody, AMQBody>(handler.frames[2]->getBody())); - CPPUNIT_ASSERT(deliver); - CPPUNIT_ASSERT(contentHeader); - CPPUNIT_ASSERT(contentBody); - CPPUNIT_ASSERT_EQUAL(string("abcdefghijklmn"), contentBody->getData()); - } -}; - -// Make this test suite a plugin. -CPPUNIT_PLUGIN_IMPLEMENT(); -CPPUNIT_TEST_SUITE_REGISTRATION(ChannelTest); diff --git a/cpp/broker/test/ExchangeTest.cpp b/cpp/broker/test/ExchangeTest.cpp deleted file mode 100644 index 8c702ff836..0000000000 --- a/cpp/broker/test/ExchangeTest.cpp +++ /dev/null @@ -1,65 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "DirectExchange.h" -#include "Exchange.h" -#include "Queue.h" -#include "TopicExchange.h" -#include <qpid_test_plugin.h> -#include <iostream> - -using namespace qpid::broker; -using namespace qpid::concurrent; - -class ExchangeTest : public CppUnit::TestCase -{ - CPPUNIT_TEST_SUITE(ExchangeTest); - CPPUNIT_TEST(testMe); - CPPUNIT_TEST_SUITE_END(); - - public: - - // TODO aconway 2006-09-12: Need more detailed tests. - - void testMe() - { - Queue::shared_ptr queue(new Queue("queue", true, true)); - Queue::shared_ptr queue2(new Queue("queue2", true, true)); - - TopicExchange topic("topic"); - topic.bind(queue, "abc", 0); - topic.bind(queue2, "abc", 0); - - DirectExchange direct("direct"); - direct.bind(queue, "abc", 0); - direct.bind(queue2, "abc", 0); - - queue.reset(); - queue2.reset(); - - Message::shared_ptr msg = Message::shared_ptr(new Message(0, "e", "A", true, true)); - topic.route(msg, "abc", 0); - direct.route(msg, "abc", 0); - - // TODO aconway 2006-09-12: TODO Why no assertions? - } -}; - -// Make this test suite a plugin. -CPPUNIT_PLUGIN_IMPLEMENT(); -CPPUNIT_TEST_SUITE_REGISTRATION(ExchangeTest); diff --git a/cpp/broker/test/HeadersExchangeTest.cpp b/cpp/broker/test/HeadersExchangeTest.cpp deleted file mode 100644 index d56e00543d..0000000000 --- a/cpp/broker/test/HeadersExchangeTest.cpp +++ /dev/null @@ -1,112 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "HeadersExchange.h" -#include "FieldTable.h" -#include "Value.h" -#include <qpid_test_plugin.h> - -using namespace qpid::broker; -using namespace qpid::framing; - -class HeadersExchangeTest : public CppUnit::TestCase -{ - CPPUNIT_TEST_SUITE(HeadersExchangeTest); - CPPUNIT_TEST(testMatchAll); - CPPUNIT_TEST(testMatchAny); - CPPUNIT_TEST(testMatchEmptyValue); - CPPUNIT_TEST(testMatchEmptyArgs); - CPPUNIT_TEST(testMatchNoXMatch); - CPPUNIT_TEST_SUITE_END(); - - public: - - void testMatchAll() - { - FieldTable b, m; - b.setString("x-match", "all"); - b.setString("foo", "FOO"); - b.setInt("n", 42); - m.setString("foo", "FOO"); - m.setInt("n", 42); - CPPUNIT_ASSERT(HeadersExchange::match(b, m)); - - // Ignore extras. - m.setString("extra", "x"); - CPPUNIT_ASSERT(HeadersExchange::match(b, m)); - - // Fail mismatch, wrong value. - m.setString("foo", "NotFoo"); - CPPUNIT_ASSERT(!HeadersExchange::match(b, m)); - - // Fail mismatch, missing value - m.erase("foo"); - CPPUNIT_ASSERT(!HeadersExchange::match(b, m)); - } - - void testMatchAny() - { - FieldTable b, m; - b.setString("x-match", "any"); - b.setString("foo", "FOO"); - b.setInt("n", 42); - m.setString("foo", "FOO"); - CPPUNIT_ASSERT(HeadersExchange::match(b, m)); - m.erase("foo"); - CPPUNIT_ASSERT(!HeadersExchange::match(b, m)); - m.setInt("n", 42); - CPPUNIT_ASSERT(HeadersExchange::match(b, m)); - } - - void testMatchEmptyValue() - { - FieldTable b, m; - b.setString("x-match", "all"); - b.getMap()["foo"] = FieldTable::ValuePtr(new EmptyValue()); - b.getMap()["n"] = FieldTable::ValuePtr(new EmptyValue()); - CPPUNIT_ASSERT(!HeadersExchange::match(b, m)); - m.setString("foo", "blah"); - m.setInt("n", 123); - } - - void testMatchEmptyArgs() - { - FieldTable b, m; - m.setString("foo", "FOO"); - - b.setString("x-match", "all"); - CPPUNIT_ASSERT(HeadersExchange::match(b, m)); - b.setString("x-match", "any"); - CPPUNIT_ASSERT(!HeadersExchange::match(b, m)); - } - - - void testMatchNoXMatch() - { - FieldTable b, m; - b.setString("foo", "FOO"); - m.setString("foo", "FOO"); - CPPUNIT_ASSERT(!HeadersExchange::match(b, m)); - } - - -}; - -// make this test suite a plugin. -CPPUNIT_PLUGIN_IMPLEMENT(); -CPPUNIT_TEST_SUITE_REGISTRATION(HeadersExchangeTest); diff --git a/cpp/broker/test/Makefile b/cpp/broker/test/Makefile deleted file mode 100644 index 172ce564bf..0000000000 --- a/cpp/broker/test/Makefile +++ /dev/null @@ -1,20 +0,0 @@ -# -# Copyright (c) 2006 The Apache Software Foundation -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -QPID_HOME = ../../.. -LDLIBS=-lapr-1 -lcppunit $(COMMON_LIB) $(BROKER_LIB) -include ${QPID_HOME}/cpp/test_plugins.mk - diff --git a/cpp/broker/test/MessageTest.cpp b/cpp/broker/test/MessageTest.cpp deleted file mode 100644 index fc2c6e01bb..0000000000 --- a/cpp/broker/test/MessageTest.cpp +++ /dev/null @@ -1,54 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "APRBase.h" -#include "Message.h" -#include <qpid_test_plugin.h> -#include <iostream> - -using namespace qpid::broker; -using namespace qpid::framing; -using namespace qpid::concurrent; - -class MessageTest : public CppUnit::TestCase -{ - CPPUNIT_TEST_SUITE(MessageTest); - CPPUNIT_TEST(testMe); - CPPUNIT_TEST_SUITE_END(); - - public: - - // TODO aconway 2006-09-12: Need more detailed tests, - // need tests to assert something! - // - void testMe() - { - APRBase::increment(); - const int size(10); - for(int i = 0; i < size; i++){ - Message::shared_ptr msg = Message::shared_ptr(new Message(0, "A", "B", true, true)); - msg->setHeader(AMQHeaderBody::shared_ptr(new AMQHeaderBody())); - msg->addContent(AMQContentBody::shared_ptr(new AMQContentBody())); - msg.reset(); - } - } -}; - -// Make this test suite a plugin. -CPPUNIT_PLUGIN_IMPLEMENT(); -CPPUNIT_TEST_SUITE_REGISTRATION(MessageTest); - diff --git a/cpp/broker/test/QueueRegistryTest.cpp b/cpp/broker/test/QueueRegistryTest.cpp deleted file mode 100644 index bd739aaad5..0000000000 --- a/cpp/broker/test/QueueRegistryTest.cpp +++ /dev/null @@ -1,76 +0,0 @@ -#include "QueueRegistry.h" -#include <qpid_test_plugin.h> -#include <string> - -using namespace qpid::broker; - -class QueueRegistryTest : public CppUnit::TestCase -{ - CPPUNIT_TEST_SUITE(QueueRegistryTest); - CPPUNIT_TEST(testDeclare); - CPPUNIT_TEST(testDeclareTmp); - CPPUNIT_TEST(testFind); - CPPUNIT_TEST(testDestroy); - CPPUNIT_TEST_SUITE_END(); - - private: - std::string foo, bar; - QueueRegistry reg; - std::pair<Queue::shared_ptr, bool> qc; - - public: - void setUp() { - foo = "foo"; - bar = "bar"; - } - - void testDeclare() { - qc = reg.declare(foo, false, 0, 0); - Queue::shared_ptr q = qc.first; - CPPUNIT_ASSERT(q); - CPPUNIT_ASSERT(qc.second); // New queue - CPPUNIT_ASSERT_EQUAL(foo, q->getName()); - - qc = reg.declare(foo, false, 0, 0); - CPPUNIT_ASSERT_EQUAL(q, qc.first); - CPPUNIT_ASSERT(!qc.second); - - qc = reg.declare(bar, false, 0, 0); - q = qc.first; - CPPUNIT_ASSERT(q); - CPPUNIT_ASSERT_EQUAL(true, qc.second); - CPPUNIT_ASSERT_EQUAL(bar, q->getName()); - } - - void testDeclareTmp() - { - qc = reg.declare(std::string(), false, 0, 0); - CPPUNIT_ASSERT(qc.second); - CPPUNIT_ASSERT_EQUAL(std::string("tmp_1"), qc.first->getName()); - } - - void testFind() { - CPPUNIT_ASSERT(reg.find(foo) == 0); - - reg.declare(foo, false, 0, 0); - reg.declare(bar, false, 0, 0); - Queue::shared_ptr q = reg.find(bar); - CPPUNIT_ASSERT(q); - CPPUNIT_ASSERT_EQUAL(bar, q->getName()); - } - - void testDestroy() { - qc = reg.declare(foo, false, 0, 0); - reg.destroy(foo); - // Queue is gone from the registry. - CPPUNIT_ASSERT(reg.find(foo) == 0); - // Queue is not actually destroyed till we drop our reference. - CPPUNIT_ASSERT_EQUAL(foo, qc.first->getName()); - // We shoud be the only reference. - CPPUNIT_ASSERT_EQUAL(1L, qc.first.use_count()); - } -}; - -// Make this test suite a plugin. -CPPUNIT_PLUGIN_IMPLEMENT(); -CPPUNIT_TEST_SUITE_REGISTRATION(QueueRegistryTest); diff --git a/cpp/broker/test/QueueTest.cpp b/cpp/broker/test/QueueTest.cpp deleted file mode 100644 index 1b4eb814cb..0000000000 --- a/cpp/broker/test/QueueTest.cpp +++ /dev/null @@ -1,176 +0,0 @@ - /* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "Queue.h" -#include "QueueRegistry.h" -#include <qpid_test_plugin.h> -#include <iostream> - -using namespace qpid::broker; -using namespace qpid::concurrent; - - -class TestBinding : public virtual Binding{ - bool cancelled; - -public: - TestBinding(); - virtual void cancel(); - bool isCancelled(); -}; - -class TestConsumer : public virtual Consumer{ -public: - Message::shared_ptr last; - - virtual bool deliver(Message::shared_ptr& msg); -}; - - -class QueueTest : public CppUnit::TestCase -{ - CPPUNIT_TEST_SUITE(QueueTest); - CPPUNIT_TEST(testConsumers); - CPPUNIT_TEST(testBinding); - CPPUNIT_TEST(testRegistry); - CPPUNIT_TEST(testDequeue); - CPPUNIT_TEST_SUITE_END(); - - public: - void testConsumers(){ - Queue::shared_ptr queue(new Queue("my_queue", true, true)); - - //Test adding consumers: - TestConsumer c1; - TestConsumer c2; - queue->consume(&c1); - queue->consume(&c2); - - CPPUNIT_ASSERT_EQUAL(u_int32_t(2), queue->getConsumerCount()); - - //Test basic delivery: - Message::shared_ptr msg1 = Message::shared_ptr(new Message(0, "e", "A", true, true)); - Message::shared_ptr msg2 = Message::shared_ptr(new Message(0, "e", "B", true, true)); - Message::shared_ptr msg3 = Message::shared_ptr(new Message(0, "e", "C", true, true)); - - queue->deliver(msg1); - CPPUNIT_ASSERT_EQUAL(msg1.get(), c1.last.get()); - - queue->deliver(msg2); - CPPUNIT_ASSERT_EQUAL(msg2.get(), c2.last.get()); - - queue->deliver(msg3); - CPPUNIT_ASSERT_EQUAL(msg3.get(), c1.last.get()); - - //Test cancellation: - queue->cancel(&c1); - CPPUNIT_ASSERT_EQUAL(u_int32_t(1), queue->getConsumerCount()); - queue->cancel(&c2); - CPPUNIT_ASSERT_EQUAL(u_int32_t(0), queue->getConsumerCount()); - } - - void testBinding(){ - Queue::shared_ptr queue(new Queue("my_queue", true, true)); - //Test bindings: - TestBinding a; - TestBinding b; - queue->bound(&a); - queue->bound(&b); - - queue.reset(); - - CPPUNIT_ASSERT(a.isCancelled()); - CPPUNIT_ASSERT(b.isCancelled()); - } - - void testRegistry(){ - //Test use of queues in registry: - QueueRegistry registry; - registry.declare("queue1", true, true); - registry.declare("queue2", true, true); - registry.declare("queue3", true, true); - - CPPUNIT_ASSERT(registry.find("queue1")); - CPPUNIT_ASSERT(registry.find("queue2")); - CPPUNIT_ASSERT(registry.find("queue3")); - - registry.destroy("queue1"); - registry.destroy("queue2"); - registry.destroy("queue3"); - - CPPUNIT_ASSERT(!registry.find("queue1")); - CPPUNIT_ASSERT(!registry.find("queue2")); - CPPUNIT_ASSERT(!registry.find("queue3")); - } - - void testDequeue(){ - Queue::shared_ptr queue(new Queue("my_queue", true, true)); - - Message::shared_ptr msg1 = Message::shared_ptr(new Message(0, "e", "A", true, true)); - Message::shared_ptr msg2 = Message::shared_ptr(new Message(0, "e", "B", true, true)); - Message::shared_ptr msg3 = Message::shared_ptr(new Message(0, "e", "C", true, true)); - Message::shared_ptr received; - - queue->deliver(msg1); - queue->deliver(msg2); - queue->deliver(msg3); - - CPPUNIT_ASSERT_EQUAL(u_int32_t(3), queue->getMessageCount()); - - received = queue->dequeue(); - CPPUNIT_ASSERT_EQUAL(msg1.get(), received.get()); - CPPUNIT_ASSERT_EQUAL(u_int32_t(2), queue->getMessageCount()); - - received = queue->dequeue(); - CPPUNIT_ASSERT_EQUAL(msg2.get(), received.get()); - CPPUNIT_ASSERT_EQUAL(u_int32_t(1), queue->getMessageCount()); - - TestConsumer consumer; - queue->consume(&consumer); - queue->dispatch(); - CPPUNIT_ASSERT_EQUAL(msg3.get(), consumer.last.get()); - CPPUNIT_ASSERT_EQUAL(u_int32_t(0), queue->getMessageCount()); - - received = queue->dequeue(); - CPPUNIT_ASSERT(!received); - CPPUNIT_ASSERT_EQUAL(u_int32_t(0), queue->getMessageCount()); - - } -}; - -// Make this test suite a plugin. -CPPUNIT_PLUGIN_IMPLEMENT(); -CPPUNIT_TEST_SUITE_REGISTRATION(QueueTest); - -//TestBinding -TestBinding::TestBinding() : cancelled(false) {} - -void TestBinding::cancel(){ - CPPUNIT_ASSERT(!cancelled); - cancelled = true; -} - -bool TestBinding::isCancelled(){ - return cancelled; -} - -//TestConsumer -bool TestConsumer::deliver(Message::shared_ptr& msg){ - last = msg; - return true; -} - diff --git a/cpp/broker/test/RouterTest.cpp b/cpp/broker/test/RouterTest.cpp deleted file mode 100644 index b1d4b9739f..0000000000 --- a/cpp/broker/test/RouterTest.cpp +++ /dev/null @@ -1,86 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "Channel.h" -#include "Exchange.h" -#include "ExchangeRegistry.h" -#include "Message.h" -#include "Router.h" -#include <qpid_test_plugin.h> -#include <iostream> -#include <memory> - -using namespace qpid::broker; -using namespace qpid::framing; -using namespace qpid::concurrent; - -struct TestExchange : public Exchange{ - Message::shared_ptr msg; - string routingKey; - FieldTable* args; - - TestExchange() : Exchange("test"), args(0) {} - - void bind(Queue::shared_ptr /*queue*/, const string& /*routingKey*/, FieldTable* /*args*/){} - - void unbind(Queue::shared_ptr /*queue*/, const string& /*routingKey*/, FieldTable* /*args*/){ } - - void route(Message::shared_ptr& _msg, const string& _routingKey, FieldTable* _args){ - msg = _msg; - routingKey = _routingKey; - args = _args; - } -}; - -class RouterTest : public CppUnit::TestCase -{ - CPPUNIT_TEST_SUITE(RouterTest); - CPPUNIT_TEST(test); - CPPUNIT_TEST_SUITE_END(); - - public: - - void test() - { - ExchangeRegistry registry; - TestExchange* exchange = new TestExchange(); - registry.declare(exchange); - - string routingKey("my_routing_key"); - string name("name"); - string value("value"); - Message::shared_ptr msg(new Message(0, "test", routingKey, false, false)); - AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); - - dynamic_cast<BasicHeaderProperties*>(header->getProperties())->getHeaders().setString(name, value); - msg->setHeader(header); - - Router router(registry); - router(msg); - - CPPUNIT_ASSERT(exchange->msg); - CPPUNIT_ASSERT_EQUAL(msg, exchange->msg); - CPPUNIT_ASSERT_EQUAL(routingKey, exchange->msg->getRoutingKey()); - CPPUNIT_ASSERT_EQUAL(routingKey, exchange->routingKey); - CPPUNIT_ASSERT_EQUAL(value, exchange->args->getString(name)); - } -}; - -// Make this test suite a plugin. -CPPUNIT_PLUGIN_IMPLEMENT(); -CPPUNIT_TEST_SUITE_REGISTRATION(RouterTest); - diff --git a/cpp/broker/test/TopicExchangeTest.cpp b/cpp/broker/test/TopicExchangeTest.cpp deleted file mode 100644 index d9b49fc603..0000000000 --- a/cpp/broker/test/TopicExchangeTest.cpp +++ /dev/null @@ -1,184 +0,0 @@ -#include "TopicExchange.h" -#include <qpid_test_plugin.h> - -using namespace qpid::broker; - -Tokens makeTokens(char** begin, char** end) -{ - Tokens t; - t.insert(t.end(), begin, end); - return t; -} - -// Calculate size of an array. -#define LEN(a) (sizeof(a)/sizeof(a[0])) - -// Convert array to token vector -#define TOKENS(a) makeTokens(a, a + LEN(a)) - -// Allow CPPUNIT_EQUALS to print a Tokens. -// TODO aconway 2006-09-19: Make it a template and put it in a shared test lib. -// -CppUnit::OStringStream& operator <<(CppUnit::OStringStream& out, const Tokens& v) -{ - out << "[ "; - for (Tokens::const_iterator i = v.begin(); - i != v.end(); ++i) - { - out << '"' << *i << '"' << (i+1 == v.end() ? "]" : ", "); - } - return out; -} - - -class TokensTest : public CppUnit::TestCase -{ - CPPUNIT_TEST_SUITE(TokensTest); - CPPUNIT_TEST(testTokens); - CPPUNIT_TEST_SUITE_END(); - - public: - void testTokens() - { - Tokens tokens("hello.world"); - char* expect[] = {"hello", "world"}; - CPPUNIT_ASSERT_EQUAL(TOKENS(expect), tokens); - - tokens = "a.b.c"; - char* expect2[] = { "a", "b", "c" }; - CPPUNIT_ASSERT_EQUAL(TOKENS(expect2), tokens); - - tokens = ""; - CPPUNIT_ASSERT(tokens.empty()); - - tokens = "x"; - char* expect3[] = { "x" }; - CPPUNIT_ASSERT_EQUAL(TOKENS(expect3), tokens); - - tokens = (".x"); - char* expect4[] = { "", "x" }; - CPPUNIT_ASSERT_EQUAL(TOKENS(expect4), tokens); - - tokens = ("x."); - char* expect5[] = { "x", "" }; - CPPUNIT_ASSERT_EQUAL(TOKENS(expect5), tokens); - - tokens = ("."); - char* expect6[] = { "", "" }; - CPPUNIT_ASSERT_EQUAL(TOKENS(expect6), tokens); - - tokens = (".."); - char* expect7[] = { "", "", "" }; - CPPUNIT_ASSERT_EQUAL(TOKENS(expect7), tokens); - } - -}; - -#define ASSERT_NORMALIZED(expect, pattern) \ - CPPUNIT_ASSERT_EQUAL(Tokens(expect), static_cast<Tokens>(TopicPattern(pattern))) -class TopicPatternTest : public CppUnit::TestCase -{ - CPPUNIT_TEST_SUITE(TopicPatternTest); - CPPUNIT_TEST(testNormalize); - CPPUNIT_TEST(testPlain); - CPPUNIT_TEST(testStar); - CPPUNIT_TEST(testHash); - CPPUNIT_TEST(testMixed); - CPPUNIT_TEST(testCombo); - CPPUNIT_TEST_SUITE_END(); - - public: - - void testNormalize() - { - CPPUNIT_ASSERT(TopicPattern("").empty()); - ASSERT_NORMALIZED("a.b.c", "a.b.c"); - ASSERT_NORMALIZED("a.*.c", "a.*.c"); - ASSERT_NORMALIZED("#", "#"); - ASSERT_NORMALIZED("#", "#.#.#.#"); - ASSERT_NORMALIZED("*.*.*.#", "#.*.#.*.#.#.*"); - ASSERT_NORMALIZED("a.*.*.*.#", "a.*.#.*.#.*.#"); - ASSERT_NORMALIZED("a.*.*.*.#", "a.*.#.*.#.*"); - } - - void testPlain() { - TopicPattern p("ab.cd.e"); - CPPUNIT_ASSERT(p.match("ab.cd.e")); - CPPUNIT_ASSERT(!p.match("abx.cd.e")); - CPPUNIT_ASSERT(!p.match("ab.cd")); - CPPUNIT_ASSERT(!p.match("ab.cd..e.")); - CPPUNIT_ASSERT(!p.match("ab.cd.e.")); - CPPUNIT_ASSERT(!p.match(".ab.cd.e")); - - p = ""; - CPPUNIT_ASSERT(p.match("")); - - p = "."; - CPPUNIT_ASSERT(p.match(".")); - } - - - void testStar() - { - TopicPattern p("a.*.b"); - CPPUNIT_ASSERT(p.match("a.xx.b")); - CPPUNIT_ASSERT(!p.match("a.b")); - - p = "*.x"; - CPPUNIT_ASSERT(p.match("y.x")); - CPPUNIT_ASSERT(p.match(".x")); - CPPUNIT_ASSERT(!p.match("x")); - - p = "x.x.*"; - CPPUNIT_ASSERT(p.match("x.x.y")); - CPPUNIT_ASSERT(p.match("x.x.")); - CPPUNIT_ASSERT(!p.match("x.x")); - CPPUNIT_ASSERT(!p.match("q.x.y")); - } - - void testHash() - { - TopicPattern p("a.#.b"); - CPPUNIT_ASSERT(p.match("a.b")); - CPPUNIT_ASSERT(p.match("a.x.b")); - CPPUNIT_ASSERT(p.match("a..x.y.zz.b")); - CPPUNIT_ASSERT(!p.match("a.b.")); - CPPUNIT_ASSERT(!p.match("q.x.b")); - - p = "a.#"; - CPPUNIT_ASSERT(p.match("a")); - CPPUNIT_ASSERT(p.match("a.b")); - CPPUNIT_ASSERT(p.match("a.b.c")); - - p = "#.a"; - CPPUNIT_ASSERT(p.match("a")); - CPPUNIT_ASSERT(p.match("x.y.a")); - } - - void testMixed() - { - TopicPattern p("*.x.#.y"); - CPPUNIT_ASSERT(p.match("a.x.y")); - CPPUNIT_ASSERT(p.match("a.x.p.qq.y")); - CPPUNIT_ASSERT(!p.match("a.a.x.y")); - CPPUNIT_ASSERT(!p.match("aa.x.b.c")); - - p = "a.#.b.*"; - CPPUNIT_ASSERT(p.match("a.b.x")); - CPPUNIT_ASSERT(p.match("a.x.x.x.b.x")); - } - - void testCombo() { - TopicPattern p("*.#.#.*.*.#"); - CPPUNIT_ASSERT(p.match("x.y.z")); - CPPUNIT_ASSERT(p.match("x.y.z.a.b.c")); - CPPUNIT_ASSERT(!p.match("x.y")); - CPPUNIT_ASSERT(!p.match("x")); - } -}; - - -// Make this test suite a plugin. -CPPUNIT_PLUGIN_IMPLEMENT(); -CPPUNIT_TEST_SUITE_REGISTRATION(TopicPatternTest); -CPPUNIT_TEST_SUITE_REGISTRATION(TokensTest); diff --git a/cpp/broker/test/ValueTest.cpp b/cpp/broker/test/ValueTest.cpp deleted file mode 100644 index ec9659e603..0000000000 --- a/cpp/broker/test/ValueTest.cpp +++ /dev/null @@ -1,96 +0,0 @@ -#include "Value.h" -#include <qpid_test_plugin.h> - -using namespace qpid::framing; - - -// Allow CPPUNIT_EQUALS to print a Tokens. -// TODO aconway 2006-09-19: Make it a template and put it in a shared test lib. -// -template <class T> -CppUnit::OStringStream& operator <<(CppUnit::OStringStream& out, - const ValueOps<T>& v) -{ - out << v.getValue(); - return out; -} - - -class ValueTest : public CppUnit::TestCase -{ - CPPUNIT_TEST_SUITE(ValueTest); - CPPUNIT_TEST(testStringValueEquals); - CPPUNIT_TEST(testIntegerValueEquals); - CPPUNIT_TEST(testDecimalValueEquals); - CPPUNIT_TEST(testFieldTableValueEquals); - CPPUNIT_TEST_SUITE_END(); - - StringValue s; - IntegerValue i; - DecimalValue d; - FieldTableValue ft; - EmptyValue e; - - public: - ValueTest() : - s("abc"), - i(42), - d(1234,2) - - { - ft.getValue().setString("foo", "FOO"); - ft.getValue().setInt("magic", 7); - } - - void testStringValueEquals() - { - - CPPUNIT_ASSERT(StringValue("abc") == s); - CPPUNIT_ASSERT(s != StringValue("foo")); - CPPUNIT_ASSERT(s != e); - CPPUNIT_ASSERT(e != d); - CPPUNIT_ASSERT(e != ft); - } - - void testIntegerValueEquals() - { - CPPUNIT_ASSERT(IntegerValue(42) == i); - CPPUNIT_ASSERT(IntegerValue(5) != i); - CPPUNIT_ASSERT(i != e); - CPPUNIT_ASSERT(i != d); - } - - void testDecimalValueEquals() - { - CPPUNIT_ASSERT(DecimalValue(1234, 2) == d); - CPPUNIT_ASSERT(DecimalValue(12345, 2) != d); - CPPUNIT_ASSERT(DecimalValue(1234, 3) != d); - CPPUNIT_ASSERT(d != s); - } - - - void testFieldTableValueEquals() - { - CPPUNIT_ASSERT_EQUAL(std::string("FOO"), - ft.getValue().getString("foo")); - CPPUNIT_ASSERT_EQUAL(7, ft.getValue().getInt("magic")); - - FieldTableValue f2; - CPPUNIT_ASSERT(ft != f2); - f2.getValue().setString("foo", "FOO"); - CPPUNIT_ASSERT(ft != f2); - f2.getValue().setInt("magic", 7); - CPPUNIT_ASSERT_EQUAL(ft,f2); - CPPUNIT_ASSERT(ft == f2); - f2.getValue().setString("foo", "BAR"); - CPPUNIT_ASSERT(ft != f2); - CPPUNIT_ASSERT(ft != i); - } - -}; - - -// Make this test suite a plugin. -CPPUNIT_PLUGIN_IMPLEMENT(); -CPPUNIT_TEST_SUITE_REGISTRATION(ValueTest); - |
