From d3ccf97331935b181041394b80be20dca282ea71 Mon Sep 17 00:00:00 2001 From: christian mueller Date: Wed, 29 Feb 2012 13:27:27 +0100 Subject: * [ performance] for classes that do not need to be derived from, removed virtual desctructor * implemented confirmation of routing ready in RoutingReceiver * [Sockethandler] automatically set gDispatchDone to 0 when starting mainloop * fixed unit text to work with latest changes (expect Dbus command interface) * [GAM-4] added way to do synchronous calling on interfaces with the help of CAmSerializer.h * reworked AsyncRoutingPlugin to work with CAmSerializer.h * reworked AsyncRoutingPlugin to register elemtes in thread using CAmSerializer.h * reworked AsncPlugin Tests to work with remodelled Plugin --- .../include/RoutingReceiverAsyncShadow.h | 121 +----- .../include/RoutingSenderAsyn.h | 25 +- .../src/RoutingReceiverAsyncShadow.cpp | 473 +++------------------ .../src/RoutingSenderAsync.cpp | 466 +++++++++++--------- PluginRoutingInterfaceAsync/test/CMakeLists.txt | 4 +- PluginRoutingInterfaceAsync/test/mocklnterfaces.h | 155 +++---- .../test/testRoutingInterfaceAsync.cpp | 213 +++++++--- .../test/testRoutingInterfaceAsync.h | 28 +- .../test/testRoutingInterfaceAsyncInterrupt.cpp | 4 +- 9 files changed, 606 insertions(+), 883 deletions(-) (limited to 'PluginRoutingInterfaceAsync') diff --git a/PluginRoutingInterfaceAsync/include/RoutingReceiverAsyncShadow.h b/PluginRoutingInterfaceAsync/include/RoutingReceiverAsyncShadow.h index ea6da5f..bf5a5ed 100644 --- a/PluginRoutingInterfaceAsync/include/RoutingReceiverAsyncShadow.h +++ b/PluginRoutingInterfaceAsync/include/RoutingReceiverAsyncShadow.h @@ -29,6 +29,7 @@ #include #include #include +#include "CAmSerializer.h" namespace am { @@ -40,7 +41,7 @@ namespace am class RoutingReceiverAsyncShadow { public: - RoutingReceiverAsyncShadow(); + RoutingReceiverAsyncShadow(RoutingReceiveInterface* iReceiveInterface,SocketHandler* iSocketHandler); virtual ~RoutingReceiverAsyncShadow(); void ackConnect(const am_Handle_s handle, const am_connectionID_t connectionID, const am_Error_e error); void ackDisconnect(const am_Handle_s handle, const am_connectionID_t connectionID, const am_Error_e error); @@ -57,120 +58,20 @@ public: void hookSourceAvailablityStatusChange(const am_sourceID_t sourceID, const am_Availability_s& availability); void hookDomainStateChange(const am_domainID_t domainID, const am_DomainState_e domainState); void hookTimingInformationChanged(const am_connectionID_t connectionID, const am_timeSync_t delay); - - am_Error_e setRoutingInterface(RoutingReceiveInterface *receiveInterface); - void asyncMsgReceiver(const pollfd pollfd, const sh_pollHandle_t handle, void* userData); - bool asyncDispatcher(const sh_pollHandle_t handle, void* userData); - bool asyncChecker(const sh_pollHandle_t handle, void* userData); - - shPollFired_T asyncMsgReceive; - shPollDispatch_T asyncDispatch; - shPollCheck_T asyncCheck; + am_Error_e registerDomain(const am_Domain_s& domainData, am_domainID_t& domainID) ; + am_Error_e registerGateway(const am_Gateway_s& gatewayData, am_gatewayID_t& gatewayID) ; + am_Error_e registerSink(const am_Sink_s& sinkData, am_sinkID_t& sinkID) ; + am_Error_e deregisterSink(const am_sinkID_t sinkID) ; + am_Error_e registerSource(const am_Source_s& sourceData, am_sourceID_t& sourceID) ; + am_Error_e deregisterSource(const am_sourceID_t sourceID) ; + am_Error_e registerCrossfader(const am_Crossfader_s& crossfaderData, am_crossfaderID_t& crossfaderID) ; + void confirmRoutingReady(uint16_t starupHandle); private: - enum msgID_e - { - MSG_ACKCONNECT, MSG_ACKDISCONNECT, MSG_ACKSETSINKVOLUMECHANGE, MSG_ACKSETSOURCEVOLUMECHANGE, MSG_ACKSETSOURCESTATE, MSG_ACKSETSINKSOUNDPROPERTY, MSG_ACKSETSOURCESOUNDPROPERTY, MSG_ACKCROSSFADING, MSG_ACKSOURCEVOLUMETICK, MSG_ACKSINKVOLUMETICK, MSG_HOOKINTERRUPTSTATUSCHANGE, MSG_HOOKSINKAVAILABLITYSTATUSCHANGE, MSG_HOOKSOURCEAVAILABLITYSTATUSCHANGE, MSG_HOOKDOMAINSTATECHANGE, MSG_HOOKTIMINGINFORMATIONCHANGED - }; - - struct a_connect_s - { - am_Handle_s handle; - am_connectionID_t connectionID; - am_Error_e error; - }; - - struct a_volume_s - { - am_Handle_s handle; - am_volume_t volume; - am_Error_e error; - }; - - struct a_handle_s - { - am_Handle_s handle; - am_Error_e error; - }; - - struct a_crossfading_s - { - am_Handle_s handle; - am_HotSink_e hotSink; - am_Error_e error; - }; - - struct a_sourceVolumeTick_s - { - am_sourceID_t sourceID; - am_Handle_s handle; - am_volume_t volume; - }; - - struct a_sinkVolumeTick_s - { - am_sinkID_t sinkID; - am_Handle_s handle; - am_volume_t volume; - }; - - struct a_interruptStatusChange_s - { - am_sourceID_t sourceID; - am_InterruptState_e interruptState; - }; - - struct a_sinkAvailability_s - { - am_sinkID_t sinkID; - am_Availability_s availability; - }; - - struct a_sourceAvailability_s - { - am_sourceID_t sourceID; - am_Availability_s availability; - }; - - struct a_hookDomainStateChange_s - { - am_domainID_t domainID; - am_DomainState_e state; - }; - - struct a_timingInfoChanged_s - { - am_connectionID_t connectionID; - am_timeSync_t delay; - }; - - union parameter_u - { - a_connect_s connect; - a_volume_s volume; - a_handle_s handle; - a_crossfading_s crossfading; - a_sourceVolumeTick_s sourceVolumeTick; - a_sinkVolumeTick_s sinkVolumeTick; - a_interruptStatusChange_s interruptStatusChange; - a_sinkAvailability_s sinkAvailability; - a_sourceAvailability_s sourceAvailability; - a_hookDomainStateChange_s domainStateChange; - a_timingInfoChanged_s timingInfoChange; - }; - - struct msg_s - { - msgID_e msgID; - parameter_u parameters; - }; SocketHandler *mSocketHandler; RoutingReceiveInterface *mRoutingReceiveInterface; - std::queue mQueue; - static pthread_mutex_t mMutex; - sh_pollHandle_t mHandle; - int mPipe[2]; + CAmSerializer mSerializer; }; } /* namespace am */ diff --git a/PluginRoutingInterfaceAsync/include/RoutingSenderAsyn.h b/PluginRoutingInterfaceAsync/include/RoutingSenderAsyn.h index dd88be1..96e1b5c 100644 --- a/PluginRoutingInterfaceAsync/include/RoutingSenderAsyn.h +++ b/PluginRoutingInterfaceAsync/include/RoutingSenderAsyn.h @@ -29,6 +29,7 @@ #include #include "RoutingReceiverAsyncShadow.h" #include +#include namespace am { @@ -208,6 +209,12 @@ public: */ void updateDomainstateSafe(am_domainID_t domainID, am_DomainState_e domainState); + void updateDomainListSafe(std::vector listDomains); + + void updateSourceListSafe(std::vector listSource); + + void updateSinkListSafe(std::vector listSinks); + private: /** * Extra thread that handles dbus stimulation for interrupt tests @@ -216,12 +223,11 @@ private: * it is used just for testing, not intended to be used otherwise... * @param data */ - static void* InterruptEvents(void* data); std::vector createDomainTable(); std::vector createSinkTable(); std::vector createSourceTable(); std::vector createGatewayTable(); - RoutingReceiverAsyncShadow mShadow; + RoutingReceiverAsyncShadow* mShadow; RoutingReceiveInterface* mReceiveInterface; SocketHandler *mSocketHandler; std::vector mDomains; @@ -372,6 +378,21 @@ private: am_DomainState_e mDomainState; }; +class syncRegisterWorker: public Worker +{ +public: + syncRegisterWorker(AsyncRoutingSender * asyncSender, WorkerThreadPool* pool, RoutingReceiverAsyncShadow* shadow, const std::vector domains, const std::vector sinks, const std::vector sources, const uint16_t handle); + void start2work(); + void cancelWork(); +private: + AsyncRoutingSender * mAsyncSender; + RoutingReceiverAsyncShadow *mShadow; + std::vector mListDomains; + std::vector mListSinks; + std::vector mListSources; + uint16_t mHandle; +}; + } #endif /* ROUTINGSENDER_H_ */ diff --git a/PluginRoutingInterfaceAsync/src/RoutingReceiverAsyncShadow.cpp b/PluginRoutingInterfaceAsync/src/RoutingReceiverAsyncShadow.cpp index c11201e..5c93f60 100644 --- a/PluginRoutingInterfaceAsync/src/RoutingReceiverAsyncShadow.cpp +++ b/PluginRoutingInterfaceAsync/src/RoutingReceiverAsyncShadow.cpp @@ -36,16 +36,10 @@ using namespace am; -pthread_mutex_t RoutingReceiverAsyncShadow::mMutex = PTHREAD_MUTEX_INITIALIZER; - -RoutingReceiverAsyncShadow::RoutingReceiverAsyncShadow(): -asyncMsgReceive(this, &RoutingReceiverAsyncShadow::asyncMsgReceiver), // - asyncDispatch(this, &RoutingReceiverAsyncShadow::asyncDispatcher), // - asyncCheck(this, &RoutingReceiverAsyncShadow::asyncChecker), // - mSocketHandler(), // - mRoutingReceiveInterface(), // -mHandle (), // -mPipe() +RoutingReceiverAsyncShadow::RoutingReceiverAsyncShadow(RoutingReceiveInterface* iReceiveInterface, SocketHandler* iSocketHandler) : + mSocketHandler(iSocketHandler), // + mRoutingReceiveInterface(iReceiveInterface), // + mSerializer(iSocketHandler) { } @@ -56,472 +50,139 @@ RoutingReceiverAsyncShadow::~RoutingReceiverAsyncShadow() void RoutingReceiverAsyncShadow::ackConnect(const am_Handle_s handle, const am_connectionID_t connectionID, const am_Error_e error) { - assert(mPipe[0]!=0); - //put the data in the queue: - a_connect_s temp; - temp.handle = handle; - temp.connectionID = connectionID; - temp.error = error; - //then we make a message out of it: - msg_s msg; - msg.msgID = MSG_ACKCONNECT; - msg.parameters.connect = temp; - //here we share data ! - pthread_mutex_lock(&mMutex); - mQueue.push(msg); - pthread_mutex_unlock(&mMutex); - - //ok, fire the signal that data needs to be received ! - if (write(mPipe[1], &msg.msgID, sizeof(msgID_e)) == -1) - { - logError("RoutingReceiverAsyncShadow::ackConnect write failed, error code:", strerror(errno)); - } + mSerializer.asyncCall(mRoutingReceiveInterface, &RoutingReceiveInterface::ackConnect, handle, connectionID, error); } void RoutingReceiverAsyncShadow::ackDisconnect(const am_Handle_s handle, const am_connectionID_t connectionID, const am_Error_e error) { - assert(mPipe[0]!=0); - //put the data in the queue: - a_connect_s temp; - temp.handle = handle; - temp.connectionID = connectionID; - temp.error = error; - //then we make a message out of it: - msg_s msg; - msg.msgID = MSG_ACKDISCONNECT; - msg.parameters.connect = temp; - //here we share data ! - pthread_mutex_lock(&mMutex); - mQueue.push(msg); - pthread_mutex_unlock(&mMutex); - - //ok, fire the signal that data needs to be received ! - if (write(mPipe[1], &msg.msgID, sizeof(msgID_e)) == -1) - { - logError("RoutingReceiverAsyncShadow::ackDisconnect write failed, error code:", strerror(errno)); - } + mSerializer.asyncCall(mRoutingReceiveInterface, &RoutingReceiveInterface::ackDisconnect, handle, connectionID, error); } void RoutingReceiverAsyncShadow::ackSetSinkVolumeChange(const am_Handle_s handle, const am_volume_t volume, const am_Error_e error) { - assert(mPipe[0]!=0); - //put the data in the queue: - a_volume_s temp; - temp.handle = handle; - temp.volume = volume; - temp.error = error; - //then we make a message out of it: - msg_s msg; - msg.msgID = MSG_ACKSETSINKVOLUMECHANGE; - msg.parameters.volume = temp; - //here we share data ! - pthread_mutex_lock(&mMutex); - mQueue.push(msg); - pthread_mutex_unlock(&mMutex); - - //ok, fire the signal that data needs to be received ! - if (write(mPipe[1], &msg.msgID, sizeof(msgID_e)) == -1) - { - logError("RoutingReceiverAsyncShadow::ackSetSinkVolumeChange write failed, error code:", strerror(errno)); - } + mSerializer.asyncCall(mRoutingReceiveInterface, &RoutingReceiveInterface::ackSetSinkVolumeChange, handle, volume, error); } void RoutingReceiverAsyncShadow::ackSetSourceVolumeChange(const am_Handle_s handle, const am_volume_t volume, const am_Error_e error) { - assert(mPipe[0]!=0); - //put the data in the queue: - a_volume_s temp; - temp.handle = handle; - temp.volume = volume; - temp.error = error; - //then we make a message out of it: - msg_s msg; - msg.msgID = MSG_ACKSETSOURCEVOLUMECHANGE; - msg.parameters.volume = temp; - //here we share data ! - pthread_mutex_lock(&mMutex); - mQueue.push(msg); - pthread_mutex_unlock(&mMutex); - - //ok, fire the signal that data needs to be received ! - if (write(mPipe[1], &msg.msgID, sizeof(msgID_e)) == -1) - { - logError("RoutingReceiverAsyncShadow::ackSetSourceVolumeChange write failed, error code:", strerror(errno)); - } + mSerializer.asyncCall(mRoutingReceiveInterface, &RoutingReceiveInterface::ackSetSourceVolumeChange, handle, volume, error); } void RoutingReceiverAsyncShadow::ackSetSourceState(const am_Handle_s handle, const am_Error_e error) { - assert(mPipe[0]!=0); - //put the data in the queue: - a_handle_s temp; - temp.handle = handle; - temp.error = error; - //then we make a message out of it: - msg_s msg; - msg.msgID = MSG_ACKSETSOURCESTATE; - msg.parameters.handle = temp; - //here we share data ! - pthread_mutex_lock(&mMutex); - mQueue.push(msg); - pthread_mutex_unlock(&mMutex); - - //ok, fire the signal that data needs to be received ! - if (write(mPipe[1], &msg.msgID, sizeof(msgID_e)) == -1) - { - logError("RoutingReceiverAsyncShadow::ackSetSourceState write failed, error code:", strerror(errno)); - } + mSerializer.asyncCall(mRoutingReceiveInterface, &RoutingReceiveInterface::ackSetSourceState, handle, error); } void RoutingReceiverAsyncShadow::ackSetSinkSoundProperty(const am_Handle_s handle, const am_Error_e error) { - assert(mPipe[0]!=0); - //put the data in the queue: - a_handle_s temp; - temp.handle = handle; - temp.error = error; - //then we make a message out of it: - msg_s msg; - msg.msgID = MSG_ACKSETSINKSOUNDPROPERTY; - msg.parameters.handle = temp; - //here we share data ! - pthread_mutex_lock(&mMutex); - mQueue.push(msg); - pthread_mutex_unlock(&mMutex); - - //ok, fire the signal that data needs to be received ! - if (write(mPipe[1], &msg.msgID, sizeof(msgID_e)) == -1) - { - logError("RoutingReceiverAsyncShadow::ackSetSinkSoundProperty write failed, error code:", strerror(errno)); - } + mSerializer.asyncCall(mRoutingReceiveInterface, &RoutingReceiveInterface::ackSetSinkSoundProperty, handle, error); } void RoutingReceiverAsyncShadow::ackSetSourceSoundProperty(const am_Handle_s handle, const am_Error_e error) { - assert(mPipe[0]!=0); - //put the data in the queue: - a_handle_s temp; - temp.handle = handle; - temp.error = error; - //then we make a message out of it: - msg_s msg; - msg.msgID = MSG_ACKSETSOURCESOUNDPROPERTY; - msg.parameters.handle = temp; - //here we share data ! - pthread_mutex_lock(&mMutex); - mQueue.push(msg); - pthread_mutex_unlock(&mMutex); - - //ok, fire the signal that data needs to be received ! - if (write(mPipe[1], &msg.msgID, sizeof(msgID_e)) == -1) - { - logError("RoutingReceiverAsyncShadow::ackSetSourceSoundProperty write failed, error code:", strerror(errno)); - } + mSerializer.asyncCall(mRoutingReceiveInterface, &RoutingReceiveInterface::ackSetSourceSoundProperty, handle, error); } void RoutingReceiverAsyncShadow::ackCrossFading(const am_Handle_s handle, const am_HotSink_e hotSink, const am_Error_e error) { - assert(mPipe[0]!=0); - //put the data in the queue: - a_crossfading_s temp; - temp.handle = handle; - temp.hotSink = hotSink; - temp.error = error; - //then we make a message out of it: - msg_s msg; - msg.msgID = MSG_ACKCROSSFADING; - msg.parameters.crossfading = temp; - //here we share data ! - pthread_mutex_lock(&mMutex); - mQueue.push(msg); - pthread_mutex_unlock(&mMutex); - - //ok, fire the signal that data needs to be received ! - if (write(mPipe[1], &msg.msgID, sizeof(msgID_e)) == -1) - { - logError("RoutingReceiverAsyncShadow::ackCrossFading write failed, error code:", strerror(errno)); - } + mSerializer.asyncCall(mRoutingReceiveInterface, &RoutingReceiveInterface::ackCrossFading, handle, hotSink, error); } void RoutingReceiverAsyncShadow::ackSourceVolumeTick(const am_Handle_s handle, const am_sourceID_t sourceID, const am_volume_t volume) { - assert(mPipe[0]!=0); - //put the data in the queue: - a_sourceVolumeTick_s temp; - temp.sourceID = sourceID; - temp.handle = handle; - temp.volume = volume; - //then we make a message out of it: - msg_s msg; - msg.msgID = MSG_ACKSOURCEVOLUMETICK; - msg.parameters.sourceVolumeTick = temp; - //here we share data ! - pthread_mutex_lock(&mMutex); - mQueue.push(msg); - pthread_mutex_unlock(&mMutex); - - //ok, fire the signal that data needs to be received ! - if (write(mPipe[1], &msg.msgID, sizeof(msgID_e)) == -1) - { - logError("RoutingReceiverAsyncShadow::ackSourceVolumeTick write failed, error code:", strerror(errno)); - } + mSerializer.asyncCall(mRoutingReceiveInterface, &RoutingReceiveInterface::ackSourceVolumeTick, handle, sourceID, volume); } void RoutingReceiverAsyncShadow::ackSinkVolumeTick(const am_Handle_s handle, const am_sinkID_t sinkID, const am_volume_t volume) { - assert(mPipe[0]!=0); - //put the data in the queue: - a_sinkVolumeTick_s temp; - temp.sinkID = sinkID; - temp.handle = handle; - temp.volume = volume; - //then we make a message out of it: - msg_s msg; - msg.msgID = MSG_ACKSINKVOLUMETICK; - msg.parameters.sinkVolumeTick = temp; - //here we share data ! - pthread_mutex_lock(&mMutex); - mQueue.push(msg); - pthread_mutex_unlock(&mMutex); - - //ok, fire the signal that data needs to be received ! - if (write(mPipe[1], &msg.msgID, sizeof(msgID_e)) == -1) - { - logError("RoutingReceiverAsyncShadow::ackSinkVolumeTick write failed, error code:", strerror(errno)); - } + mSerializer.asyncCall(mRoutingReceiveInterface, &RoutingReceiveInterface::ackSinkVolumeTick, handle, sinkID, volume); } void RoutingReceiverAsyncShadow::hookInterruptStatusChange(const am_sourceID_t sourceID, const am_InterruptState_e interruptState) { - assert(mPipe[0]!=0); - //put the data in the queue: - a_interruptStatusChange_s temp; - temp.sourceID = sourceID; - temp.interruptState = interruptState; - - //then we make a message out of it: - msg_s msg; - msg.msgID = MSG_HOOKINTERRUPTSTATUSCHANGE; - msg.parameters.interruptStatusChange = temp; - //here we share data ! - pthread_mutex_lock(&mMutex); - mQueue.push(msg); - pthread_mutex_unlock(&mMutex); - - //ok, fire the signal that data needs to be received ! - if (write(mPipe[1], &msg.msgID, sizeof(msgID_e)) == -1) - { - logError("RoutingReceiverAsyncShadow::hookInterruptStatusChange write failed, error code:", strerror(errno)); - } + mSerializer.asyncCall(mRoutingReceiveInterface, &RoutingReceiveInterface::hookInterruptStatusChange, sourceID, interruptState); } void RoutingReceiverAsyncShadow::hookSinkAvailablityStatusChange(const am_sinkID_t sinkID, const am_Availability_s & availability) { - assert(mPipe[0]!=0); - //put the data in the queue: - a_sinkAvailability_s temp; - temp.sinkID = sinkID; - temp.availability = availability; - - //then we make a message out of it: - msg_s msg; - msg.msgID = MSG_HOOKSINKAVAILABLITYSTATUSCHANGE; - msg.parameters.sinkAvailability = temp; - //here we share data ! - pthread_mutex_lock(&mMutex); - mQueue.push(msg); - pthread_mutex_unlock(&mMutex); - - //ok, fire the signal that data needs to be received ! - if (write(mPipe[1], &msg.msgID, sizeof(msgID_e)) == -1) - { - logError("RoutingReceiverAsyncShadow::hookSinkAvailablityStatusChange write failed, error code:", strerror(errno)); - } + mSerializer.asyncCall(mRoutingReceiveInterface, &RoutingReceiveInterface::hookSinkAvailablityStatusChange, sinkID, availability); } void RoutingReceiverAsyncShadow::hookSourceAvailablityStatusChange(const am_sourceID_t sourceID, const am_Availability_s & availability) { - assert(mPipe[0]!=0); - //put the data in the queue: - a_sourceAvailability_s temp; - temp.sourceID = sourceID; - temp.availability = availability; - - //then we make a message out of it: - msg_s msg; - msg.msgID = MSG_HOOKSOURCEAVAILABLITYSTATUSCHANGE; - msg.parameters.sourceAvailability = temp; - //here we share data ! - pthread_mutex_lock(&mMutex); - mQueue.push(msg); - pthread_mutex_unlock(&mMutex); - - //ok, fire the signal that data needs to be received ! - if (write(mPipe[1], &msg.msgID, sizeof(msgID_e)) == -1) - { - logError("RoutingReceiverAsyncShadow::hookSourceAvailablityStatusChange write failed, error code:", strerror(errno)); - } + mSerializer.asyncCall(mRoutingReceiveInterface, &RoutingReceiveInterface::hookSourceAvailablityStatusChange, sourceID, availability); } void RoutingReceiverAsyncShadow::hookDomainStateChange(const am_domainID_t domainID, const am_DomainState_e domainState) { - assert(mPipe[0]!=0); - //put the data in the queue: - a_hookDomainStateChange_s temp; - temp.domainID = domainID; - temp.state = domainState; - - //then we make a message out of it: - msg_s msg; - msg.msgID = MSG_HOOKDOMAINSTATECHANGE; - msg.parameters.domainStateChange = temp; - //here we share data ! - pthread_mutex_lock(&mMutex); - mQueue.push(msg); - pthread_mutex_unlock(&mMutex); - - //ok, fire the signal that data needs to be received ! - if (write(mPipe[1], &msg.msgID, sizeof(msgID_e)) == -1) - { - logError("RoutingReceiverAsyncShadow::hookDomainStateChange write failed, error code:", strerror(errno)); - } + mSerializer.asyncCall(mRoutingReceiveInterface, &RoutingReceiveInterface::hookDomainStateChange, domainID, domainState); } void RoutingReceiverAsyncShadow::hookTimingInformationChanged(const am_connectionID_t connectionID, const am_timeSync_t delay) { - assert(mPipe[0]!=0); - //put the data in the queue: - a_timingInfoChanged_s temp; - temp.connectionID = connectionID; - temp.delay = delay; - - //then we make a message out of it: - msg_s msg; - msg.msgID = MSG_HOOKTIMINGINFORMATIONCHANGED; - msg.parameters.timingInfoChange = temp; - //here we share data ! - pthread_mutex_lock(&mMutex); - mQueue.push(msg); - pthread_mutex_unlock(&mMutex); - - //ok, fire the signal that data needs to be received ! - if (write(mPipe[1], &msg.msgID, sizeof(msgID_e)) == -1) - { - logError("RoutingReceiverAsyncShadow::hookTimingInformationChanged write failed, error code:", strerror(errno)); - } + mSerializer.asyncCall(mRoutingReceiveInterface, &RoutingReceiveInterface::hookTimingInformationChanged, connectionID, delay); } -void RoutingReceiverAsyncShadow::asyncMsgReceiver(const pollfd pollfd, const sh_pollHandle_t handle, void *userData) +am_Error_e RoutingReceiverAsyncShadow::registerDomain(const am_Domain_s & domainData, am_domainID_t & domainID) { - (void) handle; - (void) userData; - //it is no really important what to read here, we will read the queue later... - char buffer[10]; - if (read(pollfd.fd, buffer, 10) == -1) - { - logError("RoutingReceiverAsyncShadow::asyncMsgReceiver could not read!"); - } + am_Error_e error (E_UNKNOWN); + am_Domain_s domainDataCopy(domainData); + mSerializer.syncCall(mRoutingReceiveInterface, &RoutingReceiveInterface::registerDomain, error, domainDataCopy, domainID); + return (error); } -bool RoutingReceiverAsyncShadow::asyncDispatcher(const sh_pollHandle_t handle, void *userData) +am_Error_e am::RoutingReceiverAsyncShadow::registerGateway(const am_Gateway_s & gatewayData, am_gatewayID_t & gatewayID) { - (void) handle; - (void) userData; - msg_s msg; - - //ok, let's receive, first lock - pthread_mutex_lock(&mMutex); - msg = mQueue.front(); - mQueue.pop(); - pthread_mutex_unlock(&mMutex); + am_Error_e error (E_UNKNOWN); + am_Gateway_s gatewayDataCopy(gatewayData); + mSerializer.syncCall(mRoutingReceiveInterface,&RoutingReceiveInterface::registerGateway, error, gatewayDataCopy, gatewayID); + return (error); +} - //check for the message: - switch (msg.msgID) - { - case MSG_ACKCONNECT: - mRoutingReceiveInterface->ackConnect(msg.parameters.connect.handle, msg.parameters.connect.connectionID, msg.parameters.connect.error); - break; - case MSG_ACKDISCONNECT: - mRoutingReceiveInterface->ackDisconnect(msg.parameters.connect.handle, msg.parameters.connect.connectionID, msg.parameters.connect.error); - break; - case MSG_ACKSETSINKVOLUMECHANGE: - mRoutingReceiveInterface->ackSetSinkVolumeChange(msg.parameters.volume.handle, msg.parameters.volume.volume, msg.parameters.volume.error); - break; - case MSG_ACKSETSOURCEVOLUMECHANGE: - mRoutingReceiveInterface->ackSetSourceVolumeChange(msg.parameters.volume.handle, msg.parameters.volume.volume, msg.parameters.volume.error); - break; - case MSG_ACKSETSOURCESTATE: - mRoutingReceiveInterface->ackSetSourceState(msg.parameters.handle.handle, msg.parameters.handle.error); - break; - case MSG_ACKSETSINKSOUNDPROPERTY: - mRoutingReceiveInterface->ackSetSinkSoundProperty(msg.parameters.handle.handle, msg.parameters.handle.error); - break; - case MSG_ACKSETSOURCESOUNDPROPERTY: - mRoutingReceiveInterface->ackSetSourceSoundProperty(msg.parameters.handle.handle, msg.parameters.handle.error); - break; - case MSG_ACKCROSSFADING: - mRoutingReceiveInterface->ackCrossFading(msg.parameters.crossfading.handle, msg.parameters.crossfading.hotSink, msg.parameters.crossfading.error); - break; - case MSG_ACKSOURCEVOLUMETICK: - mRoutingReceiveInterface->ackSourceVolumeTick(msg.parameters.sourceVolumeTick.handle, msg.parameters.sourceVolumeTick.sourceID, msg.parameters.sourceVolumeTick.volume); - break; - case MSG_ACKSINKVOLUMETICK: - mRoutingReceiveInterface->ackSinkVolumeTick(msg.parameters.sinkVolumeTick.handle, msg.parameters.sinkVolumeTick.sinkID, msg.parameters.sinkVolumeTick.volume); - break; - case MSG_HOOKINTERRUPTSTATUSCHANGE: - mRoutingReceiveInterface->hookInterruptStatusChange(msg.parameters.interruptStatusChange.sourceID, msg.parameters.interruptStatusChange.interruptState); - break; - case MSG_HOOKSINKAVAILABLITYSTATUSCHANGE: - mRoutingReceiveInterface->hookSinkAvailablityStatusChange(msg.parameters.sinkAvailability.sinkID, msg.parameters.sinkAvailability.availability); - break; - case MSG_HOOKSOURCEAVAILABLITYSTATUSCHANGE: - mRoutingReceiveInterface->hookSourceAvailablityStatusChange(msg.parameters.sourceAvailability.sourceID, msg.parameters.sourceAvailability.availability); - break; - case MSG_HOOKDOMAINSTATECHANGE: - mRoutingReceiveInterface->hookDomainStateChange(msg.parameters.domainStateChange.domainID, msg.parameters.domainStateChange.state); - break; - case MSG_HOOKTIMINGINFORMATIONCHANGED: - mRoutingReceiveInterface->hookTimingInformationChanged(msg.parameters.timingInfoChange.connectionID, msg.parameters.timingInfoChange.delay); - break; - default: - logError("RoutingReceiverAsyncShadow::asyncDispatcher unknown message was received:", msg.msgID); - break; - } +am_Error_e am::RoutingReceiverAsyncShadow::registerSink(const am_Sink_s & sinkData, am_sinkID_t & sinkID) +{ + am_Error_e error (E_UNKNOWN); + am_Sink_s sinkDataCopy(sinkData); + mSerializer.syncCall(mRoutingReceiveInterface,&RoutingReceiveInterface::registerSink, error, sinkDataCopy, sinkID); + return (error); +} - bool retVal = false; - pthread_mutex_lock(&mMutex); - if (mQueue.size() > 0) - retVal = true; - pthread_mutex_unlock(&mMutex); +am_Error_e am::RoutingReceiverAsyncShadow::deregisterSink(const am_sinkID_t sinkID) +{ + am_Error_e error; + am_sinkID_t s(sinkID); //no const values allowed in syncCalls due to reference ! + mSerializer.syncCall(mRoutingReceiveInterface, &RoutingReceiveInterface::deregisterSink, error, s); + return (error); +} - return (retVal); +am_Error_e am::RoutingReceiverAsyncShadow::registerSource(const am_Source_s & sourceData, am_sourceID_t & sourceID) +{ + am_Error_e error (E_UNKNOWN); + am_Source_s sourceDataCopy(sourceData); + mSerializer.syncCall(mRoutingReceiveInterface,&RoutingReceiveInterface::registerSource, error, sourceDataCopy, sourceID); + return (error); } -bool RoutingReceiverAsyncShadow::asyncChecker(const sh_pollHandle_t handle, void *userData) +am_Error_e am::RoutingReceiverAsyncShadow::deregisterSource(const am_sourceID_t sourceID) { - (void) handle; - (void) userData; - bool returnVal = false; - pthread_mutex_lock(&mMutex); - if (mQueue.size() > 0) - returnVal = true; - pthread_mutex_unlock(&mMutex); - return (returnVal); + am_Error_e error; + am_sourceID_t s(sourceID); //no const values allowed in syncCalls due to reference ! + mSerializer.syncCall(mRoutingReceiveInterface, &RoutingReceiveInterface::deregisterSource, error, s); + return (error); } -am_Error_e RoutingReceiverAsyncShadow::setRoutingInterface(RoutingReceiveInterface *receiveInterface) +am_Error_e am::RoutingReceiverAsyncShadow::registerCrossfader(const am_Crossfader_s & crossfaderData, am_crossfaderID_t & crossfaderID) { - assert(receiveInterface!=0); - mRoutingReceiveInterface = receiveInterface; - mRoutingReceiveInterface->getSocketHandler(mSocketHandler); - if (pipe(mPipe) == -1) - { - logError("RoutingReceiverAsyncShadow::setRoutingInterface could not create pipe!:"); - return (E_UNKNOWN); - } + am_Error_e error (E_UNKNOWN); + am_Crossfader_s crossfaderDataCopy(crossfaderData); + mSerializer.syncCall(mRoutingReceiveInterface,&RoutingReceiveInterface::registerCrossfader, error, crossfaderDataCopy, crossfaderID); + return (error); +} - short event = 0; - event |= POLLIN; - mSocketHandler->addFDPoll(mPipe[0], event, NULL, &asyncMsgReceive, &asyncCheck, &asyncDispatch, NULL, mHandle); - return (E_OK); +void am::RoutingReceiverAsyncShadow::confirmRoutingReady(uint16_t starupHandle) +{ + mSerializer.asyncCall(mRoutingReceiveInterface,&RoutingReceiveInterface::confirmRoutingReady,starupHandle); } + + + diff --git a/PluginRoutingInterfaceAsync/src/RoutingSenderAsync.cpp b/PluginRoutingInterfaceAsync/src/RoutingSenderAsync.cpp index 1a1ee70..68b260f 100644 --- a/PluginRoutingInterfaceAsync/src/RoutingSenderAsync.cpp +++ b/PluginRoutingInterfaceAsync/src/RoutingSenderAsync.cpp @@ -55,175 +55,174 @@ pthread_mutex_t AsyncRoutingSender::mSourcesMutex= PTHREAD_MUTEX_INITIALIZER; pthread_mutex_t AsyncRoutingSender::mDomainsMutex= PTHREAD_MUTEX_INITIALIZER; pthread_mutex_t WorkerThreadPool::mBlockingMutex = PTHREAD_MUTEX_INITIALIZER; -void* AsyncRoutingSender::InterruptEvents(void *data) -{ - RoutingReceiverAsyncShadow *shadow=(RoutingReceiverAsyncShadow *)data; - DBusError err; - DBusMessage* msg; - DBusConnection* conn; - dbus_error_init(&err); - conn = dbus_bus_get(DBUS_BUS_SESSION, &err); - dbus_uint32_t serial = 0; - DBusMessage* reply; - DBusMessageIter args; - dbus_bus_request_name(conn, "org.genivi.test",DBUS_NAME_FLAG_REPLACE_EXISTING , &err); - - while (dbus_connection_read_write_dispatch(conn, -1)) - { - dbus_connection_read_write(conn, 0); - msg = dbus_connection_pop_message(conn); - - if (dbus_message_is_method_call(msg, "org.genivi.test", "timingChanged")) +//void* AsyncRoutingSender::InterruptEvents(void *data) +//{ +// RoutingReceiverAsyncShadow *shadow=(RoutingReceiverAsyncShadow *)data; +// DBusError err; +// DBusMessage* msg; +// DBusConnection* conn; +// dbus_error_init(&err); +// conn = dbus_bus_get(DBUS_BUS_SESSION, &err); +// dbus_uint32_t serial = 0; +// DBusMessage* reply; +// DBusMessageIter args; +// dbus_bus_request_name(conn, "org.genivi.test",DBUS_NAME_FLAG_REPLACE_EXISTING , &err); +// +// while (dbus_connection_read_write_dispatch(conn, -1)) +// { +// dbus_connection_read_write(conn, 0); +// msg = dbus_connection_pop_message(conn); +// +// if (dbus_message_is_method_call(msg, "org.genivi.test", "timingChanged")) +// { +// am_connectionID_t connectionID; +// am_timeSync_t delay; +// dbus_message_iter_init(msg, &args); +// dbus_message_iter_get_basic(&args,(void*) &connectionID); +// dbus_message_iter_next(&args); +// dbus_message_iter_get_basic(&args,(void*) &delay); +// reply = dbus_message_new_method_return(msg); +// dbus_message_iter_init_append(reply, &args); +// dbus_message_iter_append_basic(&args, DBUS_TYPE_INT32, &connectionID); +// dbus_connection_send(conn, reply, &serial); +// shadow->hookTimingInformationChanged(connectionID,delay); +// dbus_message_unref(reply); +// } +// else if (dbus_message_is_method_call(msg, "org.genivi.test", "SinkAvailablityStatusChange")) +// { +// am_sinkID_t sinkID; +// am_Availability_s availability; +// dbus_message_iter_init(msg, &args); +// dbus_message_iter_get_basic(&args,(void*) &sinkID); +// reply = dbus_message_new_method_return(msg); +// dbus_message_iter_init_append(reply, &args); +// dbus_message_iter_append_basic(&args, DBUS_TYPE_INT32, &sinkID); +// dbus_connection_send(conn, reply, &serial); +// shadow->hookSinkAvailablityStatusChange(sinkID,availability); +// dbus_message_unref(reply); +// } +// else if (dbus_message_is_method_call(msg, "org.genivi.test", "SourceAvailablityStatusChange")) +// { +// am_sourceID_t sourceID; +// am_Availability_s availability; +// dbus_message_iter_init(msg, &args); +// dbus_message_iter_get_basic(&args,(void*) &sourceID); +// reply = dbus_message_new_method_return(msg); +// dbus_message_iter_init_append(reply, &args); +// dbus_message_iter_append_basic(&args, DBUS_TYPE_INT32, &sourceID); +// dbus_connection_send(conn, reply, &serial); +// shadow->hookSourceAvailablityStatusChange(sourceID,availability); +// dbus_message_unref(reply); +// } +// else if (dbus_message_is_method_call(msg, "org.genivi.test", "InterruptStatusChange")) +// { +// am_sourceID_t sourceID; +// +// am_InterruptState_e state=IS_UNKNOWN; +// dbus_message_iter_init(msg, &args); +// dbus_message_iter_get_basic(&args,(void*) &sourceID); +// reply = dbus_message_new_method_return(msg); +// dbus_message_iter_init_append(reply, &args); +// dbus_message_iter_append_basic(&args, DBUS_TYPE_INT32, &sourceID); +// dbus_connection_send(conn, reply, &serial); +// shadow->hookInterruptStatusChange(sourceID,state); +// dbus_message_unref(reply); +// } +// dbus_connection_flush(conn); +// } +// return NULL; +//} + + void *WorkerThreadPool::WorkerThread(void* data) { - am_connectionID_t connectionID; - am_timeSync_t delay; - dbus_message_iter_init(msg, &args); - dbus_message_iter_get_basic(&args,(void*) &connectionID); - dbus_message_iter_next(&args); - dbus_message_iter_get_basic(&args,(void*) &delay); - reply = dbus_message_new_method_return(msg); - dbus_message_iter_init_append(reply, &args); - dbus_message_iter_append_basic(&args, DBUS_TYPE_INT32, &connectionID); - dbus_connection_send(conn, reply, &serial); - shadow->hookTimingInformationChanged(connectionID,delay); - dbus_message_unref(reply); - } - else if (dbus_message_is_method_call(msg, "org.genivi.test", "SinkAvailablityStatusChange")) - { - am_sinkID_t sinkID; - am_Availability_s availability; - dbus_message_iter_init(msg, &args); - dbus_message_iter_get_basic(&args,(void*) &sinkID); - reply = dbus_message_new_method_return(msg); - dbus_message_iter_init_append(reply, &args); - dbus_message_iter_append_basic(&args, DBUS_TYPE_INT32, &sinkID); - dbus_connection_send(conn, reply, &serial); - shadow->hookSinkAvailablityStatusChange(sinkID,availability); - dbus_message_unref(reply); - } - else if (dbus_message_is_method_call(msg, "org.genivi.test", "SourceAvailablityStatusChange")) - { - am_sourceID_t sourceID; - am_Availability_s availability; - dbus_message_iter_init(msg, &args); - dbus_message_iter_get_basic(&args,(void*) &sourceID); - reply = dbus_message_new_method_return(msg); - dbus_message_iter_init_append(reply, &args); - dbus_message_iter_append_basic(&args, DBUS_TYPE_INT32, &sourceID); - dbus_connection_send(conn, reply, &serial); - shadow->hookSourceAvailablityStatusChange(sourceID,availability); - dbus_message_unref(reply); + threadInfo_s *myInfo=(threadInfo_s*)data; + while (1) + { + sem_wait(&myInfo->block); + pthread_mutex_lock(&mBlockingMutex); + Worker* actWorker=myInfo->worker; + pthread_mutex_unlock(&mBlockingMutex); + actWorker->setCancelSempaphore(&myInfo->cancel); + actWorker->start2work(); + actWorker->pPool->finishedWork(myInfo->threadID); + } + return NULL; } - else if (dbus_message_is_method_call(msg, "org.genivi.test", "InterruptStatusChange")) + + WorkerThreadPool::WorkerThreadPool(int numThreads): + mNumThreads(numThreads) { - am_sourceID_t sourceID; - - am_InterruptState_e state=IS_UNKNOWN; - dbus_message_iter_init(msg, &args); - dbus_message_iter_get_basic(&args,(void*) &sourceID); - reply = dbus_message_new_method_return(msg); - dbus_message_iter_init_append(reply, &args); - dbus_message_iter_append_basic(&args, DBUS_TYPE_INT32, &sourceID); - dbus_connection_send(conn, reply, &serial); - shadow->hookInterruptStatusChange(sourceID,state); - dbus_message_unref(reply); + int workerID=0; + mListWorkers.resize(mNumThreads); + for (int i=0;iblock); - pthread_mutex_lock(&mBlockingMutex); - Worker* actWorker=myInfo->worker; - pthread_mutex_unlock(&mBlockingMutex); - actWorker->setCancelSempaphore(&myInfo->cancel); - actWorker->start2work(); - actWorker->pPool->finishedWork(myInfo->threadID); - } - return NULL; -} -WorkerThreadPool::WorkerThreadPool(int numThreads): -mNumThreads(numThreads) -{ - int workerID=0; - mListWorkers.resize(mNumThreads); - for (int i=0;i::iterator it=mListWorkers.begin(); - for(;it!=mListWorkers.end();++it) - { - if(!it->busy) + int16_t WorkerThreadPool::startWork(Worker *worker) { - it->worker=worker; - it->busy=true; + pthread_mutex_lock(&mBlockingMutex); + std::vector::iterator it=mListWorkers.begin(); + for(;it!=mListWorkers.end();++it) + { + if(!it->busy) + { + it->worker=worker; + it->busy=true; + pthread_mutex_unlock(&mBlockingMutex); + sem_post(&it->block); + return ((int)it->workerID); + } + } pthread_mutex_unlock(&mBlockingMutex); - sem_post(&it->block); - return ((int)it->workerID); + return (-1); } - } - pthread_mutex_unlock(&mBlockingMutex); - return (-1); -} -bool WorkerThreadPool::cancelWork(int workerID) -{ - std::vector::iterator it=mListWorkers.begin(); - for(;it!=mListWorkers.end();++it) - { - if(it->workerID==workerID && it->busy) + bool WorkerThreadPool::cancelWork(int workerID) { - sem_post(&it->cancel); - return (true); + std::vector::iterator it=mListWorkers.begin(); + for(;it!=mListWorkers.end();++it) + { + if(it->workerID==workerID && it->busy) + { + sem_post(&it->cancel); + return (true); + } + } + return (false); } - } - return (false); -} -void WorkerThreadPool::finishedWork(pthread_t threadID) -{ - pthread_mutex_lock(&mBlockingMutex); - std::vector::iterator it=mListWorkers.begin(); - for(;it!=mListWorkers.end();++it) - { - if(it->threadID==threadID) + void WorkerThreadPool::finishedWork(pthread_t threadID) { - it->busy=false; - delete it->worker; - break; + pthread_mutex_lock(&mBlockingMutex); + std::vector::iterator it=mListWorkers.begin(); + for(;it!=mListWorkers.end();++it) + { + if(it->threadID==threadID) + { + it->busy=false; + delete it->worker; + break; + } + } + pthread_mutex_unlock(&mBlockingMutex); } - } - pthread_mutex_unlock(&mBlockingMutex); -} -WorkerThreadPool::~WorkerThreadPool() -{ - for (int i=0;igetSocketHandler(handler); + mShadow = new RoutingReceiverAsyncShadow(routingreceiveinterface, handler); + return E_OK; } void AsyncRoutingSender::setRoutingReady(const uint16_t handle) { - //todo: implement handle ! - assert(mReceiveInterface!=0); - am_Error_e eCode; - //first register the domains - std::vector::iterator domainIter = mDomains.begin(); - for (; domainIter != mDomains.end(); ++domainIter) - { - am_domainID_t domainID; - if ((eCode = mReceiveInterface->registerDomain(*domainIter, domainID)) != E_OK) - { - logError("AsyncRoutingSender::routingInterfacesReady error on registering domain, failed with", eCode); - } - domainIter->domainID = domainID; - } + syncRegisterWorker *worker = new syncRegisterWorker(this, &mPool, mShadow, mDomains, mSinks, mSources, handle); - //then sources - std::vector::iterator sourceIter = mSources.begin(); - for (; sourceIter != mSources.end(); ++sourceIter) + if ((mPool.startWork(worker)) == -1) { - am_sourceID_t sourceID; - //set the correct domainID - sourceIter->domainID = mDomains[0].domainID; - if ((eCode = mReceiveInterface->registerSource(*sourceIter, sourceID)) != E_OK) - { - logError("AsyncRoutingSender::routingInterfacesReady error on registering source, failed with", eCode); - } - } - - //sinks - std::vector::iterator sinkIter = mSinks.begin(); - for (; sinkIter != mSinks.end(); ++sinkIter) - { - am_sinkID_t sinkID; - //set the correct domainID - sinkIter->domainID = mDomains[0].domainID; - if ((eCode = mReceiveInterface->registerSink(*sinkIter, sinkID)) != E_OK) - { - logError("AsyncRoutingSender::routingInterfacesReady error on registering sink, failed with", eCode); - } + logError("AsyncRoutingSender::asyncConnect not enough threads!"); + delete worker; } //gateways @@ -422,7 +384,7 @@ am_Error_e AsyncRoutingSender::asyncConnect(const am_Handle_s handle, const am_c return (E_WRONG_FORMAT); //the operation is ok, lets create a worker, assign it to a task in the task pool - asycConnectWorker *worker = new asycConnectWorker(this, &mPool, &mShadow, handle, connectionID, sourceID, sinkID, connectionFormat); + asycConnectWorker *worker = new asycConnectWorker(this, &mPool, mShadow, handle, connectionID, sourceID, sinkID, connectionFormat); if ((work = mPool.startWork(worker)) == -1) { logError("AsyncRoutingSender::asyncConnect not enough threads!"); @@ -457,7 +419,7 @@ am_Error_e AsyncRoutingSender::asyncDisconnect(const am_Handle_s handle, const a pthread_mutex_unlock(&mMapConnectionMutex); //the operation is ok, lets create a worker, assign it to a task in the task pool - asycDisConnectWorker *worker = new asycDisConnectWorker(this, &mPool, &mShadow, handle, connectionID); + asycDisConnectWorker *worker = new asycDisConnectWorker(this, &mPool, mShadow, handle, connectionID); if ((work = mPool.startWork(worker)) == -1) { logError("AsyncRoutingSender::asyncDisconnect not enough threads!"); @@ -499,7 +461,7 @@ am_Error_e AsyncRoutingSender::asyncSetSinkVolume(const am_Handle_s handle, cons if (sinkIter == mSinks.end()) return (E_NON_EXISTENT); //not found! - asyncSetSinkVolumeWorker *worker = new asyncSetSinkVolumeWorker(this, &mPool, &mShadow, sinkIter->volume, handle, sinkID, volume, ramp, time); + asyncSetSinkVolumeWorker *worker = new asyncSetSinkVolumeWorker(this, &mPool, mShadow, sinkIter->volume, handle, sinkID, volume, ramp, time); if ((work = mPool.startWork(worker)) == -1) { logError("AsyncRoutingSender::asyncSetSinkVolume not enough threads!"); @@ -541,7 +503,7 @@ am_Error_e AsyncRoutingSender::asyncSetSourceVolume(const am_Handle_s handle, co if (sourceIter == mSources.end()) return (E_NON_EXISTENT); //not found! - asyncSetSourceVolumeWorker *worker = new asyncSetSourceVolumeWorker(this, &mPool, &mShadow, sourceIter->volume, handle, sourceID, volume, ramp, time); + asyncSetSourceVolumeWorker *worker = new asyncSetSourceVolumeWorker(this, &mPool, mShadow, sourceIter->volume, handle, sourceID, volume, ramp, time); if ((work = mPool.startWork(worker)) == -1) { logError("AsyncRoutingSender::asyncSetSourceVolume not enough threads!"); @@ -583,7 +545,7 @@ am_Error_e AsyncRoutingSender::asyncSetSourceState(const am_Handle_s handle, con if (sourceIter == mSources.end()) return (E_NON_EXISTENT); //not found! - asyncSetSourceStateWorker *worker = new asyncSetSourceStateWorker(this, &mPool, &mShadow, handle, sourceID, state); + asyncSetSourceStateWorker *worker = new asyncSetSourceStateWorker(this, &mPool, mShadow, handle, sourceID, state); if ((work = mPool.startWork(worker)) == -1) { logError("AsyncRoutingSender::asyncSetSourceState not enough threads!"); @@ -625,7 +587,7 @@ am_Error_e AsyncRoutingSender::asyncSetSinkSoundProperty(const am_Handle_s handl if (sinkIter == mSinks.end()) return (E_NON_EXISTENT); //not found! - asyncSetSinkSoundPropertyWorker *worker = new asyncSetSinkSoundPropertyWorker(this, &mPool, &mShadow, handle, soundProperty, sinkID); + asyncSetSinkSoundPropertyWorker *worker = new asyncSetSinkSoundPropertyWorker(this, &mPool, mShadow, handle, soundProperty, sinkID); if ((work = mPool.startWork(worker)) == -1) { logError("AsyncRoutingSender::asyncSetSinkSoundProperty not enough threads!"); @@ -676,7 +638,7 @@ am_Error_e AsyncRoutingSender::setDomainState(const am_domainID_t domainID, cons if (domainIter == mDomains.end()) return (E_NON_EXISTENT); //not found! - asyncDomainStateChangeWorker *worker = new asyncDomainStateChangeWorker(this, &mPool, &mShadow, domainID, domainState); + asyncDomainStateChangeWorker *worker = new asyncDomainStateChangeWorker(this, &mPool, mShadow, domainID, domainState); if ((work = mPool.startWork(worker)) == -1) { logError("AsyncRoutingSender::setDomainState not enough threads!"); @@ -714,7 +676,7 @@ am_Error_e AsyncRoutingSender::asyncSetSourceSoundProperty(const am_Handle_s han if (sourceIter == mSources.end()) return (E_NON_EXISTENT); //not found! - asyncSetSourceSoundPropertyWorker *worker = new asyncSetSourceSoundPropertyWorker(this, &mPool, &mShadow, handle, soundProperty, sourceID); + asyncSetSourceSoundPropertyWorker *worker = new asyncSetSourceSoundPropertyWorker(this, &mPool, mShadow, handle, soundProperty, sourceID); if ((work = mPool.startWork(worker)) == -1) { logError("AsyncRoutingSender::asyncSetSourceState not enough threads!"); @@ -953,6 +915,27 @@ void am::AsyncRoutingSender::updateDomainstateSafe(am_domainID_t domainID, am_Do pthread_mutex_unlock(&mDomainsMutex); } +void am::AsyncRoutingSender::updateDomainListSafe(std::vector listDomains) +{ + pthread_mutex_lock(&mDomainsMutex); + mDomains = listDomains; + pthread_mutex_unlock(&mDomainsMutex); +} + +void am::AsyncRoutingSender::updateSourceListSafe(std::vector listSource) +{ + pthread_mutex_lock(&mSourcesMutex); + mSources = listSource; + pthread_mutex_unlock(&mSourcesMutex); +} + +void am::AsyncRoutingSender::updateSinkListSafe(std::vector listSinks) +{ + pthread_mutex_lock(&mSinksMutex); + mSinks = listSinks; + pthread_mutex_unlock(&mSinksMutex); +} + void AsyncRoutingSender::getInterfaceVersion(std::string & version) const { version = RoutingSendVersion; @@ -1320,3 +1303,70 @@ void am::asyncDomainStateChangeWorker::cancelWork() mShadow->hookDomainStateChange(mDomainID, mDomainState); } +syncRegisterWorker::syncRegisterWorker(AsyncRoutingSender * asyncSender, WorkerThreadPool* pool, RoutingReceiverAsyncShadow* shadow, const std::vector domains, const std::vector sinks, const std::vector sources, const uint16_t handle) : + Worker(pool), // + mAsyncSender(asyncSender), // + mShadow(shadow), // + mListDomains(domains), // + mListSinks(sinks), // + mListSources(sources), + mHandle(handle) +{ +} + +void syncRegisterWorker::start2work() +{ + //todo: sendchanged data must be in here ! + logInfo("Start to register stuff"); + + am_Error_e eCode; + + std::vector::iterator domainIter = mListDomains.begin(); + for (; domainIter != mListDomains.end(); ++domainIter) + { + am_domainID_t domainID; + if ((eCode = mShadow->registerDomain(*domainIter, domainID)) != E_OK) + { + logError("syncRegisterWorker::start2work error on registering domain, failed with", eCode); + } + domainIter->domainID = domainID; + } + + mAsyncSender->updateDomainListSafe(mListDomains); + + //then sources + std::vector::iterator sourceIter = mListSources.begin(); + for (; sourceIter != mListSources.end(); ++sourceIter) + { + am_sourceID_t sourceID; + //set the correct domainID + sourceIter->domainID = mListDomains[0].domainID; + if ((eCode = mShadow->registerSource(*sourceIter, sourceID)) != E_OK) + { + logError("syncRegisterWorker::start2work error on registering source, failed with", eCode); + } + } + + mAsyncSender->updateSourceListSafe(mListSources); + + //sinks + std::vector::iterator sinkIter = mListSinks.begin(); + for (; sinkIter != mListSinks.end(); ++sinkIter) + { + am_sinkID_t sinkID; + //set the correct domainID + sinkIter->domainID = mListDomains[0].domainID; + if ((eCode = mShadow->registerSink(*sinkIter, sinkID)) != E_OK) + { + logError("syncRegisterWorker::start2work error on registering sink, failed with", eCode); + } + } + + mAsyncSender->updateSinkListSafe(mListSinks); + mShadow->confirmRoutingReady(mHandle); +} + +void syncRegisterWorker::cancelWork() +{ +} + diff --git a/PluginRoutingInterfaceAsync/test/CMakeLists.txt b/PluginRoutingInterfaceAsync/test/CMakeLists.txt index e80d80b..aa4c39d 100644 --- a/PluginRoutingInterfaceAsync/test/CMakeLists.txt +++ b/PluginRoutingInterfaceAsync/test/CMakeLists.txt @@ -34,6 +34,8 @@ find_package (Threads) FIND_PACKAGE(PkgConfig) FIND_PACKAGE(DBUS REQUIRED) pkg_check_modules(DLT REQUIRED automotive-dlt) +pkg_check_modules(SQLITE REQUIRED sqlite3) + INCLUDE_DIRECTORIES( ${STD_INCLUDE_DIRS} ${CMAKE_SOURCE_DIR} @@ -51,7 +53,6 @@ INCLUDE_DIRECTORIES( file(GLOB ASYNC_PLUGIN_INTERFACE_SRCS_CXX "../../AudioManagerDaemon/src/SocketHandler.cpp" - "../../AudioManagerDaemon/src/RoutingSender.cpp" "../../AudioManagerDaemon/src/DLTWrapper.cpp" "../src/*.cpp" "testRoutingInterfaceAsync.cpp" @@ -76,6 +77,7 @@ TARGET_LINK_LIBRARIES(asyncRoutingInterfaceTest ${CMAKE_THREAD_LIBS_INIT} ${GTEST_LIBRARIES} ${DBUS_LIBRARY} + ${SQLITE_LIBRARIES} gmock ) diff --git a/PluginRoutingInterfaceAsync/test/mocklnterfaces.h b/PluginRoutingInterfaceAsync/test/mocklnterfaces.h index 328d416..d8eef1f 100644 --- a/PluginRoutingInterfaceAsync/test/mocklnterfaces.h +++ b/PluginRoutingInterfaceAsync/test/mocklnterfaces.h @@ -31,81 +31,86 @@ namespace am { class MockRoutingReceiveInterface : public RoutingReceiveInterface { public: - MOCK_METHOD3(ackConnect, - void(const am_Handle_s handle, const am_connectionID_t connectionID, const am_Error_e error)); - MOCK_METHOD3(ackDisconnect, - void(const am_Handle_s handle, const am_connectionID_t connectionID, const am_Error_e error)); - MOCK_METHOD3(ackSetSinkVolumeChange, - void(const am_Handle_s handle, const am_volume_t volume, const am_Error_e error)); - MOCK_METHOD3(ackSetSourceVolumeChange, - void(const am_Handle_s handle, const am_volume_t volume, const am_Error_e error)); - MOCK_METHOD2(ackSetSourceState, - void(const am_Handle_s handle, const am_Error_e error)); - MOCK_METHOD2(ackSetSinkSoundProperties, - void(const am_Handle_s handle, const am_Error_e error)); - MOCK_METHOD2(ackSetSinkSoundProperty, - void(const am_Handle_s handle, const am_Error_e error)); - MOCK_METHOD2(ackSetSourceSoundProperties, - void(const am_Handle_s handle, const am_Error_e error)); - MOCK_METHOD2(ackSetSourceSoundProperty, - void(const am_Handle_s handle, const am_Error_e error)); - MOCK_METHOD3(ackCrossFading, - void(const am_Handle_s handle, const am_HotSink_e hotSink, const am_Error_e error)); - MOCK_METHOD3(ackSourceVolumeTick, - void(const am_Handle_s handle, const am_sourceID_t sourceID, const am_volume_t volume)); - MOCK_METHOD3(ackSinkVolumeTick, - void(const am_Handle_s handle, const am_sinkID_t sinkID, const am_volume_t volume)); - MOCK_METHOD2(peekDomain, - am_Error_e(const std::string& name, am_domainID_t& domainID)); - MOCK_METHOD2(registerDomain, - am_Error_e(const am_Domain_s& domainData, am_domainID_t& domainID)); - MOCK_METHOD1(deregisterDomain, - am_Error_e(const am_domainID_t domainID)); - MOCK_METHOD2(registerGateway, - am_Error_e(const am_Gateway_s& gatewayData, am_gatewayID_t& gatewayID)); - MOCK_METHOD1(deregisterGateway, - am_Error_e(const am_gatewayID_t gatewayID)); - MOCK_METHOD2(peekSink, - am_Error_e(const std::string& name, am_sinkID_t& sinkID)); - MOCK_METHOD2(registerSink, - am_Error_e(const am_Sink_s& sinkData, am_sinkID_t& sinkID)); - MOCK_METHOD1(deregisterSink, - am_Error_e(const am_sinkID_t sinkID)); - MOCK_METHOD2(peekSource, - am_Error_e(const std::string& name, am_sourceID_t& sourceID)); - MOCK_METHOD2(registerSource, - am_Error_e(const am_Source_s& sourceData, am_sourceID_t& sourceID)); - MOCK_METHOD1(deregisterSource, - am_Error_e(const am_sourceID_t sourceID)); - MOCK_METHOD2(registerCrossfader, - am_Error_e(const am_Crossfader_s& crossfaderData, am_crossfaderID_t& crossfaderID)); - MOCK_METHOD1(deregisterCrossfader, - am_Error_e(const am_crossfaderID_t crossfaderID)); - MOCK_METHOD2(peekSourceClassID, - am_Error_e(const std::string& name, am_sourceClass_t& sourceClassID)); - MOCK_METHOD2(peekSinkClassID, - am_Error_e(const std::string& name, am_sinkClass_t& sinkClassID)); - MOCK_METHOD2(hookInterruptStatusChange, - void(const am_sourceID_t sourceID, const am_InterruptState_e interruptState)); - MOCK_METHOD1(hookDomainRegistrationComplete, - void(const am_domainID_t domainID)); - MOCK_METHOD2(hookSinkAvailablityStatusChange, - void(const am_sinkID_t sinkID, const am_Availability_s& availability)); - MOCK_METHOD2(hookSourceAvailablityStatusChange, - void(const am_sourceID_t sourceID, const am_Availability_s& availability)); - MOCK_METHOD2(hookDomainStateChange, - void(const am_domainID_t domainID, const am_DomainState_e domainState)); - MOCK_METHOD2(hookTimingInformationChanged, - void(const am_connectionID_t connectionID, const am_timeSync_t delay)); - MOCK_METHOD1(sendChangedData, - void(const std::vector& earlyData)); - MOCK_CONST_METHOD1(getDBusConnectionWrapper, - am_Error_e(DBusWrapper*& dbusConnectionWrapper)); - MOCK_CONST_METHOD1(getSocketHandler, - am_Error_e(SocketHandler*& socketHandler)); - MOCK_CONST_METHOD0(getInterfaceVersion, - uint16_t()); -}; + MOCK_METHOD3(ackConnect, + void(const am_Handle_s handle, const am_connectionID_t connectionID, const am_Error_e error)); + MOCK_METHOD3(ackDisconnect, + void(const am_Handle_s handle, const am_connectionID_t connectionID, const am_Error_e error)); + MOCK_METHOD3(ackSetSinkVolumeChange, + void(const am_Handle_s handle, const am_volume_t volume, const am_Error_e error)); + MOCK_METHOD3(ackSetSourceVolumeChange, + void(const am_Handle_s handle, const am_volume_t volume, const am_Error_e error)); + MOCK_METHOD2(ackSetSourceState, + void(const am_Handle_s handle, const am_Error_e error)); + MOCK_METHOD2(ackSetSinkSoundProperties, + void(const am_Handle_s handle, const am_Error_e error)); + MOCK_METHOD2(ackSetSinkSoundProperty, + void(const am_Handle_s handle, const am_Error_e error)); + MOCK_METHOD2(ackSetSourceSoundProperties, + void(const am_Handle_s handle, const am_Error_e error)); + MOCK_METHOD2(ackSetSourceSoundProperty, + void(const am_Handle_s handle, const am_Error_e error)); + MOCK_METHOD3(ackCrossFading, + void(const am_Handle_s handle, const am_HotSink_e hotSink, const am_Error_e error)); + MOCK_METHOD3(ackSourceVolumeTick, + void(const am_Handle_s handle, const am_sourceID_t sourceID, const am_volume_t volume)); + MOCK_METHOD3(ackSinkVolumeTick, + void(const am_Handle_s handle, const am_sinkID_t sinkID, const am_volume_t volume)); + MOCK_METHOD2(peekDomain, + am_Error_e(const std::string& name, am_domainID_t& domainID)); + MOCK_METHOD2(registerDomain, + am_Error_e(const am_Domain_s& domainData, am_domainID_t& domainID)); + MOCK_METHOD1(deregisterDomain, + am_Error_e(const am_domainID_t domainID)); + MOCK_METHOD2(registerGateway, + am_Error_e(const am_Gateway_s& gatewayData, am_gatewayID_t& gatewayID)); + MOCK_METHOD1(deregisterGateway, + am_Error_e(const am_gatewayID_t gatewayID)); + MOCK_METHOD2(peekSink, + am_Error_e(const std::string& name, am_sinkID_t& sinkID)); + MOCK_METHOD2(registerSink, + am_Error_e(const am_Sink_s& sinkData, am_sinkID_t& sinkID)); + MOCK_METHOD1(deregisterSink, + am_Error_e(const am_sinkID_t sinkID)); + MOCK_METHOD2(peekSource, + am_Error_e(const std::string& name, am_sourceID_t& sourceID)); + MOCK_METHOD2(registerSource, + am_Error_e(const am_Source_s& sourceData, am_sourceID_t& sourceID)); + MOCK_METHOD1(deregisterSource, + am_Error_e(const am_sourceID_t sourceID)); + MOCK_METHOD2(registerCrossfader, + am_Error_e(const am_Crossfader_s& crossfaderData, am_crossfaderID_t& crossfaderID)); + MOCK_METHOD1(deregisterCrossfader, + am_Error_e(const am_crossfaderID_t crossfaderID)); + MOCK_METHOD2(peekSourceClassID, + am_Error_e(const std::string& name, am_sourceClass_t& sourceClassID)); + MOCK_METHOD2(peekSinkClassID, + am_Error_e(const std::string& name, am_sinkClass_t& sinkClassID)); + MOCK_METHOD2(hookInterruptStatusChange, + void(const am_sourceID_t sourceID, const am_InterruptState_e interruptState)); + MOCK_METHOD1(hookDomainRegistrationComplete, + void(const am_domainID_t domainID)); + MOCK_METHOD2(hookSinkAvailablityStatusChange, + void(const am_sinkID_t sinkID, const am_Availability_s& availability)); + MOCK_METHOD2(hookSourceAvailablityStatusChange, + void(const am_sourceID_t sourceID, const am_Availability_s& availability)); + MOCK_METHOD2(hookDomainStateChange, + void(const am_domainID_t domainID, const am_DomainState_e domainState)); + MOCK_METHOD2(hookTimingInformationChanged, + void(const am_connectionID_t connectionID, const am_timeSync_t delay)); + MOCK_METHOD1(sendChangedData, + void(const std::vector& earlyData)); + MOCK_CONST_METHOD1(getDBusConnectionWrapper, + am_Error_e(DBusWrapper*& dbusConnectionWrapper)); + MOCK_CONST_METHOD1(getSocketHandler, + am_Error_e(SocketHandler*& socketHandler)); + MOCK_CONST_METHOD1(getInterfaceVersion, + void(std::string& version)); + MOCK_METHOD1(confirmRoutingReady, + void(const uint16_t handle)); + MOCK_METHOD1(confirmRoutingRundown, + void(const uint16_t handle)); + + }; } // namespace am diff --git a/PluginRoutingInterfaceAsync/test/testRoutingInterfaceAsync.cpp b/PluginRoutingInterfaceAsync/test/testRoutingInterfaceAsync.cpp index 3149e7e..1e672a6 100644 --- a/PluginRoutingInterfaceAsync/test/testRoutingInterfaceAsync.cpp +++ b/PluginRoutingInterfaceAsync/test/testRoutingInterfaceAsync.cpp @@ -25,26 +25,28 @@ #include "testRoutingInterfaceAsync.h" #include "config.h" #include "DLTWrapper.h" +#include "RoutingReceiver.h" +#include "PluginTemplate.h" using namespace am; using namespace testing; -std::vector testRoutingInterfaceAsync::pListRoutingPluginDirs = returnListPlugins(); -am_domainID_t testRoutingInterfaceAsync::mDomainIDCount = 0; -RoutingSender testRoutingInterfaceAsync::pRoutingSender = RoutingSender(pListRoutingPluginDirs); +am_domainID_t MyEnvironment::mDomainIDCount = 0; +static RoutingSendInterface* pRouter; +static SocketHandler pSocketHandler; +static MockRoutingReceiveInterface pReceiveInterface; -testRoutingInterfaceAsync::testRoutingInterfaceAsync() : - pSocketHandler(), // - pReceiveInterface(), // - ptimerCallback(this, &testRoutingInterfaceAsync::timerCallback) +MyEnvironment::MyEnvironment() : + ptimerCallback(this, &MyEnvironment::timerCallback) { + DefaultValue::Set(E_OK); // Sets the default value to be returned. } -testRoutingInterfaceAsync::~testRoutingInterfaceAsync() +MyEnvironment::~MyEnvironment() { } -void testRoutingInterfaceAsync::SetUp() +void MyEnvironment::SetUp() { logInfo("RoutingSendInterface Test started "); @@ -53,59 +55,116 @@ void testRoutingInterfaceAsync::SetUp() domainIDs.push_back(1); EXPECT_CALL(pReceiveInterface,getSocketHandler(_)).WillOnce(DoAll(SetArgReferee<0>(&pSocketHandler), Return(E_OK))); - EXPECT_CALL(pReceiveInterface,registerDomain(_,_)).WillRepeatedly(Invoke(testRoutingInterfaceAsync::handleDomainRegister)); - EXPECT_CALL(pReceiveInterface,registerSource(_,_)).WillRepeatedly(Invoke(testRoutingInterfaceAsync::handleSourceRegister)); - EXPECT_CALL(pReceiveInterface,registerSink(_,_)).WillRepeatedly(Invoke(testRoutingInterfaceAsync::handleSinkRegister)); + EXPECT_CALL(pReceiveInterface,registerDomain(_,_)).WillRepeatedly(Invoke(MyEnvironment::handleDomainRegister)); + EXPECT_CALL(pReceiveInterface,registerSource(_,_)).WillRepeatedly(Invoke(MyEnvironment::handleSourceRegister)); + EXPECT_CALL(pReceiveInterface,registerSink(_,_)).WillRepeatedly(Invoke(MyEnvironment::handleSinkRegister)); + EXPECT_CALL(pReceiveInterface,confirmRoutingReady(_)).Times(1); + + RoutingSendInterface* (*createFunc)(); + void* tempLibHandle = NULL; + std::string libname("../plugins/routing/libPluginRoutingInterfaceAsync.so"); + createFunc = getCreateFunction(libname, tempLibHandle); - pRoutingSender.startupRoutingInterface(&pReceiveInterface); - pRoutingSender.routingInterfacesReady(); + if (!createFunc) + { + logError("RoutingSendInterface Test Entry point of RoutingPlugin not found"); + exit(1); + } + + pRouter = createFunc(); + + if (!pRouter) + { + logError("RoutingSendInterface Test RoutingPlugin initialization failed. Entry Function not callable"); + exit(1); + } + + pRouter->startupInterface(&pReceiveInterface); + pRouter->setRoutingReady(10); timespec t; t.tv_nsec = 0; - t.tv_sec = 4; + t.tv_sec = 2; sh_timerHandle_t handle; shTimerCallBack *buf = &ptimerCallback; //lets use a timeout so the test will finish pSocketHandler.addTimer(t, buf, handle, (void*) NULL); + pSocketHandler.start_listenting(); + +} + +void MyEnvironment::TearDown() +{ + +} + +testRoutingInterfaceAsync::testRoutingInterfaceAsync() : + ptimerCallback(this, &testRoutingInterfaceAsync::timerCallback) +{ +} + +testRoutingInterfaceAsync::~testRoutingInterfaceAsync() +{ +} + +void testRoutingInterfaceAsync::timerCallback(sh_timerHandle_t handle, void *userData) +{ + (void) handle; + (void) userData; + pSocketHandler.stop_listening(); +} + +void testRoutingInterfaceAsync::SetUp() +{ +// timespec t; +// t.tv_nsec = 0; +// t.tv_sec = 2; +// +// sh_timerHandle_t handle; +// +// shTimerCallBack *buf = &ptimerCallback; +// //lets use a timeout so the test will finish +// pSocketHandler.addTimer(t, buf, handle, (void*) NULL); } -std::vector am::testRoutingInterfaceAsync::returnListPlugins() +std::vector MyEnvironment::returnListPlugins() { std::vector list; list.push_back(std::string(DEFAULT_PLUGIN_ROUTING_DIR)); return (list); } -am_Error_e am::testRoutingInterfaceAsync::handleSourceRegister(const am_Source_s & sourceData, am_sourceID_t & sourceID) +am_Error_e MyEnvironment::handleSourceRegister(const am_Source_s & sourceData, am_sourceID_t & sourceID) { sourceID = sourceData.sourceID; - pRoutingSender.addSourceLookup(sourceData); return (E_OK); } -am_Error_e am::testRoutingInterfaceAsync::handleSinkRegister(const am_Sink_s & sinkData, am_sinkID_t & sinkID) +am_Error_e MyEnvironment::handleSinkRegister(const am_Sink_s & sinkData, am_sinkID_t & sinkID) { sinkID = sinkData.sinkID; - pRoutingSender.addSinkLookup(sinkData); return (E_OK); } -am_Error_e am::testRoutingInterfaceAsync::handleDomainRegister(const am_Domain_s & domainData, am_domainID_t & domainID) +am_Error_e MyEnvironment::handleDomainRegister(const am_Domain_s & domainData, am_domainID_t & domainID) { am_Domain_s domain = domainData; - domainID = mDomainIDCount++; + domainID = ++mDomainIDCount; domain.domainID = domainID; - pRoutingSender.addDomainLookup(domain); return (E_OK); } -void am::testRoutingInterfaceAsync::timerCallback(sh_timerHandle_t handle, void *userData) +void MyEnvironment::timerCallback(sh_timerHandle_t handle, void *userData) { (void) handle; (void) userData; pSocketHandler.stop_listening(); + timespec t; + t.tv_nsec = 0; + t.tv_sec = 2; + pSocketHandler.restartTimer(handle, t); } void testRoutingInterfaceAsync::TearDown() @@ -119,7 +178,7 @@ TEST_F(testRoutingInterfaceAsync,setDomainState) EXPECT_CALL(pReceiveInterface,hookDomainStateChange(_,DS_INDEPENDENT_RUNDOWN)).Times(1); - ASSERT_EQ(E_OK, pRoutingSender.setDomainState(domainID,state)); + ASSERT_EQ(E_OK, pRouter->setDomainState(domainID,state)); pSocketHandler.start_listenting(); } @@ -137,7 +196,7 @@ TEST_F(testRoutingInterfaceAsync,setSourceSoundProperty) EXPECT_CALL(pReceiveInterface,ackSetSourceSoundProperty(_,E_OK)).Times(1); - ASSERT_EQ(E_OK, pRoutingSender.asyncSetSourceSoundProperty(handle,sourceID,property)); + ASSERT_EQ(E_OK, pRouter->asyncSetSourceSoundProperty(handle,sourceID,property)); pSocketHandler.start_listenting(); } @@ -155,7 +214,7 @@ TEST_F(testRoutingInterfaceAsync,setSinkSoundProperty) EXPECT_CALL(pReceiveInterface,ackSetSinkSoundProperty(_,E_OK)).Times(1); - ASSERT_EQ(E_OK, pRoutingSender.asyncSetSinkSoundProperty(handle,sinkID,property)); + ASSERT_EQ(E_OK, pRouter->asyncSetSinkSoundProperty(handle,sinkID,property)); pSocketHandler.start_listenting(); } @@ -164,14 +223,14 @@ TEST_F(testRoutingInterfaceAsync,setSourceState) am_Handle_s handle; handle.handle = 1; - handle.handleType = H_SETSOURCEVOLUME; + handle.handleType = H_SETSOURCESTATE; am_sourceID_t sourceID = 1; am_SourceState_e state = SS_OFF; EXPECT_CALL(pReceiveInterface,ackSetSourceState(_,E_OK)).Times(1); - ASSERT_EQ(E_OK, pRoutingSender.asyncSetSourceState(handle,sourceID,state)); + ASSERT_EQ(E_OK, pRouter->asyncSetSourceState(handle,sourceID,state)); pSocketHandler.start_listenting(); } @@ -187,10 +246,10 @@ TEST_F(testRoutingInterfaceAsync,setSourceVolume) am_RampType_e ramp = RAMP_GENIVI_DIRECT; am_time_t myTime = 25; - EXPECT_CALL(pReceiveInterface,ackSourceVolumeTick(_,sourceID,_)).Times(3); + EXPECT_CALL(pReceiveInterface,ackSourceVolumeTick(_,sourceID,_)).Times(AtLeast(1)); EXPECT_CALL(pReceiveInterface,ackSetSourceVolumeChange(_,volume,E_OK)).Times(1); - ASSERT_EQ(E_OK, pRoutingSender.asyncSetSourceVolume(handle,sourceID,volume,ramp,myTime)); + ASSERT_EQ(E_OK, pRouter->asyncSetSourceVolume(handle,sourceID,volume,ramp,myTime)); pSocketHandler.start_listenting(); } @@ -206,10 +265,10 @@ TEST_F(testRoutingInterfaceAsync,setSinkVolume) am_RampType_e ramp = RAMP_GENIVI_DIRECT; am_time_t myTime = 25; - EXPECT_CALL(pReceiveInterface,ackSinkVolumeTick(_,sinkID,_)).Times(9); + EXPECT_CALL(pReceiveInterface,ackSinkVolumeTick(_,sinkID,_)).Times(AtLeast(2)); EXPECT_CALL(pReceiveInterface,ackSetSinkVolumeChange(_,volume,E_OK)).Times(1); - ASSERT_EQ(E_OK, pRoutingSender.asyncSetSinkVolume(handle,sinkID,volume,ramp,myTime)); + ASSERT_EQ(E_OK, pRouter->asyncSetSinkVolume(handle,sinkID,volume,ramp,myTime)); pSocketHandler.start_listenting(); } @@ -228,37 +287,37 @@ TEST_F(testRoutingInterfaceAsync,setSinkVolumeAbort) EXPECT_CALL(pReceiveInterface, ackSinkVolumeTick(_,sinkID,_)); EXPECT_CALL(pReceiveInterface,ackSetSinkVolumeChange(_,AllOf(Ne(volume),Ne(0)),E_ABORTED)).Times(1); - ASSERT_EQ(E_OK, pRoutingSender.asyncSetSinkVolume(handle,sinkID,volume,ramp,myTime)); + ASSERT_EQ(E_OK, pRouter->asyncSetSinkVolume(handle,sinkID,volume,ramp,myTime)); sleep(0.5); - ASSERT_EQ(E_OK, pRoutingSender.asyncAbort(handle)); + ASSERT_EQ(E_OK, pRouter->asyncAbort(handle)); pSocketHandler.start_listenting(); } -TEST_F(testRoutingInterfaceAsync,disconnectTooEarly) +TEST_F(testRoutingInterfaceAsync,disconnectNonExisting) { am_Handle_s handle; handle.handle = 1; - handle.handleType = H_CONNECT; + handle.handleType = H_DISCONNECT; am_connectionID_t connectionID = 4; - am_sourceID_t sourceID = 2; - am_sinkID_t sinkID = 1; - am_ConnectionFormat_e format = CF_GENIVI_ANALOG; - EXPECT_CALL(pReceiveInterface, ackConnect(_,connectionID,E_OK)); + EXPECT_CALL(pReceiveInterface,ackConnect(_,connectionID,E_OK)).Times(0); EXPECT_CALL(pReceiveInterface,ackDisconnect(_,connectionID,E_OK)).Times(0); - ASSERT_EQ(E_OK, pRoutingSender.asyncConnect(handle,connectionID,sourceID,sinkID,format)); - ASSERT_EQ(E_NON_EXISTENT, pRoutingSender.asyncDisconnect(handle,connectionID)); + ASSERT_EQ(E_NON_EXISTENT, pRouter->asyncDisconnect(handle,connectionID)); pSocketHandler.start_listenting(); } -TEST_F(testRoutingInterfaceAsync,disconnectAbort) +TEST_F(testRoutingInterfaceAsync,disconnectTooEarly) { + am_Handle_s handle_c; + handle_c.handle = 1; + handle_c.handleType = H_CONNECT; + am_Handle_s handle; handle.handle = 1; - handle.handleType = H_CONNECT; + handle.handleType = H_DISCONNECT; am_connectionID_t connectionID = 4; am_sourceID_t sourceID = 2; @@ -266,35 +325,47 @@ TEST_F(testRoutingInterfaceAsync,disconnectAbort) am_ConnectionFormat_e format = CF_GENIVI_ANALOG; EXPECT_CALL(pReceiveInterface, ackConnect(_,connectionID,E_OK)); - EXPECT_CALL(pReceiveInterface, ackDisconnect(_,connectionID,E_ABORTED)); - ASSERT_EQ(E_OK, pRoutingSender.asyncConnect(handle,connectionID,sourceID,sinkID,format)); - sleep(2); - ASSERT_EQ(E_OK, pRoutingSender.asyncDisconnect(handle,connectionID)); - ASSERT_EQ(E_OK, pRoutingSender.asyncAbort(handle)); + EXPECT_CALL(pReceiveInterface,ackDisconnect(_,connectionID,E_OK)).Times(0); + ASSERT_EQ(E_OK, pRouter->asyncConnect(handle_c,connectionID,sourceID,sinkID,format)); + ASSERT_EQ(E_NON_EXISTENT, pRouter->asyncDisconnect(handle,connectionID)); pSocketHandler.start_listenting(); } -TEST_F(testRoutingInterfaceAsync,disconnectNonExisting) +TEST_F(testRoutingInterfaceAsync,disconnectAbort) { + am_Handle_s handle_c; + handle_c.handle = 1; + handle_c.handleType = H_CONNECT; + am_Handle_s handle; handle.handle = 1; - handle.handleType = H_CONNECT; + handle.handleType = H_DISCONNECT; - am_connectionID_t connectionID = 4; + am_connectionID_t connectionID = 5; + am_sourceID_t sourceID = 2; + am_sinkID_t sinkID = 1; + am_ConnectionFormat_e format = CF_GENIVI_ANALOG; - EXPECT_CALL(pReceiveInterface,ackConnect(_,connectionID,E_OK)).Times(0); - EXPECT_CALL(pReceiveInterface,ackDisconnect(_,connectionID,E_OK)).Times(0); - ASSERT_EQ(E_NON_EXISTENT, pRoutingSender.asyncDisconnect(handle,connectionID)); + EXPECT_CALL(pReceiveInterface, ackConnect(_,connectionID,E_OK)); + EXPECT_CALL(pReceiveInterface, ackDisconnect(_,connectionID,E_ABORTED)); + ASSERT_EQ(E_OK, pRouter->asyncConnect(handle_c,connectionID,sourceID,sinkID,format)); + sleep(2); + ASSERT_EQ(E_OK, pRouter->asyncDisconnect(handle,connectionID)); + ASSERT_EQ(E_OK, pRouter->asyncAbort(handle)); pSocketHandler.start_listenting(); } TEST_F(testRoutingInterfaceAsync,disconnect) { + am_Handle_s handle_c; + handle_c.handle = 1; + handle_c.handleType = H_CONNECT; + am_Handle_s handle; handle.handle = 1; - handle.handleType = H_CONNECT; + handle.handleType = H_DISCONNECT; am_connectionID_t connectionID = 4; am_sourceID_t sourceID = 2; @@ -303,9 +374,9 @@ TEST_F(testRoutingInterfaceAsync,disconnect) EXPECT_CALL(pReceiveInterface, ackConnect(_,connectionID,E_OK)); EXPECT_CALL(pReceiveInterface, ackDisconnect(_,connectionID,E_OK)); - ASSERT_EQ(E_OK, pRoutingSender.asyncConnect(handle,connectionID,sourceID,sinkID,format)); + ASSERT_EQ(E_OK, pRouter->asyncConnect(handle_c,connectionID,sourceID,sinkID,format)); sleep(2); - ASSERT_EQ(E_OK, pRoutingSender.asyncDisconnect(handle,connectionID)); + ASSERT_EQ(E_OK, pRouter->asyncDisconnect(handle,connectionID)); pSocketHandler.start_listenting(); } @@ -322,12 +393,12 @@ TEST_F(testRoutingInterfaceAsync,connectNoMoreThreads) am_ConnectionFormat_e format = CF_GENIVI_ANALOG; EXPECT_CALL(pReceiveInterface,ackConnect(_,_,E_OK)).Times(10); - for (int i = 0; i < 10; i++) + for (int i = 0; i < 9; i++) { handle.handle++; connectionID++; - ASSERT_EQ(E_OK, pRoutingSender.asyncConnect(handle,connectionID,sourceID,sinkID,format)); - }ASSERT_EQ(E_NOT_POSSIBLE, pRoutingSender.asyncConnect(handle,connectionID,sourceID,sinkID,format)); + ASSERT_EQ(E_OK, pRouter->asyncConnect(handle,connectionID,sourceID,sinkID,format)); + }ASSERT_EQ(E_NOT_POSSIBLE, pRouter->asyncConnect(handle,connectionID,sourceID,sinkID,format)); pSocketHandler.start_listenting(); } @@ -344,9 +415,9 @@ TEST_F(testRoutingInterfaceAsync,connectAbortTooLate) am_ConnectionFormat_e format = CF_GENIVI_ANALOG; EXPECT_CALL(pReceiveInterface,ackConnect(_,connectionID,E_OK)).Times(1); - ASSERT_EQ(E_OK, pRoutingSender.asyncConnect(handle,connectionID,sourceID,sinkID,format)); + ASSERT_EQ(E_OK, pRouter->asyncConnect(handle,connectionID,sourceID,sinkID,format)); sleep(3); - ASSERT_EQ(E_NON_EXISTENT, pRoutingSender.asyncAbort(handle)); + ASSERT_EQ(E_NON_EXISTENT, pRouter->asyncAbort(handle)); pSocketHandler.start_listenting(); } @@ -363,9 +434,9 @@ TEST_F(testRoutingInterfaceAsync,connectAbort) am_ConnectionFormat_e format = CF_GENIVI_ANALOG; EXPECT_CALL(pReceiveInterface,ackConnect(_,connectionID,E_ABORTED)).Times(1); - ASSERT_EQ(E_OK, pRoutingSender.asyncConnect(handle,connectionID,sourceID,sinkID,format)); + ASSERT_EQ(E_OK, pRouter->asyncConnect(handle,connectionID,sourceID,sinkID,format)); sleep(0.5); - ASSERT_EQ(E_OK, pRoutingSender.asyncAbort(handle)); + ASSERT_EQ(E_OK, pRouter->asyncAbort(handle)); pSocketHandler.start_listenting(); } @@ -382,7 +453,7 @@ TEST_F(testRoutingInterfaceAsync,connectWrongFormat) am_ConnectionFormat_e format = CF_GENIVI_MONO; EXPECT_CALL(pReceiveInterface,ackConnect(_,connectionID,E_OK)).Times(0); - ASSERT_EQ(E_WRONG_FORMAT, pRoutingSender.asyncConnect(handle,connectionID,sourceID,sinkID,format)); + ASSERT_EQ(E_WRONG_FORMAT, pRouter->asyncConnect(handle,connectionID,sourceID,sinkID,format)); pSocketHandler.start_listenting(); } @@ -399,7 +470,7 @@ TEST_F(testRoutingInterfaceAsync,connectWrongSink) am_ConnectionFormat_e format = CF_GENIVI_ANALOG; EXPECT_CALL(pReceiveInterface,ackConnect(_,connectionID,E_OK)).Times(0); - ASSERT_EQ(E_NON_EXISTENT, pRoutingSender.asyncConnect(handle,connectionID,sourceID,sinkID,format)); + ASSERT_EQ(E_NON_EXISTENT, pRouter->asyncConnect(handle,connectionID,sourceID,sinkID,format)); pSocketHandler.start_listenting(); } @@ -415,7 +486,7 @@ TEST_F(testRoutingInterfaceAsync,connectWrongSource) am_ConnectionFormat_e format = CF_GENIVI_ANALOG; EXPECT_CALL(pReceiveInterface,ackConnect(_,connectionID,E_OK)).Times(0); - ASSERT_EQ(E_NON_EXISTENT, pRoutingSender.asyncConnect(handle,connectionID,sourceID,sinkID,format)); + ASSERT_EQ(E_NON_EXISTENT, pRouter->asyncConnect(handle,connectionID,sourceID,sinkID,format)); pSocketHandler.start_listenting(); } @@ -432,13 +503,15 @@ TEST_F(testRoutingInterfaceAsync,connect) am_ConnectionFormat_e format = CF_GENIVI_ANALOG; EXPECT_CALL(pReceiveInterface, ackConnect(_,connectionID,E_OK)); - ASSERT_EQ(E_OK, pRoutingSender.asyncConnect(handle,connectionID,sourceID,sinkID,format)); + ASSERT_EQ(E_OK, pRouter->asyncConnect(handle,connectionID,sourceID,sinkID,format)); pSocketHandler.start_listenting(); } int main(int argc, char **argv) { ::testing::InitGoogleTest(&argc, argv); + ::testing::Environment* const env = ::testing::AddGlobalTestEnvironment(new MyEnvironment); + (void) env; return RUN_ALL_TESTS(); } diff --git a/PluginRoutingInterfaceAsync/test/testRoutingInterfaceAsync.h b/PluginRoutingInterfaceAsync/test/testRoutingInterfaceAsync.h index 364100c..3139b5a 100644 --- a/PluginRoutingInterfaceAsync/test/testRoutingInterfaceAsync.h +++ b/PluginRoutingInterfaceAsync/test/testRoutingInterfaceAsync.h @@ -30,33 +30,43 @@ #include "mocklnterfaces.h" #include "SocketHandler.h" #include "../../AudioManagerDaemon/include/RoutingSender.h" +#include "CAmSerializer.h" #define UNIT_TEST 1 namespace am { -class testRoutingInterfaceAsync: public ::testing::Test +class MyEnvironment: public ::testing::Environment { public: - static std::vector pListRoutingPluginDirs; - SocketHandler pSocketHandler; - MockRoutingReceiveInterface pReceiveInterface; - static RoutingSender pRoutingSender; static std::vector returnListPlugins(); static am_Error_e handleDomainRegister(const am_Domain_s& domainData, am_domainID_t& domainID); static am_Error_e handleSourceRegister(const am_Source_s& sourceData, am_sourceID_t& sourceID); static am_Error_e handleSinkRegister(const am_Sink_s& sinkData, am_sinkID_t& sinkID); void timerCallback(sh_timerHandle_t handle, void* userData); - shTimerCallBack_T ptimerCallback; - testRoutingInterfaceAsync(); - virtual ~testRoutingInterfaceAsync(); - + shTimerCallBack_T ptimerCallback; + MyEnvironment(); + ~MyEnvironment(); + // Override this to define how to set up the environment. void SetUp(); + // Override this to define how to tear down the environment. void TearDown(); private: static am_domainID_t mDomainIDCount; }; +class testRoutingInterfaceAsync: public ::testing::Test +{ +public: + testRoutingInterfaceAsync(); + ~testRoutingInterfaceAsync(); + void timerCallback(sh_timerHandle_t handle, void* userData); + shTimerCallBack_T ptimerCallback; + void SetUp(); + void TearDown(); +private: +}; + } /* namespace am */ #endif /* TESTROUTINGINTERFACEASYNC_H_ */ diff --git a/PluginRoutingInterfaceAsync/test/testRoutingInterfaceAsyncInterrupt.cpp b/PluginRoutingInterfaceAsync/test/testRoutingInterfaceAsyncInterrupt.cpp index f8ece51..e00de40 100644 --- a/PluginRoutingInterfaceAsync/test/testRoutingInterfaceAsyncInterrupt.cpp +++ b/PluginRoutingInterfaceAsync/test/testRoutingInterfaceAsyncInterrupt.cpp @@ -59,8 +59,8 @@ void testRoutingInterfaceAsync::SetUp() EXPECT_CALL(pReceiveInterface,registerSource(_,_)).WillRepeatedly(Invoke(testRoutingInterfaceAsync::handleSourceRegister)); EXPECT_CALL(pReceiveInterface,registerSink(_,_)).WillRepeatedly(Invoke(testRoutingInterfaceAsync::handleSinkRegister)); - pRoutingSender.startupRoutingInterface(&pReceiveInterface); - pRoutingSender.routingInterfacesReady(); + //pRoutingSender.startupRoutingInterface(&pReceiveInterface); + //pRoutingSender.routingInterfacesReady(); timespec t; t.tv_nsec = 0; -- cgit v1.2.1