diff options
Diffstat (limited to 'AudioManagerUtilities/src/CAmSocketHandler.cpp')
-rw-r--r-- | AudioManagerUtilities/src/CAmSocketHandler.cpp | 255 |
1 files changed, 140 insertions, 115 deletions
diff --git a/AudioManagerUtilities/src/CAmSocketHandler.cpp b/AudioManagerUtilities/src/CAmSocketHandler.cpp index ba36363..7e12fc0 100644 --- a/AudioManagerUtilities/src/CAmSocketHandler.cpp +++ b/AudioManagerUtilities/src/CAmSocketHandler.cpp @@ -47,7 +47,7 @@ CAmSocketHandler::CAmSocketHandler() : mEventFd(-1), // mDispatchDone(true), // mSetPollKeys(MAX_POLLHANDLE), // - mListPoll(), // + mMapShPoll(), // mSetTimerKeys(MAX_TIMERHANDLE), mListTimer(), // #ifndef WITH_TIMERFD @@ -55,7 +55,6 @@ CAmSocketHandler::CAmSocketHandler() : #endif mSetSignalhandlerKeys(MAX_POLLHANDLE), // mSignalHandlers(), // - mRecreatePollfds(true), mInternalCodes(internal_codes_e::NO_ERROR), mSignalFdHandle(0) #ifndef WITH_TIMERFD @@ -71,6 +70,12 @@ CAmSocketHandler::CAmSocketHandler() : if (bytes == sizeof(events)) { if (events == UINT64_MAX-1) + { + for (auto & elem : mMapShPoll) + { + if (elem.second.state != poll_states_e::REMOVE) + elem.second.state = poll_states_e::UNINIT; + } mDispatchDone = true; return; } @@ -98,12 +103,9 @@ CAmSocketHandler::CAmSocketHandler() : CAmSocketHandler::~CAmSocketHandler() { - for (auto it : mListPoll) + for (const auto& it : mMapShPoll) { - // This check is needed to ensure that fd wasn't closed - // e.g. in the communication plugin's dtor's. - if (fdIsValid(mEventFd)) - close(it.pollfdValue.fd); + close(it.second.pollfdValue.fd); } } @@ -114,80 +116,78 @@ CAmSocketHandler::~CAmSocketHandler() void CAmSocketHandler::start_listenting() { mDispatchDone = false; - int16_t pollStatus; #ifndef WITH_TIMERFD clock_gettime(CLOCK_MONOTONIC, &mStartTime); #endif timespec buffertime; - VectorListPoll_t cloneListPoll; - VectorListPoll_t::iterator listmPollIt; - VectorListPollfd_t fdPollingArray; //!<the polling array for ppoll - - auto preparePollfd = [&](const sh_poll_s& row) - { - CAmSocketHandler::prepare((sh_poll_s&)row); - pollfd temp = row.pollfdValue; - temp.revents = 0; - fdPollingArray.push_back(temp); - }; + VectorPollfd_t fdPollingArray; //!<the polling array for ppoll while (!mDispatchDone) { - if (mRecreatePollfds) + /* Iterate all times through map and synchronize the polling array accordingly. + * In case a new element in map appears the polling array will be extended and + * in case an element gets removed the map and the polling array needs to be adapted. + */ + auto fdPollIt = fdPollingArray.begin(); + for (auto it = mMapShPoll.begin(); it != mMapShPoll.end(); ) { - fdPollingArray.clear(); - //freeze mListPoll by copying it - otherwise we get problems when we want to manipulate it during the next lines - cloneListPoll = mListPoll; - //there was a change in the setup, so we need to recreate the fdarray from the list - std::for_each(cloneListPoll.begin(), cloneListPoll.end(), preparePollfd); - mRecreatePollfds = false; - } - else - { - //first we go through the registered filedescriptors and check if someone needs preparation: - std::for_each(cloneListPoll.begin(), cloneListPoll.end(), CAmSocketHandler::prepare); + // NOTE: The order of the switch/case statement is important and should modified with care + auto& elem = it->second; + switch (elem.state) + { + case poll_states_e::REMOVE: + close(elem.pollfdValue.fd); + fdPollIt = fdPollingArray.erase(fdPollIt); + it = mMapShPoll.erase(it); + continue; + + case poll_states_e::UNINIT: + fdPollIt = fdPollingArray.emplace(fdPollIt); + // fallthrough + + case poll_states_e::UPDATE: + CAmSocketHandler::prepare(elem); + elem.state = poll_states_e::VALID; + *fdPollIt = elem.pollfdValue; + break; + + case poll_states_e::VALID: + default: + break; + } + // Ensures that fdPollIt will be never on its end before incrementing it further! + assert(fdPollIt != fdPollingArray.end()); + ++fdPollIt; + ++it; } #ifndef WITH_TIMERFD timerCorrection(); #endif - //block until something is on a filedescriptor - if ((pollStatus = ppoll(&fdPollingArray[0], fdPollingArray.size(), insertTime(buffertime), NULL)) < 0) + // block until something is on a file descriptor + int16_t pollStatus = ppoll(&fdPollingArray[0], fdPollingArray.size(), insertTime(buffertime), NULL); + if (pollStatus > 0) { - if (errno == EINTR) - { - //a signal was received, that means it's time to go... - pollStatus = 0; - } - else + // stage 0+1, call firedCB + std::list<sh_poll_s*> listPoll; + for (auto& it : fdPollingArray) { - logError("SocketHandler::start_listenting ppoll returned with error", errno); - throw std::runtime_error(std::string("SocketHandler::start_listenting ppoll returned with error.")); - } - } - - if (pollStatus != 0) //only check filedescriptors if there was a change - { - std::list<sh_poll_s> listPoll; - //todo: here could be a timer that makes sure naughty plugins return! - //stage 0+1, call firedCB - listmPollIt = cloneListPoll.begin(); - for (auto it : fdPollingArray) - { - if (CAmSocketHandler::eventFired(it)) - { - listmPollIt->pollfdValue.revents = it.revents; - listPoll.push_back(*listmPollIt); - CAmSocketHandler::fire(*listmPollIt); - } - else - { - listmPollIt->pollfdValue.revents = 0; - } - listmPollIt++; + it.revents &= it.events; + if (it.revents == 0) + continue; + + sh_poll_s& pollObj = mMapShPoll.at(it.fd); + if (pollObj.state != poll_states_e::VALID) + continue; + + // ensure to copy the revents fired in fdPollingArray + pollObj.pollfdValue.revents = it.revents; + listPoll.push_back(&pollObj); + CAmSocketHandler::fire(pollObj); + it.revents = 0; } //stage 2, lets ask around if some dispatching is necessary, the ones who need stay on the list @@ -198,16 +198,20 @@ void CAmSocketHandler::start_listenting() { listPoll.remove_if(CAmSocketHandler::dispatchingFinished); } while (!listPoll.empty()); - } -#ifndef WITH_TIMERFD + else if ((pollStatus < 0) && (errno != EINTR)) + { + logError("SocketHandler::start_listenting ppoll returned with error", errno); + throw std::runtime_error(std::string("SocketHandler::start_listenting ppoll returned with error.")); + } else //Timerevent { +#ifndef WITH_TIMERFD //this was a timer event, we need to take care about the timers //find out the timedifference to starttime timerUp(); - } #endif + } } } @@ -253,14 +257,13 @@ bool CAmSocketHandler::fatalErrorOccurred() am_Error_e CAmSocketHandler::getFDPollData(const sh_pollHandle_t handle, sh_poll_s & outPollData) { - VectorListPoll_t::iterator iterator = mListPoll.begin(); - for (; iterator != mListPoll.end(); ++iterator) + for (auto it : mMapShPoll) { - if (iterator->handle == handle) - { - outPollData = *iterator; - return (E_OK); - } + if (it.second.handle != handle) + continue; + + outPollData = it.second; + return (E_OK); } return (E_UNKNOWN); } @@ -380,20 +383,35 @@ am_Error_e CAmSocketHandler::listenToSignals(const std::vector<uint8_t> & listSi * @param dispatch a std::function that is called to dispatch the received data * @param userData a pointer to userdata that is always passed around * @param handle the handle of this poll - * @return E_OK if the descriptor was added, E_NON_EXISTENT if the fd is not valid + * @return E_OK if the descriptor was added + * E_NON_EXISTENT if the fd is not valid + * E_ALREADY_EXISTS if the fd is already known + * E_NOT_POSSIBLE if the maximum handle threshold is reached */ -am_Error_e CAmSocketHandler::addFDPoll(const int fd, const short event, std::function<void(const sh_pollHandle_t handle, void* userData)> prepare, - std::function<void(const pollfd pollfd, const sh_pollHandle_t handle, void* userData)> fired, std::function<bool(const sh_pollHandle_t handle, void* userData)> check, - std::function<bool(const sh_pollHandle_t handle, void* userData)> dispatch, void* userData, sh_pollHandle_t& handle) +am_Error_e CAmSocketHandler::addFDPoll(const int fd, + const short event, + std::function<void(const sh_pollHandle_t handle, void* userData)> prepare, + std::function<void(const pollfd pollfd, const sh_pollHandle_t handle, void* userData)> fired, + std::function<bool(const sh_pollHandle_t handle, void* userData)> check, + std::function<bool(const sh_pollHandle_t handle, void* userData)> dispatch, + void* userData, + sh_pollHandle_t& handle) { if (!fdIsValid(fd)) - return (E_NON_EXISTENT); + return E_NON_EXISTENT; + + const auto elem = mMapShPoll.find(fd); + if (elem != mMapShPoll.end() && elem->second.state != poll_states_e::REMOVE) + { + logError("CAmSocketHandler::addFDPoll fd", fd, "already registered!"); + return E_ALREADY_EXISTS; + } //create a new handle for the poll if (!nextHandle(mSetPollKeys)) { - logError("Could not create new polls, too many open!"); + logError("CAmSocketHandler::addFDPoll Max handle count reached!"); return (E_NOT_POSSIBLE); } @@ -407,10 +425,9 @@ am_Error_e CAmSocketHandler::addFDPoll(const int fd, const short event, std::fun pollData.checkCB = check; pollData.dispatchCB = dispatch; pollData.userData = userData; - //add new data to the list - mListPoll.push_back(pollData); - mRecreatePollfds = true; + //add new data to the list + mMapShPoll[fd] = pollData; handle = pollData.handle; return (E_OK); @@ -427,15 +444,18 @@ am_Error_e CAmSocketHandler::addFDPoll(const int fd, const short event, std::fun * @param dispatch a callback that is called to dispatch the received data * @param userData a pointer to userdata that is always passed around * @param handle the handle of this poll - * @return E_OK if the descriptor was added, E_NON_EXISTENT if the fd is not valid + * @return E_OK if the descriptor was added + * E_NON_EXISTENT if the fd is not valid + * E_ALREADY_EXISTS if the fd is already known + * E_NOT_POSSIBLE if the maximum handle threshold is reached */ am::am_Error_e CAmSocketHandler::addFDPoll(const int fd, const short event, IAmShPollPrepare *prepare, IAmShPollFired *fired, IAmShPollCheck *check, IAmShPollDispatch *dispatch, void *userData, sh_pollHandle_t & handle) { - std::function<void(const sh_pollHandle_t handle, void* userData)> prepareCB; //preperation callback - std::function<void(const pollfd poll, const sh_pollHandle_t handle, void* userData)> firedCB; //fired callback - std::function<bool(const sh_pollHandle_t handle, void* userData)> checkCB; //check callback - std::function<bool(const sh_pollHandle_t handle, void* userData)> dispatchCB; //check callback + std::function<void(const sh_pollHandle_t, void*)> prepareCB; //preperation callback + std::function<void(const pollfd, const sh_pollHandle_t, void*)> firedCB; //fired callback + std::function<bool(const sh_pollHandle_t, void*)> checkCB; //check callback + std::function<bool(const sh_pollHandle_t, void*)> dispatchCB; //check callback if (prepare) prepareCB = std::bind(&IAmShPollPrepare::Call, prepare, std::placeholders::_1, std::placeholders::_2); @@ -456,19 +476,25 @@ am::am_Error_e CAmSocketHandler::addFDPoll(const int fd, const short event, IAmS */ am_Error_e CAmSocketHandler::removeFDPoll(const sh_pollHandle_t handle) { - VectorListPoll_t::iterator iterator = mListPoll.begin(); - - for (; iterator != mListPoll.end(); ++iterator) + for (auto& it : mMapShPoll) { - if (iterator->handle == handle) + if (it.second.handle == handle) { - iterator = mListPoll.erase(iterator); + it.second.state = poll_states_e::REMOVE; + + static uint64_t events(1); + if (write(mEventFd, &(++events), sizeof(events)) < 0) + { + std::ostringstream msg("CAmSocketHandler::removeFDPoll "); + msg << "Failed to write to event fd: " << mEventFd << " errno: " << std::strerror(errno); + throw std::runtime_error(msg.str()); + } mSetPollKeys.pollHandles.erase(handle); - mRecreatePollfds = true; - return (E_OK); + return E_OK; } } - return (E_UNKNOWN); + logError("CAmSocketHandler::removeFDPoll handle unknown", handle); + return E_UNKNOWN; } /** @@ -618,7 +644,13 @@ am_Error_e CAmSocketHandler::addTimer(const timespec & timeouts, std::function<v { timerItem.handle = handle; mListTimer.push_back(timerItem); + return E_OK; } + + // E_NOT_POSSIBLE is the only case were we need to close the timer + if (err == E_NOT_POSSIBLE) + close(timerItem.fd); + return err; #endif @@ -877,14 +909,14 @@ am_Error_e CAmSocketHandler::stopTimer(const sh_timerHandle_t handle) */ am_Error_e CAmSocketHandler::updateEventFlags(const sh_pollHandle_t handle, const short events) { - VectorListPoll_t::iterator iterator = mListPoll.begin(); - - for (; iterator != mListPoll.end(); ++iterator) + for (auto& it : mMapShPoll) { - if (iterator->handle == handle) + auto& elem = it.second; + if (elem.handle == handle && elem.state != poll_states_e::REMOVE) { - iterator->pollfdValue.events = events; - mRecreatePollfds = true; + elem.pollfdValue.events = events; + elem.pollfdValue.revents = 0; + elem.state = poll_states_e::UPDATE; return (E_OK); } } @@ -1003,7 +1035,7 @@ void CAmSocketHandler::prepare(am::CAmSocketHandler::sh_poll_s& row) /** * fire callback */ -void CAmSocketHandler::fire(sh_poll_s& a) +void CAmSocketHandler::fire(const sh_poll_s& a) { try { @@ -1017,31 +1049,24 @@ void CAmSocketHandler::fire(sh_poll_s& a) /** * should disptach */ -bool CAmSocketHandler::noDispatching(const sh_poll_s& a) +bool CAmSocketHandler::noDispatching(const sh_poll_s* a) { //remove from list of there is no checkCB - if (nullptr == a.checkCB) + if (nullptr == a->checkCB || a->state != poll_states_e::VALID) return (true); - return (!a.checkCB(a.handle, a.userData)); + return (!a->checkCB(a->handle, a->userData)); } /** * disptach */ -bool CAmSocketHandler::dispatchingFinished(const sh_poll_s& a) +bool CAmSocketHandler::dispatchingFinished(const sh_poll_s* a) { //remove from list of there is no dispatchCB - if (nullptr == a.dispatchCB) + if (nullptr == a->dispatchCB || a->state != poll_states_e::VALID) return (true); - return (!a.dispatchCB(a.handle, a.userData)); -} -/** - * event triggered - */ -bool CAmSocketHandler::eventFired(const pollfd& a) -{ - return (a.revents == 0 ? false : true); + return (!a->dispatchCB(a->handle, a->userData)); } /** |