summaryrefslogtreecommitdiff
path: root/AudioManagerUtilities/src/CAmSocketHandler.cpp
diff options
context:
space:
mode:
authorJens Lorenz <jlorenz@de.adit-jv.com>2018-04-04 09:47:25 +0200
committerJens Lorenz <jlorenz@de.adit-jv.com>2018-04-09 17:00:47 +0200
commit29b816429d141584af128256545ca0dc96ce0be3 (patch)
tree1a92719a70418f91284174eb0c079018c65c8bfa /AudioManagerUtilities/src/CAmSocketHandler.cpp
parente9240071f34ae96f72f4befd19f6fa68cc721ad1 (diff)
downloadaudiomanager-29b816429d141584af128256545ca0dc96ce0be3.tar.gz
AMUtil: Rework of socketHandler to avoid calls of invalidated objects
This patch tries to follow the idea raised in PR26. Following two patches have been reworked: commit: cfe0e77aaf87a0590ceea42f6afa62b0c7d95e80 commit: bc33226f59910a960f62d419ba10d4ea761e3724 The biggest change applies to the internal database. Instead of having a vector for all items which will be copied inside the worker thread the new approach aims a central map which allows to store the sh_poll elements in containers. By this a container is valid until it is remove from map. The remove of items inside a map is now centralized within the worker and only the worker is responsible to keep the ppoll list and the map in sync. This patch also extends the unit tests to stress different timer scenarios. Signed-off-by: Aleksandar Donchev <Aleksander.Donchev@partner.bmw.de> Signed-off-by: Jens Lorenz <jlorenz@de.adit-jv.com>
Diffstat (limited to 'AudioManagerUtilities/src/CAmSocketHandler.cpp')
-rw-r--r--AudioManagerUtilities/src/CAmSocketHandler.cpp255
1 files changed, 140 insertions, 115 deletions
diff --git a/AudioManagerUtilities/src/CAmSocketHandler.cpp b/AudioManagerUtilities/src/CAmSocketHandler.cpp
index ba36363..7e12fc0 100644
--- a/AudioManagerUtilities/src/CAmSocketHandler.cpp
+++ b/AudioManagerUtilities/src/CAmSocketHandler.cpp
@@ -47,7 +47,7 @@ CAmSocketHandler::CAmSocketHandler() :
mEventFd(-1), //
mDispatchDone(true), //
mSetPollKeys(MAX_POLLHANDLE), //
- mListPoll(), //
+ mMapShPoll(), //
mSetTimerKeys(MAX_TIMERHANDLE),
mListTimer(), //
#ifndef WITH_TIMERFD
@@ -55,7 +55,6 @@ CAmSocketHandler::CAmSocketHandler() :
#endif
mSetSignalhandlerKeys(MAX_POLLHANDLE), //
mSignalHandlers(), //
- mRecreatePollfds(true),
mInternalCodes(internal_codes_e::NO_ERROR),
mSignalFdHandle(0)
#ifndef WITH_TIMERFD
@@ -71,6 +70,12 @@ CAmSocketHandler::CAmSocketHandler() :
if (bytes == sizeof(events))
{
if (events == UINT64_MAX-1)
+ {
+ for (auto & elem : mMapShPoll)
+ {
+ if (elem.second.state != poll_states_e::REMOVE)
+ elem.second.state = poll_states_e::UNINIT;
+ }
mDispatchDone = true;
return;
}
@@ -98,12 +103,9 @@ CAmSocketHandler::CAmSocketHandler() :
CAmSocketHandler::~CAmSocketHandler()
{
- for (auto it : mListPoll)
+ for (const auto& it : mMapShPoll)
{
- // This check is needed to ensure that fd wasn't closed
- // e.g. in the communication plugin's dtor's.
- if (fdIsValid(mEventFd))
- close(it.pollfdValue.fd);
+ close(it.second.pollfdValue.fd);
}
}
@@ -114,80 +116,78 @@ CAmSocketHandler::~CAmSocketHandler()
void CAmSocketHandler::start_listenting()
{
mDispatchDone = false;
- int16_t pollStatus;
#ifndef WITH_TIMERFD
clock_gettime(CLOCK_MONOTONIC, &mStartTime);
#endif
timespec buffertime;
- VectorListPoll_t cloneListPoll;
- VectorListPoll_t::iterator listmPollIt;
- VectorListPollfd_t fdPollingArray; //!<the polling array for ppoll
-
- auto preparePollfd = [&](const sh_poll_s& row)
- {
- CAmSocketHandler::prepare((sh_poll_s&)row);
- pollfd temp = row.pollfdValue;
- temp.revents = 0;
- fdPollingArray.push_back(temp);
- };
+ VectorPollfd_t fdPollingArray; //!<the polling array for ppoll
while (!mDispatchDone)
{
- if (mRecreatePollfds)
+ /* Iterate all times through map and synchronize the polling array accordingly.
+ * In case a new element in map appears the polling array will be extended and
+ * in case an element gets removed the map and the polling array needs to be adapted.
+ */
+ auto fdPollIt = fdPollingArray.begin();
+ for (auto it = mMapShPoll.begin(); it != mMapShPoll.end(); )
{
- fdPollingArray.clear();
- //freeze mListPoll by copying it - otherwise we get problems when we want to manipulate it during the next lines
- cloneListPoll = mListPoll;
- //there was a change in the setup, so we need to recreate the fdarray from the list
- std::for_each(cloneListPoll.begin(), cloneListPoll.end(), preparePollfd);
- mRecreatePollfds = false;
- }
- else
- {
- //first we go through the registered filedescriptors and check if someone needs preparation:
- std::for_each(cloneListPoll.begin(), cloneListPoll.end(), CAmSocketHandler::prepare);
+ // NOTE: The order of the switch/case statement is important and should modified with care
+ auto& elem = it->second;
+ switch (elem.state)
+ {
+ case poll_states_e::REMOVE:
+ close(elem.pollfdValue.fd);
+ fdPollIt = fdPollingArray.erase(fdPollIt);
+ it = mMapShPoll.erase(it);
+ continue;
+
+ case poll_states_e::UNINIT:
+ fdPollIt = fdPollingArray.emplace(fdPollIt);
+ // fallthrough
+
+ case poll_states_e::UPDATE:
+ CAmSocketHandler::prepare(elem);
+ elem.state = poll_states_e::VALID;
+ *fdPollIt = elem.pollfdValue;
+ break;
+
+ case poll_states_e::VALID:
+ default:
+ break;
+ }
+ // Ensures that fdPollIt will be never on its end before incrementing it further!
+ assert(fdPollIt != fdPollingArray.end());
+ ++fdPollIt;
+ ++it;
}
#ifndef WITH_TIMERFD
timerCorrection();
#endif
- //block until something is on a filedescriptor
- if ((pollStatus = ppoll(&fdPollingArray[0], fdPollingArray.size(), insertTime(buffertime), NULL)) < 0)
+ // block until something is on a file descriptor
+ int16_t pollStatus = ppoll(&fdPollingArray[0], fdPollingArray.size(), insertTime(buffertime), NULL);
+ if (pollStatus > 0)
{
- if (errno == EINTR)
- {
- //a signal was received, that means it's time to go...
- pollStatus = 0;
- }
- else
+ // stage 0+1, call firedCB
+ std::list<sh_poll_s*> listPoll;
+ for (auto& it : fdPollingArray)
{
- logError("SocketHandler::start_listenting ppoll returned with error", errno);
- throw std::runtime_error(std::string("SocketHandler::start_listenting ppoll returned with error."));
- }
- }
-
- if (pollStatus != 0) //only check filedescriptors if there was a change
- {
- std::list<sh_poll_s> listPoll;
- //todo: here could be a timer that makes sure naughty plugins return!
- //stage 0+1, call firedCB
- listmPollIt = cloneListPoll.begin();
- for (auto it : fdPollingArray)
- {
- if (CAmSocketHandler::eventFired(it))
- {
- listmPollIt->pollfdValue.revents = it.revents;
- listPoll.push_back(*listmPollIt);
- CAmSocketHandler::fire(*listmPollIt);
- }
- else
- {
- listmPollIt->pollfdValue.revents = 0;
- }
- listmPollIt++;
+ it.revents &= it.events;
+ if (it.revents == 0)
+ continue;
+
+ sh_poll_s& pollObj = mMapShPoll.at(it.fd);
+ if (pollObj.state != poll_states_e::VALID)
+ continue;
+
+ // ensure to copy the revents fired in fdPollingArray
+ pollObj.pollfdValue.revents = it.revents;
+ listPoll.push_back(&pollObj);
+ CAmSocketHandler::fire(pollObj);
+ it.revents = 0;
}
//stage 2, lets ask around if some dispatching is necessary, the ones who need stay on the list
@@ -198,16 +198,20 @@ void CAmSocketHandler::start_listenting()
{
listPoll.remove_if(CAmSocketHandler::dispatchingFinished);
} while (!listPoll.empty());
-
}
-#ifndef WITH_TIMERFD
+ else if ((pollStatus < 0) && (errno != EINTR))
+ {
+ logError("SocketHandler::start_listenting ppoll returned with error", errno);
+ throw std::runtime_error(std::string("SocketHandler::start_listenting ppoll returned with error."));
+ }
else //Timerevent
{
+#ifndef WITH_TIMERFD
//this was a timer event, we need to take care about the timers
//find out the timedifference to starttime
timerUp();
- }
#endif
+ }
}
}
@@ -253,14 +257,13 @@ bool CAmSocketHandler::fatalErrorOccurred()
am_Error_e CAmSocketHandler::getFDPollData(const sh_pollHandle_t handle, sh_poll_s & outPollData)
{
- VectorListPoll_t::iterator iterator = mListPoll.begin();
- for (; iterator != mListPoll.end(); ++iterator)
+ for (auto it : mMapShPoll)
{
- if (iterator->handle == handle)
- {
- outPollData = *iterator;
- return (E_OK);
- }
+ if (it.second.handle != handle)
+ continue;
+
+ outPollData = it.second;
+ return (E_OK);
}
return (E_UNKNOWN);
}
@@ -380,20 +383,35 @@ am_Error_e CAmSocketHandler::listenToSignals(const std::vector<uint8_t> & listSi
* @param dispatch a std::function that is called to dispatch the received data
* @param userData a pointer to userdata that is always passed around
* @param handle the handle of this poll
- * @return E_OK if the descriptor was added, E_NON_EXISTENT if the fd is not valid
+ * @return E_OK if the descriptor was added
+ * E_NON_EXISTENT if the fd is not valid
+ * E_ALREADY_EXISTS if the fd is already known
+ * E_NOT_POSSIBLE if the maximum handle threshold is reached
*/
-am_Error_e CAmSocketHandler::addFDPoll(const int fd, const short event, std::function<void(const sh_pollHandle_t handle, void* userData)> prepare,
- std::function<void(const pollfd pollfd, const sh_pollHandle_t handle, void* userData)> fired, std::function<bool(const sh_pollHandle_t handle, void* userData)> check,
- std::function<bool(const sh_pollHandle_t handle, void* userData)> dispatch, void* userData, sh_pollHandle_t& handle)
+am_Error_e CAmSocketHandler::addFDPoll(const int fd,
+ const short event,
+ std::function<void(const sh_pollHandle_t handle, void* userData)> prepare,
+ std::function<void(const pollfd pollfd, const sh_pollHandle_t handle, void* userData)> fired,
+ std::function<bool(const sh_pollHandle_t handle, void* userData)> check,
+ std::function<bool(const sh_pollHandle_t handle, void* userData)> dispatch,
+ void* userData,
+ sh_pollHandle_t& handle)
{
if (!fdIsValid(fd))
- return (E_NON_EXISTENT);
+ return E_NON_EXISTENT;
+
+ const auto elem = mMapShPoll.find(fd);
+ if (elem != mMapShPoll.end() && elem->second.state != poll_states_e::REMOVE)
+ {
+ logError("CAmSocketHandler::addFDPoll fd", fd, "already registered!");
+ return E_ALREADY_EXISTS;
+ }
//create a new handle for the poll
if (!nextHandle(mSetPollKeys))
{
- logError("Could not create new polls, too many open!");
+ logError("CAmSocketHandler::addFDPoll Max handle count reached!");
return (E_NOT_POSSIBLE);
}
@@ -407,10 +425,9 @@ am_Error_e CAmSocketHandler::addFDPoll(const int fd, const short event, std::fun
pollData.checkCB = check;
pollData.dispatchCB = dispatch;
pollData.userData = userData;
- //add new data to the list
- mListPoll.push_back(pollData);
- mRecreatePollfds = true;
+ //add new data to the list
+ mMapShPoll[fd] = pollData;
handle = pollData.handle;
return (E_OK);
@@ -427,15 +444,18 @@ am_Error_e CAmSocketHandler::addFDPoll(const int fd, const short event, std::fun
* @param dispatch a callback that is called to dispatch the received data
* @param userData a pointer to userdata that is always passed around
* @param handle the handle of this poll
- * @return E_OK if the descriptor was added, E_NON_EXISTENT if the fd is not valid
+ * @return E_OK if the descriptor was added
+ * E_NON_EXISTENT if the fd is not valid
+ * E_ALREADY_EXISTS if the fd is already known
+ * E_NOT_POSSIBLE if the maximum handle threshold is reached
*/
am::am_Error_e CAmSocketHandler::addFDPoll(const int fd, const short event, IAmShPollPrepare *prepare, IAmShPollFired *fired, IAmShPollCheck *check, IAmShPollDispatch *dispatch, void *userData, sh_pollHandle_t & handle)
{
- std::function<void(const sh_pollHandle_t handle, void* userData)> prepareCB; //preperation callback
- std::function<void(const pollfd poll, const sh_pollHandle_t handle, void* userData)> firedCB; //fired callback
- std::function<bool(const sh_pollHandle_t handle, void* userData)> checkCB; //check callback
- std::function<bool(const sh_pollHandle_t handle, void* userData)> dispatchCB; //check callback
+ std::function<void(const sh_pollHandle_t, void*)> prepareCB; //preperation callback
+ std::function<void(const pollfd, const sh_pollHandle_t, void*)> firedCB; //fired callback
+ std::function<bool(const sh_pollHandle_t, void*)> checkCB; //check callback
+ std::function<bool(const sh_pollHandle_t, void*)> dispatchCB; //check callback
if (prepare)
prepareCB = std::bind(&IAmShPollPrepare::Call, prepare, std::placeholders::_1, std::placeholders::_2);
@@ -456,19 +476,25 @@ am::am_Error_e CAmSocketHandler::addFDPoll(const int fd, const short event, IAmS
*/
am_Error_e CAmSocketHandler::removeFDPoll(const sh_pollHandle_t handle)
{
- VectorListPoll_t::iterator iterator = mListPoll.begin();
-
- for (; iterator != mListPoll.end(); ++iterator)
+ for (auto& it : mMapShPoll)
{
- if (iterator->handle == handle)
+ if (it.second.handle == handle)
{
- iterator = mListPoll.erase(iterator);
+ it.second.state = poll_states_e::REMOVE;
+
+ static uint64_t events(1);
+ if (write(mEventFd, &(++events), sizeof(events)) < 0)
+ {
+ std::ostringstream msg("CAmSocketHandler::removeFDPoll ");
+ msg << "Failed to write to event fd: " << mEventFd << " errno: " << std::strerror(errno);
+ throw std::runtime_error(msg.str());
+ }
mSetPollKeys.pollHandles.erase(handle);
- mRecreatePollfds = true;
- return (E_OK);
+ return E_OK;
}
}
- return (E_UNKNOWN);
+ logError("CAmSocketHandler::removeFDPoll handle unknown", handle);
+ return E_UNKNOWN;
}
/**
@@ -618,7 +644,13 @@ am_Error_e CAmSocketHandler::addTimer(const timespec & timeouts, std::function<v
{
timerItem.handle = handle;
mListTimer.push_back(timerItem);
+ return E_OK;
}
+
+ // E_NOT_POSSIBLE is the only case were we need to close the timer
+ if (err == E_NOT_POSSIBLE)
+ close(timerItem.fd);
+
return err;
#endif
@@ -877,14 +909,14 @@ am_Error_e CAmSocketHandler::stopTimer(const sh_timerHandle_t handle)
*/
am_Error_e CAmSocketHandler::updateEventFlags(const sh_pollHandle_t handle, const short events)
{
- VectorListPoll_t::iterator iterator = mListPoll.begin();
-
- for (; iterator != mListPoll.end(); ++iterator)
+ for (auto& it : mMapShPoll)
{
- if (iterator->handle == handle)
+ auto& elem = it.second;
+ if (elem.handle == handle && elem.state != poll_states_e::REMOVE)
{
- iterator->pollfdValue.events = events;
- mRecreatePollfds = true;
+ elem.pollfdValue.events = events;
+ elem.pollfdValue.revents = 0;
+ elem.state = poll_states_e::UPDATE;
return (E_OK);
}
}
@@ -1003,7 +1035,7 @@ void CAmSocketHandler::prepare(am::CAmSocketHandler::sh_poll_s& row)
/**
* fire callback
*/
-void CAmSocketHandler::fire(sh_poll_s& a)
+void CAmSocketHandler::fire(const sh_poll_s& a)
{
try
{
@@ -1017,31 +1049,24 @@ void CAmSocketHandler::fire(sh_poll_s& a)
/**
* should disptach
*/
-bool CAmSocketHandler::noDispatching(const sh_poll_s& a)
+bool CAmSocketHandler::noDispatching(const sh_poll_s* a)
{
//remove from list of there is no checkCB
- if (nullptr == a.checkCB)
+ if (nullptr == a->checkCB || a->state != poll_states_e::VALID)
return (true);
- return (!a.checkCB(a.handle, a.userData));
+ return (!a->checkCB(a->handle, a->userData));
}
/**
* disptach
*/
-bool CAmSocketHandler::dispatchingFinished(const sh_poll_s& a)
+bool CAmSocketHandler::dispatchingFinished(const sh_poll_s* a)
{
//remove from list of there is no dispatchCB
- if (nullptr == a.dispatchCB)
+ if (nullptr == a->dispatchCB || a->state != poll_states_e::VALID)
return (true);
- return (!a.dispatchCB(a.handle, a.userData));
-}
-/**
- * event triggered
- */
-bool CAmSocketHandler::eventFired(const pollfd& a)
-{
- return (a.revents == 0 ? false : true);
+ return (!a->dispatchCB(a->handle, a->userData));
}
/**