summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJens Lorenz <jlorenz@de.adit-jv.com>2018-06-07 18:04:03 +0200
committerJens Lorenz <jlorenz@de.adit-jv.com>2018-06-12 08:39:48 +0200
commitfc50c62104b3019ff4de9e4fcc5b6f04b74a664a (patch)
treede6c86d62087e27f318f5a9e97d8972b6c40154e
parent27c9983421494ce9f5b82027f4b2e7f72369dced (diff)
downloadaudiomanager-fc50c62104b3019ff4de9e4fcc5b6f04b74a664a.tar.gz
AMUtil: Fix inconsistent fdPollingArray
In case someone removed a fd and closed it opened a new one and added it to the socket handler the container state of the object in map was changed from either REMOVE/CLOSE to UNINIT. This leads to the emplace call in the fdPollingArray vector and the fd is maintained twice. Over the entire runtime there will be zombie fds provided to ppoll functions which will race a POLLERR revent leading to 100% cpu load. Also the CLOSE state is now removed because only the application is aware if a fd has to be closed. For instance calling add/remove in a loop were start_listenting is not running will turn to a system issue were no fds can be provided by the operation system. Signed-off-by: Jens Lorenz <jlorenz@de.adit-jv.com>
-rw-r--r--AudioManagerUtilities/include/CAmSocketHandler.h20
-rw-r--r--AudioManagerUtilities/src/CAmSocketHandler.cpp115
-rw-r--r--AudioManagerUtilities/test/AmSocketHandlerTest/CAmSocketHandlerTest.cpp91
3 files changed, 110 insertions, 116 deletions
diff --git a/AudioManagerUtilities/include/CAmSocketHandler.h b/AudioManagerUtilities/include/CAmSocketHandler.h
index 797551d..db3207e 100644
--- a/AudioManagerUtilities/include/CAmSocketHandler.h
+++ b/AudioManagerUtilities/include/CAmSocketHandler.h
@@ -50,7 +50,6 @@ namespace am
typedef uint16_t sh_pollHandle_t; //!<this is a handle for a filedescriptor to be used with the SocketHandler
typedef sh_pollHandle_t sh_timerHandle_t; //!<this is a handle for a timer to be used with the SocketHandler
-typedef enum:uint8_t { RMV_ONLY, RMV_N_CLS } sh_rmv_e;
/**
* prototype for poll prepared callback
@@ -218,11 +217,11 @@ class CAmSocketHandler
{
typedef enum:uint8_t
{
- UNINIT = 0u, // new, uninitialized element which needs to be inserted to ppoll array
- VALID = 1u, // it is a valid element in ppoll array
- UPDATE = 2u, // update of event information therefore update ppoll array
- REMOVE = 3u, // remove from ppoll array and internal map
- CLOSE = 4u // close and remove from ppoll array and internal map
+ ADD = 0u, // new, uninitialized element which needs to be added to ppoll array
+ UPDATE = 1u, // update of event information therefore update ppoll array
+ VALID = 2u, // it is a valid element in ppoll array
+ REMOVE = 3u, // remove from ppoll array and internal map
+ INVALID = 4u // uninit element requested to be removed from internal map only
} poll_states_e;
struct sh_poll_s //!<struct that holds information about polls
@@ -237,7 +236,7 @@ class CAmSocketHandler
poll_states_e state;
sh_poll_s() :
- handle(0), pollfdValue(), prepareCB(), firedCB(), checkCB(), dispatchCB(), userData(0), state(UNINIT)
+ handle(0), pollfdValue(), prepareCB(), firedCB(), checkCB(), dispatchCB(), userData(0), state(ADD)
{}
};
@@ -288,8 +287,9 @@ class CAmSocketHandler
typedef enum:uint8_t
{
- NO_ERROR = 0u, // OK
- FD_ERROR = 1u, // Invalid file descriptor
+ NO_ERROR = 0u, // OK
+ FD_ERROR = 1u, // Invalid file descriptor
+ MT_ERROR = 2u // Multi-thread issue
} internal_codes_e;
typedef uint8_t internal_codes_t;
@@ -463,7 +463,7 @@ public:
std::function<bool(const sh_pollHandle_t handle, void* userData)> check, std::function<bool(const sh_pollHandle_t handle, void* userData)> dispatch, void* userData, sh_pollHandle_t& handle);
am_Error_e addFDPoll(const int fd, const short event, IAmShPollPrepare *prepare, IAmShPollFired *fired, IAmShPollCheck *check, IAmShPollDispatch *dispatch, void* userData, sh_pollHandle_t& handle);
- am_Error_e removeFDPoll(const sh_pollHandle_t handle, const sh_rmv_e rmv = RMV_ONLY);
+ am_Error_e removeFDPoll(const sh_pollHandle_t handle);
am_Error_e updateEventFlags(const sh_pollHandle_t handle, const short events);
am_Error_e addSignalHandler(std::function<void(const sh_pollHandle_t handle, const signalfd_siginfo & info, void* userData)> callback, sh_pollHandle_t& handle, void * userData);
am_Error_e removeSignalHandler(const sh_pollHandle_t handle);
diff --git a/AudioManagerUtilities/src/CAmSocketHandler.cpp b/AudioManagerUtilities/src/CAmSocketHandler.cpp
index a1f82ab..472fa7f 100644
--- a/AudioManagerUtilities/src/CAmSocketHandler.cpp
+++ b/AudioManagerUtilities/src/CAmSocketHandler.cpp
@@ -41,7 +41,6 @@
#endif
#define END_EVENT (UINT64_MAX >> 1)
-#define SHPOLL_IS_ACTIVE(state) (!(state == poll_states_e::REMOVE || state == poll_states_e::CLOSE))
namespace am
{
@@ -76,8 +75,11 @@ CAmSocketHandler::CAmSocketHandler() :
{
for (auto & elem : mMapShPoll)
{
- if (SHPOLL_IS_ACTIVE(elem.second.state))
- elem.second.state = poll_states_e::UNINIT;
+ if (elem.second.state == poll_states_e::UPDATE ||
+ elem.second.state == poll_states_e::VALID)
+ {
+ elem.second.state = poll_states_e::ADD;
+ }
}
mDispatchDone = true;
}
@@ -135,44 +137,44 @@ void CAmSocketHandler::start_listenting()
auto fdPollIt = fdPollingArray.begin();
for (auto it = mMapShPoll.begin(); it != mMapShPoll.end(); )
{
- // NOTE: The order of the switch/case statement is important and should modified with care
+ // NOTE: The order of the switch/case statement reflects the state flow
auto& elem = it->second;
switch (elem.state)
{
- case poll_states_e::CLOSE:
- close(elem.pollfdValue.fd);
- // fallthrough
-
- case poll_states_e::REMOVE:
- /* The first check is needed for the restart behavior were an element is marked
- * as removed but not part of the polling array - which is stored on heap.
- * The second check is needed in case a map element was inserted newly but was
- * directly marked after as REMOVE but in between there was no sync. of the
- * polling array processed.
- */
- if (fdPollIt != fdPollingArray.end() && fdPollIt->fd == elem.pollfdValue.fd)
- fdPollIt = fdPollingArray.erase(fdPollIt);
- it = mMapShPoll.erase(it);
- continue;
-
- case poll_states_e::UNINIT:
+ case poll_states_e::ADD:
+ elem.state = poll_states_e::UPDATE;
fdPollIt = fdPollingArray.emplace(fdPollIt);
- // fallthrough
+ break;
case poll_states_e::UPDATE:
- CAmSocketHandler::prepare(elem);
elem.state = poll_states_e::VALID;
+ CAmSocketHandler::prepare(elem);
*fdPollIt = elem.pollfdValue;
break;
case poll_states_e::VALID:
- default:
+ // check for multi-thread access
+ assert(fdPollIt != fdPollingArray.end());
+ ++fdPollIt;
+ ++it;
+ break;
+
+ case poll_states_e::REMOVE:
+ elem.state = poll_states_e::INVALID;
+ fdPollIt = fdPollingArray.erase(fdPollIt);
+ break;
+
+ case poll_states_e::INVALID:
+ it = mMapShPoll.erase(it);
break;
}
- // Ensures that fdPollIt will be never on its end before incrementing it further!
- assert(fdPollIt != fdPollingArray.end());
- ++fdPollIt;
- ++it;
+ }
+
+ if (fdPollingArray.size() != mMapShPoll.size())
+ {
+ mInternalCodes |= internal_codes_e::MT_ERROR;
+ logError("CAmSocketHandler::start_listenting is NOT multi-thread safe!");
+ return;
}
#ifndef WITH_TIMERFD
@@ -269,8 +271,8 @@ void CAmSocketHandler::wakeupWorker(const std::string & func, const uint64_t val
}
bool CAmSocketHandler::fatalErrorOccurred()
-{
- return (mInternalCodes & internal_codes_e::FD_ERROR);
+{
+ return (mInternalCodes != internal_codes_e::NO_ERROR);
}
/**
@@ -392,14 +394,29 @@ am_Error_e CAmSocketHandler::addFDPoll(const int fd,
void* userData,
sh_pollHandle_t& handle)
{
+ sh_poll_s pollData;
+
if (!fdIsValid(fd))
return E_NON_EXISTENT;
const auto elem = mMapShPoll.find(fd);
- if (elem != mMapShPoll.end() && SHPOLL_IS_ACTIVE(elem->second.state))
+ if (elem != mMapShPoll.end())
{
- logError("CAmSocketHandler::addFDPoll fd", fd, "already registered!");
- return E_ALREADY_EXISTS;
+ // The fd was already in map therefore we need to trigger an update instead
+ switch (elem->second.state)
+ {
+ case poll_states_e::REMOVE:
+ pollData.state = poll_states_e::UPDATE;
+ break;
+
+ case poll_states_e::INVALID:
+ pollData.state = poll_states_e::ADD;
+ break;
+
+ default:
+ logError("CAmSocketHandler::addFDPoll fd", fd, "already registered!");
+ return E_ALREADY_EXISTS;
+ }
}
//create a new handle for the poll
@@ -409,7 +426,6 @@ am_Error_e CAmSocketHandler::addFDPoll(const int fd,
return (E_NOT_POSSIBLE);
}
- sh_poll_s pollData;
pollData.pollfdValue.fd = fd;
pollData.handle = mSetPollKeys.lastUsedID;
pollData.pollfdValue.events = event;
@@ -470,13 +486,13 @@ am::am_Error_e CAmSocketHandler::addFDPoll(const int fd, const short event, IAmS
* @param [rmv] default RMV_ONLY_FDPOLL
* @return
*/
-am_Error_e CAmSocketHandler::removeFDPoll(const sh_pollHandle_t handle, const sh_rmv_e rmv)
+am_Error_e CAmSocketHandler::removeFDPoll(const sh_pollHandle_t handle)
{
for (auto& it : mMapShPoll)
{
if (it.second.handle == handle)
{
- it.second.state = (rmv == sh_rmv_e::RMV_N_CLS ? poll_states_e::CLOSE : poll_states_e::REMOVE);
+ it.second.state = (it.second.state == poll_states_e::ADD ? poll_states_e::INVALID : poll_states_e::REMOVE);
wakeupWorker("removeFDPoll");
mSetPollKeys.pollHandles.erase(handle);
return E_OK;
@@ -661,8 +677,10 @@ am_Error_e CAmSocketHandler::removeTimer(const sh_timerHandle_t handle)
{
if (it->handle == handle)
{
+ am_Error_e err = removeFDPoll(handle);
+ close(it->fd);
mListTimer.erase(it);
- return removeFDPoll(handle, sh_rmv_e::RMV_N_CLS);
+ return err;
}
++it;
}
@@ -901,12 +919,25 @@ am_Error_e CAmSocketHandler::updateEventFlags(const sh_pollHandle_t handle, cons
for (auto& it : mMapShPoll)
{
auto& elem = it.second;
- if (elem.handle == handle && SHPOLL_IS_ACTIVE(elem.state))
+ if (elem.handle != handle)
+ continue;
+
+ switch (elem.state)
{
- elem.pollfdValue.events = events;
- elem.pollfdValue.revents = 0;
- elem.state = poll_states_e::UPDATE;
- return (E_OK);
+ case poll_states_e::ADD:
+ elem.pollfdValue.events = events;
+ return (E_OK);
+
+ case poll_states_e::UPDATE:
+ case poll_states_e::VALID:
+ elem.state = poll_states_e::UPDATE;
+ elem.pollfdValue.revents = 0;
+ elem.pollfdValue.events = events;
+ return (E_OK);
+
+ default:
+ // This issue should never happen!
+ return (E_DATABASE_ERROR);
}
}
return (E_UNKNOWN);
diff --git a/AudioManagerUtilities/test/AmSocketHandlerTest/CAmSocketHandlerTest.cpp b/AudioManagerUtilities/test/AmSocketHandlerTest/CAmSocketHandlerTest.cpp
index ef42d56..3fde011 100644
--- a/AudioManagerUtilities/test/AmSocketHandlerTest/CAmSocketHandlerTest.cpp
+++ b/AudioManagerUtilities/test/AmSocketHandlerTest/CAmSocketHandlerTest.cpp
@@ -22,6 +22,7 @@
#include "CAmSocketHandlerTest.h"
#include <cstdio>
+#include <cstring>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <sys/ioctl.h>
@@ -57,12 +58,6 @@ struct TestUserData
float f;
};
-struct TestStressUserData
-{
- CAmSocketHandler &socket;
- std::vector<sh_pollHandle_t> &handles;
-};
-
MockIAmSignalHandler *pMockSignalHandler = NULL;
static void signalHandler(int sig, siginfo_t *siginfo, void *context)
{
@@ -317,26 +312,6 @@ void* threadCallbackUnixSocketAndTimers(void* data)
return sendTestData(sock, (struct sockaddr*)&servAddr, sizeof(servAddr), 500000);
}
-void* threadRaceFd(void* pData)
-{
- struct TestStressUserData data = *(struct TestStressUserData*)pData;
- usleep(50000);
- auto elem = data.handles.begin();
- std::advance(elem, data.handles.size() / 2);
- data.socket.removeFDPoll(*elem, sh_rmv_e::RMV_N_CLS);
- data.handles.erase(elem);
-
- return NULL;
-}
-void* threadEnd(void* pData)
-{
- struct TestStressUserData data = *(struct TestStressUserData*)pData;
- usleep(1000000);
- data.socket.exit_mainloop();
-
- return NULL;
-}
-
TEST(CAmSocketHandlerTest, stressTestUnixSocketAndTimers)
{
@@ -375,55 +350,41 @@ TEST(CAmSocketHandlerTest, stressTestUnixSocketAndTimers)
}
-TEST(CAmSocketHandlerTest, fdStressTest)
+TEST(CAmSocketHandlerTest, fdTest)
{
CAmSocketHandler myHandler;
ASSERT_FALSE(myHandler.fatalErrorOccurred());
- //Check unkonw systemd fd ids
- sh_pollHandle_t handle;
- EXPECT_EQ(myHandler.addFDPoll(100, 0, NULL, NULL, NULL, NULL, NULL, handle), E_NON_EXISTENT);
-
- int fd(-1);
- std::vector<sh_pollHandle_t> handles(10);
- for (auto& hndl : handles)
- {
- fd = eventfd(0, 0);
- ASSERT_EQ(myHandler.addFDPoll(fd, POLL_IN, NULL, NULL, NULL, NULL, NULL, hndl), E_OK);
- }
-
- // remove/add check
- ASSERT_EQ(myHandler.addFDPoll(fd, POLL_IN, NULL, NULL, NULL, NULL, NULL, handles.back()), E_ALREADY_EXISTS);
- ASSERT_EQ(myHandler.removeFDPoll(handles.back()), E_OK);
- ASSERT_EQ(myHandler.addFDPoll(fd, POLL_IN, NULL, NULL, NULL, NULL, NULL, handles.back()), E_OK);
-
- // create a copy to check if all handles are removed
- std::vector<sh_pollHandle_t> handlesCheckup(handles);
+ // for some simple fd tests
+ timespec endTime{0, 100000000}; // 0,1
+ timespec timeoutTime{0, 10000000}; // 0,01
- while (handles.size())
- {
- pthread_t tid1, tid2;
+ // check unknown system fd ids
+ sh_pollHandle_t handle;
+ ASSERT_EQ(myHandler.addFDPoll(100, 0, NULL, NULL, NULL, NULL, NULL, handle), E_NON_EXISTENT);
- // this removes an element before starting the socket handler and we
- // erase the last handle
- myHandler.removeFDPoll(handles.back(), sh_rmv_e::RMV_N_CLS);
- handles.erase(handles.end()-1);
+ // add/remove/add check of same fd
+ int fd = eventfd(0, 0);
+ ASSERT_EQ(myHandler.addFDPoll(fd, POLL_IN, NULL, NULL, NULL, NULL, NULL, handle), E_OK);
+ ASSERT_EQ(myHandler.addFDPoll(fd, POLL_IN, NULL, NULL, NULL, NULL, NULL, handle), E_ALREADY_EXISTS);
+ ASSERT_EQ(myHandler.removeFDPoll(handle), E_OK);
+ close(fd);
- TestStressUserData data = {myHandler, handles};
- pthread_create(&tid1, NULL, threadEnd, &data);
+ // Create x handles
+ TestUserData userData{1, 1.f};
+ CAmTimerStressTest timer(&myHandler, timeoutTime, 0);
- // erase the handle in the middle
- pthread_create(&tid2, NULL, threadRaceFd, &data);
+ ASSERT_EQ(myHandler.addTimer(timeoutTime, &timer.pTimerCallback, handle, &userData, true), E_OK);
+ EXPECT_CALL(timer, timerCallback(_,&userData)).Times(AnyNumber());
- myHandler.start_listenting();
+ // for some simple fd tests
+ CAmTimerSockethandlerController endCallback(&myHandler, endTime);
+ ASSERT_EQ(myHandler.addTimer(endTime, &endCallback.pTimerCallback, handle, NULL), E_OK);
+ EXPECT_CALL(endCallback,timerCallback(handle,NULL)).Times(Exactly(1));
- pthread_join(tid2, NULL);
- pthread_join(tid1, NULL);
- }
+ myHandler.start_listenting();
- // now do the check
- for (auto& hndl : handlesCheckup)
- EXPECT_EQ(myHandler.removeFDPoll(hndl), E_UNKNOWN) << "Handle " << hndl << " not correctly removed before";
+ ASSERT_FALSE(myHandler.fatalErrorOccurred());
}
TEST(CAmSocketHandlerTest, timersOneshot)
@@ -614,6 +575,8 @@ TEST(CAmSocketHandlerTest, timersStressTest)
{
delete timer, timer = NULL;
}
+
+ EXPECT_FALSE(myHandler.fatalErrorOccurred());
}