diff options
Diffstat (limited to 'src/CommonAPI/DBus/DBusConnection.cpp')
-rw-r--r-- | src/CommonAPI/DBus/DBusConnection.cpp | 618 |
1 files changed, 442 insertions, 176 deletions
diff --git a/src/CommonAPI/DBus/DBusConnection.cpp b/src/CommonAPI/DBus/DBusConnection.cpp index 47751cc..6071830 100644 --- a/src/CommonAPI/DBus/DBusConnection.cpp +++ b/src/CommonAPI/DBus/DBusConnection.cpp @@ -4,7 +4,6 @@ // file, You can obtain one at http://mozilla.org/MPL/2.0/. #include <algorithm> -#include <cassert> #include <chrono> #include <future> #include <sstream> @@ -44,11 +43,6 @@ const DBusObjectPathVTable* DBusConnection::getDBusObjectPathVTable() { return &libdbusObjectPathVTable; } -//std::bind used to start the dispatch thread holds one reference, and the selfReference -//created within the thread is the second. If only those two remain, no one but the -//dispatch thread references the connection, which therefore can be finished. -const int32_t ownUseCount = 2; - void DBusConnection::dispatch() { loop_->run(); } @@ -56,10 +50,8 @@ void DBusConnection::dispatch() { DBusConnection::DBusConnection(DBusType_t busType, const ConnectionId_t& _connectionId) : dispatchThread_(NULL), - stopDispatching_(false), dispatchSource_(), watchContext_(NULL), - pauseDispatching_(false), connection_(NULL), busType_(busType), dbusConnectionStatusEvent_(this), @@ -68,7 +60,11 @@ DBusConnection::DBusConnection(DBusType_t busType, connectionNameCount_(), enforcerThread_(NULL), enforcerThreadCancelled_(false), - connectionId_(_connectionId) { + connectionId_(_connectionId), + activeConnections_(0), + isDisconnecting_(false), + isDispatching_(false), + isWaitingOnFinishedDispatching_(false) { dbus_threads_init_default(); } @@ -76,10 +72,8 @@ DBusConnection::DBusConnection(DBusType_t busType, DBusConnection::DBusConnection(::DBusConnection *_connection, const ConnectionId_t& _connectionId) : dispatchThread_(NULL), - stopDispatching_(false), dispatchSource_(), watchContext_(NULL), - pauseDispatching_(false), connection_(_connection), busType_(DBusType_t::WRAPPED), dbusConnectionStatusEvent_(this), @@ -88,7 +82,11 @@ DBusConnection::DBusConnection(::DBusConnection *_connection, connectionNameCount_(), enforcerThread_(NULL), enforcerThreadCancelled_(false), - connectionId_(_connectionId) { + connectionId_(_connectionId), + activeConnections_(0), + isDisconnecting_(false), + isDispatching_(false), + isWaitingOnFinishedDispatching_(false){ dbus_threads_init_default(); } @@ -108,14 +106,17 @@ DBusConnection::~DBusConnection() { dbus_connection_set_timeout_functions(connection_, NULL, NULL, NULL, NULL, NULL); } + lockedContext->deregisterWatch(msgWatch_); + lockedContext->deregisterDispatchSource(msgDispatchSource_); lockedContext->deregisterDispatchSource(dispatchSource_); delete watchContext_; - delete dispatchSource_; } // ensure, the registry survives until disconnecting is done... //std::shared_ptr<DBusServiceRegistry> itsRegistry = DBusServiceRegistry::get(shared_from_this()); - disconnect(); + + // Disconnecting not possible because of circular dependency, the destructor will be called AFTER disconnect anyway. + //disconnect(); //Assert that the enforcerThread_ is in a position to finish itself correctly even after destruction //of the DBusConnection. Also assert all resources are cleaned up. @@ -174,8 +175,14 @@ bool DBusConnection::attachMainLoopContext(std::weak_ptr<MainLoopContext> mainLo mainLoopContext_ = mainLoopContext; if (auto lockedContext = mainLoopContext_.lock()) { + msgWatch_ = new DBusMessageWatch(shared_from_this()); + msgDispatchSource_ = new DBusMessageDispatchSource(msgWatch_); + + lockedContext->registerDispatchSource(msgDispatchSource_); + lockedContext->registerWatch(msgWatch_); + dispatchSource_ = new DBusDispatchSource(this); - watchContext_ = new WatchContext(mainLoopContext_, dispatchSource_); + watchContext_ = new WatchContext(mainLoopContext_, dispatchSource_, shared_from_this()); lockedContext->registerDispatchSource(dispatchSource_); dbus_connection_set_wakeup_main_function( @@ -216,31 +223,39 @@ bool DBusConnection::attachMainLoopContext(std::weak_ptr<MainLoopContext> mainLo void DBusConnection::onWakeupMainContext(void* data) { std::weak_ptr<MainLoopContext>* mainloop = static_cast<std::weak_ptr<MainLoopContext>*>(data); - assert(mainloop); - if(auto lockedContext = mainloop->lock()) { + if (!mainloop) { + COMMONAPI_ERROR(std::string(__FUNCTION__), "mainloop == nullptr"); + } else if(auto lockedContext = mainloop->lock()) { lockedContext->wakeup(); } } dbus_bool_t DBusConnection::onAddWatch(::DBusWatch* libdbusWatch, void* data) { WatchContext* watchContext = static_cast<WatchContext*>(data); - assert(watchContext); - - DBusWatch* dbusWatch = new DBusWatch(libdbusWatch, watchContext->mainLoopContext_); - dbusWatch->addDependentDispatchSource(watchContext->dispatchSource_); - dbus_watch_set_data(libdbusWatch, dbusWatch, NULL); + if (NULL == watchContext) { + COMMONAPI_ERROR(std::string(__FUNCTION__), "watchContext == NULL"); + return FALSE; + } else { + DBusWatch* dbusWatch = new DBusWatch(libdbusWatch, watchContext->mainLoopContext_, watchContext->dbusConnection_); + dbusWatch->addDependentDispatchSource(watchContext->dispatchSource_); + dbus_watch_set_data(libdbusWatch, dbusWatch, NULL); - if (dbusWatch->isReadyToBeWatched()) { - dbusWatch->startWatching(); + if (dbusWatch->isReadyToBeWatched()) { + dbusWatch->startWatching(); + } else { + delete dbusWatch; + dbus_watch_set_data(libdbusWatch, NULL, NULL); + } } return TRUE; } void DBusConnection::onRemoveWatch(::DBusWatch* libdbusWatch, void* data) { - assert(static_cast<WatchContext*>(data)); - (void)data; + if (NULL == static_cast<WatchContext*>(data)) { + COMMONAPI_ERROR(std::string(__FUNCTION__), "data (WatchContext) == NULL"); + } DBusWatch* dbusWatch = static_cast<DBusWatch*>(dbus_watch_get_data(libdbusWatch)); if (dbusWatch != NULL) { @@ -254,12 +269,16 @@ void DBusConnection::onRemoveWatch(::DBusWatch* libdbusWatch, void* data) { void DBusConnection::onToggleWatch(::DBusWatch* libdbusWatch, void* data) { WatchContext* watchContext = static_cast<WatchContext*>(data); - assert(watchContext); + + if (NULL == watchContext) { + COMMONAPI_ERROR(std::string(__FUNCTION__), "watchContext == NULL"); + return; + } DBusWatch* dbusWatch = static_cast<DBusWatch*>(dbus_watch_get_data(libdbusWatch)); if (dbusWatch == NULL) { - DBusWatch* dbusWatch = new DBusWatch(libdbusWatch, watchContext->mainLoopContext_); + DBusWatch* dbusWatch = new DBusWatch(libdbusWatch, watchContext->mainLoopContext_, watchContext->dbusConnection_); dbusWatch->addDependentDispatchSource(watchContext->dispatchSource_); dbus_watch_set_data(libdbusWatch, dbusWatch, NULL); @@ -270,6 +289,8 @@ void DBusConnection::onToggleWatch(::DBusWatch* libdbusWatch, void* data) { if (!dbusWatch->isReadyToBeWatched()) { dbusWatch->stopWatching(); dbus_watch_set_data(libdbusWatch, NULL, NULL); + } else { + dbusWatch->startWatching(); } } } @@ -277,21 +298,28 @@ void DBusConnection::onToggleWatch(::DBusWatch* libdbusWatch, void* data) { dbus_bool_t DBusConnection::onAddTimeout(::DBusTimeout* libdbusTimeout, void* data) { std::weak_ptr<MainLoopContext>* mainloop = static_cast<std::weak_ptr<MainLoopContext>*>(data); - assert(mainloop); + if (NULL == mainloop) { + COMMONAPI_ERROR(std::string(__FUNCTION__), "mainloop == NULL"); + return FALSE; + } DBusTimeout* dbusTimeout = new DBusTimeout(libdbusTimeout, *mainloop); dbus_timeout_set_data(libdbusTimeout, dbusTimeout, NULL); if (dbusTimeout->isReadyToBeMonitored()) { dbusTimeout->startMonitoring(); + } else { + delete dbusTimeout; + dbus_timeout_set_data(libdbusTimeout, NULL, NULL); } return TRUE; } void DBusConnection::onRemoveTimeout(::DBusTimeout* libdbusTimeout, void* data) { - assert(static_cast<std::weak_ptr<MainLoopContext>*>(data)); - (void)data; + if (NULL == static_cast<std::weak_ptr<MainLoopContext>*>(data)) { + COMMONAPI_ERROR(std::string(__FUNCTION__), "MainLoopContext == NULL"); + } DBusTimeout* dbusTimeout = static_cast<DBusTimeout*>(dbus_timeout_get_data(libdbusTimeout)); if (dbusTimeout) { @@ -302,8 +330,9 @@ void DBusConnection::onRemoveTimeout(::DBusTimeout* libdbusTimeout, void* data) } void DBusConnection::onToggleTimeout(::DBusTimeout* dbustimeout, void* data) { - assert(static_cast<std::weak_ptr<MainLoopContext>*>(data)); - (void)data; + if (NULL == static_cast<std::weak_ptr<MainLoopContext>*>(data)) { + COMMONAPI_ERROR(std::string(__FUNCTION__), "MainLoopContext == NULL"); + } DBusTimeout* timeout = static_cast<DBusTimeout*>(dbus_timeout_get_data(dbustimeout)); if (timeout->isReadyToBeMonitored()) { @@ -319,7 +348,11 @@ bool DBusConnection::connect(bool startDispatchThread) { } bool DBusConnection::connect(DBusError &dbusError, bool startDispatchThread) { - assert(!dbusError); + if (dbusError) { + COMMONAPI_ERROR(std::string(__FUNCTION__), "dbusError set"); + return false; + } + if (isConnected()) { return true; } @@ -340,7 +373,11 @@ bool DBusConnection::connect(DBusError &dbusError, bool startDispatchThread) { return false; } - assert(connection_); + if (NULL == connection_) { + COMMONAPI_ERROR(std::string(__FUNCTION__), "connection_ == NULL"); + return false; + } + dbus_connection_set_exit_on_disconnect(connection_, false); initLibdbusObjectPathHandlerAfterConnect(); @@ -352,7 +389,7 @@ bool DBusConnection::connect(DBusError &dbusError, bool startDispatchThread) { dbusConnectionStatusEvent_.notifyListeners(AvailabilityStatus::AVAILABLE); - stopDispatching_ = !startDispatchThread; + isDisconnecting_ = false; if (startDispatchThread) { std::shared_ptr<MainLoopContext> itsContext = std::make_shared<MainLoopContext>(); loop_ = std::make_shared<DBusMainLoop>(itsContext); @@ -365,6 +402,10 @@ bool DBusConnection::connect(DBusError &dbusError, bool startDispatchThread) { void DBusConnection::disconnect() { std::lock_guard<std::mutex> dbusConnectionLock(connectionGuard_); + + std::unique_lock<std::mutex> dispatchLock(dispatchMutex_); + isDisconnecting_ = true; + if (isConnected()) { dbusConnectionStatusEvent_.notifyListeners(AvailabilityStatus::NOT_AVAILABLE); @@ -375,7 +416,16 @@ void DBusConnection::disconnect() { connectionNameCount_.clear(); - stopDispatching_ = true; + //wait until dispatching is finished + auto it = dispatchThreads_.find(std::this_thread::get_id()); + if(it == dispatchThreads_.end()) { //wait only if disconnect is NOT triggered by main loop + while(isDispatching_) { + isWaitingOnFinishedDispatching_ = true; + dispatchCondition_.wait(dispatchLock); + isWaitingOnFinishedDispatching_ = false; + } + } + dispatchLock.unlock(); dbus_connection_close(connection_); @@ -394,7 +444,8 @@ void DBusConnection::disconnect() { enforcerThreadCancelled_ = true; enforceTimeoutCondition_.notify_one(); - if (enforcerThread_->joinable()) { + if (enforcerThread_->joinable() && + std::this_thread::get_id() != enforcerThread_->get_id()) { enforcerThread_->join(); } enforcerThreadCancelled_ = false; @@ -406,6 +457,10 @@ void DBusConnection::disconnect() { dbus_connection_unref(connection_); connection_ = nullptr; } + + if (std::shared_ptr<CommonAPI::MainLoopContext> mainLoopContext = mainLoopContext_.lock()) { + Factory::get()->releaseConnection(connectionId_, mainLoopContext.get()); + } } bool DBusConnection::isConnected() const { @@ -488,60 +543,87 @@ bool DBusConnection::releaseServiceName(const std::string& serviceName) const { } bool DBusConnection::sendDBusMessage(const DBusMessage &_message) const { - assert(_message); - assert(isConnected()); + if (!_message) { + COMMONAPI_ERROR(std::string(__FUNCTION__), "message == NULL"); + return false; + } + if (!isConnected()) { + COMMONAPI_ERROR(std::string(__FUNCTION__), "not connected"); + return false; + } dbus_uint32_t dbusSerial; - bool result = 0 != dbus_connection_send(connection_, _message.message_, &dbusSerial); + const bool result = (0 != dbus_connection_send(connection_, _message.message_, &dbusSerial)); return result; } -DBusMessage DBusConnection::convertToDBusMessage(::DBusPendingCall* _libdbusPendingCall, - CallStatus& _callStatus) { - assert(_libdbusPendingCall); +DBusMessage DBusConnection::convertToDBusMessage(::DBusPendingCall* _libdbusPendingCall) { + if (NULL == _libdbusPendingCall) { + COMMONAPI_ERROR(std::string(__FUNCTION__), "_libdbusPendingCall == NULL"); + return DBusMessage(); + } ::DBusMessage* libdbusMessage = dbus_pending_call_steal_reply(_libdbusPendingCall); const bool increaseLibdbusMessageReferenceCount = false; DBusMessage dbusMessage(libdbusMessage, increaseLibdbusMessageReferenceCount); - _callStatus = CallStatus::SUCCESS; - - if (!dbusMessage.isMethodReturnType()) { - _callStatus = CallStatus::REMOTE_ERROR; - } return dbusMessage; } -void DBusConnection::onLibdbusPendingCallNotifyThunk(::DBusPendingCall* _libdbusPendingCall, void *_userData) { - assert(_userData); - assert(_libdbusPendingCall); +void DBusConnection::onLibdbusPendingCall(::DBusPendingCall* _libdbusPendingCall, + const DBusMessage& _reply, + DBusMessageReplyAsyncHandler* _dbusMessageReplyAsyncHandler) const { - auto dbusMessageReplyAsyncHandler = reinterpret_cast<DBusMessageReplyAsyncHandler*>(_userData); + CallStatus callStatus = CallStatus::SUCCESS; + if (_reply.isErrorType() || !_reply.isMethodReturnType()) { + if(strcmp(_reply.getError(), DBUS_ERROR_UNKNOWN_METHOD) == 0) { + callStatus = CallStatus::NOT_AVAILABLE; + } else { + callStatus = CallStatus::REMOTE_ERROR; + } + } - dbusMessageReplyAsyncHandler->lock(); - bool processAsyncHandler = !dbusMessageReplyAsyncHandler->getTimeoutOccurred(); - dbusMessageReplyAsyncHandler->setExecutionStarted(); - dbusMessageReplyAsyncHandler->unlock(); + _dbusMessageReplyAsyncHandler->lock(); + bool processAsyncHandler = !_dbusMessageReplyAsyncHandler->getTimeoutOccurred(); + _dbusMessageReplyAsyncHandler->setExecutionStarted(); + _dbusMessageReplyAsyncHandler->unlock(); - if (processAsyncHandler) { - DBusMessage dbusMessage; - CallStatus callStatus; - dbusMessage = DBusConnection::convertToDBusMessage(_libdbusPendingCall, callStatus); + if (processAsyncHandler) + _dbusMessageReplyAsyncHandler->onDBusMessageReply(callStatus, _reply); - dbusMessageReplyAsyncHandler->onDBusMessageReply(callStatus, dbusMessage); - } + _dbusMessageReplyAsyncHandler->lock(); - dbusMessageReplyAsyncHandler->lock(); // libdbus calls the cleanup method below - dbus_pending_call_unref(_libdbusPendingCall); + if(_libdbusPendingCall) + dbus_pending_call_unref(_libdbusPendingCall); - dbusMessageReplyAsyncHandler->setExecutionFinished(); - if (dbusMessageReplyAsyncHandler->hasToBeDeleted()) { - dbusMessageReplyAsyncHandler->unlock(); - delete dbusMessageReplyAsyncHandler; + _dbusMessageReplyAsyncHandler->setExecutionFinished(); + if (_dbusMessageReplyAsyncHandler->hasToBeDeleted()) { + _dbusMessageReplyAsyncHandler->unlock(); + delete _dbusMessageReplyAsyncHandler; + } else { + _dbusMessageReplyAsyncHandler->unlock(); + } +} + +void DBusConnection::onLibdbusPendingCallNotifyThunk(::DBusPendingCall* _libdbusPendingCall, void *_userData) { + if (NULL == _libdbusPendingCall) { + COMMONAPI_ERROR(std::string(__FUNCTION__), "_libdbusPendingCall == NULL"); return; } - dbusMessageReplyAsyncHandler->unlock(); + if (NULL == _userData) { + COMMONAPI_ERROR(std::string(__FUNCTION__), "_userData == NULL"); + return; + } + + auto pendingCallNotificationData = reinterpret_cast<PendingCallNotificationData*>(_userData); + auto dbusMessageReplyAsyncHandler = pendingCallNotificationData->replyAsyncHandler_; + auto dbusConnection = pendingCallNotificationData->dbusConnection_; + delete pendingCallNotificationData; + + DBusMessage dbusMessage = DBusConnection::convertToDBusMessage(_libdbusPendingCall); + + dbusConnection->onLibdbusPendingCall(_libdbusPendingCall, dbusMessage, dbusMessageReplyAsyncHandler); } void DBusConnection::onLibdbusDataCleanup(void *_data) { @@ -608,15 +690,19 @@ void DBusConnection::enforceAsynchronousTimeouts() const { // => add asyncHandler to mainloopTimeouts list DBusMessage& dbusMessageCall = std::get<2>(it->second); - assert(mainLoopContext_.lock()); - { - std::lock_guard<std::mutex> itsLock(mainloopTimeoutsMutex_); - mainloopTimeouts_.push_back(std::make_tuple(asyncHandler, - dbusMessageCall.createMethodError(DBUS_ERROR_TIMEOUT), - CallStatus::REMOTE_ERROR, - nullptr)); + auto lockedContext = mainLoopContext_.lock(); + if (!lockedContext) { + COMMONAPI_ERROR(std::string(__FUNCTION__), "lockedContext == nullptr"); + } else { + { + std::lock_guard<std::mutex> itsLock(mainloopTimeoutsMutex_); + mainloopTimeouts_.push_back(std::make_tuple(asyncHandler, + dbusMessageCall.createMethodError(DBUS_ERROR_TIMEOUT), + CallStatus::REMOTE_ERROR, + nullptr)); + } + lockedContext->wakeup(); } - mainLoopContext_.lock()->wakeup(); it = timeoutMap_.erase(it); //This unref MIGHT cause the destruction of the last callback object that references the DBusConnection. @@ -687,21 +773,34 @@ std::future<CallStatus> DBusConnection::sendDBusMessageWithReplyAsync( std::unique_ptr<DBusMessageReplyAsyncHandler> dbusMessageReplyAsyncHandler, const CommonAPI::CallInfo *_info) const { - assert(dbusMessage); - assert(isConnected()); + if (!dbusMessage) { + COMMONAPI_ERROR(std::string(__FUNCTION__), "message == NULL"); + return std::future<CallStatus>(); + } + if (!isConnected()) { + COMMONAPI_ERROR(std::string(__FUNCTION__), "not connected"); + return std::future<CallStatus>(); + } DBusPendingCall* libdbusPendingCall; dbus_bool_t libdbusSuccess; DBusMessageReplyAsyncHandler* replyAsyncHandler = dbusMessageReplyAsyncHandler.release(); - std::future<CallStatus> callStatusFuture = replyAsyncHandler->getFuture(); + std::future<CallStatus> callStatusFuture; + try { + callStatusFuture = replyAsyncHandler->getFuture(); + } catch (std::exception& e) { + (void)e; + } + + PendingCallNotificationData* userData = new PendingCallNotificationData(this, replyAsyncHandler); libdbusSuccess = dbus_connection_send_with_reply_set_notify(connection_, dbusMessage.message_, &libdbusPendingCall, onLibdbusPendingCallNotifyThunk, - replyAsyncHandler, + userData, onLibdbusDataCleanup, _info->timeout_); @@ -743,7 +842,13 @@ std::future<CallStatus> DBusConnection::sendDBusMessageWithReplyAsync( }; enforceTimeoutMutex_.lock(); - timeoutMap_.insert( { libdbusPendingCall, toInsert } ); + auto ret = timeoutMap_.insert( { libdbusPendingCall, toInsert } ); + if (ret.second == false) { + // key has been reused + // update the map value with the new info + timeoutMap_.erase(ret.first); + timeoutMap_.insert( { libdbusPendingCall, toInsert } ); + } enforceTimeoutMutex_.unlock(); enforcerThreadMutex_.lock(); @@ -761,9 +866,14 @@ std::future<CallStatus> DBusConnection::sendDBusMessageWithReplyAsync( DBusMessage DBusConnection::sendDBusMessageWithReplyAndBlock(const DBusMessage& dbusMessage, DBusError& dbusError, const CommonAPI::CallInfo *_info) const { - assert(dbusMessage); - assert(!dbusError); - assert(isConnected()); + if (!dbusMessage) { + COMMONAPI_ERROR(std::string(__FUNCTION__), "message == NULL"); + return DBusMessage(); + } + if (!isConnected()) { + COMMONAPI_ERROR(std::string(__FUNCTION__), "not connected"); + return DBusMessage(); + } ::DBusMessage* libdbusMessageReply = dbus_connection_send_with_reply_and_block(connection_, dbusMessage.message_, @@ -775,6 +885,7 @@ DBusMessage DBusConnection::sendDBusMessageWithReplyAndBlock(const DBusMessage& } if (dbusError) { + COMMONAPI_ERROR(std::string(__FUNCTION__), "dbusError set"); return DBusMessage(); } @@ -782,6 +893,13 @@ DBusMessage DBusConnection::sendDBusMessageWithReplyAndBlock(const DBusMessage& return DBusMessage(libdbusMessageReply, increaseLibdbusMessageReferenceCount); } +void DBusConnection::dispatchDBusMessageReply(const DBusMessage& _reply, + DBusMessageReplyAsyncHandler* _dbusMessageReplyAsyncHandler) { + if(setDispatching(true)) { + onLibdbusPendingCall(NULL, _reply, _dbusMessageReplyAsyncHandler); + setDispatching(false); + } +} bool DBusConnection::singleDispatch() { { @@ -795,14 +913,25 @@ bool DBusConnection::singleDispatch() { } mainloopTimeouts_.clear(); } - - return (dbus_connection_dispatch(connection_) == DBUS_DISPATCH_DATA_REMAINS); + if(setDispatching(true)) { + bool dispatchStatus(connection_ && dbus_connection_dispatch(connection_) == DBUS_DISPATCH_DATA_REMAINS); + setDispatching(false); + return dispatchStatus; + } else { + return false; + } } bool DBusConnection::isDispatchReady() { std::lock_guard<std::mutex> itsLock(mainloopTimeoutsMutex_); - return (dbus_connection_get_dispatch_status(connection_) == DBUS_DISPATCH_DATA_REMAINS || + + if(setDispatching(true)) { + bool dispatchStatus((connection_ && dbus_connection_get_dispatch_status(connection_) == DBUS_DISPATCH_DATA_REMAINS) || !mainloopTimeouts_.empty()); + setDispatching(false); + return dispatchStatus; + } + return false; } bool DBusConnection::hasDispatchThread() { @@ -813,48 +942,95 @@ const ConnectionId_t& DBusConnection::getConnectionId() const { return connectionId_; } -bool DBusConnection::sendPendingSelectiveSubscription(DBusProxy* proxy, std::string methodName) { - bool subscriptionAccepted; - CommonAPI::CallStatus callStatus; - DBusProxyHelper<CommonAPI::DBus::DBusSerializableArguments<>, - CommonAPI::DBus::DBusSerializableArguments<bool>>::callMethodWithReply( - *proxy, methodName.c_str(), "", &CommonAPI::DBus::defaultCallInfo, callStatus, subscriptionAccepted); +void DBusConnection::incrementConnection() { + std::lock_guard < std::mutex > lock(activeConnectionsMutex_); + activeConnections_++; +} + +void DBusConnection::decrementConnection() { + std::lock_guard < std::mutex > lock(activeConnectionsMutex_); + activeConnections_--; + + if (activeConnections_ <= 0) { + disconnect(); + } +} + +bool DBusConnection::setDispatching(bool _isDispatching) { + std::lock_guard<std::mutex> dispatchLock(dispatchMutex_); + + if(isDispatching_ == _isDispatching) + return true; - return subscriptionAccepted; + dispatchThreads_.insert(std::this_thread::get_id()); + if(isDisconnecting_) { // we want to disconnect and only accept unsetting the dispatch flag + if(!_isDispatching) { + isDispatching_ = _isDispatching; + if(isWaitingOnFinishedDispatching_) + dispatchCondition_.notify_one(); + return true; + } else { + return false; + } + } else { + isDispatching_ = _isDispatching; + return true; + } +} + +void DBusConnection::sendPendingSelectiveSubscription(DBusProxy* callingProxy, std::string methodName, + DBusSignalHandler* dbusSignalHandler, uint32_t tag) { + bool outarg; + DBusProxyHelper<CommonAPI::DBus::DBusSerializableArguments<>, + CommonAPI::DBus::DBusSerializableArguments<bool>>::callMethodAsync( + *callingProxy, methodName.c_str(), "", + &CommonAPI::DBus::defaultCallInfo, + [this, dbusSignalHandler, callingProxy, tag] + (const CommonAPI::CallStatus& callStatus, const bool& accepted) { + + if (callStatus == CommonAPI::CallStatus::SUCCESS && accepted) { + dbusSignalHandler->onSpecificError(CommonAPI::CallStatus::SUCCESS, tag); + } else { + dbusSignalHandler->onSpecificError(CommonAPI::CallStatus::SUBSCRIPTION_REFUSED, tag); + } + }, std::make_tuple(outarg)); } -DBusProxyConnection::DBusSignalHandlerToken DBusConnection::subscribeForSelectiveBroadcast( - bool& subscriptionAccepted, +void DBusConnection::subscribeForSelectiveBroadcast( const std::string& objectPath, const std::string& interfaceName, const std::string& interfaceMemberName, const std::string& interfaceMemberSignature, DBusSignalHandler* dbusSignalHandler, - DBusProxy* callingProxy) { + DBusProxy* callingProxy, + uint32_t tag) { std::string methodName = "subscribeFor" + interfaceMemberName + "Selective"; - subscriptionAccepted = false; - - CommonAPI::CallStatus callStatus; + bool outarg; DBusProxyHelper<CommonAPI::DBus::DBusSerializableArguments<>, - CommonAPI::DBus::DBusSerializableArguments<bool>>::callMethodWithReply( - *callingProxy, methodName.c_str(), "", &CommonAPI::DBus::defaultCallInfo, callStatus, subscriptionAccepted); - - DBusProxyConnection::DBusSignalHandlerToken subscriptionToken; - if ((callStatus == CommonAPI::CallStatus::SUCCESS && subscriptionAccepted) || !callingProxy->isAvailable()) { - subscriptionToken = addSignalMemberHandler( - objectPath, - interfaceName, - interfaceMemberName, - interfaceMemberSignature, - dbusSignalHandler, - true - ); - subscriptionAccepted = true; - } - - return (subscriptionToken); + CommonAPI::DBus::DBusSerializableArguments<bool>>::callMethodAsync( + *callingProxy, methodName.c_str(), "", + &CommonAPI::DBus::defaultCallInfo, + [this, objectPath, interfaceName, interfaceMemberName, interfaceMemberSignature, dbusSignalHandler, callingProxy, tag] + (const CommonAPI::CallStatus& callStatus, const bool& accepted) { + if ((callStatus == CommonAPI::CallStatus::SUCCESS && accepted) || !callingProxy->isAvailable()) { + DBusProxyConnection::DBusSignalHandlerToken token = addSignalMemberHandler( + objectPath, + interfaceName, + interfaceMemberName, + interfaceMemberSignature, + dbusSignalHandler, + true + ); + dbusSignalHandler->setSubscriptionToken(token, tag); + } + if (accepted) { + dbusSignalHandler->onSpecificError(CommonAPI::CallStatus::SUCCESS, tag); + } else { + dbusSignalHandler->onSpecificError(CommonAPI::CallStatus::SUBSCRIPTION_REFUSED, tag); + } + }, std::make_tuple(outarg)); } void DBusConnection::unsubscribeFromSelectiveBroadcast(const std::string& eventName, @@ -955,8 +1131,9 @@ bool DBusConnection::addObjectManagerSignalMemberHandler(const std::string& dbus if (!isInsertSuccessful) { if (isConnected()) { const bool isRemoveSignalMatchRuleSuccessful = removeObjectManagerSignalMatchRule(dbusBusName); - assert(isRemoveSignalMatchRuleSuccessful); - (void)isRemoveSignalMatchRuleSuccessful; + if (!isRemoveSignalMatchRuleSuccessful) { + COMMONAPI_ERROR(std::string(__FUNCTION__), " removeObjectManagerSignalMatchRule", dbusBusName, " failed"); + } } return false; } @@ -973,7 +1150,10 @@ bool DBusConnection::addObjectManagerSignalMemberHandler(const std::string& dbus bool DBusConnection::removeObjectManagerSignalMemberHandler(const std::string& dbusBusName, DBusSignalHandler* dbusSignalHandler) { - assert(!dbusBusName.empty()); + if (dbusBusName.empty()) { + COMMONAPI_ERROR(std::string(__FUNCTION__), " empty dbusBusName"); + return false; + } std::lock_guard<std::mutex> dbusSignalLock(dbusObjectManagerSignalGuard_); @@ -996,8 +1176,11 @@ bool DBusConnection::removeObjectManagerSignalMemberHandler(const std::string& d size_t& dbusSignalMatchRuleReferenceCount = dbusSignalMatchRuleIterator->second; - assert(dbusSignalMatchRuleReferenceCount > 0); - dbusSignalMatchRuleReferenceCount--; + if (0 == dbusSignalMatchRuleReferenceCount) { + COMMONAPI_ERROR(std::string(__FUNCTION__), "ref count == 0"); + } else { + dbusSignalMatchRuleReferenceCount--; + } const bool isLastDBusSignalMatchRuleReference = (dbusSignalMatchRuleReferenceCount == 0); if (isLastDBusSignalMatchRuleReference) { @@ -1083,8 +1266,14 @@ bool DBusConnection::removeLibdbusSignalMatchRule(const std::string& dbusMatchRu } void DBusConnection::registerObjectPath(const std::string& objectPath) { - assert(!objectPath.empty()); - assert(objectPath[0] == '/'); + if (objectPath.empty()) { + COMMONAPI_ERROR(std::string(__FUNCTION__), " empty objectPath"); + return; + } + if ('/' != objectPath[0]) { + COMMONAPI_ERROR(std::string(__FUNCTION__), " invalid objectPath ", objectPath); + return; + } auto handlerIterator = libdbusRegisteredObjectPaths_.find(objectPath); const bool foundRegisteredObjectPathHandler = handlerIterator != libdbusRegisteredObjectPaths_.end(); @@ -1104,22 +1293,33 @@ void DBusConnection::registerObjectPath(const std::string& objectPath) { getDBusObjectPathVTable(), this, &dbusError.libdbusError_); - assert(libdbusSuccess); - assert(!dbusError); - (void)libdbusSuccess; - (void)dbusError; + + if (!libdbusSuccess) { + COMMONAPI_ERROR(std::string(__FUNCTION__), " dbus_connection_try_register_object_path failed for ", objectPath); + } + if (dbusError) { + COMMONAPI_ERROR(std::string(__FUNCTION__), " name: ", dbusError.getName(), " message: ", dbusError.getMessage()); + } } } void DBusConnection::unregisterObjectPath(const std::string& objectPath) { - assert(!objectPath.empty()); - assert(objectPath[0] == '/'); + if (objectPath.empty()) { + COMMONAPI_ERROR(std::string(__FUNCTION__), " empty objectPath"); + return; + } + if ('/' != objectPath[0]) { + COMMONAPI_ERROR(std::string(__FUNCTION__), " invalid objectPath ", objectPath); + return; + } auto handlerIterator = libdbusRegisteredObjectPaths_.find(objectPath); const bool foundRegisteredObjectPathHandler = handlerIterator != libdbusRegisteredObjectPaths_.end(); - assert(foundRegisteredObjectPathHandler); - (void)foundRegisteredObjectPathHandler; + if (!foundRegisteredObjectPathHandler) { + COMMONAPI_ERROR(std::string(__FUNCTION__), " no handler found for ", objectPath); + return; + } uint32_t& referenceCount = handlerIterator->second; if (referenceCount > 1) { @@ -1130,10 +1330,11 @@ void DBusConnection::unregisterObjectPath(const std::string& objectPath) { libdbusRegisteredObjectPaths_.erase(handlerIterator); if (isConnected()) { - dbus_bool_t libdbusSuccess + const dbus_bool_t libdbusSuccess = dbus_connection_unregister_object_path(connection_, objectPath.c_str()); - assert(libdbusSuccess); - (void)libdbusSuccess; + if (!libdbusSuccess) { + COMMONAPI_ERROR(std::string(__FUNCTION__), " dbus_connection_unregister_object_path failed for ", objectPath); + } } } @@ -1166,8 +1367,10 @@ void DBusConnection::addLibdbusSignalMatchRule(const std::string& objectPath, auto success = dbusSignalMatchRulesMap_.insert( DBusSignalMatchRulesMap::value_type(dbusSignalMatchRuleTuple, DBusSignalMatchRuleMapping(1, matchRuleString))); - assert(success.second); - (void)success; + if (!success.second) { + COMMONAPI_ERROR(std::string(__FUNCTION__), "dbusSignalMatchRulesMap_.insert failed ", matchRuleString); + return; + } if (isConnected()) { bool libdbusSuccess = true; @@ -1180,7 +1383,9 @@ void DBusConnection::addLibdbusSignalMatchRule(const std::string& objectPath, &onLibdbusSignalFilterThunk, this, NULL); - assert(libdbusSuccess); + if (!libdbusSuccess) { + COMMONAPI_ERROR(std::string(__FUNCTION__), " dbus_connection_add_filter() failed"); + } } if (!justAddFilter) @@ -1188,8 +1393,10 @@ void DBusConnection::addLibdbusSignalMatchRule(const std::string& objectPath, // finally add the match rule DBusError dbusError; dbus_bus_add_match(connection_, matchRuleString.c_str(), &dbusError.libdbusError_); - assert(!dbusError); - (void)dbusError; + + if (dbusError) { + COMMONAPI_ERROR(std::string(__FUNCTION__), " name: ", dbusError.getName(), " message: ", dbusError.getMessage()); + } } if (libdbusSuccess) { @@ -1206,8 +1413,10 @@ void DBusConnection::removeLibdbusSignalMatchRule(const std::string& objectPath, auto matchRuleIterator = dbusSignalMatchRulesMap_.find(dbusSignalMatchRuleTuple); const bool matchRuleFound = matchRuleIterator != dbusSignalMatchRulesMap_.end(); - assert(matchRuleFound); - (void)matchRuleFound; + if (!matchRuleFound) { + COMMONAPI_ERROR(std::string(__FUNCTION__), " no match rule found for path: ", objectPath, + "interface: ", interfaceName, " member: ", interfaceMemberName); + } uint32_t& matchRuleReferenceCount = matchRuleIterator->second.first; if (matchRuleReferenceCount > 1) { @@ -1218,15 +1427,19 @@ void DBusConnection::removeLibdbusSignalMatchRule(const std::string& objectPath, if (isConnected()) { const std::string& matchRuleString = matchRuleIterator->second.second; const bool libdbusSuccess = removeLibdbusSignalMatchRule(matchRuleString); - assert(libdbusSuccess); - (void)libdbusSuccess; + if (!libdbusSuccess) { + COMMONAPI_ERROR(std::string(__FUNCTION__), " removeLibdbusSignalMatchRule failed ", matchRuleString); + } } dbusSignalMatchRulesMap_.erase(matchRuleIterator); } void DBusConnection::initLibdbusObjectPathHandlerAfterConnect() { - assert(isConnected()); + if (!isConnected()) { + COMMONAPI_ERROR(std::string(__FUNCTION__), "not connected"); + return; + } // nothing to do if there aren't any registered object path handlers if (libdbusRegisteredObjectPaths_.empty()) { @@ -1248,37 +1461,46 @@ void DBusConnection::initLibdbusObjectPathHandlerAfterConnect() { getDBusObjectPathVTable(), this, &dbusError.libdbusError_); - assert(libdbusSuccess); - (void)libdbusSuccess; - - assert(!dbusError); + if (!libdbusSuccess) { + COMMONAPI_ERROR(std::string(__FUNCTION__), " dbus_connection_try_register_object_path(", objectPath , ") failed "); + } + if (dbusError) { + COMMONAPI_ERROR(std::string(__FUNCTION__), " name: ", dbusError.getName(), " message: ", dbusError.getMessage()); + } } } void DBusConnection::initLibdbusSignalFilterAfterConnect() { - assert(isConnected()); + if (!isConnected()) { + COMMONAPI_ERROR(std::string(__FUNCTION__), "not connected"); + return; + } // proxy/stub match rules for (const auto& dbusSignalMatchRuleIterator : dbusSignalMatchRulesMap_) { const auto& dbusSignalMatchRuleMapping = dbusSignalMatchRuleIterator.second; const std::string& dbusMatchRuleString = dbusSignalMatchRuleMapping.second; const bool libdbusSuccess = addLibdbusSignalMatchRule(dbusMatchRuleString); - assert(libdbusSuccess); - (void)libdbusSuccess; + if (!libdbusSuccess) { + COMMONAPI_ERROR(std::string(__FUNCTION__), " addLibdbusSignalMatchRule(", dbusMatchRuleString , ") failed "); + } } // object manager match rules (see DBusServiceRegistry) for (const auto& dbusObjectManagerSignalMatchRuleIterator : dbusObjectManagerSignalMatchRulesMap_) { const std::string& dbusBusName = dbusObjectManagerSignalMatchRuleIterator.first; const bool libdbusSuccess = addObjectManagerSignalMatchRule(dbusBusName); - assert(libdbusSuccess); - (void)libdbusSuccess; + if (!libdbusSuccess) { + COMMONAPI_ERROR(std::string(__FUNCTION__), " addObjectManagerSignalMatchRule(", dbusBusName , ") failed "); + } } } ::DBusHandlerResult DBusConnection::onLibdbusObjectPathMessage(::DBusMessage* libdbusMessage) { - assert(libdbusMessage); - (void)libdbusMessage; + if (NULL == libdbusMessage) { + COMMONAPI_ERROR(std::string(__FUNCTION__), " libdbusMessage == NULL"); + return DBUS_HANDLER_RESULT_NOT_YET_HANDLED; + } // handle only method call messages if (dbus_message_get_type(libdbusMessage) != DBUS_MESSAGE_TYPE_METHOD_CALL) { @@ -1299,7 +1521,6 @@ void notifyDBusSignalHandlers(DBusSignalHandlersTable& dbusSignalHandlerstable, return; } - signalEntry->second.first->lock(); auto handlerEntry = signalEntry->second.second.begin(); while (handlerEntry != signalEntry->second.second.end()) { DBusProxyConnection::DBusSignalHandler* dbusSignalHandler = *handlerEntry; @@ -1307,7 +1528,6 @@ void notifyDBusSignalHandlers(DBusSignalHandlersTable& dbusSignalHandlerstable, handlerEntry++; } dbusHandlerResult = DBUS_HANDLER_RESULT_HANDLED; - signalEntry->second.first->unlock(); } template<typename DBusSignalHandlersTable> @@ -1329,7 +1549,10 @@ void notifyDBusOMSignalHandlers(DBusSignalHandlersTable& dbusSignalHandlerstable } ::DBusHandlerResult DBusConnection::onLibdbusSignalFilter(::DBusMessage* libdbusMessage) { - assert(libdbusMessage); + if (NULL == libdbusMessage) { + COMMONAPI_ERROR(std::string(__FUNCTION__), " libdbusMessage == NULL"); + return DBUS_HANDLER_RESULT_NOT_YET_HANDLED; + } auto selfReference = this->shared_from_this(); @@ -1343,10 +1566,10 @@ void notifyDBusOMSignalHandlers(DBusSignalHandlersTable& dbusSignalHandlerstable const char* interfaceMemberName = dbus_message_get_member(libdbusMessage); const char* interfaceMemberSignature = dbus_message_get_signature(libdbusMessage); - assert(objectPath); - assert(interfaceName); - assert(interfaceMemberName); - assert(interfaceMemberSignature); + if (NULL == objectPath || NULL == interfaceName || NULL == interfaceMemberName || NULL == interfaceMemberSignature ) { + COMMONAPI_ERROR(std::string(__FUNCTION__), " invalid message"); + return DBUS_HANDLER_RESULT_NOT_YET_HANDLED; + } DBusMessage dbusMessage(libdbusMessage); ::DBusHandlerResult dbusHandlerResult = DBUS_HANDLER_RESULT_NOT_YET_HANDLED; @@ -1357,14 +1580,23 @@ void notifyDBusOMSignalHandlers(DBusSignalHandlersTable& dbusSignalHandlerstable interfaceName, interfaceMemberName, interfaceMemberSignature)); + + if(signalEntry != dbusSignalHandlerTable_.end()) + signalEntry->second.first->lock(); signalGuard_.unlock(); notifyDBusSignalHandlers(dbusSignalHandlerTable_, signalEntry, dbusMessage, dbusHandlerResult); + if(signalEntry != dbusSignalHandlerTable_.end()) + signalEntry->second.first->unlock(); + if (dbusMessage.hasInterfaceName("org.freedesktop.DBus.ObjectManager")) { const char* dbusSenderName = dbusMessage.getSender(); - assert(dbusSenderName); + if (NULL == dbusSenderName) { + COMMONAPI_ERROR(std::string(__FUNCTION__), " dbusSenderName == NULL"); + return DBUS_HANDLER_RESULT_NOT_YET_HANDLED; + } dbusObjectManagerSignalGuard_.lock(); auto dbusObjectManagerSignalHandlerIteratorPair = dbusObjectManagerSignalHandlerTable_.equal_range(dbusSenderName); @@ -1381,26 +1613,50 @@ void notifyDBusOMSignalHandlers(DBusSignalHandlersTable& dbusSignalHandlerstable ::DBusHandlerResult DBusConnection::onLibdbusSignalFilterThunk(::DBusConnection *_dbusConnection, ::DBusMessage* libdbusMessage, void* userData) { - assert(_dbusConnection); - assert(libdbusMessage); - assert(userData); - (void)_dbusConnection; + if (NULL == _dbusConnection) { + COMMONAPI_ERROR(std::string(__FUNCTION__), " _dbusConnection == NULL"); + return DBUS_HANDLER_RESULT_NOT_YET_HANDLED; + } + if (NULL == libdbusMessage) { + COMMONAPI_ERROR(std::string(__FUNCTION__), " libdbusMessage == NULL"); + return DBUS_HANDLER_RESULT_NOT_YET_HANDLED; + } + if (NULL == userData) { + COMMONAPI_ERROR(std::string(__FUNCTION__), " userData == NULL"); + return DBUS_HANDLER_RESULT_NOT_YET_HANDLED; + } DBusConnection* dbusConnection = reinterpret_cast<DBusConnection*>(userData); - assert(dbusConnection->connection_ == _dbusConnection); + if (dbusConnection->connection_ != _dbusConnection) { + COMMONAPI_ERROR(std::string(__FUNCTION__), " wrong connection!?"); + return DBUS_HANDLER_RESULT_NOT_YET_HANDLED; + } + return dbusConnection->onLibdbusSignalFilter(libdbusMessage); } ::DBusHandlerResult DBusConnection::onLibdbusObjectPathMessageThunk(::DBusConnection *_dbusConnection, ::DBusMessage* libdbusMessage, void* userData) { - assert(_dbusConnection); - assert(libdbusMessage); - assert(userData); - (void)_dbusConnection; + if (NULL == _dbusConnection) { + COMMONAPI_ERROR(std::string(__FUNCTION__), " _dbusConnection == NULL"); + return DBUS_HANDLER_RESULT_NOT_YET_HANDLED; + } + if (NULL == libdbusMessage) { + COMMONAPI_ERROR(std::string(__FUNCTION__), " libdbusMessage == NULL"); + return DBUS_HANDLER_RESULT_NOT_YET_HANDLED; + } + if (NULL == userData) { + COMMONAPI_ERROR(std::string(__FUNCTION__), " userData == NULL"); + return DBUS_HANDLER_RESULT_NOT_YET_HANDLED; + } DBusConnection* dbusConnection = reinterpret_cast<DBusConnection*>(userData); - assert(dbusConnection->connection_ == _dbusConnection); + if (dbusConnection->connection_ != _dbusConnection) { + COMMONAPI_ERROR(std::string(__FUNCTION__), " wrong connection!?"); + return DBUS_HANDLER_RESULT_NOT_YET_HANDLED; + } + return dbusConnection->onLibdbusObjectPathMessage(libdbusMessage); } @@ -1412,5 +1668,15 @@ std::shared_ptr<DBusConnection> DBusConnection::wrap(::DBusConnection *_connecti return std::make_shared<DBusConnection>(_connection, _connectionId); } +void DBusConnection::pushDBusMessageReply(const DBusMessage& _reply, + std::unique_ptr<DBusMessageReplyAsyncHandler> _dbusMessageReplyAsyncHandler) { + // push message to the message queue + DBusMessageReplyAsyncHandler* replyAsyncHandler = _dbusMessageReplyAsyncHandler.release(); + replyAsyncHandler->setHasToBeDeleted(); + std::shared_ptr<DBusMessageWatch::MsgReplyQueueEntry> msgReplyQueueEntry = std::make_shared<DBusMessageWatch::MsgReplyQueueEntry>( + replyAsyncHandler, _reply); + msgWatch_->pushMsgQueue(msgReplyQueueEntry); +} + } // namespace DBus } // namespace CommonAPI |