#ifndef _sys_ProducerConsumer_h #define _sys_ProducerConsumer_h /* * * Copyright (c) 2006 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ #include #include "Exception.h" #include "sys/Monitor.h" namespace qpid { namespace sys { /** * Producer-consumer synchronisation. * * Producers increase the number of available items, consumers reduce it. * Consumers wait till an item is available. Waiting threads can be * woken for shutdown using stop(). * * Note: Currently implements unbounded producer-consumer, i.e. no limit * to available items, producers never block. Can be extended to support * bounded PC if required. * // TODO aconway 2007-02-13: example, from tests. */ class ProducerConsumer { public: ProducerConsumer(size_t init_items=0); ~ProducerConsumer() { stop(); } /** * Wake any threads waiting for ProducerLock or ConsumerLock. *@post No threads are waiting in Producer or Consumer locks. */ void stop(); /** True if queue is stopped */ bool isStopped() { return stopped; } /** Number of items available for consumers */ size_t available() const; /** Number of consumers waiting for items */ size_t consumers() const; /** True if available == 0 */ bool empty() const { return available() == 0; } /** * Base class for producer and consumer locks. */ class Lock : private boost::noncopyable { public: /** * You must call isOk() after creating a lock to verify its state. * *@return true means the lock succeeded. You MUST call either *confirm() or cancel() before the lock goes out of scope. * * false means the lock failed - timed out or the * ProducerConsumer is stopped. You should not do anything in * the scope of the lock. */ bool isOk() const; /** * Confirm that an item was produced/consumed. *@pre isOk() */ void confirm(); /** * Cancel the lock to indicate nothing was produced/consumed. * Note that locks are not actually released until destroyed. * *@pre isOk() */ void cancel(); /** True if this lock experienced a timeout */ bool isTimedOut() const { return status == TIMEOUT; } /** True if we have been stopped */ bool isStopped() const { return pc.isStopped(); } ProducerConsumer& pc; protected: /** Lock status */ enum Status { INCOMPLETE, CONFIRMED, CANCELLED, TIMEOUT }; Lock(ProducerConsumer& p); ~Lock(); void checkOk() const; Mutex::ScopedLock lock; Status status; }; /** Lock for code that produces items. */ struct ProducerLock : public Lock { /** * Acquire locks to produce an item. *@post If isOk() the calling thread has exclusive access * to produce an item. */ ProducerLock(ProducerConsumer& p); /** Release locks, signal waiting consumers if confirm() was called. */ ~ProducerLock(); }; /** Lock for code that consumes items */ struct ConsumerLock : public Lock { /** * Wait for an item to consume and acquire locks. * *@post If isOk() there is at least one item available and the *calling thread has exclusive access to consume it. */ ConsumerLock(ProducerConsumer& p); /** * Wait up to timeout to acquire lock. *@post If isOk() caller has a producer lock. * If isTimedOut() there was a timeout. * If neither then we were stopped. */ ConsumerLock(ProducerConsumer& p, const Time& timeout); /** Release locks */ ~ConsumerLock(); }; private: mutable Monitor monitor; size_t items; size_t waiters; bool stopped; friend class Lock; friend class ProducerLock; friend class ConsumerLock; }; }} // namespace qpid::sys #endif /*!_sys_ProducerConsumer_h*/