diff options
Diffstat (limited to 'src/CommonAPI/DBus/DBusProxy.cpp')
-rw-r--r-- | src/CommonAPI/DBus/DBusProxy.cpp | 237 |
1 files changed, 230 insertions, 7 deletions
diff --git a/src/CommonAPI/DBus/DBusProxy.cpp b/src/CommonAPI/DBus/DBusProxy.cpp index 47336f7..1de88fc 100644 --- a/src/CommonAPI/DBus/DBusProxy.cpp +++ b/src/CommonAPI/DBus/DBusProxy.cpp @@ -9,6 +9,8 @@ #include <CommonAPI/Utils.hpp> #include <CommonAPI/DBus/DBusProxy.hpp> #include <CommonAPI/DBus/DBusUtils.hpp> +#include <CommonAPI/DBus/DBusProxyAsyncSignalMemberCallbackHandler.hpp> +#include <CommonAPI/Logger.hpp> namespace CommonAPI { namespace DBus { @@ -17,7 +19,7 @@ DBusProxyStatusEvent::DBusProxyStatusEvent(DBusProxy *_dbusProxy) : dbusProxy_(_dbusProxy) { } -void DBusProxyStatusEvent::onListenerAdded(const Listener& listener) { +void DBusProxyStatusEvent::onListenerAdded(const Listener& listener, const Subscription subscription) { if (dbusProxy_->isAvailable()) listener(AvailabilityStatus::AVAILABLE); } @@ -28,7 +30,8 @@ DBusProxy::DBusProxy(const DBusAddress &_dbusAddress, dbusProxyStatusEvent_(this), availabilityStatus_(AvailabilityStatus::UNKNOWN), interfaceVersionAttribute_(*this, "uu", "getInterfaceVersion"), - dbusServiceRegistry_(DBusServiceRegistry::get(_connection)) + dbusServiceRegistry_(DBusServiceRegistry::get(_connection)), + signalMemberHandlerInfo_(3000) { } @@ -49,11 +52,15 @@ bool DBusProxy::isAvailable() const { } bool DBusProxy::isAvailableBlocking() const { - std::unique_lock<std::mutex> lock(availabilityMutex_); + std::unique_lock<std::mutex> lock(availabilityMutex_); + + if(!getDBusConnection()->hasDispatchThread()) { + return isAvailable(); + } while (availabilityStatus_ != AvailabilityStatus::AVAILABLE) { - availabilityCondition_.wait(lock); - } + availabilityCondition_.wait(lock); + } return true; } @@ -66,9 +73,79 @@ InterfaceVersionAttribute& DBusProxy::getInterfaceVersionAttribute() { return interfaceVersionAttribute_; } +void DBusProxy::signalMemberCallback(const CallStatus dbusMessageCallStatus, + const DBusMessage& dbusMessage, + DBusProxyConnection::DBusSignalHandler* dbusSignalHandler, + const uint32_t tag) { + dbusSignalHandler->onSignalDBusMessage(dbusMessage); +} + +void DBusProxy::signalInitialValueCallback(const CallStatus dbusMessageCallStatus, + const DBusMessage& dbusMessage, + DBusProxyConnection::DBusSignalHandler* dbusSignalHandler, + const uint32_t tag) { + dbusSignalHandler->onInitialValueSignalDBusMessage(dbusMessage, tag); +} + void DBusProxy::onDBusServiceInstanceStatus(const AvailabilityStatus& availabilityStatus) { - availabilityStatus_ = availabilityStatus; - dbusProxyStatusEvent_.notifyListeners(availabilityStatus); + if (availabilityStatus != availabilityStatus_) { + availabilityStatusMutex_.lock(); + availabilityStatus_ = availabilityStatus; + availabilityStatusMutex_.unlock(); + + dbusProxyStatusEvent_.notifyListeners(availabilityStatus); + + if (availabilityStatus == AvailabilityStatus::AVAILABLE) { + std::lock_guard < std::mutex > queueLock(signalMemberHandlerQueueMutex_); + + for(auto signalMemberHandlerIterator = signalMemberHandlerQueue_.begin(); + signalMemberHandlerIterator != signalMemberHandlerQueue_.end(); + signalMemberHandlerIterator++) { + + if (!std::get<7>(*signalMemberHandlerIterator)) { + connection_->addSignalMemberHandler( + std::get<0>(*signalMemberHandlerIterator), + std::get<1>(*signalMemberHandlerIterator), + std::get<2>(*signalMemberHandlerIterator), + std::get<3>(*signalMemberHandlerIterator), + std::get<5>(*signalMemberHandlerIterator), + std::get<6>(*signalMemberHandlerIterator)); + std::get<7>(*signalMemberHandlerIterator) = true; + + DBusMessage message = createMethodCall(std::get<4>(*signalMemberHandlerIterator), ""); + + DBusProxyAsyncSignalMemberCallbackHandler::FunctionType myFunc = std::bind( + &DBusProxy::signalMemberCallback, + this, + std::placeholders::_1, + std::placeholders::_2, + std::placeholders::_3, + std::placeholders::_4); + connection_->sendDBusMessageWithReplyAsync( + message, + DBusProxyAsyncSignalMemberCallbackHandler::create(myFunc, std::get<5>(*signalMemberHandlerIterator), 0), + &signalMemberHandlerInfo_); + } + } + } else { + std::lock_guard < std::mutex > queueLock(signalMemberHandlerQueueMutex_); + + for(auto signalMemberHandlerIterator = signalMemberHandlerQueue_.begin(); + signalMemberHandlerIterator != signalMemberHandlerQueue_.end(); + signalMemberHandlerIterator++) { + + if (std::get<7>(*signalMemberHandlerIterator)) { + DBusProxyConnection::DBusSignalHandlerToken signalHandlerToken ( + std::get<0>(*signalMemberHandlerIterator), + std::get<1>(*signalMemberHandlerIterator), + std::get<2>(*signalMemberHandlerIterator), + std::get<3>(*signalMemberHandlerIterator)); + connection_->removeSignalMemberHandler(signalHandlerToken, std::get<5>(*signalMemberHandlerIterator)); + std::get<7>(*signalMemberHandlerIterator) = false; + } + } + } + } availabilityCondition_.notify_one(); } @@ -96,5 +173,151 @@ void DBusProxy::unsubscribeFromSelectiveBroadcast(const std::string& eventName, getDBusConnection()->unsubscribeFromSelectiveBroadcast(eventName, subscription, this, dbusSignalHandler); } +DBusProxyConnection::DBusSignalHandlerToken DBusProxy::addSignalMemberHandler( + const std::string& objectPath, + const std::string& interfaceName, + const std::string& signalName, + const std::string& signalSignature, + DBusProxyConnection::DBusSignalHandler* dbusSignalHandler, + const bool justAddFilter) { + return DBusProxyBase::addSignalMemberHandler( + objectPath, + interfaceName, + signalName, + signalSignature, + dbusSignalHandler, + justAddFilter); +} + +DBusProxyConnection::DBusSignalHandlerToken DBusProxy::addSignalMemberHandler( + const std::string &objectPath, + const std::string &interfaceName, + const std::string &signalName, + const std::string &signalSignature, + const std::string &getMethodName, + DBusProxyConnection::DBusSignalHandler *dbusSignalHandler, + const bool justAddFilter) { + + DBusProxyConnection::DBusSignalHandlerToken signalHandlerToken ( + objectPath, + interfaceName, + signalName, + signalSignature); + + if (getMethodName != "") { + + SignalMemberHandlerTuple signalMemberHandler( + objectPath, + interfaceName, + signalName, + signalSignature, + getMethodName, + dbusSignalHandler, + justAddFilter, + false); + + availabilityStatusMutex_.lock(); + if (availabilityStatus_ == AvailabilityStatus::AVAILABLE) { + availabilityStatusMutex_.unlock(); + signalHandlerToken = connection_->addSignalMemberHandler( + objectPath, + interfaceName, + signalName, + signalSignature, + dbusSignalHandler, + justAddFilter); + std::get<7>(signalMemberHandler) = true; + } else { + availabilityStatusMutex_.unlock(); + } + addSignalMemberHandlerToQueue(signalMemberHandler); + } else { + signalHandlerToken = connection_->addSignalMemberHandler( + objectPath, + interfaceName, + signalName, + signalSignature, + dbusSignalHandler, + justAddFilter); + } + + return signalHandlerToken; +} + +void DBusProxy::addSignalMemberHandlerToQueue(SignalMemberHandlerTuple& _signalMemberHandler) { + + std::lock_guard < std::mutex > queueLock(signalMemberHandlerQueueMutex_); + bool found = false; + + for(auto signalMemberHandlerIterator = signalMemberHandlerQueue_.begin(); + signalMemberHandlerIterator != signalMemberHandlerQueue_.end(); + signalMemberHandlerIterator++) { + + if ( (std::get<0>(*signalMemberHandlerIterator) == std::get<0>(_signalMemberHandler)) && + (std::get<1>(*signalMemberHandlerIterator) == std::get<1>(_signalMemberHandler)) && + (std::get<2>(*signalMemberHandlerIterator) == std::get<2>(_signalMemberHandler)) && + (std::get<3>(*signalMemberHandlerIterator) == std::get<3>(_signalMemberHandler))) { + + found = true; + break; + } + } + if (!found) { + signalMemberHandlerQueue_.push_back(_signalMemberHandler); + } +} + +bool DBusProxy::removeSignalMemberHandler( + const DBusProxyConnection::DBusSignalHandlerToken &_dbusSignalHandlerToken, + const DBusProxyConnection::DBusSignalHandler *_dbusSignalHandler) { + + { + std::lock_guard < std::mutex > queueLock(signalMemberHandlerQueueMutex_); + for(auto signalMemberHandlerIterator = signalMemberHandlerQueue_.begin(); + signalMemberHandlerIterator != signalMemberHandlerQueue_.end(); + signalMemberHandlerIterator++) { + + if ( (std::get<0>(*signalMemberHandlerIterator) == std::get<0>(_dbusSignalHandlerToken)) && + (std::get<1>(*signalMemberHandlerIterator) == std::get<1>(_dbusSignalHandlerToken)) && + (std::get<2>(*signalMemberHandlerIterator) == std::get<2>(_dbusSignalHandlerToken)) && + (std::get<3>(*signalMemberHandlerIterator) == std::get<3>(_dbusSignalHandlerToken))) { + signalMemberHandlerIterator = signalMemberHandlerQueue_.erase(signalMemberHandlerIterator); + + if (signalMemberHandlerIterator == signalMemberHandlerQueue_.end()) { + break; + } + } + } + } + + return connection_->removeSignalMemberHandler(_dbusSignalHandlerToken, _dbusSignalHandler); +} + +void DBusProxy::getCurrentValueForSignalListener( + const std::string &getMethodName, + DBusProxyConnection::DBusSignalHandler *dbusSignalHandler, + const uint32_t subscription) { + + availabilityStatusMutex_.lock(); + if (availabilityStatus_ == AvailabilityStatus::AVAILABLE) { + availabilityStatusMutex_.unlock(); + + DBusMessage message = createMethodCall(getMethodName, ""); + + DBusProxyAsyncSignalMemberCallbackHandler::FunctionType myFunc = std::bind(&DBusProxy::signalInitialValueCallback, + this, + std::placeholders::_1, + std::placeholders::_2, + std::placeholders::_3, + std::placeholders::_4); + connection_->sendDBusMessageWithReplyAsync( + message, + DBusProxyAsyncSignalMemberCallbackHandler::create(myFunc, dbusSignalHandler, subscription), + &signalMemberHandlerInfo_); + } else { + availabilityStatusMutex_.unlock(); + } +} + } // namespace DBus } // namespace CommonAPI |