diff options
author | Philip Rauwolf <rauwolf@itestra.de> | 2013-09-10 15:33:03 +0200 |
---|---|---|
committer | Philip Rauwolf <rauwolf@itestra.de> | 2013-09-10 15:33:03 +0200 |
commit | 8dbea02e1879c1c4be481999126ec0b522c05fb5 (patch) | |
tree | b826a00e79c0a173e32c81f198bdc2d888d127f3 /src | |
parent | 4bd27d7f06d1141e338adb11e0807f95be6412be (diff) | |
download | genivi-common-api-dbus-runtime-8dbea02e1879c1c4be481999126ec0b522c05fb5.tar.gz |
Fixed some subtle bugs.
Two pointers to temporaries, one testcase, plus the possibility to
destruct the Connection from within the handling of timed out calls.
Diffstat (limited to 'src')
-rw-r--r-- | src/CommonAPI/DBus/DBusConnection.cpp | 113 | ||||
-rw-r--r-- | src/CommonAPI/DBus/DBusConnection.h | 6 | ||||
-rw-r--r-- | src/test/DBusSelectiveBroadcastTest.cpp | 67 |
3 files changed, 106 insertions, 80 deletions
diff --git a/src/CommonAPI/DBus/DBusConnection.cpp b/src/CommonAPI/DBus/DBusConnection.cpp index 9ac9cee..c867136 100644 --- a/src/CommonAPI/DBus/DBusConnection.cpp +++ b/src/CommonAPI/DBus/DBusConnection.cpp @@ -89,7 +89,7 @@ DBusConnection::DBusConnection(BusType busType) : connectionNameCount_(), dispatchSource_(), mainLoopContext_(std::shared_ptr<MainLoopContext>(NULL)), - enforcerThread(NULL) { + enforcerThread_(NULL) { dbus_threads_init_default(); } @@ -105,7 +105,7 @@ DBusConnection::DBusConnection(::DBusConnection* libDbusConnection) : connectionNameCount_(), dispatchSource_(), mainLoopContext_(std::shared_ptr<MainLoopContext>(NULL)), - enforcerThread(NULL) { + enforcerThread_(NULL) { dbus_threads_init_default(); } @@ -118,7 +118,7 @@ bool DBusConnection::isObjectPathMessageHandlerSet() { } DBusConnection::~DBusConnection() { - if(auto lockedContext = mainLoopContext_.lock()) { + if (auto lockedContext = mainLoopContext_.lock()) { dbus_connection_set_watch_functions(libdbusConnection_, NULL, NULL, NULL, NULL, NULL); dbus_connection_set_timeout_functions(libdbusConnection_, NULL, NULL, NULL, NULL, NULL); @@ -128,6 +128,29 @@ DBusConnection::~DBusConnection() { } disconnect(); + + //Assert that the enforcerThread_ is in a position to finish itself correctly even after destruction + //of the DBusConnection. Also assert all resources are cleaned up. + if (enforcerThread_) { + enforceTimeoutMutex_.lock(); + + auto it = timeoutMap_.begin(); + while (it != timeoutMap_.end()) { + DBusPendingCall* libdbusPendingCall = it->first; + + if (!dbus_pending_call_get_completed(libdbusPendingCall)) { + dbus_pending_call_cancel(libdbusPendingCall); + DBusMessageReplyAsyncHandler* asyncHandler = std::get<1>(it->second); + DBusMessage& dbusMessageCall = std::get<2>(it->second); + asyncHandler->onDBusMessageReply(CallStatus::REMOTE_ERROR, dbusMessageCall.createMethodError(DBUS_ERROR_TIMEOUT)); + delete asyncHandler; + } + it = timeoutMap_.erase(it); + dbus_pending_call_unref(libdbusPendingCall); + } + + enforceTimeoutMutex_.unlock(); + } } @@ -453,50 +476,62 @@ void DBusConnection::onLibdbusDataCleanup(void* userData) { //Would not be needed if libdbus would actually handle its timeouts for pending calls. void DBusConnection::enforceAsynchronousTimeouts() const { - enforeTimeoutMutex.lock(); + enforceTimeoutMutex_.lock(); - while (!timeoutMap.empty()) { - auto minTimeoutElement = std::min_element(timeoutMap.begin(), timeoutMap.end(), + //Assert that we DO have a reference to the executing thread, even if the DBusConnection is destroyed. + //We need it to assess whether we still may access the members of the DBusConnection. + std::shared_ptr<std::thread> threadPtr = enforcerThread_; + + while (!timeoutMap_.empty()) { + auto minTimeoutElement = std::min_element(timeoutMap_.begin(), timeoutMap_.end(), [] (const TimeoutMapElement& lhs, const TimeoutMapElement& rhs) { return std::get<0>(lhs.second) < std::get<0>(rhs.second); }); int minTimeout = std::get<0>(minTimeoutElement->second); - enforeTimeoutMutex.unlock(); + enforceTimeoutMutex_.unlock(); std::this_thread::sleep_for(std::chrono::milliseconds(minTimeout)); - enforeTimeoutMutex.lock(); - - for (auto it = timeoutMap.begin(); it != timeoutMap.end(); ) { - int& currentTimeout = std::get<0>(it->second); - currentTimeout -= minTimeout; - if (currentTimeout <= 0) { - DBusPendingCall* libdbusPendingCall = it->first; - - if (!dbus_pending_call_get_completed(libdbusPendingCall)) { - dbus_pending_call_cancel(libdbusPendingCall); - DBusMessageReplyAsyncHandler* asyncHandler = std::get<1>(it->second); - DBusMessage& dbusMessageCall = std::get<2>(it->second); - asyncHandler->onDBusMessageReply(CallStatus::REMOTE_ERROR, dbusMessageCall.createMethodError(DBUS_ERROR_TIMEOUT)); - delete asyncHandler; + //Do not access members if the DBusConnection was destroyed during the unlocked phase. + if (!threadPtr.unique()) { + enforceTimeoutMutex_.lock(); + auto it = timeoutMap_.begin(); + while (!threadPtr.unique() && it != timeoutMap_.end()) { + int& currentTimeout = std::get<0>(it->second); + currentTimeout -= minTimeout; + if (currentTimeout <= 0) { + DBusPendingCall* libdbusPendingCall = it->first; + + if (!dbus_pending_call_get_completed(libdbusPendingCall)) { + dbus_pending_call_cancel(libdbusPendingCall); + DBusMessageReplyAsyncHandler* asyncHandler = std::get<1>(it->second); + DBusMessage& dbusMessageCall = std::get<2>(it->second); + asyncHandler->onDBusMessageReply(CallStatus::REMOTE_ERROR, dbusMessageCall.createMethodError(DBUS_ERROR_TIMEOUT)); + delete asyncHandler; + } + it = timeoutMap_.erase(it); + + //This unref MIGHT cause the destruction of the last callback object that references the DBusConnection. + //So after this unref has been called, it has to be ensured that continuation of the loop is an option. + dbus_pending_call_unref(libdbusPendingCall); + } else { + ++it; } - dbus_pending_call_unref(libdbusPendingCall); - it = timeoutMap.erase(it); - } else { - ++it; } } } - //Must be atomic with respect to local threading - auto threadPtr = enforcerThread; - enforcerThread = NULL; - enforeTimeoutMutex.unlock(); + //Normally there is at least the member of DBusConnection plus the local copy of this pointer. + //If the local copy is the only one remaining, we have to assume that the DBusConnection was + //destroyed and therefore we must no longer access its members. + if (!threadPtr.unique()) { + enforcerThread_.reset(); + enforceTimeoutMutex_.unlock(); + } threadPtr->detach(); - delete threadPtr; } std::future<CallStatus> DBusConnection::sendDBusMessageWithReplyAsync( @@ -539,12 +574,12 @@ std::future<CallStatus> DBusConnection::sendDBusMessageWithReplyAsync( dbus_pending_call_ref(libdbusPendingCall); std::tuple<int, DBusMessageReplyAsyncHandler*, DBusMessage> toInsert {timeoutMilliseconds, replyAsyncHandler, dbusMessage}; - enforeTimeoutMutex.lock(); - timeoutMap.insert( {libdbusPendingCall, toInsert } ); - if (!enforcerThread) { - enforcerThread = new std::thread(std::bind(&DBusConnection::enforceAsynchronousTimeouts, this->shared_from_this())); + enforceTimeoutMutex_.lock(); + timeoutMap_.insert( {libdbusPendingCall, toInsert } ); + if (!enforcerThread_) { + enforcerThread_ = std::make_shared<std::thread>(std::bind(&DBusConnection::enforceAsynchronousTimeouts, this->shared_from_this())); } - enforeTimeoutMutex.unlock(); + enforceTimeoutMutex_.unlock(); } return replyAsyncHandler->getFuture(); @@ -593,13 +628,13 @@ DBusProxyConnection::DBusSignalHandlerToken DBusConnection::subscribeForSelectiv DBusSignalHandler* dbusSignalHandler, DBusProxy* callingProxy) { - const char* methodName = ("subscribeFor" + interfaceMemberName + "Selective").c_str(); + std::string methodName = "subscribeFor" + interfaceMemberName + "Selective"; subscriptionAccepted = false; CommonAPI::CallStatus callStatus; DBusProxyHelper<CommonAPI::DBus::DBusSerializableArguments<>, CommonAPI::DBus::DBusSerializableArguments<bool>>::callMethodWithReply( - *callingProxy, methodName, "", callStatus, subscriptionAccepted); + *callingProxy, methodName.c_str(), "", callStatus, subscriptionAccepted); DBusProxyConnection::DBusSignalHandlerToken subscriptionToken; @@ -625,11 +660,11 @@ void DBusConnection::unsubsribeFromSelectiveBroadcast(const std::string& eventNa if (lastListenerOnConnectionRemoved) { // send unsubscribe message to stub - const char* methodName = ("unsubscribeFrom" + eventName + "Selective").c_str(); + std::string methodName = "unsubscribeFrom" + eventName + "Selective"; CommonAPI::CallStatus callStatus; DBusProxyHelper<CommonAPI::DBus::DBusSerializableArguments<>, CommonAPI::DBus::DBusSerializableArguments<>>::callMethodWithReply( - *callingProxy, methodName, "", callStatus); + *callingProxy, methodName.c_str(), "", callStatus); } } diff --git a/src/CommonAPI/DBus/DBusConnection.h b/src/CommonAPI/DBus/DBusConnection.h index 61c7418..9ee5588 100644 --- a/src/CommonAPI/DBus/DBusConnection.h +++ b/src/CommonAPI/DBus/DBusConnection.h @@ -209,9 +209,9 @@ class DBusConnection: public DBusProxyConnection, public std::enable_shared_from mutable std::unordered_map<std::string, uint16_t> connectionNameCount_; typedef std::pair<DBusPendingCall*, std::tuple<int, DBusMessageReplyAsyncHandler*, DBusMessage> > TimeoutMapElement; - mutable std::map<DBusPendingCall*, std::tuple<int, DBusMessageReplyAsyncHandler*, DBusMessage>> timeoutMap; - mutable std::thread* enforcerThread; - mutable std::mutex enforeTimeoutMutex; + mutable std::map<DBusPendingCall*, std::tuple<int, DBusMessageReplyAsyncHandler*, DBusMessage>> timeoutMap_; + mutable std::shared_ptr<std::thread> enforcerThread_; + mutable std::mutex enforceTimeoutMutex_; }; std::shared_ptr<DBusConnection> DBusConnection::getBus(const BusType& busType) { diff --git a/src/test/DBusSelectiveBroadcastTest.cpp b/src/test/DBusSelectiveBroadcastTest.cpp index 8f5df33..4b2cb25 100644 --- a/src/test/DBusSelectiveBroadcastTest.cpp +++ b/src/test/DBusSelectiveBroadcastTest.cpp @@ -58,7 +58,7 @@ public: const std::shared_ptr<CommonAPI::ClientId> clientId, const CommonAPI::SelectiveBroadcastSubscriptionEvent event) { - if(event == CommonAPI::SelectiveBroadcastSubscriptionEvent::SUBSCRIBED) + if (event == CommonAPI::SelectiveBroadcastSubscriptionEvent::SUBSCRIBED) lastSubscribedClient = clientId; } @@ -118,11 +118,9 @@ protected: } virtual void TearDown() { + servicePublisher_->unregisterService(serviceAddress_); } - - - std::shared_ptr<CommonAPI::Runtime> runtime_; std::shared_ptr<CommonAPI::Factory> proxyFactory_; std::shared_ptr<CommonAPI::Factory> proxyFactory2_; @@ -130,9 +128,6 @@ protected: std::shared_ptr<CommonAPI::ServicePublisher> servicePublisher_; static const std::string serviceAddress_; - static const std::string serviceAddress2_; - static const std::string serviceAddress3_; - static const std::string serviceAddress4_; static const std::string nonstandardAddress_; int selectiveBroadcastArrivedAtProxyFromSameFactory1; @@ -156,27 +151,23 @@ public: }; const std::string DBusSelectiveBroadcastTest::serviceAddress_ = "local:CommonAPI.DBus.tests.DBusProxyTestInterface:CommonAPI.DBus.tests.DBusProxyTestService"; -const std::string DBusSelectiveBroadcastTest::serviceAddress2_ = "local:CommonAPI.DBus.tests.DBusProxyTestInterface:CommonAPI.DBus.tests.DBusProxyTestService2"; -const std::string DBusSelectiveBroadcastTest::serviceAddress3_ = "local:CommonAPI.DBus.tests.DBusProxyTestInterface:CommonAPI.DBus.tests.DBusProxyTestService3"; -const std::string DBusSelectiveBroadcastTest::serviceAddress4_ = "local:CommonAPI.DBus.tests.DBusProxyTestInterface:CommonAPI.DBus.tests.DBusProxyTestService4"; const std::string DBusSelectiveBroadcastTest::nonstandardAddress_ = "local:non.standard.ServiceName:non.standard.participand.ID"; TEST_F(DBusSelectiveBroadcastTest, ProxysCanSubscribe) { - auto proxyFromSameFactory1 = proxyFactory_->buildProxy<commonapi::tests::TestInterfaceProxy>(serviceAddress4_); + auto proxyFromSameFactory1 = proxyFactory_->buildProxy<commonapi::tests::TestInterfaceProxy>(serviceAddress_); ASSERT_TRUE((bool)proxyFromSameFactory1); - auto proxyFromSameFactory2 = proxyFactory_->buildProxy<commonapi::tests::TestInterfaceProxy>(serviceAddress4_); + auto proxyFromSameFactory2 = proxyFactory_->buildProxy<commonapi::tests::TestInterfaceProxy>(serviceAddress_); ASSERT_TRUE((bool)proxyFromSameFactory2); - auto proxyFromOtherFactory = proxyFactory2_->buildProxy<commonapi::tests::TestInterfaceProxy>(serviceAddress4_); + auto proxyFromOtherFactory = proxyFactory2_->buildProxy<commonapi::tests::TestInterfaceProxy>(serviceAddress_); ASSERT_TRUE((bool)proxyFromOtherFactory); auto stub = std::make_shared<SelectiveBroadcastSender>(); - - bool serviceRegistered = servicePublisher_->registerService(stub, serviceAddress4_, stubFactory_); + bool serviceRegistered = servicePublisher_->registerService(stub, serviceAddress_, stubFactory_); for (unsigned int i = 0; !serviceRegistered && i < 100; ++i) { - serviceRegistered = servicePublisher_->registerService(stub, serviceAddress4_, stubFactory_); + serviceRegistered = servicePublisher_->registerService(stub, serviceAddress_, stubFactory_); usleep(10000); } ASSERT_TRUE(serviceRegistered); @@ -186,24 +177,26 @@ TEST_F(DBusSelectiveBroadcastTest, ProxysCanSubscribe) } ASSERT_TRUE(proxyFromSameFactory1->isAvailable()); - auto subscriptionResult1 = proxyFromSameFactory1->getTestSelectiveBroadcastSelectiveEvent().subscribe(std::bind(&DBusSelectiveBroadcastTest::selectiveBroadcastCallbackForProxyFromSameFactory1, this)); + auto subscriptionResult1 = proxyFromSameFactory1->getTestSelectiveBroadcastSelectiveEvent().subscribe( + std::bind(&DBusSelectiveBroadcastTest::selectiveBroadcastCallbackForProxyFromSameFactory1, this)); + usleep(20000); ASSERT_EQ(stub->getNumberOfSubscribedClients(), 1); stub->send(); - usleep(20000); - ASSERT_EQ(selectiveBroadcastArrivedAtProxyFromSameFactory1, 1); - ASSERT_EQ(selectiveBroadcastArrivedAtProxyFromSameFactory2, 0); + usleep(200000); + EXPECT_EQ(selectiveBroadcastArrivedAtProxyFromSameFactory1, 1); + EXPECT_EQ(selectiveBroadcastArrivedAtProxyFromSameFactory2, 0); auto subscriptionResult2 = proxyFromSameFactory2->getTestSelectiveBroadcastSelectiveEvent().subscribe(std::bind(&DBusSelectiveBroadcastTest::selectiveBroadcastCallbackForProxyFromSameFactory2, this)); ASSERT_EQ(stub->getNumberOfSubscribedClients(), 1); // should still be one because these were created by the same factory thus using the same connection stub->send(); - usleep(20000); - ASSERT_EQ(selectiveBroadcastArrivedAtProxyFromSameFactory1, 2); - ASSERT_EQ(selectiveBroadcastArrivedAtProxyFromSameFactory2, 1); + usleep(200000); + EXPECT_EQ(selectiveBroadcastArrivedAtProxyFromSameFactory1, 2); + EXPECT_EQ(selectiveBroadcastArrivedAtProxyFromSameFactory2, 1); auto subscriptionResult3 = proxyFromOtherFactory->getTestSelectiveBroadcastSelectiveEvent().subscribe(std::bind(&DBusSelectiveBroadcastTest::selectiveBroadcastCallbackForProxyFromOtherFactory, this)); ASSERT_EQ(stub->getNumberOfSubscribedClients(), 2); // should still be two because proxyFromSameFactory1_ is still subscribed @@ -212,36 +205,34 @@ TEST_F(DBusSelectiveBroadcastTest, ProxysCanSubscribe) ASSERT_EQ(stub->getNumberOfSubscribedClients(), 2); // should still be two because proxyFromSameFactory1_ is still subscribed stub->send(); - usleep(20000); - ASSERT_EQ(selectiveBroadcastArrivedAtProxyFromSameFactory1, 3); - ASSERT_EQ(selectiveBroadcastArrivedAtProxyFromSameFactory2, 1); - ASSERT_EQ(selectiveBroadcastArrivedAtProxyFromOtherFactory, 1); + usleep(200000); + EXPECT_EQ(selectiveBroadcastArrivedAtProxyFromSameFactory1, 3); + EXPECT_EQ(selectiveBroadcastArrivedAtProxyFromSameFactory2, 1); + EXPECT_EQ(selectiveBroadcastArrivedAtProxyFromOtherFactory, 1); // now only the last subscribed client (which is the one from the other factory) should receive the signal stub->sendToLastSubscribedClient(); - usleep(20000); - ASSERT_EQ(selectiveBroadcastArrivedAtProxyFromSameFactory1, 3); - ASSERT_EQ(selectiveBroadcastArrivedAtProxyFromSameFactory2, 1); - ASSERT_EQ(selectiveBroadcastArrivedAtProxyFromOtherFactory, 2); + usleep(200000); + EXPECT_EQ(selectiveBroadcastArrivedAtProxyFromSameFactory1, 3); + EXPECT_EQ(selectiveBroadcastArrivedAtProxyFromSameFactory2, 1); + EXPECT_EQ(selectiveBroadcastArrivedAtProxyFromOtherFactory, 2); proxyFromSameFactory1->getTestSelectiveBroadcastSelectiveEvent().unsubscribe(subscriptionResult1); - ASSERT_EQ(stub->getNumberOfSubscribedClients(), 1); - - servicePublisher_->unregisterService(serviceAddress4_); + EXPECT_EQ(stub->getNumberOfSubscribedClients(), 1); } TEST_F(DBusSelectiveBroadcastTest, ProxysCanBeRejected) { - auto proxyFromSameFactory1 = proxyFactory_->buildProxy<commonapi::tests::TestInterfaceProxy>(serviceAddress4_); + auto proxyFromSameFactory1 = proxyFactory_->buildProxy<commonapi::tests::TestInterfaceProxy>(serviceAddress_); ASSERT_TRUE((bool)proxyFromSameFactory1); - auto proxyFromOtherFactory = proxyFactory2_->buildProxy<commonapi::tests::TestInterfaceProxy>(serviceAddress4_); + auto proxyFromOtherFactory = proxyFactory2_->buildProxy<commonapi::tests::TestInterfaceProxy>(serviceAddress_); ASSERT_TRUE((bool)proxyFromOtherFactory); auto stub = std::make_shared<SelectiveBroadcastSender>(); - bool serviceRegistered = servicePublisher_->registerService(stub, serviceAddress4_, stubFactory_); + bool serviceRegistered = servicePublisher_->registerService(stub, serviceAddress_, stubFactory_); for (unsigned int i = 0; !serviceRegistered && i < 100; ++i) { - serviceRegistered = servicePublisher_->registerService(stub, serviceAddress4_, stubFactory_); + serviceRegistered = servicePublisher_->registerService(stub, serviceAddress_, stubFactory_); usleep(10000); } ASSERT_TRUE(serviceRegistered); |