diff options
-rw-r--r-- | AudioManagerCore/src/CAmControlSender.cpp | 3 | ||||
-rw-r--r-- | AudioManagerUtilities/include/CAmCommonAPIWrapper.h | 185 | ||||
-rw-r--r-- | AudioManagerUtilities/include/CAmSocketHandler.h | 54 | ||||
-rw-r--r-- | AudioManagerUtilities/src/CAmCommonAPIWrapper.cpp | 45 | ||||
-rw-r--r-- | AudioManagerUtilities/src/CAmDltWrapper.cpp | 5 | ||||
-rw-r--r-- | AudioManagerUtilities/src/CAmSocketHandler.cpp | 620 | ||||
-rw-r--r-- | AudioManagerUtilities/test/AmSerializerTest/CAmSerializerTest.cpp | 7 | ||||
-rw-r--r-- | AudioManagerUtilities/test/AmSocketHandlerTest/CAmSocketHandlerTest.cpp | 560 | ||||
-rw-r--r-- | AudioManagerUtilities/test/AmSocketHandlerTest/CAmSocketHandlerTest.h | 72 | ||||
-rwxr-xr-x | CMakeLists.txt | 4 |
10 files changed, 982 insertions, 573 deletions
diff --git a/AudioManagerCore/src/CAmControlSender.cpp b/AudioManagerCore/src/CAmControlSender.cpp index 17a9b2b..530de32 100644 --- a/AudioManagerCore/src/CAmControlSender.cpp +++ b/AudioManagerCore/src/CAmControlSender.cpp @@ -23,6 +23,7 @@ #include "CAmControlSender.h" #include <cassert> +#include <cstring> #include <fstream> #include <iostream> #include <sstream> @@ -565,12 +566,12 @@ CAmControlSender::CAmControlSender() : receiverCallbackT(this, &CAmControlSender::receiverCallback),// checkerCallbackT(this, &CAmControlSender::checkerCallback),// dispatcherCallbackT(this, &CAmControlSender::dispatcherCallback), // - mPipe(), // mlibHandle(NULL), // mController(NULL), // mSignal(0) { logInfo("CAmControlSender was loaded in test mode!"); + std::memset(mPipe, -1, sizeof(mPipe)); } bool CAmControlSender::dispatcherCallback(const sh_pollHandle_t handle, void* userData) diff --git a/AudioManagerUtilities/include/CAmCommonAPIWrapper.h b/AudioManagerUtilities/include/CAmCommonAPIWrapper.h index 82328d6..a83b5b3 100644 --- a/AudioManagerUtilities/include/CAmCommonAPIWrapper.h +++ b/AudioManagerUtilities/include/CAmCommonAPIWrapper.h @@ -133,24 +133,25 @@ public: * @return Pointer to the socket handler. */ CAmSocketHandler *getSocketHandler() const { return mpSocketHandler; } -#if COMMONAPI_VERSION_NUMBER >= 300 - /** - * \brief Register stub objects. - * - * Example: std::shared_ptr<ConcreteStubClass> aStub; - * registerService( aStub, "local", "com.your_company.instance_name", "service-name"); - * - * @param shStub: Shared pointer to a stub instance - * @param domain: A string with the domain name, usually "local" - * @param instance: Common-api instance string as example "com.your_company.instance_name" - * @param connectionId: A string connection id, which is used by CommonAPI to group applications - * - */ - template <class TStubImp> bool registerService(const std::shared_ptr<TStubImp> & shStub, const std::string & domain, const std::string & instance, const CommonAPI::ConnectionId_t & connectionId) - { - return mRuntime->registerService(domain, instance, shStub, connectionId); - } -#endif + + /** + * \brief Deprecated method. This class is used only in single connection applications and no connectionId is needed. Instead you should use bool registerService(const std::shared_ptr<TStubImp> & shStub, const std::string & domain, const std::string & instance). + * + * + * Example: std::shared_ptr<ConcreteStubClass> aStub; + * registerService( aStub, "local", "com.your_company.instance_name", "service-name"); + * + * @param shStub: Shared pointer to a stub instance + * @param domain: A string with the domain name, usually "local" + * @param instance: Common-api instance string as example "com.your_company.instance_name" + * @param connectionId: A string connection id, which is used by CommonAPI to group applications + * + */ + template <class TStubImp> bool __attribute__((deprecated)) registerService(const std::shared_ptr<TStubImp> & shStub, const std::string & domain, const std::string & instance, const CommonAPI::ConnectionId_t __attribute__((__unused__)) & /*connectionId*/) + { + return mRuntime->registerService(domain, instance, shStub, mContext); + } + /** * \brief Register stub objects. * @@ -181,61 +182,59 @@ public: } /** - * \brief Deprecated method. Instead you should use bool registerService(const std::shared_ptr<TStubImp> & shStub, const std::string & domain, const std::string & instance). - * - * Register stub objects. - * - * Example: std::shared_ptr<ConcreteStubClass> aStub; - * registerService( aStub, "local:com.your_company.interface_name:com.your_company.instance_name"); - * - * @param shStub: Shared pointer to a stub instance - * @param address: Complete common-api address as example "local:com.your_company.interface_name:com.your_company.instance_name" - * - */ - template <class TStubImp> bool __attribute__((deprecated)) registerStub(const std::shared_ptr<TStubImp> & shStub, const std::string & address) - { - std::vector<std::string> parts = CommonAPI::split(address, ':'); - assert(parts.size()==3); - - return registerService(shStub, parts[0], parts[2]); - } - - /** - * \brief Deprecated method. Instead you should use bool unregisterService(const std::string &domain, const std::string &interface, const std::string &instance). - * - * Unregister stub objects. - * - * @param address: Complete common-api address as example "local:com.your_company.interface_name:com.your_company.instance_name" - * - */ - bool __attribute__((deprecated)) unregisterStub(const std::string & address) - { - std::vector<std::string> parts = CommonAPI::split(address, ':'); - assert(parts.size()==3); - - return unregisterService(parts[0], parts[1], parts[2]); - } - -#if COMMONAPI_VERSION_NUMBER >= 300 - /** - * \brief Build proxy objects. - * - * Example: std::shared_ptr<AProxyClass<>> aProxy = buildProxy<AProxyClass>("local", "com.your_company.instance_name", "client-name"); - * - * @param domain: A string with the domain name, usually "local" - * @param instance: Common-api instance string as example "com.your_company.instance_name" - * @param connectionId: A string connection id, which is used by CommonAPI to group applications - * - * @return A proxy object. - */ - template<template<typename ...> class ProxyClass, typename ... AttributeExtensions> - std::shared_ptr<ProxyClass<AttributeExtensions...>> buildProxy(const std::string &domain, const std::string &instance, const CommonAPI::ConnectionId_t & connectionId) - { - return mRuntime->buildProxy<ProxyClass>(domain, instance, connectionId); - } -#endif - - /** + * \brief Deprecated method. Instead you should use bool registerService(const std::shared_ptr<TStubImp> & shStub, const std::string & domain, const std::string & instance). + * + * Register stub objects. + * + * Example: std::shared_ptr<ConcreteStubClass> aStub; + * registerService( aStub, "local:com.your_company.interface_name:com.your_company.instance_name"); + * + * @param shStub: Shared pointer to a stub instance + * @param address: Complete common-api address as example "local:com.your_company.interface_name:com.your_company.instance_name" + * + */ + template <class TStubImp> bool __attribute__((deprecated)) registerStub(const std::shared_ptr<TStubImp> & shStub, const std::string & address) + { + std::vector<std::string> parts = CommonAPI::split(address, ':'); + assert(parts.size()==3); + + return registerService(shStub, parts[0], parts[2]); + } + + /** + * \brief Deprecated method. Instead you should use bool unregisterService(const std::string &domain, const std::string &interface, const std::string &instance). + * + * Unregister stub objects. + * + * @param address: Complete common-api address as example "local:com.your_company.interface_name:com.your_company.instance_name" + * + */ + bool __attribute__((deprecated)) unregisterStub(const std::string & address) + { + std::vector<std::string> parts = CommonAPI::split(address, ':'); + assert(parts.size()==3); + + return unregisterService(parts[0], parts[1], parts[2]); + } + + /** + * \brief Deprecated method. This class is used only in single connection applications and no connectionId is needed. Instead you should use buildProxy(const std::string &domain, const std::string &instance). + * + * Example: std::shared_ptr<AProxyClass<>> aProxy = buildProxy<AProxyClass>("local", "com.your_company.instance_name", "client-name"); + * + * @param domain: A string with the domain name, usually "local" + * @param instance: Common-api instance string as example "com.your_company.instance_name" + * @param connectionId: A string connection id, which is used by CommonAPI to group applications + * + * @return A proxy object. + */ + template<template<typename ...> class ProxyClass, typename ... AttributeExtensions> + std::shared_ptr<ProxyClass<AttributeExtensions...>> __attribute__((deprecated)) buildProxy(const std::string &domain, const std::string &instance, const CommonAPI::ConnectionId_t __attribute__((__unused__)) & /*connectionId*/) + { + return mRuntime->buildProxy<ProxyClass>(domain, instance, mContext); + } + + /** * \brief Build proxy objects. * * Example: std::shared_ptr<AProxyClass<>> aProxy = buildProxy<AProxyClass>("local", "com.your_company.instance_name"); @@ -250,26 +249,26 @@ public: { return mRuntime->buildProxy<ProxyClass>(domain, instance, mContext); } - - - /** - * \brief Deprecated method. Instead you should use buildProxy(const std::string &domain, const std::string &instance). - * - * Build proxy objects. - * Example: std::shared_ptr<AProxyClass<>> aProxy = buildProxy<AProxyClass>("local:com.your_company.interface_name:com.your_company.instance_name"); - * - * @param address: Complete common-api address as example "local:com.your_company.interface_name:com.your_company.instance_name" - * - * @return A proxy object. - */ - template<template<typename ...> class ProxyClass, typename ... AttributeExtensions> - std::shared_ptr<ProxyClass<AttributeExtensions...>> __attribute__((deprecated)) buildProxy(const std::string & address) - { - std::vector<std::string> parts=CommonAPI::split(address, ':'); - assert(parts.size()==3); - - return buildProxy<ProxyClass>(parts[0], parts[2]); - } + + + /** + * \brief Deprecated method. Instead you should use buildProxy(const std::string &domain, const std::string &instance). + * + * Build proxy objects. + * Example: std::shared_ptr<AProxyClass<>> aProxy = buildProxy<AProxyClass>("local:com.your_company.interface_name:com.your_company.instance_name"); + * + * @param address: Complete common-api address as example "local:com.your_company.interface_name:com.your_company.instance_name" + * + * @return A proxy object. + */ + template<template<typename ...> class ProxyClass, typename ... AttributeExtensions> + std::shared_ptr<ProxyClass<AttributeExtensions...>> __attribute__((deprecated)) buildProxy(const std::string & address) + { + std::vector<std::string> parts=CommonAPI::split(address, ':'); + assert(parts.size()==3); + + return buildProxy<ProxyClass>(parts[0], parts[2]); + } }; diff --git a/AudioManagerUtilities/include/CAmSocketHandler.h b/AudioManagerUtilities/include/CAmSocketHandler.h index 717f792..db3207e 100644 --- a/AudioManagerUtilities/include/CAmSocketHandler.h +++ b/AudioManagerUtilities/include/CAmSocketHandler.h @@ -215,6 +215,15 @@ public: */ class CAmSocketHandler { + typedef enum:uint8_t + { + ADD = 0u, // new, uninitialized element which needs to be added to ppoll array + UPDATE = 1u, // update of event information therefore update ppoll array + VALID = 2u, // it is a valid element in ppoll array + REMOVE = 3u, // remove from ppoll array and internal map + INVALID = 4u // uninit element requested to be removed from internal map only + } poll_states_e; + struct sh_poll_s //!<struct that holds information about polls { sh_pollHandle_t handle; //!<handle to uniquely adress a filedesriptor @@ -224,9 +233,10 @@ class CAmSocketHandler std::function<bool(const sh_pollHandle_t handle, void* userData)> checkCB; //check callback std::function<bool(const sh_pollHandle_t handle, void* userData)> dispatchCB; //dispatch callback void* userData; + poll_states_e state; sh_poll_s() : - handle(0), pollfdValue(), prepareCB(), firedCB(), checkCB(), dispatchCB(), userData(0) + handle(0), pollfdValue(), prepareCB(), firedCB(), checkCB(), dispatchCB(), userData(0), state(ADD) {} }; @@ -244,7 +254,7 @@ class CAmSocketHandler sh_timer_s() : handle(0) #ifdef WITH_TIMERFD - , fd(0) + , fd(-1) #endif , countdown(), callback(), userData(0) {} @@ -271,42 +281,43 @@ class CAmSocketHandler }; typedef std::reverse_iterator<sh_timer_s> rListTimerIter; //!<typedef for reverseiterator on timer lists - typedef std::vector<pollfd> VectorListPollfd_t; //!<vector of filedescriptors - typedef std::vector<sh_poll_s> VectorListPoll_t; //!<list for the callbacks + typedef std::vector<pollfd> VectorPollfd_t; //!<vector of filedescriptors + typedef std::map<int, sh_poll_s> MapShPoll_t; //!<list for the callbacks typedef std::vector<sh_signal_s> VectorSignalHandlers_t; //!<list for the callbacks typedef enum:uint8_t { - NO_ERROR = 0u, // OK - PIPE_ERROR = 1u, // Pipe error - FD_ERROR = 2u, // Invalid file descriptor - SFD_ERROR = 4u, + NO_ERROR = 0u, // OK + FD_ERROR = 1u, // Invalid file descriptor + MT_ERROR = 2u // Multi-thread issue } internal_codes_e; typedef uint8_t internal_codes_t; - int mPipe[2]; + int mEventFd; + int mSignalFd; bool mDispatchDone; //this starts / stops the mainloop + MapShPoll_t mMapShPoll; //!<list that holds all information for the ppoll sh_identifier_s mSetPollKeys; //!A set of all used ppoll keys - VectorListPoll_t mListPoll; //!<list that holds all information for the ppoll sh_identifier_s mSetTimerKeys; //!A set of all used timer keys std::list<sh_timer_s> mListTimer; //!<list of all timers +#ifndef WITH_TIMERFD std::list<sh_timer_s> mListActiveTimer; //!<list of all currently active timers +#endif sh_identifier_s mSetSignalhandlerKeys; //!A set of all used signal handler keys VectorSignalHandlers_t mSignalHandlers; - bool mRecreatePollfds; //!<when this is true, the poll list needs to be recreated internal_codes_t mInternalCodes; - sh_pollHandle_t mSignalFdHandle; #ifndef WITH_TIMERFD timespec mStartTime; //!<here the actual time is saved for timecorrection #endif + private: bool fdIsValid(const int fd) const; + void wakeupWorker(const std::string & func, const uint64_t value = 1u); timespec* insertTime(timespec& buffertime); #ifdef WITH_TIMERFD am_Error_e createTimeFD(const itimerspec & timeouts, int & fd); - #else void timerUp(); void timerCorrection(); @@ -408,28 +419,21 @@ private: * @param a * @return */ - inline static void fire(sh_poll_s& a); - - /** - * functor to return all fired events - * @param a - * @return - */ - inline static bool eventFired(const pollfd& a); + inline static void fire(const sh_poll_s& a); /** * functor to help find the items that do not need dispatching * @param a * @return */ - inline static bool noDispatching(const sh_poll_s& a); + inline static bool noDispatching(const sh_poll_s* a); /** * checks if dispatching is already finished * @param a * @return */ - inline static bool dispatchingFinished(const sh_poll_s& a); + inline static bool dispatchingFinished(const sh_poll_s* a); /** * timer fire callback @@ -444,9 +448,7 @@ private: * @return handle */ bool nextHandle(sh_identifier_s & handle); - - am_Error_e getFDPollData(const sh_pollHandle_t handle, sh_poll_s & outPollData); - + public: CAmSocketHandler(); diff --git a/AudioManagerUtilities/src/CAmCommonAPIWrapper.cpp b/AudioManagerUtilities/src/CAmCommonAPIWrapper.cpp index 2aa8370..caa7aa8 100644 --- a/AudioManagerUtilities/src/CAmCommonAPIWrapper.cpp +++ b/AudioManagerUtilities/src/CAmCommonAPIWrapper.cpp @@ -156,7 +156,7 @@ void CAmCommonAPIWrapper::commonFireCallback(const pollfd pollfd, const sh_pollH return; } - mWatchToCheck->dispatch(pollfd.events); + mWatchToCheck->dispatch(pollfd.revents); } void CAmCommonAPIWrapper::commonPrepareCallback(const sh_pollHandle_t, void*) @@ -207,20 +207,37 @@ void CAmCommonAPIWrapper::registerTimeout(CommonAPI::Timeout* timeout, const Com { timespec pollTimeout; int64_t localTimeout = timeout->getTimeoutInterval(); - - pollTimeout.tv_sec = localTimeout / 1000; - pollTimeout.tv_nsec = (localTimeout % 1000) * 1000000; + + if(CommonAPI::TIMEOUT_INFINITE==localTimeout)//dispatch never + { + pollTimeout.tv_sec = localTimeout; + pollTimeout.tv_nsec = 0; + } + else if(CommonAPI::TIMEOUT_NONE==localTimeout)//dispatch immediately + { + pollTimeout.tv_sec = 0; + pollTimeout.tv_nsec = 1000000; + } + else + { + pollTimeout.tv_sec = localTimeout / 1000; + pollTimeout.tv_nsec = (localTimeout % 1000) * 1000000; + } //prepare handle and callback. new is eval, but there is no other choice because we need the pointer! sh_timerHandle_t handle; //add the timer to the pollLoop - mpSocketHandler->addTimer(pollTimeout, &pCommonTimerCallback, handle, timeout); - - timerHandles myHandle({handle,timeout}); - mpListTimerhandles.push_back(myHandle); - - return; + am_Error_e error = mpSocketHandler->addTimer(pollTimeout, &pCommonTimerCallback, handle, timeout); + if (error != am_Error_e::E_OK || handle == 0) + { + logError(__func__,"adding timer failed"); + } + else + { + timerHandles myHandle({handle,timeout}); + mpListTimerhandles.push_back(myHandle); + } } void CAmCommonAPIWrapper::deregisterTimeout(CommonAPI::Timeout* timeout) @@ -243,10 +260,12 @@ void CAmCommonAPIWrapper::registerWatch(CommonAPI::Watch* watch, const CommonAPI am_Error_e error = mpSocketHandler->addFDPoll(pollfd_.fd, pollfd_.events, &pCommonPrepareCallback, &pCommonFireCallback, &pCommonCheckCallback, &pCommonDispatchCallback, watch, handle); //if everything is alright, add the watch and the handle to our map so we know this relationship - if (error == !am_Error_e::E_OK || handle == 0) + if (error != am_Error_e::E_OK || handle == 0) + { logError(__func__,"entering watch failed"); - - mMapWatches.insert(std::make_pair(pollfd_.fd,watch)); + } + else + mMapWatches.insert(std::make_pair(pollfd_.fd,watch)); } void CAmCommonAPIWrapper::commonTimerCallback(sh_timerHandle_t handle, void *) diff --git a/AudioManagerUtilities/src/CAmDltWrapper.cpp b/AudioManagerUtilities/src/CAmDltWrapper.cpp index 742b396..44ec614 100644 --- a/AudioManagerUtilities/src/CAmDltWrapper.cpp +++ b/AudioManagerUtilities/src/CAmDltWrapper.cpp @@ -23,13 +23,14 @@ */ -#include "CAmDltWrapper.h" #include <string> #include <iostream> #include <string.h> #include <chrono> #include <ctime> +#include <sys/types.h> #include <unistd.h> +#include "CAmDltWrapper.h" namespace am { @@ -625,7 +626,7 @@ bool CAmDltWrapper::initNoDlt(DltLogLevelType loglevel, DltContext* context) bool CAmDltWrapper::init(DltLogLevelType loglevel, DltContext* context) { pthread_mutex_lock(&mMutex); - initNoDlt(loglevel,context); + return initNoDlt(loglevel,context); } void CAmDltWrapper::send() diff --git a/AudioManagerUtilities/src/CAmSocketHandler.cpp b/AudioManagerUtilities/src/CAmSocketHandler.cpp index fad60e5..472fa7f 100644 --- a/AudioManagerUtilities/src/CAmSocketHandler.cpp +++ b/AudioManagerUtilities/src/CAmSocketHandler.cpp @@ -26,11 +26,12 @@ #include <sys/fcntl.h> #include <sys/errno.h> #include <sys/poll.h> +#include <sys/eventfd.h> #include <time.h> #include <algorithm> #include <features.h> #include <csignal> -#include <unistd.h> +#include <cstring> #include "CAmDltWrapper.h" #include "CAmSocketHandler.h" @@ -39,40 +40,66 @@ #include <sys/timerfd.h> #endif +#define END_EVENT (UINT64_MAX >> 1) + namespace am { CAmSocketHandler::CAmSocketHandler() : - mPipe(), // - mDispatchDone(true), // - mSetPollKeys(MAX_POLLHANDLE), // - mListPoll(), // - mSetTimerKeys(MAX_TIMERHANDLE), - mListTimer(), // - mListActiveTimer(), // - mSetSignalhandlerKeys(MAX_POLLHANDLE), // - mSignalHandlers(), // - mRecreatePollfds(true), - mInternalCodes(internal_codes_e::NO_ERROR), - mSignalFdHandle(0) + mEventFd(-1), // + mSignalFd(-1), // + mDispatchDone(true), // + mSetPollKeys(MAX_POLLHANDLE), // + mMapShPoll(), // + mSetTimerKeys(MAX_TIMERHANDLE), + mListTimer(), // +#ifndef WITH_TIMERFD + mListActiveTimer(), // +#endif + mSetSignalhandlerKeys(MAX_POLLHANDLE), // + mSignalHandlers(), // + mInternalCodes(internal_codes_e::NO_ERROR) #ifndef WITH_TIMERFD -,mStartTime() // + ,mStartTime() // #endif { - if (pipe(mPipe) == -1) + + auto actionPoll = [this](const pollfd pollfd, const sh_pollHandle_t, void*) { - mInternalCodes = internal_codes_e::PIPE_ERROR; - logError("Sockethandler could not create pipe!"); - } + /* We have a valid signal, read the info from the fd */ + uint64_t events; + ssize_t bytes = read(pollfd.fd, &events, sizeof(events)); + if (bytes == sizeof(events)) + { + if (events >= END_EVENT) + { + for (auto & elem : mMapShPoll) + { + if (elem.second.state == poll_states_e::UPDATE || + elem.second.state == poll_states_e::VALID) + { + elem.second.state = poll_states_e::ADD; + } + } + mDispatchDone = true; + } + return; + } + + // ppoll on EAGAIN + if ((bytes == -1) && (errno == EAGAIN)) + return; + + //Failed to read from event fd... + std::ostringstream msg; + msg << "Failed to read from event fd: " << pollfd.fd << " errno: " << std::strerror(errno); + throw std::runtime_error(msg.str()); + }; //add the pipe to the poll - nothing needs to be processed here we just need the pipe to trigger the ppoll - short event = 0; sh_pollHandle_t handle; - event |= POLLIN; - if (addFDPoll(mPipe[0], event, NULL, - [](const pollfd, const sh_pollHandle_t, void*){}, - [](const sh_pollHandle_t, void*) { return (false); }, - NULL, NULL, handle) != E_OK) + mEventFd = eventfd(1, EFD_NONBLOCK | EFD_CLOEXEC); + if (addFDPoll(mEventFd, POLLIN, NULL, actionPoll, NULL, NULL, NULL, handle) != E_OK) { mInternalCodes |= internal_codes_e::FD_ERROR; } @@ -80,12 +107,10 @@ CAmSocketHandler::CAmSocketHandler() : CAmSocketHandler::~CAmSocketHandler() { - for (auto it : mListPoll) + for (const auto& it : mMapShPoll) { - close(it.pollfdValue.fd); + close(it.second.pollfdValue.fd); } - close(mPipe[0]); - close(mPipe[1]); } //todo: maybe have some: give me more time returned? @@ -95,80 +120,88 @@ CAmSocketHandler::~CAmSocketHandler() void CAmSocketHandler::start_listenting() { mDispatchDone = false; - int16_t pollStatus; #ifndef WITH_TIMERFD clock_gettime(CLOCK_MONOTONIC, &mStartTime); #endif timespec buffertime; - - VectorListPoll_t cloneListPoll; - VectorListPoll_t::iterator listmPollIt; - VectorListPollfd_t fdPollingArray; //!<the polling array for ppoll - - auto preparePollfd = [&](const sh_poll_s& row) - { - CAmSocketHandler::prepare((sh_poll_s&)row); - pollfd temp = row.pollfdValue; - temp.revents = 0; - fdPollingArray.push_back(temp); - }; + + VectorPollfd_t fdPollingArray; //!<the polling array for ppoll while (!mDispatchDone) { - if (mRecreatePollfds) + /* Iterate all times through map and synchronize the polling array accordingly. + * In case a new element in map appears the polling array will be extended and + * in case an element gets removed the map and the polling array needs to be adapted. + */ + auto fdPollIt = fdPollingArray.begin(); + for (auto it = mMapShPoll.begin(); it != mMapShPoll.end(); ) { - fdPollingArray.clear(); - //freeze mListPoll by copying it - otherwise we get problems when we want to manipulate it during the next lines - cloneListPoll = mListPoll; - //there was a change in the setup, so we need to recreate the fdarray from the list - std::for_each(cloneListPoll.begin(), cloneListPoll.end(), preparePollfd); - mRecreatePollfds = false; + // NOTE: The order of the switch/case statement reflects the state flow + auto& elem = it->second; + switch (elem.state) + { + case poll_states_e::ADD: + elem.state = poll_states_e::UPDATE; + fdPollIt = fdPollingArray.emplace(fdPollIt); + break; + + case poll_states_e::UPDATE: + elem.state = poll_states_e::VALID; + CAmSocketHandler::prepare(elem); + *fdPollIt = elem.pollfdValue; + break; + + case poll_states_e::VALID: + // check for multi-thread access + assert(fdPollIt != fdPollingArray.end()); + ++fdPollIt; + ++it; + break; + + case poll_states_e::REMOVE: + elem.state = poll_states_e::INVALID; + fdPollIt = fdPollingArray.erase(fdPollIt); + break; + + case poll_states_e::INVALID: + it = mMapShPoll.erase(it); + break; + } } - else + + if (fdPollingArray.size() != mMapShPoll.size()) { - //first we go through the registered filedescriptors and check if someone needs preparation: - std::for_each(cloneListPoll.begin(), cloneListPoll.end(), CAmSocketHandler::prepare); + mInternalCodes |= internal_codes_e::MT_ERROR; + logError("CAmSocketHandler::start_listenting is NOT multi-thread safe!"); + return; } #ifndef WITH_TIMERFD timerCorrection(); #endif - //block until something is on a filedescriptor - if ((pollStatus = ppoll(&fdPollingArray[0], fdPollingArray.size(), insertTime(buffertime), NULL)) < 0) + // block until something is on a file descriptor + int16_t pollStatus = ppoll(&fdPollingArray[0], fdPollingArray.size(), insertTime(buffertime), NULL); + if (pollStatus > 0) { - if (errno == EINTR) - { - //a signal was received, that means it's time to go... - pollStatus = 0; - } - else + // stage 0+1, call firedCB + std::list<sh_poll_s*> listPoll; + for (auto& it : fdPollingArray) { - logError("SocketHandler::start_listenting ppoll returned with error", errno); - throw std::runtime_error(std::string("SocketHandler::start_listenting ppoll returned with error.")); - } - } - - if (pollStatus != 0) //only check filedescriptors if there was a change - { - std::list<sh_poll_s> listPoll; - //todo: here could be a timer that makes sure naughty plugins return! - //stage 0+1, call firedCB - listmPollIt = cloneListPoll.begin(); - for (auto it : fdPollingArray) - { - if (CAmSocketHandler::eventFired(it)) - { - listmPollIt->pollfdValue.revents = it.revents; - listPoll.push_back(*listmPollIt); - CAmSocketHandler::fire(*listmPollIt); - } - else - { - listmPollIt->pollfdValue.revents = 0; - } - listmPollIt++; + it.revents &= it.events; + if (it.revents == 0) + continue; + + sh_poll_s& pollObj = mMapShPoll.at(it.fd); + if (pollObj.state != poll_states_e::VALID) + continue; + + // ensure to copy the revents fired in fdPollingArray + pollObj.pollfdValue.revents = it.revents; + listPoll.push_back(&pollObj); + CAmSocketHandler::fire(pollObj); + it.revents = 0; } //stage 2, lets ask around if some dispatching is necessary, the ones who need stay on the list @@ -179,16 +212,20 @@ void CAmSocketHandler::start_listenting() { listPoll.remove_if(CAmSocketHandler::dispatchingFinished); } while (!listPoll.empty()); - } -#ifndef WITH_TIMERFD + else if ((pollStatus < 0) && (errno != EINTR)) + { + logError("SocketHandler::start_listenting ppoll returned with error", errno); + throw std::runtime_error(std::string("SocketHandler::start_listenting ppoll returned with error.")); + } else //Timerevent { +#ifndef WITH_TIMERFD //this was a timer event, we need to take care about the timers //find out the timedifference to starttime timerUp(); - } #endif + } } } @@ -197,7 +234,12 @@ void CAmSocketHandler::start_listenting() */ void CAmSocketHandler::stop_listening() { - mDispatchDone = true; + //fire the ending event + if (mDispatchDone) + return; + + wakeupWorker("stop_listening", END_EVENT); + #ifndef WITH_TIMERFD //this is for all running timers only - we need to handle the additional offset here if (!mListActiveTimer.empty()) @@ -215,29 +257,22 @@ void CAmSocketHandler::exit_mainloop() { //end the while loop stop_listening(); - - //fire the ending filedescriptor - int p(1); - ssize_t result = write(mPipe[1], &p, sizeof(p)); } -bool CAmSocketHandler::fatalErrorOccurred() -{ - return ((mInternalCodes&internal_codes_e::PIPE_ERROR)>0)||((mInternalCodes&internal_codes_e::FD_ERROR)>0); -} - -am_Error_e CAmSocketHandler::getFDPollData(const sh_pollHandle_t handle, sh_poll_s & outPollData) +void CAmSocketHandler::wakeupWorker(const std::string & func, const uint64_t value) { - VectorListPoll_t::iterator iterator = mListPoll.begin(); - for (; iterator != mListPoll.end(); ++iterator) + if (write(mEventFd, &value, sizeof(value)) < 0) { - if (iterator->handle == handle) - { - outPollData = *iterator; - return (E_OK); - } + // no log message here, it is already done in main.cpp + std::ostringstream msg("CAmSocketHandler::"); + msg << func << " Failed to write to event fd: " << mEventFd << " errno: " << std::strerror(errno); + throw std::runtime_error(msg.str()); } - return (E_UNKNOWN); +} + +bool CAmSocketHandler::fatalErrorOccurred() +{ + return (mInternalCodes != internal_codes_e::NO_ERROR); } /** @@ -287,54 +322,51 @@ am_Error_e CAmSocketHandler::listenToSignals(const std::vector<uint8_t> & listSi return (E_NOT_POSSIBLE); } - int signalHandlerFd; - if(mSignalFdHandle) + if (mSignalFd < 0) + { + /* Create the signalfd */ + mSignalFd = signalfd(-1, &sigset, SFD_NONBLOCK); + if (mSignalFd == -1) + { + logError("Could not open signal fd!", std::strerror(errno)); + return (E_NOT_POSSIBLE); + } + + auto actionPoll = [this](const pollfd pollfd, const sh_pollHandle_t, void*) + { + /* We have a valid signal, read the info from the fd */ + struct signalfd_siginfo info; + ssize_t bytes = read(pollfd.fd, &info, sizeof(info)); + if (bytes == sizeof(info)) + { + /* Notify all listeners */ + for(const auto& it: mSignalHandlers) + it.callback(it.handle, info, it.userData); + return; + } + + // ppoll on EAGAIN + if ((bytes == -1) && (errno == EAGAIN)) + return; + + //Failed to read from fd... + std::ostringstream msg; + msg << "Failed to read from signal fd: " << pollfd.fd << " errno: " << std::strerror(errno); + throw std::runtime_error(msg.str()); + }; + /* We're going to add the signal fd through addFDPoll. At this point we don't have any signal listeners. */ + sh_pollHandle_t handle; + return addFDPoll(mSignalFd, POLLIN | POLLERR | POLLHUP, NULL, actionPoll, NULL, NULL, NULL, handle); + } + else { - sh_poll_s sgPollData; - if(E_OK!=getFDPollData(mSignalFdHandle, sgPollData)) - { - removeFDPoll(mSignalFdHandle); - mSignalFdHandle = 0; - } - else - { - int signalHandlerFd = signalfd(sgPollData.pollfdValue.fd, &sigset, 0); - if (signalHandlerFd == -1) + if (signalfd(mSignalFd, &sigset, 0) == -1) { - logError("Could not update signal fd!"); + logError("Could not update signal fd!", std::strerror(errno)); return (E_NOT_POSSIBLE); } return E_OK; - } } - - if(0==mSignalFdHandle) - { - /* Create the signalfd */ - signalHandlerFd = signalfd(-1, &sigset, 0); - if (signalHandlerFd == -1) - { - logError("Could not open signal fd!"); - return (E_NOT_POSSIBLE); - } - - auto actionPoll = [this](const pollfd pollfd, const sh_pollHandle_t, void*) - { - const VectorSignalHandlers_t & signalHandlers = mSignalHandlers; - /* We have a valid signal, read the info from the fd */ - struct signalfd_siginfo info; - ssize_t bytes = read(pollfd.fd, &info, sizeof(info)); - assert(bytes == sizeof(info)); - - /* Notify all listeners */ - for(auto it: signalHandlers) - it.callback(it.handle, info, it.userData); - }; - /* We're going to add the signal fd through addFDPoll. At this point we don't have any signal listeners. */ - am_Error_e shFdError = addFDPoll(signalHandlerFd, POLLIN | POLLERR | POLLHUP, NULL, actionPoll, [](const sh_pollHandle_t, void*) - { return (false);}, NULL, NULL, mSignalFdHandle); - return shFdError; - } } /** @@ -347,24 +379,53 @@ am_Error_e CAmSocketHandler::listenToSignals(const std::vector<uint8_t> & listSi * @param dispatch a std::function that is called to dispatch the received data * @param userData a pointer to userdata that is always passed around * @param handle the handle of this poll - * @return E_OK if the descriptor was added, E_NON_EXISTENT if the fd is not valid + * @return E_OK if the descriptor was added + * E_NON_EXISTENT if the fd is not valid + * E_ALREADY_EXISTS if the fd is already known + * E_NOT_POSSIBLE if the maximum handle threshold is reached */ -am_Error_e CAmSocketHandler::addFDPoll(const int fd, const short event, std::function<void(const sh_pollHandle_t handle, void* userData)> prepare, - std::function<void(const pollfd pollfd, const sh_pollHandle_t handle, void* userData)> fired, std::function<bool(const sh_pollHandle_t handle, void* userData)> check, - std::function<bool(const sh_pollHandle_t handle, void* userData)> dispatch, void* userData, sh_pollHandle_t& handle) +am_Error_e CAmSocketHandler::addFDPoll(const int fd, + const short event, + std::function<void(const sh_pollHandle_t handle, void* userData)> prepare, + std::function<void(const pollfd pollfd, const sh_pollHandle_t handle, void* userData)> fired, + std::function<bool(const sh_pollHandle_t handle, void* userData)> check, + std::function<bool(const sh_pollHandle_t handle, void* userData)> dispatch, + void* userData, + sh_pollHandle_t& handle) { + sh_poll_s pollData; + if (!fdIsValid(fd)) - return (E_NON_EXISTENT); + return E_NON_EXISTENT; + + const auto elem = mMapShPoll.find(fd); + if (elem != mMapShPoll.end()) + { + // The fd was already in map therefore we need to trigger an update instead + switch (elem->second.state) + { + case poll_states_e::REMOVE: + pollData.state = poll_states_e::UPDATE; + break; + + case poll_states_e::INVALID: + pollData.state = poll_states_e::ADD; + break; + + default: + logError("CAmSocketHandler::addFDPoll fd", fd, "already registered!"); + return E_ALREADY_EXISTS; + } + } //create a new handle for the poll if (!nextHandle(mSetPollKeys)) { - logError("Could not create new polls, too many open!"); + logError("CAmSocketHandler::addFDPoll Max handle count reached!"); return (E_NOT_POSSIBLE); } - sh_poll_s pollData; pollData.pollfdValue.fd = fd; pollData.handle = mSetPollKeys.lastUsedID; pollData.pollfdValue.events = event; @@ -374,10 +435,10 @@ am_Error_e CAmSocketHandler::addFDPoll(const int fd, const short event, std::fun pollData.checkCB = check; pollData.dispatchCB = dispatch; pollData.userData = userData; - //add new data to the list - mListPoll.push_back(pollData); - mRecreatePollfds = true; + //add new data to the list + mMapShPoll[fd] = pollData; + wakeupWorker("addFDPoll"); handle = pollData.handle; return (E_OK); @@ -394,15 +455,18 @@ am_Error_e CAmSocketHandler::addFDPoll(const int fd, const short event, std::fun * @param dispatch a callback that is called to dispatch the received data * @param userData a pointer to userdata that is always passed around * @param handle the handle of this poll - * @return E_OK if the descriptor was added, E_NON_EXISTENT if the fd is not valid + * @return E_OK if the descriptor was added + * E_NON_EXISTENT if the fd is not valid + * E_ALREADY_EXISTS if the fd is already known + * E_NOT_POSSIBLE if the maximum handle threshold is reached */ am::am_Error_e CAmSocketHandler::addFDPoll(const int fd, const short event, IAmShPollPrepare *prepare, IAmShPollFired *fired, IAmShPollCheck *check, IAmShPollDispatch *dispatch, void *userData, sh_pollHandle_t & handle) { - std::function<void(const sh_pollHandle_t handle, void* userData)> prepareCB; //preperation callback - std::function<void(const pollfd poll, const sh_pollHandle_t handle, void* userData)> firedCB; //fired callback - std::function<bool(const sh_pollHandle_t handle, void* userData)> checkCB; //check callback - std::function<bool(const sh_pollHandle_t handle, void* userData)> dispatchCB; //check callback + std::function<void(const sh_pollHandle_t, void*)> prepareCB; //preperation callback + std::function<void(const pollfd, const sh_pollHandle_t, void*)> firedCB; //fired callback + std::function<bool(const sh_pollHandle_t, void*)> checkCB; //check callback + std::function<bool(const sh_pollHandle_t, void*)> dispatchCB; //check callback if (prepare) prepareCB = std::bind(&IAmShPollPrepare::Call, prepare, std::placeholders::_1, std::placeholders::_2); @@ -419,23 +483,23 @@ am::am_Error_e CAmSocketHandler::addFDPoll(const int fd, const short event, IAmS /** * removes a filedescriptor from the poll loop * @param handle + * @param [rmv] default RMV_ONLY_FDPOLL * @return */ am_Error_e CAmSocketHandler::removeFDPoll(const sh_pollHandle_t handle) { - VectorListPoll_t::iterator iterator = mListPoll.begin(); - - for (; iterator != mListPoll.end(); ++iterator) + for (auto& it : mMapShPoll) { - if (iterator->handle == handle) + if (it.second.handle == handle) { - iterator = mListPoll.erase(iterator); + it.second.state = (it.second.state == poll_states_e::ADD ? poll_states_e::INVALID : poll_states_e::REMOVE); + wakeupWorker("removeFDPoll"); mSetPollKeys.pollHandles.erase(handle); - mRecreatePollfds = true; - return (E_OK); + return E_OK; } } - return (E_UNKNOWN); + logWarning("CAmSocketHandler::removeFDPoll handle unknown", handle); + return E_UNKNOWN; } /** @@ -449,7 +513,7 @@ am_Error_e CAmSocketHandler::addSignalHandler(std::function<void(const sh_pollHa { if (!nextHandle(mSetSignalhandlerKeys)) { - logError("Could not create new polls, too many open!"); + logError("CAmSocketHandler::addSignalHandler Could not create new polls, too many open!"); return (E_NOT_POSSIBLE); } @@ -474,7 +538,7 @@ am_Error_e CAmSocketHandler::removeSignalHandler(const sh_pollHandle_t handle) { if (it->handle == handle) { - it = mSignalHandlers.erase(it); + mSignalHandlers.erase(it); mSetSignalhandlerKeys.pollHandles.erase(handle); return (E_OK); } @@ -508,17 +572,17 @@ am_Error_e CAmSocketHandler::addTimer(const timespec & timeouts, std::function<v { assert(!((timeouts.tv_sec == 0) && (timeouts.tv_nsec == 0))); - mListTimer.emplace_back(); - sh_timer_s & timerItem = mListTimer.back(); - #ifndef WITH_TIMERFD //create a new handle for the timer if (!nextHandle(mSetTimerKeys)) { - logError("Could not create new timers, too many open!"); - mListTimer.pop_back(); + logError("CAmSocketHandler::addTimer Could not create new timers, too many open!"); return (E_NOT_POSSIBLE); } + + mListTimer.emplace_back(); + sh_timer_s & timerItem = mListTimer.back(); + //create a new handle for the timer handle = mSetTimerKeys.lastUsedID; @@ -533,12 +597,13 @@ am_Error_e CAmSocketHandler::addTimer(const timespec & timeouts, std::function<v clock_gettime(CLOCK_MONOTONIC, ¤tTime); if (!mDispatchDone)//the mainloop is started timerItem.countdown = timespecAdd(timeouts, timespecSub(currentTime, mStartTime)); - mListTimer.push_back(timerItem); mListActiveTimer.push_back(timerItem); mListActiveTimer.sort(compareCountdown); return (E_OK); #else + sh_timer_s timerItem; + timerItem.countdown.it_value = timeouts; if (repeats) timerItem.countdown.it_interval = timeouts; @@ -554,35 +619,43 @@ am_Error_e CAmSocketHandler::addTimer(const timespec & timeouts, std::function<v timerItem.userData = userData; am_Error_e err = createTimeFD(timerItem.countdown, timerItem.fd); if (err != E_OK) - { - mListTimer.pop_back(); return err; - } - static auto actionPoll = [](const pollfd pollfd, const sh_pollHandle_t handle, void* userData) + auto actionPoll = [this](const pollfd pollfd, const sh_pollHandle_t handle, void* userData) { - uint64_t mExpirations; - if (read(pollfd.fd, &mExpirations, sizeof(uint64_t)) == -1) - { - //error received...try again - read(pollfd.fd, &mExpirations, sizeof(uint64_t)); - } + uint64_t expCnt; + ssize_t bytes = read(pollfd.fd, &expCnt, sizeof(expCnt)); + if (bytes == sizeof(expCnt)) + return; + + // ppoll has to be called again in following case + if ((bytes == -1) && (errno == EAGAIN)) + return; + + // failed to read data from timer_fd... + std::ostringstream msg; + msg << "Failed to read from timer fd: " << pollfd.fd << " errno: " << std::strerror(errno); + throw std::runtime_error(msg.str()); }; - err = addFDPoll(timerItem.fd, POLLIN, NULL, actionPoll, [callback](const sh_pollHandle_t handle, void* userData)->bool - { - callback(handle, userData); - return false; - }, - NULL, userData, handle); - if (E_OK == err) + err = addFDPoll(timerItem.fd, POLLIN | POLLERR, NULL, actionPoll, + [callback](const sh_pollHandle_t handle, void* userData)->bool { + callback(handle, userData); + return false; + }, + NULL, userData, handle); + + if (err == E_OK) { timerItem.handle = handle; + mListTimer.push_back(timerItem); + return E_OK; } - else - { - mListTimer.pop_back(); - } + + // E_NOT_POSSIBLE is the only case were we need to close the timer + if (err == E_NOT_POSSIBLE) + close(timerItem.fd); + return err; #endif @@ -599,29 +672,32 @@ am_Error_e CAmSocketHandler::removeTimer(const sh_timerHandle_t handle) //stop the current timer #ifdef WITH_TIMERFD - std::list<sh_timer_s>::iterator it = mListTimer.begin(); - for (; it != mListTimer.end(); ++it) + std::list<sh_timer_s>::iterator it(mListTimer.begin()); + while (it != mListTimer.end()) { if (it->handle == handle) - break; + { + am_Error_e err = removeFDPoll(handle); + close(it->fd); + mListTimer.erase(it); + return err; + } + ++it; } - if (it == mListTimer.end()) - return (E_NON_EXISTENT); + return (E_NON_EXISTENT); - close(it->fd); - mListTimer.erase(it); - return removeFDPoll(handle); #else stopTimer(handle); std::list<sh_timer_s>::iterator it(mListTimer.begin()); - for (; it != mListTimer.end(); ++it) + while (it != mListTimer.end()) { if (it->handle == handle) { - it = mListTimer.erase(it); + mListTimer.erase(it); mSetTimerKeys.pollHandles.erase(handle); return (E_OK); } + ++it; } return (E_UNKNOWN); #endif @@ -798,38 +874,38 @@ am_Error_e CAmSocketHandler::restartTimer(const sh_timerHandle_t handle) am_Error_e CAmSocketHandler::stopTimer(const sh_timerHandle_t handle) { #ifdef WITH_TIMERFD - std::list<sh_timer_s>::iterator it = mListTimer.begin(); - for (; it != mListTimer.end(); ++it) + for (auto elem : mListTimer) { - if (it->handle == handle) - break; - } - if (it == mListTimer.end()) - return (E_NON_EXISTENT); + if (elem.handle != handle) + continue; - itimerspec countdown = it->countdown; - countdown.it_value.tv_nsec = 0; - countdown.it_value.tv_sec = 0; + itimerspec countdown = elem.countdown; + countdown.it_value.tv_nsec = 0; + countdown.it_value.tv_sec = 0; - if (timerfd_settime(it->fd, 0, &countdown, NULL)) - { - logError("Failed to set timer duration"); - return E_NOT_POSSIBLE; + if (timerfd_settime(elem.fd, 0, &countdown, NULL) < 0) + { + logError("Failed to set timer duration"); + return E_NOT_POSSIBLE; + } + + return E_OK; } - return (E_OK); -#else +#else //go through the list and remove the timer with the handle std::list<sh_timer_s>::iterator it(mListActiveTimer.begin()); - for (; it != mListActiveTimer.end(); ++it) + while (it != mListActiveTimer.end()) { if (it->handle == handle) { - it = mListActiveTimer.erase(it); - return (E_OK); + mListActiveTimer.erase(it); + return E_OK; } + ++it; } - return (E_NON_EXISTENT); #endif + + return E_NON_EXISTENT; } /** @@ -840,15 +916,28 @@ am_Error_e CAmSocketHandler::stopTimer(const sh_timerHandle_t handle) */ am_Error_e CAmSocketHandler::updateEventFlags(const sh_pollHandle_t handle, const short events) { - VectorListPoll_t::iterator iterator = mListPoll.begin(); - - for (; iterator != mListPoll.end(); ++iterator) + for (auto& it : mMapShPoll) { - if (iterator->handle == handle) + auto& elem = it.second; + if (elem.handle != handle) + continue; + + switch (elem.state) { - iterator->pollfdValue.events = events; - mRecreatePollfds = true; - return (E_OK); + case poll_states_e::ADD: + elem.pollfdValue.events = events; + return (E_OK); + + case poll_states_e::UPDATE: + case poll_states_e::VALID: + elem.state = poll_states_e::UPDATE; + elem.pollfdValue.revents = 0; + elem.pollfdValue.events = events; + return (E_OK); + + default: + // This issue should never happen! + return (E_DATABASE_ERROR); } } return (E_UNKNOWN); @@ -951,60 +1040,55 @@ void CAmSocketHandler::timerCorrection() */ void CAmSocketHandler::prepare(am::CAmSocketHandler::sh_poll_s& row) { - if (row.prepareCB) + if (!row.prepareCB) + return; + + try { - try - { - row.prepareCB(row.handle, row.userData); - } catch (std::exception& e) - { - logError("Sockethandler: Exception in Preparecallback,caught", e.what()); - } + row.prepareCB(row.handle, row.userData); + } + catch (std::exception& e) + { + logError("CAmSocketHandler::prepare Exception caught", e.what()); } } /** * fire callback */ -void CAmSocketHandler::fire(sh_poll_s& a) +void CAmSocketHandler::fire(const sh_poll_s& a) { try { a.firedCB(a.pollfdValue, a.handle, a.userData); - } catch (std::exception& e) + } + catch (std::exception& e) { - logError("Sockethandler: Exception in Preparecallback,caught", e.what()); + logError("CAmSocketHandler::fire Exception caught", e.what()); } } /** * should disptach */ -bool CAmSocketHandler::noDispatching(const sh_poll_s& a) +bool CAmSocketHandler::noDispatching(const sh_poll_s* a) { //remove from list of there is no checkCB - if (nullptr == a.checkCB) + if (nullptr == a->checkCB || a->state != poll_states_e::VALID) return (true); - return (!a.checkCB(a.handle, a.userData)); + return (!a->checkCB(a->handle, a->userData)); } /** * disptach */ -bool CAmSocketHandler::dispatchingFinished(const sh_poll_s& a) +bool CAmSocketHandler::dispatchingFinished(const sh_poll_s* a) { //remove from list of there is no dispatchCB - if (nullptr == a.dispatchCB) + if (nullptr == a->dispatchCB || a->state != poll_states_e::VALID) return (true); - return (!a.dispatchCB(a.handle, a.userData)); -} -/** - * event triggered - */ -bool CAmSocketHandler::eventFired(const pollfd& a) -{ - return (a.revents == 0 ? false : true); + return (!a->dispatchCB(a->handle, a->userData)); } /** @@ -1031,15 +1115,17 @@ inline timespec* CAmSocketHandler::insertTime(timespec& buffertime) am_Error_e CAmSocketHandler::createTimeFD(const itimerspec & timeouts, int & fd) { fd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC); - if (fd <= 0) + if (fd < 0) { - logError("Failed to create timer"); + logError("CAmSocketHandler::createTimeFD Failed with", static_cast<const char*>(std::strerror(errno))); return E_NOT_POSSIBLE; } - if (timerfd_settime(fd, 0, &timeouts, NULL)) + if (timerfd_settime(fd, 0, &timeouts, NULL) < 0) { - logError("Failed to set timer duration"); + logError("CAmSocketHandler::createTimeFD Failed to set duration for", fd); + close(fd); + fd = -1; return E_NOT_POSSIBLE; } return E_OK; @@ -1051,9 +1137,10 @@ void CAmSocketHandler::callTimer(sh_timer_s& a) try { a.callback(a.handle, a.userData); - } catch (std::exception& e) + } + catch (std::exception& e) { - logError("Sockethandler: Exception in Timercallback,caught", e.what()); + logError("CAmSocketHandler::callTimer() Exception caught", e.what()); } } @@ -1070,7 +1157,6 @@ bool CAmSocketHandler::nextHandle(sh_identifier_s & handle) } if (handle.lastUsedID == lastHandle) { - logError("Could not create new polls, too many open!"); return (false); } diff --git a/AudioManagerUtilities/test/AmSerializerTest/CAmSerializerTest.cpp b/AudioManagerUtilities/test/AmSerializerTest/CAmSerializerTest.cpp index 49c6738..e1b62c4 100644 --- a/AudioManagerUtilities/test/AmSerializerTest/CAmSerializerTest.cpp +++ b/AudioManagerUtilities/test/AmSerializerTest/CAmSerializerTest.cpp @@ -79,6 +79,8 @@ struct SerializerData V2::CAmSerializer *pSerializer; }; +#define ASYNCLOOP 100 + void* ptSerializerSync(void* data) { SerializerData *pData = (SerializerData*) data; @@ -96,6 +98,7 @@ void* ptSerializerSync(void* data) return (NULL); } + void* ptSerializerASync(void* data) { SerializerData *pData = (SerializerData*) data; @@ -106,7 +109,7 @@ void* ptSerializerASync(void* data) #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wdeprecated-declarations" - for (uint32_t i = 0; i < 5; i++) + for (uint32_t i = 0; i < ASYNCLOOP; i++) { testStr = pData->testStr; pData->pSerializer->asyncCall(pData->pSerCb, &MockIAmSerializerCb::dispatchData, i, testStr); @@ -191,7 +194,7 @@ TEST(CAmSerializerTest, asyncTest) EXPECT_CALL(serCb,check()).Times(2); EXPECT_CALL(serCb,checkInt()).Times(1).WillRepeatedly(Return(100)); - for (int i = 0; i < 5; i++) + for (uint32_t i = 0; i < ASYNCLOOP; i++) EXPECT_CALL(serCb,dispatchData(i,testStr)).WillOnce(DoAll(ActionDispatchData(), Return(true))); myHandler.start_listenting(); diff --git a/AudioManagerUtilities/test/AmSocketHandlerTest/CAmSocketHandlerTest.cpp b/AudioManagerUtilities/test/AmSocketHandlerTest/CAmSocketHandlerTest.cpp index ecd38fe..3fde011 100644 --- a/AudioManagerUtilities/test/AmSocketHandlerTest/CAmSocketHandlerTest.cpp +++ b/AudioManagerUtilities/test/AmSocketHandlerTest/CAmSocketHandlerTest.cpp @@ -22,6 +22,7 @@ #include "CAmSocketHandlerTest.h" #include <cstdio> +#include <cstring> #include <sys/socket.h> #include <arpa/inet.h> #include <sys/ioctl.h> @@ -30,15 +31,18 @@ #include <fcntl.h> #include <sys/un.h> #include <sys/poll.h> - +#include <sys/eventfd.h> +#include "CAmDltWrapper.h" #include "CAmSocketHandler.h" -//todo: expand test, implement more usecases -//todo: test removeFD + +#undef ENABLED_SOCKETHANDLER_TEST_OUTPUT +#undef ENABLED_TIMERS_TEST_OUTPUT #define SOCK_PATH "/tmp/mysock" -#define SOCKET_TEST_LOOPS_COUNT 1000 +#define SOCKET_TEST_LOOPS_COUNT 50 +#define TIMERS_TO_TEST 100 using namespace testing; using namespace am; @@ -48,6 +52,11 @@ static const char * TEST_SOCKET_DATA_FINAL = "finish!"; static const std::chrono::time_point<std::chrono::high_resolution_clock> TP_ZERO; +struct TestUserData +{ + int i; + float f; +}; MockIAmSignalHandler *pMockSignalHandler = NULL; static void signalHandler(int sig, siginfo_t *siginfo, void *context) @@ -58,7 +67,7 @@ static void signalHandler(int sig, siginfo_t *siginfo, void *context) if(pMockSignalHandler!=NULL) pMockSignalHandler->signalHandler(sig, siginfo, context); - + #ifdef ENABLED_SOCKETHANDLER_TEST_OUTPUT std::cout<<"signal handler was called with signal " << sig << std::endl; #endif @@ -115,14 +124,14 @@ void am::CAmTimerSignalHandler::timerCallback(sh_timerHandle_t handle, void* use std::advance(it, mIndex); kill(getpid(), *it); mIndex++; - + #ifndef WITH_TIMERFD mpSocketHandler->updateTimer( handle, mUpdateTimeout); #endif } else mpSocketHandler->stop_listening(); - + } CAmTimer::CAmTimer(CAmSocketHandler *myHandler, const timespec &timeout, const int32_t repeats) : @@ -149,11 +158,52 @@ void am::CAmTimer::timerCallback(sh_timerHandle_t handle, void* userData) } } +CAmTimerStressTest::CAmTimerStressTest(CAmSocketHandler *myHandler, const timespec &timeout, const int32_t repeats) : + MockIAmTimerCb(), mpSocketHandler(myHandler), mUpdateTimeout(timeout), pTimerCallback(this, &CAmTimerStressTest::timerCallback), mRepeats(repeats), mHandle(0) +{ +} + +am::CAmTimerStressTest::~CAmTimerStressTest() +{ +} + +void am::CAmTimerStressTest::timerCallback(sh_timerHandle_t handle, void* pUserData) +{ + mpSocketHandler->removeTimer(handle); + MockIAmTimerCb::timerCallback(handle, pUserData); + sh_timerHandle_t handle1; + mpSocketHandler->addTimer(mUpdateTimeout, &pTimerCallback, handle1, &(*((TestUserData*)pUserData)), true); +} + +CAmTimerStressTest2::CAmTimerStressTest2(CAmSocketHandler *myHandler, const timespec &timeout, const int32_t repeats) : + MockIAmTimerCb(), mpSocketHandler(myHandler), mUpdateTimeout(timeout), pTimerCallback(this, &CAmTimerStressTest2::timerCallback), mRepeats(repeats), mHandle(0) +{ +} + +am::CAmTimerStressTest2::~CAmTimerStressTest2() +{ +} + +void am::CAmTimerStressTest2::timerCallback(sh_timerHandle_t handle, void* pUserData) +{ + #ifdef ENABLED_SOCKETHANDLER_TEST_OUTPUT + std::cout<<"timerCallback handle=" << handle <<std::endl; + #endif + MockIAmTimerCb::timerCallback(handle, pUserData); +} + + CAmTimerMeasurment::CAmTimerMeasurment(CAmSocketHandler *myHandler, const timespec &timeout, const std::string & label, const int32_t repeats, void * userData) : - MockIAmTimerCb(), pTimerCallback(this, &CAmTimerMeasurment::timerCallback), // - mSocketHandler(myHandler), mUpdateTimeout(timeout), mUpdateTimePoint(std::chrono::seconds - { mUpdateTimeout.tv_sec } + std::chrono::nanoseconds - { mUpdateTimeout.tv_nsec }), mLastInvocationTime(), mExpected(mUpdateTimePoint - TP_ZERO), mRepeats(repeats), mpUserData(userData), mDebugText(label) + MockIAmTimerCb() + , pTimerCallback(this, &CAmTimerMeasurment::timerCallback) + , mSocketHandler(myHandler) + , mUpdateTimeout(timeout) + , mUpdateTimePoint(std::chrono::seconds{ mUpdateTimeout.tv_sec } + std::chrono::nanoseconds{ mUpdateTimeout.tv_nsec }) + , mLastInvocationTime(std::chrono::high_resolution_clock::now()) + , mExpected(mUpdateTimePoint - TP_ZERO) + , mRepeats(repeats) + , mpUserData(userData) + , mDebugText(label) { } @@ -165,59 +215,176 @@ void am::CAmTimerMeasurment::timerCallback(sh_timerHandle_t handle, void* userDa { MockIAmTimerCb::timerCallback(handle, userData); - std::chrono::time_point<std::chrono::high_resolution_clock> t_end = std::chrono::high_resolution_clock::now(); - if (TP_ZERO != mLastInvocationTime) - { - auto durationLast = t_end - mLastInvocationTime; - double diff = (std::chrono::duration<double, std::milli>(mExpected - durationLast).count()); + auto t_end = std::chrono::high_resolution_clock::now(); + std::chrono::duration<double, std::milli> durationLast = t_end - mLastInvocationTime; + double diff = (std::chrono::duration<double, std::milli>(mExpected - durationLast).count()); #ifdef ENABLED_TIMERS_TEST_OUTPUT - std::cout << mDebugText << - " [ expected:" <<std::chrono::duration<double, std::milli>(mExpected).count() << "ms" - " , current:" << std::chrono::duration<double, std::milli>(durationLast).count() << "ms" - ", diff:" << diff << "ms ] " << - std::endl; + std::cout << mDebugText << + " [ expected:" <<std::chrono::duration<double, std::milli>(mExpected).count() << "ms" + " , current:" << std::chrono::duration<double, std::milli>(durationLast).count() << "ms" + ", diff:" << diff << "ms ] " << + std::endl; #endif - if (diff > TIMERS_CB_TOLERANCE) - std::cout << mDebugText << " Warning [ expected:" << std::chrono::duration<double, std::milli>(mExpected).count() << "ms, current:" << std::chrono::duration<double, std::milli>(durationLast).count() << "ms ]" << std::endl; - if (diff < -TIMERS_CB_TOLERANCE) - std::cout << mDebugText << " Warning [ expected:" << std::chrono::duration<double, std::milli>(mExpected).count() << "ms, current:" << std::chrono::duration<double, std::milli>(durationLast).count() << "ms ]" << std::endl; - - mLastInvocationTime = t_end; - if (--mRepeats > 0) - { + if (diff > TIMERS_CB_TOLERANCE) + std::cout << mDebugText << " Warning [ expected:" << std::chrono::duration<double, std::milli>(mExpected).count() << "ms, current:" << std::chrono::duration<double, std::milli>(durationLast).count() << "ms ]" << std::endl; + if (diff < -TIMERS_CB_TOLERANCE) + std::cout << mDebugText << " Warning [ expected:" << std::chrono::duration<double, std::milli>(mExpected).count() << "ms, current:" << std::chrono::duration<double, std::milli>(durationLast).count() << "ms ]" << std::endl; + + mLastInvocationTime = t_end; + if (--mRepeats > 0) + { #ifndef WITH_TIMERFD - mSocketHandler->updateTimer( handle, mUpdateTimeout); + mSocketHandler->updateTimer( handle, mUpdateTimeout); #endif - } - else - { - mSocketHandler->stopTimer(handle); - } } else { -#ifdef ENABLED_TIMERS_TEST_OUTPUT - std::cout << mDebugText << " Init measurment " << std::endl; -#endif - mLastInvocationTime = t_end; - mSocketHandler->updateTimer(handle, mUpdateTimeout); + mSocketHandler->stopTimer(handle); + } +} + +void* sendTestData(int sock, const struct sockaddr* servAddr, unsigned int size, __useconds_t time = 0u) +{ + int ret = connect(sock, servAddr, size); + if (ret < 0) + { + std::cerr << "ERROR: connect() failed\n" << std::endl; + return (NULL); } + for (int i = 1; i < SOCKET_TEST_LOOPS_COUNT; i++) + { + std::string string(TEST_SOCKET_DATA); + send(sock, string.c_str(), string.size(), 0); + if (time) + usleep(time); + } + std::string string(TEST_SOCKET_DATA_FINAL); + send(sock, string.c_str(), string.size(), 0); + + return NULL; } void* playWithSocketServer(void* data) { - CAmSocketHandler *pSockethandler = (CAmSocketHandler*) data; - pSockethandler->start_listenting(); - return (NULL); + int sock = *((int*)data); + struct sockaddr_in servAddr; + unsigned short servPort = 6060; + struct hostent *host; + + if ((host = (struct hostent*) gethostbyname("localhost")) == 0) + { + std::cout << "ERROR: gethostbyname() failed\n" << std::endl; + exit(1); + } + + memset(&servAddr, 0, sizeof(servAddr)); + servAddr.sin_family = AF_INET; + servAddr.sin_addr.s_addr = inet_addr(inet_ntoa(*(struct in_addr*) (host->h_addr_list[0]))); + servAddr.sin_port = htons(servPort); + sleep(1); + + return sendTestData(sock, (struct sockaddr*)&servAddr, sizeof(servAddr)); } void* playWithUnixSocketServer(void* data) { - CAmSocketHandler *pSockethandler = (CAmSocketHandler*) data; - pSockethandler->start_listenting(); - return (NULL); + int sock = *((int*)data); + struct sockaddr_un servAddr; + memset(&servAddr, 0, sizeof(servAddr)); + strcpy(servAddr.sun_path, SOCK_PATH); + servAddr.sun_family = AF_UNIX; + sleep(1); + + return sendTestData(sock, (struct sockaddr*)&servAddr, sizeof(servAddr)); +} + +void* threadCallbackUnixSocketAndTimers(void* data) +{ + int sock = *((int*)data); + struct sockaddr_un servAddr; + memset(&servAddr, 0, sizeof(servAddr)); + strcpy(servAddr.sun_path, SOCK_PATH); + servAddr.sun_family = AF_UNIX; + sleep(1); + + return sendTestData(sock, (struct sockaddr*)&servAddr, sizeof(servAddr), 500000); +} + +TEST(CAmSocketHandlerTest, stressTestUnixSocketAndTimers) +{ + + pthread_t serverThread; + + int socket_; + + CAmSocketHandler myHandler; + ASSERT_FALSE(myHandler.fatalErrorOccurred()); + CAmSamplePluginStressTest::sockType_e type = CAmSamplePlugin::UNIX; + CAmSamplePluginStressTest myplugin(&myHandler, type); + + EXPECT_CALL(myplugin,receiveData(Field(&pollfd::revents, Eq(POLL_IN)),_,_)).Times(Exactly(SOCKET_TEST_LOOPS_COUNT)); + EXPECT_CALL(myplugin,dispatchData(_,_)).Times(Exactly(SOCKET_TEST_LOOPS_COUNT)); + EXPECT_CALL(myplugin,check(_,_)).Times(Exactly(SOCKET_TEST_LOOPS_COUNT)); + + for(int i=0;i<myplugin.getTimers().size();i++) + { + EXPECT_CALL(*myplugin.getTimers()[i],timerCallback(_,_)).Times(AnyNumber()); + } + + + if ((socket_ = socket(AF_UNIX, SOCK_STREAM, 0)) < 0) + { + std::cout << "socket problem" << std::endl; + } + ASSERT_GT(socket_, -1); + + //creates a thread that handles the serverpart + pthread_create(&serverThread, NULL, threadCallbackUnixSocketAndTimers, &socket_); + + myHandler.start_listenting(); + + pthread_join(serverThread, NULL); + shutdown(socket_, SHUT_RDWR); +} + + +TEST(CAmSocketHandlerTest, fdTest) +{ + CAmSocketHandler myHandler; + ASSERT_FALSE(myHandler.fatalErrorOccurred()); + + // for some simple fd tests + timespec endTime{0, 100000000}; // 0,1 + timespec timeoutTime{0, 10000000}; // 0,01 + + // check unknown system fd ids + sh_pollHandle_t handle; + ASSERT_EQ(myHandler.addFDPoll(100, 0, NULL, NULL, NULL, NULL, NULL, handle), E_NON_EXISTENT); + + // add/remove/add check of same fd + int fd = eventfd(0, 0); + ASSERT_EQ(myHandler.addFDPoll(fd, POLL_IN, NULL, NULL, NULL, NULL, NULL, handle), E_OK); + ASSERT_EQ(myHandler.addFDPoll(fd, POLL_IN, NULL, NULL, NULL, NULL, NULL, handle), E_ALREADY_EXISTS); + ASSERT_EQ(myHandler.removeFDPoll(handle), E_OK); + close(fd); + + // Create x handles + TestUserData userData{1, 1.f}; + CAmTimerStressTest timer(&myHandler, timeoutTime, 0); + + ASSERT_EQ(myHandler.addTimer(timeoutTime, &timer.pTimerCallback, handle, &userData, true), E_OK); + EXPECT_CALL(timer, timerCallback(_,&userData)).Times(AnyNumber()); + + // for some simple fd tests + CAmTimerSockethandlerController endCallback(&myHandler, endTime); + ASSERT_EQ(myHandler.addTimer(endTime, &endCallback.pTimerCallback, handle, NULL), E_OK); + EXPECT_CALL(endCallback,timerCallback(handle,NULL)).Times(Exactly(1)); + + myHandler.start_listenting(); + + ASSERT_FALSE(myHandler.fatalErrorOccurred()); } TEST(CAmSocketHandlerTest, timersOneshot) @@ -239,26 +406,26 @@ TEST(CAmSocketHandlerTest, timersOneshot) userData.f = 1.f; sh_timerHandle_t handle; - myHandler.addTimer(timeoutTime, &testCallback1.pTimerCallback, handle, &userData); + ASSERT_EQ(myHandler.addTimer(timeoutTime, &testCallback1.pTimerCallback, handle, &userData), E_OK); #ifndef WITH_TIMERFD ASSERT_EQ(handle, 1); #else ASSERT_EQ(handle, 2); #endif - EXPECT_CALL(testCallback1,timerCallback(handle,&userData)).Times(1); + EXPECT_CALL(testCallback1,timerCallback(handle,&userData)).Times(Exactly(1)); timespec timeout4; timeout4.tv_nsec = 0; timeout4.tv_sec = 3; CAmTimerSockethandlerController testCallback4(&myHandler, timeout4); - myHandler.addTimer(timeout4, &testCallback4.pTimerCallback, handle, NULL); + ASSERT_EQ(myHandler.addTimer(timeout4, &testCallback4.pTimerCallback, handle, NULL), E_OK); #ifndef WITH_TIMERFD ASSERT_EQ(handle, 2); #else ASSERT_EQ(handle, 3); #endif - EXPECT_CALL(testCallback4,timerCallback(handle,NULL)).Times(1); + EXPECT_CALL(testCallback4,timerCallback(handle,NULL)).Times(Exactly(1)); myHandler.start_listenting(); } @@ -281,7 +448,7 @@ TEST(CAmSocketHandlerTest, timersStop) userData.f = 1.f; sh_timerHandle_t handle; - myHandler.addTimer(timeoutTime, &testCallback1.pTimerCallback, handle, &userData, true); + ASSERT_EQ(myHandler.addTimer(timeoutTime, &testCallback1.pTimerCallback, handle, &userData, true), E_OK); #ifndef WITH_TIMERFD ASSERT_EQ(handle, 1); #else @@ -294,7 +461,7 @@ TEST(CAmSocketHandlerTest, timersStop) timeout4.tv_sec = 6; CAmTimerSockethandlerController testCallback4(&myHandler, timeout4); - myHandler.addTimer(timeout4, &testCallback4.pTimerCallback, handle, NULL); + ASSERT_EQ(myHandler.addTimer(timeout4, &testCallback4.pTimerCallback, handle, NULL), E_OK); #ifndef WITH_TIMERFD ASSERT_EQ(handle, 2); #else @@ -324,7 +491,7 @@ TEST(CAmSocketHandlerTest, timersGeneral) userData.f = 1.f; sh_timerHandle_t handle; - myHandler.addTimer(timeoutTime, &testCallback1.pTimerCallback, handle, &userData, true); + ASSERT_EQ(myHandler.addTimer(timeoutTime, &testCallback1.pTimerCallback, handle, &userData, true), E_OK); #ifndef WITH_TIMERFD ASSERT_EQ(handle, 1); #else @@ -337,7 +504,7 @@ TEST(CAmSocketHandlerTest, timersGeneral) timeout4.tv_sec = 5; CAmTimerSockethandlerController testCallback4(&myHandler, timeout4); - myHandler.addTimer(timeout4, &testCallback4.pTimerCallback, handle, NULL); + ASSERT_EQ(myHandler.addTimer(timeout4, &testCallback4.pTimerCallback, handle, NULL), E_OK); #ifndef WITH_TIMERFD ASSERT_EQ(handle, 2); #else @@ -347,6 +514,72 @@ TEST(CAmSocketHandlerTest, timersGeneral) myHandler.start_listenting(); } +TEST(CAmSocketHandlerTest, timersStressTest) +{ + CAmSocketHandler myHandler; + ASSERT_FALSE(myHandler.fatalErrorOccurred()); + + sh_timerHandle_t handle; + TestUserData userData; + userData.i = 1; + userData.f = 1.f; + + timespec timeout4; + timeout4.tv_nsec = 0; + timeout4.tv_sec = 60; + + timespec timeoutTime; + timeoutTime.tv_sec = 0; + timeoutTime.tv_nsec = 10000000;// 0,01 + + std::vector<CAmTimerStressTest*> timers(TIMERS_TO_TEST, NULL); + + for (auto & timer: timers) + { + timer = new CAmTimerStressTest(&myHandler, timeoutTime, 0); + ASSERT_EQ(myHandler.addTimer(timeoutTime, &(timer->pTimerCallback), handle, &userData, true), E_OK); + EXPECT_CALL(*timer, timerCallback(_,&userData)).Times(AnyNumber()); + } + + timespec timeoutTime11, timeout12, timeout13; + timeoutTime11.tv_sec = 1; + timeoutTime11.tv_nsec = 34000000; + CAmTimerMeasurment testCallback11(&myHandler, timeoutTime11, "repeated 1", std::numeric_limits<int32_t>::max()); + + timeout12.tv_sec = 0; + timeout12.tv_nsec = 100000000; + CAmTimerMeasurment testCallback12(&myHandler, timeout12, "repeated 2", std::numeric_limits<int32_t>::max()); + + timeout13.tv_sec = 3; + timeout13.tv_nsec = 333000000; + CAmTimerMeasurment testCallback13(&myHandler, timeout13, "oneshot 3"); + + ASSERT_EQ(myHandler.addTimer(timeoutTime11, &testCallback11.pTimerCallback, handle, NULL, true), E_OK); + EXPECT_CALL(testCallback11,timerCallback(_,NULL)).Times(AnyNumber()); + + ASSERT_EQ(myHandler.addTimer(timeout12, &testCallback12.pTimerCallback, handle, NULL, true), E_OK); + EXPECT_CALL(testCallback12,timerCallback(_,NULL)).Times(AnyNumber()); + + ASSERT_EQ(myHandler.addTimer(timeout13, &testCallback13.pTimerCallback, handle, NULL), E_OK); + EXPECT_CALL(testCallback13,timerCallback(_,NULL)).Times(Exactly(1)); + + + CAmTimerSockethandlerController testCallback4(&myHandler, timeout4); + + ASSERT_EQ(myHandler.addTimer(timeout4, &testCallback4.pTimerCallback, handle, NULL), E_OK); + + EXPECT_CALL(testCallback4,timerCallback(_,NULL)).Times(Exactly(1)); + myHandler.start_listenting(); + + for (auto & timer: timers) + { + delete timer, timer = NULL; + } + + EXPECT_FALSE(myHandler.fatalErrorOccurred()); +} + + TEST(CAmSocketHandlerTest,playWithTimers) { CAmSocketHandler myHandler; @@ -354,21 +587,22 @@ TEST(CAmSocketHandlerTest,playWithTimers) timespec timeoutTime, timeout2, timeout3, timeout4; timeoutTime.tv_sec = 1; timeoutTime.tv_nsec = 34000000; - CAmTimerMeasurment testCallback1(&myHandler, timeoutTime, "repeatedCallback 1", std::numeric_limits<int32_t>::max()); + CAmTimerMeasurment testCallback1(&myHandler, timeoutTime, "repeated 1", std::numeric_limits<int32_t>::max()); timeout2.tv_nsec = 2000000; timeout2.tv_sec = 0; - CAmTimerMeasurment testCallback2(&myHandler, timeout2, "repeatedCallback 2", std::numeric_limits<int32_t>::max()); + CAmTimerMeasurment testCallback2(&myHandler, timeout2, "repeated 2", std::numeric_limits<int32_t>::max()); timeout3.tv_nsec = 333000000; timeout3.tv_sec = 3; - CAmTimerMeasurment testCallback3(&myHandler, timeout3, "oneshotCallback 3"); + CAmTimerMeasurment testCallback3(&myHandler, timeout3, "oneshot 3"); + timeout4.tv_nsec = 0; timeout4.tv_sec = 8; CAmTimerSockethandlerController testCallback4(&myHandler, timeout4); sh_timerHandle_t handle; - myHandler.addTimer(timeoutTime, &testCallback1.pTimerCallback, handle, NULL, true); + ASSERT_EQ(myHandler.addTimer(timeoutTime, &testCallback1.pTimerCallback, handle, NULL, true), E_OK); #ifndef WITH_TIMERFD ASSERT_EQ(handle, 1); #else @@ -376,7 +610,7 @@ TEST(CAmSocketHandlerTest,playWithTimers) #endif EXPECT_CALL(testCallback1,timerCallback(handle,NULL)).Times(AnyNumber()); - myHandler.addTimer(timeout2, &testCallback2.pTimerCallback, handle, NULL, true); + ASSERT_EQ(myHandler.addTimer(timeout2, &testCallback2.pTimerCallback, handle, NULL, true), E_OK); #ifndef WITH_TIMERFD ASSERT_EQ(handle, 2); #else @@ -384,15 +618,15 @@ TEST(CAmSocketHandlerTest,playWithTimers) #endif EXPECT_CALL(testCallback2,timerCallback(handle,NULL)).Times(AnyNumber()); - myHandler.addTimer(timeout3, &testCallback3.pTimerCallback, handle, NULL); + ASSERT_EQ(myHandler.addTimer(timeout3, &testCallback3.pTimerCallback, handle, NULL), E_OK); #ifndef WITH_TIMERFD ASSERT_EQ(handle, 3); #else ASSERT_EQ(handle, 4); #endif - EXPECT_CALL(testCallback3,timerCallback(handle,NULL)).Times(2); //+1 because of measurment + EXPECT_CALL(testCallback3,timerCallback(handle,NULL)).Times(Exactly(1)); - myHandler.addTimer(timeout4, &testCallback4.pTimerCallback, handle, NULL); + ASSERT_EQ(myHandler.addTimer(timeout4, &testCallback4.pTimerCallback, handle, NULL), E_OK); #ifndef WITH_TIMERFD ASSERT_EQ(handle, 4); #else @@ -410,10 +644,10 @@ TEST(CAmSocketHandlerTest, signalHandlerPrimaryPlusSecondary) pMockSignalHandler = new MockIAmSignalHandler; CAmSocketHandler myHandler; ASSERT_FALSE(myHandler.fatalErrorOccurred()); - ASSERT_TRUE(myHandler.listenToSignals({SIGHUP})==E_OK); - ASSERT_TRUE(myHandler.listenToSignals({SIGHUP, SIGTERM, SIGCHLD})==E_OK); + ASSERT_EQ(myHandler.listenToSignals({SIGHUP}), E_OK); + ASSERT_EQ(myHandler.listenToSignals({SIGHUP, SIGTERM, SIGCHLD}), E_OK); + sh_pollHandle_t signalHandler1, signalHandler2; - std::string userData = "User data"; // critical signals are registered here: @@ -423,27 +657,29 @@ TEST(CAmSocketHandlerTest, signalHandlerPrimaryPlusSecondary) signalAction.sa_flags = SA_RESETHAND | SA_NODEFER| SA_SIGINFO; sigaction(SIGINT, &signalAction, NULL); sigaction(SIGQUIT, &signalAction, NULL); - - myHandler.addSignalHandler([&](const sh_pollHandle_t handle, const signalfd_siginfo & info, void* userData) - { - unsigned sig = info.ssi_signo; - pMockSignalHandler->signalHandlerAction(handle, sig, userData); + + myHandler.addSignalHandler([&](const sh_pollHandle_t handle, const signalfd_siginfo & info, void* userData) + { + unsigned sig = info.ssi_signo; + pMockSignalHandler->signalHandlerAction(handle, sig, userData); #ifdef ENABLED_SOCKETHANDLER_TEST_OUTPUT - unsigned user = info.ssi_uid; - std::cout<<"signal handler was called from user "<< user << " with signal " << sig << std::endl; + unsigned user = info.ssi_uid; + std::cout<<"signal handler was called from user "<< user << " with signal " << sig << std::endl; #endif - }, signalHandler1, &userData); - ASSERT_EQ(signalHandler1, 1); - myHandler.addSignalHandler([&](const sh_pollHandle_t handle, const signalfd_siginfo & info, void* userData) - { - unsigned sig = info.ssi_signo; - pMockSignalHandler->signalHandlerAction(handle, sig, userData); + }, signalHandler1, &userData); + ASSERT_EQ(signalHandler1, 1); + + myHandler.addSignalHandler([&](const sh_pollHandle_t handle, const signalfd_siginfo & info, void* userData) + { + unsigned sig = info.ssi_signo; + pMockSignalHandler->signalHandlerAction(handle, sig, userData); #ifdef ENABLED_SOCKETHANDLER_TEST_OUTPUT - unsigned user = info.ssi_uid; - std::cout<<"signal handler was called from user "<< user << " with signal " << sig << std::endl; + unsigned user = info.ssi_uid; + std::cout<<"signal handler was called from user "<< user << " with signal " << sig << std::endl; #endif - }, signalHandler2, &userData); - ASSERT_EQ(signalHandler2, 2); + }, signalHandler2, &userData); + ASSERT_EQ(signalHandler2, 2); + timespec timeout4; timeout4.tv_nsec = 200000000; timeout4.tv_sec = 0; @@ -456,7 +692,7 @@ TEST(CAmSocketHandlerTest, signalHandlerPrimaryPlusSecondary) CAmTimerSignalHandler testCallback4(&myHandler, timeout4, signals); sh_timerHandle_t handle; - myHandler.addTimer(timeout4, &testCallback4.pTimerCallback, handle, NULL, true); + ASSERT_EQ(myHandler.addTimer(timeout4, &testCallback4.pTimerCallback, handle, NULL, true), E_OK); #ifndef WITH_TIMERFD ASSERT_EQ(handle, 1); #else @@ -469,7 +705,7 @@ TEST(CAmSocketHandlerTest, signalHandlerPrimaryPlusSecondary) EXPECT_CALL(*pMockSignalHandler,signalHandlerAction(signalHandler2,it,&userData)).Times(1); for(auto it: primarySignals) EXPECT_CALL(*pMockSignalHandler,signalHandler(it,_,_)).Times(1); - + myHandler.start_listenting(); delete pMockSignalHandler; } @@ -484,94 +720,52 @@ TEST(CAmSocketHandlerTest,playWithUNIXSockets) ASSERT_FALSE(myHandler.fatalErrorOccurred()); CAmSamplePlugin::sockType_e type = CAmSamplePlugin::UNIX; CAmSamplePlugin myplugin(&myHandler, type); + ASSERT_TRUE(myplugin.isSocketOpened()); - EXPECT_CALL(myplugin,receiveData(_,_,_)).Times(SOCKET_TEST_LOOPS_COUNT + 1); - EXPECT_CALL(myplugin,dispatchData(_,_)).Times(SOCKET_TEST_LOOPS_COUNT + 1); - EXPECT_CALL(myplugin,check(_,_)).Times(SOCKET_TEST_LOOPS_COUNT + 1); - - //creates a thread that handles the serverpart - pthread_create(&serverThread, NULL, playWithUnixSocketServer, &myHandler); + EXPECT_CALL(myplugin,receiveData(Field(&pollfd::revents, Eq(POLL_IN)),_,_)).Times(Exactly(SOCKET_TEST_LOOPS_COUNT)); + EXPECT_CALL(myplugin,dispatchData(_,_)).Times(Exactly(SOCKET_TEST_LOOPS_COUNT)); + EXPECT_CALL(myplugin,check(_,_)).Times(Exactly(SOCKET_TEST_LOOPS_COUNT)); - sleep(1); //we need that here because the port needs to be opened if ((socket_ = socket(AF_UNIX, SOCK_STREAM, 0)) < 0) { std::cout << "socket problem" << std::endl; - - } - - memset(&servAddr, 0, sizeof(servAddr)); - strcpy(servAddr.sun_path, SOCK_PATH); - servAddr.sun_family = AF_UNIX; - if (connect(socket_, (struct sockaddr *) &servAddr, sizeof(servAddr)) < 0) - { - std::cout << "ERROR: connect() failed\n" << std::endl; } + ASSERT_GT(socket_, -1); + //creates a thread that handles the serverpart + pthread_create(&serverThread, NULL, playWithUnixSocketServer, &socket_); - for (int i = 1; i <= SOCKET_TEST_LOOPS_COUNT; i++) - { - std::string stringToSend(TEST_SOCKET_DATA); - send(socket_, stringToSend.c_str(), stringToSend.size(), 0); - } - std::string stringToSend(TEST_SOCKET_DATA_FINAL); - send(socket_, stringToSend.c_str(), stringToSend.size(), 0); + myHandler.start_listenting(); pthread_join(serverThread, NULL); - + shutdown(socket_, SHUT_RDWR); } TEST(CAmSocketHandlerTest,playWithSockets) { pthread_t serverThread; - struct sockaddr_in servAddr; - unsigned short servPort = 6060; - struct hostent *host; int socket_; CAmSocketHandler myHandler; ASSERT_FALSE(myHandler.fatalErrorOccurred()); CAmSamplePlugin::sockType_e type = CAmSamplePlugin::INET; CAmSamplePlugin myplugin(&myHandler, type); + ASSERT_TRUE(myplugin.isSocketOpened()); + EXPECT_CALL(myplugin,receiveData(Field(&pollfd::revents, Eq(POLL_IN)),_,_)).Times(Exactly(SOCKET_TEST_LOOPS_COUNT)); + EXPECT_CALL(myplugin,dispatchData(_,_)).Times(Exactly(SOCKET_TEST_LOOPS_COUNT)); + EXPECT_CALL(myplugin,check(_,_)).Times(Exactly(SOCKET_TEST_LOOPS_COUNT)); - EXPECT_CALL(myplugin,receiveData(_,_,_)).Times(SOCKET_TEST_LOOPS_COUNT + 1); - EXPECT_CALL(myplugin,dispatchData(_,_)).Times(SOCKET_TEST_LOOPS_COUNT + 1); - EXPECT_CALL(myplugin,check(_,_)).Times(SOCKET_TEST_LOOPS_COUNT + 1); - - //creates a thread that handles the serverpart - pthread_create(&serverThread, NULL, playWithSocketServer, &myHandler); - - sleep(1); //we need that here because the port needs to be opened if ((socket_ = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) { std::cout << "socket problem" << std::endl; - - } - - if ((host = (struct hostent*) gethostbyname("localhost")) == 0) - { - std::cout << "ERROR: gethostbyname() failed\n" << std::endl; - exit(1); - } - - memset(&servAddr, 0, sizeof(servAddr)); - servAddr.sin_family = AF_INET; - servAddr.sin_addr.s_addr = inet_addr(inet_ntoa(*(struct in_addr*) (host->h_addr_list[0]))); - servAddr.sin_port = htons(servPort); - - if (connect(socket_, (struct sockaddr *) &servAddr, sizeof(servAddr)) < 0) - { - std::cout << "ERROR: connect() failed\n" << std::endl; } + ASSERT_GT(socket_, -1); + //creates a thread that handles the serverpart + pthread_create(&serverThread, NULL, playWithSocketServer, &socket_); - for (int i = 1; i <= SOCKET_TEST_LOOPS_COUNT; i++) - { - std::string string(TEST_SOCKET_DATA); - send(socket_, string.c_str(), string.size(), 0); - } - std::string string(TEST_SOCKET_DATA_FINAL); - send(socket_, string.c_str(), string.size(), 0); + myHandler.start_listenting(); pthread_join(serverThread, NULL); - + shutdown(socket_, SHUT_RDWR); } int main(int argc, char **argv) @@ -587,12 +781,12 @@ am::CAmSamplePlugin::CAmSamplePlugin(CAmSocketHandler *mySocketHandler, sockType sampleCheckCB(this, &CAmSamplePlugin::check), // mSocketHandler(mySocketHandler), // mConnecthandle(), // - mReceiveHandle(), // - msgList() + mReceiveHandle(), // + msgList(), + mSocket(-1) { int yes = 1; - int socketHandle; struct sockaddr_in servAddr; struct sockaddr_un unixAddr; unsigned int servPort = 6060; @@ -600,26 +794,30 @@ am::CAmSamplePlugin::CAmSamplePlugin(CAmSocketHandler *mySocketHandler, sockType switch (socketType) { case UNIX: - socketHandle = socket(AF_UNIX, SOCK_STREAM, 0); + mSocket = socket(AF_UNIX, SOCK_STREAM, 0); + if(mSocket==-1) + return; unixAddr.sun_family = AF_UNIX; strcpy(unixAddr.sun_path, SOCK_PATH); unlink(unixAddr.sun_path); - bind(socketHandle, (struct sockaddr *) &unixAddr, strlen(unixAddr.sun_path) + sizeof(unixAddr.sun_family)); + bind(mSocket, (struct sockaddr *) &unixAddr, strlen(unixAddr.sun_path) + sizeof(unixAddr.sun_family)); break; case INET: - socketHandle = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP); - setsockopt(socketHandle, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)); + mSocket = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP); + if(mSocket==-1) + return; + setsockopt(mSocket, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)); memset(&servAddr, 0, sizeof(servAddr)); servAddr.sin_family = AF_INET; servAddr.sin_addr.s_addr = INADDR_ANY; servAddr.sin_port = htons(servPort); - bind(socketHandle, (struct sockaddr *) &servAddr, sizeof(servAddr)); + bind(mSocket, (struct sockaddr *) &servAddr, sizeof(servAddr)); break; default: break; } - if (listen(socketHandle, 3) < 0) + if (listen(mSocket, 3) < 0) { #ifdef ENABLED_SOCKETHANDLER_TEST_OUTPUT std::cout << "listen ok" << std::endl; @@ -627,12 +825,12 @@ am::CAmSamplePlugin::CAmSamplePlugin(CAmSocketHandler *mySocketHandler, sockType } /* if */ int a = 1; - ioctl(socketHandle, FIONBIO, (char *) &a); - setsockopt(socketHandle, SOL_SOCKET, SO_KEEPALIVE, (char *) &a, sizeof(a)); + ioctl(mSocket, FIONBIO, (char *) &a); + setsockopt(mSocket, SOL_SOCKET, SO_KEEPALIVE, (char *) &a, sizeof(a)); short events = 0; events |= POLLIN; - mySocketHandler->addFDPoll(socketHandle, events, NULL, &connectFiredCB, NULL, NULL, NULL, mConnecthandle); + mySocketHandler->addFDPoll(mSocket, events, NULL, &connectFiredCB, NULL, NULL, NULL, mConnecthandle); #ifdef ENABLED_SOCKETHANDLER_TEST_OUTPUT std::cout << "setup server - listening" << std::endl; #endif @@ -719,3 +917,53 @@ bool am::CAmSamplePlugin::check(const sh_pollHandle_t handle, void *userData) return false; } +CAmSamplePluginStressTest::CAmSamplePluginStressTest(CAmSocketHandler *mySocketHandler, sockType_e socketType):CAmSamplePlugin(mySocketHandler,socketType) +, mTimers(TIMERS_TO_TEST, NULL) +{ + sh_timerHandle_t handle; + TestUserData userData; + userData.i = 1; + userData.f = 1.f; + timespec timeoutTime; + timeoutTime.tv_sec = 0; + timeoutTime.tv_nsec = 10000000;// 0,01 + for (auto & timer : mTimers) + { + timer = new CAmTimerStressTest2(mySocketHandler, timeoutTime, 0); + if (mySocketHandler->addTimer(timeoutTime, &(timer->pTimerCallback), handle, &userData, true) == E_OK); + timer->setHandle(handle); + + EXPECT_CALL(*timer, timerCallback(_,&userData)).Times(AnyNumber()); + } +} + +CAmSamplePluginStressTest::~CAmSamplePluginStressTest() +{ + for (auto & timer : mTimers) + { + delete timer, timer = NULL; + } +} + +void CAmSamplePluginStressTest::receiveData(const pollfd pollfd, const sh_pollHandle_t handle, void* userData) +{ + CAmSamplePlugin::receiveData(pollfd, handle, userData); + + sh_timerHandle_t handle1; + for (auto & timer : mTimers) + { + ASSERT_EQ(mSocketHandler->removeTimer(timer->getHandle()), E_OK); + ASSERT_EQ(mSocketHandler->addTimer(timer->getUpdateTimeout(), &(timer->pTimerCallback), handle1, NULL, true), E_OK); + timer->setHandle(handle1); + } +} + +bool CAmSamplePluginStressTest::dispatchData(const sh_pollHandle_t handle, void* userData) +{ + return CAmSamplePlugin::dispatchData( handle, userData); +} + +bool CAmSamplePluginStressTest::check(const sh_pollHandle_t handle, void* userData) +{ + return CAmSamplePlugin::check( handle, userData); +} diff --git a/AudioManagerUtilities/test/AmSocketHandlerTest/CAmSocketHandlerTest.h b/AudioManagerUtilities/test/AmSocketHandlerTest/CAmSocketHandlerTest.h index ba2bf51..93620b5 100644 --- a/AudioManagerUtilities/test/AmSocketHandlerTest/CAmSocketHandlerTest.h +++ b/AudioManagerUtilities/test/AmSocketHandlerTest/CAmSocketHandlerTest.h @@ -101,23 +101,21 @@ namespace am UNIX, INET }; CAmSamplePlugin(CAmSocketHandler *mySocketHandler, sockType_e socketType); - ~CAmSamplePlugin() - { - } - ; + virtual ~CAmSamplePlugin() { } void connectSocket(const pollfd pollfd, const sh_pollHandle_t handle, void* userData); - void receiveData(const pollfd pollfd, const sh_pollHandle_t handle, void* userData); - bool dispatchData(const sh_pollHandle_t handle, void* userData); - bool check(const sh_pollHandle_t handle, void* userData); + virtual void receiveData(const pollfd pollfd, const sh_pollHandle_t handle, void* userData); + virtual bool dispatchData(const sh_pollHandle_t handle, void* userData); + virtual bool check(const sh_pollHandle_t handle, void* userData); TAmShPollFired<CAmSamplePlugin> connectFiredCB; TAmShPollFired<CAmSamplePlugin> receiveFiredCB; TAmShPollDispatch<CAmSamplePlugin> sampleDispatchCB; TAmShPollCheck<CAmSamplePlugin> sampleCheckCB; - - private: + bool isSocketOpened() { return mSocket>-1; } + protected: CAmSocketHandler *mSocketHandler; sh_pollHandle_t mConnecthandle, mReceiveHandle; std::queue<std::string> msgList; + int mSocket; }; class CAmTimerSockethandlerController: public MockIAmTimerCb @@ -162,13 +160,53 @@ namespace am TAmShTimerCallBack<CAmTimer> pTimerCallback; }; + class CAmTimerStressTest: public MockIAmTimerCb + { + CAmSocketHandler *mpSocketHandler; + timespec mUpdateTimeout; + int32_t mRepeats; + int32_t mHandle; + public: + explicit CAmTimerStressTest(CAmSocketHandler *SocketHandler, const timespec &timeout, const int32_t repeats = 0u); + virtual ~CAmTimerStressTest(); + + int32_t getHandle() { return mHandle; } + void setHandle(const int32_t id) { mHandle=id; } + + timespec getUpdateTimeout( ) { return mUpdateTimeout; } + + void timerCallback(sh_timerHandle_t handle, void * userData); + + TAmShTimerCallBack<CAmTimerStressTest> pTimerCallback; + }; + + class CAmTimerStressTest2: public MockIAmTimerCb + { + CAmSocketHandler *mpSocketHandler; + timespec mUpdateTimeout; + int32_t mRepeats; + int32_t mHandle; + public: + explicit CAmTimerStressTest2(CAmSocketHandler *SocketHandler, const timespec &timeout, const int32_t repeats = 0u); + virtual ~CAmTimerStressTest2(); + + int32_t getHandle() { return mHandle; } + void setHandle(const int32_t id) { mHandle=id; } + + timespec getUpdateTimeout( ) { return mUpdateTimeout; } + + void timerCallback(sh_timerHandle_t handle, void * userData); + + TAmShTimerCallBack<CAmTimerStressTest2> pTimerCallback; + }; + class CAmTimerMeasurment: public MockIAmTimerCb { CAmSocketHandler *mSocketHandler; timespec mUpdateTimeout; std::chrono::time_point<std::chrono::high_resolution_clock> mUpdateTimePoint; std::chrono::time_point<std::chrono::high_resolution_clock> mLastInvocationTime; - std::chrono::duration<long, std::ratio<1l, 1000000000l>> mExpected; + std::chrono::duration<uint64_t, std::nano> mExpected; int32_t mRepeats; void * mpUserData; std::string mDebugText; @@ -189,5 +227,19 @@ namespace am void TearDown(); }; + class CAmSamplePluginStressTest: public CAmSamplePlugin + { + std::vector<CAmTimerStressTest2*> mTimers; + public: + CAmSamplePluginStressTest(CAmSocketHandler *mySocketHandler, sockType_e socketType); + virtual ~CAmSamplePluginStressTest(); + + void receiveData(const pollfd pollfd, const sh_pollHandle_t handle, void* userData) final; + bool dispatchData(const sh_pollHandle_t handle, void* userData) final; + bool check(const sh_pollHandle_t handle, void* userData) final; + + std::vector<CAmTimerStressTest2*> & getTimers() { return mTimers; } + }; + } /* namespace am */ #endif /* SOCKETHANDLERTEST_H_ */ diff --git a/CMakeLists.txt b/CMakeLists.txt index 7e3bddb..3919fac 100755 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -65,8 +65,7 @@ option ( WITH_SHARED_CORE "Build audio manager core as dynamic library" OFF) option ( WITH_TIMERFD - "Build with the linux specific TIMERFD feature to support timing without using signals" ON) - + "Build with timer fd support" ON ) set(DBUS_SERVICE_PREFIX "org.genivi.audiomanager" CACHE PROPERTY "The dbus service prefix for the AM - only changable for legacy dbus") @@ -213,7 +212,6 @@ if(WITH_DOCUMENTATION) PATTERN "def" EXCLUDE) endif(WITH_DOCUMENTATION) - message(STATUS) message(STATUS "${PROJECT_NAME} Configuration:") message(STATUS "CMAKE_BUILD_TYPE = ${CMAKE_BUILD_TYPE}") |