summaryrefslogtreecommitdiff
path: root/AudioManagerUtilities/src
diff options
context:
space:
mode:
Diffstat (limited to 'AudioManagerUtilities/src')
-rw-r--r--AudioManagerUtilities/src/CAmCommonAPIWrapper.cpp257
-rw-r--r--AudioManagerUtilities/src/CAmDltWrapper.cpp5
-rw-r--r--AudioManagerUtilities/src/CAmSocketHandler.cpp315
3 files changed, 378 insertions, 199 deletions
diff --git a/AudioManagerUtilities/src/CAmCommonAPIWrapper.cpp b/AudioManagerUtilities/src/CAmCommonAPIWrapper.cpp
index 2aa8370..4a6accf 100644
--- a/AudioManagerUtilities/src/CAmCommonAPIWrapper.cpp
+++ b/AudioManagerUtilities/src/CAmCommonAPIWrapper.cpp
@@ -36,21 +36,47 @@ namespace am
static CAmCommonAPIWrapper* pSingleCommonAPIInstance = NULL;
+bool timeoutToTimespec(const int64_t & localTimeout, timespec & pollTimeout)
+{
+ if(CommonAPI::TIMEOUT_INFINITE == localTimeout)//dispatch never
+ {
+ return false;
+ }
+ else
+ {
+ if(CommonAPI::TIMEOUT_NONE==localTimeout)//dispatch immediately
+ {
+ pollTimeout.tv_sec = 0;
+ pollTimeout.tv_nsec = 5000000;//5 ms
+ }
+ else
+ {
+ pollTimeout.tv_sec = localTimeout / 1000;
+ pollTimeout.tv_nsec = (localTimeout % 1000) * 1000000;
+ }
+ return true;
+ }
+}
+
+
CAmCommonAPIWrapper::CAmCommonAPIWrapper(CAmSocketHandler* socketHandler, const std::string & applicationName):
pCommonPrepareCallback(this,&CAmCommonAPIWrapper::commonPrepareCallback), //
- pCommonDispatchCallback(this, &CAmCommonAPIWrapper::commonDispatchCallback), //
pCommonFireCallback(this, &CAmCommonAPIWrapper::commonFireCallback), //
pCommonCheckCallback(this, &CAmCommonAPIWrapper::commonCheckCallback), //
+ pCommonDispatchCallback(this, &CAmCommonAPIWrapper::commonDispatchCallback), //
pCommonTimerCallback(this, &CAmCommonAPIWrapper::commonTimerCallback), //
- mpSocketHandler(socketHandler), //
- mWatchToCheck(NULL)
+ mpSocketHandler(socketHandler),
+ mRegisteredDispatchSources(),
+ mMapWatches(),
+ mSourcesToDispatch(),
+ mListTimerhandles()
{
assert(NULL!=socketHandler);
//Get the runtime
mRuntime = CommonAPI::Runtime::get();
assert(NULL!=mRuntime);
- //Create the context
+//Create the context
if(applicationName.size())
mContext = std::make_shared<CommonAPI::MainLoopContext>(applicationName);
else
@@ -75,9 +101,11 @@ CAmCommonAPIWrapper::~CAmCommonAPIWrapper()
mContext->unsubscribeForDispatchSources(mDispatchSourceListenerSubscription);
mContext->unsubscribeForWatches(mWatchListenerSubscription);
mContext->unsubscribeForTimeouts(mTimeoutSourceListenerSubscription);
+ deregisterAllDispatchSource();
+ deregisterAllTimeouts();
+ deregisterAllWatches();
mContext.reset();
mpSocketHandler = NULL;
- mWatchToCheck = NULL;
}
CAmCommonAPIWrapper* CAmCommonAPIWrapper::instantiateOnce(CAmSocketHandler* socketHandler, const std::string & applicationName)
@@ -115,151 +143,214 @@ CAmCommonAPIWrapper* CAmCommonAPIWrapper::getInstance()
return pSingleCommonAPIInstance;
}
-bool CAmCommonAPIWrapper::commonDispatchCallback(const sh_pollHandle_t handle, void *userData)
+void CAmCommonAPIWrapper::commonPrepareCallback(const sh_pollHandle_t, void*)
{
- (void) handle;
- (void) userData;
-
- std::list<CommonAPI::DispatchSource*>::iterator iterator(mSourcesToDispatch.begin());
- for(;iterator!=mSourcesToDispatch.end();)
+ for (auto dispatchSourceIterator = mRegisteredDispatchSources.begin();
+ dispatchSourceIterator != mRegisteredDispatchSources.end();
+ dispatchSourceIterator++)
{
- CommonAPI::DispatchSource* source = *iterator;
- if (!source->dispatch()) {
- iterator=mSourcesToDispatch.erase(iterator);
+ int64_t dispatchTimeout(CommonAPI::TIMEOUT_INFINITE);
+ if((*dispatchSourceIterator)->prepare(dispatchTimeout))
+ {
+ while ((*dispatchSourceIterator)->dispatch());
}
- else
- iterator++;
}
- if (!mSourcesToDispatch.empty())
- return (true);
+}
- return false;
+void CAmCommonAPIWrapper::commonFireCallback(const pollfd pollfd, const sh_pollHandle_t handle, void *)
+{
+ CommonAPI::Watch* pWatchToCheck = watchWithHandle(handle);
+ if( pWatchToCheck )
+ pWatchToCheck->dispatch(pollfd.revents);
}
-bool CAmCommonAPIWrapper::commonCheckCallback(const sh_pollHandle_t, void *)
+bool CAmCommonAPIWrapper::commonCheckCallback(const sh_pollHandle_t handle, void *)
{
- std::vector<CommonAPI::DispatchSource*> vecDispatch=mWatchToCheck->getDependentDispatchSources();
- mSourcesToDispatch.insert(mSourcesToDispatch.end(), vecDispatch.begin(), vecDispatch.end());
+ CommonAPI::Watch* pWatchToCheck = watchWithHandle(handle);
+ if( pWatchToCheck )
+ {
+ const ArrayDispatchSources & vecDispatch = pWatchToCheck->getDependentDispatchSources();
+ if(vecDispatch.size()>0)
+ {
+ mSourcesToDispatch[handle].insert(mSourcesToDispatch[handle].end(), vecDispatch.begin(), vecDispatch.end());
+ return true;
+ }
+ }
+ return false;
+}
- return (mWatchToCheck || !mSourcesToDispatch.empty());
+bool CAmCommonAPIWrapper::commonDispatchCallback(const sh_pollHandle_t handle, void *)
+{
+ CommonAPI::Watch* pWatchToCheck = watchWithHandle(handle);
+ if( pWatchToCheck )
+ {
+ std::list<CommonAPI::DispatchSource*> & srcList = mSourcesToDispatch[handle];
+ for(auto it = srcList.begin();it!=srcList.end();)
+ {
+ if (false==(*it)->dispatch())
+ it=srcList.erase(it);
+ else
+ it++;
+ }
+ if (!srcList.empty())
+ return (true);
+ }
+ mSourcesToDispatch.erase(handle);
+ return false;
}
-void CAmCommonAPIWrapper::commonFireCallback(const pollfd pollfd, const sh_pollHandle_t, void *)
+void CAmCommonAPIWrapper::commonTimerCallback(sh_timerHandle_t handle, void *)
{
- mWatchToCheck=NULL;
- try
+ CommonAPI::Timeout* pTimeout = timeoutWithHandle(handle);
+
+ if( NULL==pTimeout )
{
- mWatchToCheck=mMapWatches.at(pollfd.fd);
+ //erroneous call because deregisterTimeout has been called, so try to remove the timer from the sockethandler
+ mpSocketHandler->removeTimer(handle);
}
- catch (const std::out_of_range& error) {
- logInfo(__PRETTY_FUNCTION__,error.what());
- return;
+ else
+ {
+ if ( false==pTimeout->dispatch() ) //it should be removed
+ {
+ mpSocketHandler->removeTimer(handle);
+ mListTimerhandles.erase(handle);
+ }
+ #ifndef WITH_TIMERFD
+ else //the timeout should be rescheduled
+ mpSocketHandler->restartTimer(handle);
+ #endif
}
+}
- mWatchToCheck->dispatch(pollfd.events);
+void CAmCommonAPIWrapper::registerDispatchSource(CommonAPI::DispatchSource* dispatchSource, const CommonAPI::DispatchPriority)
+{
+ mRegisteredDispatchSources.push_back(dispatchSource);
}
-void CAmCommonAPIWrapper::commonPrepareCallback(const sh_pollHandle_t, void*)
+void CAmCommonAPIWrapper::deregisterDispatchSource(CommonAPI::DispatchSource* dispatchSource)
{
- for (auto dispatchSourceIterator = mRegisteredDispatchSources.begin();
- dispatchSourceIterator != mRegisteredDispatchSources.end();
- dispatchSourceIterator++)
+ for(IteratorArrayDispatchSources dispatchSourceIterator = mRegisteredDispatchSources.begin(); dispatchSourceIterator != mRegisteredDispatchSources.end(); dispatchSourceIterator++)
{
- int64_t dispatchTimeout(CommonAPI::TIMEOUT_INFINITE);
- if(dispatchSourceIterator->second->prepare(dispatchTimeout))
+ if( *dispatchSourceIterator == dispatchSource )
{
- while (dispatchSourceIterator->second->dispatch());
+ mRegisteredDispatchSources.erase(dispatchSourceIterator);
+ break;
}
}
}
-void CAmCommonAPIWrapper::registerDispatchSource(CommonAPI::DispatchSource* dispatchSource, const CommonAPI::DispatchPriority dispatchPriority)
+void CAmCommonAPIWrapper::deregisterAllDispatchSource()
{
- mRegisteredDispatchSources.insert({dispatchPriority, dispatchSource});
+ mRegisteredDispatchSources.clear();
}
-void CAmCommonAPIWrapper::deregisterDispatchSource(CommonAPI::DispatchSource* dispatchSource)
+void CAmCommonAPIWrapper::registerWatch(CommonAPI::Watch* watch, const CommonAPI::DispatchPriority)
{
- for(auto dispatchSourceIterator = mRegisteredDispatchSources.begin();
- dispatchSourceIterator != mRegisteredDispatchSources.end();
- dispatchSourceIterator++) {
+ logInfo(__PRETTY_FUNCTION__);
+ pollfd pollfd_ (watch->getAssociatedFileDescriptor());
+ sh_pollHandle_t handle (0);
- if(dispatchSourceIterator->second == dispatchSource) {
- mRegisteredDispatchSources.erase(dispatchSourceIterator);
- break;
- }
+ am_Error_e error = mpSocketHandler->addFDPoll(pollfd_.fd, pollfd_.events, &pCommonPrepareCallback, &pCommonFireCallback, &pCommonCheckCallback, &pCommonDispatchCallback, watch, handle);
+
+ //if everything is alright, add the watch and the handle to our map so we know this relationship
+ if (error != am_Error_e::E_OK || handle == 0)
+ {
+ logError(__func__,"entering watch failed");
}
+ else
+ mMapWatches.insert(std::make_pair(handle,watch));
}
void CAmCommonAPIWrapper::deregisterWatch(CommonAPI::Watch* watch)
{
- for(std::map<int,CommonAPI::Watch*>::iterator iter(mMapWatches.begin());iter!=mMapWatches.end();iter++)
+ for(IteratorMapWatches iter=mMapWatches.begin();iter!=mMapWatches.end();iter++)
{
if (iter->second == watch)
{
+ mpSocketHandler->removeFDPoll(iter->first);
mMapWatches.erase(iter);
break;
}
}
}
+void CAmCommonAPIWrapper::deregisterAllWatches()
+{
+ for(IteratorMapWatches iter=mMapWatches.begin();iter!=mMapWatches.end();iter++)
+ mpSocketHandler->removeFDPoll(iter->first);
+ mMapWatches.clear();
+}
+
void CAmCommonAPIWrapper::registerTimeout(CommonAPI::Timeout* timeout, const CommonAPI::DispatchPriority)
{
timespec pollTimeout;
- int64_t localTimeout = timeout->getTimeoutInterval();
-
- pollTimeout.tv_sec = localTimeout / 1000;
- pollTimeout.tv_nsec = (localTimeout % 1000) * 1000000;
-
- //prepare handle and callback. new is eval, but there is no other choice because we need the pointer!
- sh_timerHandle_t handle;
-
- //add the timer to the pollLoop
- mpSocketHandler->addTimer(pollTimeout, &pCommonTimerCallback, handle, timeout);
-
- timerHandles myHandle({handle,timeout});
- mpListTimerhandles.push_back(myHandle);
+ if(timeoutToTimespec(timeout->getTimeoutInterval(), pollTimeout))
+ {
+ //prepare handle and callback. new is eval, but there is no other choice because we need the pointer!
+ sh_timerHandle_t handle;
- return;
+ //add the timer to the pollLoop
+ am_Error_e error = mpSocketHandler->addTimer(pollTimeout, &pCommonTimerCallback, handle, timeout, true);
+ if (error != am_Error_e::E_OK || handle == 0)
+ {
+ logError(__func__,"adding timer failed");
+ }
+ else
+ {
+ mListTimerhandles.insert(std::make_pair(handle,timeout));
+ }
+ }
}
void CAmCommonAPIWrapper::deregisterTimeout(CommonAPI::Timeout* timeout)
{
- for( std::vector<timerHandles>::iterator iter(mpListTimerhandles.begin());iter!=mpListTimerhandles.end();iter++)
+ for( IteratorMapTimeouts iter=mListTimerhandles.begin();iter!= mListTimerhandles.end();iter++)
{
- if(iter->timeout==timeout)
+ if(iter->second==timeout)
{
- mpSocketHandler->removeTimer(iter->handle);
+ mpSocketHandler->removeTimer(iter->first);
+ mListTimerhandles.erase(iter->first);
+ break;
}
}
}
-void CAmCommonAPIWrapper::registerWatch(CommonAPI::Watch* watch, const CommonAPI::DispatchPriority)
+void CAmCommonAPIWrapper::deregisterAllTimeouts()
{
- logInfo(__PRETTY_FUNCTION__);
- pollfd pollfd_ (watch->getAssociatedFileDescriptor());
- sh_pollHandle_t handle (0);
-
- am_Error_e error = mpSocketHandler->addFDPoll(pollfd_.fd, pollfd_.events, &pCommonPrepareCallback, &pCommonFireCallback, &pCommonCheckCallback, &pCommonDispatchCallback, watch, handle);
-
- //if everything is alright, add the watch and the handle to our map so we know this relationship
- if (error == !am_Error_e::E_OK || handle == 0)
- logError(__func__,"entering watch failed");
+ for( IteratorMapTimeouts iter=mListTimerhandles.begin();iter!= mListTimerhandles.end();iter++)
+ mpSocketHandler->removeTimer(iter->first);
+ mListTimerhandles.clear();
+}
- mMapWatches.insert(std::make_pair(pollfd_.fd,watch));
+CommonAPI::Watch* CAmCommonAPIWrapper::watchWithHandle(const sh_pollHandle_t handle)
+{
+ CommonAPI::Watch* pWatchToCheck = NULL;
+ try
+ {
+ pWatchToCheck = mMapWatches.at(handle);
+ }
+ catch (const std::out_of_range& error)
+ {
+ logInfo(__PRETTY_FUNCTION__,error.what());
+ }
+ return pWatchToCheck;
}
-void CAmCommonAPIWrapper::commonTimerCallback(sh_timerHandle_t handle, void *)
+CommonAPI::Timeout* CAmCommonAPIWrapper::timeoutWithHandle(const sh_pollHandle_t handle)
{
- for( std::vector<timerHandles>::iterator iter(mpListTimerhandles.begin());iter!=mpListTimerhandles.end();iter++)
+ CommonAPI::Timeout* pTimeout = NULL;
+ try
{
- if(iter->handle==handle)
- {
- iter->timeout->dispatch();
- }
+ pTimeout = mListTimerhandles.at(handle);
}
+ catch (const std::out_of_range& error)
+ {
+ logInfo(__PRETTY_FUNCTION__,error.what());
+ }
+ return pTimeout;
}
+
CAmCommonAPIWrapper* (*getCAPI)() = CAmCommonAPIWrapper::getInstance;
}
diff --git a/AudioManagerUtilities/src/CAmDltWrapper.cpp b/AudioManagerUtilities/src/CAmDltWrapper.cpp
index 742b396..44ec614 100644
--- a/AudioManagerUtilities/src/CAmDltWrapper.cpp
+++ b/AudioManagerUtilities/src/CAmDltWrapper.cpp
@@ -23,13 +23,14 @@
*/
-#include "CAmDltWrapper.h"
#include <string>
#include <iostream>
#include <string.h>
#include <chrono>
#include <ctime>
+#include <sys/types.h>
#include <unistd.h>
+#include "CAmDltWrapper.h"
namespace am
{
@@ -625,7 +626,7 @@ bool CAmDltWrapper::initNoDlt(DltLogLevelType loglevel, DltContext* context)
bool CAmDltWrapper::init(DltLogLevelType loglevel, DltContext* context)
{
pthread_mutex_lock(&mMutex);
- initNoDlt(loglevel,context);
+ return initNoDlt(loglevel,context);
}
void CAmDltWrapper::send()
diff --git a/AudioManagerUtilities/src/CAmSocketHandler.cpp b/AudioManagerUtilities/src/CAmSocketHandler.cpp
index fad60e5..7cf5594 100644
--- a/AudioManagerUtilities/src/CAmSocketHandler.cpp
+++ b/AudioManagerUtilities/src/CAmSocketHandler.cpp
@@ -31,6 +31,7 @@
#include <features.h>
#include <csignal>
#include <unistd.h>
+#include <string.h>
#include "CAmDltWrapper.h"
#include "CAmSocketHandler.h"
@@ -42,22 +43,38 @@
namespace am
{
+#define CHECK_CALLER_THREAD_ID()\
+ if(std::this_thread::get_id() != mThreadID)\
+ {\
+ logError("Sockethandler: Call from another thread detected!");\
+ assert(false);\
+ }
+
+
+
+
CAmSocketHandler::CAmSocketHandler() :
mPipe(), //
- mDispatchDone(true), //
- mSetPollKeys(MAX_POLLHANDLE), //
- mListPoll(), //
- mSetTimerKeys(MAX_TIMERHANDLE),
- mListTimer(), //
- mListActiveTimer(), //
- mSetSignalhandlerKeys(MAX_POLLHANDLE), //
- mSignalHandlers(), //
- mRecreatePollfds(true),
- mInternalCodes(internal_codes_e::NO_ERROR),
- mSignalFdHandle(0)
-#ifndef WITH_TIMERFD
-,mStartTime() //
-#endif
+ mDispatchDone(true), //
+ mSetPollKeys(MAX_POLLHANDLE), //
+ mListPoll(), //
+ mSetTimerKeys(MAX_TIMERHANDLE),
+ mListTimer(), //
+ #ifndef WITH_TIMERFD
+ mListActiveTimer(), //
+ #else
+ mListRemovedTimers(),
+ #endif
+ mSetSignalhandlerKeys(MAX_POLLHANDLE), //
+ mSignalHandlers(), //
+ mRecreatePollfds(true),
+ mInternalCodes(internal_codes_e::NO_ERROR),
+ mSignalFdHandle(0),
+ mListActivePolls(),
+ mThreadID(std::this_thread::get_id())
+ #ifndef WITH_TIMERFD
+ ,mStartTime() //
+ #endif
{
if (pipe(mPipe) == -1)
{
@@ -69,17 +86,17 @@ CAmSocketHandler::CAmSocketHandler() :
short event = 0;
sh_pollHandle_t handle;
event |= POLLIN;
- if (addFDPoll(mPipe[0], event, NULL,
- [](const pollfd, const sh_pollHandle_t, void*){},
- [](const sh_pollHandle_t, void*) { return (false); },
- NULL, NULL, handle) != E_OK)
- {
+ if (addFDPoll(mPipe[0], event, NULL, [](const pollfd pollfd, const sh_pollHandle_t, void*)
+ {}, [](const sh_pollHandle_t, void*)
+ { return (false);}, NULL, NULL, handle) != E_OK)
mInternalCodes |= internal_codes_e::FD_ERROR;
- }
}
CAmSocketHandler::~CAmSocketHandler()
{
+#ifdef WITH_TIMERFD
+ closeRemovedTimers();
+#endif
for (auto it : mListPoll)
{
close(it.pollfdValue.fd);
@@ -97,13 +114,16 @@ void CAmSocketHandler::start_listenting()
mDispatchDone = false;
int16_t pollStatus;
+ CHECK_CALLER_THREAD_ID()
+
#ifndef WITH_TIMERFD
clock_gettime(CLOCK_MONOTONIC, &mStartTime);
#endif
timespec buffertime;
- VectorListPoll_t cloneListPoll;
+ std::list<sh_poll_s*> listPoll;
VectorListPoll_t::iterator listmPollIt;
+ VectorListPollfd_t::iterator itMfdPollingArray;
VectorListPollfd_t fdPollingArray; //!<the polling array for ppoll
auto preparePollfd = [&](const sh_poll_s& row)
@@ -118,17 +138,20 @@ void CAmSocketHandler::start_listenting()
{
if (mRecreatePollfds)
{
+#ifdef WITH_TIMERFD
+ closeRemovedTimers();
+#endif
fdPollingArray.clear();
//freeze mListPoll by copying it - otherwise we get problems when we want to manipulate it during the next lines
- cloneListPoll = mListPoll;
+ mListActivePolls = mListPoll;
//there was a change in the setup, so we need to recreate the fdarray from the list
- std::for_each(cloneListPoll.begin(), cloneListPoll.end(), preparePollfd);
+ std::for_each(mListActivePolls.begin(), mListActivePolls.end(), preparePollfd);
mRecreatePollfds = false;
}
else
{
//first we go through the registered filedescriptors and check if someone needs preparation:
- std::for_each(cloneListPoll.begin(), cloneListPoll.end(), CAmSocketHandler::prepare);
+ std::for_each(mListActivePolls.begin(), mListActivePolls.end(), CAmSocketHandler::prepare);
}
#ifndef WITH_TIMERFD
@@ -152,25 +175,25 @@ void CAmSocketHandler::start_listenting()
if (pollStatus != 0) //only check filedescriptors if there was a change
{
- std::list<sh_poll_s> listPoll;
//todo: here could be a timer that makes sure naughty plugins return!
+ listPoll.clear();
//stage 0+1, call firedCB
- listmPollIt = cloneListPoll.begin();
- for (auto it : fdPollingArray)
+ for (itMfdPollingArray = fdPollingArray.begin(); itMfdPollingArray != fdPollingArray.end(); ++itMfdPollingArray)
{
- if (CAmSocketHandler::eventFired(it))
+ itMfdPollingArray->revents &= itMfdPollingArray->events | POLLERR | POLLHUP;
+ if ( itMfdPollingArray->revents!=0 )
{
- listmPollIt->pollfdValue.revents = it.revents;
- listPoll.push_back(*listmPollIt);
- CAmSocketHandler::fire(*listmPollIt);
- }
- else
- {
- listmPollIt->pollfdValue.revents = 0;
+ listmPollIt = mListActivePolls.begin();
+ std::advance(listmPollIt, std::distance(fdPollingArray.begin(), itMfdPollingArray));
+
+ sh_poll_s & pollObj = *listmPollIt;
+
+ listPoll.push_back(&pollObj);
+ CAmSocketHandler::fire(&pollObj);
+ itMfdPollingArray->revents = 0;
}
- listmPollIt++;
}
-
+
//stage 2, lets ask around if some dispatching is necessary, the ones who need stay on the list
listPoll.remove_if(CAmSocketHandler::noDispatching);
@@ -223,7 +246,7 @@ void CAmSocketHandler::exit_mainloop()
bool CAmSocketHandler::fatalErrorOccurred()
{
- return ((mInternalCodes&internal_codes_e::PIPE_ERROR)>0)||((mInternalCodes&internal_codes_e::FD_ERROR)>0);
+ return ((mInternalCodes&internal_codes_e::PIPE_ERROR)>0)||((mInternalCodes&internal_codes_e::FD_ERROR)>0);
}
am_Error_e CAmSocketHandler::getFDPollData(const sh_pollHandle_t handle, sh_poll_s & outPollData)
@@ -246,6 +269,8 @@ am_Error_e CAmSocketHandler::getFDPollData(const sh_pollHandle_t handle, sh_poll
*/
am_Error_e CAmSocketHandler::listenToSignals(const std::vector<uint8_t> & listSignals)
{
+ CHECK_CALLER_THREAD_ID()
+
int fdErr;
uint8_t addedSignals = 0;
sigset_t sigset;
@@ -287,31 +312,19 @@ am_Error_e CAmSocketHandler::listenToSignals(const std::vector<uint8_t> & listSi
return (E_NOT_POSSIBLE);
}
- int signalHandlerFd;
+ sh_poll_s sgPollData;
if(mSignalFdHandle)
{
- sh_poll_s sgPollData;
if(E_OK!=getFDPollData(mSignalFdHandle, sgPollData))
{
- removeFDPoll(mSignalFdHandle);
- mSignalFdHandle = 0;
- }
- else
- {
- int signalHandlerFd = signalfd(sgPollData.pollfdValue.fd, &sigset, 0);
- if (signalHandlerFd == -1)
- {
- logError("Could not update signal fd!");
- return (E_NOT_POSSIBLE);
- }
- return E_OK;
+ mSignalFdHandle = 0;
}
}
if(0==mSignalFdHandle)
{
/* Create the signalfd */
- signalHandlerFd = signalfd(-1, &sigset, 0);
+ int signalHandlerFd = signalfd(-1, &sigset, SFD_NONBLOCK);
if (signalHandlerFd == -1)
{
logError("Could not open signal fd!");
@@ -320,21 +333,40 @@ am_Error_e CAmSocketHandler::listenToSignals(const std::vector<uint8_t> & listSi
auto actionPoll = [this](const pollfd pollfd, const sh_pollHandle_t, void*)
{
- const VectorSignalHandlers_t & signalHandlers = mSignalHandlers;
- /* We have a valid signal, read the info from the fd */
- struct signalfd_siginfo info;
- ssize_t bytes = read(pollfd.fd, &info, sizeof(info));
- assert(bytes == sizeof(info));
-
- /* Notify all listeners */
- for(auto it: signalHandlers)
- it.callback(it.handle, info, it.userData);
+ const VectorSignalHandlers_t & signalHandlers = mSignalHandlers;
+ /* We have a valid signal, read the info from the fd */
+ struct signalfd_siginfo info;
+ ssize_t bytes = read(pollfd.fd, &info, sizeof(info));
+ if(bytes == -1)
+ {
+ if (errno == EAGAIN) //Something wrong, check for EAGAIN
+ bytes = read(pollfd.fd, &info, sizeof(info));
+ }
+ if(bytes != sizeof(info))
+ {
+ //Failed to read from fd...
+ logError("Failed to read from signal fd");
+ throw std::runtime_error(std::string("Failed to read from signal fd."));
+ }
+
+ /* Notify all listeners */
+ for(auto it: signalHandlers)
+ it.callback(it.handle, info, it.userData);
};
/* We're going to add the signal fd through addFDPoll. At this point we don't have any signal listeners. */
- am_Error_e shFdError = addFDPoll(signalHandlerFd, POLLIN | POLLERR | POLLHUP, NULL, actionPoll, [](const sh_pollHandle_t, void*)
+ return addFDPoll(signalHandlerFd, POLLIN | POLLERR | POLLHUP, NULL, actionPoll, [](const sh_pollHandle_t, void*)
{ return (false);}, NULL, NULL, mSignalFdHandle);
- return shFdError;
}
+ else
+ {
+ int signalHandlerFd = signalfd(sgPollData.pollfdValue.fd, &sigset, 0);
+ if (signalHandlerFd == -1)
+ {
+ logError("Could not update signal fd!", strerror(errno));
+ return (E_NOT_POSSIBLE);
+ }
+ return E_OK;
+ }
}
/**
@@ -350,10 +382,17 @@ am_Error_e CAmSocketHandler::listenToSignals(const std::vector<uint8_t> & listSi
* @return E_OK if the descriptor was added, E_NON_EXISTENT if the fd is not valid
*/
-am_Error_e CAmSocketHandler::addFDPoll(const int fd, const short event, std::function<void(const sh_pollHandle_t handle, void* userData)> prepare,
- std::function<void(const pollfd pollfd, const sh_pollHandle_t handle, void* userData)> fired, std::function<bool(const sh_pollHandle_t handle, void* userData)> check,
- std::function<bool(const sh_pollHandle_t handle, void* userData)> dispatch, void* userData, sh_pollHandle_t& handle)
+am_Error_e CAmSocketHandler::addFDPoll(const int fd,
+ const short event,
+ std::function<void(const sh_pollHandle_t handle, void* userData)> prepare,
+ std::function<void(const pollfd pollfd, const sh_pollHandle_t handle, void* userData)> fired,
+ std::function<bool(const sh_pollHandle_t handle, void* userData)> check,
+ std::function<bool(const sh_pollHandle_t handle, void* userData)> dispatch,
+ void* userData,
+ sh_pollHandle_t& handle)
{
+ CHECK_CALLER_THREAD_ID()
+
if (!fdIsValid(fd))
return (E_NON_EXISTENT);
@@ -400,7 +439,7 @@ am::am_Error_e CAmSocketHandler::addFDPoll(const int fd, const short event, IAmS
{
std::function<void(const sh_pollHandle_t handle, void* userData)> prepareCB; //preperation callback
- std::function<void(const pollfd poll, const sh_pollHandle_t handle, void* userData)> firedCB; //fired callback
+ std::function<void(const pollfd pollfd, const sh_pollHandle_t handle, void* userData)> firedCB; //fired callback
std::function<bool(const sh_pollHandle_t handle, void* userData)> checkCB; //check callback
std::function<bool(const sh_pollHandle_t handle, void* userData)> dispatchCB; //check callback
@@ -423,19 +462,36 @@ am::am_Error_e CAmSocketHandler::addFDPoll(const int fd, const short event, IAmS
*/
am_Error_e CAmSocketHandler::removeFDPoll(const sh_pollHandle_t handle)
{
- VectorListPoll_t::iterator iterator = mListPoll.begin();
+ CHECK_CALLER_THREAD_ID()
- for (; iterator != mListPoll.end(); ++iterator)
+ bool handleRemoved = false;
+
+ for (auto it = mListPoll.begin(); it != mListPoll.end(); ++it)
{
- if (iterator->handle == handle)
+ if (it->handle == handle)
{
- iterator = mListPoll.erase(iterator);
+ it = mListPoll.erase(it);
mSetPollKeys.pollHandles.erase(handle);
- mRecreatePollfds = true;
- return (E_OK);
+ handleRemoved = true;
+ break;
}
}
- return (E_UNKNOWN);
+
+ if ( false == handleRemoved )
+ return (E_UNKNOWN);
+
+ mRecreatePollfds = true;
+
+ for (auto it = mListActivePolls.begin(); it != mListActivePolls.end(); ++it)
+ {
+ if (it->handle == handle)
+ {
+ it->isValid = false;
+ break;
+ }
+ }
+
+ return (E_OK);
}
/**
@@ -447,6 +503,8 @@ am_Error_e CAmSocketHandler::removeFDPoll(const sh_pollHandle_t handle)
*/
am_Error_e CAmSocketHandler::addSignalHandler(std::function<void(const sh_pollHandle_t handle, const signalfd_siginfo & info, void* userData)> callback, sh_pollHandle_t& handle, void * userData)
{
+ CHECK_CALLER_THREAD_ID()
+
if (!nextHandle(mSetSignalhandlerKeys))
{
logError("Could not create new polls, too many open!");
@@ -469,6 +527,8 @@ am_Error_e CAmSocketHandler::addSignalHandler(std::function<void(const sh_pollHa
*/
am_Error_e CAmSocketHandler::removeSignalHandler(const sh_pollHandle_t handle)
{
+ CHECK_CALLER_THREAD_ID()
+
VectorSignalHandlers_t::iterator it(mSignalHandlers.begin());
for (; it != mSignalHandlers.end(); ++it)
{
@@ -506,6 +566,7 @@ am_Error_e CAmSocketHandler::addTimer(const timespec & timeouts, IAmShTimerCallB
am_Error_e CAmSocketHandler::addTimer(const timespec & timeouts, std::function<void(const sh_timerHandle_t handle, void* userData)> callback, sh_timerHandle_t& handle, void * userData, const bool repeats)
{
+ CHECK_CALLER_THREAD_ID()
assert(!((timeouts.tv_sec == 0) && (timeouts.tv_nsec == 0)));
mListTimer.emplace_back();
@@ -533,7 +594,6 @@ am_Error_e CAmSocketHandler::addTimer(const timespec & timeouts, std::function<v
clock_gettime(CLOCK_MONOTONIC, &currentTime);
if (!mDispatchDone)//the mainloop is started
timerItem.countdown = timespecAdd(timeouts, timespecSub(currentTime, mStartTime));
- mListTimer.push_back(timerItem);
mListActiveTimer.push_back(timerItem);
mListActiveTimer.sort(compareCountdown);
return (E_OK);
@@ -559,22 +619,35 @@ am_Error_e CAmSocketHandler::addTimer(const timespec & timeouts, std::function<v
return err;
}
- static auto actionPoll = [](const pollfd pollfd, const sh_pollHandle_t handle, void* userData)
+ auto actionPoll = [this](const pollfd pollfd, const sh_pollHandle_t handle, void* userData)
{
uint64_t mExpirations;
- if (read(pollfd.fd, &mExpirations, sizeof(uint64_t)) == -1)
+ ssize_t bytes = read(pollfd.fd, &mExpirations, sizeof(mExpirations));
+ if(bytes == -1)
+ {
+ if (errno == EAGAIN)//Something wrong, check for EAGAIN
+ bytes = read(pollfd.fd, &mExpirations, sizeof(mExpirations));
+ }
+
+ if(bytes != sizeof(mExpirations))
{
- //error received...try again
- read(pollfd.fd, &mExpirations, sizeof(uint64_t));
+ //Failed to read from fd...
+ logError("Failed to read from timer fd");
+ throw std::runtime_error(std::string("Failed to read from timer fd."));
}
};
- err = addFDPoll(timerItem.fd, POLLIN, NULL, actionPoll, [callback](const sh_pollHandle_t handle, void* userData)->bool
- {
- callback(handle, userData);
- return false;
- },
- NULL, userData, handle);
+ err = addFDPoll(timerItem.fd,
+ POLLIN,
+ NULL,
+ actionPoll,
+ [callback](const sh_pollHandle_t handle, void* userData)->bool{
+ callback(handle, userData);
+ return false;
+ },
+ NULL,
+ userData,
+ handle);
if (E_OK == err)
{
timerItem.handle = handle;
@@ -595,6 +668,7 @@ am_Error_e CAmSocketHandler::addTimer(const timespec & timeouts, std::function<v
*/
am_Error_e CAmSocketHandler::removeTimer(const sh_timerHandle_t handle)
{
+ CHECK_CALLER_THREAD_ID()
assert(handle != 0);
//stop the current timer
@@ -608,20 +682,22 @@ am_Error_e CAmSocketHandler::removeTimer(const sh_timerHandle_t handle)
if (it == mListTimer.end())
return (E_NON_EXISTENT);
- close(it->fd);
+ mListRemovedTimers.push_back(*it);
mListTimer.erase(it);
return removeFDPoll(handle);
#else
stopTimer(handle);
std::list<sh_timer_s>::iterator it(mListTimer.begin());
- for (; it != mListTimer.end(); ++it)
+ while (it != mListTimer.end())
{
if (it->handle == handle)
{
- it = mListTimer.erase(it);
+ it = mListTimer.erase(it);
mSetTimerKeys.pollHandles.erase(handle);
return (E_OK);
}
+ else
+ ++it;
}
return (E_UNKNOWN);
#endif
@@ -635,6 +711,8 @@ am_Error_e CAmSocketHandler::removeTimer(const sh_timerHandle_t handle)
*/
am_Error_e CAmSocketHandler::updateTimer(const sh_timerHandle_t handle, const timespec & timeouts)
{
+ CHECK_CALLER_THREAD_ID()
+
#ifdef WITH_TIMERFD
std::list<sh_timer_s>::iterator it = mListTimer.begin();
for (; it != mListTimer.end(); ++it)
@@ -657,7 +735,7 @@ am_Error_e CAmSocketHandler::updateTimer(const sh_timerHandle_t handle, const ti
}
else
{
- if (timerfd_settime(it->fd, 0, &it->countdown, NULL))
+ if (timerfd_settime(it->fd, 0, &it->countdown, NULL)<0)
{
logError("Failed to set timer duration");
return E_NOT_POSSIBLE;
@@ -719,6 +797,7 @@ am_Error_e CAmSocketHandler::updateTimer(const sh_timerHandle_t handle, const ti
*/
am_Error_e CAmSocketHandler::restartTimer(const sh_timerHandle_t handle)
{
+ CHECK_CALLER_THREAD_ID()
#ifdef WITH_TIMERFD
std::list<sh_timer_s>::iterator it = mListTimer.begin();
for (; it != mListTimer.end(); ++it)
@@ -737,7 +816,7 @@ am_Error_e CAmSocketHandler::restartTimer(const sh_timerHandle_t handle)
}
else
{
- if (timerfd_settime(it->fd, 0, &it->countdown, NULL))
+ if (timerfd_settime(it->fd, 0, &it->countdown, NULL)<0)
{
logError("Failed to set timer duration");
return E_NOT_POSSIBLE;
@@ -797,6 +876,7 @@ am_Error_e CAmSocketHandler::restartTimer(const sh_timerHandle_t handle)
*/
am_Error_e CAmSocketHandler::stopTimer(const sh_timerHandle_t handle)
{
+ CHECK_CALLER_THREAD_ID()
#ifdef WITH_TIMERFD
std::list<sh_timer_s>::iterator it = mListTimer.begin();
for (; it != mListTimer.end(); ++it)
@@ -811,7 +891,7 @@ am_Error_e CAmSocketHandler::stopTimer(const sh_timerHandle_t handle)
countdown.it_value.tv_nsec = 0;
countdown.it_value.tv_sec = 0;
- if (timerfd_settime(it->fd, 0, &countdown, NULL))
+ if (timerfd_settime(it->fd, 0, &countdown, NULL)<0)
{
logError("Failed to set timer duration");
return E_NOT_POSSIBLE;
@@ -820,13 +900,16 @@ am_Error_e CAmSocketHandler::stopTimer(const sh_timerHandle_t handle)
#else
//go through the list and remove the timer with the handle
std::list<sh_timer_s>::iterator it(mListActiveTimer.begin());
- for (; it != mListActiveTimer.end(); ++it)
+
+ while (it != mListActiveTimer.end())
{
if (it->handle == handle)
{
it = mListActiveTimer.erase(it);
return (E_OK);
}
+ else
+ it++;
}
return (E_NON_EXISTENT);
#endif
@@ -840,6 +923,7 @@ am_Error_e CAmSocketHandler::stopTimer(const sh_timerHandle_t handle)
*/
am_Error_e CAmSocketHandler::updateEventFlags(const sh_pollHandle_t handle, const short events)
{
+ CHECK_CALLER_THREAD_ID()
VectorListPoll_t::iterator iterator = mListPoll.begin();
for (; iterator != mListPoll.end(); ++iterator)
@@ -966,11 +1050,11 @@ void CAmSocketHandler::prepare(am::CAmSocketHandler::sh_poll_s& row)
/**
* fire callback
*/
-void CAmSocketHandler::fire(sh_poll_s& a)
+void CAmSocketHandler::fire(const sh_poll_s* a)
{
try
{
- a.firedCB(a.pollfdValue, a.handle, a.userData);
+ a->firedCB(a->pollfdValue, a->handle, a->userData);
} catch (std::exception& e)
{
logError("Sockethandler: Exception in Preparecallback,caught", e.what());
@@ -980,31 +1064,23 @@ void CAmSocketHandler::fire(sh_poll_s& a)
/**
* should disptach
*/
-bool CAmSocketHandler::noDispatching(const sh_poll_s& a)
+bool CAmSocketHandler::noDispatching(const sh_poll_s* a)
{
//remove from list of there is no checkCB
- if (nullptr == a.checkCB)
+ if (nullptr == a->checkCB || false == a->isValid)
return (true);
- return (!a.checkCB(a.handle, a.userData));
+ return (!a->checkCB(a->handle, a->userData));
}
/**
* disptach
*/
-bool CAmSocketHandler::dispatchingFinished(const sh_poll_s& a)
+bool CAmSocketHandler::dispatchingFinished(const sh_poll_s* a)
{
//remove from list of there is no dispatchCB
- if (nullptr == a.dispatchCB)
+ if (nullptr == a->dispatchCB || false == a->isValid)
return (true);
- return (!a.dispatchCB(a.handle, a.userData));
-}
-
-/**
- * event triggered
- */
-bool CAmSocketHandler::eventFired(const pollfd& a)
-{
- return (a.revents == 0 ? false : true);
+ return (!a->dispatchCB(a->handle, a->userData));
}
/**
@@ -1021,30 +1097,41 @@ inline timespec* CAmSocketHandler::insertTime(timespec& buffertime)
return (&buffertime);
}
else
-#endif
+#endif
{
return (NULL);
}
}
-#ifdef WITH_TIMERFD
+#ifdef WITH_TIMERFD
am_Error_e CAmSocketHandler::createTimeFD(const itimerspec & timeouts, int & fd)
{
fd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
- if (fd <= 0)
+ if (fd < 0)
{
logError("Failed to create timer");
return E_NOT_POSSIBLE;
}
- if (timerfd_settime(fd, 0, &timeouts, NULL))
+ if (timerfd_settime(fd, 0, &timeouts, NULL) < 0)
{
logError("Failed to set timer duration");
return E_NOT_POSSIBLE;
}
return E_OK;
}
-#endif
+
+void CAmSocketHandler::closeRemovedTimers()
+{
+ for (auto it : mListRemovedTimers)
+ {
+ if( it.fd > -1 )
+ close( it.fd );
+ }
+ mListRemovedTimers.clear();
+}
+
+#endif
void CAmSocketHandler::callTimer(sh_timer_s& a)
{