diff options
author | Alan Conway <aconway@apache.org> | 2006-11-13 19:28:23 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2006-11-13 19:28:23 +0000 |
commit | 922d1249a1a48c13a2e37f1efc1b3204ca5e9813 (patch) | |
tree | 0a5284b22967b19a97d02284d3bafe2ae9d79df0 /cpp/src | |
parent | fc5ad32cc8f2d4d793c0cf79382755e9fca2357c (diff) | |
download | qpid-python-922d1249a1a48c13a2e37f1efc1b3204ca5e9813.tar.gz |
EventChannel classes, start of epoll-based posix implementation.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@474452 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/Exception.h | 63 | ||||
-rw-r--r-- | cpp/src/qpid/posix/EpollEventChannel.cpp | 74 | ||||
-rw-r--r-- | cpp/src/qpid/posix/EpollEventChannel.h | 48 | ||||
-rw-r--r-- | cpp/src/qpid/posix/Socket.cpp | 12 | ||||
-rw-r--r-- | cpp/src/qpid/posix/check.cpp | 14 | ||||
-rw-r--r-- | cpp/src/qpid/posix/check.h | 8 | ||||
-rw-r--r-- | cpp/src/qpid/sys/EventChannel.h | 239 | ||||
-rw-r--r-- | cpp/src/qpid/sys/Monitor.h | 28 | ||||
-rw-r--r-- | cpp/src/qpid/sys/SessionHandlerFactory.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/sys/Thread.h | 4 | ||||
-rw-r--r-- | cpp/src/qpid/sys/signal.h | 46 | ||||
-rw-r--r-- | cpp/src/qpidd.cpp | 4 |
12 files changed, 492 insertions, 50 deletions
diff --git a/cpp/src/qpid/Exception.h b/cpp/src/qpid/Exception.h index 2ba3620095..2aba43586d 100644 --- a/cpp/src/qpid/Exception.h +++ b/cpp/src/qpid/Exception.h @@ -24,26 +24,55 @@ #include <exception> #include <string> +#include <memory> namespace qpid { - /** - * Exception base class for all Qpid exceptions. - */ - class Exception : public std::exception - { - protected: - std::string whatStr; - - public: - Exception() throw() {} - Exception(const std::string& str) throw() : whatStr(str) {} - Exception(const char* str) throw() : whatStr(str) {} - virtual ~Exception() throw(); - - const char* what() const throw() { return whatStr.c_str(); } - virtual std::string toString() const throw() { return whatStr; } - }; +/** + * Exception base class for all Qpid exceptions. + */ +class Exception : public std::exception +{ + protected: + std::string whatStr; + + public: + Exception() throw() {} + Exception(const std::string& str) throw() : whatStr(str) {} + Exception(const char* str) throw() : whatStr(str) {} + virtual ~Exception() throw(); + + virtual const char* what() const throw() { return whatStr.c_str(); } + virtual std::string toString() const throw() { return whatStr; } +}; + +/** + * Wrapper for heap-allocated exceptions. Use like this: + * <code> + * std::auto_ptr<Exception> ex = new SomeEx(...) + * HeapException hex(ex); // Takes ownership + * throw hex; // Auto-deletes ex + * </code> + */ +class HeapException : public Exception, public std::auto_ptr<Exception> +{ + public: + HeapException() {} + HeapException(std::auto_ptr<Exception> e) : std::auto_ptr<Exception>(e) {} + + HeapException& operator=(std::auto_ptr<Exception>& e) { + std::auto_ptr<Exception>::operator=(e); + return *this; + } + + ~HeapException() throw() {} + + virtual const char* what() const throw() { return (*this)->what(); } + virtual std::string toString() const throw() { + return (*this)->toString(); + } +}; + } #endif /*!_Exception_*/ diff --git a/cpp/src/qpid/posix/EpollEventChannel.cpp b/cpp/src/qpid/posix/EpollEventChannel.cpp new file mode 100644 index 0000000000..1418507542 --- /dev/null +++ b/cpp/src/qpid/posix/EpollEventChannel.cpp @@ -0,0 +1,74 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <qpid/sys/EventChannel.h> +#include <sys/epoll.h> +#include "EpollEventChannel.h" + +namespace qpid { +namespace sys { + +EventChannel::shared_ptr EventChannel::create() +{ + return EventChannel::shared_ptr(new EpollEventChannel()); +} + +EpollEventChannel::EpollEventChannel() +{ + // TODO aconway 2006-11-13: How to choose size parameter? + static const size_t estimatedFdsForEpoll = 1000; + epollFd = epoll_create(estimatedFdsForEpoll); +} + +void +EpollEventChannel::post(ReadEvent& /*event*/) +{ +} + +void +EpollEventChannel::post(WriteEvent& /*event*/) +{ +} + +void +EpollEventChannel::post(AcceptEvent& /*event*/) +{ +} + +void +EpollEventChannel::post(NotifyEvent& /*event*/) +{ +} + +inline void +EpollEventChannel::post(Event& /*event*/) +{ +} + +Event* +EpollEventChannel::getEvent() +{ + return 0; +} + +void +EpollEventChannel::dispose(void* /*buffer*/, size_t) +{ +} + +}} diff --git a/cpp/src/qpid/posix/EpollEventChannel.h b/cpp/src/qpid/posix/EpollEventChannel.h new file mode 100644 index 0000000000..8128d5276f --- /dev/null +++ b/cpp/src/qpid/posix/EpollEventChannel.h @@ -0,0 +1,48 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <qpid/sys/EventChannel.h> + +namespace qpid { +namespace sys { + +/** Epoll-based implementation of the event channel */ +class EpollEventChannel : public EventChannel +{ + public: + + EpollEventChannel(); + ~EpollEventChannel(); + + virtual void post(ReadEvent& event); + virtual void post(WriteEvent& event); + virtual void post(AcceptEvent& event); + virtual void post(NotifyEvent& event); + + inline void post(Event& event); + + virtual Event* getEvent(); + + virtual void dispose(void* buffer, size_t size); + + private: + int epollFd; + +}; + +}} diff --git a/cpp/src/qpid/posix/Socket.cpp b/cpp/src/qpid/posix/Socket.cpp index 3101be54f4..1321ae6b0d 100644 --- a/cpp/src/qpid/posix/Socket.cpp +++ b/cpp/src/qpid/posix/Socket.cpp @@ -33,7 +33,7 @@ using namespace qpid::sys; Socket::Socket() : socket(::socket (PF_INET, SOCK_STREAM, 0)) { - if (socket == 0) CHECK(-1); + CHECKNN(socket == 0); } void @@ -53,16 +53,16 @@ Socket::connect(const std::string& host, int port) name.sin_family = AF_INET; name.sin_port = htons(port); struct hostent* hp = gethostbyname ( host.c_str() ); - if (hp == 0) CHECK(-1); // TODO aconway 2006-11-09: error message? + if (hp == 0) CHECK0(-1); // TODO aconway 2006-11-09: error message? memcpy(&name.sin_addr.s_addr, hp->h_addr_list[0], hp->h_length); - CHECK(::connect(socket, (struct sockaddr*)(&name), sizeof(name))); + CHECK0(::connect(socket, (struct sockaddr*)(&name), sizeof(name))); } void Socket::close() { if (socket == 0) return; - CHECK(::close(socket)); + CHECK0(::close(socket)); socket = 0; } @@ -73,7 +73,7 @@ Socket::send(const char* data, size_t size) if (sent < 0) { if (errno == ECONNRESET) return SOCKET_EOF; if (errno == ETIMEDOUT) return SOCKET_TIMEOUT; - CHECK(sent); + CHECK0(sent); } return sent; } @@ -84,7 +84,7 @@ Socket::recv(char* data, size_t size) ssize_t received = ::recv(socket, data, size, 0); if (received < 0) { if (errno == ETIMEDOUT) return SOCKET_TIMEOUT; - CHECK(received); + CHECK0(received); } return received; } diff --git a/cpp/src/qpid/posix/check.cpp b/cpp/src/qpid/posix/check.cpp index 5e8e9e7ca2..3470906639 100644 --- a/cpp/src/qpid/posix/check.cpp +++ b/cpp/src/qpid/posix/check.cpp @@ -22,10 +22,12 @@ #include <qpid/QpidError.h> #include "check.h" -void qpid::sys::check(long result, const char* file, const int line) { - if (result != 0) { - char buf[512]; - char* msg = strerror_r(errno, buf, sizeof(buf)); - throw QpidError(errno, msg, file, line); - } +namespace qpid { +namespace sys { + +std::string strError() { + char buf[512]; + return strerror_r(errno, buf, sizeof(buf)); } + +}} diff --git a/cpp/src/qpid/posix/check.h b/cpp/src/qpid/posix/check.h index d1de9360a4..666637b1c2 100644 --- a/cpp/src/qpid/posix/check.h +++ b/cpp/src/qpid/posix/check.h @@ -23,14 +23,16 @@ */ #include <errno.h> +#include <string> +#include <qpid/QpidError.h> namespace qpid { namespace sys { -void check(long result, const char* file, const int line); - -#define CHECK(N) qpid::sys::check(N, __FILE__, __LINE__) +std::string errnoToString(); +#define CHECK0(N) if ((N)!=0) THROW_QPID_ERROR(INTERNAL_ERROR, errnoToString()) +#define CHECKNN(N) if ((N)<0) THROW_QPID_ERROR(INTERNAL_ERROR, errnoToString()) }} diff --git a/cpp/src/qpid/sys/EventChannel.h b/cpp/src/qpid/sys/EventChannel.h new file mode 100644 index 0000000000..dd857c02c7 --- /dev/null +++ b/cpp/src/qpid/sys/EventChannel.h @@ -0,0 +1,239 @@ +#ifndef _sys_EventChannel_h +#define _sys_EventChannel_h + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <qpid/SharedObject.h> +#include <qpid/Exception.h> +#include <qpid/sys/Time.h> + +namespace qpid { +namespace sys { + +class EventChannel; + +class Event; +class ReadEvent; +class WriteEvent; +class AcceptEvent; +class NotifyEvent; + +/** + Active event channel. Events represent async IO requests or + inter-task synchronization. Posting an Event registers interest in + the IO or sync event. When it occurs the posted Event is + corresponding IO or sync event occurs they are returned to one + of the threads waiting on the channel. For more details see + the Event hierarchy. +*/ +class EventChannel : public qpid::SharedObject<EventChannel> +{ + public: + static EventChannel::shared_ptr create(); + + virtual ~EventChannel() {} + + virtual void post(ReadEvent& event) = 0; + virtual void post(WriteEvent& event) = 0; + virtual void post(AcceptEvent& event) = 0; + virtual void post(NotifyEvent& event) = 0; + + inline void post(Event& event); + + /** + * Wait for the next completed event. + * @return An Event or 0 to indicate the calling thread should shut down. + */ + virtual Event* getEvent() = 0; + + /** Dispose of a system-allocated buffer. Called by ReadEvent */ + virtual void dispose(void* buffer, size_t size) = 0; + + protected: + EventChannel() {} +}; + + +/** + * Base class for all events. There are two possible styles of use: + * + * Task style: the event is allocated as a local variable on the initiating + * task, which blocks in wait(). Event::dispatch() resumes that task + * with the event data available. + * + * Proactor style: Threads post events but do not + * wait. Event::dispatch() processes the event in the dispatching + * thread and then deletes itself. + * + * Tasks give less kernel context switching and blocking AND simpler + * coding. Tasks can call any number of pseudo-blocking opereations + * that are actually event post/wait pairs. At each such point the + * current thread can continue with the task or switch to another task + * to minimise blocking. + * + * With Proactor style dispatch() is an atomic unit of work as far as + * the EventChannel is concerned. To avoid such blocking the + * application has to be written as a collection of non-blocking + * dispatch() callbacks, which is more complex than tasks that can + * call pseudo-blocking operations. + */ +class Event : private boost::noncopyable +{ + public: + virtual ~Event() {} + + /** Post this event to the channel */ + virtual void post(EventChannel& channel) = 0; + + /** + * Block till the event is delivered. + * At most one task can wait on an event. + */ + virtual void wait() const = 0; + + /** + * Dispatch the event. Runs some event-specific code, may switch + * context to resume a waiting task. + */ + virtual void dispatch() = 0; +}; + + +/** + * Base class for asynchronous request events, provides exception + * handling. + */ +class RequestEvent : public Event +{ + public: + /** True if the async request failed */ + bool hasException() const { return ex.get(); } + + const qpid::Exception& getException() const { return *ex; } + + void setException(std::auto_ptr<qpid::Exception>& e) { ex = e; } + + /** If the event has an exception throw it, else do nothing */ + void verify() const { if (ex.get()) throw *ex; } + + void post(EventChannel& channel) { channel.post(*this); } + + private: + qpid::HeapException ex; +}; + + +/** An asynchronous read event. */ +class ReadEvent : public RequestEvent { + public: + /** + * Read data from fd. + */ + ReadEvent(int fileDescriptor, void* buffer, size_t bytesToRead) : + fd(fileDescriptor), data(buffer), size(bytesToRead) {} + + /** Number of bytes read. */ + size_t getBytesRead() const { verify(); return size; } + + /** + * If the system supports direct access to DMA buffers then + * it may provide a direct pointer to such a buffer to avoid + * a copy into the user buffer. + * @return true if getData() is returning a system-supplied buffer. + */ + bool isSystemData() const { verify(); return channel != 0; } + + /** + * Pointer to data read. Note if isSystemData() is true then this + * is NOT the same buffer that was supplied to the constructor. + * The user buffer is untouched. See dispose(). + */ + void* getData() const { verify(); return data; } + + /** Called by the event channel on completion. */ + void complete(EventChannel::shared_ptr ec, void* _data, size_t _size) { + if (data != _data) channel = ec; data = _data; size = _size; + } + + /** + * Dispose of system-provided data buffer, if any. This is + * automatically called by the destructor. + */ + void dispose() { if(channel && data) channel->dispose(data,size); data=0; } + + ~ReadEvent() { dispose(); } + + void post(EventChannel& channel) { channel.post(*this); } + + private: + int fd; + void* data; + size_t size; + EventChannel::shared_ptr channel; +}; + +/** Asynchronous write event */ +class WriteEvent : public RequestEvent { + public: + WriteEvent(int fileDescriptor, void* buffer, size_t bytesToWrite) : + fd(fileDescriptor), data(buffer), size(bytesToWrite) {} + + /** Number of bytes written */ + size_t getBytesWritten() const { verify(); return size; } + + void post(EventChannel& channel) { channel.post(*this); } + + private: + int fd; + void* data; + size_t size; +}; + +/** Asynchronous socket accept event */ +class AcceptEvent : public RequestEvent { + public: + /** Accept a connection on listeningFd */ + AcceptEvent(int listeningFd) : listen(listeningFd) {} + + /** Get accepted file descriptor */ + int getAcceptedFd() const { verify(); return accepted; } + + void post(EventChannel& channel) { channel.post(*this); } + + private: + int listen; + int accepted; +}; + +/** + * NotifyEvent is delievered immediately to be dispatched by an + * EventChannel thread. + */ +class NotifyEvent : public RequestEvent { + public: + void post(EventChannel& channel) { channel.post(*this); } +}; + + +inline void EventChannel::post(Event& event) { event.post(*this); } + +}} + + +#endif /*!_sys_EventChannel_h*/ diff --git a/cpp/src/qpid/sys/Monitor.h b/cpp/src/qpid/sys/Monitor.h index 54956777de..a3abe37748 100644 --- a/cpp/src/qpid/sys/Monitor.h +++ b/cpp/src/qpid/sys/Monitor.h @@ -164,46 +164,46 @@ struct PODMutex void PODMutex::lock() { - CHECK(pthread_mutex_lock(&mutex)); + CHECK0(pthread_mutex_lock(&mutex)); } void PODMutex::unlock() { - CHECK(pthread_mutex_unlock(&mutex)); + CHECK0(pthread_mutex_unlock(&mutex)); } void PODMutex::trylock() { - CHECK(pthread_mutex_trylock(&mutex)); + CHECK0(pthread_mutex_trylock(&mutex)); } Mutex::Mutex() { - CHECK(pthread_mutex_init(&mutex, 0)); + CHECK0(pthread_mutex_init(&mutex, 0)); } Mutex::~Mutex(){ - CHECK(pthread_mutex_destroy(&mutex)); + CHECK0(pthread_mutex_destroy(&mutex)); } void Mutex::lock() { - CHECK(pthread_mutex_lock(&mutex)); + CHECK0(pthread_mutex_lock(&mutex)); } void Mutex::unlock() { - CHECK(pthread_mutex_unlock(&mutex)); + CHECK0(pthread_mutex_unlock(&mutex)); } void Mutex::trylock() { - CHECK(pthread_mutex_trylock(&mutex)); + CHECK0(pthread_mutex_trylock(&mutex)); } Monitor::Monitor() { - CHECK(pthread_cond_init(&condition, 0)); + CHECK0(pthread_cond_init(&condition, 0)); } Monitor::~Monitor() { - CHECK(pthread_cond_destroy(&condition)); + CHECK0(pthread_cond_destroy(&condition)); } void Monitor::wait() { - CHECK(pthread_cond_wait(&condition, &mutex)); + CHECK0(pthread_cond_wait(&condition, &mutex)); } bool Monitor::wait(int64_t nsecs){ @@ -211,17 +211,17 @@ bool Monitor::wait(int64_t nsecs){ int status = pthread_cond_timedwait(&condition, &mutex, &t.getTimespec()); if(status != 0) { if (errno == ETIMEDOUT) return false; - CHECK(status); + CHECK0(status); } return true; } void Monitor::notify(){ - CHECK(pthread_cond_signal(&condition)); + CHECK0(pthread_cond_signal(&condition)); } void Monitor::notifyAll(){ - CHECK(pthread_cond_broadcast(&condition)); + CHECK0(pthread_cond_broadcast(&condition)); } diff --git a/cpp/src/qpid/sys/SessionHandlerFactory.h b/cpp/src/qpid/sys/SessionHandlerFactory.h index da58f5928e..2a01aebcb0 100644 --- a/cpp/src/qpid/sys/SessionHandlerFactory.h +++ b/cpp/src/qpid/sys/SessionHandlerFactory.h @@ -21,6 +21,8 @@ #ifndef _SessionHandlerFactory_ #define _SessionHandlerFactory_ +#include <boost/noncopyable.hpp> + namespace qpid { namespace sys { diff --git a/cpp/src/qpid/sys/Thread.h b/cpp/src/qpid/sys/Thread.h index ed42dfd811..79dfc184e0 100644 --- a/cpp/src/qpid/sys/Thread.h +++ b/cpp/src/qpid/sys/Thread.h @@ -87,11 +87,11 @@ Thread Thread::current(){ #else Thread::Thread(Runnable* runnable) { - CHECK(pthread_create(&thread, NULL, runRunnable, runnable)); + CHECK0(pthread_create(&thread, NULL, runRunnable, runnable)); } void Thread::join(){ - if (thread != 0) CHECK(pthread_join(thread, 0)); + if (thread != 0) CHECK0(pthread_join(thread, 0)); } Thread::Thread(pthread_t thr) : thread(thr) {} diff --git a/cpp/src/qpid/sys/signal.h b/cpp/src/qpid/sys/signal.h new file mode 100644 index 0000000000..6cae2b9e4a --- /dev/null +++ b/cpp/src/qpid/sys/signal.h @@ -0,0 +1,46 @@ +#ifndef _sys_signal_h +#define _sys_signal_h + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifdef USE_APR +# include <apr-1/apr_signal.h> +#else +# include <signal.h> +#endif + +namespace qpid { +namespace sys { + +typedef void (*SignalHandler)(int); + +SignalHandler signal(int sig, SignalHandler handler) +{ +#ifdef USE_APR + return apr_signal(sig, handler); +#else + return ::signal (sig, handler); +#endif +} + +}} + + + +#endif /*!_sys_signal_h*/ diff --git a/cpp/src/qpidd.cpp b/cpp/src/qpidd.cpp index f4afad0467..cd50760f17 100644 --- a/cpp/src/qpidd.cpp +++ b/cpp/src/qpidd.cpp @@ -20,7 +20,7 @@ */ #include <qpid/broker/Broker.h> #include <qpid/broker/Configuration.h> -#include <apr-1/apr_signal.h> +#include <qpid/sys/signal.h> #include <iostream> #include <memory> @@ -43,7 +43,7 @@ int main(int argc, char** argv) config.usage(); }else{ broker = Broker::create(config); - apr_signal(SIGINT, handle_signal); + qpid::sys::signal(SIGINT, handle_signal); broker->run(); } return 0; |