diff options
3 files changed, 110 insertions, 116 deletions
diff --git a/AudioManagerUtilities/include/CAmSocketHandler.h b/AudioManagerUtilities/include/CAmSocketHandler.h index 797551d..db3207e 100644 --- a/AudioManagerUtilities/include/CAmSocketHandler.h +++ b/AudioManagerUtilities/include/CAmSocketHandler.h @@ -50,7 +50,6 @@ namespace am typedef uint16_t sh_pollHandle_t; //!<this is a handle for a filedescriptor to be used with the SocketHandler typedef sh_pollHandle_t sh_timerHandle_t; //!<this is a handle for a timer to be used with the SocketHandler -typedef enum:uint8_t { RMV_ONLY, RMV_N_CLS } sh_rmv_e; /** * prototype for poll prepared callback @@ -218,11 +217,11 @@ class CAmSocketHandler { typedef enum:uint8_t { - UNINIT = 0u, // new, uninitialized element which needs to be inserted to ppoll array - VALID = 1u, // it is a valid element in ppoll array - UPDATE = 2u, // update of event information therefore update ppoll array - REMOVE = 3u, // remove from ppoll array and internal map - CLOSE = 4u // close and remove from ppoll array and internal map + ADD = 0u, // new, uninitialized element which needs to be added to ppoll array + UPDATE = 1u, // update of event information therefore update ppoll array + VALID = 2u, // it is a valid element in ppoll array + REMOVE = 3u, // remove from ppoll array and internal map + INVALID = 4u // uninit element requested to be removed from internal map only } poll_states_e; struct sh_poll_s //!<struct that holds information about polls @@ -237,7 +236,7 @@ class CAmSocketHandler poll_states_e state; sh_poll_s() : - handle(0), pollfdValue(), prepareCB(), firedCB(), checkCB(), dispatchCB(), userData(0), state(UNINIT) + handle(0), pollfdValue(), prepareCB(), firedCB(), checkCB(), dispatchCB(), userData(0), state(ADD) {} }; @@ -288,8 +287,9 @@ class CAmSocketHandler typedef enum:uint8_t { - NO_ERROR = 0u, // OK - FD_ERROR = 1u, // Invalid file descriptor + NO_ERROR = 0u, // OK + FD_ERROR = 1u, // Invalid file descriptor + MT_ERROR = 2u // Multi-thread issue } internal_codes_e; typedef uint8_t internal_codes_t; @@ -463,7 +463,7 @@ public: std::function<bool(const sh_pollHandle_t handle, void* userData)> check, std::function<bool(const sh_pollHandle_t handle, void* userData)> dispatch, void* userData, sh_pollHandle_t& handle); am_Error_e addFDPoll(const int fd, const short event, IAmShPollPrepare *prepare, IAmShPollFired *fired, IAmShPollCheck *check, IAmShPollDispatch *dispatch, void* userData, sh_pollHandle_t& handle); - am_Error_e removeFDPoll(const sh_pollHandle_t handle, const sh_rmv_e rmv = RMV_ONLY); + am_Error_e removeFDPoll(const sh_pollHandle_t handle); am_Error_e updateEventFlags(const sh_pollHandle_t handle, const short events); am_Error_e addSignalHandler(std::function<void(const sh_pollHandle_t handle, const signalfd_siginfo & info, void* userData)> callback, sh_pollHandle_t& handle, void * userData); am_Error_e removeSignalHandler(const sh_pollHandle_t handle); diff --git a/AudioManagerUtilities/src/CAmSocketHandler.cpp b/AudioManagerUtilities/src/CAmSocketHandler.cpp index a1f82ab..472fa7f 100644 --- a/AudioManagerUtilities/src/CAmSocketHandler.cpp +++ b/AudioManagerUtilities/src/CAmSocketHandler.cpp @@ -41,7 +41,6 @@ #endif #define END_EVENT (UINT64_MAX >> 1) -#define SHPOLL_IS_ACTIVE(state) (!(state == poll_states_e::REMOVE || state == poll_states_e::CLOSE)) namespace am { @@ -76,8 +75,11 @@ CAmSocketHandler::CAmSocketHandler() : { for (auto & elem : mMapShPoll) { - if (SHPOLL_IS_ACTIVE(elem.second.state)) - elem.second.state = poll_states_e::UNINIT; + if (elem.second.state == poll_states_e::UPDATE || + elem.second.state == poll_states_e::VALID) + { + elem.second.state = poll_states_e::ADD; + } } mDispatchDone = true; } @@ -135,44 +137,44 @@ void CAmSocketHandler::start_listenting() auto fdPollIt = fdPollingArray.begin(); for (auto it = mMapShPoll.begin(); it != mMapShPoll.end(); ) { - // NOTE: The order of the switch/case statement is important and should modified with care + // NOTE: The order of the switch/case statement reflects the state flow auto& elem = it->second; switch (elem.state) { - case poll_states_e::CLOSE: - close(elem.pollfdValue.fd); - // fallthrough - - case poll_states_e::REMOVE: - /* The first check is needed for the restart behavior were an element is marked - * as removed but not part of the polling array - which is stored on heap. - * The second check is needed in case a map element was inserted newly but was - * directly marked after as REMOVE but in between there was no sync. of the - * polling array processed. - */ - if (fdPollIt != fdPollingArray.end() && fdPollIt->fd == elem.pollfdValue.fd) - fdPollIt = fdPollingArray.erase(fdPollIt); - it = mMapShPoll.erase(it); - continue; - - case poll_states_e::UNINIT: + case poll_states_e::ADD: + elem.state = poll_states_e::UPDATE; fdPollIt = fdPollingArray.emplace(fdPollIt); - // fallthrough + break; case poll_states_e::UPDATE: - CAmSocketHandler::prepare(elem); elem.state = poll_states_e::VALID; + CAmSocketHandler::prepare(elem); *fdPollIt = elem.pollfdValue; break; case poll_states_e::VALID: - default: + // check for multi-thread access + assert(fdPollIt != fdPollingArray.end()); + ++fdPollIt; + ++it; + break; + + case poll_states_e::REMOVE: + elem.state = poll_states_e::INVALID; + fdPollIt = fdPollingArray.erase(fdPollIt); + break; + + case poll_states_e::INVALID: + it = mMapShPoll.erase(it); break; } - // Ensures that fdPollIt will be never on its end before incrementing it further! - assert(fdPollIt != fdPollingArray.end()); - ++fdPollIt; - ++it; + } + + if (fdPollingArray.size() != mMapShPoll.size()) + { + mInternalCodes |= internal_codes_e::MT_ERROR; + logError("CAmSocketHandler::start_listenting is NOT multi-thread safe!"); + return; } #ifndef WITH_TIMERFD @@ -269,8 +271,8 @@ void CAmSocketHandler::wakeupWorker(const std::string & func, const uint64_t val } bool CAmSocketHandler::fatalErrorOccurred() -{ - return (mInternalCodes & internal_codes_e::FD_ERROR); +{ + return (mInternalCodes != internal_codes_e::NO_ERROR); } /** @@ -392,14 +394,29 @@ am_Error_e CAmSocketHandler::addFDPoll(const int fd, void* userData, sh_pollHandle_t& handle) { + sh_poll_s pollData; + if (!fdIsValid(fd)) return E_NON_EXISTENT; const auto elem = mMapShPoll.find(fd); - if (elem != mMapShPoll.end() && SHPOLL_IS_ACTIVE(elem->second.state)) + if (elem != mMapShPoll.end()) { - logError("CAmSocketHandler::addFDPoll fd", fd, "already registered!"); - return E_ALREADY_EXISTS; + // The fd was already in map therefore we need to trigger an update instead + switch (elem->second.state) + { + case poll_states_e::REMOVE: + pollData.state = poll_states_e::UPDATE; + break; + + case poll_states_e::INVALID: + pollData.state = poll_states_e::ADD; + break; + + default: + logError("CAmSocketHandler::addFDPoll fd", fd, "already registered!"); + return E_ALREADY_EXISTS; + } } //create a new handle for the poll @@ -409,7 +426,6 @@ am_Error_e CAmSocketHandler::addFDPoll(const int fd, return (E_NOT_POSSIBLE); } - sh_poll_s pollData; pollData.pollfdValue.fd = fd; pollData.handle = mSetPollKeys.lastUsedID; pollData.pollfdValue.events = event; @@ -470,13 +486,13 @@ am::am_Error_e CAmSocketHandler::addFDPoll(const int fd, const short event, IAmS * @param [rmv] default RMV_ONLY_FDPOLL * @return */ -am_Error_e CAmSocketHandler::removeFDPoll(const sh_pollHandle_t handle, const sh_rmv_e rmv) +am_Error_e CAmSocketHandler::removeFDPoll(const sh_pollHandle_t handle) { for (auto& it : mMapShPoll) { if (it.second.handle == handle) { - it.second.state = (rmv == sh_rmv_e::RMV_N_CLS ? poll_states_e::CLOSE : poll_states_e::REMOVE); + it.second.state = (it.second.state == poll_states_e::ADD ? poll_states_e::INVALID : poll_states_e::REMOVE); wakeupWorker("removeFDPoll"); mSetPollKeys.pollHandles.erase(handle); return E_OK; @@ -661,8 +677,10 @@ am_Error_e CAmSocketHandler::removeTimer(const sh_timerHandle_t handle) { if (it->handle == handle) { + am_Error_e err = removeFDPoll(handle); + close(it->fd); mListTimer.erase(it); - return removeFDPoll(handle, sh_rmv_e::RMV_N_CLS); + return err; } ++it; } @@ -901,12 +919,25 @@ am_Error_e CAmSocketHandler::updateEventFlags(const sh_pollHandle_t handle, cons for (auto& it : mMapShPoll) { auto& elem = it.second; - if (elem.handle == handle && SHPOLL_IS_ACTIVE(elem.state)) + if (elem.handle != handle) + continue; + + switch (elem.state) { - elem.pollfdValue.events = events; - elem.pollfdValue.revents = 0; - elem.state = poll_states_e::UPDATE; - return (E_OK); + case poll_states_e::ADD: + elem.pollfdValue.events = events; + return (E_OK); + + case poll_states_e::UPDATE: + case poll_states_e::VALID: + elem.state = poll_states_e::UPDATE; + elem.pollfdValue.revents = 0; + elem.pollfdValue.events = events; + return (E_OK); + + default: + // This issue should never happen! + return (E_DATABASE_ERROR); } } return (E_UNKNOWN); diff --git a/AudioManagerUtilities/test/AmSocketHandlerTest/CAmSocketHandlerTest.cpp b/AudioManagerUtilities/test/AmSocketHandlerTest/CAmSocketHandlerTest.cpp index ef42d56..3fde011 100644 --- a/AudioManagerUtilities/test/AmSocketHandlerTest/CAmSocketHandlerTest.cpp +++ b/AudioManagerUtilities/test/AmSocketHandlerTest/CAmSocketHandlerTest.cpp @@ -22,6 +22,7 @@ #include "CAmSocketHandlerTest.h" #include <cstdio> +#include <cstring> #include <sys/socket.h> #include <arpa/inet.h> #include <sys/ioctl.h> @@ -57,12 +58,6 @@ struct TestUserData float f; }; -struct TestStressUserData -{ - CAmSocketHandler &socket; - std::vector<sh_pollHandle_t> &handles; -}; - MockIAmSignalHandler *pMockSignalHandler = NULL; static void signalHandler(int sig, siginfo_t *siginfo, void *context) { @@ -317,26 +312,6 @@ void* threadCallbackUnixSocketAndTimers(void* data) return sendTestData(sock, (struct sockaddr*)&servAddr, sizeof(servAddr), 500000); } -void* threadRaceFd(void* pData) -{ - struct TestStressUserData data = *(struct TestStressUserData*)pData; - usleep(50000); - auto elem = data.handles.begin(); - std::advance(elem, data.handles.size() / 2); - data.socket.removeFDPoll(*elem, sh_rmv_e::RMV_N_CLS); - data.handles.erase(elem); - - return NULL; -} -void* threadEnd(void* pData) -{ - struct TestStressUserData data = *(struct TestStressUserData*)pData; - usleep(1000000); - data.socket.exit_mainloop(); - - return NULL; -} - TEST(CAmSocketHandlerTest, stressTestUnixSocketAndTimers) { @@ -375,55 +350,41 @@ TEST(CAmSocketHandlerTest, stressTestUnixSocketAndTimers) } -TEST(CAmSocketHandlerTest, fdStressTest) +TEST(CAmSocketHandlerTest, fdTest) { CAmSocketHandler myHandler; ASSERT_FALSE(myHandler.fatalErrorOccurred()); - //Check unkonw systemd fd ids - sh_pollHandle_t handle; - EXPECT_EQ(myHandler.addFDPoll(100, 0, NULL, NULL, NULL, NULL, NULL, handle), E_NON_EXISTENT); - - int fd(-1); - std::vector<sh_pollHandle_t> handles(10); - for (auto& hndl : handles) - { - fd = eventfd(0, 0); - ASSERT_EQ(myHandler.addFDPoll(fd, POLL_IN, NULL, NULL, NULL, NULL, NULL, hndl), E_OK); - } - - // remove/add check - ASSERT_EQ(myHandler.addFDPoll(fd, POLL_IN, NULL, NULL, NULL, NULL, NULL, handles.back()), E_ALREADY_EXISTS); - ASSERT_EQ(myHandler.removeFDPoll(handles.back()), E_OK); - ASSERT_EQ(myHandler.addFDPoll(fd, POLL_IN, NULL, NULL, NULL, NULL, NULL, handles.back()), E_OK); - - // create a copy to check if all handles are removed - std::vector<sh_pollHandle_t> handlesCheckup(handles); + // for some simple fd tests + timespec endTime{0, 100000000}; // 0,1 + timespec timeoutTime{0, 10000000}; // 0,01 - while (handles.size()) - { - pthread_t tid1, tid2; + // check unknown system fd ids + sh_pollHandle_t handle; + ASSERT_EQ(myHandler.addFDPoll(100, 0, NULL, NULL, NULL, NULL, NULL, handle), E_NON_EXISTENT); - // this removes an element before starting the socket handler and we - // erase the last handle - myHandler.removeFDPoll(handles.back(), sh_rmv_e::RMV_N_CLS); - handles.erase(handles.end()-1); + // add/remove/add check of same fd + int fd = eventfd(0, 0); + ASSERT_EQ(myHandler.addFDPoll(fd, POLL_IN, NULL, NULL, NULL, NULL, NULL, handle), E_OK); + ASSERT_EQ(myHandler.addFDPoll(fd, POLL_IN, NULL, NULL, NULL, NULL, NULL, handle), E_ALREADY_EXISTS); + ASSERT_EQ(myHandler.removeFDPoll(handle), E_OK); + close(fd); - TestStressUserData data = {myHandler, handles}; - pthread_create(&tid1, NULL, threadEnd, &data); + // Create x handles + TestUserData userData{1, 1.f}; + CAmTimerStressTest timer(&myHandler, timeoutTime, 0); - // erase the handle in the middle - pthread_create(&tid2, NULL, threadRaceFd, &data); + ASSERT_EQ(myHandler.addTimer(timeoutTime, &timer.pTimerCallback, handle, &userData, true), E_OK); + EXPECT_CALL(timer, timerCallback(_,&userData)).Times(AnyNumber()); - myHandler.start_listenting(); + // for some simple fd tests + CAmTimerSockethandlerController endCallback(&myHandler, endTime); + ASSERT_EQ(myHandler.addTimer(endTime, &endCallback.pTimerCallback, handle, NULL), E_OK); + EXPECT_CALL(endCallback,timerCallback(handle,NULL)).Times(Exactly(1)); - pthread_join(tid2, NULL); - pthread_join(tid1, NULL); - } + myHandler.start_listenting(); - // now do the check - for (auto& hndl : handlesCheckup) - EXPECT_EQ(myHandler.removeFDPoll(hndl), E_UNKNOWN) << "Handle " << hndl << " not correctly removed before"; + ASSERT_FALSE(myHandler.fatalErrorOccurred()); } TEST(CAmSocketHandlerTest, timersOneshot) @@ -614,6 +575,8 @@ TEST(CAmSocketHandlerTest, timersStressTest) { delete timer, timer = NULL; } + + EXPECT_FALSE(myHandler.fatalErrorOccurred()); } |