/* * * Copyright (c) 2006 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ #include "QpidError.h" #include "ScopedIncrement.h" #include "ProducerConsumer.h" namespace qpid { namespace sys { // // ================ ProducerConsumer ProducerConsumer::ProducerConsumer(size_t init_items) : items(init_items), waiters(0), stopped(false) {} void ProducerConsumer::stop() { Mutex::ScopedLock l(monitor); stopped = true; monitor.notifyAll(); // Wait for waiting consumers to wake up. while (waiters > 0) monitor.wait(); } size_t ProducerConsumer::available() const { Mutex::ScopedLock l(monitor); return items; } size_t ProducerConsumer::consumers() const { Mutex::ScopedLock l(monitor); return waiters; } // ================ Lock ProducerConsumer::Lock::Lock(ProducerConsumer& p) : pc(p), lock(p.monitor), status(INCOMPLETE) {} bool ProducerConsumer::Lock::isOk() const { return !pc.isStopped() && status==INCOMPLETE; } void ProducerConsumer::Lock::checkOk() const { assert(!pc.isStopped()); assert(status == INCOMPLETE); } ProducerConsumer::Lock::~Lock() { assert(status != INCOMPLETE || pc.isStopped()); } void ProducerConsumer::Lock::confirm() { checkOk(); status = CONFIRMED; } void ProducerConsumer::Lock::cancel() { checkOk(); status = CANCELLED; } // ================ ProducerLock ProducerConsumer::ProducerLock::ProducerLock(ProducerConsumer& p) : Lock(p) {} ProducerConsumer::ProducerLock::~ProducerLock() { if (status == CONFIRMED) { pc.items++; pc.monitor.notify(); // Notify a consumer. } } // ================ ConsumerLock ProducerConsumer::ConsumerLock::ConsumerLock(ProducerConsumer& p) : Lock(p) { if (isOk()) { ScopedIncrement inc(pc.waiters); while (pc.items == 0 && !pc.stopped) { pc.monitor.wait(); } } } ProducerConsumer::ConsumerLock::ConsumerLock( ProducerConsumer& p, const Time& timeout) : Lock(p) { if (isOk()) { // Don't wait if timeout==0 if (timeout == 0) { if (pc.items == 0) status = TIMEOUT; return; } else { Time deadline = now() + timeout; ScopedIncrement inc(pc.waiters); while (pc.items == 0 && !pc.stopped) { if (!pc.monitor.wait(deadline)) { status = TIMEOUT; return; } } } } } ProducerConsumer::ConsumerLock::~ConsumerLock() { if (pc.isStopped()) { if (pc.waiters == 0) pc.monitor.notifyAll(); // All waiters woken, notify stop thread(s) } else if (status==CONFIRMED) { pc.items--; if (pc.items > 0) pc.monitor.notify(); // Notify another consumer. } } }} // namespace qpid::sys