diff options
author | Jens Lorenz <jlorenz@de.adit-jv.com> | 2018-04-04 09:47:25 +0200 |
---|---|---|
committer | Jens Lorenz <jlorenz@de.adit-jv.com> | 2018-04-09 17:00:47 +0200 |
commit | 29b816429d141584af128256545ca0dc96ce0be3 (patch) | |
tree | 1a92719a70418f91284174eb0c079018c65c8bfa | |
parent | e9240071f34ae96f72f4befd19f6fa68cc721ad1 (diff) | |
download | audiomanager-29b816429d141584af128256545ca0dc96ce0be3.tar.gz |
AMUtil: Rework of socketHandler to avoid calls of invalidated objects
This patch tries to follow the idea raised in PR26.
Following two patches have been reworked:
commit: cfe0e77aaf87a0590ceea42f6afa62b0c7d95e80
commit: bc33226f59910a960f62d419ba10d4ea761e3724
The biggest change applies to the internal database. Instead of having a
vector for all items which will be copied inside the worker thread the new
approach aims a central map which allows to store the sh_poll elements in
containers. By this a container is valid until it is remove from map.
The remove of items inside a map is now centralized within the worker and
only the worker is responsible to keep the ppoll list and the map in sync.
This patch also extends the unit tests to stress different timer scenarios.
Signed-off-by: Aleksandar Donchev <Aleksander.Donchev@partner.bmw.de>
Signed-off-by: Jens Lorenz <jlorenz@de.adit-jv.com>
4 files changed, 505 insertions, 219 deletions
diff --git a/AudioManagerUtilities/include/CAmSocketHandler.h b/AudioManagerUtilities/include/CAmSocketHandler.h index 5465839..6564d7f 100644 --- a/AudioManagerUtilities/include/CAmSocketHandler.h +++ b/AudioManagerUtilities/include/CAmSocketHandler.h @@ -215,6 +215,14 @@ public: */ class CAmSocketHandler { + typedef enum:uint8_t + { + UNINIT = 0u, // new, uninitialized element which needs to be inserted to ppoll array + VALID = 1u, // it is a valid element in ppoll array + UPDATE = 2u, // update of event information therefore update ppoll array + REMOVE = 3u // remove from ppoll array and internal map + } poll_states_e; + struct sh_poll_s //!<struct that holds information about polls { sh_pollHandle_t handle; //!<handle to uniquely adress a filedesriptor @@ -224,9 +232,10 @@ class CAmSocketHandler std::function<bool(const sh_pollHandle_t handle, void* userData)> checkCB; //check callback std::function<bool(const sh_pollHandle_t handle, void* userData)> dispatchCB; //dispatch callback void* userData; + poll_states_e state; sh_poll_s() : - handle(0), pollfdValue(), prepareCB(), firedCB(), checkCB(), dispatchCB(), userData(0) + handle(0), pollfdValue(), prepareCB(), firedCB(), checkCB(), dispatchCB(), userData(0), state(UNINIT) {} }; @@ -271,8 +280,8 @@ class CAmSocketHandler }; typedef std::reverse_iterator<sh_timer_s> rListTimerIter; //!<typedef for reverseiterator on timer lists - typedef std::vector<pollfd> VectorListPollfd_t; //!<vector of filedescriptors - typedef std::vector<sh_poll_s> VectorListPoll_t; //!<list for the callbacks + typedef std::vector<pollfd> VectorPollfd_t; //!<vector of filedescriptors + typedef std::map<int, sh_poll_s> MapShPoll_t; //!<list for the callbacks typedef std::vector<sh_signal_s> VectorSignalHandlers_t; //!<list for the callbacks typedef enum:uint8_t @@ -284,9 +293,9 @@ class CAmSocketHandler int mEventFd; bool mDispatchDone; //this starts / stops the mainloop + MapShPoll_t mMapShPoll; //!<list that holds all information for the ppoll sh_identifier_s mSetPollKeys; //!A set of all used ppoll keys - VectorListPoll_t mListPoll; //!<list that holds all information for the ppoll sh_identifier_s mSetTimerKeys; //!A set of all used timer keys std::list<sh_timer_s> mListTimer; //!<list of all timers #ifndef WITH_TIMERFD @@ -294,19 +303,18 @@ class CAmSocketHandler #endif sh_identifier_s mSetSignalhandlerKeys; //!A set of all used signal handler keys VectorSignalHandlers_t mSignalHandlers; - bool mRecreatePollfds; //!<when this is true, the poll list needs to be recreated internal_codes_t mInternalCodes; sh_pollHandle_t mSignalFdHandle; #ifndef WITH_TIMERFD timespec mStartTime; //!<here the actual time is saved for timecorrection #endif + private: bool fdIsValid(const int fd) const; timespec* insertTime(timespec& buffertime); #ifdef WITH_TIMERFD am_Error_e createTimeFD(const itimerspec & timeouts, int & fd); - #else void timerUp(); void timerCorrection(); @@ -408,28 +416,21 @@ private: * @param a * @return */ - inline static void fire(sh_poll_s& a); - - /** - * functor to return all fired events - * @param a - * @return - */ - inline static bool eventFired(const pollfd& a); + inline static void fire(const sh_poll_s& a); /** * functor to help find the items that do not need dispatching * @param a * @return */ - inline static bool noDispatching(const sh_poll_s& a); + inline static bool noDispatching(const sh_poll_s* a); /** * checks if dispatching is already finished * @param a * @return */ - inline static bool dispatchingFinished(const sh_poll_s& a); + inline static bool dispatchingFinished(const sh_poll_s* a); /** * timer fire callback diff --git a/AudioManagerUtilities/src/CAmSocketHandler.cpp b/AudioManagerUtilities/src/CAmSocketHandler.cpp index ba36363..7e12fc0 100644 --- a/AudioManagerUtilities/src/CAmSocketHandler.cpp +++ b/AudioManagerUtilities/src/CAmSocketHandler.cpp @@ -47,7 +47,7 @@ CAmSocketHandler::CAmSocketHandler() : mEventFd(-1), // mDispatchDone(true), // mSetPollKeys(MAX_POLLHANDLE), // - mListPoll(), // + mMapShPoll(), // mSetTimerKeys(MAX_TIMERHANDLE), mListTimer(), // #ifndef WITH_TIMERFD @@ -55,7 +55,6 @@ CAmSocketHandler::CAmSocketHandler() : #endif mSetSignalhandlerKeys(MAX_POLLHANDLE), // mSignalHandlers(), // - mRecreatePollfds(true), mInternalCodes(internal_codes_e::NO_ERROR), mSignalFdHandle(0) #ifndef WITH_TIMERFD @@ -71,6 +70,12 @@ CAmSocketHandler::CAmSocketHandler() : if (bytes == sizeof(events)) { if (events == UINT64_MAX-1) + { + for (auto & elem : mMapShPoll) + { + if (elem.second.state != poll_states_e::REMOVE) + elem.second.state = poll_states_e::UNINIT; + } mDispatchDone = true; return; } @@ -98,12 +103,9 @@ CAmSocketHandler::CAmSocketHandler() : CAmSocketHandler::~CAmSocketHandler() { - for (auto it : mListPoll) + for (const auto& it : mMapShPoll) { - // This check is needed to ensure that fd wasn't closed - // e.g. in the communication plugin's dtor's. - if (fdIsValid(mEventFd)) - close(it.pollfdValue.fd); + close(it.second.pollfdValue.fd); } } @@ -114,80 +116,78 @@ CAmSocketHandler::~CAmSocketHandler() void CAmSocketHandler::start_listenting() { mDispatchDone = false; - int16_t pollStatus; #ifndef WITH_TIMERFD clock_gettime(CLOCK_MONOTONIC, &mStartTime); #endif timespec buffertime; - VectorListPoll_t cloneListPoll; - VectorListPoll_t::iterator listmPollIt; - VectorListPollfd_t fdPollingArray; //!<the polling array for ppoll - - auto preparePollfd = [&](const sh_poll_s& row) - { - CAmSocketHandler::prepare((sh_poll_s&)row); - pollfd temp = row.pollfdValue; - temp.revents = 0; - fdPollingArray.push_back(temp); - }; + VectorPollfd_t fdPollingArray; //!<the polling array for ppoll while (!mDispatchDone) { - if (mRecreatePollfds) + /* Iterate all times through map and synchronize the polling array accordingly. + * In case a new element in map appears the polling array will be extended and + * in case an element gets removed the map and the polling array needs to be adapted. + */ + auto fdPollIt = fdPollingArray.begin(); + for (auto it = mMapShPoll.begin(); it != mMapShPoll.end(); ) { - fdPollingArray.clear(); - //freeze mListPoll by copying it - otherwise we get problems when we want to manipulate it during the next lines - cloneListPoll = mListPoll; - //there was a change in the setup, so we need to recreate the fdarray from the list - std::for_each(cloneListPoll.begin(), cloneListPoll.end(), preparePollfd); - mRecreatePollfds = false; - } - else - { - //first we go through the registered filedescriptors and check if someone needs preparation: - std::for_each(cloneListPoll.begin(), cloneListPoll.end(), CAmSocketHandler::prepare); + // NOTE: The order of the switch/case statement is important and should modified with care + auto& elem = it->second; + switch (elem.state) + { + case poll_states_e::REMOVE: + close(elem.pollfdValue.fd); + fdPollIt = fdPollingArray.erase(fdPollIt); + it = mMapShPoll.erase(it); + continue; + + case poll_states_e::UNINIT: + fdPollIt = fdPollingArray.emplace(fdPollIt); + // fallthrough + + case poll_states_e::UPDATE: + CAmSocketHandler::prepare(elem); + elem.state = poll_states_e::VALID; + *fdPollIt = elem.pollfdValue; + break; + + case poll_states_e::VALID: + default: + break; + } + // Ensures that fdPollIt will be never on its end before incrementing it further! + assert(fdPollIt != fdPollingArray.end()); + ++fdPollIt; + ++it; } #ifndef WITH_TIMERFD timerCorrection(); #endif - //block until something is on a filedescriptor - if ((pollStatus = ppoll(&fdPollingArray[0], fdPollingArray.size(), insertTime(buffertime), NULL)) < 0) + // block until something is on a file descriptor + int16_t pollStatus = ppoll(&fdPollingArray[0], fdPollingArray.size(), insertTime(buffertime), NULL); + if (pollStatus > 0) { - if (errno == EINTR) - { - //a signal was received, that means it's time to go... - pollStatus = 0; - } - else + // stage 0+1, call firedCB + std::list<sh_poll_s*> listPoll; + for (auto& it : fdPollingArray) { - logError("SocketHandler::start_listenting ppoll returned with error", errno); - throw std::runtime_error(std::string("SocketHandler::start_listenting ppoll returned with error.")); - } - } - - if (pollStatus != 0) //only check filedescriptors if there was a change - { - std::list<sh_poll_s> listPoll; - //todo: here could be a timer that makes sure naughty plugins return! - //stage 0+1, call firedCB - listmPollIt = cloneListPoll.begin(); - for (auto it : fdPollingArray) - { - if (CAmSocketHandler::eventFired(it)) - { - listmPollIt->pollfdValue.revents = it.revents; - listPoll.push_back(*listmPollIt); - CAmSocketHandler::fire(*listmPollIt); - } - else - { - listmPollIt->pollfdValue.revents = 0; - } - listmPollIt++; + it.revents &= it.events; + if (it.revents == 0) + continue; + + sh_poll_s& pollObj = mMapShPoll.at(it.fd); + if (pollObj.state != poll_states_e::VALID) + continue; + + // ensure to copy the revents fired in fdPollingArray + pollObj.pollfdValue.revents = it.revents; + listPoll.push_back(&pollObj); + CAmSocketHandler::fire(pollObj); + it.revents = 0; } //stage 2, lets ask around if some dispatching is necessary, the ones who need stay on the list @@ -198,16 +198,20 @@ void CAmSocketHandler::start_listenting() { listPoll.remove_if(CAmSocketHandler::dispatchingFinished); } while (!listPoll.empty()); - } -#ifndef WITH_TIMERFD + else if ((pollStatus < 0) && (errno != EINTR)) + { + logError("SocketHandler::start_listenting ppoll returned with error", errno); + throw std::runtime_error(std::string("SocketHandler::start_listenting ppoll returned with error.")); + } else //Timerevent { +#ifndef WITH_TIMERFD //this was a timer event, we need to take care about the timers //find out the timedifference to starttime timerUp(); - } #endif + } } } @@ -253,14 +257,13 @@ bool CAmSocketHandler::fatalErrorOccurred() am_Error_e CAmSocketHandler::getFDPollData(const sh_pollHandle_t handle, sh_poll_s & outPollData) { - VectorListPoll_t::iterator iterator = mListPoll.begin(); - for (; iterator != mListPoll.end(); ++iterator) + for (auto it : mMapShPoll) { - if (iterator->handle == handle) - { - outPollData = *iterator; - return (E_OK); - } + if (it.second.handle != handle) + continue; + + outPollData = it.second; + return (E_OK); } return (E_UNKNOWN); } @@ -380,20 +383,35 @@ am_Error_e CAmSocketHandler::listenToSignals(const std::vector<uint8_t> & listSi * @param dispatch a std::function that is called to dispatch the received data * @param userData a pointer to userdata that is always passed around * @param handle the handle of this poll - * @return E_OK if the descriptor was added, E_NON_EXISTENT if the fd is not valid + * @return E_OK if the descriptor was added + * E_NON_EXISTENT if the fd is not valid + * E_ALREADY_EXISTS if the fd is already known + * E_NOT_POSSIBLE if the maximum handle threshold is reached */ -am_Error_e CAmSocketHandler::addFDPoll(const int fd, const short event, std::function<void(const sh_pollHandle_t handle, void* userData)> prepare, - std::function<void(const pollfd pollfd, const sh_pollHandle_t handle, void* userData)> fired, std::function<bool(const sh_pollHandle_t handle, void* userData)> check, - std::function<bool(const sh_pollHandle_t handle, void* userData)> dispatch, void* userData, sh_pollHandle_t& handle) +am_Error_e CAmSocketHandler::addFDPoll(const int fd, + const short event, + std::function<void(const sh_pollHandle_t handle, void* userData)> prepare, + std::function<void(const pollfd pollfd, const sh_pollHandle_t handle, void* userData)> fired, + std::function<bool(const sh_pollHandle_t handle, void* userData)> check, + std::function<bool(const sh_pollHandle_t handle, void* userData)> dispatch, + void* userData, + sh_pollHandle_t& handle) { if (!fdIsValid(fd)) - return (E_NON_EXISTENT); + return E_NON_EXISTENT; + + const auto elem = mMapShPoll.find(fd); + if (elem != mMapShPoll.end() && elem->second.state != poll_states_e::REMOVE) + { + logError("CAmSocketHandler::addFDPoll fd", fd, "already registered!"); + return E_ALREADY_EXISTS; + } //create a new handle for the poll if (!nextHandle(mSetPollKeys)) { - logError("Could not create new polls, too many open!"); + logError("CAmSocketHandler::addFDPoll Max handle count reached!"); return (E_NOT_POSSIBLE); } @@ -407,10 +425,9 @@ am_Error_e CAmSocketHandler::addFDPoll(const int fd, const short event, std::fun pollData.checkCB = check; pollData.dispatchCB = dispatch; pollData.userData = userData; - //add new data to the list - mListPoll.push_back(pollData); - mRecreatePollfds = true; + //add new data to the list + mMapShPoll[fd] = pollData; handle = pollData.handle; return (E_OK); @@ -427,15 +444,18 @@ am_Error_e CAmSocketHandler::addFDPoll(const int fd, const short event, std::fun * @param dispatch a callback that is called to dispatch the received data * @param userData a pointer to userdata that is always passed around * @param handle the handle of this poll - * @return E_OK if the descriptor was added, E_NON_EXISTENT if the fd is not valid + * @return E_OK if the descriptor was added + * E_NON_EXISTENT if the fd is not valid + * E_ALREADY_EXISTS if the fd is already known + * E_NOT_POSSIBLE if the maximum handle threshold is reached */ am::am_Error_e CAmSocketHandler::addFDPoll(const int fd, const short event, IAmShPollPrepare *prepare, IAmShPollFired *fired, IAmShPollCheck *check, IAmShPollDispatch *dispatch, void *userData, sh_pollHandle_t & handle) { - std::function<void(const sh_pollHandle_t handle, void* userData)> prepareCB; //preperation callback - std::function<void(const pollfd poll, const sh_pollHandle_t handle, void* userData)> firedCB; //fired callback - std::function<bool(const sh_pollHandle_t handle, void* userData)> checkCB; //check callback - std::function<bool(const sh_pollHandle_t handle, void* userData)> dispatchCB; //check callback + std::function<void(const sh_pollHandle_t, void*)> prepareCB; //preperation callback + std::function<void(const pollfd, const sh_pollHandle_t, void*)> firedCB; //fired callback + std::function<bool(const sh_pollHandle_t, void*)> checkCB; //check callback + std::function<bool(const sh_pollHandle_t, void*)> dispatchCB; //check callback if (prepare) prepareCB = std::bind(&IAmShPollPrepare::Call, prepare, std::placeholders::_1, std::placeholders::_2); @@ -456,19 +476,25 @@ am::am_Error_e CAmSocketHandler::addFDPoll(const int fd, const short event, IAmS */ am_Error_e CAmSocketHandler::removeFDPoll(const sh_pollHandle_t handle) { - VectorListPoll_t::iterator iterator = mListPoll.begin(); - - for (; iterator != mListPoll.end(); ++iterator) + for (auto& it : mMapShPoll) { - if (iterator->handle == handle) + if (it.second.handle == handle) { - iterator = mListPoll.erase(iterator); + it.second.state = poll_states_e::REMOVE; + + static uint64_t events(1); + if (write(mEventFd, &(++events), sizeof(events)) < 0) + { + std::ostringstream msg("CAmSocketHandler::removeFDPoll "); + msg << "Failed to write to event fd: " << mEventFd << " errno: " << std::strerror(errno); + throw std::runtime_error(msg.str()); + } mSetPollKeys.pollHandles.erase(handle); - mRecreatePollfds = true; - return (E_OK); + return E_OK; } } - return (E_UNKNOWN); + logError("CAmSocketHandler::removeFDPoll handle unknown", handle); + return E_UNKNOWN; } /** @@ -618,7 +644,13 @@ am_Error_e CAmSocketHandler::addTimer(const timespec & timeouts, std::function<v { timerItem.handle = handle; mListTimer.push_back(timerItem); + return E_OK; } + + // E_NOT_POSSIBLE is the only case were we need to close the timer + if (err == E_NOT_POSSIBLE) + close(timerItem.fd); + return err; #endif @@ -877,14 +909,14 @@ am_Error_e CAmSocketHandler::stopTimer(const sh_timerHandle_t handle) */ am_Error_e CAmSocketHandler::updateEventFlags(const sh_pollHandle_t handle, const short events) { - VectorListPoll_t::iterator iterator = mListPoll.begin(); - - for (; iterator != mListPoll.end(); ++iterator) + for (auto& it : mMapShPoll) { - if (iterator->handle == handle) + auto& elem = it.second; + if (elem.handle == handle && elem.state != poll_states_e::REMOVE) { - iterator->pollfdValue.events = events; - mRecreatePollfds = true; + elem.pollfdValue.events = events; + elem.pollfdValue.revents = 0; + elem.state = poll_states_e::UPDATE; return (E_OK); } } @@ -1003,7 +1035,7 @@ void CAmSocketHandler::prepare(am::CAmSocketHandler::sh_poll_s& row) /** * fire callback */ -void CAmSocketHandler::fire(sh_poll_s& a) +void CAmSocketHandler::fire(const sh_poll_s& a) { try { @@ -1017,31 +1049,24 @@ void CAmSocketHandler::fire(sh_poll_s& a) /** * should disptach */ -bool CAmSocketHandler::noDispatching(const sh_poll_s& a) +bool CAmSocketHandler::noDispatching(const sh_poll_s* a) { //remove from list of there is no checkCB - if (nullptr == a.checkCB) + if (nullptr == a->checkCB || a->state != poll_states_e::VALID) return (true); - return (!a.checkCB(a.handle, a.userData)); + return (!a->checkCB(a->handle, a->userData)); } /** * disptach */ -bool CAmSocketHandler::dispatchingFinished(const sh_poll_s& a) +bool CAmSocketHandler::dispatchingFinished(const sh_poll_s* a) { //remove from list of there is no dispatchCB - if (nullptr == a.dispatchCB) + if (nullptr == a->dispatchCB || a->state != poll_states_e::VALID) return (true); - return (!a.dispatchCB(a.handle, a.userData)); -} -/** - * event triggered - */ -bool CAmSocketHandler::eventFired(const pollfd& a) -{ - return (a.revents == 0 ? false : true); + return (!a->dispatchCB(a->handle, a->userData)); } /** diff --git a/AudioManagerUtilities/test/AmSocketHandlerTest/CAmSocketHandlerTest.cpp b/AudioManagerUtilities/test/AmSocketHandlerTest/CAmSocketHandlerTest.cpp index 7e059a8..bd4aee4 100644 --- a/AudioManagerUtilities/test/AmSocketHandlerTest/CAmSocketHandlerTest.cpp +++ b/AudioManagerUtilities/test/AmSocketHandlerTest/CAmSocketHandlerTest.cpp @@ -30,15 +30,17 @@ #include <fcntl.h> #include <sys/un.h> #include <sys/poll.h> - +#include "CAmDltWrapper.h" #include "CAmSocketHandler.h" -//todo: expand test, implement more usecases -//todo: test removeFD + +#undef ENABLED_SOCKETHANDLER_TEST_OUTPUT +#undef ENABLED_TIMERS_TEST_OUTPUT #define SOCK_PATH "/tmp/mysock" -#define SOCKET_TEST_LOOPS_COUNT 1000 +#define SOCKET_TEST_LOOPS_COUNT 50 +#define TIMERS_TO_TEST 100 using namespace testing; using namespace am; @@ -48,6 +50,11 @@ static const char * TEST_SOCKET_DATA_FINAL = "finish!"; static const std::chrono::time_point<std::chrono::high_resolution_clock> TP_ZERO; +struct TestUserData +{ + int i; + float f; +}; MockIAmSignalHandler *pMockSignalHandler = NULL; static void signalHandler(int sig, siginfo_t *siginfo, void *context) @@ -149,6 +156,41 @@ void am::CAmTimer::timerCallback(sh_timerHandle_t handle, void* userData) } } +CAmTimerStressTest::CAmTimerStressTest(CAmSocketHandler *myHandler, const timespec &timeout, const int32_t repeats) : + MockIAmTimerCb(), mpSocketHandler(myHandler), mUpdateTimeout(timeout), pTimerCallback(this, &CAmTimerStressTest::timerCallback), mRepeats(repeats), mHandle(0) +{ +} + +am::CAmTimerStressTest::~CAmTimerStressTest() +{ +} + +void am::CAmTimerStressTest::timerCallback(sh_timerHandle_t handle, void* pUserData) +{ + mpSocketHandler->removeTimer(handle); + MockIAmTimerCb::timerCallback(handle, pUserData); + sh_timerHandle_t handle1; + mpSocketHandler->addTimer(mUpdateTimeout, &pTimerCallback, handle1, &(*((TestUserData*)pUserData)), true); +} + +CAmTimerStressTest2::CAmTimerStressTest2(CAmSocketHandler *myHandler, const timespec &timeout, const int32_t repeats) : + MockIAmTimerCb(), mpSocketHandler(myHandler), mUpdateTimeout(timeout), pTimerCallback(this, &CAmTimerStressTest2::timerCallback), mRepeats(repeats), mHandle(0) +{ +} + +am::CAmTimerStressTest2::~CAmTimerStressTest2() +{ +} + +void am::CAmTimerStressTest2::timerCallback(sh_timerHandle_t handle, void* pUserData) +{ + #ifdef ENABLED_SOCKETHANDLER_TEST_OUTPUT + std::cout<<"timerCallback handle=" << handle <<std::endl; + #endif + MockIAmTimerCb::timerCallback(handle, pUserData); +} + + CAmTimerMeasurment::CAmTimerMeasurment(CAmSocketHandler *myHandler, const timespec &timeout, const std::string & label, const int32_t repeats, void * userData) : MockIAmTimerCb() , pTimerCallback(this, &CAmTimerMeasurment::timerCallback) @@ -200,18 +242,107 @@ void am::CAmTimerMeasurment::timerCallback(sh_timerHandle_t handle, void* userDa } } +void* sendTestData(int sock, const struct sockaddr* servAddr, unsigned int size, __useconds_t time = 0u) +{ + int ret = connect(sock, servAddr, size); + if (ret < 0) + { + std::cerr << "ERROR: connect() failed\n" << std::endl; + return (NULL); + } + + for (int i = 1; i < SOCKET_TEST_LOOPS_COUNT; i++) + { + std::string string(TEST_SOCKET_DATA); + send(sock, string.c_str(), string.size(), 0); + if (time) + usleep(time); + } + std::string string(TEST_SOCKET_DATA_FINAL); + send(sock, string.c_str(), string.size(), 0); +} + void* playWithSocketServer(void* data) { - CAmSocketHandler *pSockethandler = (CAmSocketHandler*) data; - pSockethandler->start_listenting(); - return (NULL); + int sock = *((int*)data); + struct sockaddr_in servAddr; + unsigned short servPort = 6060; + struct hostent *host; + + if ((host = (struct hostent*) gethostbyname("localhost")) == 0) + { + std::cout << "ERROR: gethostbyname() failed\n" << std::endl; + exit(1); + } + + memset(&servAddr, 0, sizeof(servAddr)); + servAddr.sin_family = AF_INET; + servAddr.sin_addr.s_addr = inet_addr(inet_ntoa(*(struct in_addr*) (host->h_addr_list[0]))); + servAddr.sin_port = htons(servPort); + sleep(1); + + return sendTestData(sock, (struct sockaddr*)&servAddr, sizeof(servAddr)); } void* playWithUnixSocketServer(void* data) { - CAmSocketHandler *pSockethandler = (CAmSocketHandler*) data; - pSockethandler->start_listenting(); - return (NULL); + int sock = *((int*)data); + struct sockaddr_un servAddr; + memset(&servAddr, 0, sizeof(servAddr)); + strcpy(servAddr.sun_path, SOCK_PATH); + servAddr.sun_family = AF_UNIX; + sleep(1); + + return sendTestData(sock, (struct sockaddr*)&servAddr, sizeof(servAddr)); +} + +void* threadCallbackUnixSocketAndTimers(void* data) +{ + int sock = *((int*)data); + struct sockaddr_un servAddr; + memset(&servAddr, 0, sizeof(servAddr)); + strcpy(servAddr.sun_path, SOCK_PATH); + servAddr.sun_family = AF_UNIX; + sleep(1); + + return sendTestData(sock, (struct sockaddr*)&servAddr, sizeof(servAddr), 500000); +} + +TEST(CAmSocketHandlerTest, stressTestUnixSocketAndTimers) +{ + + pthread_t serverThread; + + int socket_; + + CAmSocketHandler myHandler; + ASSERT_FALSE(myHandler.fatalErrorOccurred()); + CAmSamplePluginStressTest::sockType_e type = CAmSamplePlugin::UNIX; + CAmSamplePluginStressTest myplugin(&myHandler, type); + + EXPECT_CALL(myplugin,receiveData(Field(&pollfd::revents, Eq(POLL_IN)),_,_)).Times(Exactly(SOCKET_TEST_LOOPS_COUNT)); + EXPECT_CALL(myplugin,dispatchData(_,_)).Times(Exactly(SOCKET_TEST_LOOPS_COUNT)); + EXPECT_CALL(myplugin,check(_,_)).Times(Exactly(SOCKET_TEST_LOOPS_COUNT)); + + for(int i=0;i<myplugin.getTimers().size();i++) + { + EXPECT_CALL(*myplugin.getTimers()[i],timerCallback(_,_)).Times(AnyNumber()); + } + + + if ((socket_ = socket(AF_UNIX, SOCK_STREAM, 0)) < 0) + { + std::cout << "socket problem" << std::endl; + } + ASSERT_GT(socket_, -1); + + //creates a thread that handles the serverpart + pthread_create(&serverThread, NULL, threadCallbackUnixSocketAndTimers, &socket_); + + myHandler.start_listenting(); + + pthread_join(serverThread, NULL); + shutdown(socket_, SHUT_RDWR); } TEST(CAmSocketHandlerTest, timersOneshot) @@ -341,6 +472,70 @@ TEST(CAmSocketHandlerTest, timersGeneral) myHandler.start_listenting(); } +TEST(CAmSocketHandlerTest, timersStressTest) +{ + CAmSocketHandler myHandler; + ASSERT_FALSE(myHandler.fatalErrorOccurred()); + + sh_timerHandle_t handle; + TestUserData userData; + userData.i = 1; + userData.f = 1.f; + + timespec timeout4; + timeout4.tv_nsec = 0; + timeout4.tv_sec = 60; + + timespec timeoutTime; + timeoutTime.tv_sec = 0; + timeoutTime.tv_nsec = 10000000;// 0,01 + + std::vector<CAmTimerStressTest*> timers(TIMERS_TO_TEST, NULL); + + for (auto & timer: timers) + { + timer = new CAmTimerStressTest(&myHandler, timeoutTime, 0); + ASSERT_EQ(myHandler.addTimer(timeoutTime, &(timer->pTimerCallback), handle, &userData, true), E_OK); + EXPECT_CALL(*timer, timerCallback(_,&userData)).Times(AnyNumber()); + } + + timespec timeoutTime11, timeout12, timeout13; + timeoutTime11.tv_sec = 1; + timeoutTime11.tv_nsec = 34000000; + CAmTimerMeasurment testCallback11(&myHandler, timeoutTime11, "repeated 1", std::numeric_limits<int32_t>::max()); + + timeout12.tv_sec = 0; + timeout12.tv_nsec = 100000000; + CAmTimerMeasurment testCallback12(&myHandler, timeout12, "repeated 2", std::numeric_limits<int32_t>::max()); + + timeout13.tv_sec = 3; + timeout13.tv_nsec = 333000000; + CAmTimerMeasurment testCallback13(&myHandler, timeout13, "oneshot 3"); + + ASSERT_EQ(myHandler.addTimer(timeoutTime11, &testCallback11.pTimerCallback, handle, NULL, true), E_OK); + EXPECT_CALL(testCallback11,timerCallback(_,NULL)).Times(AnyNumber()); + + ASSERT_EQ(myHandler.addTimer(timeout12, &testCallback12.pTimerCallback, handle, NULL, true), E_OK); + EXPECT_CALL(testCallback12,timerCallback(_,NULL)).Times(AnyNumber()); + + ASSERT_EQ(myHandler.addTimer(timeout13, &testCallback13.pTimerCallback, handle, NULL), E_OK); + EXPECT_CALL(testCallback13,timerCallback(_,NULL)).Times(Exactly(1)); + + + CAmTimerSockethandlerController testCallback4(&myHandler, timeout4); + + ASSERT_EQ(myHandler.addTimer(timeout4, &testCallback4.pTimerCallback, handle, NULL), E_OK); + + EXPECT_CALL(testCallback4,timerCallback(_,NULL)).Times(Exactly(1)); + myHandler.start_listenting(); + + for (auto & timer: timers) + { + delete timer, timer = NULL; + } +} + + TEST(CAmSocketHandlerTest,playWithTimers) { CAmSocketHandler myHandler; @@ -348,15 +543,16 @@ TEST(CAmSocketHandlerTest,playWithTimers) timespec timeoutTime, timeout2, timeout3, timeout4; timeoutTime.tv_sec = 1; timeoutTime.tv_nsec = 34000000; - CAmTimerMeasurment testCallback1(&myHandler, timeoutTime, "repeatedCallback 1", std::numeric_limits<int32_t>::max()); + CAmTimerMeasurment testCallback1(&myHandler, timeoutTime, "repeated 1", std::numeric_limits<int32_t>::max()); timeout2.tv_nsec = 2000000; timeout2.tv_sec = 0; - CAmTimerMeasurment testCallback2(&myHandler, timeout2, "repeatedCallback 2", std::numeric_limits<int32_t>::max()); + CAmTimerMeasurment testCallback2(&myHandler, timeout2, "repeated 2", std::numeric_limits<int32_t>::max()); timeout3.tv_nsec = 333000000; timeout3.tv_sec = 3; - CAmTimerMeasurment testCallback3(&myHandler, timeout3, "oneshotCallback 3"); + CAmTimerMeasurment testCallback3(&myHandler, timeout3, "oneshot 3"); + timeout4.tv_nsec = 0; timeout4.tv_sec = 8; CAmTimerSockethandlerController testCallback4(&myHandler, timeout4); @@ -479,94 +675,52 @@ TEST(CAmSocketHandlerTest,playWithUNIXSockets) ASSERT_FALSE(myHandler.fatalErrorOccurred()); CAmSamplePlugin::sockType_e type = CAmSamplePlugin::UNIX; CAmSamplePlugin myplugin(&myHandler, type); + ASSERT_TRUE(myplugin.isSocketOpened()); EXPECT_CALL(myplugin,receiveData(Field(&pollfd::revents, Eq(POLL_IN)),_,_)).Times(Exactly(SOCKET_TEST_LOOPS_COUNT)); EXPECT_CALL(myplugin,dispatchData(_,_)).Times(Exactly(SOCKET_TEST_LOOPS_COUNT)); EXPECT_CALL(myplugin,check(_,_)).Times(Exactly(SOCKET_TEST_LOOPS_COUNT)); - //creates a thread that handles the serverpart - pthread_create(&serverThread, NULL, playWithUnixSocketServer, &myHandler); - - sleep(1); //we need that here because the port needs to be opened if ((socket_ = socket(AF_UNIX, SOCK_STREAM, 0)) < 0) { std::cout << "socket problem" << std::endl; - - } - - memset(&servAddr, 0, sizeof(servAddr)); - strcpy(servAddr.sun_path, SOCK_PATH); - servAddr.sun_family = AF_UNIX; - if (connect(socket_, (struct sockaddr *) &servAddr, sizeof(servAddr)) < 0) - { - std::cout << "ERROR: connect() failed\n" << std::endl; } + ASSERT_GT(socket_, -1); + //creates a thread that handles the serverpart + pthread_create(&serverThread, NULL, playWithUnixSocketServer, &socket_); - for (int i = 1; i < SOCKET_TEST_LOOPS_COUNT; i++) - { - std::string stringToSend(TEST_SOCKET_DATA); - send(socket_, stringToSend.c_str(), stringToSend.size(), 0); - } - std::string stringToSend(TEST_SOCKET_DATA_FINAL); - send(socket_, stringToSend.c_str(), stringToSend.size(), 0); + myHandler.start_listenting(); pthread_join(serverThread, NULL); - + shutdown(socket_, SHUT_RDWR); } TEST(CAmSocketHandlerTest,playWithSockets) { pthread_t serverThread; - struct sockaddr_in servAddr; - unsigned short servPort = 6060; - struct hostent *host; int socket_; CAmSocketHandler myHandler; ASSERT_FALSE(myHandler.fatalErrorOccurred()); CAmSamplePlugin::sockType_e type = CAmSamplePlugin::INET; CAmSamplePlugin myplugin(&myHandler, type); - + ASSERT_TRUE(myplugin.isSocketOpened()); EXPECT_CALL(myplugin,receiveData(Field(&pollfd::revents, Eq(POLL_IN)),_,_)).Times(Exactly(SOCKET_TEST_LOOPS_COUNT)); EXPECT_CALL(myplugin,dispatchData(_,_)).Times(Exactly(SOCKET_TEST_LOOPS_COUNT)); EXPECT_CALL(myplugin,check(_,_)).Times(Exactly(SOCKET_TEST_LOOPS_COUNT)); - //creates a thread that handles the serverpart - pthread_create(&serverThread, NULL, playWithSocketServer, &myHandler); - - sleep(1); //we need that here because the port needs to be opened if ((socket_ = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) { std::cout << "socket problem" << std::endl; - - } - - if ((host = (struct hostent*) gethostbyname("localhost")) == 0) - { - std::cout << "ERROR: gethostbyname() failed\n" << std::endl; - exit(1); } + ASSERT_GT(socket_, -1); + //creates a thread that handles the serverpart + pthread_create(&serverThread, NULL, playWithSocketServer, &socket_); - memset(&servAddr, 0, sizeof(servAddr)); - servAddr.sin_family = AF_INET; - servAddr.sin_addr.s_addr = inet_addr(inet_ntoa(*(struct in_addr*) (host->h_addr_list[0]))); - servAddr.sin_port = htons(servPort); - - if (connect(socket_, (struct sockaddr *) &servAddr, sizeof(servAddr)) < 0) - { - std::cout << "ERROR: connect() failed\n" << std::endl; - } - - for (int i = 1; i < SOCKET_TEST_LOOPS_COUNT; i++) - { - std::string string(TEST_SOCKET_DATA); - send(socket_, string.c_str(), string.size(), 0); - } - std::string string(TEST_SOCKET_DATA_FINAL); - send(socket_, string.c_str(), string.size(), 0); + myHandler.start_listenting(); pthread_join(serverThread, NULL); - + shutdown(socket_, SHUT_RDWR); } int main(int argc, char **argv) @@ -582,12 +736,12 @@ am::CAmSamplePlugin::CAmSamplePlugin(CAmSocketHandler *mySocketHandler, sockType sampleCheckCB(this, &CAmSamplePlugin::check), // mSocketHandler(mySocketHandler), // mConnecthandle(), // - mReceiveHandle(), // - msgList() + mReceiveHandle(), // + msgList(), + mSocket(-1) { int yes = 1; - int socketHandle; struct sockaddr_in servAddr; struct sockaddr_un unixAddr; unsigned int servPort = 6060; @@ -595,26 +749,30 @@ am::CAmSamplePlugin::CAmSamplePlugin(CAmSocketHandler *mySocketHandler, sockType switch (socketType) { case UNIX: - socketHandle = socket(AF_UNIX, SOCK_STREAM, 0); + mSocket = socket(AF_UNIX, SOCK_STREAM, 0); + if(mSocket==-1) + return; unixAddr.sun_family = AF_UNIX; strcpy(unixAddr.sun_path, SOCK_PATH); unlink(unixAddr.sun_path); - bind(socketHandle, (struct sockaddr *) &unixAddr, strlen(unixAddr.sun_path) + sizeof(unixAddr.sun_family)); + bind(mSocket, (struct sockaddr *) &unixAddr, strlen(unixAddr.sun_path) + sizeof(unixAddr.sun_family)); break; case INET: - socketHandle = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP); - setsockopt(socketHandle, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)); + mSocket = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP); + if(mSocket==-1) + return; + setsockopt(mSocket, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)); memset(&servAddr, 0, sizeof(servAddr)); servAddr.sin_family = AF_INET; servAddr.sin_addr.s_addr = INADDR_ANY; servAddr.sin_port = htons(servPort); - bind(socketHandle, (struct sockaddr *) &servAddr, sizeof(servAddr)); + bind(mSocket, (struct sockaddr *) &servAddr, sizeof(servAddr)); break; default: break; } - if (listen(socketHandle, 3) < 0) + if (listen(mSocket, 3) < 0) { #ifdef ENABLED_SOCKETHANDLER_TEST_OUTPUT std::cout << "listen ok" << std::endl; @@ -622,12 +780,12 @@ am::CAmSamplePlugin::CAmSamplePlugin(CAmSocketHandler *mySocketHandler, sockType } /* if */ int a = 1; - ioctl(socketHandle, FIONBIO, (char *) &a); - setsockopt(socketHandle, SOL_SOCKET, SO_KEEPALIVE, (char *) &a, sizeof(a)); + ioctl(mSocket, FIONBIO, (char *) &a); + setsockopt(mSocket, SOL_SOCKET, SO_KEEPALIVE, (char *) &a, sizeof(a)); short events = 0; events |= POLLIN; - mySocketHandler->addFDPoll(socketHandle, events, NULL, &connectFiredCB, NULL, NULL, NULL, mConnecthandle); + mySocketHandler->addFDPoll(mSocket, events, NULL, &connectFiredCB, NULL, NULL, NULL, mConnecthandle); #ifdef ENABLED_SOCKETHANDLER_TEST_OUTPUT std::cout << "setup server - listening" << std::endl; #endif @@ -714,3 +872,53 @@ bool am::CAmSamplePlugin::check(const sh_pollHandle_t handle, void *userData) return false; } +CAmSamplePluginStressTest::CAmSamplePluginStressTest(CAmSocketHandler *mySocketHandler, sockType_e socketType):CAmSamplePlugin(mySocketHandler,socketType) +, mTimers(TIMERS_TO_TEST, NULL) +{ + sh_timerHandle_t handle; + TestUserData userData; + userData.i = 1; + userData.f = 1.f; + timespec timeoutTime; + timeoutTime.tv_sec = 0; + timeoutTime.tv_nsec = 10000000;// 0,01 + for (auto & timer : mTimers) + { + timer = new CAmTimerStressTest2(mySocketHandler, timeoutTime, 0); + if (mySocketHandler->addTimer(timeoutTime, &(timer->pTimerCallback), handle, &userData, true) == E_OK); + timer->setHandle(handle); + + EXPECT_CALL(*timer, timerCallback(_,&userData)).Times(AnyNumber()); + } +} + +CAmSamplePluginStressTest::~CAmSamplePluginStressTest() +{ + for (auto & timer : mTimers) + { + delete timer, timer = NULL; + } +} + +void CAmSamplePluginStressTest::receiveData(const pollfd pollfd, const sh_pollHandle_t handle, void* userData) +{ + CAmSamplePlugin::receiveData(pollfd, handle, userData); + + sh_timerHandle_t handle1; + for (auto & timer : mTimers) + { + ASSERT_EQ(mSocketHandler->removeTimer(timer->getHandle()), E_OK); + ASSERT_EQ(mSocketHandler->addTimer(timer->getUpdateTimeout(), &(timer->pTimerCallback), handle1, NULL, true), E_OK); + timer->setHandle(handle1); + } +} + +bool CAmSamplePluginStressTest::dispatchData(const sh_pollHandle_t handle, void* userData) +{ + return CAmSamplePlugin::dispatchData( handle, userData); +} + +bool CAmSamplePluginStressTest::check(const sh_pollHandle_t handle, void* userData) +{ + return CAmSamplePlugin::check( handle, userData); +} diff --git a/AudioManagerUtilities/test/AmSocketHandlerTest/CAmSocketHandlerTest.h b/AudioManagerUtilities/test/AmSocketHandlerTest/CAmSocketHandlerTest.h index a274605..93620b5 100644 --- a/AudioManagerUtilities/test/AmSocketHandlerTest/CAmSocketHandlerTest.h +++ b/AudioManagerUtilities/test/AmSocketHandlerTest/CAmSocketHandlerTest.h @@ -101,23 +101,21 @@ namespace am UNIX, INET }; CAmSamplePlugin(CAmSocketHandler *mySocketHandler, sockType_e socketType); - ~CAmSamplePlugin() - { - } - ; + virtual ~CAmSamplePlugin() { } void connectSocket(const pollfd pollfd, const sh_pollHandle_t handle, void* userData); - void receiveData(const pollfd pollfd, const sh_pollHandle_t handle, void* userData); - bool dispatchData(const sh_pollHandle_t handle, void* userData); - bool check(const sh_pollHandle_t handle, void* userData); + virtual void receiveData(const pollfd pollfd, const sh_pollHandle_t handle, void* userData); + virtual bool dispatchData(const sh_pollHandle_t handle, void* userData); + virtual bool check(const sh_pollHandle_t handle, void* userData); TAmShPollFired<CAmSamplePlugin> connectFiredCB; TAmShPollFired<CAmSamplePlugin> receiveFiredCB; TAmShPollDispatch<CAmSamplePlugin> sampleDispatchCB; TAmShPollCheck<CAmSamplePlugin> sampleCheckCB; - - private: + bool isSocketOpened() { return mSocket>-1; } + protected: CAmSocketHandler *mSocketHandler; sh_pollHandle_t mConnecthandle, mReceiveHandle; std::queue<std::string> msgList; + int mSocket; }; class CAmTimerSockethandlerController: public MockIAmTimerCb @@ -162,6 +160,46 @@ namespace am TAmShTimerCallBack<CAmTimer> pTimerCallback; }; + class CAmTimerStressTest: public MockIAmTimerCb + { + CAmSocketHandler *mpSocketHandler; + timespec mUpdateTimeout; + int32_t mRepeats; + int32_t mHandle; + public: + explicit CAmTimerStressTest(CAmSocketHandler *SocketHandler, const timespec &timeout, const int32_t repeats = 0u); + virtual ~CAmTimerStressTest(); + + int32_t getHandle() { return mHandle; } + void setHandle(const int32_t id) { mHandle=id; } + + timespec getUpdateTimeout( ) { return mUpdateTimeout; } + + void timerCallback(sh_timerHandle_t handle, void * userData); + + TAmShTimerCallBack<CAmTimerStressTest> pTimerCallback; + }; + + class CAmTimerStressTest2: public MockIAmTimerCb + { + CAmSocketHandler *mpSocketHandler; + timespec mUpdateTimeout; + int32_t mRepeats; + int32_t mHandle; + public: + explicit CAmTimerStressTest2(CAmSocketHandler *SocketHandler, const timespec &timeout, const int32_t repeats = 0u); + virtual ~CAmTimerStressTest2(); + + int32_t getHandle() { return mHandle; } + void setHandle(const int32_t id) { mHandle=id; } + + timespec getUpdateTimeout( ) { return mUpdateTimeout; } + + void timerCallback(sh_timerHandle_t handle, void * userData); + + TAmShTimerCallBack<CAmTimerStressTest2> pTimerCallback; + }; + class CAmTimerMeasurment: public MockIAmTimerCb { CAmSocketHandler *mSocketHandler; @@ -189,5 +227,19 @@ namespace am void TearDown(); }; + class CAmSamplePluginStressTest: public CAmSamplePlugin + { + std::vector<CAmTimerStressTest2*> mTimers; + public: + CAmSamplePluginStressTest(CAmSocketHandler *mySocketHandler, sockType_e socketType); + virtual ~CAmSamplePluginStressTest(); + + void receiveData(const pollfd pollfd, const sh_pollHandle_t handle, void* userData) final; + bool dispatchData(const sh_pollHandle_t handle, void* userData) final; + bool check(const sh_pollHandle_t handle, void* userData) final; + + std::vector<CAmTimerStressTest2*> & getTimers() { return mTimers; } + }; + } /* namespace am */ #endif /* SOCKETHANDLERTEST_H_ */ |