summaryrefslogtreecommitdiff
path: root/src/CommonAPI/DBus/DBusConnection.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/CommonAPI/DBus/DBusConnection.cpp')
-rw-r--r--src/CommonAPI/DBus/DBusConnection.cpp380
1 files changed, 201 insertions, 179 deletions
diff --git a/src/CommonAPI/DBus/DBusConnection.cpp b/src/CommonAPI/DBus/DBusConnection.cpp
index cb4f8aa..e3fb4a4 100644
--- a/src/CommonAPI/DBus/DBusConnection.cpp
+++ b/src/CommonAPI/DBus/DBusConnection.cpp
@@ -16,6 +16,7 @@
#include <CommonAPI/DBus/DBusProxy.hpp>
#include <CommonAPI/DBus/DBusAddressTranslator.hpp>
+
namespace CommonAPI {
namespace DBus {
@@ -23,7 +24,7 @@ DBusConnectionStatusEvent::DBusConnectionStatusEvent(DBusConnection* dbusConnect
dbusConnection_(dbusConnection) {
}
-void DBusConnectionStatusEvent::onListenerAdded(const Listener& listener) {
+void DBusConnectionStatusEvent::onListenerAdded(const Listener& listener, const Subscription subscription) {
if (dbusConnection_->isConnected())
listener(AvailabilityStatus::AVAILABLE);
}
@@ -43,18 +44,18 @@ const DBusObjectPathVTable* DBusConnection::getDBusObjectPathVTable() {
const int32_t ownUseCount = 2;
void DBusConnection::dispatch() {
- std::shared_ptr<DBusConnection> selfReference = this->shared_from_this();
- while (!stopDispatching_ && readWriteDispatch(10) && selfReference.use_count() > ownUseCount) {
- if (pauseDispatching_) {
- dispatchSuspendLock_.lock();
- dispatchSuspendLock_.unlock();
- }
- }
+ std::shared_ptr<DBusConnection> selfReference = this->shared_from_this();
+ while (!stopDispatching_ && readWriteDispatch(10) && selfReference.use_count() > ownUseCount) {
+ if (pauseDispatching_) {
+ dispatchSuspendLock_.lock();
+ dispatchSuspendLock_.unlock();
+ }
+ }
}
bool DBusConnection::readWriteDispatch(int _timeout) {
if(isConnected()) {
- return 0 != dbus_connection_read_write_dispatch(connection_, _timeout);
+ return 0 != dbus_connection_read_write_dispatch(connection_, _timeout);
}
return false;
}
@@ -76,13 +77,13 @@ DBusConnection::DBusConnection(DBusType_t busType) :
watchContext_(NULL),
pauseDispatching_(false),
connection_(NULL),
- busType_(DBusAddressTranslator::get()->getDBusBusType()),
+ busType_(busType),
dbusConnectionStatusEvent_(this),
libdbusSignalMatchRulesCount_(0),
dbusObjectMessageHandler_(),
connectionNameCount_(),
enforcerThread_(NULL),
- enforcerThreadCancelled_(false) {
+ enforcerThreadCancelled_(false) {
dbus_threads_init_default();
}
@@ -99,8 +100,8 @@ DBusConnection::DBusConnection(::DBusConnection *_connection) :
libdbusSignalMatchRulesCount_(0),
dbusObjectMessageHandler_(),
connectionNameCount_(),
- enforcerThread_(NULL),
- enforcerThreadCancelled_(false) {
+ enforcerThread_(NULL),
+ enforcerThreadCancelled_(false) {
dbus_threads_init_default();
}
@@ -124,27 +125,27 @@ DBusConnection::~DBusConnection() {
}
// ensure, the registry survives until disconnecting is done...
- //std::shared_ptr<DBusServiceRegistry> itsRegistry = DBusServiceRegistry::get(shared_from_this());
- disconnect();
+ //std::shared_ptr<DBusServiceRegistry> itsRegistry = DBusServiceRegistry::get(shared_from_this());
+ disconnect();
//Assert that the enforcerThread_ is in a position to finish itself correctly even after destruction
//of the DBusConnection. Also assert all resources are cleaned up.
- auto it = timeoutMap_.begin();
- while (it != timeoutMap_.end()) {
- DBusPendingCall* libdbusPendingCall = it->first;
+ auto it = timeoutMap_.begin();
+ while (it != timeoutMap_.end()) {
+ DBusPendingCall* libdbusPendingCall = it->first;
- if (!dbus_pending_call_get_completed(libdbusPendingCall)) {
- dbus_pending_call_cancel(libdbusPendingCall);
- DBusMessageReplyAsyncHandler* asyncHandler = std::get<1>(it->second);
- DBusMessage& dbusMessageCall = std::get<2>(it->second);
+ if (!dbus_pending_call_get_completed(libdbusPendingCall)) {
+ dbus_pending_call_cancel(libdbusPendingCall);
+ DBusMessageReplyAsyncHandler* asyncHandler = std::get<1>(it->second);
+ DBusMessage& dbusMessageCall = std::get<2>(it->second);
- asyncHandler->onDBusMessageReply(CallStatus::REMOTE_ERROR, dbusMessageCall.createMethodError(DBUS_ERROR_TIMEOUT));
- delete asyncHandler;
+ asyncHandler->onDBusMessageReply(CallStatus::REMOTE_ERROR, dbusMessageCall.createMethodError(DBUS_ERROR_TIMEOUT));
+ delete asyncHandler;
- }
- it = timeoutMap_.erase(it);
- dbus_pending_call_unref(libdbusPendingCall);
- }
+ }
+ it = timeoutMap_.erase(it);
+ dbus_pending_call_unref(libdbusPendingCall);
+ }
}
@@ -162,7 +163,7 @@ bool DBusConnection::attachMainLoopContext(std::weak_ptr<MainLoopContext> mainLo
&mainLoopContext_,
NULL);
- bool success = 0 != dbus_connection_set_watch_functions(
+ bool success = 0 != dbus_connection_set_watch_functions(
connection_,
&DBusConnection::onAddWatch,
&DBusConnection::onRemoveWatch,
@@ -174,7 +175,7 @@ bool DBusConnection::attachMainLoopContext(std::weak_ptr<MainLoopContext> mainLo
return false;
}
- success = 0 != dbus_connection_set_timeout_functions(
+ success = 0 != dbus_connection_set_timeout_functions(
connection_,
&DBusConnection::onAddTimeout,
&DBusConnection::onRemoveTimeout,
@@ -289,15 +290,15 @@ bool DBusConnection::connect(DBusError &dbusError, bool startDispatchThread) {
connection_ = dbus_bus_get_private(libdbusType, &dbusError.libdbusError_);
if (dbusError) {
- #ifdef _MSC_VER
- COMMONAPI_ERROR(std::string(__FUNCTION__) +
- ": Name: " + dbusError.getName() +
- " Message: " + dbusError.getMessage())
- #else
- COMMONAPI_ERROR(std::string(__PRETTY_FUNCTION__) +
- ": Name: " + dbusError.getName() +
- " Message: " + dbusError.getMessage())
- #endif
+ #ifdef _MSC_VER
+ COMMONAPI_ERROR(std::string(__FUNCTION__) +
+ ": Name: " + dbusError.getName() +
+ " Message: " + dbusError.getMessage())
+ #else
+ COMMONAPI_ERROR(std::string(__PRETTY_FUNCTION__) +
+ ": Name: " + dbusError.getName() +
+ " Message: " + dbusError.getMessage())
+ #endif
return false;
}
@@ -310,11 +311,11 @@ bool DBusConnection::connect(DBusError &dbusError, bool startDispatchThread) {
stopDispatching_ = !startDispatchThread;
if (startDispatchThread) {
- dispatchThread_ = new std::thread(std::bind(&DBusConnection::dispatch, this->shared_from_this()));
+ dispatchThread_ = new std::thread(std::bind(&DBusConnection::dispatch, this->shared_from_this()));
}
- enforcerThread_ = std::make_shared<std::thread>(
- std::bind(&DBusConnection::enforceAsynchronousTimeouts, shared_from_this()));
+ enforcerThread_ = std::make_shared<std::thread>(
+ std::bind(&DBusConnection::enforceAsynchronousTimeouts, shared_from_this()));
dbusConnectionStatusEvent_.notifyListeners(AvailabilityStatus::AVAILABLE);
@@ -352,7 +353,7 @@ void DBusConnection::disconnect() {
enforcerThreadCancelled_ = true;
enforceTimeoutCondition_.notify_one();
if (enforcerThread_->joinable()) {
- enforcerThread_->join();
+ enforcerThread_->join();
}
enforcerThreadCancelled_ = false;
@@ -388,7 +389,7 @@ bool DBusConnection::requestServiceNameAndBlock(const std::string& serviceName)
std::lock_guard<std::mutex> dbusConnectionLock(connectionGuard_);
auto conIter = connectionNameCount_.find(serviceName);
if (conIter == connectionNameCount_.end()) {
- suspendDispatching();
+ suspendDispatching();
const int libdbusStatus = dbus_bus_request_name(connection_,
serviceName.c_str(),
@@ -400,17 +401,17 @@ bool DBusConnection::requestServiceNameAndBlock(const std::string& serviceName)
isServiceNameAcquired = (libdbusStatus == DBUS_REQUEST_NAME_REPLY_PRIMARY_OWNER);
if (isServiceNameAcquired) {
connectionNameCount_.insert( { serviceName, (uint16_t)1 } );
- }
- else {
- #ifdef _MSC_VER // Visual Studio
- COMMONAPI_ERROR(std::string(__FUNCTION__) +
- ": Name: " + dbusError.getName() +
- " Message: " + dbusError.getMessage())
- #else
+ }
+ else {
+ #ifdef _MSC_VER // Visual Studio
+ COMMONAPI_ERROR(std::string(__FUNCTION__) +
+ ": Name: " + dbusError.getName() +
+ " Message: " + dbusError.getMessage())
+ #else
COMMONAPI_ERROR(std::string(__PRETTY_FUNCTION__) +
": Name: " + dbusError.getName() +
" Message: " + dbusError.getMessage())
- #endif
+ #endif
}
} else {
conIter->second = conIter->second + 1;
@@ -427,7 +428,7 @@ bool DBusConnection::releaseServiceName(const std::string& serviceName) const {
auto conIter = connectionNameCount_.find(serviceName);
if (conIter != connectionNameCount_.end()) {
if (conIter->second == 1) {
- suspendDispatching();
+ suspendDispatching();
const int libdbusStatus = dbus_bus_release_name(connection_,
serviceName.c_str(),
&dbusError.libdbusError_);
@@ -449,29 +450,40 @@ bool DBusConnection::sendDBusMessage(const DBusMessage &_message) const {
assert(isConnected());
dbus_uint32_t dbusSerial;
- bool result = 0 != dbus_connection_send(connection_, _message.message_, &dbusSerial);
+ bool result = 0 != dbus_connection_send(connection_, _message.message_, &dbusSerial);
return result;
}
-void DBusConnection::onLibdbusPendingCallNotifyThunk(::DBusPendingCall* libdbusPendingCall, void *userData) {
- assert(userData);
- assert(libdbusPendingCall);
+DBusMessage DBusConnection::convertToDBusMessage(::DBusPendingCall* _libdbusPendingCall,
+ CallStatus& _callStatus) {
+ assert(_libdbusPendingCall);
- auto dbusMessageReplyAsyncHandler = reinterpret_cast<DBusMessageReplyAsyncHandler*>(userData);
+ ::DBusMessage* libdbusMessage = dbus_pending_call_steal_reply(_libdbusPendingCall);
+ const bool increaseLibdbusMessageReferenceCount = false;
+ DBusMessage dbusMessage(libdbusMessage, increaseLibdbusMessageReferenceCount);
+ _callStatus = CallStatus::SUCCESS;
- ::DBusMessage* libdbusMessage = dbus_pending_call_steal_reply(libdbusPendingCall);
- const bool increaseLibdbusMessageReferenceCount = false;
- DBusMessage dbusMessage(libdbusMessage, increaseLibdbusMessageReferenceCount);
- CallStatus callStatus = CallStatus::SUCCESS;
+ if (!dbusMessage.isMethodReturnType()) {
+ _callStatus = CallStatus::REMOTE_ERROR;
+ }
- if (!dbusMessage.isMethodReturnType()) {
- callStatus = CallStatus::REMOTE_ERROR;
- }
+ return dbusMessage;
+}
+
+void DBusConnection::onLibdbusPendingCallNotifyThunk(::DBusPendingCall* _libdbusPendingCall, void *_userData) {
+ assert(_userData);
+ assert(_libdbusPendingCall);
+
+ auto dbusMessageReplyAsyncHandler = reinterpret_cast<DBusMessageReplyAsyncHandler*>(_userData);
+
+ DBusMessage dbusMessage;
+ CallStatus callStatus;
+ dbusMessage = DBusConnection::convertToDBusMessage(_libdbusPendingCall, callStatus);
dbusMessageReplyAsyncHandler->onDBusMessageReply(callStatus, dbusMessage);
// libdbus calls the cleanup method below
- dbus_pending_call_unref(libdbusPendingCall);
+ dbus_pending_call_unref(_libdbusPendingCall);
}
void DBusConnection::onLibdbusDataCleanup(void* userData) {
@@ -479,74 +491,77 @@ void DBusConnection::onLibdbusDataCleanup(void* userData) {
delete dbusMessageReplyAsyncHandler;
}
-
//Would not be needed if libdbus would actually handle its timeouts for pending calls.
void DBusConnection::enforceAsynchronousTimeouts() const {
- std::unique_lock<std::mutex> itsLock(enforcerThreadMutex_);
+ std::unique_lock<std::mutex> itsLock(enforcerThreadMutex_);
- while (!enforcerThreadCancelled_) {
+ while (!enforcerThreadCancelled_) {
enforceTimeoutMutex_.lock();
int timeout = std::numeric_limits<int>::max(); // not really, but nearly "forever"
if (timeoutMap_.size() > 0) {
- auto minTimeoutElement = std::min_element(timeoutMap_.begin(), timeoutMap_.end(),
- [] (const TimeoutMapElement& lhs, const TimeoutMapElement& rhs) {
- return std::get<0>(lhs.second) < std::get<0>(rhs.second);
- });
+ auto minTimeoutElement = std::min_element(timeoutMap_.begin(), timeoutMap_.end(),
+ [] (const TimeoutMapElement& lhs, const TimeoutMapElement& rhs) {
+ return std::get<0>(lhs.second) < std::get<0>(rhs.second);
+ });
- auto minTimeout = std::get<0>(minTimeoutElement->second);
+ auto minTimeout = std::get<0>(minTimeoutElement->second);
- std::chrono::high_resolution_clock::time_point now = std::chrono::high_resolution_clock::now();
+ std::chrono::high_resolution_clock::time_point now = std::chrono::high_resolution_clock::now();
- timeout = (int)std::chrono::duration_cast<std::chrono::milliseconds>(minTimeout - now).count();
+ timeout = (int)std::chrono::duration_cast<std::chrono::milliseconds>(minTimeout - now).count();
}
enforceTimeoutMutex_.unlock();
if (std::cv_status::timeout ==
- enforceTimeoutCondition_.wait_for(itsLock, std::chrono::milliseconds(timeout))) {
-
- //Do not access members if the DBusConnection was destroyed during the unlocked phase.
- enforceTimeoutMutex_.lock();
- auto it = timeoutMap_.begin();
- while (it != timeoutMap_.end()) {
- std::chrono::high_resolution_clock::time_point now = std::chrono::high_resolution_clock::now();
-
- if (now > std::get<0>(it->second)) {
- DBusPendingCall* libdbusPendingCall = it->first;
-
- if (!dbus_pending_call_get_completed(libdbusPendingCall)) {
- dbus_pending_call_cancel(libdbusPendingCall);
- DBusMessageReplyAsyncHandler* asyncHandler = std::get<1>(it->second);
- DBusMessage& dbusMessageCall = std::get<2>(it->second);
-
- if (mainLoopContext_.lock()) {
- mainloopTimeouts_.push_back(std::make_pair(asyncHandler, dbusMessageCall));
- } else {
- enforceTimeoutMutex_.unlock(); // unlock before making callbacks to application to avoid deadlocks
- asyncHandler->onDBusMessageReply(CallStatus::REMOTE_ERROR, dbusMessageCall.createMethodError(DBUS_ERROR_TIMEOUT));
- enforceTimeoutMutex_.lock();
- delete asyncHandler;
- }
- }
- it = timeoutMap_.erase(it);
-
- //This unref MIGHT cause the destruction of the last callback object that references the DBusConnection.
- //So after this unref has been called, it has to be ensured that continuation of the loop is an option.
- dbus_pending_call_unref(libdbusPendingCall);
- } else {
- ++it;
- }
- }
- enforceTimeoutMutex_.unlock();
- }
+ enforceTimeoutCondition_.wait_for(itsLock, std::chrono::milliseconds(timeout))) {
+
+ //Do not access members if the DBusConnection was destroyed during the unlocked phase.
+ enforceTimeoutMutex_.lock();
+ auto it = timeoutMap_.begin();
+ while (it != timeoutMap_.end()) {
+ std::chrono::high_resolution_clock::time_point now = std::chrono::high_resolution_clock::now();
+
+ if (now > std::get<0>(it->second)) {
+ DBusPendingCall* libdbusPendingCall = it->first;
+
+ if (!dbus_pending_call_get_completed(libdbusPendingCall)) {
+ dbus_pending_call_cancel(libdbusPendingCall);
+ DBusMessageReplyAsyncHandler* asyncHandler = std::get<1>(it->second);
+ DBusMessage& dbusMessageCall = std::get<2>(it->second);
+
+ if (mainLoopContext_.lock()) {
+ mainloopTimeouts_.push_back(std::make_tuple(asyncHandler,
+ dbusMessageCall.createMethodError(DBUS_ERROR_TIMEOUT),
+ CallStatus::REMOTE_ERROR,
+ nullptr));
+ mainLoopContext_.lock()->wakeup();
+ } else {
+ enforceTimeoutMutex_.unlock(); // unlock before making callbacks to application to avoid deadlocks
+ asyncHandler->onDBusMessageReply(CallStatus::REMOTE_ERROR, dbusMessageCall.createMethodError(DBUS_ERROR_TIMEOUT));
+ enforceTimeoutMutex_.lock();
+ delete asyncHandler;
+ }
+ }
+ it = timeoutMap_.erase(it);
+
+ //This unref MIGHT cause the destruction of the last callback object that references the DBusConnection.
+ //So after this unref has been called, it has to be ensured that continuation of the loop is an option.
+ dbus_pending_call_unref(libdbusPendingCall);
+ } else {
+ ++it;
+ }
+ }
+ enforceTimeoutMutex_.unlock();
+ }
}
}
std::future<CallStatus> DBusConnection::sendDBusMessageWithReplyAsync(
const DBusMessage& dbusMessage,
std::unique_ptr<DBusMessageReplyAsyncHandler> dbusMessageReplyAsyncHandler,
- const CommonAPI::CallInfo *_info) const {
+ const CommonAPI::CallInfo *_info) const {
assert(dbusMessage);
assert(isConnected());
@@ -555,56 +570,47 @@ std::future<CallStatus> DBusConnection::sendDBusMessageWithReplyAsync(
dbus_bool_t libdbusSuccess;
suspendDispatching();
- libdbusSuccess = dbus_connection_send_with_reply(connection_,
- dbusMessage.message_,
- &libdbusPendingCall,
- _info->timeout_);
+ libdbusSuccess = dbus_connection_send_with_reply_set_notify(connection_,
+ dbusMessage.message_,
+ &libdbusPendingCall,
+ onLibdbusPendingCallNotifyThunk,
+ dbusMessageReplyAsyncHandler.get(),
+ onLibdbusDataCleanup,
+ _info->timeout_);
if (_info->sender_ != 0) {
- COMMONAPI_DEBUG("Message sent: SenderID: ", _info->sender_, " - Serial number: ", dbusMessage.getSerial());
+ COMMONAPI_DEBUG("Message sent: SenderID: ", _info->sender_, " - Serial number: ", dbusMessage.getSerial());
}
if (!libdbusSuccess || !libdbusPendingCall) {
- dbusMessageReplyAsyncHandler->onDBusMessageReply(CallStatus::CONNECTION_FAILED, dbusMessage.createMethodError(DBUS_ERROR_DISCONNECTED));
- resumeDispatching();
- return dbusMessageReplyAsyncHandler->getFuture();
+ #ifdef _MSC_VER // Visual Studio
+ COMMONAPI_ERROR(std::string(__FUNCTION__) +
+ ": (!libdbusSuccess || !libdbusPendingCall) == true")
+ #else
+ COMMONAPI_ERROR(std::string(__PRETTY_FUNCTION__) +
+ ": (!libdbusSuccess || !libdbusPendingCall) == true")
+ #endif
+ if (libdbusPendingCall) {
+ dbus_pending_call_unref(libdbusPendingCall);
+ }
+ dbusMessageReplyAsyncHandler->onDBusMessageReply(CallStatus::CONNECTION_FAILED, dbusMessage.createMethodError(DBUS_ERROR_DISCONNECTED));
+ resumeDispatching();
+ return dbusMessageReplyAsyncHandler->getFuture();
}
- sendLock_.lock();
- if (dbus_pending_call_get_completed (libdbusPendingCall)) {
- onLibdbusPendingCallNotifyThunk(libdbusPendingCall, dbusMessageReplyAsyncHandler.get());
- onLibdbusDataCleanup(dbusMessageReplyAsyncHandler.get());
-
- } else {
- libdbusSuccess = dbus_pending_call_set_notify(
- libdbusPendingCall,
- onLibdbusPendingCallNotifyThunk,
- dbusMessageReplyAsyncHandler.get(),
- onLibdbusDataCleanup);
-
- if (!libdbusSuccess) {
- dbusMessageReplyAsyncHandler->onDBusMessageReply(CallStatus::OUT_OF_MEMORY, dbusMessage);
- dbus_pending_call_unref(libdbusPendingCall);
- resumeDispatching();
- sendLock_.unlock();
- return dbusMessageReplyAsyncHandler->getFuture();
- }
- }
- sendLock_.unlock();
-
DBusMessageReplyAsyncHandler* replyAsyncHandler = dbusMessageReplyAsyncHandler.release();
if (_info->timeout_ != DBUS_TIMEOUT_INFINITE) {
dbus_pending_call_ref(libdbusPendingCall);
auto timeoutPoint = std::chrono::high_resolution_clock::now() + std::chrono::milliseconds(_info->timeout_);
std::tuple<
- std::chrono::time_point<std::chrono::high_resolution_clock>,
- DBusMessageReplyAsyncHandler*,
- DBusMessage> toInsert {
- timeoutPoint,
- replyAsyncHandler,
- dbusMessage
- };
+ std::chrono::time_point<std::chrono::high_resolution_clock>,
+ DBusMessageReplyAsyncHandler*,
+ DBusMessage> toInsert {
+ timeoutPoint,
+ replyAsyncHandler,
+ dbusMessage
+ };
enforceTimeoutMutex_.lock();
timeoutMap_.insert( { libdbusPendingCall, toInsert } );
@@ -622,7 +628,7 @@ std::future<CallStatus> DBusConnection::sendDBusMessageWithReplyAsync(
DBusMessage DBusConnection::sendDBusMessageWithReplyAndBlock(const DBusMessage& dbusMessage,
DBusError& dbusError,
- const CommonAPI::CallInfo *_info) const {
+ const CommonAPI::CallInfo *_info) const {
assert(dbusMessage);
assert(!dbusError);
assert(isConnected());
@@ -635,7 +641,7 @@ DBusMessage DBusConnection::sendDBusMessageWithReplyAndBlock(const DBusMessage&
&dbusError.libdbusError_);
if (_info->sender_ != 0) {
- COMMONAPI_DEBUG("Message sent: SenderID: ", _info->sender_, " - Serial number: ", dbusMessage.getSerial());
+ COMMONAPI_DEBUG("Message sent: SenderID: ", _info->sender_, " - Serial number: ", dbusMessage.getSerial());
}
resumeDispatching();
@@ -650,18 +656,25 @@ DBusMessage DBusConnection::sendDBusMessageWithReplyAndBlock(const DBusMessage&
bool DBusConnection::singleDispatch() {
- for (auto t : mainloopTimeouts_) {
- t.first->onDBusMessageReply(CallStatus::REMOTE_ERROR, t.second.createMethodError(DBUS_ERROR_TIMEOUT));
- delete t.first;
- }
- mainloopTimeouts_.clear();
+ for (auto t : mainloopTimeouts_) {
+ std::get<0>(t)->onDBusMessageReply(std::get<2>(t), std::get<1>(t));
+ if (std::get<3>(t) != nullptr) {
+ dbus_pending_call_unref(std::get<3>(t));
+ }
+ delete std::get<0>(t);
+ }
+ mainloopTimeouts_.clear();
return (dbus_connection_dispatch(connection_) == DBUS_DISPATCH_DATA_REMAINS);
}
bool DBusConnection::isDispatchReady() {
return (dbus_connection_get_dispatch_status(connection_) == DBUS_DISPATCH_DATA_REMAINS ||
- !mainloopTimeouts_.empty());
+ !mainloopTimeouts_.empty());
+}
+
+bool DBusConnection::hasDispatchThread() {
+ return (dispatchThread_ != NULL);
}
DBusProxyConnection::DBusSignalHandlerToken DBusConnection::subscribeForSelectiveBroadcast(
@@ -685,13 +698,13 @@ DBusProxyConnection::DBusSignalHandlerToken DBusConnection::subscribeForSelectiv
DBusProxyConnection::DBusSignalHandlerToken subscriptionToken;
if (callStatus == CommonAPI::CallStatus::SUCCESS && subscriptionAccepted) {
subscriptionToken = addSignalMemberHandler(
- objectPath,
- interfaceName,
- interfaceMemberName,
- interfaceMemberSignature,
- dbusSignalHandler,
- true
- );
+ objectPath,
+ interfaceName,
+ interfaceMemberName,
+ interfaceMemberSignature,
+ dbusSignalHandler,
+ true
+ );
subscriptionAccepted = true;
}
@@ -735,7 +748,7 @@ DBusProxyConnection::DBusSignalHandlerToken DBusConnection::addSignalMemberHandl
handlerList.insert(dbusSignalHandler);
dbusSignalHandlerTable_.insert( {
- dbusSignalHandlerPath,
+ dbusSignalHandlerPath,
std::make_pair(std::make_shared<std::recursive_mutex>(), std::move(handlerList))
} );
} else {
@@ -763,6 +776,14 @@ bool DBusConnection::removeSignalMemberHandler(const DBusSignalHandlerToken &dbu
}
signalEntry->second.first->unlock();
}
+
+ if (lastHandlerRemoved) {
+ dbusSignalHandlerTable_.erase(signalEntry);
+ removeLibdbusSignalMatchRule(std::get<0>(dbusSignalHandlerToken),
+ std::get<1>(dbusSignalHandlerToken),
+ std::get<2>(dbusSignalHandlerToken));
+ }
+
return lastHandlerRemoved;
}
@@ -874,12 +895,12 @@ bool DBusConnection::addLibdbusSignalMatchRule(const std::string& dbusMatchRule)
// add the libdbus message signal filter
if (!libdbusSignalMatchRulesCount_) {
- libdbusSuccess = 0 != dbus_connection_add_filter(
- connection_,
+ libdbusSuccess = 0 != dbus_connection_add_filter(
+ connection_,
&onLibdbusSignalFilterThunk,
this,
NULL
- );
+ );
}
// finally add the match rule
@@ -912,10 +933,10 @@ bool DBusConnection::removeLibdbusSignalMatchRule(const std::string& dbusMatchRu
dbus_bus_remove_match(connection_, dbusMatchRule.c_str(), NULL);
- libdbusSignalMatchRulesCount_--;
- if (libdbusSignalMatchRulesCount_ == 0) {
- dbus_connection_remove_filter(connection_, &onLibdbusSignalFilterThunk, this);
- }
+ libdbusSignalMatchRulesCount_--;
+ if (libdbusSignalMatchRulesCount_ == 0) {
+ dbus_connection_remove_filter(connection_, &onLibdbusSignalFilterThunk, this);
+ }
resumeDispatching();
@@ -968,7 +989,7 @@ void DBusConnection::unregisterObjectPath(const std::string& objectPath) {
if (isConnected()) {
dbus_bool_t libdbusSuccess
- = dbus_connection_unregister_object_path(connection_, objectPath.c_str());
+ = dbus_connection_unregister_object_path(connection_, objectPath.c_str());
assert(libdbusSuccess);
}
}
@@ -1007,14 +1028,15 @@ void DBusConnection::addLibdbusSignalMatchRule(const std::string& objectPath,
if (isConnected()) {
bool libdbusSuccess = true;
suspendDispatching();
+
// add the libdbus message signal filter
if (isFirstMatchRule) {
- libdbusSuccess = 0 != dbus_connection_add_filter(
- connection_,
- &onLibdbusSignalFilterThunk,
- this,
- NULL);
+ libdbusSuccess = 0 != dbus_connection_add_filter(
+ connection_,
+ &onLibdbusSignalFilterThunk,
+ this,
+ NULL);
assert(libdbusSuccess);
}