diff options
Diffstat (limited to 'src/CommonAPI/DBus/DBusConnection.cpp')
-rw-r--r-- | src/CommonAPI/DBus/DBusConnection.cpp | 380 |
1 files changed, 201 insertions, 179 deletions
diff --git a/src/CommonAPI/DBus/DBusConnection.cpp b/src/CommonAPI/DBus/DBusConnection.cpp index cb4f8aa..e3fb4a4 100644 --- a/src/CommonAPI/DBus/DBusConnection.cpp +++ b/src/CommonAPI/DBus/DBusConnection.cpp @@ -16,6 +16,7 @@ #include <CommonAPI/DBus/DBusProxy.hpp> #include <CommonAPI/DBus/DBusAddressTranslator.hpp> + namespace CommonAPI { namespace DBus { @@ -23,7 +24,7 @@ DBusConnectionStatusEvent::DBusConnectionStatusEvent(DBusConnection* dbusConnect dbusConnection_(dbusConnection) { } -void DBusConnectionStatusEvent::onListenerAdded(const Listener& listener) { +void DBusConnectionStatusEvent::onListenerAdded(const Listener& listener, const Subscription subscription) { if (dbusConnection_->isConnected()) listener(AvailabilityStatus::AVAILABLE); } @@ -43,18 +44,18 @@ const DBusObjectPathVTable* DBusConnection::getDBusObjectPathVTable() { const int32_t ownUseCount = 2; void DBusConnection::dispatch() { - std::shared_ptr<DBusConnection> selfReference = this->shared_from_this(); - while (!stopDispatching_ && readWriteDispatch(10) && selfReference.use_count() > ownUseCount) { - if (pauseDispatching_) { - dispatchSuspendLock_.lock(); - dispatchSuspendLock_.unlock(); - } - } + std::shared_ptr<DBusConnection> selfReference = this->shared_from_this(); + while (!stopDispatching_ && readWriteDispatch(10) && selfReference.use_count() > ownUseCount) { + if (pauseDispatching_) { + dispatchSuspendLock_.lock(); + dispatchSuspendLock_.unlock(); + } + } } bool DBusConnection::readWriteDispatch(int _timeout) { if(isConnected()) { - return 0 != dbus_connection_read_write_dispatch(connection_, _timeout); + return 0 != dbus_connection_read_write_dispatch(connection_, _timeout); } return false; } @@ -76,13 +77,13 @@ DBusConnection::DBusConnection(DBusType_t busType) : watchContext_(NULL), pauseDispatching_(false), connection_(NULL), - busType_(DBusAddressTranslator::get()->getDBusBusType()), + busType_(busType), dbusConnectionStatusEvent_(this), libdbusSignalMatchRulesCount_(0), dbusObjectMessageHandler_(), connectionNameCount_(), enforcerThread_(NULL), - enforcerThreadCancelled_(false) { + enforcerThreadCancelled_(false) { dbus_threads_init_default(); } @@ -99,8 +100,8 @@ DBusConnection::DBusConnection(::DBusConnection *_connection) : libdbusSignalMatchRulesCount_(0), dbusObjectMessageHandler_(), connectionNameCount_(), - enforcerThread_(NULL), - enforcerThreadCancelled_(false) { + enforcerThread_(NULL), + enforcerThreadCancelled_(false) { dbus_threads_init_default(); } @@ -124,27 +125,27 @@ DBusConnection::~DBusConnection() { } // ensure, the registry survives until disconnecting is done... - //std::shared_ptr<DBusServiceRegistry> itsRegistry = DBusServiceRegistry::get(shared_from_this()); - disconnect(); + //std::shared_ptr<DBusServiceRegistry> itsRegistry = DBusServiceRegistry::get(shared_from_this()); + disconnect(); //Assert that the enforcerThread_ is in a position to finish itself correctly even after destruction //of the DBusConnection. Also assert all resources are cleaned up. - auto it = timeoutMap_.begin(); - while (it != timeoutMap_.end()) { - DBusPendingCall* libdbusPendingCall = it->first; + auto it = timeoutMap_.begin(); + while (it != timeoutMap_.end()) { + DBusPendingCall* libdbusPendingCall = it->first; - if (!dbus_pending_call_get_completed(libdbusPendingCall)) { - dbus_pending_call_cancel(libdbusPendingCall); - DBusMessageReplyAsyncHandler* asyncHandler = std::get<1>(it->second); - DBusMessage& dbusMessageCall = std::get<2>(it->second); + if (!dbus_pending_call_get_completed(libdbusPendingCall)) { + dbus_pending_call_cancel(libdbusPendingCall); + DBusMessageReplyAsyncHandler* asyncHandler = std::get<1>(it->second); + DBusMessage& dbusMessageCall = std::get<2>(it->second); - asyncHandler->onDBusMessageReply(CallStatus::REMOTE_ERROR, dbusMessageCall.createMethodError(DBUS_ERROR_TIMEOUT)); - delete asyncHandler; + asyncHandler->onDBusMessageReply(CallStatus::REMOTE_ERROR, dbusMessageCall.createMethodError(DBUS_ERROR_TIMEOUT)); + delete asyncHandler; - } - it = timeoutMap_.erase(it); - dbus_pending_call_unref(libdbusPendingCall); - } + } + it = timeoutMap_.erase(it); + dbus_pending_call_unref(libdbusPendingCall); + } } @@ -162,7 +163,7 @@ bool DBusConnection::attachMainLoopContext(std::weak_ptr<MainLoopContext> mainLo &mainLoopContext_, NULL); - bool success = 0 != dbus_connection_set_watch_functions( + bool success = 0 != dbus_connection_set_watch_functions( connection_, &DBusConnection::onAddWatch, &DBusConnection::onRemoveWatch, @@ -174,7 +175,7 @@ bool DBusConnection::attachMainLoopContext(std::weak_ptr<MainLoopContext> mainLo return false; } - success = 0 != dbus_connection_set_timeout_functions( + success = 0 != dbus_connection_set_timeout_functions( connection_, &DBusConnection::onAddTimeout, &DBusConnection::onRemoveTimeout, @@ -289,15 +290,15 @@ bool DBusConnection::connect(DBusError &dbusError, bool startDispatchThread) { connection_ = dbus_bus_get_private(libdbusType, &dbusError.libdbusError_); if (dbusError) { - #ifdef _MSC_VER - COMMONAPI_ERROR(std::string(__FUNCTION__) + - ": Name: " + dbusError.getName() + - " Message: " + dbusError.getMessage()) - #else - COMMONAPI_ERROR(std::string(__PRETTY_FUNCTION__) + - ": Name: " + dbusError.getName() + - " Message: " + dbusError.getMessage()) - #endif + #ifdef _MSC_VER + COMMONAPI_ERROR(std::string(__FUNCTION__) + + ": Name: " + dbusError.getName() + + " Message: " + dbusError.getMessage()) + #else + COMMONAPI_ERROR(std::string(__PRETTY_FUNCTION__) + + ": Name: " + dbusError.getName() + + " Message: " + dbusError.getMessage()) + #endif return false; } @@ -310,11 +311,11 @@ bool DBusConnection::connect(DBusError &dbusError, bool startDispatchThread) { stopDispatching_ = !startDispatchThread; if (startDispatchThread) { - dispatchThread_ = new std::thread(std::bind(&DBusConnection::dispatch, this->shared_from_this())); + dispatchThread_ = new std::thread(std::bind(&DBusConnection::dispatch, this->shared_from_this())); } - enforcerThread_ = std::make_shared<std::thread>( - std::bind(&DBusConnection::enforceAsynchronousTimeouts, shared_from_this())); + enforcerThread_ = std::make_shared<std::thread>( + std::bind(&DBusConnection::enforceAsynchronousTimeouts, shared_from_this())); dbusConnectionStatusEvent_.notifyListeners(AvailabilityStatus::AVAILABLE); @@ -352,7 +353,7 @@ void DBusConnection::disconnect() { enforcerThreadCancelled_ = true; enforceTimeoutCondition_.notify_one(); if (enforcerThread_->joinable()) { - enforcerThread_->join(); + enforcerThread_->join(); } enforcerThreadCancelled_ = false; @@ -388,7 +389,7 @@ bool DBusConnection::requestServiceNameAndBlock(const std::string& serviceName) std::lock_guard<std::mutex> dbusConnectionLock(connectionGuard_); auto conIter = connectionNameCount_.find(serviceName); if (conIter == connectionNameCount_.end()) { - suspendDispatching(); + suspendDispatching(); const int libdbusStatus = dbus_bus_request_name(connection_, serviceName.c_str(), @@ -400,17 +401,17 @@ bool DBusConnection::requestServiceNameAndBlock(const std::string& serviceName) isServiceNameAcquired = (libdbusStatus == DBUS_REQUEST_NAME_REPLY_PRIMARY_OWNER); if (isServiceNameAcquired) { connectionNameCount_.insert( { serviceName, (uint16_t)1 } ); - } - else { - #ifdef _MSC_VER // Visual Studio - COMMONAPI_ERROR(std::string(__FUNCTION__) + - ": Name: " + dbusError.getName() + - " Message: " + dbusError.getMessage()) - #else + } + else { + #ifdef _MSC_VER // Visual Studio + COMMONAPI_ERROR(std::string(__FUNCTION__) + + ": Name: " + dbusError.getName() + + " Message: " + dbusError.getMessage()) + #else COMMONAPI_ERROR(std::string(__PRETTY_FUNCTION__) + ": Name: " + dbusError.getName() + " Message: " + dbusError.getMessage()) - #endif + #endif } } else { conIter->second = conIter->second + 1; @@ -427,7 +428,7 @@ bool DBusConnection::releaseServiceName(const std::string& serviceName) const { auto conIter = connectionNameCount_.find(serviceName); if (conIter != connectionNameCount_.end()) { if (conIter->second == 1) { - suspendDispatching(); + suspendDispatching(); const int libdbusStatus = dbus_bus_release_name(connection_, serviceName.c_str(), &dbusError.libdbusError_); @@ -449,29 +450,40 @@ bool DBusConnection::sendDBusMessage(const DBusMessage &_message) const { assert(isConnected()); dbus_uint32_t dbusSerial; - bool result = 0 != dbus_connection_send(connection_, _message.message_, &dbusSerial); + bool result = 0 != dbus_connection_send(connection_, _message.message_, &dbusSerial); return result; } -void DBusConnection::onLibdbusPendingCallNotifyThunk(::DBusPendingCall* libdbusPendingCall, void *userData) { - assert(userData); - assert(libdbusPendingCall); +DBusMessage DBusConnection::convertToDBusMessage(::DBusPendingCall* _libdbusPendingCall, + CallStatus& _callStatus) { + assert(_libdbusPendingCall); - auto dbusMessageReplyAsyncHandler = reinterpret_cast<DBusMessageReplyAsyncHandler*>(userData); + ::DBusMessage* libdbusMessage = dbus_pending_call_steal_reply(_libdbusPendingCall); + const bool increaseLibdbusMessageReferenceCount = false; + DBusMessage dbusMessage(libdbusMessage, increaseLibdbusMessageReferenceCount); + _callStatus = CallStatus::SUCCESS; - ::DBusMessage* libdbusMessage = dbus_pending_call_steal_reply(libdbusPendingCall); - const bool increaseLibdbusMessageReferenceCount = false; - DBusMessage dbusMessage(libdbusMessage, increaseLibdbusMessageReferenceCount); - CallStatus callStatus = CallStatus::SUCCESS; + if (!dbusMessage.isMethodReturnType()) { + _callStatus = CallStatus::REMOTE_ERROR; + } - if (!dbusMessage.isMethodReturnType()) { - callStatus = CallStatus::REMOTE_ERROR; - } + return dbusMessage; +} + +void DBusConnection::onLibdbusPendingCallNotifyThunk(::DBusPendingCall* _libdbusPendingCall, void *_userData) { + assert(_userData); + assert(_libdbusPendingCall); + + auto dbusMessageReplyAsyncHandler = reinterpret_cast<DBusMessageReplyAsyncHandler*>(_userData); + + DBusMessage dbusMessage; + CallStatus callStatus; + dbusMessage = DBusConnection::convertToDBusMessage(_libdbusPendingCall, callStatus); dbusMessageReplyAsyncHandler->onDBusMessageReply(callStatus, dbusMessage); // libdbus calls the cleanup method below - dbus_pending_call_unref(libdbusPendingCall); + dbus_pending_call_unref(_libdbusPendingCall); } void DBusConnection::onLibdbusDataCleanup(void* userData) { @@ -479,74 +491,77 @@ void DBusConnection::onLibdbusDataCleanup(void* userData) { delete dbusMessageReplyAsyncHandler; } - //Would not be needed if libdbus would actually handle its timeouts for pending calls. void DBusConnection::enforceAsynchronousTimeouts() const { - std::unique_lock<std::mutex> itsLock(enforcerThreadMutex_); + std::unique_lock<std::mutex> itsLock(enforcerThreadMutex_); - while (!enforcerThreadCancelled_) { + while (!enforcerThreadCancelled_) { enforceTimeoutMutex_.lock(); int timeout = std::numeric_limits<int>::max(); // not really, but nearly "forever" if (timeoutMap_.size() > 0) { - auto minTimeoutElement = std::min_element(timeoutMap_.begin(), timeoutMap_.end(), - [] (const TimeoutMapElement& lhs, const TimeoutMapElement& rhs) { - return std::get<0>(lhs.second) < std::get<0>(rhs.second); - }); + auto minTimeoutElement = std::min_element(timeoutMap_.begin(), timeoutMap_.end(), + [] (const TimeoutMapElement& lhs, const TimeoutMapElement& rhs) { + return std::get<0>(lhs.second) < std::get<0>(rhs.second); + }); - auto minTimeout = std::get<0>(minTimeoutElement->second); + auto minTimeout = std::get<0>(minTimeoutElement->second); - std::chrono::high_resolution_clock::time_point now = std::chrono::high_resolution_clock::now(); + std::chrono::high_resolution_clock::time_point now = std::chrono::high_resolution_clock::now(); - timeout = (int)std::chrono::duration_cast<std::chrono::milliseconds>(minTimeout - now).count(); + timeout = (int)std::chrono::duration_cast<std::chrono::milliseconds>(minTimeout - now).count(); } enforceTimeoutMutex_.unlock(); if (std::cv_status::timeout == - enforceTimeoutCondition_.wait_for(itsLock, std::chrono::milliseconds(timeout))) { - - //Do not access members if the DBusConnection was destroyed during the unlocked phase. - enforceTimeoutMutex_.lock(); - auto it = timeoutMap_.begin(); - while (it != timeoutMap_.end()) { - std::chrono::high_resolution_clock::time_point now = std::chrono::high_resolution_clock::now(); - - if (now > std::get<0>(it->second)) { - DBusPendingCall* libdbusPendingCall = it->first; - - if (!dbus_pending_call_get_completed(libdbusPendingCall)) { - dbus_pending_call_cancel(libdbusPendingCall); - DBusMessageReplyAsyncHandler* asyncHandler = std::get<1>(it->second); - DBusMessage& dbusMessageCall = std::get<2>(it->second); - - if (mainLoopContext_.lock()) { - mainloopTimeouts_.push_back(std::make_pair(asyncHandler, dbusMessageCall)); - } else { - enforceTimeoutMutex_.unlock(); // unlock before making callbacks to application to avoid deadlocks - asyncHandler->onDBusMessageReply(CallStatus::REMOTE_ERROR, dbusMessageCall.createMethodError(DBUS_ERROR_TIMEOUT)); - enforceTimeoutMutex_.lock(); - delete asyncHandler; - } - } - it = timeoutMap_.erase(it); - - //This unref MIGHT cause the destruction of the last callback object that references the DBusConnection. - //So after this unref has been called, it has to be ensured that continuation of the loop is an option. - dbus_pending_call_unref(libdbusPendingCall); - } else { - ++it; - } - } - enforceTimeoutMutex_.unlock(); - } + enforceTimeoutCondition_.wait_for(itsLock, std::chrono::milliseconds(timeout))) { + + //Do not access members if the DBusConnection was destroyed during the unlocked phase. + enforceTimeoutMutex_.lock(); + auto it = timeoutMap_.begin(); + while (it != timeoutMap_.end()) { + std::chrono::high_resolution_clock::time_point now = std::chrono::high_resolution_clock::now(); + + if (now > std::get<0>(it->second)) { + DBusPendingCall* libdbusPendingCall = it->first; + + if (!dbus_pending_call_get_completed(libdbusPendingCall)) { + dbus_pending_call_cancel(libdbusPendingCall); + DBusMessageReplyAsyncHandler* asyncHandler = std::get<1>(it->second); + DBusMessage& dbusMessageCall = std::get<2>(it->second); + + if (mainLoopContext_.lock()) { + mainloopTimeouts_.push_back(std::make_tuple(asyncHandler, + dbusMessageCall.createMethodError(DBUS_ERROR_TIMEOUT), + CallStatus::REMOTE_ERROR, + nullptr)); + mainLoopContext_.lock()->wakeup(); + } else { + enforceTimeoutMutex_.unlock(); // unlock before making callbacks to application to avoid deadlocks + asyncHandler->onDBusMessageReply(CallStatus::REMOTE_ERROR, dbusMessageCall.createMethodError(DBUS_ERROR_TIMEOUT)); + enforceTimeoutMutex_.lock(); + delete asyncHandler; + } + } + it = timeoutMap_.erase(it); + + //This unref MIGHT cause the destruction of the last callback object that references the DBusConnection. + //So after this unref has been called, it has to be ensured that continuation of the loop is an option. + dbus_pending_call_unref(libdbusPendingCall); + } else { + ++it; + } + } + enforceTimeoutMutex_.unlock(); + } } } std::future<CallStatus> DBusConnection::sendDBusMessageWithReplyAsync( const DBusMessage& dbusMessage, std::unique_ptr<DBusMessageReplyAsyncHandler> dbusMessageReplyAsyncHandler, - const CommonAPI::CallInfo *_info) const { + const CommonAPI::CallInfo *_info) const { assert(dbusMessage); assert(isConnected()); @@ -555,56 +570,47 @@ std::future<CallStatus> DBusConnection::sendDBusMessageWithReplyAsync( dbus_bool_t libdbusSuccess; suspendDispatching(); - libdbusSuccess = dbus_connection_send_with_reply(connection_, - dbusMessage.message_, - &libdbusPendingCall, - _info->timeout_); + libdbusSuccess = dbus_connection_send_with_reply_set_notify(connection_, + dbusMessage.message_, + &libdbusPendingCall, + onLibdbusPendingCallNotifyThunk, + dbusMessageReplyAsyncHandler.get(), + onLibdbusDataCleanup, + _info->timeout_); if (_info->sender_ != 0) { - COMMONAPI_DEBUG("Message sent: SenderID: ", _info->sender_, " - Serial number: ", dbusMessage.getSerial()); + COMMONAPI_DEBUG("Message sent: SenderID: ", _info->sender_, " - Serial number: ", dbusMessage.getSerial()); } if (!libdbusSuccess || !libdbusPendingCall) { - dbusMessageReplyAsyncHandler->onDBusMessageReply(CallStatus::CONNECTION_FAILED, dbusMessage.createMethodError(DBUS_ERROR_DISCONNECTED)); - resumeDispatching(); - return dbusMessageReplyAsyncHandler->getFuture(); + #ifdef _MSC_VER // Visual Studio + COMMONAPI_ERROR(std::string(__FUNCTION__) + + ": (!libdbusSuccess || !libdbusPendingCall) == true") + #else + COMMONAPI_ERROR(std::string(__PRETTY_FUNCTION__) + + ": (!libdbusSuccess || !libdbusPendingCall) == true") + #endif + if (libdbusPendingCall) { + dbus_pending_call_unref(libdbusPendingCall); + } + dbusMessageReplyAsyncHandler->onDBusMessageReply(CallStatus::CONNECTION_FAILED, dbusMessage.createMethodError(DBUS_ERROR_DISCONNECTED)); + resumeDispatching(); + return dbusMessageReplyAsyncHandler->getFuture(); } - sendLock_.lock(); - if (dbus_pending_call_get_completed (libdbusPendingCall)) { - onLibdbusPendingCallNotifyThunk(libdbusPendingCall, dbusMessageReplyAsyncHandler.get()); - onLibdbusDataCleanup(dbusMessageReplyAsyncHandler.get()); - - } else { - libdbusSuccess = dbus_pending_call_set_notify( - libdbusPendingCall, - onLibdbusPendingCallNotifyThunk, - dbusMessageReplyAsyncHandler.get(), - onLibdbusDataCleanup); - - if (!libdbusSuccess) { - dbusMessageReplyAsyncHandler->onDBusMessageReply(CallStatus::OUT_OF_MEMORY, dbusMessage); - dbus_pending_call_unref(libdbusPendingCall); - resumeDispatching(); - sendLock_.unlock(); - return dbusMessageReplyAsyncHandler->getFuture(); - } - } - sendLock_.unlock(); - DBusMessageReplyAsyncHandler* replyAsyncHandler = dbusMessageReplyAsyncHandler.release(); if (_info->timeout_ != DBUS_TIMEOUT_INFINITE) { dbus_pending_call_ref(libdbusPendingCall); auto timeoutPoint = std::chrono::high_resolution_clock::now() + std::chrono::milliseconds(_info->timeout_); std::tuple< - std::chrono::time_point<std::chrono::high_resolution_clock>, - DBusMessageReplyAsyncHandler*, - DBusMessage> toInsert { - timeoutPoint, - replyAsyncHandler, - dbusMessage - }; + std::chrono::time_point<std::chrono::high_resolution_clock>, + DBusMessageReplyAsyncHandler*, + DBusMessage> toInsert { + timeoutPoint, + replyAsyncHandler, + dbusMessage + }; enforceTimeoutMutex_.lock(); timeoutMap_.insert( { libdbusPendingCall, toInsert } ); @@ -622,7 +628,7 @@ std::future<CallStatus> DBusConnection::sendDBusMessageWithReplyAsync( DBusMessage DBusConnection::sendDBusMessageWithReplyAndBlock(const DBusMessage& dbusMessage, DBusError& dbusError, - const CommonAPI::CallInfo *_info) const { + const CommonAPI::CallInfo *_info) const { assert(dbusMessage); assert(!dbusError); assert(isConnected()); @@ -635,7 +641,7 @@ DBusMessage DBusConnection::sendDBusMessageWithReplyAndBlock(const DBusMessage& &dbusError.libdbusError_); if (_info->sender_ != 0) { - COMMONAPI_DEBUG("Message sent: SenderID: ", _info->sender_, " - Serial number: ", dbusMessage.getSerial()); + COMMONAPI_DEBUG("Message sent: SenderID: ", _info->sender_, " - Serial number: ", dbusMessage.getSerial()); } resumeDispatching(); @@ -650,18 +656,25 @@ DBusMessage DBusConnection::sendDBusMessageWithReplyAndBlock(const DBusMessage& bool DBusConnection::singleDispatch() { - for (auto t : mainloopTimeouts_) { - t.first->onDBusMessageReply(CallStatus::REMOTE_ERROR, t.second.createMethodError(DBUS_ERROR_TIMEOUT)); - delete t.first; - } - mainloopTimeouts_.clear(); + for (auto t : mainloopTimeouts_) { + std::get<0>(t)->onDBusMessageReply(std::get<2>(t), std::get<1>(t)); + if (std::get<3>(t) != nullptr) { + dbus_pending_call_unref(std::get<3>(t)); + } + delete std::get<0>(t); + } + mainloopTimeouts_.clear(); return (dbus_connection_dispatch(connection_) == DBUS_DISPATCH_DATA_REMAINS); } bool DBusConnection::isDispatchReady() { return (dbus_connection_get_dispatch_status(connection_) == DBUS_DISPATCH_DATA_REMAINS || - !mainloopTimeouts_.empty()); + !mainloopTimeouts_.empty()); +} + +bool DBusConnection::hasDispatchThread() { + return (dispatchThread_ != NULL); } DBusProxyConnection::DBusSignalHandlerToken DBusConnection::subscribeForSelectiveBroadcast( @@ -685,13 +698,13 @@ DBusProxyConnection::DBusSignalHandlerToken DBusConnection::subscribeForSelectiv DBusProxyConnection::DBusSignalHandlerToken subscriptionToken; if (callStatus == CommonAPI::CallStatus::SUCCESS && subscriptionAccepted) { subscriptionToken = addSignalMemberHandler( - objectPath, - interfaceName, - interfaceMemberName, - interfaceMemberSignature, - dbusSignalHandler, - true - ); + objectPath, + interfaceName, + interfaceMemberName, + interfaceMemberSignature, + dbusSignalHandler, + true + ); subscriptionAccepted = true; } @@ -735,7 +748,7 @@ DBusProxyConnection::DBusSignalHandlerToken DBusConnection::addSignalMemberHandl handlerList.insert(dbusSignalHandler); dbusSignalHandlerTable_.insert( { - dbusSignalHandlerPath, + dbusSignalHandlerPath, std::make_pair(std::make_shared<std::recursive_mutex>(), std::move(handlerList)) } ); } else { @@ -763,6 +776,14 @@ bool DBusConnection::removeSignalMemberHandler(const DBusSignalHandlerToken &dbu } signalEntry->second.first->unlock(); } + + if (lastHandlerRemoved) { + dbusSignalHandlerTable_.erase(signalEntry); + removeLibdbusSignalMatchRule(std::get<0>(dbusSignalHandlerToken), + std::get<1>(dbusSignalHandlerToken), + std::get<2>(dbusSignalHandlerToken)); + } + return lastHandlerRemoved; } @@ -874,12 +895,12 @@ bool DBusConnection::addLibdbusSignalMatchRule(const std::string& dbusMatchRule) // add the libdbus message signal filter if (!libdbusSignalMatchRulesCount_) { - libdbusSuccess = 0 != dbus_connection_add_filter( - connection_, + libdbusSuccess = 0 != dbus_connection_add_filter( + connection_, &onLibdbusSignalFilterThunk, this, NULL - ); + ); } // finally add the match rule @@ -912,10 +933,10 @@ bool DBusConnection::removeLibdbusSignalMatchRule(const std::string& dbusMatchRu dbus_bus_remove_match(connection_, dbusMatchRule.c_str(), NULL); - libdbusSignalMatchRulesCount_--; - if (libdbusSignalMatchRulesCount_ == 0) { - dbus_connection_remove_filter(connection_, &onLibdbusSignalFilterThunk, this); - } + libdbusSignalMatchRulesCount_--; + if (libdbusSignalMatchRulesCount_ == 0) { + dbus_connection_remove_filter(connection_, &onLibdbusSignalFilterThunk, this); + } resumeDispatching(); @@ -968,7 +989,7 @@ void DBusConnection::unregisterObjectPath(const std::string& objectPath) { if (isConnected()) { dbus_bool_t libdbusSuccess - = dbus_connection_unregister_object_path(connection_, objectPath.c_str()); + = dbus_connection_unregister_object_path(connection_, objectPath.c_str()); assert(libdbusSuccess); } } @@ -1007,14 +1028,15 @@ void DBusConnection::addLibdbusSignalMatchRule(const std::string& objectPath, if (isConnected()) { bool libdbusSuccess = true; suspendDispatching(); + // add the libdbus message signal filter if (isFirstMatchRule) { - libdbusSuccess = 0 != dbus_connection_add_filter( - connection_, - &onLibdbusSignalFilterThunk, - this, - NULL); + libdbusSuccess = 0 != dbus_connection_add_filter( + connection_, + &onLibdbusSignalFilterThunk, + this, + NULL); assert(libdbusSuccess); } |