diff options
Diffstat (limited to 'src/main/cpp/asyncappender.cpp')
-rw-r--r-- | src/main/cpp/asyncappender.cpp | 364 |
1 files changed, 364 insertions, 0 deletions
diff --git a/src/main/cpp/asyncappender.cpp b/src/main/cpp/asyncappender.cpp new file mode 100644 index 0000000..44732c2 --- /dev/null +++ b/src/main/cpp/asyncappender.cpp @@ -0,0 +1,364 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#if defined(_MSC_VER) +#pragma warning ( disable: 4231 4251 4275 4786 ) +#endif + +#include <log4cxx/asyncappender.h> + + +#include <log4cxx/helpers/loglog.h> +#include <log4cxx/spi/loggingevent.h> +#include <apr_thread_proc.h> +#include <apr_thread_mutex.h> +#include <apr_thread_cond.h> +#include <log4cxx/helpers/condition.h> +#include <log4cxx/helpers/synchronized.h> +#include <log4cxx/helpers/stringhelper.h> +#include <apr_atomic.h> +#include <log4cxx/helpers/optionconverter.h> + + +using namespace log4cxx; +using namespace log4cxx::helpers; +using namespace log4cxx::spi; + + +IMPLEMENT_LOG4CXX_OBJECT(AsyncAppender) + + +AsyncAppender::AsyncAppender() +: AppenderSkeleton(), + buffer(), + bufferMutex(pool), + bufferNotFull(pool), + bufferNotEmpty(pool), + discardMap(new DiscardMap()), + bufferSize(DEFAULT_BUFFER_SIZE), + appenders(new AppenderAttachableImpl(pool)), + dispatcher(), + locationInfo(false), + blocking(true) { +#if APR_HAS_THREADS + dispatcher.run(dispatch, this); +#endif +} + +AsyncAppender::~AsyncAppender() +{ + finalize(); + delete discardMap; +} + +void AsyncAppender::addRef() const { + ObjectImpl::addRef(); +} + +void AsyncAppender::releaseRef() const { + ObjectImpl::releaseRef(); +} + +void AsyncAppender::addAppender(const AppenderPtr& newAppender) +{ + synchronized sync(appenders->getMutex()); + appenders->addAppender(newAppender); +} + + +void AsyncAppender::setOption(const LogString& option, + const LogString& value) { + if (StringHelper::equalsIgnoreCase(option, LOG4CXX_STR("LOCATIONINFO"), LOG4CXX_STR("locationinfo"))) { + setLocationInfo(OptionConverter::toBoolean(value, false)); + } + if (StringHelper::equalsIgnoreCase(option, LOG4CXX_STR("BUFFERSIZE"), LOG4CXX_STR("buffersize"))) { + setBufferSize(OptionConverter::toInt(value, DEFAULT_BUFFER_SIZE)); + } + if (StringHelper::equalsIgnoreCase(option, LOG4CXX_STR("BLOCKING"), LOG4CXX_STR("blocking"))) { + setBlocking(OptionConverter::toBoolean(value, true)); + } else { + AppenderSkeleton::setOption(option, value); + } +} + + +void AsyncAppender::append(const spi::LoggingEventPtr& event, Pool& p) { +#if APR_HAS_THREADS + // + // if dispatcher has died then + // append subsequent events synchronously + // + if (!dispatcher.isAlive() || bufferSize <= 0) { + synchronized sync(appenders->getMutex()); + appenders->appendLoopOnAppenders(event, p); + return; + } + + // Set the NDC and thread name for the calling thread as these + // LoggingEvent fields were not set at event creation time. + LogString ndcVal; + event->getNDC(ndcVal); + event->getThreadName(); + // Get a copy of this thread's MDC. + event->getMDCCopy(); + + + { + synchronized sync(bufferMutex); + while(true) { + int previousSize = buffer.size(); + if (previousSize < bufferSize) { + buffer.push_back(event); + if (previousSize == 0) { + bufferNotEmpty.signalAll(); + } + break; + } + + // + // Following code is only reachable if buffer is full + // + // + // if blocking and thread is not already interrupted + // and not the dispatcher then + // wait for a buffer notification + bool discard = true; + if (blocking + && !Thread::interrupted() + && !dispatcher.isCurrentThread()) { + try { + bufferNotFull.await(bufferMutex); + discard = false; + } catch (InterruptedException& e) { + // + // reset interrupt status so + // calling code can see interrupt on + // their next wait or sleep. + Thread::currentThreadInterrupt(); + } + } + + // + // if blocking is false or thread has been interrupted + // add event to discard map. + // + if (discard) { + LogString loggerName = event->getLoggerName(); + DiscardMap::iterator iter = discardMap->find(loggerName); + if (iter == discardMap->end()) { + DiscardSummary summary(event); + discardMap->insert(DiscardMap::value_type(loggerName, summary)); + } else { + (*iter).second.add(event); + } + break; + } + } + } +#else + synchronized sync(appenders->getMutex()); + appenders->appendLoopOnAppenders(event, p); +#endif + } + + +void AsyncAppender::close() { + { + synchronized sync(bufferMutex); + closed = true; + bufferNotEmpty.signalAll(); + bufferNotFull.signalAll(); + } + +#if APR_HAS_THREADS + try { + dispatcher.join(); + } catch(InterruptedException& e) { + Thread::currentThreadInterrupt(); + LogLog::error(LOG4CXX_STR("Got an InterruptedException while waiting for the dispatcher to finish,"), e); + } +#endif + + { + synchronized sync(appenders->getMutex()); + AppenderList appenderList = appenders->getAllAppenders(); + for (AppenderList::iterator iter = appenderList.begin(); + iter != appenderList.end(); + iter++) { + (*iter)->close(); + } + } +} + +AppenderList AsyncAppender::getAllAppenders() const +{ + synchronized sync(appenders->getMutex()); + return appenders->getAllAppenders(); +} + +AppenderPtr AsyncAppender::getAppender(const LogString& name) const +{ + synchronized sync(appenders->getMutex()); + return appenders->getAppender(name); +} + +bool AsyncAppender::isAttached(const AppenderPtr& appender) const +{ + synchronized sync(appenders->getMutex()); + return appenders->isAttached(appender); +} + +bool AsyncAppender::requiresLayout() const { + return false; +} + +void AsyncAppender::removeAllAppenders() +{ + synchronized sync(appenders->getMutex()); + appenders->removeAllAppenders(); +} + +void AsyncAppender::removeAppender(const AppenderPtr& appender) +{ + synchronized sync(appenders->getMutex()); + appenders->removeAppender(appender); +} + +void AsyncAppender::removeAppender(const LogString& name) +{ + synchronized sync(appenders->getMutex()); + appenders->removeAppender(name); +} + +bool AsyncAppender::getLocationInfo() const { + return locationInfo; +} + +void AsyncAppender::setLocationInfo(bool flag) { + locationInfo = flag; +} + + +void AsyncAppender::setBufferSize(int size) +{ + if (size < 0) { + throw IllegalArgumentException(LOG4CXX_STR("size argument must be non-negative")); + } + synchronized sync(bufferMutex); + bufferSize = (size < 1) ? 1 : size; + bufferNotFull.signalAll(); +} + +int AsyncAppender::getBufferSize() const +{ + return bufferSize; +} + +void AsyncAppender::setBlocking(bool value) { + synchronized sync(bufferMutex); + blocking = value; + bufferNotFull.signalAll(); +} + +bool AsyncAppender::getBlocking() const { + return blocking; +} + +AsyncAppender::DiscardSummary::DiscardSummary(const LoggingEventPtr& event) : + maxEvent(event), count(1) { +} + +AsyncAppender::DiscardSummary::DiscardSummary(const DiscardSummary& src) : + maxEvent(src.maxEvent), count(src.count) { +} + +AsyncAppender::DiscardSummary& AsyncAppender::DiscardSummary::operator=(const DiscardSummary& src) { + maxEvent = src.maxEvent; + count = src.count; + return *this; +} + +void AsyncAppender::DiscardSummary::add(const LoggingEventPtr& event) { + if (event->getLevel()->toInt() > maxEvent->getLevel()->toInt()) { + maxEvent = event; + } + count++; +} + +LoggingEventPtr AsyncAppender::DiscardSummary::createEvent(Pool& p) { + LogString msg(LOG4CXX_STR("Discarded ")); + StringHelper::toString(count, p, msg); + msg.append(LOG4CXX_STR(" messages due to a full event buffer including: ")); + msg.append(maxEvent->getMessage()); + return new LoggingEvent( + maxEvent->getLoggerName(), + maxEvent->getLevel(), + msg, + LocationInfo::getLocationUnavailable()); +} + + +#if APR_HAS_THREADS +void* LOG4CXX_THREAD_FUNC AsyncAppender::dispatch(apr_thread_t* thread, void* data) { + AsyncAppender* pThis = (AsyncAppender*) data; + bool isActive = true; + try { + while (isActive) { + // + // process events after lock on buffer is released. + // + Pool p; + LoggingEventList events; + { + synchronized sync(pThis->bufferMutex); + size_t bufferSize = pThis->buffer.size(); + isActive = !pThis->closed; + + while((bufferSize == 0) && isActive) { + pThis->bufferNotEmpty.await(pThis->bufferMutex); + bufferSize = pThis->buffer.size(); + isActive = !pThis->closed; + } + for(LoggingEventList::iterator eventIter = pThis->buffer.begin(); + eventIter != pThis->buffer.end(); + eventIter++) { + events.push_back(*eventIter); + } + for(DiscardMap::iterator discardIter = pThis->discardMap->begin(); + discardIter != pThis->discardMap->end(); + discardIter++) { + events.push_back(discardIter->second.createEvent(p)); + } + pThis->buffer.clear(); + pThis->discardMap->clear(); + pThis->bufferNotFull.signalAll(); + } + + for (LoggingEventList::iterator iter = events.begin(); + iter != events.end(); + iter++) { + synchronized sync(pThis->appenders->getMutex()); + pThis->appenders->appendLoopOnAppenders(*iter, p); + } + } + } catch(InterruptedException& ex) { + Thread::currentThreadInterrupt(); + } catch(...) { + } + return 0; +} +#endif |