summaryrefslogtreecommitdiff
path: root/src/CommonAPI/DBus/DBusProxy.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/CommonAPI/DBus/DBusProxy.cpp')
-rw-r--r--src/CommonAPI/DBus/DBusProxy.cpp237
1 files changed, 230 insertions, 7 deletions
diff --git a/src/CommonAPI/DBus/DBusProxy.cpp b/src/CommonAPI/DBus/DBusProxy.cpp
index 47336f7..1de88fc 100644
--- a/src/CommonAPI/DBus/DBusProxy.cpp
+++ b/src/CommonAPI/DBus/DBusProxy.cpp
@@ -9,6 +9,8 @@
#include <CommonAPI/Utils.hpp>
#include <CommonAPI/DBus/DBusProxy.hpp>
#include <CommonAPI/DBus/DBusUtils.hpp>
+#include <CommonAPI/DBus/DBusProxyAsyncSignalMemberCallbackHandler.hpp>
+#include <CommonAPI/Logger.hpp>
namespace CommonAPI {
namespace DBus {
@@ -17,7 +19,7 @@ DBusProxyStatusEvent::DBusProxyStatusEvent(DBusProxy *_dbusProxy)
: dbusProxy_(_dbusProxy) {
}
-void DBusProxyStatusEvent::onListenerAdded(const Listener& listener) {
+void DBusProxyStatusEvent::onListenerAdded(const Listener& listener, const Subscription subscription) {
if (dbusProxy_->isAvailable())
listener(AvailabilityStatus::AVAILABLE);
}
@@ -28,7 +30,8 @@ DBusProxy::DBusProxy(const DBusAddress &_dbusAddress,
dbusProxyStatusEvent_(this),
availabilityStatus_(AvailabilityStatus::UNKNOWN),
interfaceVersionAttribute_(*this, "uu", "getInterfaceVersion"),
- dbusServiceRegistry_(DBusServiceRegistry::get(_connection))
+ dbusServiceRegistry_(DBusServiceRegistry::get(_connection)),
+ signalMemberHandlerInfo_(3000)
{
}
@@ -49,11 +52,15 @@ bool DBusProxy::isAvailable() const {
}
bool DBusProxy::isAvailableBlocking() const {
- std::unique_lock<std::mutex> lock(availabilityMutex_);
+ std::unique_lock<std::mutex> lock(availabilityMutex_);
+
+ if(!getDBusConnection()->hasDispatchThread()) {
+ return isAvailable();
+ }
while (availabilityStatus_ != AvailabilityStatus::AVAILABLE) {
- availabilityCondition_.wait(lock);
- }
+ availabilityCondition_.wait(lock);
+ }
return true;
}
@@ -66,9 +73,79 @@ InterfaceVersionAttribute& DBusProxy::getInterfaceVersionAttribute() {
return interfaceVersionAttribute_;
}
+void DBusProxy::signalMemberCallback(const CallStatus dbusMessageCallStatus,
+ const DBusMessage& dbusMessage,
+ DBusProxyConnection::DBusSignalHandler* dbusSignalHandler,
+ const uint32_t tag) {
+ dbusSignalHandler->onSignalDBusMessage(dbusMessage);
+}
+
+void DBusProxy::signalInitialValueCallback(const CallStatus dbusMessageCallStatus,
+ const DBusMessage& dbusMessage,
+ DBusProxyConnection::DBusSignalHandler* dbusSignalHandler,
+ const uint32_t tag) {
+ dbusSignalHandler->onInitialValueSignalDBusMessage(dbusMessage, tag);
+}
+
void DBusProxy::onDBusServiceInstanceStatus(const AvailabilityStatus& availabilityStatus) {
- availabilityStatus_ = availabilityStatus;
- dbusProxyStatusEvent_.notifyListeners(availabilityStatus);
+ if (availabilityStatus != availabilityStatus_) {
+ availabilityStatusMutex_.lock();
+ availabilityStatus_ = availabilityStatus;
+ availabilityStatusMutex_.unlock();
+
+ dbusProxyStatusEvent_.notifyListeners(availabilityStatus);
+
+ if (availabilityStatus == AvailabilityStatus::AVAILABLE) {
+ std::lock_guard < std::mutex > queueLock(signalMemberHandlerQueueMutex_);
+
+ for(auto signalMemberHandlerIterator = signalMemberHandlerQueue_.begin();
+ signalMemberHandlerIterator != signalMemberHandlerQueue_.end();
+ signalMemberHandlerIterator++) {
+
+ if (!std::get<7>(*signalMemberHandlerIterator)) {
+ connection_->addSignalMemberHandler(
+ std::get<0>(*signalMemberHandlerIterator),
+ std::get<1>(*signalMemberHandlerIterator),
+ std::get<2>(*signalMemberHandlerIterator),
+ std::get<3>(*signalMemberHandlerIterator),
+ std::get<5>(*signalMemberHandlerIterator),
+ std::get<6>(*signalMemberHandlerIterator));
+ std::get<7>(*signalMemberHandlerIterator) = true;
+
+ DBusMessage message = createMethodCall(std::get<4>(*signalMemberHandlerIterator), "");
+
+ DBusProxyAsyncSignalMemberCallbackHandler::FunctionType myFunc = std::bind(
+ &DBusProxy::signalMemberCallback,
+ this,
+ std::placeholders::_1,
+ std::placeholders::_2,
+ std::placeholders::_3,
+ std::placeholders::_4);
+ connection_->sendDBusMessageWithReplyAsync(
+ message,
+ DBusProxyAsyncSignalMemberCallbackHandler::create(myFunc, std::get<5>(*signalMemberHandlerIterator), 0),
+ &signalMemberHandlerInfo_);
+ }
+ }
+ } else {
+ std::lock_guard < std::mutex > queueLock(signalMemberHandlerQueueMutex_);
+
+ for(auto signalMemberHandlerIterator = signalMemberHandlerQueue_.begin();
+ signalMemberHandlerIterator != signalMemberHandlerQueue_.end();
+ signalMemberHandlerIterator++) {
+
+ if (std::get<7>(*signalMemberHandlerIterator)) {
+ DBusProxyConnection::DBusSignalHandlerToken signalHandlerToken (
+ std::get<0>(*signalMemberHandlerIterator),
+ std::get<1>(*signalMemberHandlerIterator),
+ std::get<2>(*signalMemberHandlerIterator),
+ std::get<3>(*signalMemberHandlerIterator));
+ connection_->removeSignalMemberHandler(signalHandlerToken, std::get<5>(*signalMemberHandlerIterator));
+ std::get<7>(*signalMemberHandlerIterator) = false;
+ }
+ }
+ }
+ }
availabilityCondition_.notify_one();
}
@@ -96,5 +173,151 @@ void DBusProxy::unsubscribeFromSelectiveBroadcast(const std::string& eventName,
getDBusConnection()->unsubscribeFromSelectiveBroadcast(eventName, subscription, this, dbusSignalHandler);
}
+DBusProxyConnection::DBusSignalHandlerToken DBusProxy::addSignalMemberHandler(
+ const std::string& objectPath,
+ const std::string& interfaceName,
+ const std::string& signalName,
+ const std::string& signalSignature,
+ DBusProxyConnection::DBusSignalHandler* dbusSignalHandler,
+ const bool justAddFilter) {
+ return DBusProxyBase::addSignalMemberHandler(
+ objectPath,
+ interfaceName,
+ signalName,
+ signalSignature,
+ dbusSignalHandler,
+ justAddFilter);
+}
+
+DBusProxyConnection::DBusSignalHandlerToken DBusProxy::addSignalMemberHandler(
+ const std::string &objectPath,
+ const std::string &interfaceName,
+ const std::string &signalName,
+ const std::string &signalSignature,
+ const std::string &getMethodName,
+ DBusProxyConnection::DBusSignalHandler *dbusSignalHandler,
+ const bool justAddFilter) {
+
+ DBusProxyConnection::DBusSignalHandlerToken signalHandlerToken (
+ objectPath,
+ interfaceName,
+ signalName,
+ signalSignature);
+
+ if (getMethodName != "") {
+
+ SignalMemberHandlerTuple signalMemberHandler(
+ objectPath,
+ interfaceName,
+ signalName,
+ signalSignature,
+ getMethodName,
+ dbusSignalHandler,
+ justAddFilter,
+ false);
+
+ availabilityStatusMutex_.lock();
+ if (availabilityStatus_ == AvailabilityStatus::AVAILABLE) {
+ availabilityStatusMutex_.unlock();
+ signalHandlerToken = connection_->addSignalMemberHandler(
+ objectPath,
+ interfaceName,
+ signalName,
+ signalSignature,
+ dbusSignalHandler,
+ justAddFilter);
+ std::get<7>(signalMemberHandler) = true;
+ } else {
+ availabilityStatusMutex_.unlock();
+ }
+ addSignalMemberHandlerToQueue(signalMemberHandler);
+ } else {
+ signalHandlerToken = connection_->addSignalMemberHandler(
+ objectPath,
+ interfaceName,
+ signalName,
+ signalSignature,
+ dbusSignalHandler,
+ justAddFilter);
+ }
+
+ return signalHandlerToken;
+}
+
+void DBusProxy::addSignalMemberHandlerToQueue(SignalMemberHandlerTuple& _signalMemberHandler) {
+
+ std::lock_guard < std::mutex > queueLock(signalMemberHandlerQueueMutex_);
+ bool found = false;
+
+ for(auto signalMemberHandlerIterator = signalMemberHandlerQueue_.begin();
+ signalMemberHandlerIterator != signalMemberHandlerQueue_.end();
+ signalMemberHandlerIterator++) {
+
+ if ( (std::get<0>(*signalMemberHandlerIterator) == std::get<0>(_signalMemberHandler)) &&
+ (std::get<1>(*signalMemberHandlerIterator) == std::get<1>(_signalMemberHandler)) &&
+ (std::get<2>(*signalMemberHandlerIterator) == std::get<2>(_signalMemberHandler)) &&
+ (std::get<3>(*signalMemberHandlerIterator) == std::get<3>(_signalMemberHandler))) {
+
+ found = true;
+ break;
+ }
+ }
+ if (!found) {
+ signalMemberHandlerQueue_.push_back(_signalMemberHandler);
+ }
+}
+
+bool DBusProxy::removeSignalMemberHandler(
+ const DBusProxyConnection::DBusSignalHandlerToken &_dbusSignalHandlerToken,
+ const DBusProxyConnection::DBusSignalHandler *_dbusSignalHandler) {
+
+ {
+ std::lock_guard < std::mutex > queueLock(signalMemberHandlerQueueMutex_);
+ for(auto signalMemberHandlerIterator = signalMemberHandlerQueue_.begin();
+ signalMemberHandlerIterator != signalMemberHandlerQueue_.end();
+ signalMemberHandlerIterator++) {
+
+ if ( (std::get<0>(*signalMemberHandlerIterator) == std::get<0>(_dbusSignalHandlerToken)) &&
+ (std::get<1>(*signalMemberHandlerIterator) == std::get<1>(_dbusSignalHandlerToken)) &&
+ (std::get<2>(*signalMemberHandlerIterator) == std::get<2>(_dbusSignalHandlerToken)) &&
+ (std::get<3>(*signalMemberHandlerIterator) == std::get<3>(_dbusSignalHandlerToken))) {
+ signalMemberHandlerIterator = signalMemberHandlerQueue_.erase(signalMemberHandlerIterator);
+
+ if (signalMemberHandlerIterator == signalMemberHandlerQueue_.end()) {
+ break;
+ }
+ }
+ }
+ }
+
+ return connection_->removeSignalMemberHandler(_dbusSignalHandlerToken, _dbusSignalHandler);
+}
+
+void DBusProxy::getCurrentValueForSignalListener(
+ const std::string &getMethodName,
+ DBusProxyConnection::DBusSignalHandler *dbusSignalHandler,
+ const uint32_t subscription) {
+
+ availabilityStatusMutex_.lock();
+ if (availabilityStatus_ == AvailabilityStatus::AVAILABLE) {
+ availabilityStatusMutex_.unlock();
+
+ DBusMessage message = createMethodCall(getMethodName, "");
+
+ DBusProxyAsyncSignalMemberCallbackHandler::FunctionType myFunc = std::bind(&DBusProxy::signalInitialValueCallback,
+ this,
+ std::placeholders::_1,
+ std::placeholders::_2,
+ std::placeholders::_3,
+ std::placeholders::_4);
+ connection_->sendDBusMessageWithReplyAsync(
+ message,
+ DBusProxyAsyncSignalMemberCallbackHandler::create(myFunc, dbusSignalHandler, subscription),
+ &signalMemberHandlerInfo_);
+ } else {
+ availabilityStatusMutex_.unlock();
+ }
+}
+
} // namespace DBus
} // namespace CommonAPI