diff options
author | GENIVI Audio Manager Maintainer <genivi-maint-audiomanager@smtp1.genivi.org> | 2018-03-13 11:28:45 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-03-13 11:28:45 +0100 |
commit | f32875117e54fd4887f3256d19518d3f62db6225 (patch) | |
tree | 0b387aae10734f32918c7ccc218c782230449104 | |
parent | cfded6ea1a53e5387e1f24c0a9132f363247669b (diff) | |
parent | c0d9501bcfa14ea7d851e5dd1cdc89e03a651342 (diff) | |
download | audiomanager-f32875117e54fd4887f3256d19518d3f62db6225.tar.gz |
Merge pull request #26 from GENIVI/utilityUpdates_CAPI_fixes
Utility updates capi fixes
-rw-r--r-- | AudioManagerUtilities/include/CAmCommonAPIWrapper.h | 246 | ||||
-rw-r--r-- | AudioManagerUtilities/include/CAmSocketHandler.h | 32 | ||||
-rw-r--r-- | AudioManagerUtilities/src/CAmCommonAPIWrapper.cpp | 257 | ||||
-rw-r--r-- | AudioManagerUtilities/src/CAmDltWrapper.cpp | 5 | ||||
-rw-r--r-- | AudioManagerUtilities/src/CAmSocketHandler.cpp | 315 | ||||
-rw-r--r-- | AudioManagerUtilities/test/AmSerializerTest/CAmSerializerTest.cpp | 7 | ||||
-rw-r--r-- | AudioManagerUtilities/test/AmSocketHandlerTest/CAmSocketHandlerTest.cpp | 416 | ||||
-rw-r--r-- | AudioManagerUtilities/test/AmSocketHandlerTest/CAmSocketHandlerTest.h | 78 | ||||
-rwxr-xr-x | CMakeLists.txt | 5 |
9 files changed, 932 insertions, 429 deletions
diff --git a/AudioManagerUtilities/include/CAmCommonAPIWrapper.h b/AudioManagerUtilities/include/CAmCommonAPIWrapper.h index 82328d6..7d64a04 100644 --- a/AudioManagerUtilities/include/CAmCommonAPIWrapper.h +++ b/AudioManagerUtilities/include/CAmCommonAPIWrapper.h @@ -22,6 +22,7 @@ #include <string> #include <list> #include <map> +#include <unordered_map> #include <queue> #include <memory> #include <cassert> @@ -48,29 +49,37 @@ class CAmSocketHandler; class CAmCommonAPIWrapper { - void commonPrepareCallback(const sh_pollHandle_t handle, void* userData); + void commonPrepareCallback ( const sh_pollHandle_t, void* ); TAmShPollPrepare<CAmCommonAPIWrapper> pCommonPrepareCallback; - bool commonDispatchCallback(const sh_pollHandle_t handle, void* userData); - TAmShPollDispatch<CAmCommonAPIWrapper> pCommonDispatchCallback; - void commonFireCallback(const pollfd pollfd, const sh_pollHandle_t, void*); TAmShPollFired<CAmCommonAPIWrapper> pCommonFireCallback; bool commonCheckCallback(const sh_pollHandle_t handle, void*); TAmShPollCheck<CAmCommonAPIWrapper> pCommonCheckCallback; + + bool commonDispatchCallback(const sh_pollHandle_t handle, void* userData); + TAmShPollDispatch<CAmCommonAPIWrapper> pCommonDispatchCallback; void commonTimerCallback(sh_timerHandle_t handle, void* userData); TAmShTimerCallBack<CAmCommonAPIWrapper> pCommonTimerCallback; - struct timerHandles - { - sh_timerHandle_t handle; - CommonAPI::Timeout* timeout; - }; - CAmSocketHandler *mpSocketHandler; //!< pointer to the sockethandler + typedef std::vector<CommonAPI::DispatchSource*> ArrayDispatchSources; + typedef ArrayDispatchSources::iterator IteratorArrayDispatchSources; + typedef std::unordered_map<am::sh_pollHandle_t, CommonAPI::Watch*> MapWatches; + typedef MapWatches::iterator IteratorMapWatches; + typedef std::unordered_map<am::sh_pollHandle_t,std::list<CommonAPI::DispatchSource*>> MapDispatchSources; + typedef MapDispatchSources::iterator IteratorDispatchSources; + typedef std::unordered_map<am::sh_pollHandle_t, CommonAPI::Timeout*> MapTimeouts; + typedef MapTimeouts::iterator IteratorMapTimeouts; + + ArrayDispatchSources mRegisteredDispatchSources; + MapWatches mMapWatches; + MapDispatchSources mSourcesToDispatch; + MapTimeouts mListTimerhandles; + std::shared_ptr<CommonAPI::Runtime> mRuntime; std::shared_ptr<CommonAPI::MainLoopContext> mContext; @@ -78,22 +87,23 @@ class CAmCommonAPIWrapper CommonAPI::WatchListenerSubscription mWatchListenerSubscription; CommonAPI::TimeoutSourceListenerSubscription mTimeoutSourceListenerSubscription; CommonAPI::WakeupListenerSubscription mWakeupListenerSubscription; - std::multimap<CommonAPI::DispatchPriority, CommonAPI::DispatchSource*> mRegisteredDispatchSources; - std::map<int,CommonAPI::Watch*> mMapWatches; - CommonAPI::Watch* mWatchToCheck; - std::list<CommonAPI::DispatchSource*> mSourcesToDispatch; - std::vector<timerHandles> mpListTimerhandles; + void registerDispatchSource(CommonAPI::DispatchSource* dispatchSource, const CommonAPI::DispatchPriority dispatchPriority); void deregisterDispatchSource(CommonAPI::DispatchSource* dispatchSource); + void deregisterAllDispatchSource(); void registerWatch(CommonAPI::Watch* watch, const CommonAPI::DispatchPriority dispatchPriority); void deregisterWatch(CommonAPI::Watch* watch); + void deregisterAllWatches(); void registerTimeout(CommonAPI::Timeout* timeout, const CommonAPI::DispatchPriority dispatchPriority); void deregisterTimeout(CommonAPI::Timeout* timeout); - void wakeup(); + void deregisterAllTimeouts(); + + CommonAPI::Watch* watchWithHandle(const sh_pollHandle_t handle); + CommonAPI::Timeout* timeoutWithHandle(const sh_pollHandle_t handle); protected: - CAmCommonAPIWrapper(CAmSocketHandler* socketHandler, const std::string & applicationName = "") ; + CAmCommonAPIWrapper(CAmSocketHandler* socketHandler, const std::string& applicationName = "") ; public: @@ -133,24 +143,25 @@ public: * @return Pointer to the socket handler. */ CAmSocketHandler *getSocketHandler() const { return mpSocketHandler; } -#if COMMONAPI_VERSION_NUMBER >= 300 - /** - * \brief Register stub objects. - * - * Example: std::shared_ptr<ConcreteStubClass> aStub; - * registerService( aStub, "local", "com.your_company.instance_name", "service-name"); - * - * @param shStub: Shared pointer to a stub instance - * @param domain: A string with the domain name, usually "local" - * @param instance: Common-api instance string as example "com.your_company.instance_name" - * @param connectionId: A string connection id, which is used by CommonAPI to group applications - * - */ - template <class TStubImp> bool registerService(const std::shared_ptr<TStubImp> & shStub, const std::string & domain, const std::string & instance, const CommonAPI::ConnectionId_t & connectionId) - { - return mRuntime->registerService(domain, instance, shStub, connectionId); - } -#endif + + /** + * \brief Deprecated method. This class is used only in single connection applications and no connectionId is needed. Instead you should use bool registerService(const std::shared_ptr<TStubImp> & shStub, const std::string & domain, const std::string & instance). + * + * + * Example: std::shared_ptr<ConcreteStubClass> aStub; + * registerService( aStub, "local", "com.your_company.instance_name", "service-name"); + * + * @param shStub: Shared pointer to a stub instance + * @param domain: A string with the domain name, usually "local" + * @param instance: Common-api instance string as example "com.your_company.instance_name" + * @param connectionId: A string connection id, which is used by CommonAPI to group applications + * + */ + template <class TStubImp> bool __attribute__((deprecated)) registerService(const std::shared_ptr<TStubImp> & shStub, const std::string & domain, const std::string & instance, const CommonAPI::ConnectionId_t __attribute__((__unused__)) & /*connectionId*/) + { + return mRuntime->registerService(domain, instance, shStub, mContext); + } + /** * \brief Register stub objects. * @@ -180,96 +191,93 @@ public: return mRuntime->unregisterService(domain, interface, instance); } - /** - * \brief Deprecated method. Instead you should use bool registerService(const std::shared_ptr<TStubImp> & shStub, const std::string & domain, const std::string & instance). - * - * Register stub objects. - * - * Example: std::shared_ptr<ConcreteStubClass> aStub; - * registerService( aStub, "local:com.your_company.interface_name:com.your_company.instance_name"); - * - * @param shStub: Shared pointer to a stub instance - * @param address: Complete common-api address as example "local:com.your_company.interface_name:com.your_company.instance_name" - * - */ - template <class TStubImp> bool __attribute__((deprecated)) registerStub(const std::shared_ptr<TStubImp> & shStub, const std::string & address) - { - std::vector<std::string> parts = CommonAPI::split(address, ':'); - assert(parts.size()==3); - - return registerService(shStub, parts[0], parts[2]); - } - - /** - * \brief Deprecated method. Instead you should use bool unregisterService(const std::string &domain, const std::string &interface, const std::string &instance). - * - * Unregister stub objects. - * - * @param address: Complete common-api address as example "local:com.your_company.interface_name:com.your_company.instance_name" - * - */ - bool __attribute__((deprecated)) unregisterStub(const std::string & address) - { - std::vector<std::string> parts = CommonAPI::split(address, ':'); - assert(parts.size()==3); - - return unregisterService(parts[0], parts[1], parts[2]); - } - -#if COMMONAPI_VERSION_NUMBER >= 300 - /** - * \brief Build proxy objects. - * - * Example: std::shared_ptr<AProxyClass<>> aProxy = buildProxy<AProxyClass>("local", "com.your_company.instance_name", "client-name"); - * - * @param domain: A string with the domain name, usually "local" - * @param instance: Common-api instance string as example "com.your_company.instance_name" - * @param connectionId: A string connection id, which is used by CommonAPI to group applications - * - * @return A proxy object. - */ - template<template<typename ...> class ProxyClass, typename ... AttributeExtensions> - std::shared_ptr<ProxyClass<AttributeExtensions...>> buildProxy(const std::string &domain, const std::string &instance, const CommonAPI::ConnectionId_t & connectionId) - { - return mRuntime->buildProxy<ProxyClass>(domain, instance, connectionId); - } -#endif - - /** - * \brief Build proxy objects. - * - * Example: std::shared_ptr<AProxyClass<>> aProxy = buildProxy<AProxyClass>("local", "com.your_company.instance_name"); - * - * @param domain: A string with the domain name, usually "local" - * @param instance: Common-api instance string as example "com.your_company.instance_name" - * - * @return A proxy object. - */ + /** + * \brief Deprecated method. Instead you should use bool registerService(const std::shared_ptr<TStubImp> & shStub, const std::string & domain, const std::string & instance). + * + * Register stub objects. + * + * Example: std::shared_ptr<ConcreteStubClass> aStub; + * registerService( aStub, "local:com.your_company.interface_name:com.your_company.instance_name"); + * + * @param shStub: Shared pointer to a stub instance + * @param address: Complete common-api address as example "local:com.your_company.interface_name:com.your_company.instance_name" + * + */ + template <class TStubImp> bool __attribute__((deprecated)) registerStub(const std::shared_ptr<TStubImp> & shStub, const std::string & address) + { + std::vector<std::string> parts = CommonAPI::split(address, ':'); + assert(parts.size()==3); + + return registerService(shStub, parts[0], parts[2]); + } + + /** + * \brief Deprecated method. Instead you should use bool unregisterService(const std::string &domain, const std::string &interface, const std::string &instance). + * + * Unregister stub objects. + * + * @param address: Complete common-api address as example "local:com.your_company.interface_name:com.your_company.instance_name" + * + */ + bool __attribute__((deprecated)) unregisterStub(const std::string & address) + { + std::vector<std::string> parts = CommonAPI::split(address, ':'); + assert(parts.size()==3); + + return unregisterService(parts[0], parts[1], parts[2]); + } + + /** + * \brief Deprecated method. This class is used only in single connection applications and no connectionId is needed. Instead you should use buildProxy(const std::string &domain, const std::string &instance). + * + * Example: std::shared_ptr<AProxyClass<>> aProxy = buildProxy<AProxyClass>("local", "com.your_company.instance_name", "client-name"); + * + * @param domain: A string with the domain name, usually "local" + * @param instance: Common-api instance string as example "com.your_company.instance_name" + * @param connectionId: A string connection id, which is used by CommonAPI to group applications + * + * @return A proxy object. + */ + template<template<typename ...> class ProxyClass, typename ... AttributeExtensions> + std::shared_ptr<ProxyClass<AttributeExtensions...>> __attribute__((deprecated)) buildProxy(const std::string &domain, const std::string &instance, const CommonAPI::ConnectionId_t __attribute__((__unused__)) & /*connectionId*/) + { + return mRuntime->buildProxy<ProxyClass>(domain, instance, mContext); + } + + /** + * \brief Build proxy objects. + * + * Example: std::shared_ptr<AProxyClass<>> aProxy = buildProxy<AProxyClass>("local", "com.your_company.instance_name"); + * + * @param domain: A string with the domain name, usually "local" + * @param instance: Common-api instance string as example "com.your_company.instance_name" + * + * @return A proxy object. + */ template<template<typename ...> class ProxyClass, typename ... AttributeExtensions> std::shared_ptr<ProxyClass<AttributeExtensions...>> buildProxy(const std::string &domain, const std::string &instance) { return mRuntime->buildProxy<ProxyClass>(domain, instance, mContext); } - - /** - * \brief Deprecated method. Instead you should use buildProxy(const std::string &domain, const std::string &instance). - * - * Build proxy objects. - * Example: std::shared_ptr<AProxyClass<>> aProxy = buildProxy<AProxyClass>("local:com.your_company.interface_name:com.your_company.instance_name"); - * - * @param address: Complete common-api address as example "local:com.your_company.interface_name:com.your_company.instance_name" - * - * @return A proxy object. - */ - template<template<typename ...> class ProxyClass, typename ... AttributeExtensions> - std::shared_ptr<ProxyClass<AttributeExtensions...>> __attribute__((deprecated)) buildProxy(const std::string & address) - { - std::vector<std::string> parts=CommonAPI::split(address, ':'); - assert(parts.size()==3); - - return buildProxy<ProxyClass>(parts[0], parts[2]); - } + /** + * \brief Deprecated method. Instead you should use buildProxy(const std::string &domain, const std::string &instance). + * + * Build proxy objects. + * Example: std::shared_ptr<AProxyClass<>> aProxy = buildProxy<AProxyClass>("local:com.your_company.interface_name:com.your_company.instance_name"); + * + * @param address: Complete common-api address as example "local:com.your_company.interface_name:com.your_company.instance_name" + * + * @return A proxy object. + */ + template<template<typename ...> class ProxyClass, typename ... AttributeExtensions> + std::shared_ptr<ProxyClass<AttributeExtensions...>> __attribute__((deprecated)) buildProxy(const std::string & address) + { + std::vector<std::string> parts=CommonAPI::split(address, ':'); + assert(parts.size()==3); + + return buildProxy<ProxyClass>(parts[0], parts[2]); + } }; diff --git a/AudioManagerUtilities/include/CAmSocketHandler.h b/AudioManagerUtilities/include/CAmSocketHandler.h index 717f792..53010ba 100644 --- a/AudioManagerUtilities/include/CAmSocketHandler.h +++ b/AudioManagerUtilities/include/CAmSocketHandler.h @@ -27,6 +27,7 @@ #include <signal.h> #include <vector> #include <functional> +#include <thread> #include <sys/signalfd.h> #include <audiomanagerconfig.h> #include "audiomanagertypes.h" @@ -217,6 +218,7 @@ class CAmSocketHandler { struct sh_poll_s //!<struct that holds information about polls { + bool isValid; sh_pollHandle_t handle; //!<handle to uniquely adress a filedesriptor pollfd pollfdValue; //!<the array for polling the filedescriptors std::function<void(const sh_pollHandle_t handle, void* userData)> prepareCB; //preperation callback @@ -226,7 +228,7 @@ class CAmSocketHandler void* userData; sh_poll_s() : - handle(0), pollfdValue(), prepareCB(), firedCB(), checkCB(), dispatchCB(), userData(0) + isValid(true), handle(0), pollfdValue(), prepareCB(), firedCB(), checkCB(), dispatchCB(), userData(0) {} }; @@ -244,10 +246,11 @@ class CAmSocketHandler sh_timer_s() : handle(0) #ifdef WITH_TIMERFD - , fd(0) + , fd(-1) #endif , countdown(), callback(), userData(0) {} + }; struct sh_signal_s @@ -291,22 +294,30 @@ class CAmSocketHandler VectorListPoll_t mListPoll; //!<list that holds all information for the ppoll sh_identifier_s mSetTimerKeys; //!A set of all used timer keys std::list<sh_timer_s> mListTimer; //!<list of all timers +#ifndef WITH_TIMERFD std::list<sh_timer_s> mListActiveTimer; //!<list of all currently active timers +#else + std::list<sh_timer_s> mListRemovedTimers; +#endif sh_identifier_s mSetSignalhandlerKeys; //!A set of all used signal handler keys VectorSignalHandlers_t mSignalHandlers; bool mRecreatePollfds; //!<when this is true, the poll list needs to be recreated internal_codes_t mInternalCodes; sh_pollHandle_t mSignalFdHandle; + VectorListPoll_t mListActivePolls; + const std::thread::id mThreadID; //!< Socket handler thread id used to check if the calls come from the same thread #ifndef WITH_TIMERFD timespec mStartTime; //!<here the actual time is saved for timecorrection #endif + private: + bool fdIsValid(const int fd) const; timespec* insertTime(timespec& buffertime); #ifdef WITH_TIMERFD am_Error_e createTimeFD(const itimerspec & timeouts, int & fd); - + void closeRemovedTimers(); #else void timerUp(); void timerCorrection(); @@ -408,28 +419,21 @@ private: * @param a * @return */ - inline static void fire(sh_poll_s& a); - - /** - * functor to return all fired events - * @param a - * @return - */ - inline static bool eventFired(const pollfd& a); + inline static void fire(const sh_poll_s* a); /** * functor to help find the items that do not need dispatching * @param a * @return */ - inline static bool noDispatching(const sh_poll_s& a); + inline static bool noDispatching(const sh_poll_s* a); /** * checks if dispatching is already finished * @param a * @return */ - inline static bool dispatchingFinished(const sh_poll_s& a); + inline static bool dispatchingFinished(const sh_poll_s* a); /** * timer fire callback @@ -446,7 +450,7 @@ private: bool nextHandle(sh_identifier_s & handle); am_Error_e getFDPollData(const sh_pollHandle_t handle, sh_poll_s & outPollData); - + public: CAmSocketHandler(); diff --git a/AudioManagerUtilities/src/CAmCommonAPIWrapper.cpp b/AudioManagerUtilities/src/CAmCommonAPIWrapper.cpp index 2aa8370..4a6accf 100644 --- a/AudioManagerUtilities/src/CAmCommonAPIWrapper.cpp +++ b/AudioManagerUtilities/src/CAmCommonAPIWrapper.cpp @@ -36,21 +36,47 @@ namespace am static CAmCommonAPIWrapper* pSingleCommonAPIInstance = NULL; +bool timeoutToTimespec(const int64_t & localTimeout, timespec & pollTimeout) +{ + if(CommonAPI::TIMEOUT_INFINITE == localTimeout)//dispatch never + { + return false; + } + else + { + if(CommonAPI::TIMEOUT_NONE==localTimeout)//dispatch immediately + { + pollTimeout.tv_sec = 0; + pollTimeout.tv_nsec = 5000000;//5 ms + } + else + { + pollTimeout.tv_sec = localTimeout / 1000; + pollTimeout.tv_nsec = (localTimeout % 1000) * 1000000; + } + return true; + } +} + + CAmCommonAPIWrapper::CAmCommonAPIWrapper(CAmSocketHandler* socketHandler, const std::string & applicationName): pCommonPrepareCallback(this,&CAmCommonAPIWrapper::commonPrepareCallback), // - pCommonDispatchCallback(this, &CAmCommonAPIWrapper::commonDispatchCallback), // pCommonFireCallback(this, &CAmCommonAPIWrapper::commonFireCallback), // pCommonCheckCallback(this, &CAmCommonAPIWrapper::commonCheckCallback), // + pCommonDispatchCallback(this, &CAmCommonAPIWrapper::commonDispatchCallback), // pCommonTimerCallback(this, &CAmCommonAPIWrapper::commonTimerCallback), // - mpSocketHandler(socketHandler), // - mWatchToCheck(NULL) + mpSocketHandler(socketHandler), + mRegisteredDispatchSources(), + mMapWatches(), + mSourcesToDispatch(), + mListTimerhandles() { assert(NULL!=socketHandler); //Get the runtime mRuntime = CommonAPI::Runtime::get(); assert(NULL!=mRuntime); - //Create the context +//Create the context if(applicationName.size()) mContext = std::make_shared<CommonAPI::MainLoopContext>(applicationName); else @@ -75,9 +101,11 @@ CAmCommonAPIWrapper::~CAmCommonAPIWrapper() mContext->unsubscribeForDispatchSources(mDispatchSourceListenerSubscription); mContext->unsubscribeForWatches(mWatchListenerSubscription); mContext->unsubscribeForTimeouts(mTimeoutSourceListenerSubscription); + deregisterAllDispatchSource(); + deregisterAllTimeouts(); + deregisterAllWatches(); mContext.reset(); mpSocketHandler = NULL; - mWatchToCheck = NULL; } CAmCommonAPIWrapper* CAmCommonAPIWrapper::instantiateOnce(CAmSocketHandler* socketHandler, const std::string & applicationName) @@ -115,151 +143,214 @@ CAmCommonAPIWrapper* CAmCommonAPIWrapper::getInstance() return pSingleCommonAPIInstance; } -bool CAmCommonAPIWrapper::commonDispatchCallback(const sh_pollHandle_t handle, void *userData) +void CAmCommonAPIWrapper::commonPrepareCallback(const sh_pollHandle_t, void*) { - (void) handle; - (void) userData; - - std::list<CommonAPI::DispatchSource*>::iterator iterator(mSourcesToDispatch.begin()); - for(;iterator!=mSourcesToDispatch.end();) + for (auto dispatchSourceIterator = mRegisteredDispatchSources.begin(); + dispatchSourceIterator != mRegisteredDispatchSources.end(); + dispatchSourceIterator++) { - CommonAPI::DispatchSource* source = *iterator; - if (!source->dispatch()) { - iterator=mSourcesToDispatch.erase(iterator); + int64_t dispatchTimeout(CommonAPI::TIMEOUT_INFINITE); + if((*dispatchSourceIterator)->prepare(dispatchTimeout)) + { + while ((*dispatchSourceIterator)->dispatch()); } - else - iterator++; } - if (!mSourcesToDispatch.empty()) - return (true); +} - return false; +void CAmCommonAPIWrapper::commonFireCallback(const pollfd pollfd, const sh_pollHandle_t handle, void *) +{ + CommonAPI::Watch* pWatchToCheck = watchWithHandle(handle); + if( pWatchToCheck ) + pWatchToCheck->dispatch(pollfd.revents); } -bool CAmCommonAPIWrapper::commonCheckCallback(const sh_pollHandle_t, void *) +bool CAmCommonAPIWrapper::commonCheckCallback(const sh_pollHandle_t handle, void *) { - std::vector<CommonAPI::DispatchSource*> vecDispatch=mWatchToCheck->getDependentDispatchSources(); - mSourcesToDispatch.insert(mSourcesToDispatch.end(), vecDispatch.begin(), vecDispatch.end()); + CommonAPI::Watch* pWatchToCheck = watchWithHandle(handle); + if( pWatchToCheck ) + { + const ArrayDispatchSources & vecDispatch = pWatchToCheck->getDependentDispatchSources(); + if(vecDispatch.size()>0) + { + mSourcesToDispatch[handle].insert(mSourcesToDispatch[handle].end(), vecDispatch.begin(), vecDispatch.end()); + return true; + } + } + return false; +} - return (mWatchToCheck || !mSourcesToDispatch.empty()); +bool CAmCommonAPIWrapper::commonDispatchCallback(const sh_pollHandle_t handle, void *) +{ + CommonAPI::Watch* pWatchToCheck = watchWithHandle(handle); + if( pWatchToCheck ) + { + std::list<CommonAPI::DispatchSource*> & srcList = mSourcesToDispatch[handle]; + for(auto it = srcList.begin();it!=srcList.end();) + { + if (false==(*it)->dispatch()) + it=srcList.erase(it); + else + it++; + } + if (!srcList.empty()) + return (true); + } + mSourcesToDispatch.erase(handle); + return false; } -void CAmCommonAPIWrapper::commonFireCallback(const pollfd pollfd, const sh_pollHandle_t, void *) +void CAmCommonAPIWrapper::commonTimerCallback(sh_timerHandle_t handle, void *) { - mWatchToCheck=NULL; - try + CommonAPI::Timeout* pTimeout = timeoutWithHandle(handle); + + if( NULL==pTimeout ) { - mWatchToCheck=mMapWatches.at(pollfd.fd); + //erroneous call because deregisterTimeout has been called, so try to remove the timer from the sockethandler + mpSocketHandler->removeTimer(handle); } - catch (const std::out_of_range& error) { - logInfo(__PRETTY_FUNCTION__,error.what()); - return; + else + { + if ( false==pTimeout->dispatch() ) //it should be removed + { + mpSocketHandler->removeTimer(handle); + mListTimerhandles.erase(handle); + } + #ifndef WITH_TIMERFD + else //the timeout should be rescheduled + mpSocketHandler->restartTimer(handle); + #endif } +} - mWatchToCheck->dispatch(pollfd.events); +void CAmCommonAPIWrapper::registerDispatchSource(CommonAPI::DispatchSource* dispatchSource, const CommonAPI::DispatchPriority) +{ + mRegisteredDispatchSources.push_back(dispatchSource); } -void CAmCommonAPIWrapper::commonPrepareCallback(const sh_pollHandle_t, void*) +void CAmCommonAPIWrapper::deregisterDispatchSource(CommonAPI::DispatchSource* dispatchSource) { - for (auto dispatchSourceIterator = mRegisteredDispatchSources.begin(); - dispatchSourceIterator != mRegisteredDispatchSources.end(); - dispatchSourceIterator++) + for(IteratorArrayDispatchSources dispatchSourceIterator = mRegisteredDispatchSources.begin(); dispatchSourceIterator != mRegisteredDispatchSources.end(); dispatchSourceIterator++) { - int64_t dispatchTimeout(CommonAPI::TIMEOUT_INFINITE); - if(dispatchSourceIterator->second->prepare(dispatchTimeout)) + if( *dispatchSourceIterator == dispatchSource ) { - while (dispatchSourceIterator->second->dispatch()); + mRegisteredDispatchSources.erase(dispatchSourceIterator); + break; } } } -void CAmCommonAPIWrapper::registerDispatchSource(CommonAPI::DispatchSource* dispatchSource, const CommonAPI::DispatchPriority dispatchPriority) +void CAmCommonAPIWrapper::deregisterAllDispatchSource() { - mRegisteredDispatchSources.insert({dispatchPriority, dispatchSource}); + mRegisteredDispatchSources.clear(); } -void CAmCommonAPIWrapper::deregisterDispatchSource(CommonAPI::DispatchSource* dispatchSource) +void CAmCommonAPIWrapper::registerWatch(CommonAPI::Watch* watch, const CommonAPI::DispatchPriority) { - for(auto dispatchSourceIterator = mRegisteredDispatchSources.begin(); - dispatchSourceIterator != mRegisteredDispatchSources.end(); - dispatchSourceIterator++) { + logInfo(__PRETTY_FUNCTION__); + pollfd pollfd_ (watch->getAssociatedFileDescriptor()); + sh_pollHandle_t handle (0); - if(dispatchSourceIterator->second == dispatchSource) { - mRegisteredDispatchSources.erase(dispatchSourceIterator); - break; - } + am_Error_e error = mpSocketHandler->addFDPoll(pollfd_.fd, pollfd_.events, &pCommonPrepareCallback, &pCommonFireCallback, &pCommonCheckCallback, &pCommonDispatchCallback, watch, handle); + + //if everything is alright, add the watch and the handle to our map so we know this relationship + if (error != am_Error_e::E_OK || handle == 0) + { + logError(__func__,"entering watch failed"); } + else + mMapWatches.insert(std::make_pair(handle,watch)); } void CAmCommonAPIWrapper::deregisterWatch(CommonAPI::Watch* watch) { - for(std::map<int,CommonAPI::Watch*>::iterator iter(mMapWatches.begin());iter!=mMapWatches.end();iter++) + for(IteratorMapWatches iter=mMapWatches.begin();iter!=mMapWatches.end();iter++) { if (iter->second == watch) { + mpSocketHandler->removeFDPoll(iter->first); mMapWatches.erase(iter); break; } } } +void CAmCommonAPIWrapper::deregisterAllWatches() +{ + for(IteratorMapWatches iter=mMapWatches.begin();iter!=mMapWatches.end();iter++) + mpSocketHandler->removeFDPoll(iter->first); + mMapWatches.clear(); +} + void CAmCommonAPIWrapper::registerTimeout(CommonAPI::Timeout* timeout, const CommonAPI::DispatchPriority) { timespec pollTimeout; - int64_t localTimeout = timeout->getTimeoutInterval(); - - pollTimeout.tv_sec = localTimeout / 1000; - pollTimeout.tv_nsec = (localTimeout % 1000) * 1000000; - - //prepare handle and callback. new is eval, but there is no other choice because we need the pointer! - sh_timerHandle_t handle; - - //add the timer to the pollLoop - mpSocketHandler->addTimer(pollTimeout, &pCommonTimerCallback, handle, timeout); - - timerHandles myHandle({handle,timeout}); - mpListTimerhandles.push_back(myHandle); + if(timeoutToTimespec(timeout->getTimeoutInterval(), pollTimeout)) + { + //prepare handle and callback. new is eval, but there is no other choice because we need the pointer! + sh_timerHandle_t handle; - return; + //add the timer to the pollLoop + am_Error_e error = mpSocketHandler->addTimer(pollTimeout, &pCommonTimerCallback, handle, timeout, true); + if (error != am_Error_e::E_OK || handle == 0) + { + logError(__func__,"adding timer failed"); + } + else + { + mListTimerhandles.insert(std::make_pair(handle,timeout)); + } + } } void CAmCommonAPIWrapper::deregisterTimeout(CommonAPI::Timeout* timeout) { - for( std::vector<timerHandles>::iterator iter(mpListTimerhandles.begin());iter!=mpListTimerhandles.end();iter++) + for( IteratorMapTimeouts iter=mListTimerhandles.begin();iter!= mListTimerhandles.end();iter++) { - if(iter->timeout==timeout) + if(iter->second==timeout) { - mpSocketHandler->removeTimer(iter->handle); + mpSocketHandler->removeTimer(iter->first); + mListTimerhandles.erase(iter->first); + break; } } } -void CAmCommonAPIWrapper::registerWatch(CommonAPI::Watch* watch, const CommonAPI::DispatchPriority) +void CAmCommonAPIWrapper::deregisterAllTimeouts() { - logInfo(__PRETTY_FUNCTION__); - pollfd pollfd_ (watch->getAssociatedFileDescriptor()); - sh_pollHandle_t handle (0); - - am_Error_e error = mpSocketHandler->addFDPoll(pollfd_.fd, pollfd_.events, &pCommonPrepareCallback, &pCommonFireCallback, &pCommonCheckCallback, &pCommonDispatchCallback, watch, handle); - - //if everything is alright, add the watch and the handle to our map so we know this relationship - if (error == !am_Error_e::E_OK || handle == 0) - logError(__func__,"entering watch failed"); + for( IteratorMapTimeouts iter=mListTimerhandles.begin();iter!= mListTimerhandles.end();iter++) + mpSocketHandler->removeTimer(iter->first); + mListTimerhandles.clear(); +} - mMapWatches.insert(std::make_pair(pollfd_.fd,watch)); +CommonAPI::Watch* CAmCommonAPIWrapper::watchWithHandle(const sh_pollHandle_t handle) +{ + CommonAPI::Watch* pWatchToCheck = NULL; + try + { + pWatchToCheck = mMapWatches.at(handle); + } + catch (const std::out_of_range& error) + { + logInfo(__PRETTY_FUNCTION__,error.what()); + } + return pWatchToCheck; } -void CAmCommonAPIWrapper::commonTimerCallback(sh_timerHandle_t handle, void *) +CommonAPI::Timeout* CAmCommonAPIWrapper::timeoutWithHandle(const sh_pollHandle_t handle) { - for( std::vector<timerHandles>::iterator iter(mpListTimerhandles.begin());iter!=mpListTimerhandles.end();iter++) + CommonAPI::Timeout* pTimeout = NULL; + try { - if(iter->handle==handle) - { - iter->timeout->dispatch(); - } + pTimeout = mListTimerhandles.at(handle); } + catch (const std::out_of_range& error) + { + logInfo(__PRETTY_FUNCTION__,error.what()); + } + return pTimeout; } + CAmCommonAPIWrapper* (*getCAPI)() = CAmCommonAPIWrapper::getInstance; } diff --git a/AudioManagerUtilities/src/CAmDltWrapper.cpp b/AudioManagerUtilities/src/CAmDltWrapper.cpp index 742b396..44ec614 100644 --- a/AudioManagerUtilities/src/CAmDltWrapper.cpp +++ b/AudioManagerUtilities/src/CAmDltWrapper.cpp @@ -23,13 +23,14 @@ */ -#include "CAmDltWrapper.h" #include <string> #include <iostream> #include <string.h> #include <chrono> #include <ctime> +#include <sys/types.h> #include <unistd.h> +#include "CAmDltWrapper.h" namespace am { @@ -625,7 +626,7 @@ bool CAmDltWrapper::initNoDlt(DltLogLevelType loglevel, DltContext* context) bool CAmDltWrapper::init(DltLogLevelType loglevel, DltContext* context) { pthread_mutex_lock(&mMutex); - initNoDlt(loglevel,context); + return initNoDlt(loglevel,context); } void CAmDltWrapper::send() diff --git a/AudioManagerUtilities/src/CAmSocketHandler.cpp b/AudioManagerUtilities/src/CAmSocketHandler.cpp index fad60e5..7cf5594 100644 --- a/AudioManagerUtilities/src/CAmSocketHandler.cpp +++ b/AudioManagerUtilities/src/CAmSocketHandler.cpp @@ -31,6 +31,7 @@ #include <features.h> #include <csignal> #include <unistd.h> +#include <string.h> #include "CAmDltWrapper.h" #include "CAmSocketHandler.h" @@ -42,22 +43,38 @@ namespace am { +#define CHECK_CALLER_THREAD_ID()\ + if(std::this_thread::get_id() != mThreadID)\ + {\ + logError("Sockethandler: Call from another thread detected!");\ + assert(false);\ + } + + + + CAmSocketHandler::CAmSocketHandler() : mPipe(), // - mDispatchDone(true), // - mSetPollKeys(MAX_POLLHANDLE), // - mListPoll(), // - mSetTimerKeys(MAX_TIMERHANDLE), - mListTimer(), // - mListActiveTimer(), // - mSetSignalhandlerKeys(MAX_POLLHANDLE), // - mSignalHandlers(), // - mRecreatePollfds(true), - mInternalCodes(internal_codes_e::NO_ERROR), - mSignalFdHandle(0) -#ifndef WITH_TIMERFD -,mStartTime() // -#endif + mDispatchDone(true), // + mSetPollKeys(MAX_POLLHANDLE), // + mListPoll(), // + mSetTimerKeys(MAX_TIMERHANDLE), + mListTimer(), // + #ifndef WITH_TIMERFD + mListActiveTimer(), // + #else + mListRemovedTimers(), + #endif + mSetSignalhandlerKeys(MAX_POLLHANDLE), // + mSignalHandlers(), // + mRecreatePollfds(true), + mInternalCodes(internal_codes_e::NO_ERROR), + mSignalFdHandle(0), + mListActivePolls(), + mThreadID(std::this_thread::get_id()) + #ifndef WITH_TIMERFD + ,mStartTime() // + #endif { if (pipe(mPipe) == -1) { @@ -69,17 +86,17 @@ CAmSocketHandler::CAmSocketHandler() : short event = 0; sh_pollHandle_t handle; event |= POLLIN; - if (addFDPoll(mPipe[0], event, NULL, - [](const pollfd, const sh_pollHandle_t, void*){}, - [](const sh_pollHandle_t, void*) { return (false); }, - NULL, NULL, handle) != E_OK) - { + if (addFDPoll(mPipe[0], event, NULL, [](const pollfd pollfd, const sh_pollHandle_t, void*) + {}, [](const sh_pollHandle_t, void*) + { return (false);}, NULL, NULL, handle) != E_OK) mInternalCodes |= internal_codes_e::FD_ERROR; - } } CAmSocketHandler::~CAmSocketHandler() { +#ifdef WITH_TIMERFD + closeRemovedTimers(); +#endif for (auto it : mListPoll) { close(it.pollfdValue.fd); @@ -97,13 +114,16 @@ void CAmSocketHandler::start_listenting() mDispatchDone = false; int16_t pollStatus; + CHECK_CALLER_THREAD_ID() + #ifndef WITH_TIMERFD clock_gettime(CLOCK_MONOTONIC, &mStartTime); #endif timespec buffertime; - VectorListPoll_t cloneListPoll; + std::list<sh_poll_s*> listPoll; VectorListPoll_t::iterator listmPollIt; + VectorListPollfd_t::iterator itMfdPollingArray; VectorListPollfd_t fdPollingArray; //!<the polling array for ppoll auto preparePollfd = [&](const sh_poll_s& row) @@ -118,17 +138,20 @@ void CAmSocketHandler::start_listenting() { if (mRecreatePollfds) { +#ifdef WITH_TIMERFD + closeRemovedTimers(); +#endif fdPollingArray.clear(); //freeze mListPoll by copying it - otherwise we get problems when we want to manipulate it during the next lines - cloneListPoll = mListPoll; + mListActivePolls = mListPoll; //there was a change in the setup, so we need to recreate the fdarray from the list - std::for_each(cloneListPoll.begin(), cloneListPoll.end(), preparePollfd); + std::for_each(mListActivePolls.begin(), mListActivePolls.end(), preparePollfd); mRecreatePollfds = false; } else { //first we go through the registered filedescriptors and check if someone needs preparation: - std::for_each(cloneListPoll.begin(), cloneListPoll.end(), CAmSocketHandler::prepare); + std::for_each(mListActivePolls.begin(), mListActivePolls.end(), CAmSocketHandler::prepare); } #ifndef WITH_TIMERFD @@ -152,25 +175,25 @@ void CAmSocketHandler::start_listenting() if (pollStatus != 0) //only check filedescriptors if there was a change { - std::list<sh_poll_s> listPoll; //todo: here could be a timer that makes sure naughty plugins return! + listPoll.clear(); //stage 0+1, call firedCB - listmPollIt = cloneListPoll.begin(); - for (auto it : fdPollingArray) + for (itMfdPollingArray = fdPollingArray.begin(); itMfdPollingArray != fdPollingArray.end(); ++itMfdPollingArray) { - if (CAmSocketHandler::eventFired(it)) + itMfdPollingArray->revents &= itMfdPollingArray->events | POLLERR | POLLHUP; + if ( itMfdPollingArray->revents!=0 ) { - listmPollIt->pollfdValue.revents = it.revents; - listPoll.push_back(*listmPollIt); - CAmSocketHandler::fire(*listmPollIt); - } - else - { - listmPollIt->pollfdValue.revents = 0; + listmPollIt = mListActivePolls.begin(); + std::advance(listmPollIt, std::distance(fdPollingArray.begin(), itMfdPollingArray)); + + sh_poll_s & pollObj = *listmPollIt; + + listPoll.push_back(&pollObj); + CAmSocketHandler::fire(&pollObj); + itMfdPollingArray->revents = 0; } - listmPollIt++; } - + //stage 2, lets ask around if some dispatching is necessary, the ones who need stay on the list listPoll.remove_if(CAmSocketHandler::noDispatching); @@ -223,7 +246,7 @@ void CAmSocketHandler::exit_mainloop() bool CAmSocketHandler::fatalErrorOccurred() { - return ((mInternalCodes&internal_codes_e::PIPE_ERROR)>0)||((mInternalCodes&internal_codes_e::FD_ERROR)>0); + return ((mInternalCodes&internal_codes_e::PIPE_ERROR)>0)||((mInternalCodes&internal_codes_e::FD_ERROR)>0); } am_Error_e CAmSocketHandler::getFDPollData(const sh_pollHandle_t handle, sh_poll_s & outPollData) @@ -246,6 +269,8 @@ am_Error_e CAmSocketHandler::getFDPollData(const sh_pollHandle_t handle, sh_poll */ am_Error_e CAmSocketHandler::listenToSignals(const std::vector<uint8_t> & listSignals) { + CHECK_CALLER_THREAD_ID() + int fdErr; uint8_t addedSignals = 0; sigset_t sigset; @@ -287,31 +312,19 @@ am_Error_e CAmSocketHandler::listenToSignals(const std::vector<uint8_t> & listSi return (E_NOT_POSSIBLE); } - int signalHandlerFd; + sh_poll_s sgPollData; if(mSignalFdHandle) { - sh_poll_s sgPollData; if(E_OK!=getFDPollData(mSignalFdHandle, sgPollData)) { - removeFDPoll(mSignalFdHandle); - mSignalFdHandle = 0; - } - else - { - int signalHandlerFd = signalfd(sgPollData.pollfdValue.fd, &sigset, 0); - if (signalHandlerFd == -1) - { - logError("Could not update signal fd!"); - return (E_NOT_POSSIBLE); - } - return E_OK; + mSignalFdHandle = 0; } } if(0==mSignalFdHandle) { /* Create the signalfd */ - signalHandlerFd = signalfd(-1, &sigset, 0); + int signalHandlerFd = signalfd(-1, &sigset, SFD_NONBLOCK); if (signalHandlerFd == -1) { logError("Could not open signal fd!"); @@ -320,21 +333,40 @@ am_Error_e CAmSocketHandler::listenToSignals(const std::vector<uint8_t> & listSi auto actionPoll = [this](const pollfd pollfd, const sh_pollHandle_t, void*) { - const VectorSignalHandlers_t & signalHandlers = mSignalHandlers; - /* We have a valid signal, read the info from the fd */ - struct signalfd_siginfo info; - ssize_t bytes = read(pollfd.fd, &info, sizeof(info)); - assert(bytes == sizeof(info)); - - /* Notify all listeners */ - for(auto it: signalHandlers) - it.callback(it.handle, info, it.userData); + const VectorSignalHandlers_t & signalHandlers = mSignalHandlers; + /* We have a valid signal, read the info from the fd */ + struct signalfd_siginfo info; + ssize_t bytes = read(pollfd.fd, &info, sizeof(info)); + if(bytes == -1) + { + if (errno == EAGAIN) //Something wrong, check for EAGAIN + bytes = read(pollfd.fd, &info, sizeof(info)); + } + if(bytes != sizeof(info)) + { + //Failed to read from fd... + logError("Failed to read from signal fd"); + throw std::runtime_error(std::string("Failed to read from signal fd.")); + } + + /* Notify all listeners */ + for(auto it: signalHandlers) + it.callback(it.handle, info, it.userData); }; /* We're going to add the signal fd through addFDPoll. At this point we don't have any signal listeners. */ - am_Error_e shFdError = addFDPoll(signalHandlerFd, POLLIN | POLLERR | POLLHUP, NULL, actionPoll, [](const sh_pollHandle_t, void*) + return addFDPoll(signalHandlerFd, POLLIN | POLLERR | POLLHUP, NULL, actionPoll, [](const sh_pollHandle_t, void*) { return (false);}, NULL, NULL, mSignalFdHandle); - return shFdError; } + else + { + int signalHandlerFd = signalfd(sgPollData.pollfdValue.fd, &sigset, 0); + if (signalHandlerFd == -1) + { + logError("Could not update signal fd!", strerror(errno)); + return (E_NOT_POSSIBLE); + } + return E_OK; + } } /** @@ -350,10 +382,17 @@ am_Error_e CAmSocketHandler::listenToSignals(const std::vector<uint8_t> & listSi * @return E_OK if the descriptor was added, E_NON_EXISTENT if the fd is not valid */ -am_Error_e CAmSocketHandler::addFDPoll(const int fd, const short event, std::function<void(const sh_pollHandle_t handle, void* userData)> prepare, - std::function<void(const pollfd pollfd, const sh_pollHandle_t handle, void* userData)> fired, std::function<bool(const sh_pollHandle_t handle, void* userData)> check, - std::function<bool(const sh_pollHandle_t handle, void* userData)> dispatch, void* userData, sh_pollHandle_t& handle) +am_Error_e CAmSocketHandler::addFDPoll(const int fd, + const short event, + std::function<void(const sh_pollHandle_t handle, void* userData)> prepare, + std::function<void(const pollfd pollfd, const sh_pollHandle_t handle, void* userData)> fired, + std::function<bool(const sh_pollHandle_t handle, void* userData)> check, + std::function<bool(const sh_pollHandle_t handle, void* userData)> dispatch, + void* userData, + sh_pollHandle_t& handle) { + CHECK_CALLER_THREAD_ID() + if (!fdIsValid(fd)) return (E_NON_EXISTENT); @@ -400,7 +439,7 @@ am::am_Error_e CAmSocketHandler::addFDPoll(const int fd, const short event, IAmS { std::function<void(const sh_pollHandle_t handle, void* userData)> prepareCB; //preperation callback - std::function<void(const pollfd poll, const sh_pollHandle_t handle, void* userData)> firedCB; //fired callback + std::function<void(const pollfd pollfd, const sh_pollHandle_t handle, void* userData)> firedCB; //fired callback std::function<bool(const sh_pollHandle_t handle, void* userData)> checkCB; //check callback std::function<bool(const sh_pollHandle_t handle, void* userData)> dispatchCB; //check callback @@ -423,19 +462,36 @@ am::am_Error_e CAmSocketHandler::addFDPoll(const int fd, const short event, IAmS */ am_Error_e CAmSocketHandler::removeFDPoll(const sh_pollHandle_t handle) { - VectorListPoll_t::iterator iterator = mListPoll.begin(); + CHECK_CALLER_THREAD_ID() - for (; iterator != mListPoll.end(); ++iterator) + bool handleRemoved = false; + + for (auto it = mListPoll.begin(); it != mListPoll.end(); ++it) { - if (iterator->handle == handle) + if (it->handle == handle) { - iterator = mListPoll.erase(iterator); + it = mListPoll.erase(it); mSetPollKeys.pollHandles.erase(handle); - mRecreatePollfds = true; - return (E_OK); + handleRemoved = true; + break; } } - return (E_UNKNOWN); + + if ( false == handleRemoved ) + return (E_UNKNOWN); + + mRecreatePollfds = true; + + for (auto it = mListActivePolls.begin(); it != mListActivePolls.end(); ++it) + { + if (it->handle == handle) + { + it->isValid = false; + break; + } + } + + return (E_OK); } /** @@ -447,6 +503,8 @@ am_Error_e CAmSocketHandler::removeFDPoll(const sh_pollHandle_t handle) */ am_Error_e CAmSocketHandler::addSignalHandler(std::function<void(const sh_pollHandle_t handle, const signalfd_siginfo & info, void* userData)> callback, sh_pollHandle_t& handle, void * userData) { + CHECK_CALLER_THREAD_ID() + if (!nextHandle(mSetSignalhandlerKeys)) { logError("Could not create new polls, too many open!"); @@ -469,6 +527,8 @@ am_Error_e CAmSocketHandler::addSignalHandler(std::function<void(const sh_pollHa */ am_Error_e CAmSocketHandler::removeSignalHandler(const sh_pollHandle_t handle) { + CHECK_CALLER_THREAD_ID() + VectorSignalHandlers_t::iterator it(mSignalHandlers.begin()); for (; it != mSignalHandlers.end(); ++it) { @@ -506,6 +566,7 @@ am_Error_e CAmSocketHandler::addTimer(const timespec & timeouts, IAmShTimerCallB am_Error_e CAmSocketHandler::addTimer(const timespec & timeouts, std::function<void(const sh_timerHandle_t handle, void* userData)> callback, sh_timerHandle_t& handle, void * userData, const bool repeats) { + CHECK_CALLER_THREAD_ID() assert(!((timeouts.tv_sec == 0) && (timeouts.tv_nsec == 0))); mListTimer.emplace_back(); @@ -533,7 +594,6 @@ am_Error_e CAmSocketHandler::addTimer(const timespec & timeouts, std::function<v clock_gettime(CLOCK_MONOTONIC, ¤tTime); if (!mDispatchDone)//the mainloop is started timerItem.countdown = timespecAdd(timeouts, timespecSub(currentTime, mStartTime)); - mListTimer.push_back(timerItem); mListActiveTimer.push_back(timerItem); mListActiveTimer.sort(compareCountdown); return (E_OK); @@ -559,22 +619,35 @@ am_Error_e CAmSocketHandler::addTimer(const timespec & timeouts, std::function<v return err; } - static auto actionPoll = [](const pollfd pollfd, const sh_pollHandle_t handle, void* userData) + auto actionPoll = [this](const pollfd pollfd, const sh_pollHandle_t handle, void* userData) { uint64_t mExpirations; - if (read(pollfd.fd, &mExpirations, sizeof(uint64_t)) == -1) + ssize_t bytes = read(pollfd.fd, &mExpirations, sizeof(mExpirations)); + if(bytes == -1) + { + if (errno == EAGAIN)//Something wrong, check for EAGAIN + bytes = read(pollfd.fd, &mExpirations, sizeof(mExpirations)); + } + + if(bytes != sizeof(mExpirations)) { - //error received...try again - read(pollfd.fd, &mExpirations, sizeof(uint64_t)); + //Failed to read from fd... + logError("Failed to read from timer fd"); + throw std::runtime_error(std::string("Failed to read from timer fd.")); } }; - err = addFDPoll(timerItem.fd, POLLIN, NULL, actionPoll, [callback](const sh_pollHandle_t handle, void* userData)->bool - { - callback(handle, userData); - return false; - }, - NULL, userData, handle); + err = addFDPoll(timerItem.fd, + POLLIN, + NULL, + actionPoll, + [callback](const sh_pollHandle_t handle, void* userData)->bool{ + callback(handle, userData); + return false; + }, + NULL, + userData, + handle); if (E_OK == err) { timerItem.handle = handle; @@ -595,6 +668,7 @@ am_Error_e CAmSocketHandler::addTimer(const timespec & timeouts, std::function<v */ am_Error_e CAmSocketHandler::removeTimer(const sh_timerHandle_t handle) { + CHECK_CALLER_THREAD_ID() assert(handle != 0); //stop the current timer @@ -608,20 +682,22 @@ am_Error_e CAmSocketHandler::removeTimer(const sh_timerHandle_t handle) if (it == mListTimer.end()) return (E_NON_EXISTENT); - close(it->fd); + mListRemovedTimers.push_back(*it); mListTimer.erase(it); return removeFDPoll(handle); #else stopTimer(handle); std::list<sh_timer_s>::iterator it(mListTimer.begin()); - for (; it != mListTimer.end(); ++it) + while (it != mListTimer.end()) { if (it->handle == handle) { - it = mListTimer.erase(it); + it = mListTimer.erase(it); mSetTimerKeys.pollHandles.erase(handle); return (E_OK); } + else + ++it; } return (E_UNKNOWN); #endif @@ -635,6 +711,8 @@ am_Error_e CAmSocketHandler::removeTimer(const sh_timerHandle_t handle) */ am_Error_e CAmSocketHandler::updateTimer(const sh_timerHandle_t handle, const timespec & timeouts) { + CHECK_CALLER_THREAD_ID() + #ifdef WITH_TIMERFD std::list<sh_timer_s>::iterator it = mListTimer.begin(); for (; it != mListTimer.end(); ++it) @@ -657,7 +735,7 @@ am_Error_e CAmSocketHandler::updateTimer(const sh_timerHandle_t handle, const ti } else { - if (timerfd_settime(it->fd, 0, &it->countdown, NULL)) + if (timerfd_settime(it->fd, 0, &it->countdown, NULL)<0) { logError("Failed to set timer duration"); return E_NOT_POSSIBLE; @@ -719,6 +797,7 @@ am_Error_e CAmSocketHandler::updateTimer(const sh_timerHandle_t handle, const ti */ am_Error_e CAmSocketHandler::restartTimer(const sh_timerHandle_t handle) { + CHECK_CALLER_THREAD_ID() #ifdef WITH_TIMERFD std::list<sh_timer_s>::iterator it = mListTimer.begin(); for (; it != mListTimer.end(); ++it) @@ -737,7 +816,7 @@ am_Error_e CAmSocketHandler::restartTimer(const sh_timerHandle_t handle) } else { - if (timerfd_settime(it->fd, 0, &it->countdown, NULL)) + if (timerfd_settime(it->fd, 0, &it->countdown, NULL)<0) { logError("Failed to set timer duration"); return E_NOT_POSSIBLE; @@ -797,6 +876,7 @@ am_Error_e CAmSocketHandler::restartTimer(const sh_timerHandle_t handle) */ am_Error_e CAmSocketHandler::stopTimer(const sh_timerHandle_t handle) { + CHECK_CALLER_THREAD_ID() #ifdef WITH_TIMERFD std::list<sh_timer_s>::iterator it = mListTimer.begin(); for (; it != mListTimer.end(); ++it) @@ -811,7 +891,7 @@ am_Error_e CAmSocketHandler::stopTimer(const sh_timerHandle_t handle) countdown.it_value.tv_nsec = 0; countdown.it_value.tv_sec = 0; - if (timerfd_settime(it->fd, 0, &countdown, NULL)) + if (timerfd_settime(it->fd, 0, &countdown, NULL)<0) { logError("Failed to set timer duration"); return E_NOT_POSSIBLE; @@ -820,13 +900,16 @@ am_Error_e CAmSocketHandler::stopTimer(const sh_timerHandle_t handle) #else //go through the list and remove the timer with the handle std::list<sh_timer_s>::iterator it(mListActiveTimer.begin()); - for (; it != mListActiveTimer.end(); ++it) + + while (it != mListActiveTimer.end()) { if (it->handle == handle) { it = mListActiveTimer.erase(it); return (E_OK); } + else + it++; } return (E_NON_EXISTENT); #endif @@ -840,6 +923,7 @@ am_Error_e CAmSocketHandler::stopTimer(const sh_timerHandle_t handle) */ am_Error_e CAmSocketHandler::updateEventFlags(const sh_pollHandle_t handle, const short events) { + CHECK_CALLER_THREAD_ID() VectorListPoll_t::iterator iterator = mListPoll.begin(); for (; iterator != mListPoll.end(); ++iterator) @@ -966,11 +1050,11 @@ void CAmSocketHandler::prepare(am::CAmSocketHandler::sh_poll_s& row) /** * fire callback */ -void CAmSocketHandler::fire(sh_poll_s& a) +void CAmSocketHandler::fire(const sh_poll_s* a) { try { - a.firedCB(a.pollfdValue, a.handle, a.userData); + a->firedCB(a->pollfdValue, a->handle, a->userData); } catch (std::exception& e) { logError("Sockethandler: Exception in Preparecallback,caught", e.what()); @@ -980,31 +1064,23 @@ void CAmSocketHandler::fire(sh_poll_s& a) /** * should disptach */ -bool CAmSocketHandler::noDispatching(const sh_poll_s& a) +bool CAmSocketHandler::noDispatching(const sh_poll_s* a) { //remove from list of there is no checkCB - if (nullptr == a.checkCB) + if (nullptr == a->checkCB || false == a->isValid) return (true); - return (!a.checkCB(a.handle, a.userData)); + return (!a->checkCB(a->handle, a->userData)); } /** * disptach */ -bool CAmSocketHandler::dispatchingFinished(const sh_poll_s& a) +bool CAmSocketHandler::dispatchingFinished(const sh_poll_s* a) { //remove from list of there is no dispatchCB - if (nullptr == a.dispatchCB) + if (nullptr == a->dispatchCB || false == a->isValid) return (true); - return (!a.dispatchCB(a.handle, a.userData)); -} - -/** - * event triggered - */ -bool CAmSocketHandler::eventFired(const pollfd& a) -{ - return (a.revents == 0 ? false : true); + return (!a->dispatchCB(a->handle, a->userData)); } /** @@ -1021,30 +1097,41 @@ inline timespec* CAmSocketHandler::insertTime(timespec& buffertime) return (&buffertime); } else -#endif +#endif { return (NULL); } } -#ifdef WITH_TIMERFD +#ifdef WITH_TIMERFD am_Error_e CAmSocketHandler::createTimeFD(const itimerspec & timeouts, int & fd) { fd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC); - if (fd <= 0) + if (fd < 0) { logError("Failed to create timer"); return E_NOT_POSSIBLE; } - if (timerfd_settime(fd, 0, &timeouts, NULL)) + if (timerfd_settime(fd, 0, &timeouts, NULL) < 0) { logError("Failed to set timer duration"); return E_NOT_POSSIBLE; } return E_OK; } -#endif + +void CAmSocketHandler::closeRemovedTimers() +{ + for (auto it : mListRemovedTimers) + { + if( it.fd > -1 ) + close( it.fd ); + } + mListRemovedTimers.clear(); +} + +#endif void CAmSocketHandler::callTimer(sh_timer_s& a) { diff --git a/AudioManagerUtilities/test/AmSerializerTest/CAmSerializerTest.cpp b/AudioManagerUtilities/test/AmSerializerTest/CAmSerializerTest.cpp index 49c6738..b18b284 100644 --- a/AudioManagerUtilities/test/AmSerializerTest/CAmSerializerTest.cpp +++ b/AudioManagerUtilities/test/AmSerializerTest/CAmSerializerTest.cpp @@ -79,6 +79,8 @@ struct SerializerData V2::CAmSerializer *pSerializer; }; +#define ASYNCLOOP 100 + void* ptSerializerSync(void* data) { SerializerData *pData = (SerializerData*) data; @@ -96,6 +98,7 @@ void* ptSerializerSync(void* data) return (NULL); } + void* ptSerializerASync(void* data) { SerializerData *pData = (SerializerData*) data; @@ -106,7 +109,7 @@ void* ptSerializerASync(void* data) #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wdeprecated-declarations" - for (uint32_t i = 0; i < 5; i++) + for (uint32_t i = 0; i < ASYNCLOOP; i++) { testStr = pData->testStr; pData->pSerializer->asyncCall(pData->pSerCb, &MockIAmSerializerCb::dispatchData, i, testStr); @@ -191,7 +194,7 @@ TEST(CAmSerializerTest, asyncTest) EXPECT_CALL(serCb,check()).Times(2); EXPECT_CALL(serCb,checkInt()).Times(1).WillRepeatedly(Return(100)); - for (int i = 0; i < 5; i++) + for (int i = 0; i < ASYNCLOOP; i++) EXPECT_CALL(serCb,dispatchData(i,testStr)).WillOnce(DoAll(ActionDispatchData(), Return(true))); myHandler.start_listenting(); diff --git a/AudioManagerUtilities/test/AmSocketHandlerTest/CAmSocketHandlerTest.cpp b/AudioManagerUtilities/test/AmSocketHandlerTest/CAmSocketHandlerTest.cpp index ecd38fe..3908c2e 100644 --- a/AudioManagerUtilities/test/AmSocketHandlerTest/CAmSocketHandlerTest.cpp +++ b/AudioManagerUtilities/test/AmSocketHandlerTest/CAmSocketHandlerTest.cpp @@ -30,15 +30,17 @@ #include <fcntl.h> #include <sys/un.h> #include <sys/poll.h> - +#include "CAmDltWrapper.h" #include "CAmSocketHandler.h" -//todo: expand test, implement more usecases -//todo: test removeFD + +#undef ENABLED_SOCKETHANDLER_TEST_OUTPUT +#undef ENABLED_TIMERS_TEST_OUTPUT #define SOCK_PATH "/tmp/mysock" -#define SOCKET_TEST_LOOPS_COUNT 1000 +#define SOCKET_TEST_LOOPS_COUNT 50 +#define TIMERS_TO_TEST 100 using namespace testing; using namespace am; @@ -48,6 +50,11 @@ static const char * TEST_SOCKET_DATA_FINAL = "finish!"; static const std::chrono::time_point<std::chrono::high_resolution_clock> TP_ZERO; +struct TestUserData +{ + int i; + float f; +}; MockIAmSignalHandler *pMockSignalHandler = NULL; static void signalHandler(int sig, siginfo_t *siginfo, void *context) @@ -149,11 +156,52 @@ void am::CAmTimer::timerCallback(sh_timerHandle_t handle, void* userData) } } +CAmTimerStressTest::CAmTimerStressTest(CAmSocketHandler *myHandler, const timespec &timeout, const int32_t repeats) : + MockIAmTimerCb(), mpSocketHandler(myHandler), mUpdateTimeout(timeout), pTimerCallback(this, &CAmTimerStressTest::timerCallback), mRepeats(repeats), mId(0), mHandle(0) +{ +} + +am::CAmTimerStressTest::~CAmTimerStressTest() +{ +} + +void am::CAmTimerStressTest::timerCallback(sh_timerHandle_t handle, void* pUserData) +{ + mpSocketHandler->removeTimer(handle); + MockIAmTimerCb::timerCallback(handle, pUserData); + sh_timerHandle_t handle1; + mpSocketHandler->addTimer(mUpdateTimeout, &pTimerCallback, handle1, &(*((TestUserData*)pUserData)), true); +} + +CAmTimerStressTest2::CAmTimerStressTest2(CAmSocketHandler *myHandler, const timespec &timeout, const int32_t repeats) : + MockIAmTimerCb(), mpSocketHandler(myHandler), mUpdateTimeout(timeout), pTimerCallback(this, &CAmTimerStressTest2::timerCallback), mRepeats(repeats), mId(0) +{ +} + +am::CAmTimerStressTest2::~CAmTimerStressTest2() +{ +} + +void am::CAmTimerStressTest2::timerCallback(sh_timerHandle_t handle, void* pUserData) +{ + #ifdef ENABLED_SOCKETHANDLER_TEST_OUTPUT + std::cout<<"timerCallback handle=" << handle <<std::endl; + #endif + MockIAmTimerCb::timerCallback(handle, pUserData); +} + + CAmTimerMeasurment::CAmTimerMeasurment(CAmSocketHandler *myHandler, const timespec &timeout, const std::string & label, const int32_t repeats, void * userData) : - MockIAmTimerCb(), pTimerCallback(this, &CAmTimerMeasurment::timerCallback), // - mSocketHandler(myHandler), mUpdateTimeout(timeout), mUpdateTimePoint(std::chrono::seconds - { mUpdateTimeout.tv_sec } + std::chrono::nanoseconds - { mUpdateTimeout.tv_nsec }), mLastInvocationTime(), mExpected(mUpdateTimePoint - TP_ZERO), mRepeats(repeats), mpUserData(userData), mDebugText(label) + MockIAmTimerCb() + , pTimerCallback(this, &CAmTimerMeasurment::timerCallback) + , mSocketHandler(myHandler) + , mUpdateTimeout(timeout) + , mUpdateTimePoint(std::chrono::seconds{ mUpdateTimeout.tv_sec } + std::chrono::nanoseconds{ mUpdateTimeout.tv_nsec }) + , mLastInvocationTime() + , mExpected(mUpdateTimePoint - TP_ZERO) + , mRepeats(repeats) + , mpUserData(userData) + , mDebugText(label) { } @@ -201,25 +249,137 @@ void am::CAmTimerMeasurment::timerCallback(sh_timerHandle_t handle, void* userDa std::cout << mDebugText << " Init measurment " << std::endl; #endif mLastInvocationTime = t_end; - mSocketHandler->updateTimer(handle, mUpdateTimeout); + mSocketHandler->updateTimer( handle, mUpdateTimeout); } } void* playWithSocketServer(void* data) { - CAmSocketHandler *pSockethandler = (CAmSocketHandler*) data; - pSockethandler->start_listenting(); + int socket_ = *((int*)data); + struct sockaddr_in servAddr; + unsigned short servPort = 6060; + struct hostent *host; + + if ((host = (struct hostent*) gethostbyname("localhost")) == 0) + { + std::cout << "ERROR: gethostbyname() failed\n" << std::endl; + exit(1); + } + + memset(&servAddr, 0, sizeof(servAddr)); + servAddr.sin_family = AF_INET; + servAddr.sin_addr.s_addr = inet_addr(inet_ntoa(*(struct in_addr*) (host->h_addr_list[0]))); + servAddr.sin_port = htons(servPort); + sleep(1); + int ret = connect(socket_, (struct sockaddr *) &servAddr, sizeof(servAddr)); + if (ret < 0) + { + std::cerr << "ERROR: connect() failed\n" << std::endl; + return (NULL); + } + + for (int i = 1; i <= SOCKET_TEST_LOOPS_COUNT; i++) + { + std::string string(TEST_SOCKET_DATA); + send(socket_, string.c_str(), string.size(), 0); + } + std::string string(TEST_SOCKET_DATA_FINAL); + send(socket_, string.c_str(), string.size(), 0); + return (NULL); } void* playWithUnixSocketServer(void* data) { - CAmSocketHandler *pSockethandler = (CAmSocketHandler*) data; - pSockethandler->start_listenting(); + int socket_ = *((int*)data); + struct sockaddr_un servAddr; + memset(&servAddr, 0, sizeof(servAddr)); + strcpy(servAddr.sun_path, SOCK_PATH); + servAddr.sun_family = AF_UNIX; + sleep(1); + int ret = connect(socket_, (struct sockaddr *) &servAddr, sizeof(servAddr)); + if ( ret < 0) + { + std::cerr << "ERROR: connect() failed\n" << std::endl; + return (NULL); + } + + for (int i = 1; i <= SOCKET_TEST_LOOPS_COUNT; i++) + { + std::string stringToSend(TEST_SOCKET_DATA); + send(socket_, stringToSend.c_str(), stringToSend.size(), 0); + } + std::string stringToSend(TEST_SOCKET_DATA_FINAL); + send(socket_, stringToSend.c_str(), stringToSend.size(), 0); + + return (NULL); +} + +void* threadCallbackUnixSocketAndTimers(void* data) +{ + int socket_ = *((int*)data); + struct sockaddr_un servAddr; + memset(&servAddr, 0, sizeof(servAddr)); + strcpy(servAddr.sun_path, SOCK_PATH); + servAddr.sun_family = AF_UNIX; + sleep(1); + int ret = connect(socket_, (struct sockaddr *) &servAddr, sizeof(servAddr)); + if ( ret < 0) + { + std::cerr << "ERROR: connect() failed\n" << std::endl; + return (NULL); + } + + for (int i = 1; i <= SOCKET_TEST_LOOPS_COUNT; i++) + { + std::string stringToSend(TEST_SOCKET_DATA); + usleep(500000); + send(socket_, stringToSend.c_str(), stringToSend.size(), 0); + } + std::string stringToSend(TEST_SOCKET_DATA_FINAL); + send(socket_, stringToSend.c_str(), stringToSend.size(), 0); + return (NULL); } +TEST(CAmSocketHandlerTest, stressTestUnixSocketAndTimers) +{ + + pthread_t serverThread; + + int socket_; + + CAmSocketHandler myHandler; + ASSERT_FALSE(myHandler.fatalErrorOccurred()); + CAmSamplePluginStressTest::sockType_e type = CAmSamplePlugin::UNIX; + CAmSamplePluginStressTest myplugin(&myHandler, type); + + EXPECT_CALL(myplugin,receiveData(_,_,_)).Times(SOCKET_TEST_LOOPS_COUNT + 1); + EXPECT_CALL(myplugin,dispatchData(_,_)).Times(SOCKET_TEST_LOOPS_COUNT + 1); + EXPECT_CALL(myplugin,check(_,_)).Times(SOCKET_TEST_LOOPS_COUNT + 1); + + for(int i=0;i<myplugin.getTimers().size();i++) + { + EXPECT_CALL(*myplugin.getTimers()[i],timerCallback(_,_)).Times(AnyNumber()); + } + + + if ((socket_ = socket(AF_UNIX, SOCK_STREAM, 0)) < 0) + { + std::cout << "socket problem" << std::endl; + } + ASSERT_GT(socket_, -1); + + //creates a thread that handles the serverpart + pthread_create(&serverThread, NULL, threadCallbackUnixSocketAndTimers, &socket_); + + myHandler.start_listenting(); + + pthread_join(serverThread, NULL); + shutdown(socket_, SHUT_RDWR); +} + TEST(CAmSocketHandlerTest, timersOneshot) { CAmSocketHandler myHandler; @@ -347,6 +507,73 @@ TEST(CAmSocketHandlerTest, timersGeneral) myHandler.start_listenting(); } +TEST(CAmSocketHandlerTest, timersStressTest) +{ + CAmSocketHandler myHandler; + ASSERT_FALSE(myHandler.fatalErrorOccurred()); + + sh_timerHandle_t handle; + TestUserData userData; + userData.i = 1; + userData.f = 1.f; + + timespec timeout4; + timeout4.tv_nsec = 0; + timeout4.tv_sec = 60; + + timespec timeoutTime; + timeoutTime.tv_sec = 0; + timeoutTime.tv_nsec = 10000000;// 0,01 + + std::vector<CAmTimerStressTest*> timers; + + for(int i=0;i<TIMERS_TO_TEST;i++) + { + CAmTimerStressTest *ptestCallback1 = new CAmTimerStressTest(&myHandler, timeoutTime, 0); + ptestCallback1->setId(i); + timers.push_back( ptestCallback1 ); + myHandler.addTimer(timeoutTime, &(ptestCallback1->pTimerCallback), handle, &userData, true); + EXPECT_CALL(*ptestCallback1,timerCallback(_,&userData)).Times(AnyNumber()); + } + + timespec timeoutTime11, timeout12, timeout13; + timeoutTime11.tv_sec = 1; + timeoutTime11.tv_nsec = 34000000; + CAmTimerMeasurment testCallback11(&myHandler, timeoutTime11, "repeated 1", std::numeric_limits<int32_t>::max()); + + timeout12.tv_nsec = 100000000; + timeout12.tv_sec = 0; + CAmTimerMeasurment testCallback12(&myHandler, timeout12, "repeated 2", std::numeric_limits<int32_t>::max()); + + timeout13.tv_nsec = 333000000; + timeout13.tv_sec = 3; + CAmTimerMeasurment testCallback13(&myHandler, timeout13, "oneshot 3"); + + myHandler.addTimer(timeoutTime11, &testCallback11.pTimerCallback, handle, NULL, true); + EXPECT_CALL(testCallback11,timerCallback(_,NULL)).Times(AnyNumber()); + + myHandler.addTimer(timeout12, &testCallback12.pTimerCallback, handle, NULL, true); + EXPECT_CALL(testCallback12,timerCallback(_,NULL)).Times(AnyNumber()); + + myHandler.addTimer(timeout13, &testCallback13.pTimerCallback, handle, NULL); + EXPECT_CALL(testCallback13,timerCallback(_,NULL)).Times(AnyNumber()); + + + CAmTimerSockethandlerController testCallback4(&myHandler, timeout4); + + myHandler.addTimer(timeout4, &testCallback4.pTimerCallback, handle, NULL); + + EXPECT_CALL(testCallback4,timerCallback(_,NULL)).Times(1); + myHandler.start_listenting(); + + for(int i=0;i<timers.size();i++) + { + if(timers[i]) + delete timers[i], timers[i]=NULL; + } +} + + TEST(CAmSocketHandlerTest,playWithTimers) { CAmSocketHandler myHandler; @@ -354,15 +581,15 @@ TEST(CAmSocketHandlerTest,playWithTimers) timespec timeoutTime, timeout2, timeout3, timeout4; timeoutTime.tv_sec = 1; timeoutTime.tv_nsec = 34000000; - CAmTimerMeasurment testCallback1(&myHandler, timeoutTime, "repeatedCallback 1", std::numeric_limits<int32_t>::max()); + CAmTimerMeasurment testCallback1(&myHandler, timeoutTime, "repeated 1", std::numeric_limits<int32_t>::max()); timeout2.tv_nsec = 2000000; timeout2.tv_sec = 0; - CAmTimerMeasurment testCallback2(&myHandler, timeout2, "repeatedCallback 2", std::numeric_limits<int32_t>::max()); + CAmTimerMeasurment testCallback2(&myHandler, timeout2, "repeated 2", std::numeric_limits<int32_t>::max()); timeout3.tv_nsec = 333000000; timeout3.tv_sec = 3; - CAmTimerMeasurment testCallback3(&myHandler, timeout3, "oneshotCallback 3"); + CAmTimerMeasurment testCallback3(&myHandler, timeout3, "oneshot 3"); timeout4.tv_nsec = 0; timeout4.tv_sec = 8; CAmTimerSockethandlerController testCallback4(&myHandler, timeout4); @@ -390,7 +617,7 @@ TEST(CAmSocketHandlerTest,playWithTimers) #else ASSERT_EQ(handle, 4); #endif - EXPECT_CALL(testCallback3,timerCallback(handle,NULL)).Times(2); //+1 because of measurment + EXPECT_CALL(testCallback3,timerCallback(handle,NULL)).Times(2); myHandler.addTimer(timeout4, &testCallback4.pTimerCallback, handle, NULL); #ifndef WITH_TIMERFD @@ -484,94 +711,52 @@ TEST(CAmSocketHandlerTest,playWithUNIXSockets) ASSERT_FALSE(myHandler.fatalErrorOccurred()); CAmSamplePlugin::sockType_e type = CAmSamplePlugin::UNIX; CAmSamplePlugin myplugin(&myHandler, type); + ASSERT_TRUE(myplugin.isSocketOpened()); EXPECT_CALL(myplugin,receiveData(_,_,_)).Times(SOCKET_TEST_LOOPS_COUNT + 1); EXPECT_CALL(myplugin,dispatchData(_,_)).Times(SOCKET_TEST_LOOPS_COUNT + 1); EXPECT_CALL(myplugin,check(_,_)).Times(SOCKET_TEST_LOOPS_COUNT + 1); - //creates a thread that handles the serverpart - pthread_create(&serverThread, NULL, playWithUnixSocketServer, &myHandler); - - sleep(1); //we need that here because the port needs to be opened if ((socket_ = socket(AF_UNIX, SOCK_STREAM, 0)) < 0) { std::cout << "socket problem" << std::endl; - - } - - memset(&servAddr, 0, sizeof(servAddr)); - strcpy(servAddr.sun_path, SOCK_PATH); - servAddr.sun_family = AF_UNIX; - if (connect(socket_, (struct sockaddr *) &servAddr, sizeof(servAddr)) < 0) - { - std::cout << "ERROR: connect() failed\n" << std::endl; - } - - for (int i = 1; i <= SOCKET_TEST_LOOPS_COUNT; i++) - { - std::string stringToSend(TEST_SOCKET_DATA); - send(socket_, stringToSend.c_str(), stringToSend.size(), 0); } - std::string stringToSend(TEST_SOCKET_DATA_FINAL); - send(socket_, stringToSend.c_str(), stringToSend.size(), 0); + ASSERT_GT(socket_, -1); + //creates a thread that handles the serverpart + pthread_create(&serverThread, NULL, playWithUnixSocketServer, &socket_); + myHandler.start_listenting(); + pthread_join(serverThread, NULL); - + shutdown(socket_, SHUT_RDWR); } TEST(CAmSocketHandlerTest,playWithSockets) { pthread_t serverThread; - struct sockaddr_in servAddr; - unsigned short servPort = 6060; - struct hostent *host; int socket_; CAmSocketHandler myHandler; ASSERT_FALSE(myHandler.fatalErrorOccurred()); CAmSamplePlugin::sockType_e type = CAmSamplePlugin::INET; CAmSamplePlugin myplugin(&myHandler, type); - + ASSERT_TRUE(myplugin.isSocketOpened()); EXPECT_CALL(myplugin,receiveData(_,_,_)).Times(SOCKET_TEST_LOOPS_COUNT + 1); EXPECT_CALL(myplugin,dispatchData(_,_)).Times(SOCKET_TEST_LOOPS_COUNT + 1); EXPECT_CALL(myplugin,check(_,_)).Times(SOCKET_TEST_LOOPS_COUNT + 1); - //creates a thread that handles the serverpart - pthread_create(&serverThread, NULL, playWithSocketServer, &myHandler); - - sleep(1); //we need that here because the port needs to be opened if ((socket_ = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) { std::cout << "socket problem" << std::endl; - - } - - if ((host = (struct hostent*) gethostbyname("localhost")) == 0) - { - std::cout << "ERROR: gethostbyname() failed\n" << std::endl; - exit(1); - } - - memset(&servAddr, 0, sizeof(servAddr)); - servAddr.sin_family = AF_INET; - servAddr.sin_addr.s_addr = inet_addr(inet_ntoa(*(struct in_addr*) (host->h_addr_list[0]))); - servAddr.sin_port = htons(servPort); - - if (connect(socket_, (struct sockaddr *) &servAddr, sizeof(servAddr)) < 0) - { - std::cout << "ERROR: connect() failed\n" << std::endl; - } - - for (int i = 1; i <= SOCKET_TEST_LOOPS_COUNT; i++) - { - std::string string(TEST_SOCKET_DATA); - send(socket_, string.c_str(), string.size(), 0); } - std::string string(TEST_SOCKET_DATA_FINAL); - send(socket_, string.c_str(), string.size(), 0); + ASSERT_GT(socket_, -1); + //creates a thread that handles the serverpart + pthread_create(&serverThread, NULL, playWithSocketServer, &socket_); + + myHandler.start_listenting(); pthread_join(serverThread, NULL); - + shutdown(socket_, SHUT_RDWR); } int main(int argc, char **argv) @@ -588,11 +773,11 @@ am::CAmSamplePlugin::CAmSamplePlugin(CAmSocketHandler *mySocketHandler, sockType mSocketHandler(mySocketHandler), // mConnecthandle(), // mReceiveHandle(), // - msgList() + msgList(), + mSocket(-1) { int yes = 1; - int socketHandle; struct sockaddr_in servAddr; struct sockaddr_un unixAddr; unsigned int servPort = 6060; @@ -600,26 +785,30 @@ am::CAmSamplePlugin::CAmSamplePlugin(CAmSocketHandler *mySocketHandler, sockType switch (socketType) { case UNIX: - socketHandle = socket(AF_UNIX, SOCK_STREAM, 0); + mSocket = socket(AF_UNIX, SOCK_STREAM, 0); + if(mSocket==-1) + return; unixAddr.sun_family = AF_UNIX; strcpy(unixAddr.sun_path, SOCK_PATH); unlink(unixAddr.sun_path); - bind(socketHandle, (struct sockaddr *) &unixAddr, strlen(unixAddr.sun_path) + sizeof(unixAddr.sun_family)); + bind(mSocket, (struct sockaddr *) &unixAddr, strlen(unixAddr.sun_path) + sizeof(unixAddr.sun_family)); break; case INET: - socketHandle = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP); - setsockopt(socketHandle, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)); + mSocket = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP); + if(mSocket==-1) + return; + setsockopt(mSocket, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)); memset(&servAddr, 0, sizeof(servAddr)); servAddr.sin_family = AF_INET; servAddr.sin_addr.s_addr = INADDR_ANY; servAddr.sin_port = htons(servPort); - bind(socketHandle, (struct sockaddr *) &servAddr, sizeof(servAddr)); + bind(mSocket, (struct sockaddr *) &servAddr, sizeof(servAddr)); break; default: break; } - if (listen(socketHandle, 3) < 0) + if (listen(mSocket, 3) < 0) { #ifdef ENABLED_SOCKETHANDLER_TEST_OUTPUT std::cout << "listen ok" << std::endl; @@ -627,12 +816,12 @@ am::CAmSamplePlugin::CAmSamplePlugin(CAmSocketHandler *mySocketHandler, sockType } /* if */ int a = 1; - ioctl(socketHandle, FIONBIO, (char *) &a); - setsockopt(socketHandle, SOL_SOCKET, SO_KEEPALIVE, (char *) &a, sizeof(a)); + ioctl(mSocket, FIONBIO, (char *) &a); + setsockopt(mSocket, SOL_SOCKET, SO_KEEPALIVE, (char *) &a, sizeof(a)); short events = 0; events |= POLLIN; - mySocketHandler->addFDPoll(socketHandle, events, NULL, &connectFiredCB, NULL, NULL, NULL, mConnecthandle); + mySocketHandler->addFDPoll(mSocket, events, NULL, &connectFiredCB, NULL, NULL, NULL, mConnecthandle); #ifdef ENABLED_SOCKETHANDLER_TEST_OUTPUT std::cout << "setup server - listening" << std::endl; #endif @@ -719,3 +908,62 @@ bool am::CAmSamplePlugin::check(const sh_pollHandle_t handle, void *userData) return false; } +CAmSamplePluginStressTest::CAmSamplePluginStressTest(CAmSocketHandler *mySocketHandler, sockType_e socketType):CAmSamplePlugin(mySocketHandler,socketType) +, mTimers() +{ + sh_timerHandle_t handle; + TestUserData userData; + userData.i = 1; + userData.f = 1.f; + timespec timeoutTime; + timeoutTime.tv_sec = 0; + timeoutTime.tv_nsec = 10000000;// 0,01 + for(int i=0;i<TIMERS_TO_TEST;i++) + { + CAmTimerStressTest2 *ptestCallback1 = new CAmTimerStressTest2(mySocketHandler, timeoutTime, 0); + ptestCallback1->setId(i); + if(E_OK==mySocketHandler->addTimer(timeoutTime, &(ptestCallback1->pTimerCallback), handle, &userData, true)) + { + mTimers.push_back( ptestCallback1 ); + ptestCallback1->setHandle(handle); + } + + EXPECT_CALL(*ptestCallback1,timerCallback(_,&userData)).Times(AnyNumber()); + } +} + +CAmSamplePluginStressTest::~CAmSamplePluginStressTest() +{ + for(int i=0;i<mTimers.size();i++) + { + if(mTimers[i]) + delete mTimers[i], mTimers[i]=NULL; + } +} + +void CAmSamplePluginStressTest::receiveData(const pollfd pollfd, const sh_pollHandle_t handle, void* userData) +{ + CAmSamplePlugin::receiveData(pollfd, handle, userData); + + sh_timerHandle_t handle1; + for(int i=0;i<mTimers.size();i++) + { + am_Error_e resultRemove = mSocketHandler->removeTimer(mTimers[i]->getHandle()); + am_Error_e resultAdd = mSocketHandler->addTimer(mTimers[i]->getUpdateTimeout(), &(mTimers[i]->pTimerCallback), handle1, NULL, true); + #ifdef ENABLED_SOCKETHANDLER_TEST_OUTPUT + std::cout << "receiveData return removeTimer=" << resultRemove << " return addTimer=" << resultAdd <<std::endl; + #endif + mTimers[i]->setHandle(handle1); + } +} + +bool CAmSamplePluginStressTest::dispatchData(const sh_pollHandle_t handle, void* userData) +{ + return CAmSamplePlugin::dispatchData( handle, userData); +} + +bool CAmSamplePluginStressTest::check(const sh_pollHandle_t handle, void* userData) +{ + return CAmSamplePlugin::check( handle, userData); +} + diff --git a/AudioManagerUtilities/test/AmSocketHandlerTest/CAmSocketHandlerTest.h b/AudioManagerUtilities/test/AmSocketHandlerTest/CAmSocketHandlerTest.h index ba2bf51..6cda2b3 100644 --- a/AudioManagerUtilities/test/AmSocketHandlerTest/CAmSocketHandlerTest.h +++ b/AudioManagerUtilities/test/AmSocketHandlerTest/CAmSocketHandlerTest.h @@ -101,23 +101,21 @@ namespace am UNIX, INET }; CAmSamplePlugin(CAmSocketHandler *mySocketHandler, sockType_e socketType); - ~CAmSamplePlugin() - { - } - ; + virtual ~CAmSamplePlugin(){ shutdown(mSocket, SHUT_RDWR); } void connectSocket(const pollfd pollfd, const sh_pollHandle_t handle, void* userData); - void receiveData(const pollfd pollfd, const sh_pollHandle_t handle, void* userData); - bool dispatchData(const sh_pollHandle_t handle, void* userData); - bool check(const sh_pollHandle_t handle, void* userData); + virtual void receiveData(const pollfd pollfd, const sh_pollHandle_t handle, void* userData); + virtual bool dispatchData(const sh_pollHandle_t handle, void* userData); + virtual bool check(const sh_pollHandle_t handle, void* userData); TAmShPollFired<CAmSamplePlugin> connectFiredCB; TAmShPollFired<CAmSamplePlugin> receiveFiredCB; TAmShPollDispatch<CAmSamplePlugin> sampleDispatchCB; TAmShPollCheck<CAmSamplePlugin> sampleCheckCB; - - private: + bool isSocketOpened() { return mSocket>-1; } + protected: CAmSocketHandler *mSocketHandler; sh_pollHandle_t mConnecthandle, mReceiveHandle; std::queue<std::string> msgList; + int mSocket; }; class CAmTimerSockethandlerController: public MockIAmTimerCb @@ -162,6 +160,54 @@ namespace am TAmShTimerCallBack<CAmTimer> pTimerCallback; }; + class CAmTimerStressTest: public MockIAmTimerCb + { + CAmSocketHandler *mpSocketHandler; + timespec mUpdateTimeout; + int32_t mRepeats; + int32_t mId; + int32_t mHandle; + public: + explicit CAmTimerStressTest(CAmSocketHandler *SocketHandler, const timespec &timeout, const int32_t repeats = 0u); + virtual ~CAmTimerStressTest(); + + int32_t getId() { return mId; } + void setId(const int32_t id) { mId=id; } + + int32_t getHandle() { return mHandle; } + void setHandle(const int32_t id) { mHandle=id; } + + timespec getUpdateTimeout( ) { return mUpdateTimeout; } + + void timerCallback(sh_timerHandle_t handle, void * userData); + + TAmShTimerCallBack<CAmTimerStressTest> pTimerCallback; + }; + + class CAmTimerStressTest2: public MockIAmTimerCb + { + CAmSocketHandler *mpSocketHandler; + timespec mUpdateTimeout; + int32_t mRepeats; + int32_t mId; + int32_t mHandle; + public: + explicit CAmTimerStressTest2(CAmSocketHandler *SocketHandler, const timespec &timeout, const int32_t repeats = 0u); + virtual ~CAmTimerStressTest2(); + + int32_t getId() { return mId; } + void setId(const int32_t id) { mId=id; } + + int32_t getHandle() { return mHandle; } + void setHandle(const int32_t id) { mHandle=id; } + + timespec getUpdateTimeout( ) { return mUpdateTimeout; } + + void timerCallback(sh_timerHandle_t handle, void * userData); + + TAmShTimerCallBack<CAmTimerStressTest2> pTimerCallback; + }; + class CAmTimerMeasurment: public MockIAmTimerCb { CAmSocketHandler *mSocketHandler; @@ -188,6 +234,20 @@ namespace am void SetUp(); void TearDown(); }; + + class CAmSamplePluginStressTest: public CAmSamplePlugin + { + std::vector<CAmTimerStressTest2*> mTimers; + public: + CAmSamplePluginStressTest(CAmSocketHandler *mySocketHandler, sockType_e socketType); + virtual ~CAmSamplePluginStressTest(); + + void receiveData(const pollfd pollfd, const sh_pollHandle_t handle, void* userData) final; + bool dispatchData(const sh_pollHandle_t handle, void* userData) final; + bool check(const sh_pollHandle_t handle, void* userData) final; + + std::vector<CAmTimerStressTest2*> & getTimers() { return mTimers; } + }; } /* namespace am */ #endif /* SOCKETHANDLERTEST_H_ */ diff --git a/CMakeLists.txt b/CMakeLists.txt index 7e3bddb..e61a0fa 100755 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -65,8 +65,10 @@ option ( WITH_SHARED_CORE "Build audio manager core as dynamic library" OFF) option ( WITH_TIMERFD - "Build with the linux specific TIMERFD feature to support timing without using signals" ON) + "Build with timer fd support" ON ) +option( WITH_TESTS + "Build together with all available unitTest" ON ) set(DBUS_SERVICE_PREFIX "org.genivi.audiomanager" CACHE PROPERTY "The dbus service prefix for the AM - only changable for legacy dbus") @@ -213,7 +215,6 @@ if(WITH_DOCUMENTATION) PATTERN "def" EXCLUDE) endif(WITH_DOCUMENTATION) - message(STATUS) message(STATUS "${PROJECT_NAME} Configuration:") message(STATUS "CMAKE_BUILD_TYPE = ${CMAKE_BUILD_TYPE}") |