From 29b816429d141584af128256545ca0dc96ce0be3 Mon Sep 17 00:00:00 2001 From: Jens Lorenz Date: Wed, 4 Apr 2018 09:47:25 +0200 Subject: AMUtil: Rework of socketHandler to avoid calls of invalidated objects This patch tries to follow the idea raised in PR26. Following two patches have been reworked: commit: cfe0e77aaf87a0590ceea42f6afa62b0c7d95e80 commit: bc33226f59910a960f62d419ba10d4ea761e3724 The biggest change applies to the internal database. Instead of having a vector for all items which will be copied inside the worker thread the new approach aims a central map which allows to store the sh_poll elements in containers. By this a container is valid until it is remove from map. The remove of items inside a map is now centralized within the worker and only the worker is responsible to keep the ppoll list and the map in sync. This patch also extends the unit tests to stress different timer scenarios. Signed-off-by: Aleksandar Donchev Signed-off-by: Jens Lorenz --- AudioManagerUtilities/include/CAmSocketHandler.h | 33 +- AudioManagerUtilities/src/CAmSocketHandler.cpp | 255 +++++++------- .../AmSocketHandlerTest/CAmSocketHandlerTest.cpp | 366 ++++++++++++++++----- .../AmSocketHandlerTest/CAmSocketHandlerTest.h | 70 +++- 4 files changed, 505 insertions(+), 219 deletions(-) diff --git a/AudioManagerUtilities/include/CAmSocketHandler.h b/AudioManagerUtilities/include/CAmSocketHandler.h index 5465839..6564d7f 100644 --- a/AudioManagerUtilities/include/CAmSocketHandler.h +++ b/AudioManagerUtilities/include/CAmSocketHandler.h @@ -215,6 +215,14 @@ public: */ class CAmSocketHandler { + typedef enum:uint8_t + { + UNINIT = 0u, // new, uninitialized element which needs to be inserted to ppoll array + VALID = 1u, // it is a valid element in ppoll array + UPDATE = 2u, // update of event information therefore update ppoll array + REMOVE = 3u // remove from ppoll array and internal map + } poll_states_e; + struct sh_poll_s //! checkCB; //check callback std::function dispatchCB; //dispatch callback void* userData; + poll_states_e state; sh_poll_s() : - handle(0), pollfdValue(), prepareCB(), firedCB(), checkCB(), dispatchCB(), userData(0) + handle(0), pollfdValue(), prepareCB(), firedCB(), checkCB(), dispatchCB(), userData(0), state(UNINIT) {} }; @@ -271,8 +280,8 @@ class CAmSocketHandler }; typedef std::reverse_iterator rListTimerIter; //! VectorListPollfd_t; //! VectorListPoll_t; //! VectorPollfd_t; //! MapShPoll_t; //! VectorSignalHandlers_t; //! mListTimer; //!second; + switch (elem.state) + { + case poll_states_e::REMOVE: + close(elem.pollfdValue.fd); + fdPollIt = fdPollingArray.erase(fdPollIt); + it = mMapShPoll.erase(it); + continue; + + case poll_states_e::UNINIT: + fdPollIt = fdPollingArray.emplace(fdPollIt); + // fallthrough + + case poll_states_e::UPDATE: + CAmSocketHandler::prepare(elem); + elem.state = poll_states_e::VALID; + *fdPollIt = elem.pollfdValue; + break; + + case poll_states_e::VALID: + default: + break; + } + // Ensures that fdPollIt will be never on its end before incrementing it further! + assert(fdPollIt != fdPollingArray.end()); + ++fdPollIt; + ++it; } #ifndef WITH_TIMERFD timerCorrection(); #endif - //block until something is on a filedescriptor - if ((pollStatus = ppoll(&fdPollingArray[0], fdPollingArray.size(), insertTime(buffertime), NULL)) < 0) + // block until something is on a file descriptor + int16_t pollStatus = ppoll(&fdPollingArray[0], fdPollingArray.size(), insertTime(buffertime), NULL); + if (pollStatus > 0) { - if (errno == EINTR) - { - //a signal was received, that means it's time to go... - pollStatus = 0; - } - else + // stage 0+1, call firedCB + std::list listPoll; + for (auto& it : fdPollingArray) { - logError("SocketHandler::start_listenting ppoll returned with error", errno); - throw std::runtime_error(std::string("SocketHandler::start_listenting ppoll returned with error.")); - } - } - - if (pollStatus != 0) //only check filedescriptors if there was a change - { - std::list listPoll; - //todo: here could be a timer that makes sure naughty plugins return! - //stage 0+1, call firedCB - listmPollIt = cloneListPoll.begin(); - for (auto it : fdPollingArray) - { - if (CAmSocketHandler::eventFired(it)) - { - listmPollIt->pollfdValue.revents = it.revents; - listPoll.push_back(*listmPollIt); - CAmSocketHandler::fire(*listmPollIt); - } - else - { - listmPollIt->pollfdValue.revents = 0; - } - listmPollIt++; + it.revents &= it.events; + if (it.revents == 0) + continue; + + sh_poll_s& pollObj = mMapShPoll.at(it.fd); + if (pollObj.state != poll_states_e::VALID) + continue; + + // ensure to copy the revents fired in fdPollingArray + pollObj.pollfdValue.revents = it.revents; + listPoll.push_back(&pollObj); + CAmSocketHandler::fire(pollObj); + it.revents = 0; } //stage 2, lets ask around if some dispatching is necessary, the ones who need stay on the list @@ -198,16 +198,20 @@ void CAmSocketHandler::start_listenting() { listPoll.remove_if(CAmSocketHandler::dispatchingFinished); } while (!listPoll.empty()); - } -#ifndef WITH_TIMERFD + else if ((pollStatus < 0) && (errno != EINTR)) + { + logError("SocketHandler::start_listenting ppoll returned with error", errno); + throw std::runtime_error(std::string("SocketHandler::start_listenting ppoll returned with error.")); + } else //Timerevent { +#ifndef WITH_TIMERFD //this was a timer event, we need to take care about the timers //find out the timedifference to starttime timerUp(); - } #endif + } } } @@ -253,14 +257,13 @@ bool CAmSocketHandler::fatalErrorOccurred() am_Error_e CAmSocketHandler::getFDPollData(const sh_pollHandle_t handle, sh_poll_s & outPollData) { - VectorListPoll_t::iterator iterator = mListPoll.begin(); - for (; iterator != mListPoll.end(); ++iterator) + for (auto it : mMapShPoll) { - if (iterator->handle == handle) - { - outPollData = *iterator; - return (E_OK); - } + if (it.second.handle != handle) + continue; + + outPollData = it.second; + return (E_OK); } return (E_UNKNOWN); } @@ -380,20 +383,35 @@ am_Error_e CAmSocketHandler::listenToSignals(const std::vector & listSi * @param dispatch a std::function that is called to dispatch the received data * @param userData a pointer to userdata that is always passed around * @param handle the handle of this poll - * @return E_OK if the descriptor was added, E_NON_EXISTENT if the fd is not valid + * @return E_OK if the descriptor was added + * E_NON_EXISTENT if the fd is not valid + * E_ALREADY_EXISTS if the fd is already known + * E_NOT_POSSIBLE if the maximum handle threshold is reached */ -am_Error_e CAmSocketHandler::addFDPoll(const int fd, const short event, std::function prepare, - std::function fired, std::function check, - std::function dispatch, void* userData, sh_pollHandle_t& handle) +am_Error_e CAmSocketHandler::addFDPoll(const int fd, + const short event, + std::function prepare, + std::function fired, + std::function check, + std::function dispatch, + void* userData, + sh_pollHandle_t& handle) { if (!fdIsValid(fd)) - return (E_NON_EXISTENT); + return E_NON_EXISTENT; + + const auto elem = mMapShPoll.find(fd); + if (elem != mMapShPoll.end() && elem->second.state != poll_states_e::REMOVE) + { + logError("CAmSocketHandler::addFDPoll fd", fd, "already registered!"); + return E_ALREADY_EXISTS; + } //create a new handle for the poll if (!nextHandle(mSetPollKeys)) { - logError("Could not create new polls, too many open!"); + logError("CAmSocketHandler::addFDPoll Max handle count reached!"); return (E_NOT_POSSIBLE); } @@ -407,10 +425,9 @@ am_Error_e CAmSocketHandler::addFDPoll(const int fd, const short event, std::fun pollData.checkCB = check; pollData.dispatchCB = dispatch; pollData.userData = userData; - //add new data to the list - mListPoll.push_back(pollData); - mRecreatePollfds = true; + //add new data to the list + mMapShPoll[fd] = pollData; handle = pollData.handle; return (E_OK); @@ -427,15 +444,18 @@ am_Error_e CAmSocketHandler::addFDPoll(const int fd, const short event, std::fun * @param dispatch a callback that is called to dispatch the received data * @param userData a pointer to userdata that is always passed around * @param handle the handle of this poll - * @return E_OK if the descriptor was added, E_NON_EXISTENT if the fd is not valid + * @return E_OK if the descriptor was added + * E_NON_EXISTENT if the fd is not valid + * E_ALREADY_EXISTS if the fd is already known + * E_NOT_POSSIBLE if the maximum handle threshold is reached */ am::am_Error_e CAmSocketHandler::addFDPoll(const int fd, const short event, IAmShPollPrepare *prepare, IAmShPollFired *fired, IAmShPollCheck *check, IAmShPollDispatch *dispatch, void *userData, sh_pollHandle_t & handle) { - std::function prepareCB; //preperation callback - std::function firedCB; //fired callback - std::function checkCB; //check callback - std::function dispatchCB; //check callback + std::function prepareCB; //preperation callback + std::function firedCB; //fired callback + std::function checkCB; //check callback + std::function dispatchCB; //check callback if (prepare) prepareCB = std::bind(&IAmShPollPrepare::Call, prepare, std::placeholders::_1, std::placeholders::_2); @@ -456,19 +476,25 @@ am::am_Error_e CAmSocketHandler::addFDPoll(const int fd, const short event, IAmS */ am_Error_e CAmSocketHandler::removeFDPoll(const sh_pollHandle_t handle) { - VectorListPoll_t::iterator iterator = mListPoll.begin(); - - for (; iterator != mListPoll.end(); ++iterator) + for (auto& it : mMapShPoll) { - if (iterator->handle == handle) + if (it.second.handle == handle) { - iterator = mListPoll.erase(iterator); + it.second.state = poll_states_e::REMOVE; + + static uint64_t events(1); + if (write(mEventFd, &(++events), sizeof(events)) < 0) + { + std::ostringstream msg("CAmSocketHandler::removeFDPoll "); + msg << "Failed to write to event fd: " << mEventFd << " errno: " << std::strerror(errno); + throw std::runtime_error(msg.str()); + } mSetPollKeys.pollHandles.erase(handle); - mRecreatePollfds = true; - return (E_OK); + return E_OK; } } - return (E_UNKNOWN); + logError("CAmSocketHandler::removeFDPoll handle unknown", handle); + return E_UNKNOWN; } /** @@ -618,7 +644,13 @@ am_Error_e CAmSocketHandler::addTimer(const timespec & timeouts, std::functionhandle == handle) + auto& elem = it.second; + if (elem.handle == handle && elem.state != poll_states_e::REMOVE) { - iterator->pollfdValue.events = events; - mRecreatePollfds = true; + elem.pollfdValue.events = events; + elem.pollfdValue.revents = 0; + elem.state = poll_states_e::UPDATE; return (E_OK); } } @@ -1003,7 +1035,7 @@ void CAmSocketHandler::prepare(am::CAmSocketHandler::sh_poll_s& row) /** * fire callback */ -void CAmSocketHandler::fire(sh_poll_s& a) +void CAmSocketHandler::fire(const sh_poll_s& a) { try { @@ -1017,31 +1049,24 @@ void CAmSocketHandler::fire(sh_poll_s& a) /** * should disptach */ -bool CAmSocketHandler::noDispatching(const sh_poll_s& a) +bool CAmSocketHandler::noDispatching(const sh_poll_s* a) { //remove from list of there is no checkCB - if (nullptr == a.checkCB) + if (nullptr == a->checkCB || a->state != poll_states_e::VALID) return (true); - return (!a.checkCB(a.handle, a.userData)); + return (!a->checkCB(a->handle, a->userData)); } /** * disptach */ -bool CAmSocketHandler::dispatchingFinished(const sh_poll_s& a) +bool CAmSocketHandler::dispatchingFinished(const sh_poll_s* a) { //remove from list of there is no dispatchCB - if (nullptr == a.dispatchCB) + if (nullptr == a->dispatchCB || a->state != poll_states_e::VALID) return (true); - return (!a.dispatchCB(a.handle, a.userData)); -} -/** - * event triggered - */ -bool CAmSocketHandler::eventFired(const pollfd& a) -{ - return (a.revents == 0 ? false : true); + return (!a->dispatchCB(a->handle, a->userData)); } /** diff --git a/AudioManagerUtilities/test/AmSocketHandlerTest/CAmSocketHandlerTest.cpp b/AudioManagerUtilities/test/AmSocketHandlerTest/CAmSocketHandlerTest.cpp index 7e059a8..bd4aee4 100644 --- a/AudioManagerUtilities/test/AmSocketHandlerTest/CAmSocketHandlerTest.cpp +++ b/AudioManagerUtilities/test/AmSocketHandlerTest/CAmSocketHandlerTest.cpp @@ -30,15 +30,17 @@ #include #include #include - +#include "CAmDltWrapper.h" #include "CAmSocketHandler.h" -//todo: expand test, implement more usecases -//todo: test removeFD + +#undef ENABLED_SOCKETHANDLER_TEST_OUTPUT +#undef ENABLED_TIMERS_TEST_OUTPUT #define SOCK_PATH "/tmp/mysock" -#define SOCKET_TEST_LOOPS_COUNT 1000 +#define SOCKET_TEST_LOOPS_COUNT 50 +#define TIMERS_TO_TEST 100 using namespace testing; using namespace am; @@ -48,6 +50,11 @@ static const char * TEST_SOCKET_DATA_FINAL = "finish!"; static const std::chrono::time_point TP_ZERO; +struct TestUserData +{ + int i; + float f; +}; MockIAmSignalHandler *pMockSignalHandler = NULL; static void signalHandler(int sig, siginfo_t *siginfo, void *context) @@ -149,6 +156,41 @@ void am::CAmTimer::timerCallback(sh_timerHandle_t handle, void* userData) } } +CAmTimerStressTest::CAmTimerStressTest(CAmSocketHandler *myHandler, const timespec &timeout, const int32_t repeats) : + MockIAmTimerCb(), mpSocketHandler(myHandler), mUpdateTimeout(timeout), pTimerCallback(this, &CAmTimerStressTest::timerCallback), mRepeats(repeats), mHandle(0) +{ +} + +am::CAmTimerStressTest::~CAmTimerStressTest() +{ +} + +void am::CAmTimerStressTest::timerCallback(sh_timerHandle_t handle, void* pUserData) +{ + mpSocketHandler->removeTimer(handle); + MockIAmTimerCb::timerCallback(handle, pUserData); + sh_timerHandle_t handle1; + mpSocketHandler->addTimer(mUpdateTimeout, &pTimerCallback, handle1, &(*((TestUserData*)pUserData)), true); +} + +CAmTimerStressTest2::CAmTimerStressTest2(CAmSocketHandler *myHandler, const timespec &timeout, const int32_t repeats) : + MockIAmTimerCb(), mpSocketHandler(myHandler), mUpdateTimeout(timeout), pTimerCallback(this, &CAmTimerStressTest2::timerCallback), mRepeats(repeats), mHandle(0) +{ +} + +am::CAmTimerStressTest2::~CAmTimerStressTest2() +{ +} + +void am::CAmTimerStressTest2::timerCallback(sh_timerHandle_t handle, void* pUserData) +{ + #ifdef ENABLED_SOCKETHANDLER_TEST_OUTPUT + std::cout<<"timerCallback handle=" << handle <start_listenting(); - return (NULL); + int sock = *((int*)data); + struct sockaddr_in servAddr; + unsigned short servPort = 6060; + struct hostent *host; + + if ((host = (struct hostent*) gethostbyname("localhost")) == 0) + { + std::cout << "ERROR: gethostbyname() failed\n" << std::endl; + exit(1); + } + + memset(&servAddr, 0, sizeof(servAddr)); + servAddr.sin_family = AF_INET; + servAddr.sin_addr.s_addr = inet_addr(inet_ntoa(*(struct in_addr*) (host->h_addr_list[0]))); + servAddr.sin_port = htons(servPort); + sleep(1); + + return sendTestData(sock, (struct sockaddr*)&servAddr, sizeof(servAddr)); } void* playWithUnixSocketServer(void* data) { - CAmSocketHandler *pSockethandler = (CAmSocketHandler*) data; - pSockethandler->start_listenting(); - return (NULL); + int sock = *((int*)data); + struct sockaddr_un servAddr; + memset(&servAddr, 0, sizeof(servAddr)); + strcpy(servAddr.sun_path, SOCK_PATH); + servAddr.sun_family = AF_UNIX; + sleep(1); + + return sendTestData(sock, (struct sockaddr*)&servAddr, sizeof(servAddr)); +} + +void* threadCallbackUnixSocketAndTimers(void* data) +{ + int sock = *((int*)data); + struct sockaddr_un servAddr; + memset(&servAddr, 0, sizeof(servAddr)); + strcpy(servAddr.sun_path, SOCK_PATH); + servAddr.sun_family = AF_UNIX; + sleep(1); + + return sendTestData(sock, (struct sockaddr*)&servAddr, sizeof(servAddr), 500000); +} + +TEST(CAmSocketHandlerTest, stressTestUnixSocketAndTimers) +{ + + pthread_t serverThread; + + int socket_; + + CAmSocketHandler myHandler; + ASSERT_FALSE(myHandler.fatalErrorOccurred()); + CAmSamplePluginStressTest::sockType_e type = CAmSamplePlugin::UNIX; + CAmSamplePluginStressTest myplugin(&myHandler, type); + + EXPECT_CALL(myplugin,receiveData(Field(&pollfd::revents, Eq(POLL_IN)),_,_)).Times(Exactly(SOCKET_TEST_LOOPS_COUNT)); + EXPECT_CALL(myplugin,dispatchData(_,_)).Times(Exactly(SOCKET_TEST_LOOPS_COUNT)); + EXPECT_CALL(myplugin,check(_,_)).Times(Exactly(SOCKET_TEST_LOOPS_COUNT)); + + for(int i=0;i timers(TIMERS_TO_TEST, NULL); + + for (auto & timer: timers) + { + timer = new CAmTimerStressTest(&myHandler, timeoutTime, 0); + ASSERT_EQ(myHandler.addTimer(timeoutTime, &(timer->pTimerCallback), handle, &userData, true), E_OK); + EXPECT_CALL(*timer, timerCallback(_,&userData)).Times(AnyNumber()); + } + + timespec timeoutTime11, timeout12, timeout13; + timeoutTime11.tv_sec = 1; + timeoutTime11.tv_nsec = 34000000; + CAmTimerMeasurment testCallback11(&myHandler, timeoutTime11, "repeated 1", std::numeric_limits::max()); + + timeout12.tv_sec = 0; + timeout12.tv_nsec = 100000000; + CAmTimerMeasurment testCallback12(&myHandler, timeout12, "repeated 2", std::numeric_limits::max()); + + timeout13.tv_sec = 3; + timeout13.tv_nsec = 333000000; + CAmTimerMeasurment testCallback13(&myHandler, timeout13, "oneshot 3"); + + ASSERT_EQ(myHandler.addTimer(timeoutTime11, &testCallback11.pTimerCallback, handle, NULL, true), E_OK); + EXPECT_CALL(testCallback11,timerCallback(_,NULL)).Times(AnyNumber()); + + ASSERT_EQ(myHandler.addTimer(timeout12, &testCallback12.pTimerCallback, handle, NULL, true), E_OK); + EXPECT_CALL(testCallback12,timerCallback(_,NULL)).Times(AnyNumber()); + + ASSERT_EQ(myHandler.addTimer(timeout13, &testCallback13.pTimerCallback, handle, NULL), E_OK); + EXPECT_CALL(testCallback13,timerCallback(_,NULL)).Times(Exactly(1)); + + + CAmTimerSockethandlerController testCallback4(&myHandler, timeout4); + + ASSERT_EQ(myHandler.addTimer(timeout4, &testCallback4.pTimerCallback, handle, NULL), E_OK); + + EXPECT_CALL(testCallback4,timerCallback(_,NULL)).Times(Exactly(1)); + myHandler.start_listenting(); + + for (auto & timer: timers) + { + delete timer, timer = NULL; + } +} + + TEST(CAmSocketHandlerTest,playWithTimers) { CAmSocketHandler myHandler; @@ -348,15 +543,16 @@ TEST(CAmSocketHandlerTest,playWithTimers) timespec timeoutTime, timeout2, timeout3, timeout4; timeoutTime.tv_sec = 1; timeoutTime.tv_nsec = 34000000; - CAmTimerMeasurment testCallback1(&myHandler, timeoutTime, "repeatedCallback 1", std::numeric_limits::max()); + CAmTimerMeasurment testCallback1(&myHandler, timeoutTime, "repeated 1", std::numeric_limits::max()); timeout2.tv_nsec = 2000000; timeout2.tv_sec = 0; - CAmTimerMeasurment testCallback2(&myHandler, timeout2, "repeatedCallback 2", std::numeric_limits::max()); + CAmTimerMeasurment testCallback2(&myHandler, timeout2, "repeated 2", std::numeric_limits::max()); timeout3.tv_nsec = 333000000; timeout3.tv_sec = 3; - CAmTimerMeasurment testCallback3(&myHandler, timeout3, "oneshotCallback 3"); + CAmTimerMeasurment testCallback3(&myHandler, timeout3, "oneshot 3"); + timeout4.tv_nsec = 0; timeout4.tv_sec = 8; CAmTimerSockethandlerController testCallback4(&myHandler, timeout4); @@ -479,94 +675,52 @@ TEST(CAmSocketHandlerTest,playWithUNIXSockets) ASSERT_FALSE(myHandler.fatalErrorOccurred()); CAmSamplePlugin::sockType_e type = CAmSamplePlugin::UNIX; CAmSamplePlugin myplugin(&myHandler, type); + ASSERT_TRUE(myplugin.isSocketOpened()); EXPECT_CALL(myplugin,receiveData(Field(&pollfd::revents, Eq(POLL_IN)),_,_)).Times(Exactly(SOCKET_TEST_LOOPS_COUNT)); EXPECT_CALL(myplugin,dispatchData(_,_)).Times(Exactly(SOCKET_TEST_LOOPS_COUNT)); EXPECT_CALL(myplugin,check(_,_)).Times(Exactly(SOCKET_TEST_LOOPS_COUNT)); - //creates a thread that handles the serverpart - pthread_create(&serverThread, NULL, playWithUnixSocketServer, &myHandler); - - sleep(1); //we need that here because the port needs to be opened if ((socket_ = socket(AF_UNIX, SOCK_STREAM, 0)) < 0) { std::cout << "socket problem" << std::endl; - - } - - memset(&servAddr, 0, sizeof(servAddr)); - strcpy(servAddr.sun_path, SOCK_PATH); - servAddr.sun_family = AF_UNIX; - if (connect(socket_, (struct sockaddr *) &servAddr, sizeof(servAddr)) < 0) - { - std::cout << "ERROR: connect() failed\n" << std::endl; } + ASSERT_GT(socket_, -1); + //creates a thread that handles the serverpart + pthread_create(&serverThread, NULL, playWithUnixSocketServer, &socket_); - for (int i = 1; i < SOCKET_TEST_LOOPS_COUNT; i++) - { - std::string stringToSend(TEST_SOCKET_DATA); - send(socket_, stringToSend.c_str(), stringToSend.size(), 0); - } - std::string stringToSend(TEST_SOCKET_DATA_FINAL); - send(socket_, stringToSend.c_str(), stringToSend.size(), 0); + myHandler.start_listenting(); pthread_join(serverThread, NULL); - + shutdown(socket_, SHUT_RDWR); } TEST(CAmSocketHandlerTest,playWithSockets) { pthread_t serverThread; - struct sockaddr_in servAddr; - unsigned short servPort = 6060; - struct hostent *host; int socket_; CAmSocketHandler myHandler; ASSERT_FALSE(myHandler.fatalErrorOccurred()); CAmSamplePlugin::sockType_e type = CAmSamplePlugin::INET; CAmSamplePlugin myplugin(&myHandler, type); - + ASSERT_TRUE(myplugin.isSocketOpened()); EXPECT_CALL(myplugin,receiveData(Field(&pollfd::revents, Eq(POLL_IN)),_,_)).Times(Exactly(SOCKET_TEST_LOOPS_COUNT)); EXPECT_CALL(myplugin,dispatchData(_,_)).Times(Exactly(SOCKET_TEST_LOOPS_COUNT)); EXPECT_CALL(myplugin,check(_,_)).Times(Exactly(SOCKET_TEST_LOOPS_COUNT)); - //creates a thread that handles the serverpart - pthread_create(&serverThread, NULL, playWithSocketServer, &myHandler); - - sleep(1); //we need that here because the port needs to be opened if ((socket_ = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) { std::cout << "socket problem" << std::endl; - - } - - if ((host = (struct hostent*) gethostbyname("localhost")) == 0) - { - std::cout << "ERROR: gethostbyname() failed\n" << std::endl; - exit(1); } + ASSERT_GT(socket_, -1); + //creates a thread that handles the serverpart + pthread_create(&serverThread, NULL, playWithSocketServer, &socket_); - memset(&servAddr, 0, sizeof(servAddr)); - servAddr.sin_family = AF_INET; - servAddr.sin_addr.s_addr = inet_addr(inet_ntoa(*(struct in_addr*) (host->h_addr_list[0]))); - servAddr.sin_port = htons(servPort); - - if (connect(socket_, (struct sockaddr *) &servAddr, sizeof(servAddr)) < 0) - { - std::cout << "ERROR: connect() failed\n" << std::endl; - } - - for (int i = 1; i < SOCKET_TEST_LOOPS_COUNT; i++) - { - std::string string(TEST_SOCKET_DATA); - send(socket_, string.c_str(), string.size(), 0); - } - std::string string(TEST_SOCKET_DATA_FINAL); - send(socket_, string.c_str(), string.size(), 0); + myHandler.start_listenting(); pthread_join(serverThread, NULL); - + shutdown(socket_, SHUT_RDWR); } int main(int argc, char **argv) @@ -582,12 +736,12 @@ am::CAmSamplePlugin::CAmSamplePlugin(CAmSocketHandler *mySocketHandler, sockType sampleCheckCB(this, &CAmSamplePlugin::check), // mSocketHandler(mySocketHandler), // mConnecthandle(), // - mReceiveHandle(), // - msgList() + mReceiveHandle(), // + msgList(), + mSocket(-1) { int yes = 1; - int socketHandle; struct sockaddr_in servAddr; struct sockaddr_un unixAddr; unsigned int servPort = 6060; @@ -595,26 +749,30 @@ am::CAmSamplePlugin::CAmSamplePlugin(CAmSocketHandler *mySocketHandler, sockType switch (socketType) { case UNIX: - socketHandle = socket(AF_UNIX, SOCK_STREAM, 0); + mSocket = socket(AF_UNIX, SOCK_STREAM, 0); + if(mSocket==-1) + return; unixAddr.sun_family = AF_UNIX; strcpy(unixAddr.sun_path, SOCK_PATH); unlink(unixAddr.sun_path); - bind(socketHandle, (struct sockaddr *) &unixAddr, strlen(unixAddr.sun_path) + sizeof(unixAddr.sun_family)); + bind(mSocket, (struct sockaddr *) &unixAddr, strlen(unixAddr.sun_path) + sizeof(unixAddr.sun_family)); break; case INET: - socketHandle = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP); - setsockopt(socketHandle, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)); + mSocket = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP); + if(mSocket==-1) + return; + setsockopt(mSocket, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)); memset(&servAddr, 0, sizeof(servAddr)); servAddr.sin_family = AF_INET; servAddr.sin_addr.s_addr = INADDR_ANY; servAddr.sin_port = htons(servPort); - bind(socketHandle, (struct sockaddr *) &servAddr, sizeof(servAddr)); + bind(mSocket, (struct sockaddr *) &servAddr, sizeof(servAddr)); break; default: break; } - if (listen(socketHandle, 3) < 0) + if (listen(mSocket, 3) < 0) { #ifdef ENABLED_SOCKETHANDLER_TEST_OUTPUT std::cout << "listen ok" << std::endl; @@ -622,12 +780,12 @@ am::CAmSamplePlugin::CAmSamplePlugin(CAmSocketHandler *mySocketHandler, sockType } /* if */ int a = 1; - ioctl(socketHandle, FIONBIO, (char *) &a); - setsockopt(socketHandle, SOL_SOCKET, SO_KEEPALIVE, (char *) &a, sizeof(a)); + ioctl(mSocket, FIONBIO, (char *) &a); + setsockopt(mSocket, SOL_SOCKET, SO_KEEPALIVE, (char *) &a, sizeof(a)); short events = 0; events |= POLLIN; - mySocketHandler->addFDPoll(socketHandle, events, NULL, &connectFiredCB, NULL, NULL, NULL, mConnecthandle); + mySocketHandler->addFDPoll(mSocket, events, NULL, &connectFiredCB, NULL, NULL, NULL, mConnecthandle); #ifdef ENABLED_SOCKETHANDLER_TEST_OUTPUT std::cout << "setup server - listening" << std::endl; #endif @@ -714,3 +872,53 @@ bool am::CAmSamplePlugin::check(const sh_pollHandle_t handle, void *userData) return false; } +CAmSamplePluginStressTest::CAmSamplePluginStressTest(CAmSocketHandler *mySocketHandler, sockType_e socketType):CAmSamplePlugin(mySocketHandler,socketType) +, mTimers(TIMERS_TO_TEST, NULL) +{ + sh_timerHandle_t handle; + TestUserData userData; + userData.i = 1; + userData.f = 1.f; + timespec timeoutTime; + timeoutTime.tv_sec = 0; + timeoutTime.tv_nsec = 10000000;// 0,01 + for (auto & timer : mTimers) + { + timer = new CAmTimerStressTest2(mySocketHandler, timeoutTime, 0); + if (mySocketHandler->addTimer(timeoutTime, &(timer->pTimerCallback), handle, &userData, true) == E_OK); + timer->setHandle(handle); + + EXPECT_CALL(*timer, timerCallback(_,&userData)).Times(AnyNumber()); + } +} + +CAmSamplePluginStressTest::~CAmSamplePluginStressTest() +{ + for (auto & timer : mTimers) + { + delete timer, timer = NULL; + } +} + +void CAmSamplePluginStressTest::receiveData(const pollfd pollfd, const sh_pollHandle_t handle, void* userData) +{ + CAmSamplePlugin::receiveData(pollfd, handle, userData); + + sh_timerHandle_t handle1; + for (auto & timer : mTimers) + { + ASSERT_EQ(mSocketHandler->removeTimer(timer->getHandle()), E_OK); + ASSERT_EQ(mSocketHandler->addTimer(timer->getUpdateTimeout(), &(timer->pTimerCallback), handle1, NULL, true), E_OK); + timer->setHandle(handle1); + } +} + +bool CAmSamplePluginStressTest::dispatchData(const sh_pollHandle_t handle, void* userData) +{ + return CAmSamplePlugin::dispatchData( handle, userData); +} + +bool CAmSamplePluginStressTest::check(const sh_pollHandle_t handle, void* userData) +{ + return CAmSamplePlugin::check( handle, userData); +} diff --git a/AudioManagerUtilities/test/AmSocketHandlerTest/CAmSocketHandlerTest.h b/AudioManagerUtilities/test/AmSocketHandlerTest/CAmSocketHandlerTest.h index a274605..93620b5 100644 --- a/AudioManagerUtilities/test/AmSocketHandlerTest/CAmSocketHandlerTest.h +++ b/AudioManagerUtilities/test/AmSocketHandlerTest/CAmSocketHandlerTest.h @@ -101,23 +101,21 @@ namespace am UNIX, INET }; CAmSamplePlugin(CAmSocketHandler *mySocketHandler, sockType_e socketType); - ~CAmSamplePlugin() - { - } - ; + virtual ~CAmSamplePlugin() { } void connectSocket(const pollfd pollfd, const sh_pollHandle_t handle, void* userData); - void receiveData(const pollfd pollfd, const sh_pollHandle_t handle, void* userData); - bool dispatchData(const sh_pollHandle_t handle, void* userData); - bool check(const sh_pollHandle_t handle, void* userData); + virtual void receiveData(const pollfd pollfd, const sh_pollHandle_t handle, void* userData); + virtual bool dispatchData(const sh_pollHandle_t handle, void* userData); + virtual bool check(const sh_pollHandle_t handle, void* userData); TAmShPollFired connectFiredCB; TAmShPollFired receiveFiredCB; TAmShPollDispatch sampleDispatchCB; TAmShPollCheck sampleCheckCB; - - private: + bool isSocketOpened() { return mSocket>-1; } + protected: CAmSocketHandler *mSocketHandler; sh_pollHandle_t mConnecthandle, mReceiveHandle; std::queue msgList; + int mSocket; }; class CAmTimerSockethandlerController: public MockIAmTimerCb @@ -162,6 +160,46 @@ namespace am TAmShTimerCallBack pTimerCallback; }; + class CAmTimerStressTest: public MockIAmTimerCb + { + CAmSocketHandler *mpSocketHandler; + timespec mUpdateTimeout; + int32_t mRepeats; + int32_t mHandle; + public: + explicit CAmTimerStressTest(CAmSocketHandler *SocketHandler, const timespec &timeout, const int32_t repeats = 0u); + virtual ~CAmTimerStressTest(); + + int32_t getHandle() { return mHandle; } + void setHandle(const int32_t id) { mHandle=id; } + + timespec getUpdateTimeout( ) { return mUpdateTimeout; } + + void timerCallback(sh_timerHandle_t handle, void * userData); + + TAmShTimerCallBack pTimerCallback; + }; + + class CAmTimerStressTest2: public MockIAmTimerCb + { + CAmSocketHandler *mpSocketHandler; + timespec mUpdateTimeout; + int32_t mRepeats; + int32_t mHandle; + public: + explicit CAmTimerStressTest2(CAmSocketHandler *SocketHandler, const timespec &timeout, const int32_t repeats = 0u); + virtual ~CAmTimerStressTest2(); + + int32_t getHandle() { return mHandle; } + void setHandle(const int32_t id) { mHandle=id; } + + timespec getUpdateTimeout( ) { return mUpdateTimeout; } + + void timerCallback(sh_timerHandle_t handle, void * userData); + + TAmShTimerCallBack pTimerCallback; + }; + class CAmTimerMeasurment: public MockIAmTimerCb { CAmSocketHandler *mSocketHandler; @@ -189,5 +227,19 @@ namespace am void TearDown(); }; + class CAmSamplePluginStressTest: public CAmSamplePlugin + { + std::vector mTimers; + public: + CAmSamplePluginStressTest(CAmSocketHandler *mySocketHandler, sockType_e socketType); + virtual ~CAmSamplePluginStressTest(); + + void receiveData(const pollfd pollfd, const sh_pollHandle_t handle, void* userData) final; + bool dispatchData(const sh_pollHandle_t handle, void* userData) final; + bool check(const sh_pollHandle_t handle, void* userData) final; + + std::vector & getTimers() { return mTimers; } + }; + } /* namespace am */ #endif /* SOCKETHANDLERTEST_H_ */ -- cgit v1.2.1