diff options
Diffstat (limited to 'AudioManagerUtilities/src/CAmSocketHandler.cpp')
-rw-r--r-- | AudioManagerUtilities/src/CAmSocketHandler.cpp | 315 |
1 files changed, 201 insertions, 114 deletions
diff --git a/AudioManagerUtilities/src/CAmSocketHandler.cpp b/AudioManagerUtilities/src/CAmSocketHandler.cpp index fad60e5..7cf5594 100644 --- a/AudioManagerUtilities/src/CAmSocketHandler.cpp +++ b/AudioManagerUtilities/src/CAmSocketHandler.cpp @@ -31,6 +31,7 @@ #include <features.h> #include <csignal> #include <unistd.h> +#include <string.h> #include "CAmDltWrapper.h" #include "CAmSocketHandler.h" @@ -42,22 +43,38 @@ namespace am { +#define CHECK_CALLER_THREAD_ID()\ + if(std::this_thread::get_id() != mThreadID)\ + {\ + logError("Sockethandler: Call from another thread detected!");\ + assert(false);\ + } + + + + CAmSocketHandler::CAmSocketHandler() : mPipe(), // - mDispatchDone(true), // - mSetPollKeys(MAX_POLLHANDLE), // - mListPoll(), // - mSetTimerKeys(MAX_TIMERHANDLE), - mListTimer(), // - mListActiveTimer(), // - mSetSignalhandlerKeys(MAX_POLLHANDLE), // - mSignalHandlers(), // - mRecreatePollfds(true), - mInternalCodes(internal_codes_e::NO_ERROR), - mSignalFdHandle(0) -#ifndef WITH_TIMERFD -,mStartTime() // -#endif + mDispatchDone(true), // + mSetPollKeys(MAX_POLLHANDLE), // + mListPoll(), // + mSetTimerKeys(MAX_TIMERHANDLE), + mListTimer(), // + #ifndef WITH_TIMERFD + mListActiveTimer(), // + #else + mListRemovedTimers(), + #endif + mSetSignalhandlerKeys(MAX_POLLHANDLE), // + mSignalHandlers(), // + mRecreatePollfds(true), + mInternalCodes(internal_codes_e::NO_ERROR), + mSignalFdHandle(0), + mListActivePolls(), + mThreadID(std::this_thread::get_id()) + #ifndef WITH_TIMERFD + ,mStartTime() // + #endif { if (pipe(mPipe) == -1) { @@ -69,17 +86,17 @@ CAmSocketHandler::CAmSocketHandler() : short event = 0; sh_pollHandle_t handle; event |= POLLIN; - if (addFDPoll(mPipe[0], event, NULL, - [](const pollfd, const sh_pollHandle_t, void*){}, - [](const sh_pollHandle_t, void*) { return (false); }, - NULL, NULL, handle) != E_OK) - { + if (addFDPoll(mPipe[0], event, NULL, [](const pollfd pollfd, const sh_pollHandle_t, void*) + {}, [](const sh_pollHandle_t, void*) + { return (false);}, NULL, NULL, handle) != E_OK) mInternalCodes |= internal_codes_e::FD_ERROR; - } } CAmSocketHandler::~CAmSocketHandler() { +#ifdef WITH_TIMERFD + closeRemovedTimers(); +#endif for (auto it : mListPoll) { close(it.pollfdValue.fd); @@ -97,13 +114,16 @@ void CAmSocketHandler::start_listenting() mDispatchDone = false; int16_t pollStatus; + CHECK_CALLER_THREAD_ID() + #ifndef WITH_TIMERFD clock_gettime(CLOCK_MONOTONIC, &mStartTime); #endif timespec buffertime; - VectorListPoll_t cloneListPoll; + std::list<sh_poll_s*> listPoll; VectorListPoll_t::iterator listmPollIt; + VectorListPollfd_t::iterator itMfdPollingArray; VectorListPollfd_t fdPollingArray; //!<the polling array for ppoll auto preparePollfd = [&](const sh_poll_s& row) @@ -118,17 +138,20 @@ void CAmSocketHandler::start_listenting() { if (mRecreatePollfds) { +#ifdef WITH_TIMERFD + closeRemovedTimers(); +#endif fdPollingArray.clear(); //freeze mListPoll by copying it - otherwise we get problems when we want to manipulate it during the next lines - cloneListPoll = mListPoll; + mListActivePolls = mListPoll; //there was a change in the setup, so we need to recreate the fdarray from the list - std::for_each(cloneListPoll.begin(), cloneListPoll.end(), preparePollfd); + std::for_each(mListActivePolls.begin(), mListActivePolls.end(), preparePollfd); mRecreatePollfds = false; } else { //first we go through the registered filedescriptors and check if someone needs preparation: - std::for_each(cloneListPoll.begin(), cloneListPoll.end(), CAmSocketHandler::prepare); + std::for_each(mListActivePolls.begin(), mListActivePolls.end(), CAmSocketHandler::prepare); } #ifndef WITH_TIMERFD @@ -152,25 +175,25 @@ void CAmSocketHandler::start_listenting() if (pollStatus != 0) //only check filedescriptors if there was a change { - std::list<sh_poll_s> listPoll; //todo: here could be a timer that makes sure naughty plugins return! + listPoll.clear(); //stage 0+1, call firedCB - listmPollIt = cloneListPoll.begin(); - for (auto it : fdPollingArray) + for (itMfdPollingArray = fdPollingArray.begin(); itMfdPollingArray != fdPollingArray.end(); ++itMfdPollingArray) { - if (CAmSocketHandler::eventFired(it)) + itMfdPollingArray->revents &= itMfdPollingArray->events | POLLERR | POLLHUP; + if ( itMfdPollingArray->revents!=0 ) { - listmPollIt->pollfdValue.revents = it.revents; - listPoll.push_back(*listmPollIt); - CAmSocketHandler::fire(*listmPollIt); - } - else - { - listmPollIt->pollfdValue.revents = 0; + listmPollIt = mListActivePolls.begin(); + std::advance(listmPollIt, std::distance(fdPollingArray.begin(), itMfdPollingArray)); + + sh_poll_s & pollObj = *listmPollIt; + + listPoll.push_back(&pollObj); + CAmSocketHandler::fire(&pollObj); + itMfdPollingArray->revents = 0; } - listmPollIt++; } - + //stage 2, lets ask around if some dispatching is necessary, the ones who need stay on the list listPoll.remove_if(CAmSocketHandler::noDispatching); @@ -223,7 +246,7 @@ void CAmSocketHandler::exit_mainloop() bool CAmSocketHandler::fatalErrorOccurred() { - return ((mInternalCodes&internal_codes_e::PIPE_ERROR)>0)||((mInternalCodes&internal_codes_e::FD_ERROR)>0); + return ((mInternalCodes&internal_codes_e::PIPE_ERROR)>0)||((mInternalCodes&internal_codes_e::FD_ERROR)>0); } am_Error_e CAmSocketHandler::getFDPollData(const sh_pollHandle_t handle, sh_poll_s & outPollData) @@ -246,6 +269,8 @@ am_Error_e CAmSocketHandler::getFDPollData(const sh_pollHandle_t handle, sh_poll */ am_Error_e CAmSocketHandler::listenToSignals(const std::vector<uint8_t> & listSignals) { + CHECK_CALLER_THREAD_ID() + int fdErr; uint8_t addedSignals = 0; sigset_t sigset; @@ -287,31 +312,19 @@ am_Error_e CAmSocketHandler::listenToSignals(const std::vector<uint8_t> & listSi return (E_NOT_POSSIBLE); } - int signalHandlerFd; + sh_poll_s sgPollData; if(mSignalFdHandle) { - sh_poll_s sgPollData; if(E_OK!=getFDPollData(mSignalFdHandle, sgPollData)) { - removeFDPoll(mSignalFdHandle); - mSignalFdHandle = 0; - } - else - { - int signalHandlerFd = signalfd(sgPollData.pollfdValue.fd, &sigset, 0); - if (signalHandlerFd == -1) - { - logError("Could not update signal fd!"); - return (E_NOT_POSSIBLE); - } - return E_OK; + mSignalFdHandle = 0; } } if(0==mSignalFdHandle) { /* Create the signalfd */ - signalHandlerFd = signalfd(-1, &sigset, 0); + int signalHandlerFd = signalfd(-1, &sigset, SFD_NONBLOCK); if (signalHandlerFd == -1) { logError("Could not open signal fd!"); @@ -320,21 +333,40 @@ am_Error_e CAmSocketHandler::listenToSignals(const std::vector<uint8_t> & listSi auto actionPoll = [this](const pollfd pollfd, const sh_pollHandle_t, void*) { - const VectorSignalHandlers_t & signalHandlers = mSignalHandlers; - /* We have a valid signal, read the info from the fd */ - struct signalfd_siginfo info; - ssize_t bytes = read(pollfd.fd, &info, sizeof(info)); - assert(bytes == sizeof(info)); - - /* Notify all listeners */ - for(auto it: signalHandlers) - it.callback(it.handle, info, it.userData); + const VectorSignalHandlers_t & signalHandlers = mSignalHandlers; + /* We have a valid signal, read the info from the fd */ + struct signalfd_siginfo info; + ssize_t bytes = read(pollfd.fd, &info, sizeof(info)); + if(bytes == -1) + { + if (errno == EAGAIN) //Something wrong, check for EAGAIN + bytes = read(pollfd.fd, &info, sizeof(info)); + } + if(bytes != sizeof(info)) + { + //Failed to read from fd... + logError("Failed to read from signal fd"); + throw std::runtime_error(std::string("Failed to read from signal fd.")); + } + + /* Notify all listeners */ + for(auto it: signalHandlers) + it.callback(it.handle, info, it.userData); }; /* We're going to add the signal fd through addFDPoll. At this point we don't have any signal listeners. */ - am_Error_e shFdError = addFDPoll(signalHandlerFd, POLLIN | POLLERR | POLLHUP, NULL, actionPoll, [](const sh_pollHandle_t, void*) + return addFDPoll(signalHandlerFd, POLLIN | POLLERR | POLLHUP, NULL, actionPoll, [](const sh_pollHandle_t, void*) { return (false);}, NULL, NULL, mSignalFdHandle); - return shFdError; } + else + { + int signalHandlerFd = signalfd(sgPollData.pollfdValue.fd, &sigset, 0); + if (signalHandlerFd == -1) + { + logError("Could not update signal fd!", strerror(errno)); + return (E_NOT_POSSIBLE); + } + return E_OK; + } } /** @@ -350,10 +382,17 @@ am_Error_e CAmSocketHandler::listenToSignals(const std::vector<uint8_t> & listSi * @return E_OK if the descriptor was added, E_NON_EXISTENT if the fd is not valid */ -am_Error_e CAmSocketHandler::addFDPoll(const int fd, const short event, std::function<void(const sh_pollHandle_t handle, void* userData)> prepare, - std::function<void(const pollfd pollfd, const sh_pollHandle_t handle, void* userData)> fired, std::function<bool(const sh_pollHandle_t handle, void* userData)> check, - std::function<bool(const sh_pollHandle_t handle, void* userData)> dispatch, void* userData, sh_pollHandle_t& handle) +am_Error_e CAmSocketHandler::addFDPoll(const int fd, + const short event, + std::function<void(const sh_pollHandle_t handle, void* userData)> prepare, + std::function<void(const pollfd pollfd, const sh_pollHandle_t handle, void* userData)> fired, + std::function<bool(const sh_pollHandle_t handle, void* userData)> check, + std::function<bool(const sh_pollHandle_t handle, void* userData)> dispatch, + void* userData, + sh_pollHandle_t& handle) { + CHECK_CALLER_THREAD_ID() + if (!fdIsValid(fd)) return (E_NON_EXISTENT); @@ -400,7 +439,7 @@ am::am_Error_e CAmSocketHandler::addFDPoll(const int fd, const short event, IAmS { std::function<void(const sh_pollHandle_t handle, void* userData)> prepareCB; //preperation callback - std::function<void(const pollfd poll, const sh_pollHandle_t handle, void* userData)> firedCB; //fired callback + std::function<void(const pollfd pollfd, const sh_pollHandle_t handle, void* userData)> firedCB; //fired callback std::function<bool(const sh_pollHandle_t handle, void* userData)> checkCB; //check callback std::function<bool(const sh_pollHandle_t handle, void* userData)> dispatchCB; //check callback @@ -423,19 +462,36 @@ am::am_Error_e CAmSocketHandler::addFDPoll(const int fd, const short event, IAmS */ am_Error_e CAmSocketHandler::removeFDPoll(const sh_pollHandle_t handle) { - VectorListPoll_t::iterator iterator = mListPoll.begin(); + CHECK_CALLER_THREAD_ID() - for (; iterator != mListPoll.end(); ++iterator) + bool handleRemoved = false; + + for (auto it = mListPoll.begin(); it != mListPoll.end(); ++it) { - if (iterator->handle == handle) + if (it->handle == handle) { - iterator = mListPoll.erase(iterator); + it = mListPoll.erase(it); mSetPollKeys.pollHandles.erase(handle); - mRecreatePollfds = true; - return (E_OK); + handleRemoved = true; + break; } } - return (E_UNKNOWN); + + if ( false == handleRemoved ) + return (E_UNKNOWN); + + mRecreatePollfds = true; + + for (auto it = mListActivePolls.begin(); it != mListActivePolls.end(); ++it) + { + if (it->handle == handle) + { + it->isValid = false; + break; + } + } + + return (E_OK); } /** @@ -447,6 +503,8 @@ am_Error_e CAmSocketHandler::removeFDPoll(const sh_pollHandle_t handle) */ am_Error_e CAmSocketHandler::addSignalHandler(std::function<void(const sh_pollHandle_t handle, const signalfd_siginfo & info, void* userData)> callback, sh_pollHandle_t& handle, void * userData) { + CHECK_CALLER_THREAD_ID() + if (!nextHandle(mSetSignalhandlerKeys)) { logError("Could not create new polls, too many open!"); @@ -469,6 +527,8 @@ am_Error_e CAmSocketHandler::addSignalHandler(std::function<void(const sh_pollHa */ am_Error_e CAmSocketHandler::removeSignalHandler(const sh_pollHandle_t handle) { + CHECK_CALLER_THREAD_ID() + VectorSignalHandlers_t::iterator it(mSignalHandlers.begin()); for (; it != mSignalHandlers.end(); ++it) { @@ -506,6 +566,7 @@ am_Error_e CAmSocketHandler::addTimer(const timespec & timeouts, IAmShTimerCallB am_Error_e CAmSocketHandler::addTimer(const timespec & timeouts, std::function<void(const sh_timerHandle_t handle, void* userData)> callback, sh_timerHandle_t& handle, void * userData, const bool repeats) { + CHECK_CALLER_THREAD_ID() assert(!((timeouts.tv_sec == 0) && (timeouts.tv_nsec == 0))); mListTimer.emplace_back(); @@ -533,7 +594,6 @@ am_Error_e CAmSocketHandler::addTimer(const timespec & timeouts, std::function<v clock_gettime(CLOCK_MONOTONIC, ¤tTime); if (!mDispatchDone)//the mainloop is started timerItem.countdown = timespecAdd(timeouts, timespecSub(currentTime, mStartTime)); - mListTimer.push_back(timerItem); mListActiveTimer.push_back(timerItem); mListActiveTimer.sort(compareCountdown); return (E_OK); @@ -559,22 +619,35 @@ am_Error_e CAmSocketHandler::addTimer(const timespec & timeouts, std::function<v return err; } - static auto actionPoll = [](const pollfd pollfd, const sh_pollHandle_t handle, void* userData) + auto actionPoll = [this](const pollfd pollfd, const sh_pollHandle_t handle, void* userData) { uint64_t mExpirations; - if (read(pollfd.fd, &mExpirations, sizeof(uint64_t)) == -1) + ssize_t bytes = read(pollfd.fd, &mExpirations, sizeof(mExpirations)); + if(bytes == -1) + { + if (errno == EAGAIN)//Something wrong, check for EAGAIN + bytes = read(pollfd.fd, &mExpirations, sizeof(mExpirations)); + } + + if(bytes != sizeof(mExpirations)) { - //error received...try again - read(pollfd.fd, &mExpirations, sizeof(uint64_t)); + //Failed to read from fd... + logError("Failed to read from timer fd"); + throw std::runtime_error(std::string("Failed to read from timer fd.")); } }; - err = addFDPoll(timerItem.fd, POLLIN, NULL, actionPoll, [callback](const sh_pollHandle_t handle, void* userData)->bool - { - callback(handle, userData); - return false; - }, - NULL, userData, handle); + err = addFDPoll(timerItem.fd, + POLLIN, + NULL, + actionPoll, + [callback](const sh_pollHandle_t handle, void* userData)->bool{ + callback(handle, userData); + return false; + }, + NULL, + userData, + handle); if (E_OK == err) { timerItem.handle = handle; @@ -595,6 +668,7 @@ am_Error_e CAmSocketHandler::addTimer(const timespec & timeouts, std::function<v */ am_Error_e CAmSocketHandler::removeTimer(const sh_timerHandle_t handle) { + CHECK_CALLER_THREAD_ID() assert(handle != 0); //stop the current timer @@ -608,20 +682,22 @@ am_Error_e CAmSocketHandler::removeTimer(const sh_timerHandle_t handle) if (it == mListTimer.end()) return (E_NON_EXISTENT); - close(it->fd); + mListRemovedTimers.push_back(*it); mListTimer.erase(it); return removeFDPoll(handle); #else stopTimer(handle); std::list<sh_timer_s>::iterator it(mListTimer.begin()); - for (; it != mListTimer.end(); ++it) + while (it != mListTimer.end()) { if (it->handle == handle) { - it = mListTimer.erase(it); + it = mListTimer.erase(it); mSetTimerKeys.pollHandles.erase(handle); return (E_OK); } + else + ++it; } return (E_UNKNOWN); #endif @@ -635,6 +711,8 @@ am_Error_e CAmSocketHandler::removeTimer(const sh_timerHandle_t handle) */ am_Error_e CAmSocketHandler::updateTimer(const sh_timerHandle_t handle, const timespec & timeouts) { + CHECK_CALLER_THREAD_ID() + #ifdef WITH_TIMERFD std::list<sh_timer_s>::iterator it = mListTimer.begin(); for (; it != mListTimer.end(); ++it) @@ -657,7 +735,7 @@ am_Error_e CAmSocketHandler::updateTimer(const sh_timerHandle_t handle, const ti } else { - if (timerfd_settime(it->fd, 0, &it->countdown, NULL)) + if (timerfd_settime(it->fd, 0, &it->countdown, NULL)<0) { logError("Failed to set timer duration"); return E_NOT_POSSIBLE; @@ -719,6 +797,7 @@ am_Error_e CAmSocketHandler::updateTimer(const sh_timerHandle_t handle, const ti */ am_Error_e CAmSocketHandler::restartTimer(const sh_timerHandle_t handle) { + CHECK_CALLER_THREAD_ID() #ifdef WITH_TIMERFD std::list<sh_timer_s>::iterator it = mListTimer.begin(); for (; it != mListTimer.end(); ++it) @@ -737,7 +816,7 @@ am_Error_e CAmSocketHandler::restartTimer(const sh_timerHandle_t handle) } else { - if (timerfd_settime(it->fd, 0, &it->countdown, NULL)) + if (timerfd_settime(it->fd, 0, &it->countdown, NULL)<0) { logError("Failed to set timer duration"); return E_NOT_POSSIBLE; @@ -797,6 +876,7 @@ am_Error_e CAmSocketHandler::restartTimer(const sh_timerHandle_t handle) */ am_Error_e CAmSocketHandler::stopTimer(const sh_timerHandle_t handle) { + CHECK_CALLER_THREAD_ID() #ifdef WITH_TIMERFD std::list<sh_timer_s>::iterator it = mListTimer.begin(); for (; it != mListTimer.end(); ++it) @@ -811,7 +891,7 @@ am_Error_e CAmSocketHandler::stopTimer(const sh_timerHandle_t handle) countdown.it_value.tv_nsec = 0; countdown.it_value.tv_sec = 0; - if (timerfd_settime(it->fd, 0, &countdown, NULL)) + if (timerfd_settime(it->fd, 0, &countdown, NULL)<0) { logError("Failed to set timer duration"); return E_NOT_POSSIBLE; @@ -820,13 +900,16 @@ am_Error_e CAmSocketHandler::stopTimer(const sh_timerHandle_t handle) #else //go through the list and remove the timer with the handle std::list<sh_timer_s>::iterator it(mListActiveTimer.begin()); - for (; it != mListActiveTimer.end(); ++it) + + while (it != mListActiveTimer.end()) { if (it->handle == handle) { it = mListActiveTimer.erase(it); return (E_OK); } + else + it++; } return (E_NON_EXISTENT); #endif @@ -840,6 +923,7 @@ am_Error_e CAmSocketHandler::stopTimer(const sh_timerHandle_t handle) */ am_Error_e CAmSocketHandler::updateEventFlags(const sh_pollHandle_t handle, const short events) { + CHECK_CALLER_THREAD_ID() VectorListPoll_t::iterator iterator = mListPoll.begin(); for (; iterator != mListPoll.end(); ++iterator) @@ -966,11 +1050,11 @@ void CAmSocketHandler::prepare(am::CAmSocketHandler::sh_poll_s& row) /** * fire callback */ -void CAmSocketHandler::fire(sh_poll_s& a) +void CAmSocketHandler::fire(const sh_poll_s* a) { try { - a.firedCB(a.pollfdValue, a.handle, a.userData); + a->firedCB(a->pollfdValue, a->handle, a->userData); } catch (std::exception& e) { logError("Sockethandler: Exception in Preparecallback,caught", e.what()); @@ -980,31 +1064,23 @@ void CAmSocketHandler::fire(sh_poll_s& a) /** * should disptach */ -bool CAmSocketHandler::noDispatching(const sh_poll_s& a) +bool CAmSocketHandler::noDispatching(const sh_poll_s* a) { //remove from list of there is no checkCB - if (nullptr == a.checkCB) + if (nullptr == a->checkCB || false == a->isValid) return (true); - return (!a.checkCB(a.handle, a.userData)); + return (!a->checkCB(a->handle, a->userData)); } /** * disptach */ -bool CAmSocketHandler::dispatchingFinished(const sh_poll_s& a) +bool CAmSocketHandler::dispatchingFinished(const sh_poll_s* a) { //remove from list of there is no dispatchCB - if (nullptr == a.dispatchCB) + if (nullptr == a->dispatchCB || false == a->isValid) return (true); - return (!a.dispatchCB(a.handle, a.userData)); -} - -/** - * event triggered - */ -bool CAmSocketHandler::eventFired(const pollfd& a) -{ - return (a.revents == 0 ? false : true); + return (!a->dispatchCB(a->handle, a->userData)); } /** @@ -1021,30 +1097,41 @@ inline timespec* CAmSocketHandler::insertTime(timespec& buffertime) return (&buffertime); } else -#endif +#endif { return (NULL); } } -#ifdef WITH_TIMERFD +#ifdef WITH_TIMERFD am_Error_e CAmSocketHandler::createTimeFD(const itimerspec & timeouts, int & fd) { fd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC); - if (fd <= 0) + if (fd < 0) { logError("Failed to create timer"); return E_NOT_POSSIBLE; } - if (timerfd_settime(fd, 0, &timeouts, NULL)) + if (timerfd_settime(fd, 0, &timeouts, NULL) < 0) { logError("Failed to set timer duration"); return E_NOT_POSSIBLE; } return E_OK; } -#endif + +void CAmSocketHandler::closeRemovedTimers() +{ + for (auto it : mListRemovedTimers) + { + if( it.fd > -1 ) + close( it.fd ); + } + mListRemovedTimers.clear(); +} + +#endif void CAmSocketHandler::callTimer(sh_timer_s& a) { |