summaryrefslogtreecommitdiff
path: root/AudioManagerUtilities/src/CAmSocketHandler.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'AudioManagerUtilities/src/CAmSocketHandler.cpp')
-rw-r--r--AudioManagerUtilities/src/CAmSocketHandler.cpp620
1 files changed, 353 insertions, 267 deletions
diff --git a/AudioManagerUtilities/src/CAmSocketHandler.cpp b/AudioManagerUtilities/src/CAmSocketHandler.cpp
index fad60e5..472fa7f 100644
--- a/AudioManagerUtilities/src/CAmSocketHandler.cpp
+++ b/AudioManagerUtilities/src/CAmSocketHandler.cpp
@@ -26,11 +26,12 @@
#include <sys/fcntl.h>
#include <sys/errno.h>
#include <sys/poll.h>
+#include <sys/eventfd.h>
#include <time.h>
#include <algorithm>
#include <features.h>
#include <csignal>
-#include <unistd.h>
+#include <cstring>
#include "CAmDltWrapper.h"
#include "CAmSocketHandler.h"
@@ -39,40 +40,66 @@
#include <sys/timerfd.h>
#endif
+#define END_EVENT (UINT64_MAX >> 1)
+
namespace am
{
CAmSocketHandler::CAmSocketHandler() :
- mPipe(), //
- mDispatchDone(true), //
- mSetPollKeys(MAX_POLLHANDLE), //
- mListPoll(), //
- mSetTimerKeys(MAX_TIMERHANDLE),
- mListTimer(), //
- mListActiveTimer(), //
- mSetSignalhandlerKeys(MAX_POLLHANDLE), //
- mSignalHandlers(), //
- mRecreatePollfds(true),
- mInternalCodes(internal_codes_e::NO_ERROR),
- mSignalFdHandle(0)
+ mEventFd(-1), //
+ mSignalFd(-1), //
+ mDispatchDone(true), //
+ mSetPollKeys(MAX_POLLHANDLE), //
+ mMapShPoll(), //
+ mSetTimerKeys(MAX_TIMERHANDLE),
+ mListTimer(), //
+#ifndef WITH_TIMERFD
+ mListActiveTimer(), //
+#endif
+ mSetSignalhandlerKeys(MAX_POLLHANDLE), //
+ mSignalHandlers(), //
+ mInternalCodes(internal_codes_e::NO_ERROR)
#ifndef WITH_TIMERFD
-,mStartTime() //
+ ,mStartTime() //
#endif
{
- if (pipe(mPipe) == -1)
+
+ auto actionPoll = [this](const pollfd pollfd, const sh_pollHandle_t, void*)
{
- mInternalCodes = internal_codes_e::PIPE_ERROR;
- logError("Sockethandler could not create pipe!");
- }
+ /* We have a valid signal, read the info from the fd */
+ uint64_t events;
+ ssize_t bytes = read(pollfd.fd, &events, sizeof(events));
+ if (bytes == sizeof(events))
+ {
+ if (events >= END_EVENT)
+ {
+ for (auto & elem : mMapShPoll)
+ {
+ if (elem.second.state == poll_states_e::UPDATE ||
+ elem.second.state == poll_states_e::VALID)
+ {
+ elem.second.state = poll_states_e::ADD;
+ }
+ }
+ mDispatchDone = true;
+ }
+ return;
+ }
+
+ // ppoll on EAGAIN
+ if ((bytes == -1) && (errno == EAGAIN))
+ return;
+
+ //Failed to read from event fd...
+ std::ostringstream msg;
+ msg << "Failed to read from event fd: " << pollfd.fd << " errno: " << std::strerror(errno);
+ throw std::runtime_error(msg.str());
+ };
//add the pipe to the poll - nothing needs to be processed here we just need the pipe to trigger the ppoll
- short event = 0;
sh_pollHandle_t handle;
- event |= POLLIN;
- if (addFDPoll(mPipe[0], event, NULL,
- [](const pollfd, const sh_pollHandle_t, void*){},
- [](const sh_pollHandle_t, void*) { return (false); },
- NULL, NULL, handle) != E_OK)
+ mEventFd = eventfd(1, EFD_NONBLOCK | EFD_CLOEXEC);
+ if (addFDPoll(mEventFd, POLLIN, NULL, actionPoll, NULL, NULL, NULL, handle) != E_OK)
{
mInternalCodes |= internal_codes_e::FD_ERROR;
}
@@ -80,12 +107,10 @@ CAmSocketHandler::CAmSocketHandler() :
CAmSocketHandler::~CAmSocketHandler()
{
- for (auto it : mListPoll)
+ for (const auto& it : mMapShPoll)
{
- close(it.pollfdValue.fd);
+ close(it.second.pollfdValue.fd);
}
- close(mPipe[0]);
- close(mPipe[1]);
}
//todo: maybe have some: give me more time returned?
@@ -95,80 +120,88 @@ CAmSocketHandler::~CAmSocketHandler()
void CAmSocketHandler::start_listenting()
{
mDispatchDone = false;
- int16_t pollStatus;
#ifndef WITH_TIMERFD
clock_gettime(CLOCK_MONOTONIC, &mStartTime);
#endif
timespec buffertime;
-
- VectorListPoll_t cloneListPoll;
- VectorListPoll_t::iterator listmPollIt;
- VectorListPollfd_t fdPollingArray; //!<the polling array for ppoll
-
- auto preparePollfd = [&](const sh_poll_s& row)
- {
- CAmSocketHandler::prepare((sh_poll_s&)row);
- pollfd temp = row.pollfdValue;
- temp.revents = 0;
- fdPollingArray.push_back(temp);
- };
+
+ VectorPollfd_t fdPollingArray; //!<the polling array for ppoll
while (!mDispatchDone)
{
- if (mRecreatePollfds)
+ /* Iterate all times through map and synchronize the polling array accordingly.
+ * In case a new element in map appears the polling array will be extended and
+ * in case an element gets removed the map and the polling array needs to be adapted.
+ */
+ auto fdPollIt = fdPollingArray.begin();
+ for (auto it = mMapShPoll.begin(); it != mMapShPoll.end(); )
{
- fdPollingArray.clear();
- //freeze mListPoll by copying it - otherwise we get problems when we want to manipulate it during the next lines
- cloneListPoll = mListPoll;
- //there was a change in the setup, so we need to recreate the fdarray from the list
- std::for_each(cloneListPoll.begin(), cloneListPoll.end(), preparePollfd);
- mRecreatePollfds = false;
+ // NOTE: The order of the switch/case statement reflects the state flow
+ auto& elem = it->second;
+ switch (elem.state)
+ {
+ case poll_states_e::ADD:
+ elem.state = poll_states_e::UPDATE;
+ fdPollIt = fdPollingArray.emplace(fdPollIt);
+ break;
+
+ case poll_states_e::UPDATE:
+ elem.state = poll_states_e::VALID;
+ CAmSocketHandler::prepare(elem);
+ *fdPollIt = elem.pollfdValue;
+ break;
+
+ case poll_states_e::VALID:
+ // check for multi-thread access
+ assert(fdPollIt != fdPollingArray.end());
+ ++fdPollIt;
+ ++it;
+ break;
+
+ case poll_states_e::REMOVE:
+ elem.state = poll_states_e::INVALID;
+ fdPollIt = fdPollingArray.erase(fdPollIt);
+ break;
+
+ case poll_states_e::INVALID:
+ it = mMapShPoll.erase(it);
+ break;
+ }
}
- else
+
+ if (fdPollingArray.size() != mMapShPoll.size())
{
- //first we go through the registered filedescriptors and check if someone needs preparation:
- std::for_each(cloneListPoll.begin(), cloneListPoll.end(), CAmSocketHandler::prepare);
+ mInternalCodes |= internal_codes_e::MT_ERROR;
+ logError("CAmSocketHandler::start_listenting is NOT multi-thread safe!");
+ return;
}
#ifndef WITH_TIMERFD
timerCorrection();
#endif
- //block until something is on a filedescriptor
- if ((pollStatus = ppoll(&fdPollingArray[0], fdPollingArray.size(), insertTime(buffertime), NULL)) < 0)
+ // block until something is on a file descriptor
+ int16_t pollStatus = ppoll(&fdPollingArray[0], fdPollingArray.size(), insertTime(buffertime), NULL);
+ if (pollStatus > 0)
{
- if (errno == EINTR)
- {
- //a signal was received, that means it's time to go...
- pollStatus = 0;
- }
- else
+ // stage 0+1, call firedCB
+ std::list<sh_poll_s*> listPoll;
+ for (auto& it : fdPollingArray)
{
- logError("SocketHandler::start_listenting ppoll returned with error", errno);
- throw std::runtime_error(std::string("SocketHandler::start_listenting ppoll returned with error."));
- }
- }
-
- if (pollStatus != 0) //only check filedescriptors if there was a change
- {
- std::list<sh_poll_s> listPoll;
- //todo: here could be a timer that makes sure naughty plugins return!
- //stage 0+1, call firedCB
- listmPollIt = cloneListPoll.begin();
- for (auto it : fdPollingArray)
- {
- if (CAmSocketHandler::eventFired(it))
- {
- listmPollIt->pollfdValue.revents = it.revents;
- listPoll.push_back(*listmPollIt);
- CAmSocketHandler::fire(*listmPollIt);
- }
- else
- {
- listmPollIt->pollfdValue.revents = 0;
- }
- listmPollIt++;
+ it.revents &= it.events;
+ if (it.revents == 0)
+ continue;
+
+ sh_poll_s& pollObj = mMapShPoll.at(it.fd);
+ if (pollObj.state != poll_states_e::VALID)
+ continue;
+
+ // ensure to copy the revents fired in fdPollingArray
+ pollObj.pollfdValue.revents = it.revents;
+ listPoll.push_back(&pollObj);
+ CAmSocketHandler::fire(pollObj);
+ it.revents = 0;
}
//stage 2, lets ask around if some dispatching is necessary, the ones who need stay on the list
@@ -179,16 +212,20 @@ void CAmSocketHandler::start_listenting()
{
listPoll.remove_if(CAmSocketHandler::dispatchingFinished);
} while (!listPoll.empty());
-
}
-#ifndef WITH_TIMERFD
+ else if ((pollStatus < 0) && (errno != EINTR))
+ {
+ logError("SocketHandler::start_listenting ppoll returned with error", errno);
+ throw std::runtime_error(std::string("SocketHandler::start_listenting ppoll returned with error."));
+ }
else //Timerevent
{
+#ifndef WITH_TIMERFD
//this was a timer event, we need to take care about the timers
//find out the timedifference to starttime
timerUp();
- }
#endif
+ }
}
}
@@ -197,7 +234,12 @@ void CAmSocketHandler::start_listenting()
*/
void CAmSocketHandler::stop_listening()
{
- mDispatchDone = true;
+ //fire the ending event
+ if (mDispatchDone)
+ return;
+
+ wakeupWorker("stop_listening", END_EVENT);
+
#ifndef WITH_TIMERFD
//this is for all running timers only - we need to handle the additional offset here
if (!mListActiveTimer.empty())
@@ -215,29 +257,22 @@ void CAmSocketHandler::exit_mainloop()
{
//end the while loop
stop_listening();
-
- //fire the ending filedescriptor
- int p(1);
- ssize_t result = write(mPipe[1], &p, sizeof(p));
}
-bool CAmSocketHandler::fatalErrorOccurred()
-{
- return ((mInternalCodes&internal_codes_e::PIPE_ERROR)>0)||((mInternalCodes&internal_codes_e::FD_ERROR)>0);
-}
-
-am_Error_e CAmSocketHandler::getFDPollData(const sh_pollHandle_t handle, sh_poll_s & outPollData)
+void CAmSocketHandler::wakeupWorker(const std::string & func, const uint64_t value)
{
- VectorListPoll_t::iterator iterator = mListPoll.begin();
- for (; iterator != mListPoll.end(); ++iterator)
+ if (write(mEventFd, &value, sizeof(value)) < 0)
{
- if (iterator->handle == handle)
- {
- outPollData = *iterator;
- return (E_OK);
- }
+ // no log message here, it is already done in main.cpp
+ std::ostringstream msg("CAmSocketHandler::");
+ msg << func << " Failed to write to event fd: " << mEventFd << " errno: " << std::strerror(errno);
+ throw std::runtime_error(msg.str());
}
- return (E_UNKNOWN);
+}
+
+bool CAmSocketHandler::fatalErrorOccurred()
+{
+ return (mInternalCodes != internal_codes_e::NO_ERROR);
}
/**
@@ -287,54 +322,51 @@ am_Error_e CAmSocketHandler::listenToSignals(const std::vector<uint8_t> & listSi
return (E_NOT_POSSIBLE);
}
- int signalHandlerFd;
- if(mSignalFdHandle)
+ if (mSignalFd < 0)
+ {
+ /* Create the signalfd */
+ mSignalFd = signalfd(-1, &sigset, SFD_NONBLOCK);
+ if (mSignalFd == -1)
+ {
+ logError("Could not open signal fd!", std::strerror(errno));
+ return (E_NOT_POSSIBLE);
+ }
+
+ auto actionPoll = [this](const pollfd pollfd, const sh_pollHandle_t, void*)
+ {
+ /* We have a valid signal, read the info from the fd */
+ struct signalfd_siginfo info;
+ ssize_t bytes = read(pollfd.fd, &info, sizeof(info));
+ if (bytes == sizeof(info))
+ {
+ /* Notify all listeners */
+ for(const auto& it: mSignalHandlers)
+ it.callback(it.handle, info, it.userData);
+ return;
+ }
+
+ // ppoll on EAGAIN
+ if ((bytes == -1) && (errno == EAGAIN))
+ return;
+
+ //Failed to read from fd...
+ std::ostringstream msg;
+ msg << "Failed to read from signal fd: " << pollfd.fd << " errno: " << std::strerror(errno);
+ throw std::runtime_error(msg.str());
+ };
+ /* We're going to add the signal fd through addFDPoll. At this point we don't have any signal listeners. */
+ sh_pollHandle_t handle;
+ return addFDPoll(mSignalFd, POLLIN | POLLERR | POLLHUP, NULL, actionPoll, NULL, NULL, NULL, handle);
+ }
+ else
{
- sh_poll_s sgPollData;
- if(E_OK!=getFDPollData(mSignalFdHandle, sgPollData))
- {
- removeFDPoll(mSignalFdHandle);
- mSignalFdHandle = 0;
- }
- else
- {
- int signalHandlerFd = signalfd(sgPollData.pollfdValue.fd, &sigset, 0);
- if (signalHandlerFd == -1)
+ if (signalfd(mSignalFd, &sigset, 0) == -1)
{
- logError("Could not update signal fd!");
+ logError("Could not update signal fd!", std::strerror(errno));
return (E_NOT_POSSIBLE);
}
return E_OK;
- }
}
-
- if(0==mSignalFdHandle)
- {
- /* Create the signalfd */
- signalHandlerFd = signalfd(-1, &sigset, 0);
- if (signalHandlerFd == -1)
- {
- logError("Could not open signal fd!");
- return (E_NOT_POSSIBLE);
- }
-
- auto actionPoll = [this](const pollfd pollfd, const sh_pollHandle_t, void*)
- {
- const VectorSignalHandlers_t & signalHandlers = mSignalHandlers;
- /* We have a valid signal, read the info from the fd */
- struct signalfd_siginfo info;
- ssize_t bytes = read(pollfd.fd, &info, sizeof(info));
- assert(bytes == sizeof(info));
-
- /* Notify all listeners */
- for(auto it: signalHandlers)
- it.callback(it.handle, info, it.userData);
- };
- /* We're going to add the signal fd through addFDPoll. At this point we don't have any signal listeners. */
- am_Error_e shFdError = addFDPoll(signalHandlerFd, POLLIN | POLLERR | POLLHUP, NULL, actionPoll, [](const sh_pollHandle_t, void*)
- { return (false);}, NULL, NULL, mSignalFdHandle);
- return shFdError;
- }
}
/**
@@ -347,24 +379,53 @@ am_Error_e CAmSocketHandler::listenToSignals(const std::vector<uint8_t> & listSi
* @param dispatch a std::function that is called to dispatch the received data
* @param userData a pointer to userdata that is always passed around
* @param handle the handle of this poll
- * @return E_OK if the descriptor was added, E_NON_EXISTENT if the fd is not valid
+ * @return E_OK if the descriptor was added
+ * E_NON_EXISTENT if the fd is not valid
+ * E_ALREADY_EXISTS if the fd is already known
+ * E_NOT_POSSIBLE if the maximum handle threshold is reached
*/
-am_Error_e CAmSocketHandler::addFDPoll(const int fd, const short event, std::function<void(const sh_pollHandle_t handle, void* userData)> prepare,
- std::function<void(const pollfd pollfd, const sh_pollHandle_t handle, void* userData)> fired, std::function<bool(const sh_pollHandle_t handle, void* userData)> check,
- std::function<bool(const sh_pollHandle_t handle, void* userData)> dispatch, void* userData, sh_pollHandle_t& handle)
+am_Error_e CAmSocketHandler::addFDPoll(const int fd,
+ const short event,
+ std::function<void(const sh_pollHandle_t handle, void* userData)> prepare,
+ std::function<void(const pollfd pollfd, const sh_pollHandle_t handle, void* userData)> fired,
+ std::function<bool(const sh_pollHandle_t handle, void* userData)> check,
+ std::function<bool(const sh_pollHandle_t handle, void* userData)> dispatch,
+ void* userData,
+ sh_pollHandle_t& handle)
{
+ sh_poll_s pollData;
+
if (!fdIsValid(fd))
- return (E_NON_EXISTENT);
+ return E_NON_EXISTENT;
+
+ const auto elem = mMapShPoll.find(fd);
+ if (elem != mMapShPoll.end())
+ {
+ // The fd was already in map therefore we need to trigger an update instead
+ switch (elem->second.state)
+ {
+ case poll_states_e::REMOVE:
+ pollData.state = poll_states_e::UPDATE;
+ break;
+
+ case poll_states_e::INVALID:
+ pollData.state = poll_states_e::ADD;
+ break;
+
+ default:
+ logError("CAmSocketHandler::addFDPoll fd", fd, "already registered!");
+ return E_ALREADY_EXISTS;
+ }
+ }
//create a new handle for the poll
if (!nextHandle(mSetPollKeys))
{
- logError("Could not create new polls, too many open!");
+ logError("CAmSocketHandler::addFDPoll Max handle count reached!");
return (E_NOT_POSSIBLE);
}
- sh_poll_s pollData;
pollData.pollfdValue.fd = fd;
pollData.handle = mSetPollKeys.lastUsedID;
pollData.pollfdValue.events = event;
@@ -374,10 +435,10 @@ am_Error_e CAmSocketHandler::addFDPoll(const int fd, const short event, std::fun
pollData.checkCB = check;
pollData.dispatchCB = dispatch;
pollData.userData = userData;
- //add new data to the list
- mListPoll.push_back(pollData);
- mRecreatePollfds = true;
+ //add new data to the list
+ mMapShPoll[fd] = pollData;
+ wakeupWorker("addFDPoll");
handle = pollData.handle;
return (E_OK);
@@ -394,15 +455,18 @@ am_Error_e CAmSocketHandler::addFDPoll(const int fd, const short event, std::fun
* @param dispatch a callback that is called to dispatch the received data
* @param userData a pointer to userdata that is always passed around
* @param handle the handle of this poll
- * @return E_OK if the descriptor was added, E_NON_EXISTENT if the fd is not valid
+ * @return E_OK if the descriptor was added
+ * E_NON_EXISTENT if the fd is not valid
+ * E_ALREADY_EXISTS if the fd is already known
+ * E_NOT_POSSIBLE if the maximum handle threshold is reached
*/
am::am_Error_e CAmSocketHandler::addFDPoll(const int fd, const short event, IAmShPollPrepare *prepare, IAmShPollFired *fired, IAmShPollCheck *check, IAmShPollDispatch *dispatch, void *userData, sh_pollHandle_t & handle)
{
- std::function<void(const sh_pollHandle_t handle, void* userData)> prepareCB; //preperation callback
- std::function<void(const pollfd poll, const sh_pollHandle_t handle, void* userData)> firedCB; //fired callback
- std::function<bool(const sh_pollHandle_t handle, void* userData)> checkCB; //check callback
- std::function<bool(const sh_pollHandle_t handle, void* userData)> dispatchCB; //check callback
+ std::function<void(const sh_pollHandle_t, void*)> prepareCB; //preperation callback
+ std::function<void(const pollfd, const sh_pollHandle_t, void*)> firedCB; //fired callback
+ std::function<bool(const sh_pollHandle_t, void*)> checkCB; //check callback
+ std::function<bool(const sh_pollHandle_t, void*)> dispatchCB; //check callback
if (prepare)
prepareCB = std::bind(&IAmShPollPrepare::Call, prepare, std::placeholders::_1, std::placeholders::_2);
@@ -419,23 +483,23 @@ am::am_Error_e CAmSocketHandler::addFDPoll(const int fd, const short event, IAmS
/**
* removes a filedescriptor from the poll loop
* @param handle
+ * @param [rmv] default RMV_ONLY_FDPOLL
* @return
*/
am_Error_e CAmSocketHandler::removeFDPoll(const sh_pollHandle_t handle)
{
- VectorListPoll_t::iterator iterator = mListPoll.begin();
-
- for (; iterator != mListPoll.end(); ++iterator)
+ for (auto& it : mMapShPoll)
{
- if (iterator->handle == handle)
+ if (it.second.handle == handle)
{
- iterator = mListPoll.erase(iterator);
+ it.second.state = (it.second.state == poll_states_e::ADD ? poll_states_e::INVALID : poll_states_e::REMOVE);
+ wakeupWorker("removeFDPoll");
mSetPollKeys.pollHandles.erase(handle);
- mRecreatePollfds = true;
- return (E_OK);
+ return E_OK;
}
}
- return (E_UNKNOWN);
+ logWarning("CAmSocketHandler::removeFDPoll handle unknown", handle);
+ return E_UNKNOWN;
}
/**
@@ -449,7 +513,7 @@ am_Error_e CAmSocketHandler::addSignalHandler(std::function<void(const sh_pollHa
{
if (!nextHandle(mSetSignalhandlerKeys))
{
- logError("Could not create new polls, too many open!");
+ logError("CAmSocketHandler::addSignalHandler Could not create new polls, too many open!");
return (E_NOT_POSSIBLE);
}
@@ -474,7 +538,7 @@ am_Error_e CAmSocketHandler::removeSignalHandler(const sh_pollHandle_t handle)
{
if (it->handle == handle)
{
- it = mSignalHandlers.erase(it);
+ mSignalHandlers.erase(it);
mSetSignalhandlerKeys.pollHandles.erase(handle);
return (E_OK);
}
@@ -508,17 +572,17 @@ am_Error_e CAmSocketHandler::addTimer(const timespec & timeouts, std::function<v
{
assert(!((timeouts.tv_sec == 0) && (timeouts.tv_nsec == 0)));
- mListTimer.emplace_back();
- sh_timer_s & timerItem = mListTimer.back();
-
#ifndef WITH_TIMERFD
//create a new handle for the timer
if (!nextHandle(mSetTimerKeys))
{
- logError("Could not create new timers, too many open!");
- mListTimer.pop_back();
+ logError("CAmSocketHandler::addTimer Could not create new timers, too many open!");
return (E_NOT_POSSIBLE);
}
+
+ mListTimer.emplace_back();
+ sh_timer_s & timerItem = mListTimer.back();
+
//create a new handle for the timer
handle = mSetTimerKeys.lastUsedID;
@@ -533,12 +597,13 @@ am_Error_e CAmSocketHandler::addTimer(const timespec & timeouts, std::function<v
clock_gettime(CLOCK_MONOTONIC, &currentTime);
if (!mDispatchDone)//the mainloop is started
timerItem.countdown = timespecAdd(timeouts, timespecSub(currentTime, mStartTime));
- mListTimer.push_back(timerItem);
mListActiveTimer.push_back(timerItem);
mListActiveTimer.sort(compareCountdown);
return (E_OK);
#else
+ sh_timer_s timerItem;
+
timerItem.countdown.it_value = timeouts;
if (repeats)
timerItem.countdown.it_interval = timeouts;
@@ -554,35 +619,43 @@ am_Error_e CAmSocketHandler::addTimer(const timespec & timeouts, std::function<v
timerItem.userData = userData;
am_Error_e err = createTimeFD(timerItem.countdown, timerItem.fd);
if (err != E_OK)
- {
- mListTimer.pop_back();
return err;
- }
- static auto actionPoll = [](const pollfd pollfd, const sh_pollHandle_t handle, void* userData)
+ auto actionPoll = [this](const pollfd pollfd, const sh_pollHandle_t handle, void* userData)
{
- uint64_t mExpirations;
- if (read(pollfd.fd, &mExpirations, sizeof(uint64_t)) == -1)
- {
- //error received...try again
- read(pollfd.fd, &mExpirations, sizeof(uint64_t));
- }
+ uint64_t expCnt;
+ ssize_t bytes = read(pollfd.fd, &expCnt, sizeof(expCnt));
+ if (bytes == sizeof(expCnt))
+ return;
+
+ // ppoll has to be called again in following case
+ if ((bytes == -1) && (errno == EAGAIN))
+ return;
+
+ // failed to read data from timer_fd...
+ std::ostringstream msg;
+ msg << "Failed to read from timer fd: " << pollfd.fd << " errno: " << std::strerror(errno);
+ throw std::runtime_error(msg.str());
};
- err = addFDPoll(timerItem.fd, POLLIN, NULL, actionPoll, [callback](const sh_pollHandle_t handle, void* userData)->bool
- {
- callback(handle, userData);
- return false;
- },
- NULL, userData, handle);
- if (E_OK == err)
+ err = addFDPoll(timerItem.fd, POLLIN | POLLERR, NULL, actionPoll,
+ [callback](const sh_pollHandle_t handle, void* userData)->bool {
+ callback(handle, userData);
+ return false;
+ },
+ NULL, userData, handle);
+
+ if (err == E_OK)
{
timerItem.handle = handle;
+ mListTimer.push_back(timerItem);
+ return E_OK;
}
- else
- {
- mListTimer.pop_back();
- }
+
+ // E_NOT_POSSIBLE is the only case were we need to close the timer
+ if (err == E_NOT_POSSIBLE)
+ close(timerItem.fd);
+
return err;
#endif
@@ -599,29 +672,32 @@ am_Error_e CAmSocketHandler::removeTimer(const sh_timerHandle_t handle)
//stop the current timer
#ifdef WITH_TIMERFD
- std::list<sh_timer_s>::iterator it = mListTimer.begin();
- for (; it != mListTimer.end(); ++it)
+ std::list<sh_timer_s>::iterator it(mListTimer.begin());
+ while (it != mListTimer.end())
{
if (it->handle == handle)
- break;
+ {
+ am_Error_e err = removeFDPoll(handle);
+ close(it->fd);
+ mListTimer.erase(it);
+ return err;
+ }
+ ++it;
}
- if (it == mListTimer.end())
- return (E_NON_EXISTENT);
+ return (E_NON_EXISTENT);
- close(it->fd);
- mListTimer.erase(it);
- return removeFDPoll(handle);
#else
stopTimer(handle);
std::list<sh_timer_s>::iterator it(mListTimer.begin());
- for (; it != mListTimer.end(); ++it)
+ while (it != mListTimer.end())
{
if (it->handle == handle)
{
- it = mListTimer.erase(it);
+ mListTimer.erase(it);
mSetTimerKeys.pollHandles.erase(handle);
return (E_OK);
}
+ ++it;
}
return (E_UNKNOWN);
#endif
@@ -798,38 +874,38 @@ am_Error_e CAmSocketHandler::restartTimer(const sh_timerHandle_t handle)
am_Error_e CAmSocketHandler::stopTimer(const sh_timerHandle_t handle)
{
#ifdef WITH_TIMERFD
- std::list<sh_timer_s>::iterator it = mListTimer.begin();
- for (; it != mListTimer.end(); ++it)
+ for (auto elem : mListTimer)
{
- if (it->handle == handle)
- break;
- }
- if (it == mListTimer.end())
- return (E_NON_EXISTENT);
+ if (elem.handle != handle)
+ continue;
- itimerspec countdown = it->countdown;
- countdown.it_value.tv_nsec = 0;
- countdown.it_value.tv_sec = 0;
+ itimerspec countdown = elem.countdown;
+ countdown.it_value.tv_nsec = 0;
+ countdown.it_value.tv_sec = 0;
- if (timerfd_settime(it->fd, 0, &countdown, NULL))
- {
- logError("Failed to set timer duration");
- return E_NOT_POSSIBLE;
+ if (timerfd_settime(elem.fd, 0, &countdown, NULL) < 0)
+ {
+ logError("Failed to set timer duration");
+ return E_NOT_POSSIBLE;
+ }
+
+ return E_OK;
}
- return (E_OK);
-#else
+#else
//go through the list and remove the timer with the handle
std::list<sh_timer_s>::iterator it(mListActiveTimer.begin());
- for (; it != mListActiveTimer.end(); ++it)
+ while (it != mListActiveTimer.end())
{
if (it->handle == handle)
{
- it = mListActiveTimer.erase(it);
- return (E_OK);
+ mListActiveTimer.erase(it);
+ return E_OK;
}
+ ++it;
}
- return (E_NON_EXISTENT);
#endif
+
+ return E_NON_EXISTENT;
}
/**
@@ -840,15 +916,28 @@ am_Error_e CAmSocketHandler::stopTimer(const sh_timerHandle_t handle)
*/
am_Error_e CAmSocketHandler::updateEventFlags(const sh_pollHandle_t handle, const short events)
{
- VectorListPoll_t::iterator iterator = mListPoll.begin();
-
- for (; iterator != mListPoll.end(); ++iterator)
+ for (auto& it : mMapShPoll)
{
- if (iterator->handle == handle)
+ auto& elem = it.second;
+ if (elem.handle != handle)
+ continue;
+
+ switch (elem.state)
{
- iterator->pollfdValue.events = events;
- mRecreatePollfds = true;
- return (E_OK);
+ case poll_states_e::ADD:
+ elem.pollfdValue.events = events;
+ return (E_OK);
+
+ case poll_states_e::UPDATE:
+ case poll_states_e::VALID:
+ elem.state = poll_states_e::UPDATE;
+ elem.pollfdValue.revents = 0;
+ elem.pollfdValue.events = events;
+ return (E_OK);
+
+ default:
+ // This issue should never happen!
+ return (E_DATABASE_ERROR);
}
}
return (E_UNKNOWN);
@@ -951,60 +1040,55 @@ void CAmSocketHandler::timerCorrection()
*/
void CAmSocketHandler::prepare(am::CAmSocketHandler::sh_poll_s& row)
{
- if (row.prepareCB)
+ if (!row.prepareCB)
+ return;
+
+ try
{
- try
- {
- row.prepareCB(row.handle, row.userData);
- } catch (std::exception& e)
- {
- logError("Sockethandler: Exception in Preparecallback,caught", e.what());
- }
+ row.prepareCB(row.handle, row.userData);
+ }
+ catch (std::exception& e)
+ {
+ logError("CAmSocketHandler::prepare Exception caught", e.what());
}
}
/**
* fire callback
*/
-void CAmSocketHandler::fire(sh_poll_s& a)
+void CAmSocketHandler::fire(const sh_poll_s& a)
{
try
{
a.firedCB(a.pollfdValue, a.handle, a.userData);
- } catch (std::exception& e)
+ }
+ catch (std::exception& e)
{
- logError("Sockethandler: Exception in Preparecallback,caught", e.what());
+ logError("CAmSocketHandler::fire Exception caught", e.what());
}
}
/**
* should disptach
*/
-bool CAmSocketHandler::noDispatching(const sh_poll_s& a)
+bool CAmSocketHandler::noDispatching(const sh_poll_s* a)
{
//remove from list of there is no checkCB
- if (nullptr == a.checkCB)
+ if (nullptr == a->checkCB || a->state != poll_states_e::VALID)
return (true);
- return (!a.checkCB(a.handle, a.userData));
+ return (!a->checkCB(a->handle, a->userData));
}
/**
* disptach
*/
-bool CAmSocketHandler::dispatchingFinished(const sh_poll_s& a)
+bool CAmSocketHandler::dispatchingFinished(const sh_poll_s* a)
{
//remove from list of there is no dispatchCB
- if (nullptr == a.dispatchCB)
+ if (nullptr == a->dispatchCB || a->state != poll_states_e::VALID)
return (true);
- return (!a.dispatchCB(a.handle, a.userData));
-}
-/**
- * event triggered
- */
-bool CAmSocketHandler::eventFired(const pollfd& a)
-{
- return (a.revents == 0 ? false : true);
+ return (!a->dispatchCB(a->handle, a->userData));
}
/**
@@ -1031,15 +1115,17 @@ inline timespec* CAmSocketHandler::insertTime(timespec& buffertime)
am_Error_e CAmSocketHandler::createTimeFD(const itimerspec & timeouts, int & fd)
{
fd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
- if (fd <= 0)
+ if (fd < 0)
{
- logError("Failed to create timer");
+ logError("CAmSocketHandler::createTimeFD Failed with", static_cast<const char*>(std::strerror(errno)));
return E_NOT_POSSIBLE;
}
- if (timerfd_settime(fd, 0, &timeouts, NULL))
+ if (timerfd_settime(fd, 0, &timeouts, NULL) < 0)
{
- logError("Failed to set timer duration");
+ logError("CAmSocketHandler::createTimeFD Failed to set duration for", fd);
+ close(fd);
+ fd = -1;
return E_NOT_POSSIBLE;
}
return E_OK;
@@ -1051,9 +1137,10 @@ void CAmSocketHandler::callTimer(sh_timer_s& a)
try
{
a.callback(a.handle, a.userData);
- } catch (std::exception& e)
+ }
+ catch (std::exception& e)
{
- logError("Sockethandler: Exception in Timercallback,caught", e.what());
+ logError("CAmSocketHandler::callTimer() Exception caught", e.what());
}
}
@@ -1070,7 +1157,6 @@ bool CAmSocketHandler::nextHandle(sh_identifier_s & handle)
}
if (handle.lastUsedID == lastHandle)
{
- logError("Could not create new polls, too many open!");
return (false);
}