/* * * Copyright (c) 2006 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include "check.h" #include "EventChannel.h" using namespace std; // Convenience template to zero out a struct. template struct ZeroStruct : public S { ZeroStruct() { memset(this, 0, sizeof(*this)); } }; namespace qpid { namespace sys { /** * EventHandler wraps an epoll file descriptor. Acts as private * interface between EventChannel and subclasses. * * Also implements Event interface for events that are not associated * with a file descriptor and are passed via the message queue. */ class EventHandler : public Event, private Monitor { public: EventHandler(int epollSize = 256); ~EventHandler(); int getEpollFd() { return epollFd; } void epollAdd(int fd, uint32_t epollEvents, Event* event); void epollMod(int fd, uint32_t epollEvents, Event* event); void epollDel(int fd); void mqPut(Event* event); Event* mqGet(); protected: // Should never be called, only complete. void prepare(EventHandler&) { assert(0); } Event* complete(EventHandler& eh); private: int epollFd; std::string mqName; int mqFd; std::queue mqEvents; }; EventHandler::EventHandler(int epollSize) { epollFd = epoll_create(epollSize); if (epollFd < 0) throw QPID_POSIX_ERROR(errno); // Create a POSIX message queue for non-fd events. // We write one byte and never read it is always ready for read // when we add it to epoll. // ZeroStruct attr; attr.mq_maxmsg = 1; attr.mq_msgsize = 1; do { char tmpnam[L_tmpnam]; tmpnam_r(tmpnam); mqName = tmpnam + 4; // Skip "tmp/" mqFd = mq_open( mqName.c_str(), O_CREAT|O_EXCL|O_RDWR|O_NONBLOCK, S_IRWXU, &attr); if (mqFd < 0) throw QPID_POSIX_ERROR(errno); } while (mqFd == EEXIST); // Name already taken, try again. static char zero = '\0'; mq_send(mqFd, &zero, 1, 0); epollAdd(mqFd, 0, this); } EventHandler::~EventHandler() { mq_close(mqFd); mq_unlink(mqName.c_str()); } void EventHandler::mqPut(Event* event) { ScopedLock l(*this); assert(event != 0); mqEvents.push(event); epollMod(mqFd, EPOLLIN|EPOLLONESHOT, this); } Event* EventHandler::mqGet() { ScopedLock l(*this); if (mqEvents.empty()) return 0; Event* event = mqEvents.front(); mqEvents.pop(); if(!mqEvents.empty()) epollMod(mqFd, EPOLLIN|EPOLLONESHOT, this); return event; } void EventHandler::epollAdd(int fd, uint32_t epollEvents, Event* event) { ZeroStruct ee; ee.data.ptr = event; ee.events = epollEvents; if (epoll_ctl(epollFd, EPOLL_CTL_ADD, fd, &ee) < 0) throw QPID_POSIX_ERROR(errno); } void EventHandler::epollMod(int fd, uint32_t epollEvents, Event* event) { ZeroStruct ee; ee.data.ptr = event; ee.events = epollEvents; if (epoll_ctl(epollFd, EPOLL_CTL_MOD, fd, &ee) < 0) throw QPID_POSIX_ERROR(errno); } void EventHandler::epollDel(int fd) { if (epoll_ctl(epollFd, EPOLL_CTL_DEL, fd, 0) < 0) throw QPID_POSIX_ERROR(errno); } Event* EventHandler::complete(EventHandler& eh) { assert(&eh == this); Event* event = mqGet(); return event==0 ? 0 : event->complete(eh); } // ================================================================ // EventChannel EventChannel::shared_ptr EventChannel::create() { return shared_ptr(new EventChannel()); } EventChannel::EventChannel() : handler(new EventHandler()) {} EventChannel::~EventChannel() {} void EventChannel::postEvent(Event& e) { e.prepare(*handler); } Event* EventChannel::getEvent() { static const int infiniteTimeout = -1; ZeroStruct epollEvent; // Loop until we can complete the event. Some events may re-post // themselves and return 0 from complete, e.g. partial reads. // Event* event = 0; while (event == 0) { int eventCount = epoll_wait(handler->getEpollFd(), &epollEvent, 1, infiniteTimeout); if (eventCount < 0) { if (errno != EINTR) { // TODO aconway 2006-11-28: Proper handling/logging of errors. cerr << BOOST_CURRENT_FUNCTION << " ignoring error " << PosixError::getMessage(errno) << endl; assert(0); } } else if (eventCount == 1) { event = reinterpret_cast(epollEvent.data.ptr); assert(event != 0); try { event = event->complete(*handler); } catch (const Exception& e) { if (event) event->setError(e); } catch (const std::exception& e) { if (event) event->setError(e); } } } return event; } Event::~Event() {} void Event::prepare(EventHandler& handler) { handler.mqPut(this); } bool Event::hasError() const { return error; } void Event::throwIfError() throw (Exception) { if (hasError()) error.throwSelf(); } Event* Event::complete(EventHandler&) { return this; } void Event::dispatch() { try { if (!callback.empty()) callback(); } catch (const std::exception&) { throw; } catch (...) { throw QPID_ERROR(INTERNAL_ERROR, "Unknown exception."); } } void Event::setError(const ExceptionHolder& e) { error = e; } void ReadEvent::prepare(EventHandler& handler) { handler.epollAdd(descriptor, EPOLLIN | EPOLLONESHOT, this); } ssize_t ReadEvent::doRead() { ssize_t n = ::read(descriptor, static_cast(buffer) + received, size - received); if (n > 0) received += n; return n; } Event* ReadEvent::complete(EventHandler& handler) { // Read as much as possible without blocking. ssize_t n = doRead(); while (n > 0 && received < size) doRead(); if (received == size) { handler.epollDel(descriptor); received = 0; // Reset for re-use. return this; } else if (n <0 && (errno == EAGAIN)) { // Keep polling for more. handler.epollMod(descriptor, EPOLLIN | EPOLLONESHOT, this); return 0; } else { // Unexpected EOF or error. Throw ENODATA for EOF. handler.epollDel(descriptor); received = 0; // Reset for re-use. throw QPID_POSIX_ERROR((n < 0) ? errno : ENODATA); } } void WriteEvent::prepare(EventHandler& handler) { handler.epollAdd(descriptor, EPOLLOUT | EPOLLONESHOT, this); } Event* WriteEvent::complete(EventHandler& handler) { ssize_t n = write(descriptor, static_cast(buffer) + written, size - written); if (n < 0) throw QPID_POSIX_ERROR(errno); written += n; if(written < size) { // Keep polling. handler.epollMod(descriptor, EPOLLOUT | EPOLLONESHOT, this); return 0; } written = 0; // Reset for re-use. handler.epollDel(descriptor); return this; } void AcceptEvent::prepare(EventHandler& handler) { handler.epollAdd(descriptor, EPOLLIN | EPOLLONESHOT, this); } Event* AcceptEvent::complete(EventHandler& handler) { handler.epollDel(descriptor); accepted = ::accept(descriptor, 0, 0); if (accepted < 0) throw QPID_POSIX_ERROR(errno); return this; } }}