summaryrefslogtreecommitdiff
path: root/PluginRoutingInterfaceAsync/src/RoutingSenderAsync.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'PluginRoutingInterfaceAsync/src/RoutingSenderAsync.cpp')
-rw-r--r--PluginRoutingInterfaceAsync/src/RoutingSenderAsync.cpp466
1 files changed, 258 insertions, 208 deletions
diff --git a/PluginRoutingInterfaceAsync/src/RoutingSenderAsync.cpp b/PluginRoutingInterfaceAsync/src/RoutingSenderAsync.cpp
index 1a1ee70..68b260f 100644
--- a/PluginRoutingInterfaceAsync/src/RoutingSenderAsync.cpp
+++ b/PluginRoutingInterfaceAsync/src/RoutingSenderAsync.cpp
@@ -55,175 +55,174 @@ pthread_mutex_t AsyncRoutingSender::mSourcesMutex= PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t AsyncRoutingSender::mDomainsMutex= PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t WorkerThreadPool::mBlockingMutex = PTHREAD_MUTEX_INITIALIZER;
-void* AsyncRoutingSender::InterruptEvents(void *data)
-{
- RoutingReceiverAsyncShadow *shadow=(RoutingReceiverAsyncShadow *)data;
- DBusError err;
- DBusMessage* msg;
- DBusConnection* conn;
- dbus_error_init(&err);
- conn = dbus_bus_get(DBUS_BUS_SESSION, &err);
- dbus_uint32_t serial = 0;
- DBusMessage* reply;
- DBusMessageIter args;
- dbus_bus_request_name(conn, "org.genivi.test",DBUS_NAME_FLAG_REPLACE_EXISTING , &err);
-
- while (dbus_connection_read_write_dispatch(conn, -1))
- {
- dbus_connection_read_write(conn, 0);
- msg = dbus_connection_pop_message(conn);
-
- if (dbus_message_is_method_call(msg, "org.genivi.test", "timingChanged"))
+//void* AsyncRoutingSender::InterruptEvents(void *data)
+//{
+// RoutingReceiverAsyncShadow *shadow=(RoutingReceiverAsyncShadow *)data;
+// DBusError err;
+// DBusMessage* msg;
+// DBusConnection* conn;
+// dbus_error_init(&err);
+// conn = dbus_bus_get(DBUS_BUS_SESSION, &err);
+// dbus_uint32_t serial = 0;
+// DBusMessage* reply;
+// DBusMessageIter args;
+// dbus_bus_request_name(conn, "org.genivi.test",DBUS_NAME_FLAG_REPLACE_EXISTING , &err);
+//
+// while (dbus_connection_read_write_dispatch(conn, -1))
+// {
+// dbus_connection_read_write(conn, 0);
+// msg = dbus_connection_pop_message(conn);
+//
+// if (dbus_message_is_method_call(msg, "org.genivi.test", "timingChanged"))
+// {
+// am_connectionID_t connectionID;
+// am_timeSync_t delay;
+// dbus_message_iter_init(msg, &args);
+// dbus_message_iter_get_basic(&args,(void*) &connectionID);
+// dbus_message_iter_next(&args);
+// dbus_message_iter_get_basic(&args,(void*) &delay);
+// reply = dbus_message_new_method_return(msg);
+// dbus_message_iter_init_append(reply, &args);
+// dbus_message_iter_append_basic(&args, DBUS_TYPE_INT32, &connectionID);
+// dbus_connection_send(conn, reply, &serial);
+// shadow->hookTimingInformationChanged(connectionID,delay);
+// dbus_message_unref(reply);
+// }
+// else if (dbus_message_is_method_call(msg, "org.genivi.test", "SinkAvailablityStatusChange"))
+// {
+// am_sinkID_t sinkID;
+// am_Availability_s availability;
+// dbus_message_iter_init(msg, &args);
+// dbus_message_iter_get_basic(&args,(void*) &sinkID);
+// reply = dbus_message_new_method_return(msg);
+// dbus_message_iter_init_append(reply, &args);
+// dbus_message_iter_append_basic(&args, DBUS_TYPE_INT32, &sinkID);
+// dbus_connection_send(conn, reply, &serial);
+// shadow->hookSinkAvailablityStatusChange(sinkID,availability);
+// dbus_message_unref(reply);
+// }
+// else if (dbus_message_is_method_call(msg, "org.genivi.test", "SourceAvailablityStatusChange"))
+// {
+// am_sourceID_t sourceID;
+// am_Availability_s availability;
+// dbus_message_iter_init(msg, &args);
+// dbus_message_iter_get_basic(&args,(void*) &sourceID);
+// reply = dbus_message_new_method_return(msg);
+// dbus_message_iter_init_append(reply, &args);
+// dbus_message_iter_append_basic(&args, DBUS_TYPE_INT32, &sourceID);
+// dbus_connection_send(conn, reply, &serial);
+// shadow->hookSourceAvailablityStatusChange(sourceID,availability);
+// dbus_message_unref(reply);
+// }
+// else if (dbus_message_is_method_call(msg, "org.genivi.test", "InterruptStatusChange"))
+// {
+// am_sourceID_t sourceID;
+//
+// am_InterruptState_e state=IS_UNKNOWN;
+// dbus_message_iter_init(msg, &args);
+// dbus_message_iter_get_basic(&args,(void*) &sourceID);
+// reply = dbus_message_new_method_return(msg);
+// dbus_message_iter_init_append(reply, &args);
+// dbus_message_iter_append_basic(&args, DBUS_TYPE_INT32, &sourceID);
+// dbus_connection_send(conn, reply, &serial);
+// shadow->hookInterruptStatusChange(sourceID,state);
+// dbus_message_unref(reply);
+// }
+// dbus_connection_flush(conn);
+// }
+// return NULL;
+//}
+
+ void *WorkerThreadPool::WorkerThread(void* data)
{
- am_connectionID_t connectionID;
- am_timeSync_t delay;
- dbus_message_iter_init(msg, &args);
- dbus_message_iter_get_basic(&args,(void*) &connectionID);
- dbus_message_iter_next(&args);
- dbus_message_iter_get_basic(&args,(void*) &delay);
- reply = dbus_message_new_method_return(msg);
- dbus_message_iter_init_append(reply, &args);
- dbus_message_iter_append_basic(&args, DBUS_TYPE_INT32, &connectionID);
- dbus_connection_send(conn, reply, &serial);
- shadow->hookTimingInformationChanged(connectionID,delay);
- dbus_message_unref(reply);
- }
- else if (dbus_message_is_method_call(msg, "org.genivi.test", "SinkAvailablityStatusChange"))
- {
- am_sinkID_t sinkID;
- am_Availability_s availability;
- dbus_message_iter_init(msg, &args);
- dbus_message_iter_get_basic(&args,(void*) &sinkID);
- reply = dbus_message_new_method_return(msg);
- dbus_message_iter_init_append(reply, &args);
- dbus_message_iter_append_basic(&args, DBUS_TYPE_INT32, &sinkID);
- dbus_connection_send(conn, reply, &serial);
- shadow->hookSinkAvailablityStatusChange(sinkID,availability);
- dbus_message_unref(reply);
- }
- else if (dbus_message_is_method_call(msg, "org.genivi.test", "SourceAvailablityStatusChange"))
- {
- am_sourceID_t sourceID;
- am_Availability_s availability;
- dbus_message_iter_init(msg, &args);
- dbus_message_iter_get_basic(&args,(void*) &sourceID);
- reply = dbus_message_new_method_return(msg);
- dbus_message_iter_init_append(reply, &args);
- dbus_message_iter_append_basic(&args, DBUS_TYPE_INT32, &sourceID);
- dbus_connection_send(conn, reply, &serial);
- shadow->hookSourceAvailablityStatusChange(sourceID,availability);
- dbus_message_unref(reply);
+ threadInfo_s *myInfo=(threadInfo_s*)data;
+ while (1)
+ {
+ sem_wait(&myInfo->block);
+ pthread_mutex_lock(&mBlockingMutex);
+ Worker* actWorker=myInfo->worker;
+ pthread_mutex_unlock(&mBlockingMutex);
+ actWorker->setCancelSempaphore(&myInfo->cancel);
+ actWorker->start2work();
+ actWorker->pPool->finishedWork(myInfo->threadID);
+ }
+ return NULL;
}
- else if (dbus_message_is_method_call(msg, "org.genivi.test", "InterruptStatusChange"))
+
+ WorkerThreadPool::WorkerThreadPool(int numThreads):
+ mNumThreads(numThreads)
{
- am_sourceID_t sourceID;
-
- am_InterruptState_e state=IS_UNKNOWN;
- dbus_message_iter_init(msg, &args);
- dbus_message_iter_get_basic(&args,(void*) &sourceID);
- reply = dbus_message_new_method_return(msg);
- dbus_message_iter_init_append(reply, &args);
- dbus_message_iter_append_basic(&args, DBUS_TYPE_INT32, &sourceID);
- dbus_connection_send(conn, reply, &serial);
- shadow->hookInterruptStatusChange(sourceID,state);
- dbus_message_unref(reply);
+ int workerID=0;
+ mListWorkers.resize(mNumThreads);
+ for (int i=0;i<mNumThreads;i++)
+ {
+ sem_init(&mListWorkers[i].block,NULL,NULL);
+ sem_init(&mListWorkers[i].cancel,NULL,NULL);
+ mListWorkers[i].busy=false;
+ mListWorkers[i].workerID=++workerID;
+ pthread_create(&mListWorkers[i].threadID,NULL,&WorkerThreadPool::WorkerThread,(void*)&mListWorkers[i]);
+ }
}
- dbus_connection_flush(conn);
- }
- return NULL;
-}
-
-void *WorkerThreadPool::WorkerThread(void* data)
-{
- threadInfo_s *myInfo=(threadInfo_s*)data;
- while (1)
- {
- sem_wait(&myInfo->block);
- pthread_mutex_lock(&mBlockingMutex);
- Worker* actWorker=myInfo->worker;
- pthread_mutex_unlock(&mBlockingMutex);
- actWorker->setCancelSempaphore(&myInfo->cancel);
- actWorker->start2work();
- actWorker->pPool->finishedWork(myInfo->threadID);
- }
- return NULL;
-}
-WorkerThreadPool::WorkerThreadPool(int numThreads):
-mNumThreads(numThreads)
-{
- int workerID=0;
- mListWorkers.resize(mNumThreads);
- for (int i=0;i<mNumThreads;i++)
- {
- sem_init(&mListWorkers[i].block,NULL,NULL);
- sem_init(&mListWorkers[i].cancel,NULL,NULL);
- mListWorkers[i].busy=false;
- mListWorkers[i].workerID=++workerID;
- pthread_create(&mListWorkers[i].threadID,NULL,&WorkerThreadPool::WorkerThread,(void*)&mListWorkers[i]);
- }
-}
-
-int16_t WorkerThreadPool::startWork(Worker *worker)
-{
- pthread_mutex_lock(&mBlockingMutex);
- std::vector<threadInfo_s>::iterator it=mListWorkers.begin();
- for(;it!=mListWorkers.end();++it)
- {
- if(!it->busy)
+ int16_t WorkerThreadPool::startWork(Worker *worker)
{
- it->worker=worker;
- it->busy=true;
+ pthread_mutex_lock(&mBlockingMutex);
+ std::vector<threadInfo_s>::iterator it=mListWorkers.begin();
+ for(;it!=mListWorkers.end();++it)
+ {
+ if(!it->busy)
+ {
+ it->worker=worker;
+ it->busy=true;
+ pthread_mutex_unlock(&mBlockingMutex);
+ sem_post(&it->block);
+ return ((int)it->workerID);
+ }
+ }
pthread_mutex_unlock(&mBlockingMutex);
- sem_post(&it->block);
- return ((int)it->workerID);
+ return (-1);
}
- }
- pthread_mutex_unlock(&mBlockingMutex);
- return (-1);
-}
-bool WorkerThreadPool::cancelWork(int workerID)
-{
- std::vector<threadInfo_s>::iterator it=mListWorkers.begin();
- for(;it!=mListWorkers.end();++it)
- {
- if(it->workerID==workerID && it->busy)
+ bool WorkerThreadPool::cancelWork(int workerID)
{
- sem_post(&it->cancel);
- return (true);
+ std::vector<threadInfo_s>::iterator it=mListWorkers.begin();
+ for(;it!=mListWorkers.end();++it)
+ {
+ if(it->workerID==workerID && it->busy)
+ {
+ sem_post(&it->cancel);
+ return (true);
+ }
+ }
+ return (false);
}
- }
- return (false);
-}
-void WorkerThreadPool::finishedWork(pthread_t threadID)
-{
- pthread_mutex_lock(&mBlockingMutex);
- std::vector<threadInfo_s>::iterator it=mListWorkers.begin();
- for(;it!=mListWorkers.end();++it)
- {
- if(it->threadID==threadID)
+ void WorkerThreadPool::finishedWork(pthread_t threadID)
{
- it->busy=false;
- delete it->worker;
- break;
+ pthread_mutex_lock(&mBlockingMutex);
+ std::vector<threadInfo_s>::iterator it=mListWorkers.begin();
+ for(;it!=mListWorkers.end();++it)
+ {
+ if(it->threadID==threadID)
+ {
+ it->busy=false;
+ delete it->worker;
+ break;
+ }
+ }
+ pthread_mutex_unlock(&mBlockingMutex);
}
- }
- pthread_mutex_unlock(&mBlockingMutex);
-}
-WorkerThreadPool::~WorkerThreadPool()
-{
- for (int i=0;i<mNumThreads;i++)
- {
- pthread_cancel(mListWorkers[i].threadID);
- }
-}
+ WorkerThreadPool::~WorkerThreadPool()
+ {
+ for (int i=0;i<mNumThreads;i++)
+ {
+ pthread_cancel(mListWorkers[i].threadID);
+ }
+ }
-Worker::Worker(WorkerThreadPool *pool):
-pPool(pool), //
- mCancelSem()
+ Worker::Worker(WorkerThreadPool *pool):
+ pPool(pool), mCancelSem()
{
}
@@ -262,20 +261,13 @@ pPool(pool), //
}
AsyncRoutingSender::AsyncRoutingSender():
- mShadow(), //
- mReceiveInterface(0), //
- mDomains(createDomainTable()), //
- mSinks(createSinkTable()), //
-mSources ( createSourceTable ( ) ), //
-mGateways ( createGatewayTable ( ) ) , //
-mMapHandleWorker ( ), //
-mMapConnectionIDRoute(),//
-mPool(10)
+ mReceiveInterface(0), mDomains(createDomainTable()), mSinks(createSinkTable()), mSources ( createSourceTable ( ) ), mGateways ( createGatewayTable ( ) ), mMapHandleWorker ( ) , mMapConnectionIDRoute ( ) , mPool (10)
{
}
AsyncRoutingSender::~AsyncRoutingSender()
{
+ delete mShadow;
}
am_Error_e AsyncRoutingSender::startupInterface(RoutingReceiveInterface *routingreceiveinterface)
@@ -283,50 +275,20 @@ am_Error_e AsyncRoutingSender::startupInterface(RoutingReceiveInterface *routing
//first, create the Shadow:
assert(routingreceiveinterface!=0);
mReceiveInterface = routingreceiveinterface;
- mShadow.setRoutingInterface(routingreceiveinterface);
+ SocketHandler* handler;
+ routingreceiveinterface->getSocketHandler(handler);
+ mShadow = new RoutingReceiverAsyncShadow(routingreceiveinterface, handler);
+ return E_OK;
}
void AsyncRoutingSender::setRoutingReady(const uint16_t handle)
{
- //todo: implement handle !
- assert(mReceiveInterface!=0);
- am_Error_e eCode;
- //first register the domains
- std::vector<am_Domain_s>::iterator domainIter = mDomains.begin();
- for (; domainIter != mDomains.end(); ++domainIter)
- {
- am_domainID_t domainID;
- if ((eCode = mReceiveInterface->registerDomain(*domainIter, domainID)) != E_OK)
- {
- logError("AsyncRoutingSender::routingInterfacesReady error on registering domain, failed with", eCode);
- }
- domainIter->domainID = domainID;
- }
+ syncRegisterWorker *worker = new syncRegisterWorker(this, &mPool, mShadow, mDomains, mSinks, mSources, handle);
- //then sources
- std::vector<am_Source_s>::iterator sourceIter = mSources.begin();
- for (; sourceIter != mSources.end(); ++sourceIter)
+ if ((mPool.startWork(worker)) == -1)
{
- am_sourceID_t sourceID;
- //set the correct domainID
- sourceIter->domainID = mDomains[0].domainID;
- if ((eCode = mReceiveInterface->registerSource(*sourceIter, sourceID)) != E_OK)
- {
- logError("AsyncRoutingSender::routingInterfacesReady error on registering source, failed with", eCode);
- }
- }
-
- //sinks
- std::vector<am_Sink_s>::iterator sinkIter = mSinks.begin();
- for (; sinkIter != mSinks.end(); ++sinkIter)
- {
- am_sinkID_t sinkID;
- //set the correct domainID
- sinkIter->domainID = mDomains[0].domainID;
- if ((eCode = mReceiveInterface->registerSink(*sinkIter, sinkID)) != E_OK)
- {
- logError("AsyncRoutingSender::routingInterfacesReady error on registering sink, failed with", eCode);
- }
+ logError("AsyncRoutingSender::asyncConnect not enough threads!");
+ delete worker;
}
//gateways
@@ -422,7 +384,7 @@ am_Error_e AsyncRoutingSender::asyncConnect(const am_Handle_s handle, const am_c
return (E_WRONG_FORMAT);
//the operation is ok, lets create a worker, assign it to a task in the task pool
- asycConnectWorker *worker = new asycConnectWorker(this, &mPool, &mShadow, handle, connectionID, sourceID, sinkID, connectionFormat);
+ asycConnectWorker *worker = new asycConnectWorker(this, &mPool, mShadow, handle, connectionID, sourceID, sinkID, connectionFormat);
if ((work = mPool.startWork(worker)) == -1)
{
logError("AsyncRoutingSender::asyncConnect not enough threads!");
@@ -457,7 +419,7 @@ am_Error_e AsyncRoutingSender::asyncDisconnect(const am_Handle_s handle, const a
pthread_mutex_unlock(&mMapConnectionMutex);
//the operation is ok, lets create a worker, assign it to a task in the task pool
- asycDisConnectWorker *worker = new asycDisConnectWorker(this, &mPool, &mShadow, handle, connectionID);
+ asycDisConnectWorker *worker = new asycDisConnectWorker(this, &mPool, mShadow, handle, connectionID);
if ((work = mPool.startWork(worker)) == -1)
{
logError("AsyncRoutingSender::asyncDisconnect not enough threads!");
@@ -499,7 +461,7 @@ am_Error_e AsyncRoutingSender::asyncSetSinkVolume(const am_Handle_s handle, cons
if (sinkIter == mSinks.end())
return (E_NON_EXISTENT); //not found!
- asyncSetSinkVolumeWorker *worker = new asyncSetSinkVolumeWorker(this, &mPool, &mShadow, sinkIter->volume, handle, sinkID, volume, ramp, time);
+ asyncSetSinkVolumeWorker *worker = new asyncSetSinkVolumeWorker(this, &mPool, mShadow, sinkIter->volume, handle, sinkID, volume, ramp, time);
if ((work = mPool.startWork(worker)) == -1)
{
logError("AsyncRoutingSender::asyncSetSinkVolume not enough threads!");
@@ -541,7 +503,7 @@ am_Error_e AsyncRoutingSender::asyncSetSourceVolume(const am_Handle_s handle, co
if (sourceIter == mSources.end())
return (E_NON_EXISTENT); //not found!
- asyncSetSourceVolumeWorker *worker = new asyncSetSourceVolumeWorker(this, &mPool, &mShadow, sourceIter->volume, handle, sourceID, volume, ramp, time);
+ asyncSetSourceVolumeWorker *worker = new asyncSetSourceVolumeWorker(this, &mPool, mShadow, sourceIter->volume, handle, sourceID, volume, ramp, time);
if ((work = mPool.startWork(worker)) == -1)
{
logError("AsyncRoutingSender::asyncSetSourceVolume not enough threads!");
@@ -583,7 +545,7 @@ am_Error_e AsyncRoutingSender::asyncSetSourceState(const am_Handle_s handle, con
if (sourceIter == mSources.end())
return (E_NON_EXISTENT); //not found!
- asyncSetSourceStateWorker *worker = new asyncSetSourceStateWorker(this, &mPool, &mShadow, handle, sourceID, state);
+ asyncSetSourceStateWorker *worker = new asyncSetSourceStateWorker(this, &mPool, mShadow, handle, sourceID, state);
if ((work = mPool.startWork(worker)) == -1)
{
logError("AsyncRoutingSender::asyncSetSourceState not enough threads!");
@@ -625,7 +587,7 @@ am_Error_e AsyncRoutingSender::asyncSetSinkSoundProperty(const am_Handle_s handl
if (sinkIter == mSinks.end())
return (E_NON_EXISTENT); //not found!
- asyncSetSinkSoundPropertyWorker *worker = new asyncSetSinkSoundPropertyWorker(this, &mPool, &mShadow, handle, soundProperty, sinkID);
+ asyncSetSinkSoundPropertyWorker *worker = new asyncSetSinkSoundPropertyWorker(this, &mPool, mShadow, handle, soundProperty, sinkID);
if ((work = mPool.startWork(worker)) == -1)
{
logError("AsyncRoutingSender::asyncSetSinkSoundProperty not enough threads!");
@@ -676,7 +638,7 @@ am_Error_e AsyncRoutingSender::setDomainState(const am_domainID_t domainID, cons
if (domainIter == mDomains.end())
return (E_NON_EXISTENT); //not found!
- asyncDomainStateChangeWorker *worker = new asyncDomainStateChangeWorker(this, &mPool, &mShadow, domainID, domainState);
+ asyncDomainStateChangeWorker *worker = new asyncDomainStateChangeWorker(this, &mPool, mShadow, domainID, domainState);
if ((work = mPool.startWork(worker)) == -1)
{
logError("AsyncRoutingSender::setDomainState not enough threads!");
@@ -714,7 +676,7 @@ am_Error_e AsyncRoutingSender::asyncSetSourceSoundProperty(const am_Handle_s han
if (sourceIter == mSources.end())
return (E_NON_EXISTENT); //not found!
- asyncSetSourceSoundPropertyWorker *worker = new asyncSetSourceSoundPropertyWorker(this, &mPool, &mShadow, handle, soundProperty, sourceID);
+ asyncSetSourceSoundPropertyWorker *worker = new asyncSetSourceSoundPropertyWorker(this, &mPool, mShadow, handle, soundProperty, sourceID);
if ((work = mPool.startWork(worker)) == -1)
{
logError("AsyncRoutingSender::asyncSetSourceState not enough threads!");
@@ -953,6 +915,27 @@ void am::AsyncRoutingSender::updateDomainstateSafe(am_domainID_t domainID, am_Do
pthread_mutex_unlock(&mDomainsMutex);
}
+void am::AsyncRoutingSender::updateDomainListSafe(std::vector<am_Domain_s> listDomains)
+{
+ pthread_mutex_lock(&mDomainsMutex);
+ mDomains = listDomains;
+ pthread_mutex_unlock(&mDomainsMutex);
+}
+
+void am::AsyncRoutingSender::updateSourceListSafe(std::vector<am_Source_s> listSource)
+{
+ pthread_mutex_lock(&mSourcesMutex);
+ mSources = listSource;
+ pthread_mutex_unlock(&mSourcesMutex);
+}
+
+void am::AsyncRoutingSender::updateSinkListSafe(std::vector<am_Sink_s> listSinks)
+{
+ pthread_mutex_lock(&mSinksMutex);
+ mSinks = listSinks;
+ pthread_mutex_unlock(&mSinksMutex);
+}
+
void AsyncRoutingSender::getInterfaceVersion(std::string & version) const
{
version = RoutingSendVersion;
@@ -1320,3 +1303,70 @@ void am::asyncDomainStateChangeWorker::cancelWork()
mShadow->hookDomainStateChange(mDomainID, mDomainState);
}
+syncRegisterWorker::syncRegisterWorker(AsyncRoutingSender * asyncSender, WorkerThreadPool* pool, RoutingReceiverAsyncShadow* shadow, const std::vector<am_Domain_s> domains, const std::vector<am_Sink_s> sinks, const std::vector<am_Source_s> sources, const uint16_t handle) :
+ Worker(pool), //
+ mAsyncSender(asyncSender), //
+ mShadow(shadow), //
+ mListDomains(domains), //
+ mListSinks(sinks), //
+ mListSources(sources),
+ mHandle(handle)
+{
+}
+
+void syncRegisterWorker::start2work()
+{
+ //todo: sendchanged data must be in here !
+ logInfo("Start to register stuff");
+
+ am_Error_e eCode;
+
+ std::vector<am_Domain_s>::iterator domainIter = mListDomains.begin();
+ for (; domainIter != mListDomains.end(); ++domainIter)
+ {
+ am_domainID_t domainID;
+ if ((eCode = mShadow->registerDomain(*domainIter, domainID)) != E_OK)
+ {
+ logError("syncRegisterWorker::start2work error on registering domain, failed with", eCode);
+ }
+ domainIter->domainID = domainID;
+ }
+
+ mAsyncSender->updateDomainListSafe(mListDomains);
+
+ //then sources
+ std::vector<am_Source_s>::iterator sourceIter = mListSources.begin();
+ for (; sourceIter != mListSources.end(); ++sourceIter)
+ {
+ am_sourceID_t sourceID;
+ //set the correct domainID
+ sourceIter->domainID = mListDomains[0].domainID;
+ if ((eCode = mShadow->registerSource(*sourceIter, sourceID)) != E_OK)
+ {
+ logError("syncRegisterWorker::start2work error on registering source, failed with", eCode);
+ }
+ }
+
+ mAsyncSender->updateSourceListSafe(mListSources);
+
+ //sinks
+ std::vector<am_Sink_s>::iterator sinkIter = mListSinks.begin();
+ for (; sinkIter != mListSinks.end(); ++sinkIter)
+ {
+ am_sinkID_t sinkID;
+ //set the correct domainID
+ sinkIter->domainID = mListDomains[0].domainID;
+ if ((eCode = mShadow->registerSink(*sinkIter, sinkID)) != E_OK)
+ {
+ logError("syncRegisterWorker::start2work error on registering sink, failed with", eCode);
+ }
+ }
+
+ mAsyncSender->updateSinkListSafe(mListSinks);
+ mShadow->confirmRoutingReady(mHandle);
+}
+
+void syncRegisterWorker::cancelWork()
+{
+}
+