/* * * Copyright (c) 2006 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ #include #include #include #include "EventChannelConnection.h" #include "sys/ConnectionInputHandlerFactory.h" #include "QpidError.h" using namespace std; using namespace qpid; using namespace qpid::framing; namespace qpid { namespace sys { const size_t EventChannelConnection::bufferSize = 65536; EventChannelConnection::EventChannelConnection( EventChannelThreads::shared_ptr threads_, ConnectionInputHandlerFactory& factory_, int rfd, int wfd, bool isTrace_ ) : readFd(rfd), writeFd(wfd ? wfd : rfd), readCallback(boost::bind(&EventChannelConnection::closeOnException, this, &EventChannelConnection::endInitRead)), isWriting(false), isClosed(false), threads(threads_), handler(factory_.create(this)), in(bufferSize), out(bufferSize), isTrace(isTrace_) { BOOST_ASSERT(readFd > 0); BOOST_ASSERT(writeFd > 0); closeOnException(&EventChannelConnection::startRead); } void EventChannelConnection::send(std::auto_ptr frame) { { Monitor::ScopedLock lock(monitor); assert(frame.get()); writeFrames.push_back(frame.release()); } closeOnException(&EventChannelConnection::startWrite); } void EventChannelConnection::close() { { Monitor::ScopedLock lock(monitor); if (isClosed) return; isClosed = true; } ::close(readFd); ::close(writeFd); { Monitor::ScopedLock lock(monitor); while (busyThreads > 0) monitor.wait(); } handler->closed(); } void EventChannelConnection::closeNoThrow() { Exception::tryCatchLog( boost::bind(&EventChannelConnection::close, this), false, "Exception closing channel" ); } /** * Call f in a try/catch block and close the connection if * an exception is thrown. */ void EventChannelConnection::closeOnException(MemberFnPtr f) { try { Exception::tryCatchLog( boost::bind(f, this), "Closing connection due to exception" ); return; } catch (...) { // Exception was already logged by tryCatchLog closeNoThrow(); } } // Post the write event. // Always called inside closeOnException. // Called by endWrite and send, but only one thread writes at a time. // void EventChannelConnection::startWrite() { FrameQueue::auto_type frame; { Monitor::ScopedLock lock(monitor); // Stop if closed or a write event is already in progress. if (isClosed || isWriting) return; if (writeFrames.empty()) { isWriting = false; return; } isWriting = true; frame = writeFrames.pop_front(); } // No need to lock here - only one thread can be writing at a time. out.clear(); if (isTrace) cout << "Send on socket " << writeFd << ": " << *frame << endl; frame->encode(out); out.flip(); writeEvent = WriteEvent( writeFd, out.start(), out.available(), boost::bind(&EventChannelConnection::closeOnException, this, &EventChannelConnection::endWrite)); threads->post(writeEvent); } // ScopedBusy ctor increments busyThreads. // dtor decrements and calls monitor.notifyAll if it reaches 0. // struct EventChannelConnection::ScopedBusy : public AtomicCount::ScopedIncrement { ScopedBusy(EventChannelConnection& ecc) : AtomicCount::ScopedIncrement( ecc.busyThreads, boost::bind(&Monitor::notifyAll, &ecc.monitor)) {} }; // Write event completed. // Always called by a channel thread inside closeOnException. // void EventChannelConnection::endWrite() { ScopedBusy(*this); { Monitor::ScopedLock lock(monitor); isWriting = false; if (isClosed) return; writeEvent.throwIfException(); } // Check if there's more in to write in the write queue. startWrite(); } // Post the read event. // Always called inside closeOnException. // Called from ctor and end[Init]Read, so only one call at a time // is possible since we only post one read event at a time. // void EventChannelConnection::startRead() { // Non blocking read, as much as we can swallow. readEvent = ReadEvent( readFd, in.start(), in.available(), readCallback,true); threads->post(readEvent); } // Completion of initial read, expect protocolInit. // Always called inside closeOnException in channel thread. // Only called by one thread at a time. void EventChannelConnection::endInitRead() { ScopedBusy(*this); if (!isClosed) { readEvent.throwIfException(); in.move(readEvent.getBytesRead()); in.flip(); ProtocolInitiation protocolInit; if(protocolInit.decode(in)){ handler->initiated(&protocolInit); readCallback = boost::bind( &EventChannelConnection::closeOnException, this, &EventChannelConnection::endRead); } in.compact(); // Continue reading. startRead(); } } // Normal reads, expect a frame. // Always called inside closeOnException in channel thread. void EventChannelConnection::endRead() { ScopedBusy(*this); if (!isClosed) { readEvent.throwIfException(); in.move(readEvent.getBytesRead()); in.flip(); AMQFrame frame; while (frame.decode(in)) { // TODO aconway 2006-11-30: received should take Frame& if (isTrace) cout << "Received on socket " << readFd << ": " << frame << endl; handler->received(&frame); } in.compact(); startRead(); } } }} // namespace qpid::sys