summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--AudioManagerUtilities/include/CAmSocketHandler.h20
-rw-r--r--AudioManagerUtilities/src/CAmSocketHandler.cpp115
-rw-r--r--AudioManagerUtilities/test/AmSocketHandlerTest/CAmSocketHandlerTest.cpp91
3 files changed, 110 insertions, 116 deletions
diff --git a/AudioManagerUtilities/include/CAmSocketHandler.h b/AudioManagerUtilities/include/CAmSocketHandler.h
index 797551d..db3207e 100644
--- a/AudioManagerUtilities/include/CAmSocketHandler.h
+++ b/AudioManagerUtilities/include/CAmSocketHandler.h
@@ -50,7 +50,6 @@ namespace am
typedef uint16_t sh_pollHandle_t; //!<this is a handle for a filedescriptor to be used with the SocketHandler
typedef sh_pollHandle_t sh_timerHandle_t; //!<this is a handle for a timer to be used with the SocketHandler
-typedef enum:uint8_t { RMV_ONLY, RMV_N_CLS } sh_rmv_e;
/**
* prototype for poll prepared callback
@@ -218,11 +217,11 @@ class CAmSocketHandler
{
typedef enum:uint8_t
{
- UNINIT = 0u, // new, uninitialized element which needs to be inserted to ppoll array
- VALID = 1u, // it is a valid element in ppoll array
- UPDATE = 2u, // update of event information therefore update ppoll array
- REMOVE = 3u, // remove from ppoll array and internal map
- CLOSE = 4u // close and remove from ppoll array and internal map
+ ADD = 0u, // new, uninitialized element which needs to be added to ppoll array
+ UPDATE = 1u, // update of event information therefore update ppoll array
+ VALID = 2u, // it is a valid element in ppoll array
+ REMOVE = 3u, // remove from ppoll array and internal map
+ INVALID = 4u // uninit element requested to be removed from internal map only
} poll_states_e;
struct sh_poll_s //!<struct that holds information about polls
@@ -237,7 +236,7 @@ class CAmSocketHandler
poll_states_e state;
sh_poll_s() :
- handle(0), pollfdValue(), prepareCB(), firedCB(), checkCB(), dispatchCB(), userData(0), state(UNINIT)
+ handle(0), pollfdValue(), prepareCB(), firedCB(), checkCB(), dispatchCB(), userData(0), state(ADD)
{}
};
@@ -288,8 +287,9 @@ class CAmSocketHandler
typedef enum:uint8_t
{
- NO_ERROR = 0u, // OK
- FD_ERROR = 1u, // Invalid file descriptor
+ NO_ERROR = 0u, // OK
+ FD_ERROR = 1u, // Invalid file descriptor
+ MT_ERROR = 2u // Multi-thread issue
} internal_codes_e;
typedef uint8_t internal_codes_t;
@@ -463,7 +463,7 @@ public:
std::function<bool(const sh_pollHandle_t handle, void* userData)> check, std::function<bool(const sh_pollHandle_t handle, void* userData)> dispatch, void* userData, sh_pollHandle_t& handle);
am_Error_e addFDPoll(const int fd, const short event, IAmShPollPrepare *prepare, IAmShPollFired *fired, IAmShPollCheck *check, IAmShPollDispatch *dispatch, void* userData, sh_pollHandle_t& handle);
- am_Error_e removeFDPoll(const sh_pollHandle_t handle, const sh_rmv_e rmv = RMV_ONLY);
+ am_Error_e removeFDPoll(const sh_pollHandle_t handle);
am_Error_e updateEventFlags(const sh_pollHandle_t handle, const short events);
am_Error_e addSignalHandler(std::function<void(const sh_pollHandle_t handle, const signalfd_siginfo & info, void* userData)> callback, sh_pollHandle_t& handle, void * userData);
am_Error_e removeSignalHandler(const sh_pollHandle_t handle);
diff --git a/AudioManagerUtilities/src/CAmSocketHandler.cpp b/AudioManagerUtilities/src/CAmSocketHandler.cpp
index a1f82ab..472fa7f 100644
--- a/AudioManagerUtilities/src/CAmSocketHandler.cpp
+++ b/AudioManagerUtilities/src/CAmSocketHandler.cpp
@@ -41,7 +41,6 @@
#endif
#define END_EVENT (UINT64_MAX >> 1)
-#define SHPOLL_IS_ACTIVE(state) (!(state == poll_states_e::REMOVE || state == poll_states_e::CLOSE))
namespace am
{
@@ -76,8 +75,11 @@ CAmSocketHandler::CAmSocketHandler() :
{
for (auto & elem : mMapShPoll)
{
- if (SHPOLL_IS_ACTIVE(elem.second.state))
- elem.second.state = poll_states_e::UNINIT;
+ if (elem.second.state == poll_states_e::UPDATE ||
+ elem.second.state == poll_states_e::VALID)
+ {
+ elem.second.state = poll_states_e::ADD;
+ }
}
mDispatchDone = true;
}
@@ -135,44 +137,44 @@ void CAmSocketHandler::start_listenting()
auto fdPollIt = fdPollingArray.begin();
for (auto it = mMapShPoll.begin(); it != mMapShPoll.end(); )
{
- // NOTE: The order of the switch/case statement is important and should modified with care
+ // NOTE: The order of the switch/case statement reflects the state flow
auto& elem = it->second;
switch (elem.state)
{
- case poll_states_e::CLOSE:
- close(elem.pollfdValue.fd);
- // fallthrough
-
- case poll_states_e::REMOVE:
- /* The first check is needed for the restart behavior were an element is marked
- * as removed but not part of the polling array - which is stored on heap.
- * The second check is needed in case a map element was inserted newly but was
- * directly marked after as REMOVE but in between there was no sync. of the
- * polling array processed.
- */
- if (fdPollIt != fdPollingArray.end() && fdPollIt->fd == elem.pollfdValue.fd)
- fdPollIt = fdPollingArray.erase(fdPollIt);
- it = mMapShPoll.erase(it);
- continue;
-
- case poll_states_e::UNINIT:
+ case poll_states_e::ADD:
+ elem.state = poll_states_e::UPDATE;
fdPollIt = fdPollingArray.emplace(fdPollIt);
- // fallthrough
+ break;
case poll_states_e::UPDATE:
- CAmSocketHandler::prepare(elem);
elem.state = poll_states_e::VALID;
+ CAmSocketHandler::prepare(elem);
*fdPollIt = elem.pollfdValue;
break;
case poll_states_e::VALID:
- default:
+ // check for multi-thread access
+ assert(fdPollIt != fdPollingArray.end());
+ ++fdPollIt;
+ ++it;
+ break;
+
+ case poll_states_e::REMOVE:
+ elem.state = poll_states_e::INVALID;
+ fdPollIt = fdPollingArray.erase(fdPollIt);
+ break;
+
+ case poll_states_e::INVALID:
+ it = mMapShPoll.erase(it);
break;
}
- // Ensures that fdPollIt will be never on its end before incrementing it further!
- assert(fdPollIt != fdPollingArray.end());
- ++fdPollIt;
- ++it;
+ }
+
+ if (fdPollingArray.size() != mMapShPoll.size())
+ {
+ mInternalCodes |= internal_codes_e::MT_ERROR;
+ logError("CAmSocketHandler::start_listenting is NOT multi-thread safe!");
+ return;
}
#ifndef WITH_TIMERFD
@@ -269,8 +271,8 @@ void CAmSocketHandler::wakeupWorker(const std::string & func, const uint64_t val
}
bool CAmSocketHandler::fatalErrorOccurred()
-{
- return (mInternalCodes & internal_codes_e::FD_ERROR);
+{
+ return (mInternalCodes != internal_codes_e::NO_ERROR);
}
/**
@@ -392,14 +394,29 @@ am_Error_e CAmSocketHandler::addFDPoll(const int fd,
void* userData,
sh_pollHandle_t& handle)
{
+ sh_poll_s pollData;
+
if (!fdIsValid(fd))
return E_NON_EXISTENT;
const auto elem = mMapShPoll.find(fd);
- if (elem != mMapShPoll.end() && SHPOLL_IS_ACTIVE(elem->second.state))
+ if (elem != mMapShPoll.end())
{
- logError("CAmSocketHandler::addFDPoll fd", fd, "already registered!");
- return E_ALREADY_EXISTS;
+ // The fd was already in map therefore we need to trigger an update instead
+ switch (elem->second.state)
+ {
+ case poll_states_e::REMOVE:
+ pollData.state = poll_states_e::UPDATE;
+ break;
+
+ case poll_states_e::INVALID:
+ pollData.state = poll_states_e::ADD;
+ break;
+
+ default:
+ logError("CAmSocketHandler::addFDPoll fd", fd, "already registered!");
+ return E_ALREADY_EXISTS;
+ }
}
//create a new handle for the poll
@@ -409,7 +426,6 @@ am_Error_e CAmSocketHandler::addFDPoll(const int fd,
return (E_NOT_POSSIBLE);
}
- sh_poll_s pollData;
pollData.pollfdValue.fd = fd;
pollData.handle = mSetPollKeys.lastUsedID;
pollData.pollfdValue.events = event;
@@ -470,13 +486,13 @@ am::am_Error_e CAmSocketHandler::addFDPoll(const int fd, const short event, IAmS
* @param [rmv] default RMV_ONLY_FDPOLL
* @return
*/
-am_Error_e CAmSocketHandler::removeFDPoll(const sh_pollHandle_t handle, const sh_rmv_e rmv)
+am_Error_e CAmSocketHandler::removeFDPoll(const sh_pollHandle_t handle)
{
for (auto& it : mMapShPoll)
{
if (it.second.handle == handle)
{
- it.second.state = (rmv == sh_rmv_e::RMV_N_CLS ? poll_states_e::CLOSE : poll_states_e::REMOVE);
+ it.second.state = (it.second.state == poll_states_e::ADD ? poll_states_e::INVALID : poll_states_e::REMOVE);
wakeupWorker("removeFDPoll");
mSetPollKeys.pollHandles.erase(handle);
return E_OK;
@@ -661,8 +677,10 @@ am_Error_e CAmSocketHandler::removeTimer(const sh_timerHandle_t handle)
{
if (it->handle == handle)
{
+ am_Error_e err = removeFDPoll(handle);
+ close(it->fd);
mListTimer.erase(it);
- return removeFDPoll(handle, sh_rmv_e::RMV_N_CLS);
+ return err;
}
++it;
}
@@ -901,12 +919,25 @@ am_Error_e CAmSocketHandler::updateEventFlags(const sh_pollHandle_t handle, cons
for (auto& it : mMapShPoll)
{
auto& elem = it.second;
- if (elem.handle == handle && SHPOLL_IS_ACTIVE(elem.state))
+ if (elem.handle != handle)
+ continue;
+
+ switch (elem.state)
{
- elem.pollfdValue.events = events;
- elem.pollfdValue.revents = 0;
- elem.state = poll_states_e::UPDATE;
- return (E_OK);
+ case poll_states_e::ADD:
+ elem.pollfdValue.events = events;
+ return (E_OK);
+
+ case poll_states_e::UPDATE:
+ case poll_states_e::VALID:
+ elem.state = poll_states_e::UPDATE;
+ elem.pollfdValue.revents = 0;
+ elem.pollfdValue.events = events;
+ return (E_OK);
+
+ default:
+ // This issue should never happen!
+ return (E_DATABASE_ERROR);
}
}
return (E_UNKNOWN);
diff --git a/AudioManagerUtilities/test/AmSocketHandlerTest/CAmSocketHandlerTest.cpp b/AudioManagerUtilities/test/AmSocketHandlerTest/CAmSocketHandlerTest.cpp
index ef42d56..3fde011 100644
--- a/AudioManagerUtilities/test/AmSocketHandlerTest/CAmSocketHandlerTest.cpp
+++ b/AudioManagerUtilities/test/AmSocketHandlerTest/CAmSocketHandlerTest.cpp
@@ -22,6 +22,7 @@
#include "CAmSocketHandlerTest.h"
#include <cstdio>
+#include <cstring>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <sys/ioctl.h>
@@ -57,12 +58,6 @@ struct TestUserData
float f;
};
-struct TestStressUserData
-{
- CAmSocketHandler &socket;
- std::vector<sh_pollHandle_t> &handles;
-};
-
MockIAmSignalHandler *pMockSignalHandler = NULL;
static void signalHandler(int sig, siginfo_t *siginfo, void *context)
{
@@ -317,26 +312,6 @@ void* threadCallbackUnixSocketAndTimers(void* data)
return sendTestData(sock, (struct sockaddr*)&servAddr, sizeof(servAddr), 500000);
}
-void* threadRaceFd(void* pData)
-{
- struct TestStressUserData data = *(struct TestStressUserData*)pData;
- usleep(50000);
- auto elem = data.handles.begin();
- std::advance(elem, data.handles.size() / 2);
- data.socket.removeFDPoll(*elem, sh_rmv_e::RMV_N_CLS);
- data.handles.erase(elem);
-
- return NULL;
-}
-void* threadEnd(void* pData)
-{
- struct TestStressUserData data = *(struct TestStressUserData*)pData;
- usleep(1000000);
- data.socket.exit_mainloop();
-
- return NULL;
-}
-
TEST(CAmSocketHandlerTest, stressTestUnixSocketAndTimers)
{
@@ -375,55 +350,41 @@ TEST(CAmSocketHandlerTest, stressTestUnixSocketAndTimers)
}
-TEST(CAmSocketHandlerTest, fdStressTest)
+TEST(CAmSocketHandlerTest, fdTest)
{
CAmSocketHandler myHandler;
ASSERT_FALSE(myHandler.fatalErrorOccurred());
- //Check unkonw systemd fd ids
- sh_pollHandle_t handle;
- EXPECT_EQ(myHandler.addFDPoll(100, 0, NULL, NULL, NULL, NULL, NULL, handle), E_NON_EXISTENT);
-
- int fd(-1);
- std::vector<sh_pollHandle_t> handles(10);
- for (auto& hndl : handles)
- {
- fd = eventfd(0, 0);
- ASSERT_EQ(myHandler.addFDPoll(fd, POLL_IN, NULL, NULL, NULL, NULL, NULL, hndl), E_OK);
- }
-
- // remove/add check
- ASSERT_EQ(myHandler.addFDPoll(fd, POLL_IN, NULL, NULL, NULL, NULL, NULL, handles.back()), E_ALREADY_EXISTS);
- ASSERT_EQ(myHandler.removeFDPoll(handles.back()), E_OK);
- ASSERT_EQ(myHandler.addFDPoll(fd, POLL_IN, NULL, NULL, NULL, NULL, NULL, handles.back()), E_OK);
-
- // create a copy to check if all handles are removed
- std::vector<sh_pollHandle_t> handlesCheckup(handles);
+ // for some simple fd tests
+ timespec endTime{0, 100000000}; // 0,1
+ timespec timeoutTime{0, 10000000}; // 0,01
- while (handles.size())
- {
- pthread_t tid1, tid2;
+ // check unknown system fd ids
+ sh_pollHandle_t handle;
+ ASSERT_EQ(myHandler.addFDPoll(100, 0, NULL, NULL, NULL, NULL, NULL, handle), E_NON_EXISTENT);
- // this removes an element before starting the socket handler and we
- // erase the last handle
- myHandler.removeFDPoll(handles.back(), sh_rmv_e::RMV_N_CLS);
- handles.erase(handles.end()-1);
+ // add/remove/add check of same fd
+ int fd = eventfd(0, 0);
+ ASSERT_EQ(myHandler.addFDPoll(fd, POLL_IN, NULL, NULL, NULL, NULL, NULL, handle), E_OK);
+ ASSERT_EQ(myHandler.addFDPoll(fd, POLL_IN, NULL, NULL, NULL, NULL, NULL, handle), E_ALREADY_EXISTS);
+ ASSERT_EQ(myHandler.removeFDPoll(handle), E_OK);
+ close(fd);
- TestStressUserData data = {myHandler, handles};
- pthread_create(&tid1, NULL, threadEnd, &data);
+ // Create x handles
+ TestUserData userData{1, 1.f};
+ CAmTimerStressTest timer(&myHandler, timeoutTime, 0);
- // erase the handle in the middle
- pthread_create(&tid2, NULL, threadRaceFd, &data);
+ ASSERT_EQ(myHandler.addTimer(timeoutTime, &timer.pTimerCallback, handle, &userData, true), E_OK);
+ EXPECT_CALL(timer, timerCallback(_,&userData)).Times(AnyNumber());
- myHandler.start_listenting();
+ // for some simple fd tests
+ CAmTimerSockethandlerController endCallback(&myHandler, endTime);
+ ASSERT_EQ(myHandler.addTimer(endTime, &endCallback.pTimerCallback, handle, NULL), E_OK);
+ EXPECT_CALL(endCallback,timerCallback(handle,NULL)).Times(Exactly(1));
- pthread_join(tid2, NULL);
- pthread_join(tid1, NULL);
- }
+ myHandler.start_listenting();
- // now do the check
- for (auto& hndl : handlesCheckup)
- EXPECT_EQ(myHandler.removeFDPoll(hndl), E_UNKNOWN) << "Handle " << hndl << " not correctly removed before";
+ ASSERT_FALSE(myHandler.fatalErrorOccurred());
}
TEST(CAmSocketHandlerTest, timersOneshot)
@@ -614,6 +575,8 @@ TEST(CAmSocketHandlerTest, timersStressTest)
{
delete timer, timer = NULL;
}
+
+ EXPECT_FALSE(myHandler.fatalErrorOccurred());
}