diff options
author | christian mueller <christian.ei.mueller@bmw.de> | 2012-02-29 13:27:27 +0100 |
---|---|---|
committer | christian mueller <christian.ei.mueller@bmw.de> | 2012-02-29 13:27:27 +0100 |
commit | d3ccf97331935b181041394b80be20dca282ea71 (patch) | |
tree | f85e7b649c8a6d58c11e9728b2442cdcff53826d /PluginRoutingInterfaceAsync/src | |
parent | aa93713377d28a8ce7821466ef828f79a18e982d (diff) | |
download | audiomanager-d3ccf97331935b181041394b80be20dca282ea71.tar.gz |
* [ performance] for classes that do not need to be derived from, removed virtual desctructor
* implemented confirmation of routing ready in RoutingReceiver
* [Sockethandler] automatically set gDispatchDone to 0 when starting mainloop
* fixed unit text to work with latest changes (expect Dbus command interface)
* [GAM-4] added way to do synchronous calling on interfaces with the help of CAmSerializer.h
* reworked AsyncRoutingPlugin to work with CAmSerializer.h
* reworked AsyncRoutingPlugin to register elemtes in thread using CAmSerializer.h
* reworked AsncPlugin Tests to work with remodelled Plugin
Diffstat (limited to 'PluginRoutingInterfaceAsync/src')
-rw-r--r-- | PluginRoutingInterfaceAsync/src/RoutingReceiverAsyncShadow.cpp | 473 | ||||
-rw-r--r-- | PluginRoutingInterfaceAsync/src/RoutingSenderAsync.cpp | 466 |
2 files changed, 325 insertions, 614 deletions
diff --git a/PluginRoutingInterfaceAsync/src/RoutingReceiverAsyncShadow.cpp b/PluginRoutingInterfaceAsync/src/RoutingReceiverAsyncShadow.cpp index c11201e..5c93f60 100644 --- a/PluginRoutingInterfaceAsync/src/RoutingReceiverAsyncShadow.cpp +++ b/PluginRoutingInterfaceAsync/src/RoutingReceiverAsyncShadow.cpp @@ -36,16 +36,10 @@ using namespace am; -pthread_mutex_t RoutingReceiverAsyncShadow::mMutex = PTHREAD_MUTEX_INITIALIZER; - -RoutingReceiverAsyncShadow::RoutingReceiverAsyncShadow(): -asyncMsgReceive(this, &RoutingReceiverAsyncShadow::asyncMsgReceiver), // - asyncDispatch(this, &RoutingReceiverAsyncShadow::asyncDispatcher), // - asyncCheck(this, &RoutingReceiverAsyncShadow::asyncChecker), // - mSocketHandler(), // - mRoutingReceiveInterface(), // -mHandle (), // -mPipe() +RoutingReceiverAsyncShadow::RoutingReceiverAsyncShadow(RoutingReceiveInterface* iReceiveInterface, SocketHandler* iSocketHandler) : + mSocketHandler(iSocketHandler), // + mRoutingReceiveInterface(iReceiveInterface), // + mSerializer(iSocketHandler) { } @@ -56,472 +50,139 @@ RoutingReceiverAsyncShadow::~RoutingReceiverAsyncShadow() void RoutingReceiverAsyncShadow::ackConnect(const am_Handle_s handle, const am_connectionID_t connectionID, const am_Error_e error) { - assert(mPipe[0]!=0); - //put the data in the queue: - a_connect_s temp; - temp.handle = handle; - temp.connectionID = connectionID; - temp.error = error; - //then we make a message out of it: - msg_s msg; - msg.msgID = MSG_ACKCONNECT; - msg.parameters.connect = temp; - //here we share data ! - pthread_mutex_lock(&mMutex); - mQueue.push(msg); - pthread_mutex_unlock(&mMutex); - - //ok, fire the signal that data needs to be received ! - if (write(mPipe[1], &msg.msgID, sizeof(msgID_e)) == -1) - { - logError("RoutingReceiverAsyncShadow::ackConnect write failed, error code:", strerror(errno)); - } + mSerializer.asyncCall<RoutingReceiveInterface, const am_Handle_s, const am_connectionID_t, const am_Error_e>(mRoutingReceiveInterface, &RoutingReceiveInterface::ackConnect, handle, connectionID, error); } void RoutingReceiverAsyncShadow::ackDisconnect(const am_Handle_s handle, const am_connectionID_t connectionID, const am_Error_e error) { - assert(mPipe[0]!=0); - //put the data in the queue: - a_connect_s temp; - temp.handle = handle; - temp.connectionID = connectionID; - temp.error = error; - //then we make a message out of it: - msg_s msg; - msg.msgID = MSG_ACKDISCONNECT; - msg.parameters.connect = temp; - //here we share data ! - pthread_mutex_lock(&mMutex); - mQueue.push(msg); - pthread_mutex_unlock(&mMutex); - - //ok, fire the signal that data needs to be received ! - if (write(mPipe[1], &msg.msgID, sizeof(msgID_e)) == -1) - { - logError("RoutingReceiverAsyncShadow::ackDisconnect write failed, error code:", strerror(errno)); - } + mSerializer.asyncCall<RoutingReceiveInterface, const am_Handle_s, const am_connectionID_t, const am_Error_e>(mRoutingReceiveInterface, &RoutingReceiveInterface::ackDisconnect, handle, connectionID, error); } void RoutingReceiverAsyncShadow::ackSetSinkVolumeChange(const am_Handle_s handle, const am_volume_t volume, const am_Error_e error) { - assert(mPipe[0]!=0); - //put the data in the queue: - a_volume_s temp; - temp.handle = handle; - temp.volume = volume; - temp.error = error; - //then we make a message out of it: - msg_s msg; - msg.msgID = MSG_ACKSETSINKVOLUMECHANGE; - msg.parameters.volume = temp; - //here we share data ! - pthread_mutex_lock(&mMutex); - mQueue.push(msg); - pthread_mutex_unlock(&mMutex); - - //ok, fire the signal that data needs to be received ! - if (write(mPipe[1], &msg.msgID, sizeof(msgID_e)) == -1) - { - logError("RoutingReceiverAsyncShadow::ackSetSinkVolumeChange write failed, error code:", strerror(errno)); - } + mSerializer.asyncCall<RoutingReceiveInterface, const am_Handle_s, const am_volume_t, const am_Error_e>(mRoutingReceiveInterface, &RoutingReceiveInterface::ackSetSinkVolumeChange, handle, volume, error); } void RoutingReceiverAsyncShadow::ackSetSourceVolumeChange(const am_Handle_s handle, const am_volume_t volume, const am_Error_e error) { - assert(mPipe[0]!=0); - //put the data in the queue: - a_volume_s temp; - temp.handle = handle; - temp.volume = volume; - temp.error = error; - //then we make a message out of it: - msg_s msg; - msg.msgID = MSG_ACKSETSOURCEVOLUMECHANGE; - msg.parameters.volume = temp; - //here we share data ! - pthread_mutex_lock(&mMutex); - mQueue.push(msg); - pthread_mutex_unlock(&mMutex); - - //ok, fire the signal that data needs to be received ! - if (write(mPipe[1], &msg.msgID, sizeof(msgID_e)) == -1) - { - logError("RoutingReceiverAsyncShadow::ackSetSourceVolumeChange write failed, error code:", strerror(errno)); - } + mSerializer.asyncCall<RoutingReceiveInterface, const am_Handle_s, const am_volume_t, const am_Error_e>(mRoutingReceiveInterface, &RoutingReceiveInterface::ackSetSourceVolumeChange, handle, volume, error); } void RoutingReceiverAsyncShadow::ackSetSourceState(const am_Handle_s handle, const am_Error_e error) { - assert(mPipe[0]!=0); - //put the data in the queue: - a_handle_s temp; - temp.handle = handle; - temp.error = error; - //then we make a message out of it: - msg_s msg; - msg.msgID = MSG_ACKSETSOURCESTATE; - msg.parameters.handle = temp; - //here we share data ! - pthread_mutex_lock(&mMutex); - mQueue.push(msg); - pthread_mutex_unlock(&mMutex); - - //ok, fire the signal that data needs to be received ! - if (write(mPipe[1], &msg.msgID, sizeof(msgID_e)) == -1) - { - logError("RoutingReceiverAsyncShadow::ackSetSourceState write failed, error code:", strerror(errno)); - } + mSerializer.asyncCall<RoutingReceiveInterface, const am_Handle_s, const am_Error_e>(mRoutingReceiveInterface, &RoutingReceiveInterface::ackSetSourceState, handle, error); } void RoutingReceiverAsyncShadow::ackSetSinkSoundProperty(const am_Handle_s handle, const am_Error_e error) { - assert(mPipe[0]!=0); - //put the data in the queue: - a_handle_s temp; - temp.handle = handle; - temp.error = error; - //then we make a message out of it: - msg_s msg; - msg.msgID = MSG_ACKSETSINKSOUNDPROPERTY; - msg.parameters.handle = temp; - //here we share data ! - pthread_mutex_lock(&mMutex); - mQueue.push(msg); - pthread_mutex_unlock(&mMutex); - - //ok, fire the signal that data needs to be received ! - if (write(mPipe[1], &msg.msgID, sizeof(msgID_e)) == -1) - { - logError("RoutingReceiverAsyncShadow::ackSetSinkSoundProperty write failed, error code:", strerror(errno)); - } + mSerializer.asyncCall<RoutingReceiveInterface, const am_Handle_s, const am_Error_e>(mRoutingReceiveInterface, &RoutingReceiveInterface::ackSetSinkSoundProperty, handle, error); } void RoutingReceiverAsyncShadow::ackSetSourceSoundProperty(const am_Handle_s handle, const am_Error_e error) { - assert(mPipe[0]!=0); - //put the data in the queue: - a_handle_s temp; - temp.handle = handle; - temp.error = error; - //then we make a message out of it: - msg_s msg; - msg.msgID = MSG_ACKSETSOURCESOUNDPROPERTY; - msg.parameters.handle = temp; - //here we share data ! - pthread_mutex_lock(&mMutex); - mQueue.push(msg); - pthread_mutex_unlock(&mMutex); - - //ok, fire the signal that data needs to be received ! - if (write(mPipe[1], &msg.msgID, sizeof(msgID_e)) == -1) - { - logError("RoutingReceiverAsyncShadow::ackSetSourceSoundProperty write failed, error code:", strerror(errno)); - } + mSerializer.asyncCall<RoutingReceiveInterface, const am_Handle_s, const am_Error_e>(mRoutingReceiveInterface, &RoutingReceiveInterface::ackSetSourceSoundProperty, handle, error); } void RoutingReceiverAsyncShadow::ackCrossFading(const am_Handle_s handle, const am_HotSink_e hotSink, const am_Error_e error) { - assert(mPipe[0]!=0); - //put the data in the queue: - a_crossfading_s temp; - temp.handle = handle; - temp.hotSink = hotSink; - temp.error = error; - //then we make a message out of it: - msg_s msg; - msg.msgID = MSG_ACKCROSSFADING; - msg.parameters.crossfading = temp; - //here we share data ! - pthread_mutex_lock(&mMutex); - mQueue.push(msg); - pthread_mutex_unlock(&mMutex); - - //ok, fire the signal that data needs to be received ! - if (write(mPipe[1], &msg.msgID, sizeof(msgID_e)) == -1) - { - logError("RoutingReceiverAsyncShadow::ackCrossFading write failed, error code:", strerror(errno)); - } + mSerializer.asyncCall<RoutingReceiveInterface, const am_Handle_s, const am_HotSink_e, const am_Error_e>(mRoutingReceiveInterface, &RoutingReceiveInterface::ackCrossFading, handle, hotSink, error); } void RoutingReceiverAsyncShadow::ackSourceVolumeTick(const am_Handle_s handle, const am_sourceID_t sourceID, const am_volume_t volume) { - assert(mPipe[0]!=0); - //put the data in the queue: - a_sourceVolumeTick_s temp; - temp.sourceID = sourceID; - temp.handle = handle; - temp.volume = volume; - //then we make a message out of it: - msg_s msg; - msg.msgID = MSG_ACKSOURCEVOLUMETICK; - msg.parameters.sourceVolumeTick = temp; - //here we share data ! - pthread_mutex_lock(&mMutex); - mQueue.push(msg); - pthread_mutex_unlock(&mMutex); - - //ok, fire the signal that data needs to be received ! - if (write(mPipe[1], &msg.msgID, sizeof(msgID_e)) == -1) - { - logError("RoutingReceiverAsyncShadow::ackSourceVolumeTick write failed, error code:", strerror(errno)); - } + mSerializer.asyncCall<RoutingReceiveInterface, const am_Handle_s, const am_sourceID_t, const am_volume_t>(mRoutingReceiveInterface, &RoutingReceiveInterface::ackSourceVolumeTick, handle, sourceID, volume); } void RoutingReceiverAsyncShadow::ackSinkVolumeTick(const am_Handle_s handle, const am_sinkID_t sinkID, const am_volume_t volume) { - assert(mPipe[0]!=0); - //put the data in the queue: - a_sinkVolumeTick_s temp; - temp.sinkID = sinkID; - temp.handle = handle; - temp.volume = volume; - //then we make a message out of it: - msg_s msg; - msg.msgID = MSG_ACKSINKVOLUMETICK; - msg.parameters.sinkVolumeTick = temp; - //here we share data ! - pthread_mutex_lock(&mMutex); - mQueue.push(msg); - pthread_mutex_unlock(&mMutex); - - //ok, fire the signal that data needs to be received ! - if (write(mPipe[1], &msg.msgID, sizeof(msgID_e)) == -1) - { - logError("RoutingReceiverAsyncShadow::ackSinkVolumeTick write failed, error code:", strerror(errno)); - } + mSerializer.asyncCall<RoutingReceiveInterface, const am_Handle_s, const am_sinkID_t, const am_volume_t>(mRoutingReceiveInterface, &RoutingReceiveInterface::ackSinkVolumeTick, handle, sinkID, volume); } void RoutingReceiverAsyncShadow::hookInterruptStatusChange(const am_sourceID_t sourceID, const am_InterruptState_e interruptState) { - assert(mPipe[0]!=0); - //put the data in the queue: - a_interruptStatusChange_s temp; - temp.sourceID = sourceID; - temp.interruptState = interruptState; - - //then we make a message out of it: - msg_s msg; - msg.msgID = MSG_HOOKINTERRUPTSTATUSCHANGE; - msg.parameters.interruptStatusChange = temp; - //here we share data ! - pthread_mutex_lock(&mMutex); - mQueue.push(msg); - pthread_mutex_unlock(&mMutex); - - //ok, fire the signal that data needs to be received ! - if (write(mPipe[1], &msg.msgID, sizeof(msgID_e)) == -1) - { - logError("RoutingReceiverAsyncShadow::hookInterruptStatusChange write failed, error code:", strerror(errno)); - } + mSerializer.asyncCall<RoutingReceiveInterface, const am_sinkID_t, const am_InterruptState_e>(mRoutingReceiveInterface, &RoutingReceiveInterface::hookInterruptStatusChange, sourceID, interruptState); } void RoutingReceiverAsyncShadow::hookSinkAvailablityStatusChange(const am_sinkID_t sinkID, const am_Availability_s & availability) { - assert(mPipe[0]!=0); - //put the data in the queue: - a_sinkAvailability_s temp; - temp.sinkID = sinkID; - temp.availability = availability; - - //then we make a message out of it: - msg_s msg; - msg.msgID = MSG_HOOKSINKAVAILABLITYSTATUSCHANGE; - msg.parameters.sinkAvailability = temp; - //here we share data ! - pthread_mutex_lock(&mMutex); - mQueue.push(msg); - pthread_mutex_unlock(&mMutex); - - //ok, fire the signal that data needs to be received ! - if (write(mPipe[1], &msg.msgID, sizeof(msgID_e)) == -1) - { - logError("RoutingReceiverAsyncShadow::hookSinkAvailablityStatusChange write failed, error code:", strerror(errno)); - } + mSerializer.asyncCall<RoutingReceiveInterface, const am_sinkID_t, const am_Availability_s&>(mRoutingReceiveInterface, &RoutingReceiveInterface::hookSinkAvailablityStatusChange, sinkID, availability); } void RoutingReceiverAsyncShadow::hookSourceAvailablityStatusChange(const am_sourceID_t sourceID, const am_Availability_s & availability) { - assert(mPipe[0]!=0); - //put the data in the queue: - a_sourceAvailability_s temp; - temp.sourceID = sourceID; - temp.availability = availability; - - //then we make a message out of it: - msg_s msg; - msg.msgID = MSG_HOOKSOURCEAVAILABLITYSTATUSCHANGE; - msg.parameters.sourceAvailability = temp; - //here we share data ! - pthread_mutex_lock(&mMutex); - mQueue.push(msg); - pthread_mutex_unlock(&mMutex); - - //ok, fire the signal that data needs to be received ! - if (write(mPipe[1], &msg.msgID, sizeof(msgID_e)) == -1) - { - logError("RoutingReceiverAsyncShadow::hookSourceAvailablityStatusChange write failed, error code:", strerror(errno)); - } + mSerializer.asyncCall<RoutingReceiveInterface, const am_sourceID_t, const am_Availability_s&>(mRoutingReceiveInterface, &RoutingReceiveInterface::hookSourceAvailablityStatusChange, sourceID, availability); } void RoutingReceiverAsyncShadow::hookDomainStateChange(const am_domainID_t domainID, const am_DomainState_e domainState) { - assert(mPipe[0]!=0); - //put the data in the queue: - a_hookDomainStateChange_s temp; - temp.domainID = domainID; - temp.state = domainState; - - //then we make a message out of it: - msg_s msg; - msg.msgID = MSG_HOOKDOMAINSTATECHANGE; - msg.parameters.domainStateChange = temp; - //here we share data ! - pthread_mutex_lock(&mMutex); - mQueue.push(msg); - pthread_mutex_unlock(&mMutex); - - //ok, fire the signal that data needs to be received ! - if (write(mPipe[1], &msg.msgID, sizeof(msgID_e)) == -1) - { - logError("RoutingReceiverAsyncShadow::hookDomainStateChange write failed, error code:", strerror(errno)); - } + mSerializer.asyncCall<RoutingReceiveInterface, const am_domainID_t, const am_DomainState_e>(mRoutingReceiveInterface, &RoutingReceiveInterface::hookDomainStateChange, domainID, domainState); } void RoutingReceiverAsyncShadow::hookTimingInformationChanged(const am_connectionID_t connectionID, const am_timeSync_t delay) { - assert(mPipe[0]!=0); - //put the data in the queue: - a_timingInfoChanged_s temp; - temp.connectionID = connectionID; - temp.delay = delay; - - //then we make a message out of it: - msg_s msg; - msg.msgID = MSG_HOOKTIMINGINFORMATIONCHANGED; - msg.parameters.timingInfoChange = temp; - //here we share data ! - pthread_mutex_lock(&mMutex); - mQueue.push(msg); - pthread_mutex_unlock(&mMutex); - - //ok, fire the signal that data needs to be received ! - if (write(mPipe[1], &msg.msgID, sizeof(msgID_e)) == -1) - { - logError("RoutingReceiverAsyncShadow::hookTimingInformationChanged write failed, error code:", strerror(errno)); - } + mSerializer.asyncCall<RoutingReceiveInterface, const am_connectionID_t, const am_timeSync_t>(mRoutingReceiveInterface, &RoutingReceiveInterface::hookTimingInformationChanged, connectionID, delay); } -void RoutingReceiverAsyncShadow::asyncMsgReceiver(const pollfd pollfd, const sh_pollHandle_t handle, void *userData) +am_Error_e RoutingReceiverAsyncShadow::registerDomain(const am_Domain_s & domainData, am_domainID_t & domainID) { - (void) handle; - (void) userData; - //it is no really important what to read here, we will read the queue later... - char buffer[10]; - if (read(pollfd.fd, buffer, 10) == -1) - { - logError("RoutingReceiverAsyncShadow::asyncMsgReceiver could not read!"); - } + am_Error_e error (E_UNKNOWN); + am_Domain_s domainDataCopy(domainData); + mSerializer.syncCall<RoutingReceiveInterface, am_Error_e, const am_Domain_s&,am_domainID_t&, am_Domain_s, am_domainID_t>(mRoutingReceiveInterface, &RoutingReceiveInterface::registerDomain, error, domainDataCopy, domainID); + return (error); } -bool RoutingReceiverAsyncShadow::asyncDispatcher(const sh_pollHandle_t handle, void *userData) +am_Error_e am::RoutingReceiverAsyncShadow::registerGateway(const am_Gateway_s & gatewayData, am_gatewayID_t & gatewayID) { - (void) handle; - (void) userData; - msg_s msg; - - //ok, let's receive, first lock - pthread_mutex_lock(&mMutex); - msg = mQueue.front(); - mQueue.pop(); - pthread_mutex_unlock(&mMutex); + am_Error_e error (E_UNKNOWN); + am_Gateway_s gatewayDataCopy(gatewayData); + mSerializer.syncCall<RoutingReceiveInterface, am_Error_e, const am_Gateway_s&, am_gatewayID_t&, am_Gateway_s, am_gatewayID_t>(mRoutingReceiveInterface,&RoutingReceiveInterface::registerGateway, error, gatewayDataCopy, gatewayID); + return (error); +} - //check for the message: - switch (msg.msgID) - { - case MSG_ACKCONNECT: - mRoutingReceiveInterface->ackConnect(msg.parameters.connect.handle, msg.parameters.connect.connectionID, msg.parameters.connect.error); - break; - case MSG_ACKDISCONNECT: - mRoutingReceiveInterface->ackDisconnect(msg.parameters.connect.handle, msg.parameters.connect.connectionID, msg.parameters.connect.error); - break; - case MSG_ACKSETSINKVOLUMECHANGE: - mRoutingReceiveInterface->ackSetSinkVolumeChange(msg.parameters.volume.handle, msg.parameters.volume.volume, msg.parameters.volume.error); - break; - case MSG_ACKSETSOURCEVOLUMECHANGE: - mRoutingReceiveInterface->ackSetSourceVolumeChange(msg.parameters.volume.handle, msg.parameters.volume.volume, msg.parameters.volume.error); - break; - case MSG_ACKSETSOURCESTATE: - mRoutingReceiveInterface->ackSetSourceState(msg.parameters.handle.handle, msg.parameters.handle.error); - break; - case MSG_ACKSETSINKSOUNDPROPERTY: - mRoutingReceiveInterface->ackSetSinkSoundProperty(msg.parameters.handle.handle, msg.parameters.handle.error); - break; - case MSG_ACKSETSOURCESOUNDPROPERTY: - mRoutingReceiveInterface->ackSetSourceSoundProperty(msg.parameters.handle.handle, msg.parameters.handle.error); - break; - case MSG_ACKCROSSFADING: - mRoutingReceiveInterface->ackCrossFading(msg.parameters.crossfading.handle, msg.parameters.crossfading.hotSink, msg.parameters.crossfading.error); - break; - case MSG_ACKSOURCEVOLUMETICK: - mRoutingReceiveInterface->ackSourceVolumeTick(msg.parameters.sourceVolumeTick.handle, msg.parameters.sourceVolumeTick.sourceID, msg.parameters.sourceVolumeTick.volume); - break; - case MSG_ACKSINKVOLUMETICK: - mRoutingReceiveInterface->ackSinkVolumeTick(msg.parameters.sinkVolumeTick.handle, msg.parameters.sinkVolumeTick.sinkID, msg.parameters.sinkVolumeTick.volume); - break; - case MSG_HOOKINTERRUPTSTATUSCHANGE: - mRoutingReceiveInterface->hookInterruptStatusChange(msg.parameters.interruptStatusChange.sourceID, msg.parameters.interruptStatusChange.interruptState); - break; - case MSG_HOOKSINKAVAILABLITYSTATUSCHANGE: - mRoutingReceiveInterface->hookSinkAvailablityStatusChange(msg.parameters.sinkAvailability.sinkID, msg.parameters.sinkAvailability.availability); - break; - case MSG_HOOKSOURCEAVAILABLITYSTATUSCHANGE: - mRoutingReceiveInterface->hookSourceAvailablityStatusChange(msg.parameters.sourceAvailability.sourceID, msg.parameters.sourceAvailability.availability); - break; - case MSG_HOOKDOMAINSTATECHANGE: - mRoutingReceiveInterface->hookDomainStateChange(msg.parameters.domainStateChange.domainID, msg.parameters.domainStateChange.state); - break; - case MSG_HOOKTIMINGINFORMATIONCHANGED: - mRoutingReceiveInterface->hookTimingInformationChanged(msg.parameters.timingInfoChange.connectionID, msg.parameters.timingInfoChange.delay); - break; - default: - logError("RoutingReceiverAsyncShadow::asyncDispatcher unknown message was received:", msg.msgID); - break; - } +am_Error_e am::RoutingReceiverAsyncShadow::registerSink(const am_Sink_s & sinkData, am_sinkID_t & sinkID) +{ + am_Error_e error (E_UNKNOWN); + am_Sink_s sinkDataCopy(sinkData); + mSerializer.syncCall<RoutingReceiveInterface, am_Error_e, const am_Sink_s&, am_sinkID_t&, am_Sink_s, am_sinkID_t>(mRoutingReceiveInterface,&RoutingReceiveInterface::registerSink, error, sinkDataCopy, sinkID); + return (error); +} - bool retVal = false; - pthread_mutex_lock(&mMutex); - if (mQueue.size() > 0) - retVal = true; - pthread_mutex_unlock(&mMutex); +am_Error_e am::RoutingReceiverAsyncShadow::deregisterSink(const am_sinkID_t sinkID) +{ + am_Error_e error; + am_sinkID_t s(sinkID); //no const values allowed in syncCalls due to reference ! + mSerializer.syncCall<RoutingReceiveInterface, am_Error_e, am_sinkID_t>(mRoutingReceiveInterface, &RoutingReceiveInterface::deregisterSink, error, s); + return (error); +} - return (retVal); +am_Error_e am::RoutingReceiverAsyncShadow::registerSource(const am_Source_s & sourceData, am_sourceID_t & sourceID) +{ + am_Error_e error (E_UNKNOWN); + am_Source_s sourceDataCopy(sourceData); + mSerializer.syncCall<RoutingReceiveInterface, am_Error_e, const am_Source_s&, am_sourceID_t&, am_Source_s, am_sourceID_t>(mRoutingReceiveInterface,&RoutingReceiveInterface::registerSource, error, sourceDataCopy, sourceID); + return (error); } -bool RoutingReceiverAsyncShadow::asyncChecker(const sh_pollHandle_t handle, void *userData) +am_Error_e am::RoutingReceiverAsyncShadow::deregisterSource(const am_sourceID_t sourceID) { - (void) handle; - (void) userData; - bool returnVal = false; - pthread_mutex_lock(&mMutex); - if (mQueue.size() > 0) - returnVal = true; - pthread_mutex_unlock(&mMutex); - return (returnVal); + am_Error_e error; + am_sourceID_t s(sourceID); //no const values allowed in syncCalls due to reference ! + mSerializer.syncCall<RoutingReceiveInterface, am_Error_e, am_sinkID_t>(mRoutingReceiveInterface, &RoutingReceiveInterface::deregisterSource, error, s); + return (error); } -am_Error_e RoutingReceiverAsyncShadow::setRoutingInterface(RoutingReceiveInterface *receiveInterface) +am_Error_e am::RoutingReceiverAsyncShadow::registerCrossfader(const am_Crossfader_s & crossfaderData, am_crossfaderID_t & crossfaderID) { - assert(receiveInterface!=0); - mRoutingReceiveInterface = receiveInterface; - mRoutingReceiveInterface->getSocketHandler(mSocketHandler); - if (pipe(mPipe) == -1) - { - logError("RoutingReceiverAsyncShadow::setRoutingInterface could not create pipe!:"); - return (E_UNKNOWN); - } + am_Error_e error (E_UNKNOWN); + am_Crossfader_s crossfaderDataCopy(crossfaderData); + mSerializer.syncCall<RoutingReceiveInterface, am_Error_e, const am_Crossfader_s&, am_crossfaderID_t&, am_Crossfader_s, am_crossfaderID_t>(mRoutingReceiveInterface,&RoutingReceiveInterface::registerCrossfader, error, crossfaderDataCopy, crossfaderID); + return (error); +} - short event = 0; - event |= POLLIN; - mSocketHandler->addFDPoll(mPipe[0], event, NULL, &asyncMsgReceive, &asyncCheck, &asyncDispatch, NULL, mHandle); - return (E_OK); +void am::RoutingReceiverAsyncShadow::confirmRoutingReady(uint16_t starupHandle) +{ + mSerializer.asyncCall<RoutingReceiveInterface,uint16_t>(mRoutingReceiveInterface,&RoutingReceiveInterface::confirmRoutingReady,starupHandle); } + + + diff --git a/PluginRoutingInterfaceAsync/src/RoutingSenderAsync.cpp b/PluginRoutingInterfaceAsync/src/RoutingSenderAsync.cpp index 1a1ee70..68b260f 100644 --- a/PluginRoutingInterfaceAsync/src/RoutingSenderAsync.cpp +++ b/PluginRoutingInterfaceAsync/src/RoutingSenderAsync.cpp @@ -55,175 +55,174 @@ pthread_mutex_t AsyncRoutingSender::mSourcesMutex= PTHREAD_MUTEX_INITIALIZER; pthread_mutex_t AsyncRoutingSender::mDomainsMutex= PTHREAD_MUTEX_INITIALIZER; pthread_mutex_t WorkerThreadPool::mBlockingMutex = PTHREAD_MUTEX_INITIALIZER; -void* AsyncRoutingSender::InterruptEvents(void *data) -{ - RoutingReceiverAsyncShadow *shadow=(RoutingReceiverAsyncShadow *)data; - DBusError err; - DBusMessage* msg; - DBusConnection* conn; - dbus_error_init(&err); - conn = dbus_bus_get(DBUS_BUS_SESSION, &err); - dbus_uint32_t serial = 0; - DBusMessage* reply; - DBusMessageIter args; - dbus_bus_request_name(conn, "org.genivi.test",DBUS_NAME_FLAG_REPLACE_EXISTING , &err); - - while (dbus_connection_read_write_dispatch(conn, -1)) - { - dbus_connection_read_write(conn, 0); - msg = dbus_connection_pop_message(conn); - - if (dbus_message_is_method_call(msg, "org.genivi.test", "timingChanged")) +//void* AsyncRoutingSender::InterruptEvents(void *data) +//{ +// RoutingReceiverAsyncShadow *shadow=(RoutingReceiverAsyncShadow *)data; +// DBusError err; +// DBusMessage* msg; +// DBusConnection* conn; +// dbus_error_init(&err); +// conn = dbus_bus_get(DBUS_BUS_SESSION, &err); +// dbus_uint32_t serial = 0; +// DBusMessage* reply; +// DBusMessageIter args; +// dbus_bus_request_name(conn, "org.genivi.test",DBUS_NAME_FLAG_REPLACE_EXISTING , &err); +// +// while (dbus_connection_read_write_dispatch(conn, -1)) +// { +// dbus_connection_read_write(conn, 0); +// msg = dbus_connection_pop_message(conn); +// +// if (dbus_message_is_method_call(msg, "org.genivi.test", "timingChanged")) +// { +// am_connectionID_t connectionID; +// am_timeSync_t delay; +// dbus_message_iter_init(msg, &args); +// dbus_message_iter_get_basic(&args,(void*) &connectionID); +// dbus_message_iter_next(&args); +// dbus_message_iter_get_basic(&args,(void*) &delay); +// reply = dbus_message_new_method_return(msg); +// dbus_message_iter_init_append(reply, &args); +// dbus_message_iter_append_basic(&args, DBUS_TYPE_INT32, &connectionID); +// dbus_connection_send(conn, reply, &serial); +// shadow->hookTimingInformationChanged(connectionID,delay); +// dbus_message_unref(reply); +// } +// else if (dbus_message_is_method_call(msg, "org.genivi.test", "SinkAvailablityStatusChange")) +// { +// am_sinkID_t sinkID; +// am_Availability_s availability; +// dbus_message_iter_init(msg, &args); +// dbus_message_iter_get_basic(&args,(void*) &sinkID); +// reply = dbus_message_new_method_return(msg); +// dbus_message_iter_init_append(reply, &args); +// dbus_message_iter_append_basic(&args, DBUS_TYPE_INT32, &sinkID); +// dbus_connection_send(conn, reply, &serial); +// shadow->hookSinkAvailablityStatusChange(sinkID,availability); +// dbus_message_unref(reply); +// } +// else if (dbus_message_is_method_call(msg, "org.genivi.test", "SourceAvailablityStatusChange")) +// { +// am_sourceID_t sourceID; +// am_Availability_s availability; +// dbus_message_iter_init(msg, &args); +// dbus_message_iter_get_basic(&args,(void*) &sourceID); +// reply = dbus_message_new_method_return(msg); +// dbus_message_iter_init_append(reply, &args); +// dbus_message_iter_append_basic(&args, DBUS_TYPE_INT32, &sourceID); +// dbus_connection_send(conn, reply, &serial); +// shadow->hookSourceAvailablityStatusChange(sourceID,availability); +// dbus_message_unref(reply); +// } +// else if (dbus_message_is_method_call(msg, "org.genivi.test", "InterruptStatusChange")) +// { +// am_sourceID_t sourceID; +// +// am_InterruptState_e state=IS_UNKNOWN; +// dbus_message_iter_init(msg, &args); +// dbus_message_iter_get_basic(&args,(void*) &sourceID); +// reply = dbus_message_new_method_return(msg); +// dbus_message_iter_init_append(reply, &args); +// dbus_message_iter_append_basic(&args, DBUS_TYPE_INT32, &sourceID); +// dbus_connection_send(conn, reply, &serial); +// shadow->hookInterruptStatusChange(sourceID,state); +// dbus_message_unref(reply); +// } +// dbus_connection_flush(conn); +// } +// return NULL; +//} + + void *WorkerThreadPool::WorkerThread(void* data) { - am_connectionID_t connectionID; - am_timeSync_t delay; - dbus_message_iter_init(msg, &args); - dbus_message_iter_get_basic(&args,(void*) &connectionID); - dbus_message_iter_next(&args); - dbus_message_iter_get_basic(&args,(void*) &delay); - reply = dbus_message_new_method_return(msg); - dbus_message_iter_init_append(reply, &args); - dbus_message_iter_append_basic(&args, DBUS_TYPE_INT32, &connectionID); - dbus_connection_send(conn, reply, &serial); - shadow->hookTimingInformationChanged(connectionID,delay); - dbus_message_unref(reply); - } - else if (dbus_message_is_method_call(msg, "org.genivi.test", "SinkAvailablityStatusChange")) - { - am_sinkID_t sinkID; - am_Availability_s availability; - dbus_message_iter_init(msg, &args); - dbus_message_iter_get_basic(&args,(void*) &sinkID); - reply = dbus_message_new_method_return(msg); - dbus_message_iter_init_append(reply, &args); - dbus_message_iter_append_basic(&args, DBUS_TYPE_INT32, &sinkID); - dbus_connection_send(conn, reply, &serial); - shadow->hookSinkAvailablityStatusChange(sinkID,availability); - dbus_message_unref(reply); - } - else if (dbus_message_is_method_call(msg, "org.genivi.test", "SourceAvailablityStatusChange")) - { - am_sourceID_t sourceID; - am_Availability_s availability; - dbus_message_iter_init(msg, &args); - dbus_message_iter_get_basic(&args,(void*) &sourceID); - reply = dbus_message_new_method_return(msg); - dbus_message_iter_init_append(reply, &args); - dbus_message_iter_append_basic(&args, DBUS_TYPE_INT32, &sourceID); - dbus_connection_send(conn, reply, &serial); - shadow->hookSourceAvailablityStatusChange(sourceID,availability); - dbus_message_unref(reply); + threadInfo_s *myInfo=(threadInfo_s*)data; + while (1) + { + sem_wait(&myInfo->block); + pthread_mutex_lock(&mBlockingMutex); + Worker* actWorker=myInfo->worker; + pthread_mutex_unlock(&mBlockingMutex); + actWorker->setCancelSempaphore(&myInfo->cancel); + actWorker->start2work(); + actWorker->pPool->finishedWork(myInfo->threadID); + } + return NULL; } - else if (dbus_message_is_method_call(msg, "org.genivi.test", "InterruptStatusChange")) + + WorkerThreadPool::WorkerThreadPool(int numThreads): + mNumThreads(numThreads) { - am_sourceID_t sourceID; - - am_InterruptState_e state=IS_UNKNOWN; - dbus_message_iter_init(msg, &args); - dbus_message_iter_get_basic(&args,(void*) &sourceID); - reply = dbus_message_new_method_return(msg); - dbus_message_iter_init_append(reply, &args); - dbus_message_iter_append_basic(&args, DBUS_TYPE_INT32, &sourceID); - dbus_connection_send(conn, reply, &serial); - shadow->hookInterruptStatusChange(sourceID,state); - dbus_message_unref(reply); + int workerID=0; + mListWorkers.resize(mNumThreads); + for (int i=0;i<mNumThreads;i++) + { + sem_init(&mListWorkers[i].block,NULL,NULL); + sem_init(&mListWorkers[i].cancel,NULL,NULL); + mListWorkers[i].busy=false; + mListWorkers[i].workerID=++workerID; + pthread_create(&mListWorkers[i].threadID,NULL,&WorkerThreadPool::WorkerThread,(void*)&mListWorkers[i]); + } } - dbus_connection_flush(conn); - } - return NULL; -} - -void *WorkerThreadPool::WorkerThread(void* data) -{ - threadInfo_s *myInfo=(threadInfo_s*)data; - while (1) - { - sem_wait(&myInfo->block); - pthread_mutex_lock(&mBlockingMutex); - Worker* actWorker=myInfo->worker; - pthread_mutex_unlock(&mBlockingMutex); - actWorker->setCancelSempaphore(&myInfo->cancel); - actWorker->start2work(); - actWorker->pPool->finishedWork(myInfo->threadID); - } - return NULL; -} -WorkerThreadPool::WorkerThreadPool(int numThreads): -mNumThreads(numThreads) -{ - int workerID=0; - mListWorkers.resize(mNumThreads); - for (int i=0;i<mNumThreads;i++) - { - sem_init(&mListWorkers[i].block,NULL,NULL); - sem_init(&mListWorkers[i].cancel,NULL,NULL); - mListWorkers[i].busy=false; - mListWorkers[i].workerID=++workerID; - pthread_create(&mListWorkers[i].threadID,NULL,&WorkerThreadPool::WorkerThread,(void*)&mListWorkers[i]); - } -} - -int16_t WorkerThreadPool::startWork(Worker *worker) -{ - pthread_mutex_lock(&mBlockingMutex); - std::vector<threadInfo_s>::iterator it=mListWorkers.begin(); - for(;it!=mListWorkers.end();++it) - { - if(!it->busy) + int16_t WorkerThreadPool::startWork(Worker *worker) { - it->worker=worker; - it->busy=true; + pthread_mutex_lock(&mBlockingMutex); + std::vector<threadInfo_s>::iterator it=mListWorkers.begin(); + for(;it!=mListWorkers.end();++it) + { + if(!it->busy) + { + it->worker=worker; + it->busy=true; + pthread_mutex_unlock(&mBlockingMutex); + sem_post(&it->block); + return ((int)it->workerID); + } + } pthread_mutex_unlock(&mBlockingMutex); - sem_post(&it->block); - return ((int)it->workerID); + return (-1); } - } - pthread_mutex_unlock(&mBlockingMutex); - return (-1); -} -bool WorkerThreadPool::cancelWork(int workerID) -{ - std::vector<threadInfo_s>::iterator it=mListWorkers.begin(); - for(;it!=mListWorkers.end();++it) - { - if(it->workerID==workerID && it->busy) + bool WorkerThreadPool::cancelWork(int workerID) { - sem_post(&it->cancel); - return (true); + std::vector<threadInfo_s>::iterator it=mListWorkers.begin(); + for(;it!=mListWorkers.end();++it) + { + if(it->workerID==workerID && it->busy) + { + sem_post(&it->cancel); + return (true); + } + } + return (false); } - } - return (false); -} -void WorkerThreadPool::finishedWork(pthread_t threadID) -{ - pthread_mutex_lock(&mBlockingMutex); - std::vector<threadInfo_s>::iterator it=mListWorkers.begin(); - for(;it!=mListWorkers.end();++it) - { - if(it->threadID==threadID) + void WorkerThreadPool::finishedWork(pthread_t threadID) { - it->busy=false; - delete it->worker; - break; + pthread_mutex_lock(&mBlockingMutex); + std::vector<threadInfo_s>::iterator it=mListWorkers.begin(); + for(;it!=mListWorkers.end();++it) + { + if(it->threadID==threadID) + { + it->busy=false; + delete it->worker; + break; + } + } + pthread_mutex_unlock(&mBlockingMutex); } - } - pthread_mutex_unlock(&mBlockingMutex); -} -WorkerThreadPool::~WorkerThreadPool() -{ - for (int i=0;i<mNumThreads;i++) - { - pthread_cancel(mListWorkers[i].threadID); - } -} + WorkerThreadPool::~WorkerThreadPool() + { + for (int i=0;i<mNumThreads;i++) + { + pthread_cancel(mListWorkers[i].threadID); + } + } -Worker::Worker(WorkerThreadPool *pool): -pPool(pool), // - mCancelSem() + Worker::Worker(WorkerThreadPool *pool): + pPool(pool), mCancelSem() { } @@ -262,20 +261,13 @@ pPool(pool), // } AsyncRoutingSender::AsyncRoutingSender(): - mShadow(), // - mReceiveInterface(0), // - mDomains(createDomainTable()), // - mSinks(createSinkTable()), // -mSources ( createSourceTable ( ) ), // -mGateways ( createGatewayTable ( ) ) , // -mMapHandleWorker ( ), // -mMapConnectionIDRoute(),// -mPool(10) + mReceiveInterface(0), mDomains(createDomainTable()), mSinks(createSinkTable()), mSources ( createSourceTable ( ) ), mGateways ( createGatewayTable ( ) ), mMapHandleWorker ( ) , mMapConnectionIDRoute ( ) , mPool (10) { } AsyncRoutingSender::~AsyncRoutingSender() { + delete mShadow; } am_Error_e AsyncRoutingSender::startupInterface(RoutingReceiveInterface *routingreceiveinterface) @@ -283,50 +275,20 @@ am_Error_e AsyncRoutingSender::startupInterface(RoutingReceiveInterface *routing //first, create the Shadow: assert(routingreceiveinterface!=0); mReceiveInterface = routingreceiveinterface; - mShadow.setRoutingInterface(routingreceiveinterface); + SocketHandler* handler; + routingreceiveinterface->getSocketHandler(handler); + mShadow = new RoutingReceiverAsyncShadow(routingreceiveinterface, handler); + return E_OK; } void AsyncRoutingSender::setRoutingReady(const uint16_t handle) { - //todo: implement handle ! - assert(mReceiveInterface!=0); - am_Error_e eCode; - //first register the domains - std::vector<am_Domain_s>::iterator domainIter = mDomains.begin(); - for (; domainIter != mDomains.end(); ++domainIter) - { - am_domainID_t domainID; - if ((eCode = mReceiveInterface->registerDomain(*domainIter, domainID)) != E_OK) - { - logError("AsyncRoutingSender::routingInterfacesReady error on registering domain, failed with", eCode); - } - domainIter->domainID = domainID; - } + syncRegisterWorker *worker = new syncRegisterWorker(this, &mPool, mShadow, mDomains, mSinks, mSources, handle); - //then sources - std::vector<am_Source_s>::iterator sourceIter = mSources.begin(); - for (; sourceIter != mSources.end(); ++sourceIter) + if ((mPool.startWork(worker)) == -1) { - am_sourceID_t sourceID; - //set the correct domainID - sourceIter->domainID = mDomains[0].domainID; - if ((eCode = mReceiveInterface->registerSource(*sourceIter, sourceID)) != E_OK) - { - logError("AsyncRoutingSender::routingInterfacesReady error on registering source, failed with", eCode); - } - } - - //sinks - std::vector<am_Sink_s>::iterator sinkIter = mSinks.begin(); - for (; sinkIter != mSinks.end(); ++sinkIter) - { - am_sinkID_t sinkID; - //set the correct domainID - sinkIter->domainID = mDomains[0].domainID; - if ((eCode = mReceiveInterface->registerSink(*sinkIter, sinkID)) != E_OK) - { - logError("AsyncRoutingSender::routingInterfacesReady error on registering sink, failed with", eCode); - } + logError("AsyncRoutingSender::asyncConnect not enough threads!"); + delete worker; } //gateways @@ -422,7 +384,7 @@ am_Error_e AsyncRoutingSender::asyncConnect(const am_Handle_s handle, const am_c return (E_WRONG_FORMAT); //the operation is ok, lets create a worker, assign it to a task in the task pool - asycConnectWorker *worker = new asycConnectWorker(this, &mPool, &mShadow, handle, connectionID, sourceID, sinkID, connectionFormat); + asycConnectWorker *worker = new asycConnectWorker(this, &mPool, mShadow, handle, connectionID, sourceID, sinkID, connectionFormat); if ((work = mPool.startWork(worker)) == -1) { logError("AsyncRoutingSender::asyncConnect not enough threads!"); @@ -457,7 +419,7 @@ am_Error_e AsyncRoutingSender::asyncDisconnect(const am_Handle_s handle, const a pthread_mutex_unlock(&mMapConnectionMutex); //the operation is ok, lets create a worker, assign it to a task in the task pool - asycDisConnectWorker *worker = new asycDisConnectWorker(this, &mPool, &mShadow, handle, connectionID); + asycDisConnectWorker *worker = new asycDisConnectWorker(this, &mPool, mShadow, handle, connectionID); if ((work = mPool.startWork(worker)) == -1) { logError("AsyncRoutingSender::asyncDisconnect not enough threads!"); @@ -499,7 +461,7 @@ am_Error_e AsyncRoutingSender::asyncSetSinkVolume(const am_Handle_s handle, cons if (sinkIter == mSinks.end()) return (E_NON_EXISTENT); //not found! - asyncSetSinkVolumeWorker *worker = new asyncSetSinkVolumeWorker(this, &mPool, &mShadow, sinkIter->volume, handle, sinkID, volume, ramp, time); + asyncSetSinkVolumeWorker *worker = new asyncSetSinkVolumeWorker(this, &mPool, mShadow, sinkIter->volume, handle, sinkID, volume, ramp, time); if ((work = mPool.startWork(worker)) == -1) { logError("AsyncRoutingSender::asyncSetSinkVolume not enough threads!"); @@ -541,7 +503,7 @@ am_Error_e AsyncRoutingSender::asyncSetSourceVolume(const am_Handle_s handle, co if (sourceIter == mSources.end()) return (E_NON_EXISTENT); //not found! - asyncSetSourceVolumeWorker *worker = new asyncSetSourceVolumeWorker(this, &mPool, &mShadow, sourceIter->volume, handle, sourceID, volume, ramp, time); + asyncSetSourceVolumeWorker *worker = new asyncSetSourceVolumeWorker(this, &mPool, mShadow, sourceIter->volume, handle, sourceID, volume, ramp, time); if ((work = mPool.startWork(worker)) == -1) { logError("AsyncRoutingSender::asyncSetSourceVolume not enough threads!"); @@ -583,7 +545,7 @@ am_Error_e AsyncRoutingSender::asyncSetSourceState(const am_Handle_s handle, con if (sourceIter == mSources.end()) return (E_NON_EXISTENT); //not found! - asyncSetSourceStateWorker *worker = new asyncSetSourceStateWorker(this, &mPool, &mShadow, handle, sourceID, state); + asyncSetSourceStateWorker *worker = new asyncSetSourceStateWorker(this, &mPool, mShadow, handle, sourceID, state); if ((work = mPool.startWork(worker)) == -1) { logError("AsyncRoutingSender::asyncSetSourceState not enough threads!"); @@ -625,7 +587,7 @@ am_Error_e AsyncRoutingSender::asyncSetSinkSoundProperty(const am_Handle_s handl if (sinkIter == mSinks.end()) return (E_NON_EXISTENT); //not found! - asyncSetSinkSoundPropertyWorker *worker = new asyncSetSinkSoundPropertyWorker(this, &mPool, &mShadow, handle, soundProperty, sinkID); + asyncSetSinkSoundPropertyWorker *worker = new asyncSetSinkSoundPropertyWorker(this, &mPool, mShadow, handle, soundProperty, sinkID); if ((work = mPool.startWork(worker)) == -1) { logError("AsyncRoutingSender::asyncSetSinkSoundProperty not enough threads!"); @@ -676,7 +638,7 @@ am_Error_e AsyncRoutingSender::setDomainState(const am_domainID_t domainID, cons if (domainIter == mDomains.end()) return (E_NON_EXISTENT); //not found! - asyncDomainStateChangeWorker *worker = new asyncDomainStateChangeWorker(this, &mPool, &mShadow, domainID, domainState); + asyncDomainStateChangeWorker *worker = new asyncDomainStateChangeWorker(this, &mPool, mShadow, domainID, domainState); if ((work = mPool.startWork(worker)) == -1) { logError("AsyncRoutingSender::setDomainState not enough threads!"); @@ -714,7 +676,7 @@ am_Error_e AsyncRoutingSender::asyncSetSourceSoundProperty(const am_Handle_s han if (sourceIter == mSources.end()) return (E_NON_EXISTENT); //not found! - asyncSetSourceSoundPropertyWorker *worker = new asyncSetSourceSoundPropertyWorker(this, &mPool, &mShadow, handle, soundProperty, sourceID); + asyncSetSourceSoundPropertyWorker *worker = new asyncSetSourceSoundPropertyWorker(this, &mPool, mShadow, handle, soundProperty, sourceID); if ((work = mPool.startWork(worker)) == -1) { logError("AsyncRoutingSender::asyncSetSourceState not enough threads!"); @@ -953,6 +915,27 @@ void am::AsyncRoutingSender::updateDomainstateSafe(am_domainID_t domainID, am_Do pthread_mutex_unlock(&mDomainsMutex); } +void am::AsyncRoutingSender::updateDomainListSafe(std::vector<am_Domain_s> listDomains) +{ + pthread_mutex_lock(&mDomainsMutex); + mDomains = listDomains; + pthread_mutex_unlock(&mDomainsMutex); +} + +void am::AsyncRoutingSender::updateSourceListSafe(std::vector<am_Source_s> listSource) +{ + pthread_mutex_lock(&mSourcesMutex); + mSources = listSource; + pthread_mutex_unlock(&mSourcesMutex); +} + +void am::AsyncRoutingSender::updateSinkListSafe(std::vector<am_Sink_s> listSinks) +{ + pthread_mutex_lock(&mSinksMutex); + mSinks = listSinks; + pthread_mutex_unlock(&mSinksMutex); +} + void AsyncRoutingSender::getInterfaceVersion(std::string & version) const { version = RoutingSendVersion; @@ -1320,3 +1303,70 @@ void am::asyncDomainStateChangeWorker::cancelWork() mShadow->hookDomainStateChange(mDomainID, mDomainState); } +syncRegisterWorker::syncRegisterWorker(AsyncRoutingSender * asyncSender, WorkerThreadPool* pool, RoutingReceiverAsyncShadow* shadow, const std::vector<am_Domain_s> domains, const std::vector<am_Sink_s> sinks, const std::vector<am_Source_s> sources, const uint16_t handle) : + Worker(pool), // + mAsyncSender(asyncSender), // + mShadow(shadow), // + mListDomains(domains), // + mListSinks(sinks), // + mListSources(sources), + mHandle(handle) +{ +} + +void syncRegisterWorker::start2work() +{ + //todo: sendchanged data must be in here ! + logInfo("Start to register stuff"); + + am_Error_e eCode; + + std::vector<am_Domain_s>::iterator domainIter = mListDomains.begin(); + for (; domainIter != mListDomains.end(); ++domainIter) + { + am_domainID_t domainID; + if ((eCode = mShadow->registerDomain(*domainIter, domainID)) != E_OK) + { + logError("syncRegisterWorker::start2work error on registering domain, failed with", eCode); + } + domainIter->domainID = domainID; + } + + mAsyncSender->updateDomainListSafe(mListDomains); + + //then sources + std::vector<am_Source_s>::iterator sourceIter = mListSources.begin(); + for (; sourceIter != mListSources.end(); ++sourceIter) + { + am_sourceID_t sourceID; + //set the correct domainID + sourceIter->domainID = mListDomains[0].domainID; + if ((eCode = mShadow->registerSource(*sourceIter, sourceID)) != E_OK) + { + logError("syncRegisterWorker::start2work error on registering source, failed with", eCode); + } + } + + mAsyncSender->updateSourceListSafe(mListSources); + + //sinks + std::vector<am_Sink_s>::iterator sinkIter = mListSinks.begin(); + for (; sinkIter != mListSinks.end(); ++sinkIter) + { + am_sinkID_t sinkID; + //set the correct domainID + sinkIter->domainID = mListDomains[0].domainID; + if ((eCode = mShadow->registerSink(*sinkIter, sinkID)) != E_OK) + { + logError("syncRegisterWorker::start2work error on registering sink, failed with", eCode); + } + } + + mAsyncSender->updateSinkListSafe(mListSinks); + mShadow->confirmRoutingReady(mHandle); +} + +void syncRegisterWorker::cancelWork() +{ +} + |