summaryrefslogtreecommitdiff
path: root/AudioManagerUtilities/src/CAmSocketHandler.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'AudioManagerUtilities/src/CAmSocketHandler.cpp')
-rw-r--r--AudioManagerUtilities/src/CAmSocketHandler.cpp255
1 files changed, 140 insertions, 115 deletions
diff --git a/AudioManagerUtilities/src/CAmSocketHandler.cpp b/AudioManagerUtilities/src/CAmSocketHandler.cpp
index ba36363..7e12fc0 100644
--- a/AudioManagerUtilities/src/CAmSocketHandler.cpp
+++ b/AudioManagerUtilities/src/CAmSocketHandler.cpp
@@ -47,7 +47,7 @@ CAmSocketHandler::CAmSocketHandler() :
mEventFd(-1), //
mDispatchDone(true), //
mSetPollKeys(MAX_POLLHANDLE), //
- mListPoll(), //
+ mMapShPoll(), //
mSetTimerKeys(MAX_TIMERHANDLE),
mListTimer(), //
#ifndef WITH_TIMERFD
@@ -55,7 +55,6 @@ CAmSocketHandler::CAmSocketHandler() :
#endif
mSetSignalhandlerKeys(MAX_POLLHANDLE), //
mSignalHandlers(), //
- mRecreatePollfds(true),
mInternalCodes(internal_codes_e::NO_ERROR),
mSignalFdHandle(0)
#ifndef WITH_TIMERFD
@@ -71,6 +70,12 @@ CAmSocketHandler::CAmSocketHandler() :
if (bytes == sizeof(events))
{
if (events == UINT64_MAX-1)
+ {
+ for (auto & elem : mMapShPoll)
+ {
+ if (elem.second.state != poll_states_e::REMOVE)
+ elem.second.state = poll_states_e::UNINIT;
+ }
mDispatchDone = true;
return;
}
@@ -98,12 +103,9 @@ CAmSocketHandler::CAmSocketHandler() :
CAmSocketHandler::~CAmSocketHandler()
{
- for (auto it : mListPoll)
+ for (const auto& it : mMapShPoll)
{
- // This check is needed to ensure that fd wasn't closed
- // e.g. in the communication plugin's dtor's.
- if (fdIsValid(mEventFd))
- close(it.pollfdValue.fd);
+ close(it.second.pollfdValue.fd);
}
}
@@ -114,80 +116,78 @@ CAmSocketHandler::~CAmSocketHandler()
void CAmSocketHandler::start_listenting()
{
mDispatchDone = false;
- int16_t pollStatus;
#ifndef WITH_TIMERFD
clock_gettime(CLOCK_MONOTONIC, &mStartTime);
#endif
timespec buffertime;
- VectorListPoll_t cloneListPoll;
- VectorListPoll_t::iterator listmPollIt;
- VectorListPollfd_t fdPollingArray; //!<the polling array for ppoll
-
- auto preparePollfd = [&](const sh_poll_s& row)
- {
- CAmSocketHandler::prepare((sh_poll_s&)row);
- pollfd temp = row.pollfdValue;
- temp.revents = 0;
- fdPollingArray.push_back(temp);
- };
+ VectorPollfd_t fdPollingArray; //!<the polling array for ppoll
while (!mDispatchDone)
{
- if (mRecreatePollfds)
+ /* Iterate all times through map and synchronize the polling array accordingly.
+ * In case a new element in map appears the polling array will be extended and
+ * in case an element gets removed the map and the polling array needs to be adapted.
+ */
+ auto fdPollIt = fdPollingArray.begin();
+ for (auto it = mMapShPoll.begin(); it != mMapShPoll.end(); )
{
- fdPollingArray.clear();
- //freeze mListPoll by copying it - otherwise we get problems when we want to manipulate it during the next lines
- cloneListPoll = mListPoll;
- //there was a change in the setup, so we need to recreate the fdarray from the list
- std::for_each(cloneListPoll.begin(), cloneListPoll.end(), preparePollfd);
- mRecreatePollfds = false;
- }
- else
- {
- //first we go through the registered filedescriptors and check if someone needs preparation:
- std::for_each(cloneListPoll.begin(), cloneListPoll.end(), CAmSocketHandler::prepare);
+ // NOTE: The order of the switch/case statement is important and should modified with care
+ auto& elem = it->second;
+ switch (elem.state)
+ {
+ case poll_states_e::REMOVE:
+ close(elem.pollfdValue.fd);
+ fdPollIt = fdPollingArray.erase(fdPollIt);
+ it = mMapShPoll.erase(it);
+ continue;
+
+ case poll_states_e::UNINIT:
+ fdPollIt = fdPollingArray.emplace(fdPollIt);
+ // fallthrough
+
+ case poll_states_e::UPDATE:
+ CAmSocketHandler::prepare(elem);
+ elem.state = poll_states_e::VALID;
+ *fdPollIt = elem.pollfdValue;
+ break;
+
+ case poll_states_e::VALID:
+ default:
+ break;
+ }
+ // Ensures that fdPollIt will be never on its end before incrementing it further!
+ assert(fdPollIt != fdPollingArray.end());
+ ++fdPollIt;
+ ++it;
}
#ifndef WITH_TIMERFD
timerCorrection();
#endif
- //block until something is on a filedescriptor
- if ((pollStatus = ppoll(&fdPollingArray[0], fdPollingArray.size(), insertTime(buffertime), NULL)) < 0)
+ // block until something is on a file descriptor
+ int16_t pollStatus = ppoll(&fdPollingArray[0], fdPollingArray.size(), insertTime(buffertime), NULL);
+ if (pollStatus > 0)
{
- if (errno == EINTR)
- {
- //a signal was received, that means it's time to go...
- pollStatus = 0;
- }
- else
+ // stage 0+1, call firedCB
+ std::list<sh_poll_s*> listPoll;
+ for (auto& it : fdPollingArray)
{
- logError("SocketHandler::start_listenting ppoll returned with error", errno);
- throw std::runtime_error(std::string("SocketHandler::start_listenting ppoll returned with error."));
- }
- }
-
- if (pollStatus != 0) //only check filedescriptors if there was a change
- {
- std::list<sh_poll_s> listPoll;
- //todo: here could be a timer that makes sure naughty plugins return!
- //stage 0+1, call firedCB
- listmPollIt = cloneListPoll.begin();
- for (auto it : fdPollingArray)
- {
- if (CAmSocketHandler::eventFired(it))
- {
- listmPollIt->pollfdValue.revents = it.revents;
- listPoll.push_back(*listmPollIt);
- CAmSocketHandler::fire(*listmPollIt);
- }
- else
- {
- listmPollIt->pollfdValue.revents = 0;
- }
- listmPollIt++;
+ it.revents &= it.events;
+ if (it.revents == 0)
+ continue;
+
+ sh_poll_s& pollObj = mMapShPoll.at(it.fd);
+ if (pollObj.state != poll_states_e::VALID)
+ continue;
+
+ // ensure to copy the revents fired in fdPollingArray
+ pollObj.pollfdValue.revents = it.revents;
+ listPoll.push_back(&pollObj);
+ CAmSocketHandler::fire(pollObj);
+ it.revents = 0;
}
//stage 2, lets ask around if some dispatching is necessary, the ones who need stay on the list
@@ -198,16 +198,20 @@ void CAmSocketHandler::start_listenting()
{
listPoll.remove_if(CAmSocketHandler::dispatchingFinished);
} while (!listPoll.empty());
-
}
-#ifndef WITH_TIMERFD
+ else if ((pollStatus < 0) && (errno != EINTR))
+ {
+ logError("SocketHandler::start_listenting ppoll returned with error", errno);
+ throw std::runtime_error(std::string("SocketHandler::start_listenting ppoll returned with error."));
+ }
else //Timerevent
{
+#ifndef WITH_TIMERFD
//this was a timer event, we need to take care about the timers
//find out the timedifference to starttime
timerUp();
- }
#endif
+ }
}
}
@@ -253,14 +257,13 @@ bool CAmSocketHandler::fatalErrorOccurred()
am_Error_e CAmSocketHandler::getFDPollData(const sh_pollHandle_t handle, sh_poll_s & outPollData)
{
- VectorListPoll_t::iterator iterator = mListPoll.begin();
- for (; iterator != mListPoll.end(); ++iterator)
+ for (auto it : mMapShPoll)
{
- if (iterator->handle == handle)
- {
- outPollData = *iterator;
- return (E_OK);
- }
+ if (it.second.handle != handle)
+ continue;
+
+ outPollData = it.second;
+ return (E_OK);
}
return (E_UNKNOWN);
}
@@ -380,20 +383,35 @@ am_Error_e CAmSocketHandler::listenToSignals(const std::vector<uint8_t> & listSi
* @param dispatch a std::function that is called to dispatch the received data
* @param userData a pointer to userdata that is always passed around
* @param handle the handle of this poll
- * @return E_OK if the descriptor was added, E_NON_EXISTENT if the fd is not valid
+ * @return E_OK if the descriptor was added
+ * E_NON_EXISTENT if the fd is not valid
+ * E_ALREADY_EXISTS if the fd is already known
+ * E_NOT_POSSIBLE if the maximum handle threshold is reached
*/
-am_Error_e CAmSocketHandler::addFDPoll(const int fd, const short event, std::function<void(const sh_pollHandle_t handle, void* userData)> prepare,
- std::function<void(const pollfd pollfd, const sh_pollHandle_t handle, void* userData)> fired, std::function<bool(const sh_pollHandle_t handle, void* userData)> check,
- std::function<bool(const sh_pollHandle_t handle, void* userData)> dispatch, void* userData, sh_pollHandle_t& handle)
+am_Error_e CAmSocketHandler::addFDPoll(const int fd,
+ const short event,
+ std::function<void(const sh_pollHandle_t handle, void* userData)> prepare,
+ std::function<void(const pollfd pollfd, const sh_pollHandle_t handle, void* userData)> fired,
+ std::function<bool(const sh_pollHandle_t handle, void* userData)> check,
+ std::function<bool(const sh_pollHandle_t handle, void* userData)> dispatch,
+ void* userData,
+ sh_pollHandle_t& handle)
{
if (!fdIsValid(fd))
- return (E_NON_EXISTENT);
+ return E_NON_EXISTENT;
+
+ const auto elem = mMapShPoll.find(fd);
+ if (elem != mMapShPoll.end() && elem->second.state != poll_states_e::REMOVE)
+ {
+ logError("CAmSocketHandler::addFDPoll fd", fd, "already registered!");
+ return E_ALREADY_EXISTS;
+ }
//create a new handle for the poll
if (!nextHandle(mSetPollKeys))
{
- logError("Could not create new polls, too many open!");
+ logError("CAmSocketHandler::addFDPoll Max handle count reached!");
return (E_NOT_POSSIBLE);
}
@@ -407,10 +425,9 @@ am_Error_e CAmSocketHandler::addFDPoll(const int fd, const short event, std::fun
pollData.checkCB = check;
pollData.dispatchCB = dispatch;
pollData.userData = userData;
- //add new data to the list
- mListPoll.push_back(pollData);
- mRecreatePollfds = true;
+ //add new data to the list
+ mMapShPoll[fd] = pollData;
handle = pollData.handle;
return (E_OK);
@@ -427,15 +444,18 @@ am_Error_e CAmSocketHandler::addFDPoll(const int fd, const short event, std::fun
* @param dispatch a callback that is called to dispatch the received data
* @param userData a pointer to userdata that is always passed around
* @param handle the handle of this poll
- * @return E_OK if the descriptor was added, E_NON_EXISTENT if the fd is not valid
+ * @return E_OK if the descriptor was added
+ * E_NON_EXISTENT if the fd is not valid
+ * E_ALREADY_EXISTS if the fd is already known
+ * E_NOT_POSSIBLE if the maximum handle threshold is reached
*/
am::am_Error_e CAmSocketHandler::addFDPoll(const int fd, const short event, IAmShPollPrepare *prepare, IAmShPollFired *fired, IAmShPollCheck *check, IAmShPollDispatch *dispatch, void *userData, sh_pollHandle_t & handle)
{
- std::function<void(const sh_pollHandle_t handle, void* userData)> prepareCB; //preperation callback
- std::function<void(const pollfd poll, const sh_pollHandle_t handle, void* userData)> firedCB; //fired callback
- std::function<bool(const sh_pollHandle_t handle, void* userData)> checkCB; //check callback
- std::function<bool(const sh_pollHandle_t handle, void* userData)> dispatchCB; //check callback
+ std::function<void(const sh_pollHandle_t, void*)> prepareCB; //preperation callback
+ std::function<void(const pollfd, const sh_pollHandle_t, void*)> firedCB; //fired callback
+ std::function<bool(const sh_pollHandle_t, void*)> checkCB; //check callback
+ std::function<bool(const sh_pollHandle_t, void*)> dispatchCB; //check callback
if (prepare)
prepareCB = std::bind(&IAmShPollPrepare::Call, prepare, std::placeholders::_1, std::placeholders::_2);
@@ -456,19 +476,25 @@ am::am_Error_e CAmSocketHandler::addFDPoll(const int fd, const short event, IAmS
*/
am_Error_e CAmSocketHandler::removeFDPoll(const sh_pollHandle_t handle)
{
- VectorListPoll_t::iterator iterator = mListPoll.begin();
-
- for (; iterator != mListPoll.end(); ++iterator)
+ for (auto& it : mMapShPoll)
{
- if (iterator->handle == handle)
+ if (it.second.handle == handle)
{
- iterator = mListPoll.erase(iterator);
+ it.second.state = poll_states_e::REMOVE;
+
+ static uint64_t events(1);
+ if (write(mEventFd, &(++events), sizeof(events)) < 0)
+ {
+ std::ostringstream msg("CAmSocketHandler::removeFDPoll ");
+ msg << "Failed to write to event fd: " << mEventFd << " errno: " << std::strerror(errno);
+ throw std::runtime_error(msg.str());
+ }
mSetPollKeys.pollHandles.erase(handle);
- mRecreatePollfds = true;
- return (E_OK);
+ return E_OK;
}
}
- return (E_UNKNOWN);
+ logError("CAmSocketHandler::removeFDPoll handle unknown", handle);
+ return E_UNKNOWN;
}
/**
@@ -618,7 +644,13 @@ am_Error_e CAmSocketHandler::addTimer(const timespec & timeouts, std::function<v
{
timerItem.handle = handle;
mListTimer.push_back(timerItem);
+ return E_OK;
}
+
+ // E_NOT_POSSIBLE is the only case were we need to close the timer
+ if (err == E_NOT_POSSIBLE)
+ close(timerItem.fd);
+
return err;
#endif
@@ -877,14 +909,14 @@ am_Error_e CAmSocketHandler::stopTimer(const sh_timerHandle_t handle)
*/
am_Error_e CAmSocketHandler::updateEventFlags(const sh_pollHandle_t handle, const short events)
{
- VectorListPoll_t::iterator iterator = mListPoll.begin();
-
- for (; iterator != mListPoll.end(); ++iterator)
+ for (auto& it : mMapShPoll)
{
- if (iterator->handle == handle)
+ auto& elem = it.second;
+ if (elem.handle == handle && elem.state != poll_states_e::REMOVE)
{
- iterator->pollfdValue.events = events;
- mRecreatePollfds = true;
+ elem.pollfdValue.events = events;
+ elem.pollfdValue.revents = 0;
+ elem.state = poll_states_e::UPDATE;
return (E_OK);
}
}
@@ -1003,7 +1035,7 @@ void CAmSocketHandler::prepare(am::CAmSocketHandler::sh_poll_s& row)
/**
* fire callback
*/
-void CAmSocketHandler::fire(sh_poll_s& a)
+void CAmSocketHandler::fire(const sh_poll_s& a)
{
try
{
@@ -1017,31 +1049,24 @@ void CAmSocketHandler::fire(sh_poll_s& a)
/**
* should disptach
*/
-bool CAmSocketHandler::noDispatching(const sh_poll_s& a)
+bool CAmSocketHandler::noDispatching(const sh_poll_s* a)
{
//remove from list of there is no checkCB
- if (nullptr == a.checkCB)
+ if (nullptr == a->checkCB || a->state != poll_states_e::VALID)
return (true);
- return (!a.checkCB(a.handle, a.userData));
+ return (!a->checkCB(a->handle, a->userData));
}
/**
* disptach
*/
-bool CAmSocketHandler::dispatchingFinished(const sh_poll_s& a)
+bool CAmSocketHandler::dispatchingFinished(const sh_poll_s* a)
{
//remove from list of there is no dispatchCB
- if (nullptr == a.dispatchCB)
+ if (nullptr == a->dispatchCB || a->state != poll_states_e::VALID)
return (true);
- return (!a.dispatchCB(a.handle, a.userData));
-}
-/**
- * event triggered
- */
-bool CAmSocketHandler::eventFired(const pollfd& a)
-{
- return (a.revents == 0 ? false : true);
+ return (!a->dispatchCB(a->handle, a->userData));
}
/**