summaryrefslogtreecommitdiff
path: root/PluginRoutingInterfaceAsync/src
diff options
context:
space:
mode:
authorchristian mueller <christian.ei.mueller@bmw.de>2012-02-29 13:27:27 +0100
committerchristian mueller <christian.ei.mueller@bmw.de>2012-02-29 13:27:27 +0100
commitd3ccf97331935b181041394b80be20dca282ea71 (patch)
treef85e7b649c8a6d58c11e9728b2442cdcff53826d /PluginRoutingInterfaceAsync/src
parentaa93713377d28a8ce7821466ef828f79a18e982d (diff)
downloadaudiomanager-d3ccf97331935b181041394b80be20dca282ea71.tar.gz
* [ performance] for classes that do not need to be derived from, removed virtual desctructor
* implemented confirmation of routing ready in RoutingReceiver * [Sockethandler] automatically set gDispatchDone to 0 when starting mainloop * fixed unit text to work with latest changes (expect Dbus command interface) * [GAM-4] added way to do synchronous calling on interfaces with the help of CAmSerializer.h * reworked AsyncRoutingPlugin to work with CAmSerializer.h * reworked AsyncRoutingPlugin to register elemtes in thread using CAmSerializer.h * reworked AsncPlugin Tests to work with remodelled Plugin
Diffstat (limited to 'PluginRoutingInterfaceAsync/src')
-rw-r--r--PluginRoutingInterfaceAsync/src/RoutingReceiverAsyncShadow.cpp473
-rw-r--r--PluginRoutingInterfaceAsync/src/RoutingSenderAsync.cpp466
2 files changed, 325 insertions, 614 deletions
diff --git a/PluginRoutingInterfaceAsync/src/RoutingReceiverAsyncShadow.cpp b/PluginRoutingInterfaceAsync/src/RoutingReceiverAsyncShadow.cpp
index c11201e..5c93f60 100644
--- a/PluginRoutingInterfaceAsync/src/RoutingReceiverAsyncShadow.cpp
+++ b/PluginRoutingInterfaceAsync/src/RoutingReceiverAsyncShadow.cpp
@@ -36,16 +36,10 @@
using namespace am;
-pthread_mutex_t RoutingReceiverAsyncShadow::mMutex = PTHREAD_MUTEX_INITIALIZER;
-
-RoutingReceiverAsyncShadow::RoutingReceiverAsyncShadow():
-asyncMsgReceive(this, &RoutingReceiverAsyncShadow::asyncMsgReceiver), //
- asyncDispatch(this, &RoutingReceiverAsyncShadow::asyncDispatcher), //
- asyncCheck(this, &RoutingReceiverAsyncShadow::asyncChecker), //
- mSocketHandler(), //
- mRoutingReceiveInterface(), //
-mHandle (), //
-mPipe()
+RoutingReceiverAsyncShadow::RoutingReceiverAsyncShadow(RoutingReceiveInterface* iReceiveInterface, SocketHandler* iSocketHandler) :
+ mSocketHandler(iSocketHandler), //
+ mRoutingReceiveInterface(iReceiveInterface), //
+ mSerializer(iSocketHandler)
{
}
@@ -56,472 +50,139 @@ RoutingReceiverAsyncShadow::~RoutingReceiverAsyncShadow()
void RoutingReceiverAsyncShadow::ackConnect(const am_Handle_s handle, const am_connectionID_t connectionID, const am_Error_e error)
{
- assert(mPipe[0]!=0);
- //put the data in the queue:
- a_connect_s temp;
- temp.handle = handle;
- temp.connectionID = connectionID;
- temp.error = error;
- //then we make a message out of it:
- msg_s msg;
- msg.msgID = MSG_ACKCONNECT;
- msg.parameters.connect = temp;
- //here we share data !
- pthread_mutex_lock(&mMutex);
- mQueue.push(msg);
- pthread_mutex_unlock(&mMutex);
-
- //ok, fire the signal that data needs to be received !
- if (write(mPipe[1], &msg.msgID, sizeof(msgID_e)) == -1)
- {
- logError("RoutingReceiverAsyncShadow::ackConnect write failed, error code:", strerror(errno));
- }
+ mSerializer.asyncCall<RoutingReceiveInterface, const am_Handle_s, const am_connectionID_t, const am_Error_e>(mRoutingReceiveInterface, &RoutingReceiveInterface::ackConnect, handle, connectionID, error);
}
void RoutingReceiverAsyncShadow::ackDisconnect(const am_Handle_s handle, const am_connectionID_t connectionID, const am_Error_e error)
{
- assert(mPipe[0]!=0);
- //put the data in the queue:
- a_connect_s temp;
- temp.handle = handle;
- temp.connectionID = connectionID;
- temp.error = error;
- //then we make a message out of it:
- msg_s msg;
- msg.msgID = MSG_ACKDISCONNECT;
- msg.parameters.connect = temp;
- //here we share data !
- pthread_mutex_lock(&mMutex);
- mQueue.push(msg);
- pthread_mutex_unlock(&mMutex);
-
- //ok, fire the signal that data needs to be received !
- if (write(mPipe[1], &msg.msgID, sizeof(msgID_e)) == -1)
- {
- logError("RoutingReceiverAsyncShadow::ackDisconnect write failed, error code:", strerror(errno));
- }
+ mSerializer.asyncCall<RoutingReceiveInterface, const am_Handle_s, const am_connectionID_t, const am_Error_e>(mRoutingReceiveInterface, &RoutingReceiveInterface::ackDisconnect, handle, connectionID, error);
}
void RoutingReceiverAsyncShadow::ackSetSinkVolumeChange(const am_Handle_s handle, const am_volume_t volume, const am_Error_e error)
{
- assert(mPipe[0]!=0);
- //put the data in the queue:
- a_volume_s temp;
- temp.handle = handle;
- temp.volume = volume;
- temp.error = error;
- //then we make a message out of it:
- msg_s msg;
- msg.msgID = MSG_ACKSETSINKVOLUMECHANGE;
- msg.parameters.volume = temp;
- //here we share data !
- pthread_mutex_lock(&mMutex);
- mQueue.push(msg);
- pthread_mutex_unlock(&mMutex);
-
- //ok, fire the signal that data needs to be received !
- if (write(mPipe[1], &msg.msgID, sizeof(msgID_e)) == -1)
- {
- logError("RoutingReceiverAsyncShadow::ackSetSinkVolumeChange write failed, error code:", strerror(errno));
- }
+ mSerializer.asyncCall<RoutingReceiveInterface, const am_Handle_s, const am_volume_t, const am_Error_e>(mRoutingReceiveInterface, &RoutingReceiveInterface::ackSetSinkVolumeChange, handle, volume, error);
}
void RoutingReceiverAsyncShadow::ackSetSourceVolumeChange(const am_Handle_s handle, const am_volume_t volume, const am_Error_e error)
{
- assert(mPipe[0]!=0);
- //put the data in the queue:
- a_volume_s temp;
- temp.handle = handle;
- temp.volume = volume;
- temp.error = error;
- //then we make a message out of it:
- msg_s msg;
- msg.msgID = MSG_ACKSETSOURCEVOLUMECHANGE;
- msg.parameters.volume = temp;
- //here we share data !
- pthread_mutex_lock(&mMutex);
- mQueue.push(msg);
- pthread_mutex_unlock(&mMutex);
-
- //ok, fire the signal that data needs to be received !
- if (write(mPipe[1], &msg.msgID, sizeof(msgID_e)) == -1)
- {
- logError("RoutingReceiverAsyncShadow::ackSetSourceVolumeChange write failed, error code:", strerror(errno));
- }
+ mSerializer.asyncCall<RoutingReceiveInterface, const am_Handle_s, const am_volume_t, const am_Error_e>(mRoutingReceiveInterface, &RoutingReceiveInterface::ackSetSourceVolumeChange, handle, volume, error);
}
void RoutingReceiverAsyncShadow::ackSetSourceState(const am_Handle_s handle, const am_Error_e error)
{
- assert(mPipe[0]!=0);
- //put the data in the queue:
- a_handle_s temp;
- temp.handle = handle;
- temp.error = error;
- //then we make a message out of it:
- msg_s msg;
- msg.msgID = MSG_ACKSETSOURCESTATE;
- msg.parameters.handle = temp;
- //here we share data !
- pthread_mutex_lock(&mMutex);
- mQueue.push(msg);
- pthread_mutex_unlock(&mMutex);
-
- //ok, fire the signal that data needs to be received !
- if (write(mPipe[1], &msg.msgID, sizeof(msgID_e)) == -1)
- {
- logError("RoutingReceiverAsyncShadow::ackSetSourceState write failed, error code:", strerror(errno));
- }
+ mSerializer.asyncCall<RoutingReceiveInterface, const am_Handle_s, const am_Error_e>(mRoutingReceiveInterface, &RoutingReceiveInterface::ackSetSourceState, handle, error);
}
void RoutingReceiverAsyncShadow::ackSetSinkSoundProperty(const am_Handle_s handle, const am_Error_e error)
{
- assert(mPipe[0]!=0);
- //put the data in the queue:
- a_handle_s temp;
- temp.handle = handle;
- temp.error = error;
- //then we make a message out of it:
- msg_s msg;
- msg.msgID = MSG_ACKSETSINKSOUNDPROPERTY;
- msg.parameters.handle = temp;
- //here we share data !
- pthread_mutex_lock(&mMutex);
- mQueue.push(msg);
- pthread_mutex_unlock(&mMutex);
-
- //ok, fire the signal that data needs to be received !
- if (write(mPipe[1], &msg.msgID, sizeof(msgID_e)) == -1)
- {
- logError("RoutingReceiverAsyncShadow::ackSetSinkSoundProperty write failed, error code:", strerror(errno));
- }
+ mSerializer.asyncCall<RoutingReceiveInterface, const am_Handle_s, const am_Error_e>(mRoutingReceiveInterface, &RoutingReceiveInterface::ackSetSinkSoundProperty, handle, error);
}
void RoutingReceiverAsyncShadow::ackSetSourceSoundProperty(const am_Handle_s handle, const am_Error_e error)
{
- assert(mPipe[0]!=0);
- //put the data in the queue:
- a_handle_s temp;
- temp.handle = handle;
- temp.error = error;
- //then we make a message out of it:
- msg_s msg;
- msg.msgID = MSG_ACKSETSOURCESOUNDPROPERTY;
- msg.parameters.handle = temp;
- //here we share data !
- pthread_mutex_lock(&mMutex);
- mQueue.push(msg);
- pthread_mutex_unlock(&mMutex);
-
- //ok, fire the signal that data needs to be received !
- if (write(mPipe[1], &msg.msgID, sizeof(msgID_e)) == -1)
- {
- logError("RoutingReceiverAsyncShadow::ackSetSourceSoundProperty write failed, error code:", strerror(errno));
- }
+ mSerializer.asyncCall<RoutingReceiveInterface, const am_Handle_s, const am_Error_e>(mRoutingReceiveInterface, &RoutingReceiveInterface::ackSetSourceSoundProperty, handle, error);
}
void RoutingReceiverAsyncShadow::ackCrossFading(const am_Handle_s handle, const am_HotSink_e hotSink, const am_Error_e error)
{
- assert(mPipe[0]!=0);
- //put the data in the queue:
- a_crossfading_s temp;
- temp.handle = handle;
- temp.hotSink = hotSink;
- temp.error = error;
- //then we make a message out of it:
- msg_s msg;
- msg.msgID = MSG_ACKCROSSFADING;
- msg.parameters.crossfading = temp;
- //here we share data !
- pthread_mutex_lock(&mMutex);
- mQueue.push(msg);
- pthread_mutex_unlock(&mMutex);
-
- //ok, fire the signal that data needs to be received !
- if (write(mPipe[1], &msg.msgID, sizeof(msgID_e)) == -1)
- {
- logError("RoutingReceiverAsyncShadow::ackCrossFading write failed, error code:", strerror(errno));
- }
+ mSerializer.asyncCall<RoutingReceiveInterface, const am_Handle_s, const am_HotSink_e, const am_Error_e>(mRoutingReceiveInterface, &RoutingReceiveInterface::ackCrossFading, handle, hotSink, error);
}
void RoutingReceiverAsyncShadow::ackSourceVolumeTick(const am_Handle_s handle, const am_sourceID_t sourceID, const am_volume_t volume)
{
- assert(mPipe[0]!=0);
- //put the data in the queue:
- a_sourceVolumeTick_s temp;
- temp.sourceID = sourceID;
- temp.handle = handle;
- temp.volume = volume;
- //then we make a message out of it:
- msg_s msg;
- msg.msgID = MSG_ACKSOURCEVOLUMETICK;
- msg.parameters.sourceVolumeTick = temp;
- //here we share data !
- pthread_mutex_lock(&mMutex);
- mQueue.push(msg);
- pthread_mutex_unlock(&mMutex);
-
- //ok, fire the signal that data needs to be received !
- if (write(mPipe[1], &msg.msgID, sizeof(msgID_e)) == -1)
- {
- logError("RoutingReceiverAsyncShadow::ackSourceVolumeTick write failed, error code:", strerror(errno));
- }
+ mSerializer.asyncCall<RoutingReceiveInterface, const am_Handle_s, const am_sourceID_t, const am_volume_t>(mRoutingReceiveInterface, &RoutingReceiveInterface::ackSourceVolumeTick, handle, sourceID, volume);
}
void RoutingReceiverAsyncShadow::ackSinkVolumeTick(const am_Handle_s handle, const am_sinkID_t sinkID, const am_volume_t volume)
{
- assert(mPipe[0]!=0);
- //put the data in the queue:
- a_sinkVolumeTick_s temp;
- temp.sinkID = sinkID;
- temp.handle = handle;
- temp.volume = volume;
- //then we make a message out of it:
- msg_s msg;
- msg.msgID = MSG_ACKSINKVOLUMETICK;
- msg.parameters.sinkVolumeTick = temp;
- //here we share data !
- pthread_mutex_lock(&mMutex);
- mQueue.push(msg);
- pthread_mutex_unlock(&mMutex);
-
- //ok, fire the signal that data needs to be received !
- if (write(mPipe[1], &msg.msgID, sizeof(msgID_e)) == -1)
- {
- logError("RoutingReceiverAsyncShadow::ackSinkVolumeTick write failed, error code:", strerror(errno));
- }
+ mSerializer.asyncCall<RoutingReceiveInterface, const am_Handle_s, const am_sinkID_t, const am_volume_t>(mRoutingReceiveInterface, &RoutingReceiveInterface::ackSinkVolumeTick, handle, sinkID, volume);
}
void RoutingReceiverAsyncShadow::hookInterruptStatusChange(const am_sourceID_t sourceID, const am_InterruptState_e interruptState)
{
- assert(mPipe[0]!=0);
- //put the data in the queue:
- a_interruptStatusChange_s temp;
- temp.sourceID = sourceID;
- temp.interruptState = interruptState;
-
- //then we make a message out of it:
- msg_s msg;
- msg.msgID = MSG_HOOKINTERRUPTSTATUSCHANGE;
- msg.parameters.interruptStatusChange = temp;
- //here we share data !
- pthread_mutex_lock(&mMutex);
- mQueue.push(msg);
- pthread_mutex_unlock(&mMutex);
-
- //ok, fire the signal that data needs to be received !
- if (write(mPipe[1], &msg.msgID, sizeof(msgID_e)) == -1)
- {
- logError("RoutingReceiverAsyncShadow::hookInterruptStatusChange write failed, error code:", strerror(errno));
- }
+ mSerializer.asyncCall<RoutingReceiveInterface, const am_sinkID_t, const am_InterruptState_e>(mRoutingReceiveInterface, &RoutingReceiveInterface::hookInterruptStatusChange, sourceID, interruptState);
}
void RoutingReceiverAsyncShadow::hookSinkAvailablityStatusChange(const am_sinkID_t sinkID, const am_Availability_s & availability)
{
- assert(mPipe[0]!=0);
- //put the data in the queue:
- a_sinkAvailability_s temp;
- temp.sinkID = sinkID;
- temp.availability = availability;
-
- //then we make a message out of it:
- msg_s msg;
- msg.msgID = MSG_HOOKSINKAVAILABLITYSTATUSCHANGE;
- msg.parameters.sinkAvailability = temp;
- //here we share data !
- pthread_mutex_lock(&mMutex);
- mQueue.push(msg);
- pthread_mutex_unlock(&mMutex);
-
- //ok, fire the signal that data needs to be received !
- if (write(mPipe[1], &msg.msgID, sizeof(msgID_e)) == -1)
- {
- logError("RoutingReceiverAsyncShadow::hookSinkAvailablityStatusChange write failed, error code:", strerror(errno));
- }
+ mSerializer.asyncCall<RoutingReceiveInterface, const am_sinkID_t, const am_Availability_s&>(mRoutingReceiveInterface, &RoutingReceiveInterface::hookSinkAvailablityStatusChange, sinkID, availability);
}
void RoutingReceiverAsyncShadow::hookSourceAvailablityStatusChange(const am_sourceID_t sourceID, const am_Availability_s & availability)
{
- assert(mPipe[0]!=0);
- //put the data in the queue:
- a_sourceAvailability_s temp;
- temp.sourceID = sourceID;
- temp.availability = availability;
-
- //then we make a message out of it:
- msg_s msg;
- msg.msgID = MSG_HOOKSOURCEAVAILABLITYSTATUSCHANGE;
- msg.parameters.sourceAvailability = temp;
- //here we share data !
- pthread_mutex_lock(&mMutex);
- mQueue.push(msg);
- pthread_mutex_unlock(&mMutex);
-
- //ok, fire the signal that data needs to be received !
- if (write(mPipe[1], &msg.msgID, sizeof(msgID_e)) == -1)
- {
- logError("RoutingReceiverAsyncShadow::hookSourceAvailablityStatusChange write failed, error code:", strerror(errno));
- }
+ mSerializer.asyncCall<RoutingReceiveInterface, const am_sourceID_t, const am_Availability_s&>(mRoutingReceiveInterface, &RoutingReceiveInterface::hookSourceAvailablityStatusChange, sourceID, availability);
}
void RoutingReceiverAsyncShadow::hookDomainStateChange(const am_domainID_t domainID, const am_DomainState_e domainState)
{
- assert(mPipe[0]!=0);
- //put the data in the queue:
- a_hookDomainStateChange_s temp;
- temp.domainID = domainID;
- temp.state = domainState;
-
- //then we make a message out of it:
- msg_s msg;
- msg.msgID = MSG_HOOKDOMAINSTATECHANGE;
- msg.parameters.domainStateChange = temp;
- //here we share data !
- pthread_mutex_lock(&mMutex);
- mQueue.push(msg);
- pthread_mutex_unlock(&mMutex);
-
- //ok, fire the signal that data needs to be received !
- if (write(mPipe[1], &msg.msgID, sizeof(msgID_e)) == -1)
- {
- logError("RoutingReceiverAsyncShadow::hookDomainStateChange write failed, error code:", strerror(errno));
- }
+ mSerializer.asyncCall<RoutingReceiveInterface, const am_domainID_t, const am_DomainState_e>(mRoutingReceiveInterface, &RoutingReceiveInterface::hookDomainStateChange, domainID, domainState);
}
void RoutingReceiverAsyncShadow::hookTimingInformationChanged(const am_connectionID_t connectionID, const am_timeSync_t delay)
{
- assert(mPipe[0]!=0);
- //put the data in the queue:
- a_timingInfoChanged_s temp;
- temp.connectionID = connectionID;
- temp.delay = delay;
-
- //then we make a message out of it:
- msg_s msg;
- msg.msgID = MSG_HOOKTIMINGINFORMATIONCHANGED;
- msg.parameters.timingInfoChange = temp;
- //here we share data !
- pthread_mutex_lock(&mMutex);
- mQueue.push(msg);
- pthread_mutex_unlock(&mMutex);
-
- //ok, fire the signal that data needs to be received !
- if (write(mPipe[1], &msg.msgID, sizeof(msgID_e)) == -1)
- {
- logError("RoutingReceiverAsyncShadow::hookTimingInformationChanged write failed, error code:", strerror(errno));
- }
+ mSerializer.asyncCall<RoutingReceiveInterface, const am_connectionID_t, const am_timeSync_t>(mRoutingReceiveInterface, &RoutingReceiveInterface::hookTimingInformationChanged, connectionID, delay);
}
-void RoutingReceiverAsyncShadow::asyncMsgReceiver(const pollfd pollfd, const sh_pollHandle_t handle, void *userData)
+am_Error_e RoutingReceiverAsyncShadow::registerDomain(const am_Domain_s & domainData, am_domainID_t & domainID)
{
- (void) handle;
- (void) userData;
- //it is no really important what to read here, we will read the queue later...
- char buffer[10];
- if (read(pollfd.fd, buffer, 10) == -1)
- {
- logError("RoutingReceiverAsyncShadow::asyncMsgReceiver could not read!");
- }
+ am_Error_e error (E_UNKNOWN);
+ am_Domain_s domainDataCopy(domainData);
+ mSerializer.syncCall<RoutingReceiveInterface, am_Error_e, const am_Domain_s&,am_domainID_t&, am_Domain_s, am_domainID_t>(mRoutingReceiveInterface, &RoutingReceiveInterface::registerDomain, error, domainDataCopy, domainID);
+ return (error);
}
-bool RoutingReceiverAsyncShadow::asyncDispatcher(const sh_pollHandle_t handle, void *userData)
+am_Error_e am::RoutingReceiverAsyncShadow::registerGateway(const am_Gateway_s & gatewayData, am_gatewayID_t & gatewayID)
{
- (void) handle;
- (void) userData;
- msg_s msg;
-
- //ok, let's receive, first lock
- pthread_mutex_lock(&mMutex);
- msg = mQueue.front();
- mQueue.pop();
- pthread_mutex_unlock(&mMutex);
+ am_Error_e error (E_UNKNOWN);
+ am_Gateway_s gatewayDataCopy(gatewayData);
+ mSerializer.syncCall<RoutingReceiveInterface, am_Error_e, const am_Gateway_s&, am_gatewayID_t&, am_Gateway_s, am_gatewayID_t>(mRoutingReceiveInterface,&RoutingReceiveInterface::registerGateway, error, gatewayDataCopy, gatewayID);
+ return (error);
+}
- //check for the message:
- switch (msg.msgID)
- {
- case MSG_ACKCONNECT:
- mRoutingReceiveInterface->ackConnect(msg.parameters.connect.handle, msg.parameters.connect.connectionID, msg.parameters.connect.error);
- break;
- case MSG_ACKDISCONNECT:
- mRoutingReceiveInterface->ackDisconnect(msg.parameters.connect.handle, msg.parameters.connect.connectionID, msg.parameters.connect.error);
- break;
- case MSG_ACKSETSINKVOLUMECHANGE:
- mRoutingReceiveInterface->ackSetSinkVolumeChange(msg.parameters.volume.handle, msg.parameters.volume.volume, msg.parameters.volume.error);
- break;
- case MSG_ACKSETSOURCEVOLUMECHANGE:
- mRoutingReceiveInterface->ackSetSourceVolumeChange(msg.parameters.volume.handle, msg.parameters.volume.volume, msg.parameters.volume.error);
- break;
- case MSG_ACKSETSOURCESTATE:
- mRoutingReceiveInterface->ackSetSourceState(msg.parameters.handle.handle, msg.parameters.handle.error);
- break;
- case MSG_ACKSETSINKSOUNDPROPERTY:
- mRoutingReceiveInterface->ackSetSinkSoundProperty(msg.parameters.handle.handle, msg.parameters.handle.error);
- break;
- case MSG_ACKSETSOURCESOUNDPROPERTY:
- mRoutingReceiveInterface->ackSetSourceSoundProperty(msg.parameters.handle.handle, msg.parameters.handle.error);
- break;
- case MSG_ACKCROSSFADING:
- mRoutingReceiveInterface->ackCrossFading(msg.parameters.crossfading.handle, msg.parameters.crossfading.hotSink, msg.parameters.crossfading.error);
- break;
- case MSG_ACKSOURCEVOLUMETICK:
- mRoutingReceiveInterface->ackSourceVolumeTick(msg.parameters.sourceVolumeTick.handle, msg.parameters.sourceVolumeTick.sourceID, msg.parameters.sourceVolumeTick.volume);
- break;
- case MSG_ACKSINKVOLUMETICK:
- mRoutingReceiveInterface->ackSinkVolumeTick(msg.parameters.sinkVolumeTick.handle, msg.parameters.sinkVolumeTick.sinkID, msg.parameters.sinkVolumeTick.volume);
- break;
- case MSG_HOOKINTERRUPTSTATUSCHANGE:
- mRoutingReceiveInterface->hookInterruptStatusChange(msg.parameters.interruptStatusChange.sourceID, msg.parameters.interruptStatusChange.interruptState);
- break;
- case MSG_HOOKSINKAVAILABLITYSTATUSCHANGE:
- mRoutingReceiveInterface->hookSinkAvailablityStatusChange(msg.parameters.sinkAvailability.sinkID, msg.parameters.sinkAvailability.availability);
- break;
- case MSG_HOOKSOURCEAVAILABLITYSTATUSCHANGE:
- mRoutingReceiveInterface->hookSourceAvailablityStatusChange(msg.parameters.sourceAvailability.sourceID, msg.parameters.sourceAvailability.availability);
- break;
- case MSG_HOOKDOMAINSTATECHANGE:
- mRoutingReceiveInterface->hookDomainStateChange(msg.parameters.domainStateChange.domainID, msg.parameters.domainStateChange.state);
- break;
- case MSG_HOOKTIMINGINFORMATIONCHANGED:
- mRoutingReceiveInterface->hookTimingInformationChanged(msg.parameters.timingInfoChange.connectionID, msg.parameters.timingInfoChange.delay);
- break;
- default:
- logError("RoutingReceiverAsyncShadow::asyncDispatcher unknown message was received:", msg.msgID);
- break;
- }
+am_Error_e am::RoutingReceiverAsyncShadow::registerSink(const am_Sink_s & sinkData, am_sinkID_t & sinkID)
+{
+ am_Error_e error (E_UNKNOWN);
+ am_Sink_s sinkDataCopy(sinkData);
+ mSerializer.syncCall<RoutingReceiveInterface, am_Error_e, const am_Sink_s&, am_sinkID_t&, am_Sink_s, am_sinkID_t>(mRoutingReceiveInterface,&RoutingReceiveInterface::registerSink, error, sinkDataCopy, sinkID);
+ return (error);
+}
- bool retVal = false;
- pthread_mutex_lock(&mMutex);
- if (mQueue.size() > 0)
- retVal = true;
- pthread_mutex_unlock(&mMutex);
+am_Error_e am::RoutingReceiverAsyncShadow::deregisterSink(const am_sinkID_t sinkID)
+{
+ am_Error_e error;
+ am_sinkID_t s(sinkID); //no const values allowed in syncCalls due to reference !
+ mSerializer.syncCall<RoutingReceiveInterface, am_Error_e, am_sinkID_t>(mRoutingReceiveInterface, &RoutingReceiveInterface::deregisterSink, error, s);
+ return (error);
+}
- return (retVal);
+am_Error_e am::RoutingReceiverAsyncShadow::registerSource(const am_Source_s & sourceData, am_sourceID_t & sourceID)
+{
+ am_Error_e error (E_UNKNOWN);
+ am_Source_s sourceDataCopy(sourceData);
+ mSerializer.syncCall<RoutingReceiveInterface, am_Error_e, const am_Source_s&, am_sourceID_t&, am_Source_s, am_sourceID_t>(mRoutingReceiveInterface,&RoutingReceiveInterface::registerSource, error, sourceDataCopy, sourceID);
+ return (error);
}
-bool RoutingReceiverAsyncShadow::asyncChecker(const sh_pollHandle_t handle, void *userData)
+am_Error_e am::RoutingReceiverAsyncShadow::deregisterSource(const am_sourceID_t sourceID)
{
- (void) handle;
- (void) userData;
- bool returnVal = false;
- pthread_mutex_lock(&mMutex);
- if (mQueue.size() > 0)
- returnVal = true;
- pthread_mutex_unlock(&mMutex);
- return (returnVal);
+ am_Error_e error;
+ am_sourceID_t s(sourceID); //no const values allowed in syncCalls due to reference !
+ mSerializer.syncCall<RoutingReceiveInterface, am_Error_e, am_sinkID_t>(mRoutingReceiveInterface, &RoutingReceiveInterface::deregisterSource, error, s);
+ return (error);
}
-am_Error_e RoutingReceiverAsyncShadow::setRoutingInterface(RoutingReceiveInterface *receiveInterface)
+am_Error_e am::RoutingReceiverAsyncShadow::registerCrossfader(const am_Crossfader_s & crossfaderData, am_crossfaderID_t & crossfaderID)
{
- assert(receiveInterface!=0);
- mRoutingReceiveInterface = receiveInterface;
- mRoutingReceiveInterface->getSocketHandler(mSocketHandler);
- if (pipe(mPipe) == -1)
- {
- logError("RoutingReceiverAsyncShadow::setRoutingInterface could not create pipe!:");
- return (E_UNKNOWN);
- }
+ am_Error_e error (E_UNKNOWN);
+ am_Crossfader_s crossfaderDataCopy(crossfaderData);
+ mSerializer.syncCall<RoutingReceiveInterface, am_Error_e, const am_Crossfader_s&, am_crossfaderID_t&, am_Crossfader_s, am_crossfaderID_t>(mRoutingReceiveInterface,&RoutingReceiveInterface::registerCrossfader, error, crossfaderDataCopy, crossfaderID);
+ return (error);
+}
- short event = 0;
- event |= POLLIN;
- mSocketHandler->addFDPoll(mPipe[0], event, NULL, &asyncMsgReceive, &asyncCheck, &asyncDispatch, NULL, mHandle);
- return (E_OK);
+void am::RoutingReceiverAsyncShadow::confirmRoutingReady(uint16_t starupHandle)
+{
+ mSerializer.asyncCall<RoutingReceiveInterface,uint16_t>(mRoutingReceiveInterface,&RoutingReceiveInterface::confirmRoutingReady,starupHandle);
}
+
+
+
diff --git a/PluginRoutingInterfaceAsync/src/RoutingSenderAsync.cpp b/PluginRoutingInterfaceAsync/src/RoutingSenderAsync.cpp
index 1a1ee70..68b260f 100644
--- a/PluginRoutingInterfaceAsync/src/RoutingSenderAsync.cpp
+++ b/PluginRoutingInterfaceAsync/src/RoutingSenderAsync.cpp
@@ -55,175 +55,174 @@ pthread_mutex_t AsyncRoutingSender::mSourcesMutex= PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t AsyncRoutingSender::mDomainsMutex= PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t WorkerThreadPool::mBlockingMutex = PTHREAD_MUTEX_INITIALIZER;
-void* AsyncRoutingSender::InterruptEvents(void *data)
-{
- RoutingReceiverAsyncShadow *shadow=(RoutingReceiverAsyncShadow *)data;
- DBusError err;
- DBusMessage* msg;
- DBusConnection* conn;
- dbus_error_init(&err);
- conn = dbus_bus_get(DBUS_BUS_SESSION, &err);
- dbus_uint32_t serial = 0;
- DBusMessage* reply;
- DBusMessageIter args;
- dbus_bus_request_name(conn, "org.genivi.test",DBUS_NAME_FLAG_REPLACE_EXISTING , &err);
-
- while (dbus_connection_read_write_dispatch(conn, -1))
- {
- dbus_connection_read_write(conn, 0);
- msg = dbus_connection_pop_message(conn);
-
- if (dbus_message_is_method_call(msg, "org.genivi.test", "timingChanged"))
+//void* AsyncRoutingSender::InterruptEvents(void *data)
+//{
+// RoutingReceiverAsyncShadow *shadow=(RoutingReceiverAsyncShadow *)data;
+// DBusError err;
+// DBusMessage* msg;
+// DBusConnection* conn;
+// dbus_error_init(&err);
+// conn = dbus_bus_get(DBUS_BUS_SESSION, &err);
+// dbus_uint32_t serial = 0;
+// DBusMessage* reply;
+// DBusMessageIter args;
+// dbus_bus_request_name(conn, "org.genivi.test",DBUS_NAME_FLAG_REPLACE_EXISTING , &err);
+//
+// while (dbus_connection_read_write_dispatch(conn, -1))
+// {
+// dbus_connection_read_write(conn, 0);
+// msg = dbus_connection_pop_message(conn);
+//
+// if (dbus_message_is_method_call(msg, "org.genivi.test", "timingChanged"))
+// {
+// am_connectionID_t connectionID;
+// am_timeSync_t delay;
+// dbus_message_iter_init(msg, &args);
+// dbus_message_iter_get_basic(&args,(void*) &connectionID);
+// dbus_message_iter_next(&args);
+// dbus_message_iter_get_basic(&args,(void*) &delay);
+// reply = dbus_message_new_method_return(msg);
+// dbus_message_iter_init_append(reply, &args);
+// dbus_message_iter_append_basic(&args, DBUS_TYPE_INT32, &connectionID);
+// dbus_connection_send(conn, reply, &serial);
+// shadow->hookTimingInformationChanged(connectionID,delay);
+// dbus_message_unref(reply);
+// }
+// else if (dbus_message_is_method_call(msg, "org.genivi.test", "SinkAvailablityStatusChange"))
+// {
+// am_sinkID_t sinkID;
+// am_Availability_s availability;
+// dbus_message_iter_init(msg, &args);
+// dbus_message_iter_get_basic(&args,(void*) &sinkID);
+// reply = dbus_message_new_method_return(msg);
+// dbus_message_iter_init_append(reply, &args);
+// dbus_message_iter_append_basic(&args, DBUS_TYPE_INT32, &sinkID);
+// dbus_connection_send(conn, reply, &serial);
+// shadow->hookSinkAvailablityStatusChange(sinkID,availability);
+// dbus_message_unref(reply);
+// }
+// else if (dbus_message_is_method_call(msg, "org.genivi.test", "SourceAvailablityStatusChange"))
+// {
+// am_sourceID_t sourceID;
+// am_Availability_s availability;
+// dbus_message_iter_init(msg, &args);
+// dbus_message_iter_get_basic(&args,(void*) &sourceID);
+// reply = dbus_message_new_method_return(msg);
+// dbus_message_iter_init_append(reply, &args);
+// dbus_message_iter_append_basic(&args, DBUS_TYPE_INT32, &sourceID);
+// dbus_connection_send(conn, reply, &serial);
+// shadow->hookSourceAvailablityStatusChange(sourceID,availability);
+// dbus_message_unref(reply);
+// }
+// else if (dbus_message_is_method_call(msg, "org.genivi.test", "InterruptStatusChange"))
+// {
+// am_sourceID_t sourceID;
+//
+// am_InterruptState_e state=IS_UNKNOWN;
+// dbus_message_iter_init(msg, &args);
+// dbus_message_iter_get_basic(&args,(void*) &sourceID);
+// reply = dbus_message_new_method_return(msg);
+// dbus_message_iter_init_append(reply, &args);
+// dbus_message_iter_append_basic(&args, DBUS_TYPE_INT32, &sourceID);
+// dbus_connection_send(conn, reply, &serial);
+// shadow->hookInterruptStatusChange(sourceID,state);
+// dbus_message_unref(reply);
+// }
+// dbus_connection_flush(conn);
+// }
+// return NULL;
+//}
+
+ void *WorkerThreadPool::WorkerThread(void* data)
{
- am_connectionID_t connectionID;
- am_timeSync_t delay;
- dbus_message_iter_init(msg, &args);
- dbus_message_iter_get_basic(&args,(void*) &connectionID);
- dbus_message_iter_next(&args);
- dbus_message_iter_get_basic(&args,(void*) &delay);
- reply = dbus_message_new_method_return(msg);
- dbus_message_iter_init_append(reply, &args);
- dbus_message_iter_append_basic(&args, DBUS_TYPE_INT32, &connectionID);
- dbus_connection_send(conn, reply, &serial);
- shadow->hookTimingInformationChanged(connectionID,delay);
- dbus_message_unref(reply);
- }
- else if (dbus_message_is_method_call(msg, "org.genivi.test", "SinkAvailablityStatusChange"))
- {
- am_sinkID_t sinkID;
- am_Availability_s availability;
- dbus_message_iter_init(msg, &args);
- dbus_message_iter_get_basic(&args,(void*) &sinkID);
- reply = dbus_message_new_method_return(msg);
- dbus_message_iter_init_append(reply, &args);
- dbus_message_iter_append_basic(&args, DBUS_TYPE_INT32, &sinkID);
- dbus_connection_send(conn, reply, &serial);
- shadow->hookSinkAvailablityStatusChange(sinkID,availability);
- dbus_message_unref(reply);
- }
- else if (dbus_message_is_method_call(msg, "org.genivi.test", "SourceAvailablityStatusChange"))
- {
- am_sourceID_t sourceID;
- am_Availability_s availability;
- dbus_message_iter_init(msg, &args);
- dbus_message_iter_get_basic(&args,(void*) &sourceID);
- reply = dbus_message_new_method_return(msg);
- dbus_message_iter_init_append(reply, &args);
- dbus_message_iter_append_basic(&args, DBUS_TYPE_INT32, &sourceID);
- dbus_connection_send(conn, reply, &serial);
- shadow->hookSourceAvailablityStatusChange(sourceID,availability);
- dbus_message_unref(reply);
+ threadInfo_s *myInfo=(threadInfo_s*)data;
+ while (1)
+ {
+ sem_wait(&myInfo->block);
+ pthread_mutex_lock(&mBlockingMutex);
+ Worker* actWorker=myInfo->worker;
+ pthread_mutex_unlock(&mBlockingMutex);
+ actWorker->setCancelSempaphore(&myInfo->cancel);
+ actWorker->start2work();
+ actWorker->pPool->finishedWork(myInfo->threadID);
+ }
+ return NULL;
}
- else if (dbus_message_is_method_call(msg, "org.genivi.test", "InterruptStatusChange"))
+
+ WorkerThreadPool::WorkerThreadPool(int numThreads):
+ mNumThreads(numThreads)
{
- am_sourceID_t sourceID;
-
- am_InterruptState_e state=IS_UNKNOWN;
- dbus_message_iter_init(msg, &args);
- dbus_message_iter_get_basic(&args,(void*) &sourceID);
- reply = dbus_message_new_method_return(msg);
- dbus_message_iter_init_append(reply, &args);
- dbus_message_iter_append_basic(&args, DBUS_TYPE_INT32, &sourceID);
- dbus_connection_send(conn, reply, &serial);
- shadow->hookInterruptStatusChange(sourceID,state);
- dbus_message_unref(reply);
+ int workerID=0;
+ mListWorkers.resize(mNumThreads);
+ for (int i=0;i<mNumThreads;i++)
+ {
+ sem_init(&mListWorkers[i].block,NULL,NULL);
+ sem_init(&mListWorkers[i].cancel,NULL,NULL);
+ mListWorkers[i].busy=false;
+ mListWorkers[i].workerID=++workerID;
+ pthread_create(&mListWorkers[i].threadID,NULL,&WorkerThreadPool::WorkerThread,(void*)&mListWorkers[i]);
+ }
}
- dbus_connection_flush(conn);
- }
- return NULL;
-}
-
-void *WorkerThreadPool::WorkerThread(void* data)
-{
- threadInfo_s *myInfo=(threadInfo_s*)data;
- while (1)
- {
- sem_wait(&myInfo->block);
- pthread_mutex_lock(&mBlockingMutex);
- Worker* actWorker=myInfo->worker;
- pthread_mutex_unlock(&mBlockingMutex);
- actWorker->setCancelSempaphore(&myInfo->cancel);
- actWorker->start2work();
- actWorker->pPool->finishedWork(myInfo->threadID);
- }
- return NULL;
-}
-WorkerThreadPool::WorkerThreadPool(int numThreads):
-mNumThreads(numThreads)
-{
- int workerID=0;
- mListWorkers.resize(mNumThreads);
- for (int i=0;i<mNumThreads;i++)
- {
- sem_init(&mListWorkers[i].block,NULL,NULL);
- sem_init(&mListWorkers[i].cancel,NULL,NULL);
- mListWorkers[i].busy=false;
- mListWorkers[i].workerID=++workerID;
- pthread_create(&mListWorkers[i].threadID,NULL,&WorkerThreadPool::WorkerThread,(void*)&mListWorkers[i]);
- }
-}
-
-int16_t WorkerThreadPool::startWork(Worker *worker)
-{
- pthread_mutex_lock(&mBlockingMutex);
- std::vector<threadInfo_s>::iterator it=mListWorkers.begin();
- for(;it!=mListWorkers.end();++it)
- {
- if(!it->busy)
+ int16_t WorkerThreadPool::startWork(Worker *worker)
{
- it->worker=worker;
- it->busy=true;
+ pthread_mutex_lock(&mBlockingMutex);
+ std::vector<threadInfo_s>::iterator it=mListWorkers.begin();
+ for(;it!=mListWorkers.end();++it)
+ {
+ if(!it->busy)
+ {
+ it->worker=worker;
+ it->busy=true;
+ pthread_mutex_unlock(&mBlockingMutex);
+ sem_post(&it->block);
+ return ((int)it->workerID);
+ }
+ }
pthread_mutex_unlock(&mBlockingMutex);
- sem_post(&it->block);
- return ((int)it->workerID);
+ return (-1);
}
- }
- pthread_mutex_unlock(&mBlockingMutex);
- return (-1);
-}
-bool WorkerThreadPool::cancelWork(int workerID)
-{
- std::vector<threadInfo_s>::iterator it=mListWorkers.begin();
- for(;it!=mListWorkers.end();++it)
- {
- if(it->workerID==workerID && it->busy)
+ bool WorkerThreadPool::cancelWork(int workerID)
{
- sem_post(&it->cancel);
- return (true);
+ std::vector<threadInfo_s>::iterator it=mListWorkers.begin();
+ for(;it!=mListWorkers.end();++it)
+ {
+ if(it->workerID==workerID && it->busy)
+ {
+ sem_post(&it->cancel);
+ return (true);
+ }
+ }
+ return (false);
}
- }
- return (false);
-}
-void WorkerThreadPool::finishedWork(pthread_t threadID)
-{
- pthread_mutex_lock(&mBlockingMutex);
- std::vector<threadInfo_s>::iterator it=mListWorkers.begin();
- for(;it!=mListWorkers.end();++it)
- {
- if(it->threadID==threadID)
+ void WorkerThreadPool::finishedWork(pthread_t threadID)
{
- it->busy=false;
- delete it->worker;
- break;
+ pthread_mutex_lock(&mBlockingMutex);
+ std::vector<threadInfo_s>::iterator it=mListWorkers.begin();
+ for(;it!=mListWorkers.end();++it)
+ {
+ if(it->threadID==threadID)
+ {
+ it->busy=false;
+ delete it->worker;
+ break;
+ }
+ }
+ pthread_mutex_unlock(&mBlockingMutex);
}
- }
- pthread_mutex_unlock(&mBlockingMutex);
-}
-WorkerThreadPool::~WorkerThreadPool()
-{
- for (int i=0;i<mNumThreads;i++)
- {
- pthread_cancel(mListWorkers[i].threadID);
- }
-}
+ WorkerThreadPool::~WorkerThreadPool()
+ {
+ for (int i=0;i<mNumThreads;i++)
+ {
+ pthread_cancel(mListWorkers[i].threadID);
+ }
+ }
-Worker::Worker(WorkerThreadPool *pool):
-pPool(pool), //
- mCancelSem()
+ Worker::Worker(WorkerThreadPool *pool):
+ pPool(pool), mCancelSem()
{
}
@@ -262,20 +261,13 @@ pPool(pool), //
}
AsyncRoutingSender::AsyncRoutingSender():
- mShadow(), //
- mReceiveInterface(0), //
- mDomains(createDomainTable()), //
- mSinks(createSinkTable()), //
-mSources ( createSourceTable ( ) ), //
-mGateways ( createGatewayTable ( ) ) , //
-mMapHandleWorker ( ), //
-mMapConnectionIDRoute(),//
-mPool(10)
+ mReceiveInterface(0), mDomains(createDomainTable()), mSinks(createSinkTable()), mSources ( createSourceTable ( ) ), mGateways ( createGatewayTable ( ) ), mMapHandleWorker ( ) , mMapConnectionIDRoute ( ) , mPool (10)
{
}
AsyncRoutingSender::~AsyncRoutingSender()
{
+ delete mShadow;
}
am_Error_e AsyncRoutingSender::startupInterface(RoutingReceiveInterface *routingreceiveinterface)
@@ -283,50 +275,20 @@ am_Error_e AsyncRoutingSender::startupInterface(RoutingReceiveInterface *routing
//first, create the Shadow:
assert(routingreceiveinterface!=0);
mReceiveInterface = routingreceiveinterface;
- mShadow.setRoutingInterface(routingreceiveinterface);
+ SocketHandler* handler;
+ routingreceiveinterface->getSocketHandler(handler);
+ mShadow = new RoutingReceiverAsyncShadow(routingreceiveinterface, handler);
+ return E_OK;
}
void AsyncRoutingSender::setRoutingReady(const uint16_t handle)
{
- //todo: implement handle !
- assert(mReceiveInterface!=0);
- am_Error_e eCode;
- //first register the domains
- std::vector<am_Domain_s>::iterator domainIter = mDomains.begin();
- for (; domainIter != mDomains.end(); ++domainIter)
- {
- am_domainID_t domainID;
- if ((eCode = mReceiveInterface->registerDomain(*domainIter, domainID)) != E_OK)
- {
- logError("AsyncRoutingSender::routingInterfacesReady error on registering domain, failed with", eCode);
- }
- domainIter->domainID = domainID;
- }
+ syncRegisterWorker *worker = new syncRegisterWorker(this, &mPool, mShadow, mDomains, mSinks, mSources, handle);
- //then sources
- std::vector<am_Source_s>::iterator sourceIter = mSources.begin();
- for (; sourceIter != mSources.end(); ++sourceIter)
+ if ((mPool.startWork(worker)) == -1)
{
- am_sourceID_t sourceID;
- //set the correct domainID
- sourceIter->domainID = mDomains[0].domainID;
- if ((eCode = mReceiveInterface->registerSource(*sourceIter, sourceID)) != E_OK)
- {
- logError("AsyncRoutingSender::routingInterfacesReady error on registering source, failed with", eCode);
- }
- }
-
- //sinks
- std::vector<am_Sink_s>::iterator sinkIter = mSinks.begin();
- for (; sinkIter != mSinks.end(); ++sinkIter)
- {
- am_sinkID_t sinkID;
- //set the correct domainID
- sinkIter->domainID = mDomains[0].domainID;
- if ((eCode = mReceiveInterface->registerSink(*sinkIter, sinkID)) != E_OK)
- {
- logError("AsyncRoutingSender::routingInterfacesReady error on registering sink, failed with", eCode);
- }
+ logError("AsyncRoutingSender::asyncConnect not enough threads!");
+ delete worker;
}
//gateways
@@ -422,7 +384,7 @@ am_Error_e AsyncRoutingSender::asyncConnect(const am_Handle_s handle, const am_c
return (E_WRONG_FORMAT);
//the operation is ok, lets create a worker, assign it to a task in the task pool
- asycConnectWorker *worker = new asycConnectWorker(this, &mPool, &mShadow, handle, connectionID, sourceID, sinkID, connectionFormat);
+ asycConnectWorker *worker = new asycConnectWorker(this, &mPool, mShadow, handle, connectionID, sourceID, sinkID, connectionFormat);
if ((work = mPool.startWork(worker)) == -1)
{
logError("AsyncRoutingSender::asyncConnect not enough threads!");
@@ -457,7 +419,7 @@ am_Error_e AsyncRoutingSender::asyncDisconnect(const am_Handle_s handle, const a
pthread_mutex_unlock(&mMapConnectionMutex);
//the operation is ok, lets create a worker, assign it to a task in the task pool
- asycDisConnectWorker *worker = new asycDisConnectWorker(this, &mPool, &mShadow, handle, connectionID);
+ asycDisConnectWorker *worker = new asycDisConnectWorker(this, &mPool, mShadow, handle, connectionID);
if ((work = mPool.startWork(worker)) == -1)
{
logError("AsyncRoutingSender::asyncDisconnect not enough threads!");
@@ -499,7 +461,7 @@ am_Error_e AsyncRoutingSender::asyncSetSinkVolume(const am_Handle_s handle, cons
if (sinkIter == mSinks.end())
return (E_NON_EXISTENT); //not found!
- asyncSetSinkVolumeWorker *worker = new asyncSetSinkVolumeWorker(this, &mPool, &mShadow, sinkIter->volume, handle, sinkID, volume, ramp, time);
+ asyncSetSinkVolumeWorker *worker = new asyncSetSinkVolumeWorker(this, &mPool, mShadow, sinkIter->volume, handle, sinkID, volume, ramp, time);
if ((work = mPool.startWork(worker)) == -1)
{
logError("AsyncRoutingSender::asyncSetSinkVolume not enough threads!");
@@ -541,7 +503,7 @@ am_Error_e AsyncRoutingSender::asyncSetSourceVolume(const am_Handle_s handle, co
if (sourceIter == mSources.end())
return (E_NON_EXISTENT); //not found!
- asyncSetSourceVolumeWorker *worker = new asyncSetSourceVolumeWorker(this, &mPool, &mShadow, sourceIter->volume, handle, sourceID, volume, ramp, time);
+ asyncSetSourceVolumeWorker *worker = new asyncSetSourceVolumeWorker(this, &mPool, mShadow, sourceIter->volume, handle, sourceID, volume, ramp, time);
if ((work = mPool.startWork(worker)) == -1)
{
logError("AsyncRoutingSender::asyncSetSourceVolume not enough threads!");
@@ -583,7 +545,7 @@ am_Error_e AsyncRoutingSender::asyncSetSourceState(const am_Handle_s handle, con
if (sourceIter == mSources.end())
return (E_NON_EXISTENT); //not found!
- asyncSetSourceStateWorker *worker = new asyncSetSourceStateWorker(this, &mPool, &mShadow, handle, sourceID, state);
+ asyncSetSourceStateWorker *worker = new asyncSetSourceStateWorker(this, &mPool, mShadow, handle, sourceID, state);
if ((work = mPool.startWork(worker)) == -1)
{
logError("AsyncRoutingSender::asyncSetSourceState not enough threads!");
@@ -625,7 +587,7 @@ am_Error_e AsyncRoutingSender::asyncSetSinkSoundProperty(const am_Handle_s handl
if (sinkIter == mSinks.end())
return (E_NON_EXISTENT); //not found!
- asyncSetSinkSoundPropertyWorker *worker = new asyncSetSinkSoundPropertyWorker(this, &mPool, &mShadow, handle, soundProperty, sinkID);
+ asyncSetSinkSoundPropertyWorker *worker = new asyncSetSinkSoundPropertyWorker(this, &mPool, mShadow, handle, soundProperty, sinkID);
if ((work = mPool.startWork(worker)) == -1)
{
logError("AsyncRoutingSender::asyncSetSinkSoundProperty not enough threads!");
@@ -676,7 +638,7 @@ am_Error_e AsyncRoutingSender::setDomainState(const am_domainID_t domainID, cons
if (domainIter == mDomains.end())
return (E_NON_EXISTENT); //not found!
- asyncDomainStateChangeWorker *worker = new asyncDomainStateChangeWorker(this, &mPool, &mShadow, domainID, domainState);
+ asyncDomainStateChangeWorker *worker = new asyncDomainStateChangeWorker(this, &mPool, mShadow, domainID, domainState);
if ((work = mPool.startWork(worker)) == -1)
{
logError("AsyncRoutingSender::setDomainState not enough threads!");
@@ -714,7 +676,7 @@ am_Error_e AsyncRoutingSender::asyncSetSourceSoundProperty(const am_Handle_s han
if (sourceIter == mSources.end())
return (E_NON_EXISTENT); //not found!
- asyncSetSourceSoundPropertyWorker *worker = new asyncSetSourceSoundPropertyWorker(this, &mPool, &mShadow, handle, soundProperty, sourceID);
+ asyncSetSourceSoundPropertyWorker *worker = new asyncSetSourceSoundPropertyWorker(this, &mPool, mShadow, handle, soundProperty, sourceID);
if ((work = mPool.startWork(worker)) == -1)
{
logError("AsyncRoutingSender::asyncSetSourceState not enough threads!");
@@ -953,6 +915,27 @@ void am::AsyncRoutingSender::updateDomainstateSafe(am_domainID_t domainID, am_Do
pthread_mutex_unlock(&mDomainsMutex);
}
+void am::AsyncRoutingSender::updateDomainListSafe(std::vector<am_Domain_s> listDomains)
+{
+ pthread_mutex_lock(&mDomainsMutex);
+ mDomains = listDomains;
+ pthread_mutex_unlock(&mDomainsMutex);
+}
+
+void am::AsyncRoutingSender::updateSourceListSafe(std::vector<am_Source_s> listSource)
+{
+ pthread_mutex_lock(&mSourcesMutex);
+ mSources = listSource;
+ pthread_mutex_unlock(&mSourcesMutex);
+}
+
+void am::AsyncRoutingSender::updateSinkListSafe(std::vector<am_Sink_s> listSinks)
+{
+ pthread_mutex_lock(&mSinksMutex);
+ mSinks = listSinks;
+ pthread_mutex_unlock(&mSinksMutex);
+}
+
void AsyncRoutingSender::getInterfaceVersion(std::string & version) const
{
version = RoutingSendVersion;
@@ -1320,3 +1303,70 @@ void am::asyncDomainStateChangeWorker::cancelWork()
mShadow->hookDomainStateChange(mDomainID, mDomainState);
}
+syncRegisterWorker::syncRegisterWorker(AsyncRoutingSender * asyncSender, WorkerThreadPool* pool, RoutingReceiverAsyncShadow* shadow, const std::vector<am_Domain_s> domains, const std::vector<am_Sink_s> sinks, const std::vector<am_Source_s> sources, const uint16_t handle) :
+ Worker(pool), //
+ mAsyncSender(asyncSender), //
+ mShadow(shadow), //
+ mListDomains(domains), //
+ mListSinks(sinks), //
+ mListSources(sources),
+ mHandle(handle)
+{
+}
+
+void syncRegisterWorker::start2work()
+{
+ //todo: sendchanged data must be in here !
+ logInfo("Start to register stuff");
+
+ am_Error_e eCode;
+
+ std::vector<am_Domain_s>::iterator domainIter = mListDomains.begin();
+ for (; domainIter != mListDomains.end(); ++domainIter)
+ {
+ am_domainID_t domainID;
+ if ((eCode = mShadow->registerDomain(*domainIter, domainID)) != E_OK)
+ {
+ logError("syncRegisterWorker::start2work error on registering domain, failed with", eCode);
+ }
+ domainIter->domainID = domainID;
+ }
+
+ mAsyncSender->updateDomainListSafe(mListDomains);
+
+ //then sources
+ std::vector<am_Source_s>::iterator sourceIter = mListSources.begin();
+ for (; sourceIter != mListSources.end(); ++sourceIter)
+ {
+ am_sourceID_t sourceID;
+ //set the correct domainID
+ sourceIter->domainID = mListDomains[0].domainID;
+ if ((eCode = mShadow->registerSource(*sourceIter, sourceID)) != E_OK)
+ {
+ logError("syncRegisterWorker::start2work error on registering source, failed with", eCode);
+ }
+ }
+
+ mAsyncSender->updateSourceListSafe(mListSources);
+
+ //sinks
+ std::vector<am_Sink_s>::iterator sinkIter = mListSinks.begin();
+ for (; sinkIter != mListSinks.end(); ++sinkIter)
+ {
+ am_sinkID_t sinkID;
+ //set the correct domainID
+ sinkIter->domainID = mListDomains[0].domainID;
+ if ((eCode = mShadow->registerSink(*sinkIter, sinkID)) != E_OK)
+ {
+ logError("syncRegisterWorker::start2work error on registering sink, failed with", eCode);
+ }
+ }
+
+ mAsyncSender->updateSinkListSafe(mListSinks);
+ mShadow->confirmRoutingReady(mHandle);
+}
+
+void syncRegisterWorker::cancelWork()
+{
+}
+