From dda71d21e76e01918ebec2d80dd8e077f94216e0 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Wed, 1 Nov 2006 01:19:12 +0000 Subject: Moved APR specific sources into src_apr. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@469738 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/Makefile | 36 +++-- cpp/options.mk | 12 +- cpp/src/qpid/broker/Queue.cpp | 6 +- cpp/src/qpid/broker/Queue.h | 4 +- cpp/src/qpid/concurrent/APRBase.cpp | 96 ------------ cpp/src/qpid/concurrent/APRBase.h | 63 -------- cpp/src/qpid/concurrent/Monitor.cpp | 60 -------- cpp/src/qpid/concurrent/Monitor.h | 56 ------- cpp/src/qpid/concurrent/Thread.cpp | 50 ------- cpp/src/qpid/concurrent/Thread.h | 48 ------ cpp/src/qpid/concurrent/ThreadFactory.cpp | 35 ----- cpp/src/qpid/concurrent/ThreadFactory.h | 44 ------ cpp/src/qpid/concurrent/ThreadPool.cpp | 83 ----------- cpp/src/qpid/concurrent/ThreadPool.h | 67 --------- cpp/src/qpid/concurrent/Time.h | 52 +++++++ cpp/src/qpid/io/APRPool.cpp | 39 ----- cpp/src/qpid/io/APRPool.h | 47 ------ cpp/src/qpid/io/APRSocket.cpp | 76 ---------- cpp/src/qpid/io/APRSocket.h | 45 ------ cpp/src/qpid/io/Acceptor.cpp | 78 ---------- cpp/src/qpid/io/Acceptor.h | 60 -------- cpp/src/qpid/io/Connector.cpp | 201 -------------------------- cpp/src/qpid/io/Connector.h | 95 ------------ cpp/src/qpid/io/LFProcessor.cpp | 193 ------------------------- cpp/src/qpid/io/LFProcessor.h | 119 --------------- cpp/src/qpid/io/LFSessionContext.cpp | 189 ------------------------ cpp/src/qpid/io/LFSessionContext.h | 88 ----------- cpp/src_apr/qpid/concurrent/APRBase.cpp | 96 ++++++++++++ cpp/src_apr/qpid/concurrent/APRBase.h | 63 ++++++++ cpp/src_apr/qpid/concurrent/Monitor.cpp | 60 ++++++++ cpp/src_apr/qpid/concurrent/Monitor.h | 56 +++++++ cpp/src_apr/qpid/concurrent/Thread.cpp | 50 +++++++ cpp/src_apr/qpid/concurrent/Thread.h | 48 ++++++ cpp/src_apr/qpid/concurrent/ThreadFactory.cpp | 35 +++++ cpp/src_apr/qpid/concurrent/ThreadFactory.h | 44 ++++++ cpp/src_apr/qpid/concurrent/ThreadPool.cpp | 83 +++++++++++ cpp/src_apr/qpid/concurrent/ThreadPool.h | 67 +++++++++ cpp/src_apr/qpid/concurrent/Time.cpp | 29 ++++ cpp/src_apr/qpid/io/APRPool.cpp | 39 +++++ cpp/src_apr/qpid/io/APRPool.h | 47 ++++++ cpp/src_apr/qpid/io/APRSocket.cpp | 76 ++++++++++ cpp/src_apr/qpid/io/APRSocket.h | 45 ++++++ cpp/src_apr/qpid/io/Acceptor.cpp | 78 ++++++++++ cpp/src_apr/qpid/io/Acceptor.h | 60 ++++++++ cpp/src_apr/qpid/io/Connector.cpp | 201 ++++++++++++++++++++++++++ cpp/src_apr/qpid/io/Connector.h | 95 ++++++++++++ cpp/src_apr/qpid/io/LFProcessor.cpp | 193 +++++++++++++++++++++++++ cpp/src_apr/qpid/io/LFProcessor.h | 119 +++++++++++++++ cpp/src_apr/qpid/io/LFSessionContext.cpp | 189 ++++++++++++++++++++++++ cpp/src_apr/qpid/io/LFSessionContext.h | 88 +++++++++++ 50 files changed, 1948 insertions(+), 1855 deletions(-) delete mode 100644 cpp/src/qpid/concurrent/APRBase.cpp delete mode 100644 cpp/src/qpid/concurrent/APRBase.h delete mode 100644 cpp/src/qpid/concurrent/Monitor.cpp delete mode 100644 cpp/src/qpid/concurrent/Monitor.h delete mode 100644 cpp/src/qpid/concurrent/Thread.cpp delete mode 100644 cpp/src/qpid/concurrent/Thread.h delete mode 100644 cpp/src/qpid/concurrent/ThreadFactory.cpp delete mode 100644 cpp/src/qpid/concurrent/ThreadFactory.h delete mode 100644 cpp/src/qpid/concurrent/ThreadPool.cpp delete mode 100644 cpp/src/qpid/concurrent/ThreadPool.h create mode 100644 cpp/src/qpid/concurrent/Time.h delete mode 100644 cpp/src/qpid/io/APRPool.cpp delete mode 100644 cpp/src/qpid/io/APRPool.h delete mode 100644 cpp/src/qpid/io/APRSocket.cpp delete mode 100644 cpp/src/qpid/io/APRSocket.h delete mode 100644 cpp/src/qpid/io/Acceptor.cpp delete mode 100644 cpp/src/qpid/io/Acceptor.h delete mode 100644 cpp/src/qpid/io/Connector.cpp delete mode 100644 cpp/src/qpid/io/Connector.h delete mode 100644 cpp/src/qpid/io/LFProcessor.cpp delete mode 100644 cpp/src/qpid/io/LFProcessor.h delete mode 100644 cpp/src/qpid/io/LFSessionContext.cpp delete mode 100644 cpp/src/qpid/io/LFSessionContext.h create mode 100644 cpp/src_apr/qpid/concurrent/APRBase.cpp create mode 100644 cpp/src_apr/qpid/concurrent/APRBase.h create mode 100644 cpp/src_apr/qpid/concurrent/Monitor.cpp create mode 100644 cpp/src_apr/qpid/concurrent/Monitor.h create mode 100644 cpp/src_apr/qpid/concurrent/Thread.cpp create mode 100644 cpp/src_apr/qpid/concurrent/Thread.h create mode 100644 cpp/src_apr/qpid/concurrent/ThreadFactory.cpp create mode 100644 cpp/src_apr/qpid/concurrent/ThreadFactory.h create mode 100644 cpp/src_apr/qpid/concurrent/ThreadPool.cpp create mode 100644 cpp/src_apr/qpid/concurrent/ThreadPool.h create mode 100644 cpp/src_apr/qpid/concurrent/Time.cpp create mode 100644 cpp/src_apr/qpid/io/APRPool.cpp create mode 100644 cpp/src_apr/qpid/io/APRPool.h create mode 100644 cpp/src_apr/qpid/io/APRSocket.cpp create mode 100644 cpp/src_apr/qpid/io/APRSocket.h create mode 100644 cpp/src_apr/qpid/io/Acceptor.cpp create mode 100644 cpp/src_apr/qpid/io/Acceptor.h create mode 100644 cpp/src_apr/qpid/io/Connector.cpp create mode 100644 cpp/src_apr/qpid/io/Connector.h create mode 100644 cpp/src_apr/qpid/io/LFProcessor.cpp create mode 100644 cpp/src_apr/qpid/io/LFProcessor.h create mode 100644 cpp/src_apr/qpid/io/LFSessionContext.cpp create mode 100644 cpp/src_apr/qpid/io/LFSessionContext.h diff --git a/cpp/Makefile b/cpp/Makefile index e81e89a6a7..b3cd65f6d7 100644 --- a/cpp/Makefile +++ b/cpp/Makefile @@ -39,7 +39,12 @@ $(GENDIR)/timestamp: $(wildcard etc/stylesheets/*.xsl) $(SPEC) rm -rf $(GENDIR) mkdir -p $(GENDIR)/qpid/framing ( cd $(GENDIR)/qpid/framing && for s in $(STYLESHEETS) ; do $(TRANSFORM) $$s ; done ) && echo > $(GENDIR)/timestamp -$(shell find $(GENDIR) -name *.cpp -o -name *.h): $(GENDIR)/timestamp + +# Dependencies for existing generated files. +GENFILES:=$(wildcard $(GENDIR)/qpid/*/*.cpp $(GENDIR)/qpid/*/*.h) +ifdef GENFILES +$(GENFILES): $(GENDIR)/timestamp +endif $(BUILDDIRS): mkdir -p $(BUILDDIRS) @@ -47,15 +52,15 @@ $(BUILDDIRS): ## Library rules LIB_common := $(call LIBFILE,common,1.0) -$(LIB_common): $(call OBJ_FROM,src,qpid/concurrent qpid/framing qpid/io qpid) $(call OBJ_FROM,$(GENDIR),qpid/framing) +$(LIB_common): $(call OBJECTS,qpid qpid/concurrent qpid/framing qpid/io) $(LIB_COMMAND) LIB_client :=$(call LIBFILE,client,1.0) -$(LIB_client): $(call OBJ_FROM,src,qpid/client) $(LIB_common) +$(LIB_client): $(call OBJECTS,qpid/client) $(LIB_common) $(LIB_COMMAND) LIB_broker :=$(call LIBFILE,broker,1.0) -$(LIB_broker): $(call OBJ_FROM,src,qpid/broker) $(LIB_common) +$(LIB_broker): $(call OBJECTS,qpid/broker) $(LIB_common) $(LIB_COMMAND) ## Daemon executable @@ -85,13 +90,14 @@ build/html: doxygen.cfg ## Implicit rules -# C++ compiile -$(OBJDIR)/%.o: src/%.cpp - mkdir -p $(dir $@) - $(CXX) $(CXXFLAGS) -c -o $@ $< -$(OBJDIR)/%.o: $(GENDIR)/%.cpp - mkdir -p $(dir $@) - $(CXX) $(CXXFLAGS) -c -o $@ $< +# C++ compile +define CPPRULE +$(OBJDIR)/%.o: $1/%.cpp + @mkdir -p $$(dir $$@) + $(CXX) $(CXXFLAGS) -c -o $$@ $$< +endef + +$(foreach dir,$(SRCDIRS),$(eval $(call CPPRULE,$(dir)))) # Unit test plugin libraries. $(TESTDIR)/%Test.so: test/unit/%Test.cpp @@ -106,9 +112,11 @@ CLIENT_TEST_SRC := $(wildcard test/client/*.cpp) CLIENT_TEST_EXE := $(CLIENT_TEST_SRC:test/client/%.cpp=$(TESTDIR)/%) all-nogen: $(CLIENT_TEST_EXE) -## #include dependencies --include $(shell find $(GENDIR) $(OBJDIR) -name '*.d') dummy-avoid-warning-if-none - +## include dependencies +DEPFILES:=$(wildcard $(OBJDIR)/*.d $(OBJDIR)/*/*.d $(OBJDIR)/*/*/*.d) +ifdef DEPFILES +-include $(DEPFILES) +endif ## Clean up diff --git a/cpp/options.mk b/cpp/options.mk index fc9cd26fff..fec94c4fba 100644 --- a/cpp/options.mk +++ b/cpp/options.mk @@ -45,7 +45,8 @@ LIBDIR:=build/$(BUILD)/lib OBJDIR:=build/$(BUILD)/obj TESTDIR:=build/$(BUILD)/test -BUILDDIRS:= $(BINDIR) $(LIBDIR) $(OBJDIR) $(TESTDIR) $(GENDIR) +BUILDDIRS := $(BINDIR) $(LIBDIR) $(OBJDIR) $(TESTDIR) $(GENDIR) +SRCDIRS := src src_$(PLATFORM) $(GENDIR) ## External dependencies: @@ -68,14 +69,17 @@ RELEASE := -O3 -DNDEBUG # WARN := -Werror -pedantic -Wall -Wextra -Wshadow -Wpointer-arith -Wcast-qual -Wcast-align -Wno-long-long -Wvolatile-register-var -Winvalid-pch -INCLUDES := -Isrc -I$(GENDIR) $(EXTRA_INCLUDES) +INCLUDES := $(SRCDIRS:%=-I%) $(EXTRA_INCLUDES) LDFLAGS := -L$(LIBDIR) $(LDFLAGS_$(PLATFORM)) CXXFLAGS := $(DEFINES) $(WARN) -MMD -fpic $(INCLUDES) $(CXXFLAGS_$(PLATFORM)) ## Macros for linking, must be late evaluated -# $(call OBJ_FROM,root,subdirs) -OBJ_FROM = $(foreach sub,$2,$(patsubst $1/%.cpp,$(OBJDIR)/%.o,$(wildcard $1/$(sub)/*.cpp))) +# Collect object files from a collection of src subdirs +# $(call OBJ_FROM,srcdir,subdir) +OBJECTS_1 = $(patsubst $1/$2/%.cpp,$(OBJDIR)/$2/%.o,$(wildcard $1/$2/*.cpp)) +OBJECTS = $(foreach src,$(SRCDIRS),$(foreach sub,$1,$(call OBJECTS_1,$(src),$(sub)))) + # $(call LIBFILE,name,version) LIBFILE =$(CURDIR)/$(LIBDIR)/libqpid_$1.so.$2 diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 88dad7aaf9..d671cea9a5 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -38,7 +38,7 @@ Queue::Queue(const string& _name, bool _durable, u_int32_t _autodelete, lastUsed(0), exclusive(0) { - if(autodelete) lastUsed = apr_time_as_msec(apr_time_now()); + if(autodelete) lastUsed = Time::now().msecs(); } Queue::~Queue(){ @@ -128,7 +128,7 @@ void Queue::consume(Consumer* c, bool requestExclusive){ void Queue::cancel(Consumer* c){ Locker locker(lock); consumers.erase(find(consumers.begin(), consumers.end(), c)); - if(autodelete && consumers.empty()) lastUsed = apr_time_as_msec(apr_time_now()); + if(autodelete && consumers.empty()) lastUsed = Time::now().msecs(); if(exclusive == c) exclusive = 0; } @@ -161,7 +161,7 @@ u_int32_t Queue::getConsumerCount() const{ bool Queue::canAutoDelete() const{ Locker locker(lock); - return lastUsed && ((apr_time_as_msec(apr_time_now()) - lastUsed) > autodelete); + return lastUsed && (Time::now().msecs() - lastUsed > autodelete); } void Queue::enqueue(Message::shared_ptr& msg, const string * const xid){ diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h index f954e48c20..edc7c99b4f 100644 --- a/cpp/src/qpid/broker/Queue.h +++ b/cpp/src/qpid/broker/Queue.h @@ -21,13 +21,13 @@ #include #include #include -#include "apr-1/apr_time.h" #include "qpid/framing/amqp_types.h" #include "qpid/broker/Binding.h" #include "qpid/broker/ConnectionToken.h" #include "qpid/broker/Consumer.h" #include "qpid/broker/Message.h" #include "qpid/concurrent/Monitor.h" +#include "qpid/concurrent/Time.h" namespace qpid { namespace broker { @@ -57,7 +57,7 @@ namespace qpid { bool dispatching; int next; mutable qpid::concurrent::Monitor lock; - apr_time_t lastUsed; + int64_t lastUsed; Consumer* exclusive; bool startDispatching(); diff --git a/cpp/src/qpid/concurrent/APRBase.cpp b/cpp/src/qpid/concurrent/APRBase.cpp deleted file mode 100644 index 514c4d1048..0000000000 --- a/cpp/src/qpid/concurrent/APRBase.cpp +++ /dev/null @@ -1,96 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include -#include "qpid/concurrent/APRBase.h" -#include "qpid/QpidError.h" - -using namespace qpid::concurrent; - -APRBase* APRBase::instance = 0; - -APRBase* APRBase::getInstance(){ - if(instance == 0){ - instance = new APRBase(); - } - return instance; -} - - -APRBase::APRBase() : count(0){ - apr_initialize(); - CHECK_APR_SUCCESS(apr_pool_create(&pool, 0)); - CHECK_APR_SUCCESS(apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_NESTED, pool)); -} - -APRBase::~APRBase(){ - CHECK_APR_SUCCESS(apr_thread_mutex_destroy(mutex)); - apr_pool_destroy(pool); - apr_terminate(); -} - -bool APRBase::_increment(){ - bool deleted(false); - CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex)); - if(this == instance){ - count++; - }else{ - deleted = true; - } - CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex)); - return !deleted; -} - -void APRBase::_decrement(){ - APRBase* copy = 0; - CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex)); - if(--count == 0){ - copy = instance; - instance = 0; - } - CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex)); - if(copy != 0){ - delete copy; - } -} - -void APRBase::increment(){ - int count = 0; - while(count++ < 2 && !getInstance()->_increment()){ - std::cout << "WARNING: APR initialization triggered concurrently with termination." << std::endl; - } -} - -void APRBase::decrement(){ - getInstance()->_decrement(); -} - -void qpid::concurrent::check(apr_status_t status, const std::string& file, const int line){ - if (status != APR_SUCCESS){ - const int size = 50; - char tmp[size]; - std::string msg(apr_strerror(status, tmp, size)); - throw QpidError(APR_ERROR + ((int) status), msg, file, line); - } -} - -std::string qpid::concurrent::get_desc(apr_status_t status){ - const int size = 50; - char tmp[size]; - return std::string(apr_strerror(status, tmp, size)); -} - diff --git a/cpp/src/qpid/concurrent/APRBase.h b/cpp/src/qpid/concurrent/APRBase.h deleted file mode 100644 index f3ff0f89c1..0000000000 --- a/cpp/src/qpid/concurrent/APRBase.h +++ /dev/null @@ -1,63 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _APRBase_ -#define _APRBase_ - -#include -#include "apr-1/apr_thread_mutex.h" -#include "apr-1/apr_errno.h" - -namespace qpid { -namespace concurrent { - - /** - * Use of APR libraries necessitates explicit init and terminate - * calls. Any class using APR libs should obtain the reference to - * this singleton and increment on construction, decrement on - * destruction. This class can then correctly initialise apr - * before the first use and terminate after the last use. - */ - class APRBase{ - static APRBase* instance; - apr_pool_t* pool; - apr_thread_mutex_t* mutex; - int count; - - APRBase(); - ~APRBase(); - static APRBase* getInstance(); - bool _increment(); - void _decrement(); - public: - static void increment(); - static void decrement(); - }; - - //this is also a convenient place for a helper function for error checking: - void check(apr_status_t status, const std::string& file, const int line); - std::string get_desc(apr_status_t status); - -#define CHECK_APR_SUCCESS(A) check(A, __FILE__, __LINE__); - -} -} - - - - -#endif diff --git a/cpp/src/qpid/concurrent/Monitor.cpp b/cpp/src/qpid/concurrent/Monitor.cpp deleted file mode 100644 index ae68cf8751..0000000000 --- a/cpp/src/qpid/concurrent/Monitor.cpp +++ /dev/null @@ -1,60 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "qpid/concurrent/APRBase.h" -#include "qpid/concurrent/Monitor.h" -#include - -qpid::concurrent::Monitor::Monitor(){ - APRBase::increment(); - CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL)); - CHECK_APR_SUCCESS(apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_NESTED, pool)); - CHECK_APR_SUCCESS(apr_thread_cond_create(&condition, pool)); -} - -qpid::concurrent::Monitor::~Monitor(){ - CHECK_APR_SUCCESS(apr_thread_cond_destroy(condition)); - CHECK_APR_SUCCESS(apr_thread_mutex_destroy(mutex)); - apr_pool_destroy(pool); - APRBase::decrement(); -} - -void qpid::concurrent::Monitor::wait(){ - CHECK_APR_SUCCESS(apr_thread_cond_wait(condition, mutex)); -} - - -void qpid::concurrent::Monitor::wait(u_int64_t time){ - apr_status_t status = apr_thread_cond_timedwait(condition, mutex, time * 1000); - if(!status == APR_TIMEUP) CHECK_APR_SUCCESS(status); -} - -void qpid::concurrent::Monitor::notify(){ - CHECK_APR_SUCCESS(apr_thread_cond_signal(condition)); -} - -void qpid::concurrent::Monitor::notifyAll(){ - CHECK_APR_SUCCESS(apr_thread_cond_broadcast(condition)); -} - -void qpid::concurrent::Monitor::acquire(){ - CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex)); -} - -void qpid::concurrent::Monitor::release(){ - CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex)); -} diff --git a/cpp/src/qpid/concurrent/Monitor.h b/cpp/src/qpid/concurrent/Monitor.h deleted file mode 100644 index a2777cb2f1..0000000000 --- a/cpp/src/qpid/concurrent/Monitor.h +++ /dev/null @@ -1,56 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _Monitor_ -#define _Monitor_ - -#include "apr-1/apr_thread_mutex.h" -#include "apr-1/apr_thread_cond.h" -#include "qpid/concurrent/Monitor.h" - -namespace qpid { -namespace concurrent { - -class Monitor -{ - apr_pool_t* pool; - apr_thread_mutex_t* mutex; - apr_thread_cond_t* condition; - - public: - Monitor(); - virtual ~Monitor(); - virtual void wait(); - virtual void wait(u_int64_t time); - virtual void notify(); - virtual void notifyAll(); - virtual void acquire(); - virtual void release(); -}; - -class Locker -{ - public: - Locker(Monitor& monitor_) : monitor(monitor_) { monitor.acquire(); } - ~Locker() { monitor.release(); } - private: - Monitor& monitor; -}; -}} - - -#endif diff --git a/cpp/src/qpid/concurrent/Thread.cpp b/cpp/src/qpid/concurrent/Thread.cpp deleted file mode 100644 index 9bbc2f8131..0000000000 --- a/cpp/src/qpid/concurrent/Thread.cpp +++ /dev/null @@ -1,50 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "qpid/concurrent/APRBase.h" -#include "qpid/concurrent/Thread.h" -#include "apr-1/apr_portable.h" - -using namespace qpid::concurrent; - -void* APR_THREAD_FUNC ExecRunnable(apr_thread_t* thread, void *data){ - ((Runnable*) data)->run(); - CHECK_APR_SUCCESS(apr_thread_exit(thread, APR_SUCCESS)); - return NULL; -} - -Thread::Thread(apr_pool_t* _pool, Runnable* _runnable) : runnable(_runnable), pool(_pool), runner(0) {} - -Thread::~Thread(){ -} - -void Thread::start(){ - CHECK_APR_SUCCESS(apr_thread_create(&runner, NULL, ExecRunnable,(void*) runnable, pool)); -} - -void Thread::join(){ - apr_status_t status; - if (runner) CHECK_APR_SUCCESS(apr_thread_join(&status, runner)); -} - -void Thread::interrupt(){ - if (runner) CHECK_APR_SUCCESS(apr_thread_exit(runner, APR_SUCCESS)); -} - -unsigned int qpid::concurrent::Thread::currentThread(){ - return apr_os_thread_current(); -} diff --git a/cpp/src/qpid/concurrent/Thread.h b/cpp/src/qpid/concurrent/Thread.h deleted file mode 100644 index d18bc153bf..0000000000 --- a/cpp/src/qpid/concurrent/Thread.h +++ /dev/null @@ -1,48 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _Thread_ -#define _Thread_ - -#include "apr-1/apr_thread_proc.h" -#include "qpid/concurrent/Thread.h" -#include "qpid/concurrent/Runnable.h" -#include "qpid/concurrent/Thread.h" - -namespace qpid { -namespace concurrent { - - class Thread - { - const Runnable* runnable; - apr_pool_t* pool; - apr_thread_t* runner; - - public: - Thread(apr_pool_t* pool, Runnable* runnable); - virtual ~Thread(); - virtual void start(); - virtual void join(); - virtual void interrupt(); - static unsigned int currentThread(); - }; - -} -} - - -#endif diff --git a/cpp/src/qpid/concurrent/ThreadFactory.cpp b/cpp/src/qpid/concurrent/ThreadFactory.cpp deleted file mode 100644 index b20f9f2b04..0000000000 --- a/cpp/src/qpid/concurrent/ThreadFactory.cpp +++ /dev/null @@ -1,35 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "qpid/concurrent/APRBase.h" -#include "qpid/concurrent/ThreadFactory.h" - -using namespace qpid::concurrent; - -ThreadFactory::ThreadFactory(){ - APRBase::increment(); - CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL)); -} - -ThreadFactory::~ThreadFactory(){ - apr_pool_destroy(pool); - APRBase::decrement(); -} - -Thread* ThreadFactory::create(Runnable* runnable){ - return new Thread(pool, runnable); -} diff --git a/cpp/src/qpid/concurrent/ThreadFactory.h b/cpp/src/qpid/concurrent/ThreadFactory.h deleted file mode 100644 index 572419cae6..0000000000 --- a/cpp/src/qpid/concurrent/ThreadFactory.h +++ /dev/null @@ -1,44 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _ThreadFactory_ -#define _ThreadFactory_ - -#include "apr-1/apr_thread_proc.h" - -#include "qpid/concurrent/Thread.h" -#include "qpid/concurrent/Thread.h" -#include "qpid/concurrent/ThreadFactory.h" -#include "qpid/concurrent/Runnable.h" - -namespace qpid { -namespace concurrent { - - class ThreadFactory - { - apr_pool_t* pool; - public: - ThreadFactory(); - virtual ~ThreadFactory(); - virtual Thread* create(Runnable* runnable); - }; - -} -} - - -#endif diff --git a/cpp/src/qpid/concurrent/ThreadPool.cpp b/cpp/src/qpid/concurrent/ThreadPool.cpp deleted file mode 100644 index 5da19745a7..0000000000 --- a/cpp/src/qpid/concurrent/ThreadPool.cpp +++ /dev/null @@ -1,83 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "qpid/concurrent/ThreadFactory.h" -#include "qpid/concurrent/ThreadPool.h" -#include "qpid/QpidError.h" -#include - -using namespace qpid::concurrent; - -ThreadPool::ThreadPool(int _size) : deleteFactory(true), size(_size), factory(new ThreadFactory()), running(false){ - worker = new Worker(this); -} - -ThreadPool::ThreadPool(int _size, ThreadFactory* _factory) : deleteFactory(false), size(_size), factory(_factory), running(false){ - worker = new Worker(this); -} - -ThreadPool::~ThreadPool(){ - if(deleteFactory) delete factory; -} - -void ThreadPool::addTask(Runnable* task){ - lock.acquire(); - tasks.push(task); - lock.notifyAll(); - lock.release(); -} - -void ThreadPool::runTask(){ - lock.acquire(); - while(tasks.empty()){ - lock.wait(); - } - Runnable* task = tasks.front(); - tasks.pop(); - lock.release(); - try{ - task->run(); - }catch(qpid::QpidError error){ - std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl; - } -} - -void ThreadPool::start(){ - if(!running){ - running = true; - for(int i = 0; i < size; i++){ - Thread* t = factory->create(worker); - t->start(); - threads.push_back(t); - } - } -} - -void ThreadPool::stop(){ - if(!running){ - running = false; - lock.acquire(); - lock.notifyAll(); - lock.release(); - for(int i = 0; i < size; i++){ - threads[i]->join(); - delete threads[i]; - } - } -} - - diff --git a/cpp/src/qpid/concurrent/ThreadPool.h b/cpp/src/qpid/concurrent/ThreadPool.h deleted file mode 100644 index 11f0cc364f..0000000000 --- a/cpp/src/qpid/concurrent/ThreadPool.h +++ /dev/null @@ -1,67 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _ThreadPool_ -#define _ThreadPool_ - -#include -#include -#include "qpid/concurrent/Monitor.h" -#include "qpid/concurrent/Thread.h" -#include "qpid/concurrent/ThreadFactory.h" -#include "qpid/concurrent/ThreadPool.h" -#include "qpid/concurrent/Runnable.h" - -namespace qpid { -namespace concurrent { - - class ThreadPool - { - class Worker : public virtual Runnable{ - ThreadPool* pool; - public: - inline Worker(ThreadPool* _pool) : pool(_pool){} - inline virtual void run(){ - while(pool->running){ - pool->runTask(); - } - } - }; - const bool deleteFactory; - const int size; - ThreadFactory* factory; - Monitor lock; - std::vector threads; - std::queue tasks; - Worker* worker; - volatile bool running; - - void runTask(); - public: - ThreadPool(int size); - ThreadPool(int size, ThreadFactory* factory); - virtual void start(); - virtual void stop(); - virtual void addTask(Runnable* task); - virtual ~ThreadPool(); - }; - -} -} - - -#endif diff --git a/cpp/src/qpid/concurrent/Time.h b/cpp/src/qpid/concurrent/Time.h new file mode 100644 index 0000000000..ec64ce8a85 --- /dev/null +++ b/cpp/src/qpid/concurrent/Time.h @@ -0,0 +1,52 @@ +#ifndef _concurrent_Time_h +#define _concurrent_Time_h + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include + +namespace qpid { +namespace concurrent { + +/** + * Time since the epoch. + */ +class Time +{ + public: + static const int64_t NANOS = 1000000000; + static const int64_t MICROS = 1000000; + static const int64_t MILLIS = 1000; + + static Time now(); + + Time(int64_t nsecs_) : ticks(nsecs_) {} + + int64_t nsecs() const { return ticks; } + int64_t usecs() const { return nsecs()/1000; } + int64_t msecs() const { return usecs()/1000; } + int64_t secs() const { return msecs()/1000; } + + private: + int64_t ticks; +}; + +}} + +#endif /*!_concurrent_Time_h*/ diff --git a/cpp/src/qpid/io/APRPool.cpp b/cpp/src/qpid/io/APRPool.cpp deleted file mode 100644 index edd434f16c..0000000000 --- a/cpp/src/qpid/io/APRPool.cpp +++ /dev/null @@ -1,39 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "APRPool.h" -#include "qpid/concurrent/APRBase.h" -#include - -using namespace qpid::io; -using namespace qpid::concurrent; - -APRPool::APRPool(){ - APRBase::increment(); - CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL)); -} - -APRPool::~APRPool(){ - apr_pool_destroy(pool); - APRBase::decrement(); -} - -apr_pool_t* APRPool::get() { - return boost::details::pool::singleton_default::instance().pool; -} - diff --git a/cpp/src/qpid/io/APRPool.h b/cpp/src/qpid/io/APRPool.h deleted file mode 100644 index 063eedf1ee..0000000000 --- a/cpp/src/qpid/io/APRPool.h +++ /dev/null @@ -1,47 +0,0 @@ -#ifndef _APRPool_ -#define _APRPool_ - -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include -#include - -namespace qpid { -namespace io { -/** - * Singleton APR memory pool. - */ -class APRPool : private boost::noncopyable { - public: - APRPool(); - ~APRPool(); - - /** Get singleton instance */ - static apr_pool_t* get(); - - private: - apr_pool_t* pool; -}; - -}} - - - - - -#endif /*!_APRPool_*/ diff --git a/cpp/src/qpid/io/APRSocket.cpp b/cpp/src/qpid/io/APRSocket.cpp deleted file mode 100644 index 824c376c3b..0000000000 --- a/cpp/src/qpid/io/APRSocket.cpp +++ /dev/null @@ -1,76 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "qpid/concurrent/APRBase.h" -#include "qpid/io/APRSocket.h" -#include -#include - -using namespace qpid::io; -using namespace qpid::framing; -using namespace qpid::concurrent; - -APRSocket::APRSocket(apr_socket_t* _socket) : socket(_socket), closed(false){ - -} - -void APRSocket::read(qpid::framing::Buffer& buffer){ - apr_size_t bytes; - bytes = buffer.available(); - apr_status_t s = apr_socket_recv(socket, buffer.start(), &bytes); - buffer.move(bytes); - if(APR_STATUS_IS_TIMEUP(s)){ - //timed out - }else if(APR_STATUS_IS_EOF(s)){ - close(); - } -} - -void APRSocket::write(qpid::framing::Buffer& buffer){ - apr_size_t bytes; - do{ - bytes = buffer.available(); - apr_socket_send(socket, buffer.start(), &bytes); - buffer.move(bytes); - }while(bytes > 0); -} - -void APRSocket::close(){ - if(!closed){ - std::cout << "Closing socket " << socket << "@" << this << std::endl; - CHECK_APR_SUCCESS(apr_socket_close(socket)); - closed = true; - } -} - -bool APRSocket::isOpen(){ - return !closed; -} - -u_int8_t APRSocket::read(){ - char data[1]; - apr_size_t bytes = 1; - apr_status_t s = apr_socket_recv(socket, data, &bytes); - if(APR_STATUS_IS_EOF(s) || bytes == 0){ - return 0; - }else{ - return *data; - } -} - -APRSocket::~APRSocket(){ -} diff --git a/cpp/src/qpid/io/APRSocket.h b/cpp/src/qpid/io/APRSocket.h deleted file mode 100644 index 0b6644dfb6..0000000000 --- a/cpp/src/qpid/io/APRSocket.h +++ /dev/null @@ -1,45 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _APRSocket_ -#define _APRSocket_ - -#include "apr-1/apr_network_io.h" -#include "qpid/framing/Buffer.h" - -namespace qpid { -namespace io { - - class APRSocket - { - apr_socket_t* const socket; - volatile bool closed; - public: - APRSocket(apr_socket_t* socket); - void read(qpid::framing::Buffer& b); - void write(qpid::framing::Buffer& b); - void close(); - bool isOpen(); - u_int8_t read(); - ~APRSocket(); - }; - -} -} - - -#endif diff --git a/cpp/src/qpid/io/Acceptor.cpp b/cpp/src/qpid/io/Acceptor.cpp deleted file mode 100644 index f95d9448cf..0000000000 --- a/cpp/src/qpid/io/Acceptor.cpp +++ /dev/null @@ -1,78 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "qpid/io/Acceptor.h" -#include "qpid/concurrent/APRBase.h" -#include "APRPool.h" - -using namespace qpid::concurrent; -using namespace qpid::io; - -Acceptor::Acceptor(int16_t port_, int backlog, int threads) : - port(port_), - processor(APRPool::get(), threads, 1000, 5000000) -{ - apr_sockaddr_t* address; - CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, APR_ANYADDR, APR_UNSPEC, port, APR_IPV4_ADDR_OK, APRPool::get())); - CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, APRPool::get())); - CHECK_APR_SUCCESS(apr_socket_opt_set(socket, APR_SO_REUSEADDR, 1)); - CHECK_APR_SUCCESS(apr_socket_bind(socket, address)); - CHECK_APR_SUCCESS(apr_socket_listen(socket, backlog)); -} - -int16_t Acceptor::getPort() const { - apr_sockaddr_t* address; - CHECK_APR_SUCCESS(apr_socket_addr_get(&address, APR_LOCAL, socket)); - return address->port; -} - -void Acceptor::run(SessionHandlerFactory* factory) { - running = true; - processor.start(); - std::cout << "Listening on port " << getPort() << "..." << std::endl; - while(running){ - apr_socket_t* client; - apr_status_t status = apr_socket_accept(&client, socket, APRPool::get()); - if(status == APR_SUCCESS){ - //make this socket non-blocking: - CHECK_APR_SUCCESS(apr_socket_timeout_set(client, 0)); - CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_NONBLOCK, 1)); - CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_TCP_NODELAY, 1)); - CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_SNDBUF, 32768)); - CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_RCVBUF, 32768)); - LFSessionContext* session = new LFSessionContext(APRPool::get(), client, &processor, false); - session->init(factory->create(session)); - }else{ - running = false; - if(status != APR_EINTR){ - std::cout << "ERROR: " << get_desc(status) << std::endl; - } - } - } - shutdown(); -} - -void Acceptor::shutdown() { - // TODO aconway 2006-10-12: Cleanup, this is not thread safe. - if (running) { - running = false; - processor.stop(); - CHECK_APR_SUCCESS(apr_socket_close(socket)); - } -} - - diff --git a/cpp/src/qpid/io/Acceptor.h b/cpp/src/qpid/io/Acceptor.h deleted file mode 100644 index bc189f7f6e..0000000000 --- a/cpp/src/qpid/io/Acceptor.h +++ /dev/null @@ -1,60 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _LFAcceptor_ -#define _LFAcceptor_ - -#include "apr-1/apr_network_io.h" -#include "apr-1/apr_poll.h" -#include "apr-1/apr_time.h" - -#include "qpid/io/Acceptor.h" -#include "qpid/concurrent/Monitor.h" -#include "qpid/concurrent/ThreadFactory.h" -#include "qpid/concurrent/ThreadPool.h" -#include "qpid/io/LFProcessor.h" -#include "qpid/io/LFSessionContext.h" -#include "qpid/concurrent/Runnable.h" -#include "qpid/io/SessionContext.h" -#include "qpid/io/SessionHandlerFactory.h" -#include "qpid/concurrent/Thread.h" -#include - -namespace qpid { -namespace io { - -/** APR Acceptor. */ -class Acceptor : public qpid::SharedObject -{ - public: - Acceptor(int16_t port, int backlog, int threads); - virtual int16_t getPort() const; - virtual void run(SessionHandlerFactory* factory); - virtual void shutdown(); - - private: - int16_t port; - LFProcessor processor; - apr_socket_t* socket; - volatile bool running; -}; - -} -} - - -#endif diff --git a/cpp/src/qpid/io/Connector.cpp b/cpp/src/qpid/io/Connector.cpp deleted file mode 100644 index ca487deb86..0000000000 --- a/cpp/src/qpid/io/Connector.cpp +++ /dev/null @@ -1,201 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include -#include "qpid/concurrent/APRBase.h" -#include "qpid/io/Connector.h" -#include "qpid/concurrent/ThreadFactory.h" -#include "qpid/QpidError.h" - -using namespace qpid::io; -using namespace qpid::concurrent; -using namespace qpid::framing; -using qpid::QpidError; - -Connector::Connector(bool _debug, u_int32_t buffer_size) : - debug(_debug), - receive_buffer_size(buffer_size), - send_buffer_size(buffer_size), - closed(true), - lastIn(0), lastOut(0), - timeout(0), - idleIn(0), idleOut(0), - timeoutHandler(0), - shutdownHandler(0), - inbuf(receive_buffer_size), - outbuf(send_buffer_size){ - - APRBase::increment(); - - CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL)); - CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, pool)); - - threadFactory = new ThreadFactory(); - writeLock = new Monitor(); -} - -Connector::~Connector(){ - delete receiver; - delete writeLock; - delete threadFactory; - apr_pool_destroy(pool); - - APRBase::decrement(); -} - -void Connector::connect(const std::string& host, int port){ - apr_sockaddr_t* address; - CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, host.c_str(), APR_UNSPEC, port, APR_IPV4_ADDR_OK, pool)); - CHECK_APR_SUCCESS(apr_socket_connect(socket, address)); - closed = false; - - receiver = threadFactory->create(this); - receiver->start(); -} - -void Connector::init(ProtocolInitiation* header){ - writeBlock(header); - delete header; -} - -void Connector::close(){ - closed = true; - CHECK_APR_SUCCESS(apr_socket_close(socket)); - receiver->join(); -} - -void Connector::setInputHandler(InputHandler* handler){ - input = handler; -} - -void Connector::setShutdownHandler(ShutdownHandler* handler){ - shutdownHandler = handler; -} - -OutputHandler* Connector::getOutputHandler(){ - return this; -} - -void Connector::send(AMQFrame* frame){ - writeBlock(frame); - if(debug) std::cout << "SENT: " << *frame << std::endl; - delete frame; -} - -void Connector::writeBlock(AMQDataBlock* data){ - writeLock->acquire(); - data->encode(outbuf); - - //transfer data to wire - outbuf.flip(); - writeToSocket(outbuf.start(), outbuf.available()); - outbuf.clear(); - writeLock->release(); -} - -void Connector::writeToSocket(char* data, size_t available){ - apr_size_t bytes(available); - apr_size_t written(0); - while(written < available && !closed){ - apr_status_t status = apr_socket_send(socket, data + written, &bytes); - if(status == APR_TIMEUP){ - std::cout << "Write request timed out." << std::endl; - } - if(bytes == 0){ - std::cout << "Write request wrote 0 bytes." << std::endl; - } - lastOut = apr_time_as_msec(apr_time_now()); - written += bytes; - bytes = available - written; - } -} - -void Connector::checkIdle(apr_status_t status){ - if(timeoutHandler){ - apr_time_t now = apr_time_as_msec(apr_time_now()); - if(APR_STATUS_IS_TIMEUP(status)){ - if(idleIn && (now - lastIn > idleIn)){ - timeoutHandler->idleIn(); - } - }else if(APR_STATUS_IS_EOF(status)){ - closed = true; - CHECK_APR_SUCCESS(apr_socket_close(socket)); - if(shutdownHandler) shutdownHandler->shutdown(); - }else{ - lastIn = now; - } - if(idleOut && (now - lastOut > idleOut)){ - timeoutHandler->idleOut(); - } - } -} - -void Connector::setReadTimeout(u_int16_t t){ - idleIn = t * 1000;//t is in secs - if(idleIn && (!timeout || idleIn < timeout)){ - timeout = idleIn; - setSocketTimeout(); - } - -} - -void Connector::setWriteTimeout(u_int16_t t){ - idleOut = t * 1000;//t is in secs - if(idleOut && (!timeout || idleOut < timeout)){ - timeout = idleOut; - setSocketTimeout(); - } -} - -void Connector::setSocketTimeout(){ - //interval is in microseconds, timeout in milliseconds - //want the interval to be a bit shorter than the timeout, hence multiply - //by 800 rather than 1000. - apr_interval_time_t interval(timeout * 800); - apr_socket_timeout_set(socket, interval); -} - -void Connector::setTimeoutHandler(TimeoutHandler* handler){ - timeoutHandler = handler; -} - -void Connector::run(){ - try{ - while(!closed){ - apr_size_t bytes(inbuf.available()); - if(bytes < 1){ - THROW_QPID_ERROR(INTERNAL_ERROR, "Frame exceeds buffer size."); - } - checkIdle(apr_socket_recv(socket, inbuf.start(), &bytes)); - - if(bytes > 0){ - inbuf.move(bytes); - inbuf.flip();//position = 0, limit = total data read - - AMQFrame frame; - while(frame.decode(inbuf)){ - if(debug) std::cout << "RECV: " << frame << std::endl; - input->received(&frame); - } - //need to compact buffer to preserve any 'extra' data - inbuf.compact(); - } - } - }catch(QpidError error){ - std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl; - } -} diff --git a/cpp/src/qpid/io/Connector.h b/cpp/src/qpid/io/Connector.h deleted file mode 100644 index 7c52f7e87b..0000000000 --- a/cpp/src/qpid/io/Connector.h +++ /dev/null @@ -1,95 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _Connector_ -#define _Connector_ - -#include "apr-1/apr_network_io.h" -#include "apr-1/apr_time.h" - -#include "qpid/framing/InputHandler.h" -#include "qpid/framing/OutputHandler.h" -#include "qpid/framing/InitiationHandler.h" -#include "qpid/framing/ProtocolInitiation.h" -#include "qpid/io/ShutdownHandler.h" -#include "qpid/io/TimeoutHandler.h" -#include "qpid/concurrent/Thread.h" -#include "qpid/concurrent/ThreadFactory.h" -#include "qpid/io/Connector.h" -#include "qpid/concurrent/Monitor.h" - -namespace qpid { -namespace io { - - class Connector : public virtual qpid::framing::OutputHandler, - private virtual qpid::concurrent::Runnable - { - const bool debug; - const int receive_buffer_size; - const int send_buffer_size; - - bool closed; - - apr_time_t lastIn; - apr_time_t lastOut; - apr_interval_time_t timeout; - u_int32_t idleIn; - u_int32_t idleOut; - - TimeoutHandler* timeoutHandler; - ShutdownHandler* shutdownHandler; - qpid::framing::InputHandler* input; - qpid::framing::InitiationHandler* initialiser; - qpid::framing::OutputHandler* output; - - qpid::framing::Buffer inbuf; - qpid::framing::Buffer outbuf; - - qpid::concurrent::Monitor* writeLock; - qpid::concurrent::ThreadFactory* threadFactory; - qpid::concurrent::Thread* receiver; - - apr_pool_t* pool; - apr_socket_t* socket; - - void checkIdle(apr_status_t status); - void writeBlock(qpid::framing::AMQDataBlock* data); - void writeToSocket(char* data, size_t available); - void setSocketTimeout(); - - void run(); - - public: - Connector(bool debug = false, u_int32_t buffer_size = 1024); - virtual ~Connector(); - virtual void connect(const std::string& host, int port); - virtual void init(qpid::framing::ProtocolInitiation* header); - virtual void close(); - virtual void setInputHandler(qpid::framing::InputHandler* handler); - virtual void setTimeoutHandler(TimeoutHandler* handler); - virtual void setShutdownHandler(ShutdownHandler* handler); - virtual qpid::framing::OutputHandler* getOutputHandler(); - virtual void send(qpid::framing::AMQFrame* frame); - virtual void setReadTimeout(u_int16_t timeout); - virtual void setWriteTimeout(u_int16_t timeout); - }; - -} -} - - -#endif diff --git a/cpp/src/qpid/io/LFProcessor.cpp b/cpp/src/qpid/io/LFProcessor.cpp deleted file mode 100644 index dabbdbecae..0000000000 --- a/cpp/src/qpid/io/LFProcessor.cpp +++ /dev/null @@ -1,193 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "qpid/io/LFProcessor.h" -#include "qpid/concurrent/APRBase.h" -#include "qpid/io/LFSessionContext.h" -#include "qpid/QpidError.h" -#include - -using namespace qpid::io; -using namespace qpid::concurrent; -using qpid::QpidError; - -// TODO aconway 2006-10-12: stopped is read outside locks. -// - -LFProcessor::LFProcessor(apr_pool_t* pool, int _workers, int _size, int _timeout) : - size(_size), - timeout(_timeout), - signalledCount(0), - current(0), - count(0), - workerCount(_workers), - hasLeader(false), - workers(new Thread*[_workers]), - stopped(false) -{ - - CHECK_APR_SUCCESS(apr_pollset_create(&pollset, size, pool, APR_POLLSET_THREADSAFE)); - //create & start the required number of threads - for(int i = 0; i < workerCount; i++){ - workers[i] = factory.create(this); - } -} - - -LFProcessor::~LFProcessor(){ - if (!stopped) stop(); - for(int i = 0; i < workerCount; i++){ - delete workers[i]; - } - delete[] workers; - CHECK_APR_SUCCESS(apr_pollset_destroy(pollset)); -} - -void LFProcessor::start(){ - for(int i = 0; i < workerCount; i++){ - workers[i]->start(); - } -} - -void LFProcessor::add(const apr_pollfd_t* const fd){ - CHECK_APR_SUCCESS(apr_pollset_add(pollset, fd)); - countLock.acquire(); - sessions.push_back(reinterpret_cast(fd->client_data)); - count++; - countLock.release(); -} - -void LFProcessor::remove(const apr_pollfd_t* const fd){ - CHECK_APR_SUCCESS(apr_pollset_remove(pollset, fd)); - countLock.acquire(); - sessions.erase(find(sessions.begin(), sessions.end(), reinterpret_cast(fd->client_data))); - count--; - countLock.release(); -} - -void LFProcessor::reactivate(const apr_pollfd_t* const fd){ - CHECK_APR_SUCCESS(apr_pollset_add(pollset, fd)); -} - -void LFProcessor::deactivate(const apr_pollfd_t* const fd){ - CHECK_APR_SUCCESS(apr_pollset_remove(pollset, fd)); -} - -void LFProcessor::update(const apr_pollfd_t* const fd){ - CHECK_APR_SUCCESS(apr_pollset_remove(pollset, fd)); - CHECK_APR_SUCCESS(apr_pollset_add(pollset, fd)); -} - -bool LFProcessor::full(){ - Locker locker(countLock); - return count == size; -} - -bool LFProcessor::empty(){ - Locker locker(countLock); - return count == 0; -} - -void LFProcessor::poll() { - apr_status_t status = APR_EGENERAL; - do{ - current = 0; - if(!stopped){ - status = apr_pollset_poll(pollset, timeout, &signalledCount, &signalledFDs); - } - }while(status != APR_SUCCESS && !stopped); -} - -void LFProcessor::run(){ - try{ - while(!stopped){ - leadLock.acquire(); - waitToLead(); - if(!stopped){ - const apr_pollfd_t* evt = getNextEvent(); - if(evt){ - LFSessionContext* session = reinterpret_cast(evt->client_data); - session->startProcessing(); - - relinquishLead(); - leadLock.release(); - - //process event: - if(evt->rtnevents & APR_POLLIN) session->read(); - if(evt->rtnevents & APR_POLLOUT) session->write(); - - if(session->isClosed()){ - session->handleClose(); - countLock.acquire(); - sessions.erase(find(sessions.begin(), sessions.end(), session)); - count--; - countLock.release(); - }else{ - session->stopProcessing(); - } - - }else{ - leadLock.release(); - } - }else{ - leadLock.release(); - } - } - }catch(QpidError error){ - std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl; - } -} - -void LFProcessor::waitToLead(){ - while(hasLeader && !stopped) leadLock.wait(); - hasLeader = !stopped; -} - -void LFProcessor::relinquishLead(){ - hasLeader = false; - leadLock.notify(); -} - -const apr_pollfd_t* LFProcessor::getNextEvent(){ - while(true){ - if(stopped){ - return 0; - }else if(current < signalledCount){ - //use result of previous poll if one is available - return signalledFDs + (current++); - }else{ - //else poll to get new events - poll(); - } - } -} - -void LFProcessor::stop(){ - stopped = true; - leadLock.acquire(); - leadLock.notifyAll(); - leadLock.release(); - - for(int i = 0; i < workerCount; i++){ - workers[i]->join(); - } - - for(iterator i = sessions.begin(); i < sessions.end(); i++){ - (*i)->shutdown(); - } -} - diff --git a/cpp/src/qpid/io/LFProcessor.h b/cpp/src/qpid/io/LFProcessor.h deleted file mode 100644 index 5b61f444af..0000000000 --- a/cpp/src/qpid/io/LFProcessor.h +++ /dev/null @@ -1,119 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _LFProcessor_ -#define _LFProcessor_ - -#include "apr-1/apr_poll.h" -#include -#include -#include "qpid/concurrent/Monitor.h" -#include "qpid/concurrent/ThreadFactory.h" -#include "qpid/concurrent/Runnable.h" - -namespace qpid { -namespace io { - - class LFSessionContext; - - /** - * This class processes a poll set using the leaders-followers - * pattern for thread synchronization: the leader will poll and on - * the poll returning, it will remove a session, promote a - * follower to leadership, then process the session. - */ - class LFProcessor : private virtual qpid::concurrent::Runnable - { - typedef std::vector::iterator iterator; - - const int size; - const apr_interval_time_t timeout; - apr_pollset_t* pollset; - int signalledCount; - int current; - const apr_pollfd_t* signalledFDs; - int count; - const int workerCount; - bool hasLeader; - qpid::concurrent::Thread** const workers; - qpid::concurrent::Monitor leadLock; - qpid::concurrent::Monitor countLock; - qpid::concurrent::ThreadFactory factory; - std::vector sessions; - volatile bool stopped; - - const apr_pollfd_t* getNextEvent(); - void waitToLead(); - void relinquishLead(); - void poll(); - virtual void run(); - - public: - LFProcessor(apr_pool_t* pool, int workers, int size, int timeout); - /** - * Add the fd to the poll set. Relies on the client_data being - * an instance of LFSessionContext. - */ - void add(const apr_pollfd_t* const fd); - /** - * Remove the fd from the poll set. - */ - void remove(const apr_pollfd_t* const fd); - /** - * Signal that the fd passed in, already part of the pollset, - * has had its flags altered. - */ - void update(const apr_pollfd_t* const fd); - /** - * Add an fd back to the poll set after deactivation. - */ - void reactivate(const apr_pollfd_t* const fd); - /** - * Temporarily remove the fd from the poll set. Called when processing - * is about to begin. - */ - void deactivate(const apr_pollfd_t* const fd); - /** - * Indicates whether the capacity of this processor has been - * reached (or whether it can still handle further fd's). - */ - bool full(); - /** - * Indicates whether there are any fd's registered. - */ - bool empty(); - /** - * Stop processing. - */ - void stop(); - /** - * Start processing. - */ - void start(); - /** - * Is processing stopped? - */ - bool isStopped(); - - ~LFProcessor(); - }; - -} -} - - -#endif diff --git a/cpp/src/qpid/io/LFSessionContext.cpp b/cpp/src/qpid/io/LFSessionContext.cpp deleted file mode 100644 index ca1e6431a6..0000000000 --- a/cpp/src/qpid/io/LFSessionContext.cpp +++ /dev/null @@ -1,189 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "qpid/io/LFSessionContext.h" -#include "qpid/concurrent/APRBase.h" -#include "qpid/QpidError.h" -#include - -using namespace qpid::concurrent; -using namespace qpid::io; -using namespace qpid::framing; - -LFSessionContext::LFSessionContext(apr_pool_t* _pool, apr_socket_t* _socket, - LFProcessor* const _processor, - bool _debug) : - debug(_debug), - socket(_socket), - initiated(false), - in(32768), - out(32768), - processor(_processor), - processing(false), - closing(false), - reading(0), - writing(0) -{ - - fd.p = _pool; - fd.desc_type = APR_POLL_SOCKET; - fd.reqevents = APR_POLLIN; - fd.client_data = this; - fd.desc.s = _socket; - - out.flip(); -} - -LFSessionContext::~LFSessionContext(){ - -} - -void LFSessionContext::read(){ - assert(!reading); // No concurrent read. - reading = Thread::currentThread(); - - socket.read(in); - in.flip(); - if(initiated){ - AMQFrame frame; - while(frame.decode(in)){ - if(debug) log("RECV", &frame); - handler->received(&frame); - } - }else{ - ProtocolInitiation protocolInit; - if(protocolInit.decode(in)){ - handler->initiated(&protocolInit); - initiated = true; - if(debug) std::cout << "INIT [" << &socket << "]" << std::endl; - } - } - in.compact(); - - reading = 0; -} - -void LFSessionContext::write(){ - assert(!writing); // No concurrent writes. - writing = Thread::currentThread(); - - bool done = isClosed(); - while(!done){ - if(out.available() > 0){ - socket.write(out); - if(out.available() > 0){ - writing = 0; - - //incomplete write, leave flags to receive notification of readiness to write - done = true;//finished processing for now, but write is still in progress - } - }else{ - //do we have any frames to write? - writeLock.acquire(); - if(!framesToWrite.empty()){ - out.clear(); - bool encoded(false); - AMQFrame* frame = framesToWrite.front(); - while(frame && out.available() >= frame->size()){ - encoded = true; - frame->encode(out); - if(debug) log("SENT", frame); - delete frame; - framesToWrite.pop(); - frame = framesToWrite.empty() ? 0 : framesToWrite.front(); - } - if(!encoded) THROW_QPID_ERROR(FRAMING_ERROR, "Could not write frame, too large for buffer."); - out.flip(); - }else{ - //reset flags, don't care about writability anymore - fd.reqevents = APR_POLLIN; - done = true; - - writing = 0; - - if(closing){ - socket.close(); - } - } - writeLock.release(); - } - } -} - -void LFSessionContext::send(AMQFrame* frame){ - writeLock.acquire(); - if(!closing){ - framesToWrite.push(frame); - if(!(fd.reqevents & APR_POLLOUT)){ - fd.reqevents |= APR_POLLOUT; - if(!processing){ - processor->update(&fd); - } - } - } - writeLock.release(); -} - -void LFSessionContext::startProcessing(){ - writeLock.acquire(); - processing = true; - processor->deactivate(&fd); - writeLock.release(); -} - -void LFSessionContext::stopProcessing(){ - writeLock.acquire(); - processor->reactivate(&fd); - processing = false; - writeLock.release(); -} - -void LFSessionContext::close(){ - closing = true; - writeLock.acquire(); - if(!processing){ - //allow pending frames to be written to socket - fd.reqevents = APR_POLLOUT; - processor->update(&fd); - } - writeLock.release(); -} - -void LFSessionContext::handleClose(){ - handler->closed(); - std::cout << "Session closed [" << &socket << "]" << std::endl; - delete handler; - delete this; -} - -void LFSessionContext::shutdown(){ - socket.close(); - handleClose(); -} - -void LFSessionContext::init(SessionHandler* _handler){ - handler = _handler; - processor->add(&fd); -} - -void LFSessionContext::log(const std::string& desc, AMQFrame* const frame){ - logLock.acquire(); - std::cout << desc << " [" << &socket << "]: " << *frame << std::endl; - logLock.release(); -} - -Monitor LFSessionContext::logLock; diff --git a/cpp/src/qpid/io/LFSessionContext.h b/cpp/src/qpid/io/LFSessionContext.h deleted file mode 100644 index 8d30b54204..0000000000 --- a/cpp/src/qpid/io/LFSessionContext.h +++ /dev/null @@ -1,88 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _LFSessionContext_ -#define _LFSessionContext_ - -#include - -#include "apr-1/apr_network_io.h" -#include "apr-1/apr_poll.h" -#include "apr-1/apr_time.h" - -#include "qpid/framing/AMQFrame.h" -#include "qpid/concurrent/Monitor.h" -#include "qpid/io/APRSocket.h" -#include "qpid/framing/Buffer.h" -#include "qpid/io/LFProcessor.h" -#include "qpid/io/SessionContext.h" -#include "qpid/io/SessionHandler.h" - -namespace qpid { -namespace io { - - - class LFSessionContext : public virtual SessionContext - { - const bool debug; - APRSocket socket; - bool initiated; - - qpid::framing::Buffer in; - qpid::framing::Buffer out; - - SessionHandler* handler; - LFProcessor* const processor; - - apr_pollfd_t fd; - - std::queue framesToWrite; - qpid::concurrent::Monitor writeLock; - - bool processing; - bool closing; - - //these are just for debug, as a crude way of detecting concurrent access - volatile unsigned int reading; - volatile unsigned int writing; - - static qpid::concurrent::Monitor logLock; - void log(const std::string& desc, qpid::framing::AMQFrame* const frame); - - public: - LFSessionContext(apr_pool_t* pool, apr_socket_t* socket, - LFProcessor* const processor, - bool debug = false); - ~LFSessionContext(); - virtual void send(qpid::framing::AMQFrame* frame); - virtual void close(); - void read(); - void write(); - void init(SessionHandler* handler); - void startProcessing(); - void stopProcessing(); - void handleClose(); - void shutdown(); - inline apr_pollfd_t* const getFd(){ return &fd; } - inline bool isClosed(){ return !socket.isOpen(); } - }; - -} -} - - -#endif diff --git a/cpp/src_apr/qpid/concurrent/APRBase.cpp b/cpp/src_apr/qpid/concurrent/APRBase.cpp new file mode 100644 index 0000000000..514c4d1048 --- /dev/null +++ b/cpp/src_apr/qpid/concurrent/APRBase.cpp @@ -0,0 +1,96 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include +#include "qpid/concurrent/APRBase.h" +#include "qpid/QpidError.h" + +using namespace qpid::concurrent; + +APRBase* APRBase::instance = 0; + +APRBase* APRBase::getInstance(){ + if(instance == 0){ + instance = new APRBase(); + } + return instance; +} + + +APRBase::APRBase() : count(0){ + apr_initialize(); + CHECK_APR_SUCCESS(apr_pool_create(&pool, 0)); + CHECK_APR_SUCCESS(apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_NESTED, pool)); +} + +APRBase::~APRBase(){ + CHECK_APR_SUCCESS(apr_thread_mutex_destroy(mutex)); + apr_pool_destroy(pool); + apr_terminate(); +} + +bool APRBase::_increment(){ + bool deleted(false); + CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex)); + if(this == instance){ + count++; + }else{ + deleted = true; + } + CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex)); + return !deleted; +} + +void APRBase::_decrement(){ + APRBase* copy = 0; + CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex)); + if(--count == 0){ + copy = instance; + instance = 0; + } + CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex)); + if(copy != 0){ + delete copy; + } +} + +void APRBase::increment(){ + int count = 0; + while(count++ < 2 && !getInstance()->_increment()){ + std::cout << "WARNING: APR initialization triggered concurrently with termination." << std::endl; + } +} + +void APRBase::decrement(){ + getInstance()->_decrement(); +} + +void qpid::concurrent::check(apr_status_t status, const std::string& file, const int line){ + if (status != APR_SUCCESS){ + const int size = 50; + char tmp[size]; + std::string msg(apr_strerror(status, tmp, size)); + throw QpidError(APR_ERROR + ((int) status), msg, file, line); + } +} + +std::string qpid::concurrent::get_desc(apr_status_t status){ + const int size = 50; + char tmp[size]; + return std::string(apr_strerror(status, tmp, size)); +} + diff --git a/cpp/src_apr/qpid/concurrent/APRBase.h b/cpp/src_apr/qpid/concurrent/APRBase.h new file mode 100644 index 0000000000..f3ff0f89c1 --- /dev/null +++ b/cpp/src_apr/qpid/concurrent/APRBase.h @@ -0,0 +1,63 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _APRBase_ +#define _APRBase_ + +#include +#include "apr-1/apr_thread_mutex.h" +#include "apr-1/apr_errno.h" + +namespace qpid { +namespace concurrent { + + /** + * Use of APR libraries necessitates explicit init and terminate + * calls. Any class using APR libs should obtain the reference to + * this singleton and increment on construction, decrement on + * destruction. This class can then correctly initialise apr + * before the first use and terminate after the last use. + */ + class APRBase{ + static APRBase* instance; + apr_pool_t* pool; + apr_thread_mutex_t* mutex; + int count; + + APRBase(); + ~APRBase(); + static APRBase* getInstance(); + bool _increment(); + void _decrement(); + public: + static void increment(); + static void decrement(); + }; + + //this is also a convenient place for a helper function for error checking: + void check(apr_status_t status, const std::string& file, const int line); + std::string get_desc(apr_status_t status); + +#define CHECK_APR_SUCCESS(A) check(A, __FILE__, __LINE__); + +} +} + + + + +#endif diff --git a/cpp/src_apr/qpid/concurrent/Monitor.cpp b/cpp/src_apr/qpid/concurrent/Monitor.cpp new file mode 100644 index 0000000000..ae68cf8751 --- /dev/null +++ b/cpp/src_apr/qpid/concurrent/Monitor.cpp @@ -0,0 +1,60 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "qpid/concurrent/APRBase.h" +#include "qpid/concurrent/Monitor.h" +#include + +qpid::concurrent::Monitor::Monitor(){ + APRBase::increment(); + CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL)); + CHECK_APR_SUCCESS(apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_NESTED, pool)); + CHECK_APR_SUCCESS(apr_thread_cond_create(&condition, pool)); +} + +qpid::concurrent::Monitor::~Monitor(){ + CHECK_APR_SUCCESS(apr_thread_cond_destroy(condition)); + CHECK_APR_SUCCESS(apr_thread_mutex_destroy(mutex)); + apr_pool_destroy(pool); + APRBase::decrement(); +} + +void qpid::concurrent::Monitor::wait(){ + CHECK_APR_SUCCESS(apr_thread_cond_wait(condition, mutex)); +} + + +void qpid::concurrent::Monitor::wait(u_int64_t time){ + apr_status_t status = apr_thread_cond_timedwait(condition, mutex, time * 1000); + if(!status == APR_TIMEUP) CHECK_APR_SUCCESS(status); +} + +void qpid::concurrent::Monitor::notify(){ + CHECK_APR_SUCCESS(apr_thread_cond_signal(condition)); +} + +void qpid::concurrent::Monitor::notifyAll(){ + CHECK_APR_SUCCESS(apr_thread_cond_broadcast(condition)); +} + +void qpid::concurrent::Monitor::acquire(){ + CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex)); +} + +void qpid::concurrent::Monitor::release(){ + CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex)); +} diff --git a/cpp/src_apr/qpid/concurrent/Monitor.h b/cpp/src_apr/qpid/concurrent/Monitor.h new file mode 100644 index 0000000000..a2777cb2f1 --- /dev/null +++ b/cpp/src_apr/qpid/concurrent/Monitor.h @@ -0,0 +1,56 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _Monitor_ +#define _Monitor_ + +#include "apr-1/apr_thread_mutex.h" +#include "apr-1/apr_thread_cond.h" +#include "qpid/concurrent/Monitor.h" + +namespace qpid { +namespace concurrent { + +class Monitor +{ + apr_pool_t* pool; + apr_thread_mutex_t* mutex; + apr_thread_cond_t* condition; + + public: + Monitor(); + virtual ~Monitor(); + virtual void wait(); + virtual void wait(u_int64_t time); + virtual void notify(); + virtual void notifyAll(); + virtual void acquire(); + virtual void release(); +}; + +class Locker +{ + public: + Locker(Monitor& monitor_) : monitor(monitor_) { monitor.acquire(); } + ~Locker() { monitor.release(); } + private: + Monitor& monitor; +}; +}} + + +#endif diff --git a/cpp/src_apr/qpid/concurrent/Thread.cpp b/cpp/src_apr/qpid/concurrent/Thread.cpp new file mode 100644 index 0000000000..9bbc2f8131 --- /dev/null +++ b/cpp/src_apr/qpid/concurrent/Thread.cpp @@ -0,0 +1,50 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "qpid/concurrent/APRBase.h" +#include "qpid/concurrent/Thread.h" +#include "apr-1/apr_portable.h" + +using namespace qpid::concurrent; + +void* APR_THREAD_FUNC ExecRunnable(apr_thread_t* thread, void *data){ + ((Runnable*) data)->run(); + CHECK_APR_SUCCESS(apr_thread_exit(thread, APR_SUCCESS)); + return NULL; +} + +Thread::Thread(apr_pool_t* _pool, Runnable* _runnable) : runnable(_runnable), pool(_pool), runner(0) {} + +Thread::~Thread(){ +} + +void Thread::start(){ + CHECK_APR_SUCCESS(apr_thread_create(&runner, NULL, ExecRunnable,(void*) runnable, pool)); +} + +void Thread::join(){ + apr_status_t status; + if (runner) CHECK_APR_SUCCESS(apr_thread_join(&status, runner)); +} + +void Thread::interrupt(){ + if (runner) CHECK_APR_SUCCESS(apr_thread_exit(runner, APR_SUCCESS)); +} + +unsigned int qpid::concurrent::Thread::currentThread(){ + return apr_os_thread_current(); +} diff --git a/cpp/src_apr/qpid/concurrent/Thread.h b/cpp/src_apr/qpid/concurrent/Thread.h new file mode 100644 index 0000000000..d18bc153bf --- /dev/null +++ b/cpp/src_apr/qpid/concurrent/Thread.h @@ -0,0 +1,48 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _Thread_ +#define _Thread_ + +#include "apr-1/apr_thread_proc.h" +#include "qpid/concurrent/Thread.h" +#include "qpid/concurrent/Runnable.h" +#include "qpid/concurrent/Thread.h" + +namespace qpid { +namespace concurrent { + + class Thread + { + const Runnable* runnable; + apr_pool_t* pool; + apr_thread_t* runner; + + public: + Thread(apr_pool_t* pool, Runnable* runnable); + virtual ~Thread(); + virtual void start(); + virtual void join(); + virtual void interrupt(); + static unsigned int currentThread(); + }; + +} +} + + +#endif diff --git a/cpp/src_apr/qpid/concurrent/ThreadFactory.cpp b/cpp/src_apr/qpid/concurrent/ThreadFactory.cpp new file mode 100644 index 0000000000..b20f9f2b04 --- /dev/null +++ b/cpp/src_apr/qpid/concurrent/ThreadFactory.cpp @@ -0,0 +1,35 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "qpid/concurrent/APRBase.h" +#include "qpid/concurrent/ThreadFactory.h" + +using namespace qpid::concurrent; + +ThreadFactory::ThreadFactory(){ + APRBase::increment(); + CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL)); +} + +ThreadFactory::~ThreadFactory(){ + apr_pool_destroy(pool); + APRBase::decrement(); +} + +Thread* ThreadFactory::create(Runnable* runnable){ + return new Thread(pool, runnable); +} diff --git a/cpp/src_apr/qpid/concurrent/ThreadFactory.h b/cpp/src_apr/qpid/concurrent/ThreadFactory.h new file mode 100644 index 0000000000..572419cae6 --- /dev/null +++ b/cpp/src_apr/qpid/concurrent/ThreadFactory.h @@ -0,0 +1,44 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _ThreadFactory_ +#define _ThreadFactory_ + +#include "apr-1/apr_thread_proc.h" + +#include "qpid/concurrent/Thread.h" +#include "qpid/concurrent/Thread.h" +#include "qpid/concurrent/ThreadFactory.h" +#include "qpid/concurrent/Runnable.h" + +namespace qpid { +namespace concurrent { + + class ThreadFactory + { + apr_pool_t* pool; + public: + ThreadFactory(); + virtual ~ThreadFactory(); + virtual Thread* create(Runnable* runnable); + }; + +} +} + + +#endif diff --git a/cpp/src_apr/qpid/concurrent/ThreadPool.cpp b/cpp/src_apr/qpid/concurrent/ThreadPool.cpp new file mode 100644 index 0000000000..5da19745a7 --- /dev/null +++ b/cpp/src_apr/qpid/concurrent/ThreadPool.cpp @@ -0,0 +1,83 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "qpid/concurrent/ThreadFactory.h" +#include "qpid/concurrent/ThreadPool.h" +#include "qpid/QpidError.h" +#include + +using namespace qpid::concurrent; + +ThreadPool::ThreadPool(int _size) : deleteFactory(true), size(_size), factory(new ThreadFactory()), running(false){ + worker = new Worker(this); +} + +ThreadPool::ThreadPool(int _size, ThreadFactory* _factory) : deleteFactory(false), size(_size), factory(_factory), running(false){ + worker = new Worker(this); +} + +ThreadPool::~ThreadPool(){ + if(deleteFactory) delete factory; +} + +void ThreadPool::addTask(Runnable* task){ + lock.acquire(); + tasks.push(task); + lock.notifyAll(); + lock.release(); +} + +void ThreadPool::runTask(){ + lock.acquire(); + while(tasks.empty()){ + lock.wait(); + } + Runnable* task = tasks.front(); + tasks.pop(); + lock.release(); + try{ + task->run(); + }catch(qpid::QpidError error){ + std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl; + } +} + +void ThreadPool::start(){ + if(!running){ + running = true; + for(int i = 0; i < size; i++){ + Thread* t = factory->create(worker); + t->start(); + threads.push_back(t); + } + } +} + +void ThreadPool::stop(){ + if(!running){ + running = false; + lock.acquire(); + lock.notifyAll(); + lock.release(); + for(int i = 0; i < size; i++){ + threads[i]->join(); + delete threads[i]; + } + } +} + + diff --git a/cpp/src_apr/qpid/concurrent/ThreadPool.h b/cpp/src_apr/qpid/concurrent/ThreadPool.h new file mode 100644 index 0000000000..11f0cc364f --- /dev/null +++ b/cpp/src_apr/qpid/concurrent/ThreadPool.h @@ -0,0 +1,67 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _ThreadPool_ +#define _ThreadPool_ + +#include +#include +#include "qpid/concurrent/Monitor.h" +#include "qpid/concurrent/Thread.h" +#include "qpid/concurrent/ThreadFactory.h" +#include "qpid/concurrent/ThreadPool.h" +#include "qpid/concurrent/Runnable.h" + +namespace qpid { +namespace concurrent { + + class ThreadPool + { + class Worker : public virtual Runnable{ + ThreadPool* pool; + public: + inline Worker(ThreadPool* _pool) : pool(_pool){} + inline virtual void run(){ + while(pool->running){ + pool->runTask(); + } + } + }; + const bool deleteFactory; + const int size; + ThreadFactory* factory; + Monitor lock; + std::vector threads; + std::queue tasks; + Worker* worker; + volatile bool running; + + void runTask(); + public: + ThreadPool(int size); + ThreadPool(int size, ThreadFactory* factory); + virtual void start(); + virtual void stop(); + virtual void addTask(Runnable* task); + virtual ~ThreadPool(); + }; + +} +} + + +#endif diff --git a/cpp/src_apr/qpid/concurrent/Time.cpp b/cpp/src_apr/qpid/concurrent/Time.cpp new file mode 100644 index 0000000000..19f1f30140 --- /dev/null +++ b/cpp/src_apr/qpid/concurrent/Time.cpp @@ -0,0 +1,29 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include +#include + +namespace qpid { +namespace concurrent { + +Time Time::now() { + return Time(apr_time_now()*1000); +} + +}} diff --git a/cpp/src_apr/qpid/io/APRPool.cpp b/cpp/src_apr/qpid/io/APRPool.cpp new file mode 100644 index 0000000000..edd434f16c --- /dev/null +++ b/cpp/src_apr/qpid/io/APRPool.cpp @@ -0,0 +1,39 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "APRPool.h" +#include "qpid/concurrent/APRBase.h" +#include + +using namespace qpid::io; +using namespace qpid::concurrent; + +APRPool::APRPool(){ + APRBase::increment(); + CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL)); +} + +APRPool::~APRPool(){ + apr_pool_destroy(pool); + APRBase::decrement(); +} + +apr_pool_t* APRPool::get() { + return boost::details::pool::singleton_default::instance().pool; +} + diff --git a/cpp/src_apr/qpid/io/APRPool.h b/cpp/src_apr/qpid/io/APRPool.h new file mode 100644 index 0000000000..063eedf1ee --- /dev/null +++ b/cpp/src_apr/qpid/io/APRPool.h @@ -0,0 +1,47 @@ +#ifndef _APRPool_ +#define _APRPool_ + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include +#include + +namespace qpid { +namespace io { +/** + * Singleton APR memory pool. + */ +class APRPool : private boost::noncopyable { + public: + APRPool(); + ~APRPool(); + + /** Get singleton instance */ + static apr_pool_t* get(); + + private: + apr_pool_t* pool; +}; + +}} + + + + + +#endif /*!_APRPool_*/ diff --git a/cpp/src_apr/qpid/io/APRSocket.cpp b/cpp/src_apr/qpid/io/APRSocket.cpp new file mode 100644 index 0000000000..824c376c3b --- /dev/null +++ b/cpp/src_apr/qpid/io/APRSocket.cpp @@ -0,0 +1,76 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "qpid/concurrent/APRBase.h" +#include "qpid/io/APRSocket.h" +#include +#include + +using namespace qpid::io; +using namespace qpid::framing; +using namespace qpid::concurrent; + +APRSocket::APRSocket(apr_socket_t* _socket) : socket(_socket), closed(false){ + +} + +void APRSocket::read(qpid::framing::Buffer& buffer){ + apr_size_t bytes; + bytes = buffer.available(); + apr_status_t s = apr_socket_recv(socket, buffer.start(), &bytes); + buffer.move(bytes); + if(APR_STATUS_IS_TIMEUP(s)){ + //timed out + }else if(APR_STATUS_IS_EOF(s)){ + close(); + } +} + +void APRSocket::write(qpid::framing::Buffer& buffer){ + apr_size_t bytes; + do{ + bytes = buffer.available(); + apr_socket_send(socket, buffer.start(), &bytes); + buffer.move(bytes); + }while(bytes > 0); +} + +void APRSocket::close(){ + if(!closed){ + std::cout << "Closing socket " << socket << "@" << this << std::endl; + CHECK_APR_SUCCESS(apr_socket_close(socket)); + closed = true; + } +} + +bool APRSocket::isOpen(){ + return !closed; +} + +u_int8_t APRSocket::read(){ + char data[1]; + apr_size_t bytes = 1; + apr_status_t s = apr_socket_recv(socket, data, &bytes); + if(APR_STATUS_IS_EOF(s) || bytes == 0){ + return 0; + }else{ + return *data; + } +} + +APRSocket::~APRSocket(){ +} diff --git a/cpp/src_apr/qpid/io/APRSocket.h b/cpp/src_apr/qpid/io/APRSocket.h new file mode 100644 index 0000000000..0b6644dfb6 --- /dev/null +++ b/cpp/src_apr/qpid/io/APRSocket.h @@ -0,0 +1,45 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _APRSocket_ +#define _APRSocket_ + +#include "apr-1/apr_network_io.h" +#include "qpid/framing/Buffer.h" + +namespace qpid { +namespace io { + + class APRSocket + { + apr_socket_t* const socket; + volatile bool closed; + public: + APRSocket(apr_socket_t* socket); + void read(qpid::framing::Buffer& b); + void write(qpid::framing::Buffer& b); + void close(); + bool isOpen(); + u_int8_t read(); + ~APRSocket(); + }; + +} +} + + +#endif diff --git a/cpp/src_apr/qpid/io/Acceptor.cpp b/cpp/src_apr/qpid/io/Acceptor.cpp new file mode 100644 index 0000000000..f95d9448cf --- /dev/null +++ b/cpp/src_apr/qpid/io/Acceptor.cpp @@ -0,0 +1,78 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "qpid/io/Acceptor.h" +#include "qpid/concurrent/APRBase.h" +#include "APRPool.h" + +using namespace qpid::concurrent; +using namespace qpid::io; + +Acceptor::Acceptor(int16_t port_, int backlog, int threads) : + port(port_), + processor(APRPool::get(), threads, 1000, 5000000) +{ + apr_sockaddr_t* address; + CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, APR_ANYADDR, APR_UNSPEC, port, APR_IPV4_ADDR_OK, APRPool::get())); + CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, APRPool::get())); + CHECK_APR_SUCCESS(apr_socket_opt_set(socket, APR_SO_REUSEADDR, 1)); + CHECK_APR_SUCCESS(apr_socket_bind(socket, address)); + CHECK_APR_SUCCESS(apr_socket_listen(socket, backlog)); +} + +int16_t Acceptor::getPort() const { + apr_sockaddr_t* address; + CHECK_APR_SUCCESS(apr_socket_addr_get(&address, APR_LOCAL, socket)); + return address->port; +} + +void Acceptor::run(SessionHandlerFactory* factory) { + running = true; + processor.start(); + std::cout << "Listening on port " << getPort() << "..." << std::endl; + while(running){ + apr_socket_t* client; + apr_status_t status = apr_socket_accept(&client, socket, APRPool::get()); + if(status == APR_SUCCESS){ + //make this socket non-blocking: + CHECK_APR_SUCCESS(apr_socket_timeout_set(client, 0)); + CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_NONBLOCK, 1)); + CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_TCP_NODELAY, 1)); + CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_SNDBUF, 32768)); + CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_RCVBUF, 32768)); + LFSessionContext* session = new LFSessionContext(APRPool::get(), client, &processor, false); + session->init(factory->create(session)); + }else{ + running = false; + if(status != APR_EINTR){ + std::cout << "ERROR: " << get_desc(status) << std::endl; + } + } + } + shutdown(); +} + +void Acceptor::shutdown() { + // TODO aconway 2006-10-12: Cleanup, this is not thread safe. + if (running) { + running = false; + processor.stop(); + CHECK_APR_SUCCESS(apr_socket_close(socket)); + } +} + + diff --git a/cpp/src_apr/qpid/io/Acceptor.h b/cpp/src_apr/qpid/io/Acceptor.h new file mode 100644 index 0000000000..bc189f7f6e --- /dev/null +++ b/cpp/src_apr/qpid/io/Acceptor.h @@ -0,0 +1,60 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _LFAcceptor_ +#define _LFAcceptor_ + +#include "apr-1/apr_network_io.h" +#include "apr-1/apr_poll.h" +#include "apr-1/apr_time.h" + +#include "qpid/io/Acceptor.h" +#include "qpid/concurrent/Monitor.h" +#include "qpid/concurrent/ThreadFactory.h" +#include "qpid/concurrent/ThreadPool.h" +#include "qpid/io/LFProcessor.h" +#include "qpid/io/LFSessionContext.h" +#include "qpid/concurrent/Runnable.h" +#include "qpid/io/SessionContext.h" +#include "qpid/io/SessionHandlerFactory.h" +#include "qpid/concurrent/Thread.h" +#include + +namespace qpid { +namespace io { + +/** APR Acceptor. */ +class Acceptor : public qpid::SharedObject +{ + public: + Acceptor(int16_t port, int backlog, int threads); + virtual int16_t getPort() const; + virtual void run(SessionHandlerFactory* factory); + virtual void shutdown(); + + private: + int16_t port; + LFProcessor processor; + apr_socket_t* socket; + volatile bool running; +}; + +} +} + + +#endif diff --git a/cpp/src_apr/qpid/io/Connector.cpp b/cpp/src_apr/qpid/io/Connector.cpp new file mode 100644 index 0000000000..ca487deb86 --- /dev/null +++ b/cpp/src_apr/qpid/io/Connector.cpp @@ -0,0 +1,201 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include +#include "qpid/concurrent/APRBase.h" +#include "qpid/io/Connector.h" +#include "qpid/concurrent/ThreadFactory.h" +#include "qpid/QpidError.h" + +using namespace qpid::io; +using namespace qpid::concurrent; +using namespace qpid::framing; +using qpid::QpidError; + +Connector::Connector(bool _debug, u_int32_t buffer_size) : + debug(_debug), + receive_buffer_size(buffer_size), + send_buffer_size(buffer_size), + closed(true), + lastIn(0), lastOut(0), + timeout(0), + idleIn(0), idleOut(0), + timeoutHandler(0), + shutdownHandler(0), + inbuf(receive_buffer_size), + outbuf(send_buffer_size){ + + APRBase::increment(); + + CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL)); + CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, pool)); + + threadFactory = new ThreadFactory(); + writeLock = new Monitor(); +} + +Connector::~Connector(){ + delete receiver; + delete writeLock; + delete threadFactory; + apr_pool_destroy(pool); + + APRBase::decrement(); +} + +void Connector::connect(const std::string& host, int port){ + apr_sockaddr_t* address; + CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, host.c_str(), APR_UNSPEC, port, APR_IPV4_ADDR_OK, pool)); + CHECK_APR_SUCCESS(apr_socket_connect(socket, address)); + closed = false; + + receiver = threadFactory->create(this); + receiver->start(); +} + +void Connector::init(ProtocolInitiation* header){ + writeBlock(header); + delete header; +} + +void Connector::close(){ + closed = true; + CHECK_APR_SUCCESS(apr_socket_close(socket)); + receiver->join(); +} + +void Connector::setInputHandler(InputHandler* handler){ + input = handler; +} + +void Connector::setShutdownHandler(ShutdownHandler* handler){ + shutdownHandler = handler; +} + +OutputHandler* Connector::getOutputHandler(){ + return this; +} + +void Connector::send(AMQFrame* frame){ + writeBlock(frame); + if(debug) std::cout << "SENT: " << *frame << std::endl; + delete frame; +} + +void Connector::writeBlock(AMQDataBlock* data){ + writeLock->acquire(); + data->encode(outbuf); + + //transfer data to wire + outbuf.flip(); + writeToSocket(outbuf.start(), outbuf.available()); + outbuf.clear(); + writeLock->release(); +} + +void Connector::writeToSocket(char* data, size_t available){ + apr_size_t bytes(available); + apr_size_t written(0); + while(written < available && !closed){ + apr_status_t status = apr_socket_send(socket, data + written, &bytes); + if(status == APR_TIMEUP){ + std::cout << "Write request timed out." << std::endl; + } + if(bytes == 0){ + std::cout << "Write request wrote 0 bytes." << std::endl; + } + lastOut = apr_time_as_msec(apr_time_now()); + written += bytes; + bytes = available - written; + } +} + +void Connector::checkIdle(apr_status_t status){ + if(timeoutHandler){ + apr_time_t now = apr_time_as_msec(apr_time_now()); + if(APR_STATUS_IS_TIMEUP(status)){ + if(idleIn && (now - lastIn > idleIn)){ + timeoutHandler->idleIn(); + } + }else if(APR_STATUS_IS_EOF(status)){ + closed = true; + CHECK_APR_SUCCESS(apr_socket_close(socket)); + if(shutdownHandler) shutdownHandler->shutdown(); + }else{ + lastIn = now; + } + if(idleOut && (now - lastOut > idleOut)){ + timeoutHandler->idleOut(); + } + } +} + +void Connector::setReadTimeout(u_int16_t t){ + idleIn = t * 1000;//t is in secs + if(idleIn && (!timeout || idleIn < timeout)){ + timeout = idleIn; + setSocketTimeout(); + } + +} + +void Connector::setWriteTimeout(u_int16_t t){ + idleOut = t * 1000;//t is in secs + if(idleOut && (!timeout || idleOut < timeout)){ + timeout = idleOut; + setSocketTimeout(); + } +} + +void Connector::setSocketTimeout(){ + //interval is in microseconds, timeout in milliseconds + //want the interval to be a bit shorter than the timeout, hence multiply + //by 800 rather than 1000. + apr_interval_time_t interval(timeout * 800); + apr_socket_timeout_set(socket, interval); +} + +void Connector::setTimeoutHandler(TimeoutHandler* handler){ + timeoutHandler = handler; +} + +void Connector::run(){ + try{ + while(!closed){ + apr_size_t bytes(inbuf.available()); + if(bytes < 1){ + THROW_QPID_ERROR(INTERNAL_ERROR, "Frame exceeds buffer size."); + } + checkIdle(apr_socket_recv(socket, inbuf.start(), &bytes)); + + if(bytes > 0){ + inbuf.move(bytes); + inbuf.flip();//position = 0, limit = total data read + + AMQFrame frame; + while(frame.decode(inbuf)){ + if(debug) std::cout << "RECV: " << frame << std::endl; + input->received(&frame); + } + //need to compact buffer to preserve any 'extra' data + inbuf.compact(); + } + } + }catch(QpidError error){ + std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl; + } +} diff --git a/cpp/src_apr/qpid/io/Connector.h b/cpp/src_apr/qpid/io/Connector.h new file mode 100644 index 0000000000..7c52f7e87b --- /dev/null +++ b/cpp/src_apr/qpid/io/Connector.h @@ -0,0 +1,95 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _Connector_ +#define _Connector_ + +#include "apr-1/apr_network_io.h" +#include "apr-1/apr_time.h" + +#include "qpid/framing/InputHandler.h" +#include "qpid/framing/OutputHandler.h" +#include "qpid/framing/InitiationHandler.h" +#include "qpid/framing/ProtocolInitiation.h" +#include "qpid/io/ShutdownHandler.h" +#include "qpid/io/TimeoutHandler.h" +#include "qpid/concurrent/Thread.h" +#include "qpid/concurrent/ThreadFactory.h" +#include "qpid/io/Connector.h" +#include "qpid/concurrent/Monitor.h" + +namespace qpid { +namespace io { + + class Connector : public virtual qpid::framing::OutputHandler, + private virtual qpid::concurrent::Runnable + { + const bool debug; + const int receive_buffer_size; + const int send_buffer_size; + + bool closed; + + apr_time_t lastIn; + apr_time_t lastOut; + apr_interval_time_t timeout; + u_int32_t idleIn; + u_int32_t idleOut; + + TimeoutHandler* timeoutHandler; + ShutdownHandler* shutdownHandler; + qpid::framing::InputHandler* input; + qpid::framing::InitiationHandler* initialiser; + qpid::framing::OutputHandler* output; + + qpid::framing::Buffer inbuf; + qpid::framing::Buffer outbuf; + + qpid::concurrent::Monitor* writeLock; + qpid::concurrent::ThreadFactory* threadFactory; + qpid::concurrent::Thread* receiver; + + apr_pool_t* pool; + apr_socket_t* socket; + + void checkIdle(apr_status_t status); + void writeBlock(qpid::framing::AMQDataBlock* data); + void writeToSocket(char* data, size_t available); + void setSocketTimeout(); + + void run(); + + public: + Connector(bool debug = false, u_int32_t buffer_size = 1024); + virtual ~Connector(); + virtual void connect(const std::string& host, int port); + virtual void init(qpid::framing::ProtocolInitiation* header); + virtual void close(); + virtual void setInputHandler(qpid::framing::InputHandler* handler); + virtual void setTimeoutHandler(TimeoutHandler* handler); + virtual void setShutdownHandler(ShutdownHandler* handler); + virtual qpid::framing::OutputHandler* getOutputHandler(); + virtual void send(qpid::framing::AMQFrame* frame); + virtual void setReadTimeout(u_int16_t timeout); + virtual void setWriteTimeout(u_int16_t timeout); + }; + +} +} + + +#endif diff --git a/cpp/src_apr/qpid/io/LFProcessor.cpp b/cpp/src_apr/qpid/io/LFProcessor.cpp new file mode 100644 index 0000000000..dabbdbecae --- /dev/null +++ b/cpp/src_apr/qpid/io/LFProcessor.cpp @@ -0,0 +1,193 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "qpid/io/LFProcessor.h" +#include "qpid/concurrent/APRBase.h" +#include "qpid/io/LFSessionContext.h" +#include "qpid/QpidError.h" +#include + +using namespace qpid::io; +using namespace qpid::concurrent; +using qpid::QpidError; + +// TODO aconway 2006-10-12: stopped is read outside locks. +// + +LFProcessor::LFProcessor(apr_pool_t* pool, int _workers, int _size, int _timeout) : + size(_size), + timeout(_timeout), + signalledCount(0), + current(0), + count(0), + workerCount(_workers), + hasLeader(false), + workers(new Thread*[_workers]), + stopped(false) +{ + + CHECK_APR_SUCCESS(apr_pollset_create(&pollset, size, pool, APR_POLLSET_THREADSAFE)); + //create & start the required number of threads + for(int i = 0; i < workerCount; i++){ + workers[i] = factory.create(this); + } +} + + +LFProcessor::~LFProcessor(){ + if (!stopped) stop(); + for(int i = 0; i < workerCount; i++){ + delete workers[i]; + } + delete[] workers; + CHECK_APR_SUCCESS(apr_pollset_destroy(pollset)); +} + +void LFProcessor::start(){ + for(int i = 0; i < workerCount; i++){ + workers[i]->start(); + } +} + +void LFProcessor::add(const apr_pollfd_t* const fd){ + CHECK_APR_SUCCESS(apr_pollset_add(pollset, fd)); + countLock.acquire(); + sessions.push_back(reinterpret_cast(fd->client_data)); + count++; + countLock.release(); +} + +void LFProcessor::remove(const apr_pollfd_t* const fd){ + CHECK_APR_SUCCESS(apr_pollset_remove(pollset, fd)); + countLock.acquire(); + sessions.erase(find(sessions.begin(), sessions.end(), reinterpret_cast(fd->client_data))); + count--; + countLock.release(); +} + +void LFProcessor::reactivate(const apr_pollfd_t* const fd){ + CHECK_APR_SUCCESS(apr_pollset_add(pollset, fd)); +} + +void LFProcessor::deactivate(const apr_pollfd_t* const fd){ + CHECK_APR_SUCCESS(apr_pollset_remove(pollset, fd)); +} + +void LFProcessor::update(const apr_pollfd_t* const fd){ + CHECK_APR_SUCCESS(apr_pollset_remove(pollset, fd)); + CHECK_APR_SUCCESS(apr_pollset_add(pollset, fd)); +} + +bool LFProcessor::full(){ + Locker locker(countLock); + return count == size; +} + +bool LFProcessor::empty(){ + Locker locker(countLock); + return count == 0; +} + +void LFProcessor::poll() { + apr_status_t status = APR_EGENERAL; + do{ + current = 0; + if(!stopped){ + status = apr_pollset_poll(pollset, timeout, &signalledCount, &signalledFDs); + } + }while(status != APR_SUCCESS && !stopped); +} + +void LFProcessor::run(){ + try{ + while(!stopped){ + leadLock.acquire(); + waitToLead(); + if(!stopped){ + const apr_pollfd_t* evt = getNextEvent(); + if(evt){ + LFSessionContext* session = reinterpret_cast(evt->client_data); + session->startProcessing(); + + relinquishLead(); + leadLock.release(); + + //process event: + if(evt->rtnevents & APR_POLLIN) session->read(); + if(evt->rtnevents & APR_POLLOUT) session->write(); + + if(session->isClosed()){ + session->handleClose(); + countLock.acquire(); + sessions.erase(find(sessions.begin(), sessions.end(), session)); + count--; + countLock.release(); + }else{ + session->stopProcessing(); + } + + }else{ + leadLock.release(); + } + }else{ + leadLock.release(); + } + } + }catch(QpidError error){ + std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl; + } +} + +void LFProcessor::waitToLead(){ + while(hasLeader && !stopped) leadLock.wait(); + hasLeader = !stopped; +} + +void LFProcessor::relinquishLead(){ + hasLeader = false; + leadLock.notify(); +} + +const apr_pollfd_t* LFProcessor::getNextEvent(){ + while(true){ + if(stopped){ + return 0; + }else if(current < signalledCount){ + //use result of previous poll if one is available + return signalledFDs + (current++); + }else{ + //else poll to get new events + poll(); + } + } +} + +void LFProcessor::stop(){ + stopped = true; + leadLock.acquire(); + leadLock.notifyAll(); + leadLock.release(); + + for(int i = 0; i < workerCount; i++){ + workers[i]->join(); + } + + for(iterator i = sessions.begin(); i < sessions.end(); i++){ + (*i)->shutdown(); + } +} + diff --git a/cpp/src_apr/qpid/io/LFProcessor.h b/cpp/src_apr/qpid/io/LFProcessor.h new file mode 100644 index 0000000000..5b61f444af --- /dev/null +++ b/cpp/src_apr/qpid/io/LFProcessor.h @@ -0,0 +1,119 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _LFProcessor_ +#define _LFProcessor_ + +#include "apr-1/apr_poll.h" +#include +#include +#include "qpid/concurrent/Monitor.h" +#include "qpid/concurrent/ThreadFactory.h" +#include "qpid/concurrent/Runnable.h" + +namespace qpid { +namespace io { + + class LFSessionContext; + + /** + * This class processes a poll set using the leaders-followers + * pattern for thread synchronization: the leader will poll and on + * the poll returning, it will remove a session, promote a + * follower to leadership, then process the session. + */ + class LFProcessor : private virtual qpid::concurrent::Runnable + { + typedef std::vector::iterator iterator; + + const int size; + const apr_interval_time_t timeout; + apr_pollset_t* pollset; + int signalledCount; + int current; + const apr_pollfd_t* signalledFDs; + int count; + const int workerCount; + bool hasLeader; + qpid::concurrent::Thread** const workers; + qpid::concurrent::Monitor leadLock; + qpid::concurrent::Monitor countLock; + qpid::concurrent::ThreadFactory factory; + std::vector sessions; + volatile bool stopped; + + const apr_pollfd_t* getNextEvent(); + void waitToLead(); + void relinquishLead(); + void poll(); + virtual void run(); + + public: + LFProcessor(apr_pool_t* pool, int workers, int size, int timeout); + /** + * Add the fd to the poll set. Relies on the client_data being + * an instance of LFSessionContext. + */ + void add(const apr_pollfd_t* const fd); + /** + * Remove the fd from the poll set. + */ + void remove(const apr_pollfd_t* const fd); + /** + * Signal that the fd passed in, already part of the pollset, + * has had its flags altered. + */ + void update(const apr_pollfd_t* const fd); + /** + * Add an fd back to the poll set after deactivation. + */ + void reactivate(const apr_pollfd_t* const fd); + /** + * Temporarily remove the fd from the poll set. Called when processing + * is about to begin. + */ + void deactivate(const apr_pollfd_t* const fd); + /** + * Indicates whether the capacity of this processor has been + * reached (or whether it can still handle further fd's). + */ + bool full(); + /** + * Indicates whether there are any fd's registered. + */ + bool empty(); + /** + * Stop processing. + */ + void stop(); + /** + * Start processing. + */ + void start(); + /** + * Is processing stopped? + */ + bool isStopped(); + + ~LFProcessor(); + }; + +} +} + + +#endif diff --git a/cpp/src_apr/qpid/io/LFSessionContext.cpp b/cpp/src_apr/qpid/io/LFSessionContext.cpp new file mode 100644 index 0000000000..ca1e6431a6 --- /dev/null +++ b/cpp/src_apr/qpid/io/LFSessionContext.cpp @@ -0,0 +1,189 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "qpid/io/LFSessionContext.h" +#include "qpid/concurrent/APRBase.h" +#include "qpid/QpidError.h" +#include + +using namespace qpid::concurrent; +using namespace qpid::io; +using namespace qpid::framing; + +LFSessionContext::LFSessionContext(apr_pool_t* _pool, apr_socket_t* _socket, + LFProcessor* const _processor, + bool _debug) : + debug(_debug), + socket(_socket), + initiated(false), + in(32768), + out(32768), + processor(_processor), + processing(false), + closing(false), + reading(0), + writing(0) +{ + + fd.p = _pool; + fd.desc_type = APR_POLL_SOCKET; + fd.reqevents = APR_POLLIN; + fd.client_data = this; + fd.desc.s = _socket; + + out.flip(); +} + +LFSessionContext::~LFSessionContext(){ + +} + +void LFSessionContext::read(){ + assert(!reading); // No concurrent read. + reading = Thread::currentThread(); + + socket.read(in); + in.flip(); + if(initiated){ + AMQFrame frame; + while(frame.decode(in)){ + if(debug) log("RECV", &frame); + handler->received(&frame); + } + }else{ + ProtocolInitiation protocolInit; + if(protocolInit.decode(in)){ + handler->initiated(&protocolInit); + initiated = true; + if(debug) std::cout << "INIT [" << &socket << "]" << std::endl; + } + } + in.compact(); + + reading = 0; +} + +void LFSessionContext::write(){ + assert(!writing); // No concurrent writes. + writing = Thread::currentThread(); + + bool done = isClosed(); + while(!done){ + if(out.available() > 0){ + socket.write(out); + if(out.available() > 0){ + writing = 0; + + //incomplete write, leave flags to receive notification of readiness to write + done = true;//finished processing for now, but write is still in progress + } + }else{ + //do we have any frames to write? + writeLock.acquire(); + if(!framesToWrite.empty()){ + out.clear(); + bool encoded(false); + AMQFrame* frame = framesToWrite.front(); + while(frame && out.available() >= frame->size()){ + encoded = true; + frame->encode(out); + if(debug) log("SENT", frame); + delete frame; + framesToWrite.pop(); + frame = framesToWrite.empty() ? 0 : framesToWrite.front(); + } + if(!encoded) THROW_QPID_ERROR(FRAMING_ERROR, "Could not write frame, too large for buffer."); + out.flip(); + }else{ + //reset flags, don't care about writability anymore + fd.reqevents = APR_POLLIN; + done = true; + + writing = 0; + + if(closing){ + socket.close(); + } + } + writeLock.release(); + } + } +} + +void LFSessionContext::send(AMQFrame* frame){ + writeLock.acquire(); + if(!closing){ + framesToWrite.push(frame); + if(!(fd.reqevents & APR_POLLOUT)){ + fd.reqevents |= APR_POLLOUT; + if(!processing){ + processor->update(&fd); + } + } + } + writeLock.release(); +} + +void LFSessionContext::startProcessing(){ + writeLock.acquire(); + processing = true; + processor->deactivate(&fd); + writeLock.release(); +} + +void LFSessionContext::stopProcessing(){ + writeLock.acquire(); + processor->reactivate(&fd); + processing = false; + writeLock.release(); +} + +void LFSessionContext::close(){ + closing = true; + writeLock.acquire(); + if(!processing){ + //allow pending frames to be written to socket + fd.reqevents = APR_POLLOUT; + processor->update(&fd); + } + writeLock.release(); +} + +void LFSessionContext::handleClose(){ + handler->closed(); + std::cout << "Session closed [" << &socket << "]" << std::endl; + delete handler; + delete this; +} + +void LFSessionContext::shutdown(){ + socket.close(); + handleClose(); +} + +void LFSessionContext::init(SessionHandler* _handler){ + handler = _handler; + processor->add(&fd); +} + +void LFSessionContext::log(const std::string& desc, AMQFrame* const frame){ + logLock.acquire(); + std::cout << desc << " [" << &socket << "]: " << *frame << std::endl; + logLock.release(); +} + +Monitor LFSessionContext::logLock; diff --git a/cpp/src_apr/qpid/io/LFSessionContext.h b/cpp/src_apr/qpid/io/LFSessionContext.h new file mode 100644 index 0000000000..8d30b54204 --- /dev/null +++ b/cpp/src_apr/qpid/io/LFSessionContext.h @@ -0,0 +1,88 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _LFSessionContext_ +#define _LFSessionContext_ + +#include + +#include "apr-1/apr_network_io.h" +#include "apr-1/apr_poll.h" +#include "apr-1/apr_time.h" + +#include "qpid/framing/AMQFrame.h" +#include "qpid/concurrent/Monitor.h" +#include "qpid/io/APRSocket.h" +#include "qpid/framing/Buffer.h" +#include "qpid/io/LFProcessor.h" +#include "qpid/io/SessionContext.h" +#include "qpid/io/SessionHandler.h" + +namespace qpid { +namespace io { + + + class LFSessionContext : public virtual SessionContext + { + const bool debug; + APRSocket socket; + bool initiated; + + qpid::framing::Buffer in; + qpid::framing::Buffer out; + + SessionHandler* handler; + LFProcessor* const processor; + + apr_pollfd_t fd; + + std::queue framesToWrite; + qpid::concurrent::Monitor writeLock; + + bool processing; + bool closing; + + //these are just for debug, as a crude way of detecting concurrent access + volatile unsigned int reading; + volatile unsigned int writing; + + static qpid::concurrent::Monitor logLock; + void log(const std::string& desc, qpid::framing::AMQFrame* const frame); + + public: + LFSessionContext(apr_pool_t* pool, apr_socket_t* socket, + LFProcessor* const processor, + bool debug = false); + ~LFSessionContext(); + virtual void send(qpid::framing::AMQFrame* frame); + virtual void close(); + void read(); + void write(); + void init(SessionHandler* handler); + void startProcessing(); + void stopProcessing(); + void handleClose(); + void shutdown(); + inline apr_pollfd_t* const getFd(){ return &fd; } + inline bool isClosed(){ return !socket.isOpen(); } + }; + +} +} + + +#endif -- cgit v1.2.1