summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorPhilip Rauwolf <rauwolf@itestra.de>2013-09-10 15:33:03 +0200
committerPhilip Rauwolf <rauwolf@itestra.de>2013-09-10 15:33:03 +0200
commit8dbea02e1879c1c4be481999126ec0b522c05fb5 (patch)
treeb826a00e79c0a173e32c81f198bdc2d888d127f3 /src
parent4bd27d7f06d1141e338adb11e0807f95be6412be (diff)
downloadgenivi-common-api-dbus-runtime-8dbea02e1879c1c4be481999126ec0b522c05fb5.tar.gz
Fixed some subtle bugs.
Two pointers to temporaries, one testcase, plus the possibility to destruct the Connection from within the handling of timed out calls.
Diffstat (limited to 'src')
-rw-r--r--src/CommonAPI/DBus/DBusConnection.cpp113
-rw-r--r--src/CommonAPI/DBus/DBusConnection.h6
-rw-r--r--src/test/DBusSelectiveBroadcastTest.cpp67
3 files changed, 106 insertions, 80 deletions
diff --git a/src/CommonAPI/DBus/DBusConnection.cpp b/src/CommonAPI/DBus/DBusConnection.cpp
index 9ac9cee..c867136 100644
--- a/src/CommonAPI/DBus/DBusConnection.cpp
+++ b/src/CommonAPI/DBus/DBusConnection.cpp
@@ -89,7 +89,7 @@ DBusConnection::DBusConnection(BusType busType) :
connectionNameCount_(),
dispatchSource_(),
mainLoopContext_(std::shared_ptr<MainLoopContext>(NULL)),
- enforcerThread(NULL) {
+ enforcerThread_(NULL) {
dbus_threads_init_default();
}
@@ -105,7 +105,7 @@ DBusConnection::DBusConnection(::DBusConnection* libDbusConnection) :
connectionNameCount_(),
dispatchSource_(),
mainLoopContext_(std::shared_ptr<MainLoopContext>(NULL)),
- enforcerThread(NULL) {
+ enforcerThread_(NULL) {
dbus_threads_init_default();
}
@@ -118,7 +118,7 @@ bool DBusConnection::isObjectPathMessageHandlerSet() {
}
DBusConnection::~DBusConnection() {
- if(auto lockedContext = mainLoopContext_.lock()) {
+ if (auto lockedContext = mainLoopContext_.lock()) {
dbus_connection_set_watch_functions(libdbusConnection_, NULL, NULL, NULL, NULL, NULL);
dbus_connection_set_timeout_functions(libdbusConnection_, NULL, NULL, NULL, NULL, NULL);
@@ -128,6 +128,29 @@ DBusConnection::~DBusConnection() {
}
disconnect();
+
+ //Assert that the enforcerThread_ is in a position to finish itself correctly even after destruction
+ //of the DBusConnection. Also assert all resources are cleaned up.
+ if (enforcerThread_) {
+ enforceTimeoutMutex_.lock();
+
+ auto it = timeoutMap_.begin();
+ while (it != timeoutMap_.end()) {
+ DBusPendingCall* libdbusPendingCall = it->first;
+
+ if (!dbus_pending_call_get_completed(libdbusPendingCall)) {
+ dbus_pending_call_cancel(libdbusPendingCall);
+ DBusMessageReplyAsyncHandler* asyncHandler = std::get<1>(it->second);
+ DBusMessage& dbusMessageCall = std::get<2>(it->second);
+ asyncHandler->onDBusMessageReply(CallStatus::REMOTE_ERROR, dbusMessageCall.createMethodError(DBUS_ERROR_TIMEOUT));
+ delete asyncHandler;
+ }
+ it = timeoutMap_.erase(it);
+ dbus_pending_call_unref(libdbusPendingCall);
+ }
+
+ enforceTimeoutMutex_.unlock();
+ }
}
@@ -453,50 +476,62 @@ void DBusConnection::onLibdbusDataCleanup(void* userData) {
//Would not be needed if libdbus would actually handle its timeouts for pending calls.
void DBusConnection::enforceAsynchronousTimeouts() const {
- enforeTimeoutMutex.lock();
+ enforceTimeoutMutex_.lock();
- while (!timeoutMap.empty()) {
- auto minTimeoutElement = std::min_element(timeoutMap.begin(), timeoutMap.end(),
+ //Assert that we DO have a reference to the executing thread, even if the DBusConnection is destroyed.
+ //We need it to assess whether we still may access the members of the DBusConnection.
+ std::shared_ptr<std::thread> threadPtr = enforcerThread_;
+
+ while (!timeoutMap_.empty()) {
+ auto minTimeoutElement = std::min_element(timeoutMap_.begin(), timeoutMap_.end(),
[] (const TimeoutMapElement& lhs, const TimeoutMapElement& rhs) {
return std::get<0>(lhs.second) < std::get<0>(rhs.second);
});
int minTimeout = std::get<0>(minTimeoutElement->second);
- enforeTimeoutMutex.unlock();
+ enforceTimeoutMutex_.unlock();
std::this_thread::sleep_for(std::chrono::milliseconds(minTimeout));
- enforeTimeoutMutex.lock();
-
- for (auto it = timeoutMap.begin(); it != timeoutMap.end(); ) {
- int& currentTimeout = std::get<0>(it->second);
- currentTimeout -= minTimeout;
- if (currentTimeout <= 0) {
- DBusPendingCall* libdbusPendingCall = it->first;
-
- if (!dbus_pending_call_get_completed(libdbusPendingCall)) {
- dbus_pending_call_cancel(libdbusPendingCall);
- DBusMessageReplyAsyncHandler* asyncHandler = std::get<1>(it->second);
- DBusMessage& dbusMessageCall = std::get<2>(it->second);
- asyncHandler->onDBusMessageReply(CallStatus::REMOTE_ERROR, dbusMessageCall.createMethodError(DBUS_ERROR_TIMEOUT));
- delete asyncHandler;
+ //Do not access members if the DBusConnection was destroyed during the unlocked phase.
+ if (!threadPtr.unique()) {
+ enforceTimeoutMutex_.lock();
+ auto it = timeoutMap_.begin();
+ while (!threadPtr.unique() && it != timeoutMap_.end()) {
+ int& currentTimeout = std::get<0>(it->second);
+ currentTimeout -= minTimeout;
+ if (currentTimeout <= 0) {
+ DBusPendingCall* libdbusPendingCall = it->first;
+
+ if (!dbus_pending_call_get_completed(libdbusPendingCall)) {
+ dbus_pending_call_cancel(libdbusPendingCall);
+ DBusMessageReplyAsyncHandler* asyncHandler = std::get<1>(it->second);
+ DBusMessage& dbusMessageCall = std::get<2>(it->second);
+ asyncHandler->onDBusMessageReply(CallStatus::REMOTE_ERROR, dbusMessageCall.createMethodError(DBUS_ERROR_TIMEOUT));
+ delete asyncHandler;
+ }
+ it = timeoutMap_.erase(it);
+
+ //This unref MIGHT cause the destruction of the last callback object that references the DBusConnection.
+ //So after this unref has been called, it has to be ensured that continuation of the loop is an option.
+ dbus_pending_call_unref(libdbusPendingCall);
+ } else {
+ ++it;
}
- dbus_pending_call_unref(libdbusPendingCall);
- it = timeoutMap.erase(it);
- } else {
- ++it;
}
}
}
- //Must be atomic with respect to local threading
- auto threadPtr = enforcerThread;
- enforcerThread = NULL;
- enforeTimeoutMutex.unlock();
+ //Normally there is at least the member of DBusConnection plus the local copy of this pointer.
+ //If the local copy is the only one remaining, we have to assume that the DBusConnection was
+ //destroyed and therefore we must no longer access its members.
+ if (!threadPtr.unique()) {
+ enforcerThread_.reset();
+ enforceTimeoutMutex_.unlock();
+ }
threadPtr->detach();
- delete threadPtr;
}
std::future<CallStatus> DBusConnection::sendDBusMessageWithReplyAsync(
@@ -539,12 +574,12 @@ std::future<CallStatus> DBusConnection::sendDBusMessageWithReplyAsync(
dbus_pending_call_ref(libdbusPendingCall);
std::tuple<int, DBusMessageReplyAsyncHandler*, DBusMessage> toInsert {timeoutMilliseconds, replyAsyncHandler, dbusMessage};
- enforeTimeoutMutex.lock();
- timeoutMap.insert( {libdbusPendingCall, toInsert } );
- if (!enforcerThread) {
- enforcerThread = new std::thread(std::bind(&DBusConnection::enforceAsynchronousTimeouts, this->shared_from_this()));
+ enforceTimeoutMutex_.lock();
+ timeoutMap_.insert( {libdbusPendingCall, toInsert } );
+ if (!enforcerThread_) {
+ enforcerThread_ = std::make_shared<std::thread>(std::bind(&DBusConnection::enforceAsynchronousTimeouts, this->shared_from_this()));
}
- enforeTimeoutMutex.unlock();
+ enforceTimeoutMutex_.unlock();
}
return replyAsyncHandler->getFuture();
@@ -593,13 +628,13 @@ DBusProxyConnection::DBusSignalHandlerToken DBusConnection::subscribeForSelectiv
DBusSignalHandler* dbusSignalHandler,
DBusProxy* callingProxy) {
- const char* methodName = ("subscribeFor" + interfaceMemberName + "Selective").c_str();
+ std::string methodName = "subscribeFor" + interfaceMemberName + "Selective";
subscriptionAccepted = false;
CommonAPI::CallStatus callStatus;
DBusProxyHelper<CommonAPI::DBus::DBusSerializableArguments<>,
CommonAPI::DBus::DBusSerializableArguments<bool>>::callMethodWithReply(
- *callingProxy, methodName, "", callStatus, subscriptionAccepted);
+ *callingProxy, methodName.c_str(), "", callStatus, subscriptionAccepted);
DBusProxyConnection::DBusSignalHandlerToken subscriptionToken;
@@ -625,11 +660,11 @@ void DBusConnection::unsubsribeFromSelectiveBroadcast(const std::string& eventNa
if (lastListenerOnConnectionRemoved) {
// send unsubscribe message to stub
- const char* methodName = ("unsubscribeFrom" + eventName + "Selective").c_str();
+ std::string methodName = "unsubscribeFrom" + eventName + "Selective";
CommonAPI::CallStatus callStatus;
DBusProxyHelper<CommonAPI::DBus::DBusSerializableArguments<>,
CommonAPI::DBus::DBusSerializableArguments<>>::callMethodWithReply(
- *callingProxy, methodName, "", callStatus);
+ *callingProxy, methodName.c_str(), "", callStatus);
}
}
diff --git a/src/CommonAPI/DBus/DBusConnection.h b/src/CommonAPI/DBus/DBusConnection.h
index 61c7418..9ee5588 100644
--- a/src/CommonAPI/DBus/DBusConnection.h
+++ b/src/CommonAPI/DBus/DBusConnection.h
@@ -209,9 +209,9 @@ class DBusConnection: public DBusProxyConnection, public std::enable_shared_from
mutable std::unordered_map<std::string, uint16_t> connectionNameCount_;
typedef std::pair<DBusPendingCall*, std::tuple<int, DBusMessageReplyAsyncHandler*, DBusMessage> > TimeoutMapElement;
- mutable std::map<DBusPendingCall*, std::tuple<int, DBusMessageReplyAsyncHandler*, DBusMessage>> timeoutMap;
- mutable std::thread* enforcerThread;
- mutable std::mutex enforeTimeoutMutex;
+ mutable std::map<DBusPendingCall*, std::tuple<int, DBusMessageReplyAsyncHandler*, DBusMessage>> timeoutMap_;
+ mutable std::shared_ptr<std::thread> enforcerThread_;
+ mutable std::mutex enforceTimeoutMutex_;
};
std::shared_ptr<DBusConnection> DBusConnection::getBus(const BusType& busType) {
diff --git a/src/test/DBusSelectiveBroadcastTest.cpp b/src/test/DBusSelectiveBroadcastTest.cpp
index 8f5df33..4b2cb25 100644
--- a/src/test/DBusSelectiveBroadcastTest.cpp
+++ b/src/test/DBusSelectiveBroadcastTest.cpp
@@ -58,7 +58,7 @@ public:
const std::shared_ptr<CommonAPI::ClientId> clientId,
const CommonAPI::SelectiveBroadcastSubscriptionEvent event) {
- if(event == CommonAPI::SelectiveBroadcastSubscriptionEvent::SUBSCRIBED)
+ if (event == CommonAPI::SelectiveBroadcastSubscriptionEvent::SUBSCRIBED)
lastSubscribedClient = clientId;
}
@@ -118,11 +118,9 @@ protected:
}
virtual void TearDown() {
+ servicePublisher_->unregisterService(serviceAddress_);
}
-
-
-
std::shared_ptr<CommonAPI::Runtime> runtime_;
std::shared_ptr<CommonAPI::Factory> proxyFactory_;
std::shared_ptr<CommonAPI::Factory> proxyFactory2_;
@@ -130,9 +128,6 @@ protected:
std::shared_ptr<CommonAPI::ServicePublisher> servicePublisher_;
static const std::string serviceAddress_;
- static const std::string serviceAddress2_;
- static const std::string serviceAddress3_;
- static const std::string serviceAddress4_;
static const std::string nonstandardAddress_;
int selectiveBroadcastArrivedAtProxyFromSameFactory1;
@@ -156,27 +151,23 @@ public:
};
const std::string DBusSelectiveBroadcastTest::serviceAddress_ = "local:CommonAPI.DBus.tests.DBusProxyTestInterface:CommonAPI.DBus.tests.DBusProxyTestService";
-const std::string DBusSelectiveBroadcastTest::serviceAddress2_ = "local:CommonAPI.DBus.tests.DBusProxyTestInterface:CommonAPI.DBus.tests.DBusProxyTestService2";
-const std::string DBusSelectiveBroadcastTest::serviceAddress3_ = "local:CommonAPI.DBus.tests.DBusProxyTestInterface:CommonAPI.DBus.tests.DBusProxyTestService3";
-const std::string DBusSelectiveBroadcastTest::serviceAddress4_ = "local:CommonAPI.DBus.tests.DBusProxyTestInterface:CommonAPI.DBus.tests.DBusProxyTestService4";
const std::string DBusSelectiveBroadcastTest::nonstandardAddress_ = "local:non.standard.ServiceName:non.standard.participand.ID";
TEST_F(DBusSelectiveBroadcastTest, ProxysCanSubscribe)
{
- auto proxyFromSameFactory1 = proxyFactory_->buildProxy<commonapi::tests::TestInterfaceProxy>(serviceAddress4_);
+ auto proxyFromSameFactory1 = proxyFactory_->buildProxy<commonapi::tests::TestInterfaceProxy>(serviceAddress_);
ASSERT_TRUE((bool)proxyFromSameFactory1);
- auto proxyFromSameFactory2 = proxyFactory_->buildProxy<commonapi::tests::TestInterfaceProxy>(serviceAddress4_);
+ auto proxyFromSameFactory2 = proxyFactory_->buildProxy<commonapi::tests::TestInterfaceProxy>(serviceAddress_);
ASSERT_TRUE((bool)proxyFromSameFactory2);
- auto proxyFromOtherFactory = proxyFactory2_->buildProxy<commonapi::tests::TestInterfaceProxy>(serviceAddress4_);
+ auto proxyFromOtherFactory = proxyFactory2_->buildProxy<commonapi::tests::TestInterfaceProxy>(serviceAddress_);
ASSERT_TRUE((bool)proxyFromOtherFactory);
auto stub = std::make_shared<SelectiveBroadcastSender>();
-
- bool serviceRegistered = servicePublisher_->registerService(stub, serviceAddress4_, stubFactory_);
+ bool serviceRegistered = servicePublisher_->registerService(stub, serviceAddress_, stubFactory_);
for (unsigned int i = 0; !serviceRegistered && i < 100; ++i) {
- serviceRegistered = servicePublisher_->registerService(stub, serviceAddress4_, stubFactory_);
+ serviceRegistered = servicePublisher_->registerService(stub, serviceAddress_, stubFactory_);
usleep(10000);
}
ASSERT_TRUE(serviceRegistered);
@@ -186,24 +177,26 @@ TEST_F(DBusSelectiveBroadcastTest, ProxysCanSubscribe)
}
ASSERT_TRUE(proxyFromSameFactory1->isAvailable());
- auto subscriptionResult1 = proxyFromSameFactory1->getTestSelectiveBroadcastSelectiveEvent().subscribe(std::bind(&DBusSelectiveBroadcastTest::selectiveBroadcastCallbackForProxyFromSameFactory1, this));
+ auto subscriptionResult1 = proxyFromSameFactory1->getTestSelectiveBroadcastSelectiveEvent().subscribe(
+ std::bind(&DBusSelectiveBroadcastTest::selectiveBroadcastCallbackForProxyFromSameFactory1, this));
+ usleep(20000);
ASSERT_EQ(stub->getNumberOfSubscribedClients(), 1);
stub->send();
- usleep(20000);
- ASSERT_EQ(selectiveBroadcastArrivedAtProxyFromSameFactory1, 1);
- ASSERT_EQ(selectiveBroadcastArrivedAtProxyFromSameFactory2, 0);
+ usleep(200000);
+ EXPECT_EQ(selectiveBroadcastArrivedAtProxyFromSameFactory1, 1);
+ EXPECT_EQ(selectiveBroadcastArrivedAtProxyFromSameFactory2, 0);
auto subscriptionResult2 = proxyFromSameFactory2->getTestSelectiveBroadcastSelectiveEvent().subscribe(std::bind(&DBusSelectiveBroadcastTest::selectiveBroadcastCallbackForProxyFromSameFactory2, this));
ASSERT_EQ(stub->getNumberOfSubscribedClients(), 1); // should still be one because these were created by the same factory thus using the same connection
stub->send();
- usleep(20000);
- ASSERT_EQ(selectiveBroadcastArrivedAtProxyFromSameFactory1, 2);
- ASSERT_EQ(selectiveBroadcastArrivedAtProxyFromSameFactory2, 1);
+ usleep(200000);
+ EXPECT_EQ(selectiveBroadcastArrivedAtProxyFromSameFactory1, 2);
+ EXPECT_EQ(selectiveBroadcastArrivedAtProxyFromSameFactory2, 1);
auto subscriptionResult3 = proxyFromOtherFactory->getTestSelectiveBroadcastSelectiveEvent().subscribe(std::bind(&DBusSelectiveBroadcastTest::selectiveBroadcastCallbackForProxyFromOtherFactory, this));
ASSERT_EQ(stub->getNumberOfSubscribedClients(), 2); // should still be two because proxyFromSameFactory1_ is still subscribed
@@ -212,36 +205,34 @@ TEST_F(DBusSelectiveBroadcastTest, ProxysCanSubscribe)
ASSERT_EQ(stub->getNumberOfSubscribedClients(), 2); // should still be two because proxyFromSameFactory1_ is still subscribed
stub->send();
- usleep(20000);
- ASSERT_EQ(selectiveBroadcastArrivedAtProxyFromSameFactory1, 3);
- ASSERT_EQ(selectiveBroadcastArrivedAtProxyFromSameFactory2, 1);
- ASSERT_EQ(selectiveBroadcastArrivedAtProxyFromOtherFactory, 1);
+ usleep(200000);
+ EXPECT_EQ(selectiveBroadcastArrivedAtProxyFromSameFactory1, 3);
+ EXPECT_EQ(selectiveBroadcastArrivedAtProxyFromSameFactory2, 1);
+ EXPECT_EQ(selectiveBroadcastArrivedAtProxyFromOtherFactory, 1);
// now only the last subscribed client (which is the one from the other factory) should receive the signal
stub->sendToLastSubscribedClient();
- usleep(20000);
- ASSERT_EQ(selectiveBroadcastArrivedAtProxyFromSameFactory1, 3);
- ASSERT_EQ(selectiveBroadcastArrivedAtProxyFromSameFactory2, 1);
- ASSERT_EQ(selectiveBroadcastArrivedAtProxyFromOtherFactory, 2);
+ usleep(200000);
+ EXPECT_EQ(selectiveBroadcastArrivedAtProxyFromSameFactory1, 3);
+ EXPECT_EQ(selectiveBroadcastArrivedAtProxyFromSameFactory2, 1);
+ EXPECT_EQ(selectiveBroadcastArrivedAtProxyFromOtherFactory, 2);
proxyFromSameFactory1->getTestSelectiveBroadcastSelectiveEvent().unsubscribe(subscriptionResult1);
- ASSERT_EQ(stub->getNumberOfSubscribedClients(), 1);
-
- servicePublisher_->unregisterService(serviceAddress4_);
+ EXPECT_EQ(stub->getNumberOfSubscribedClients(), 1);
}
TEST_F(DBusSelectiveBroadcastTest, ProxysCanBeRejected) {
- auto proxyFromSameFactory1 = proxyFactory_->buildProxy<commonapi::tests::TestInterfaceProxy>(serviceAddress4_);
+ auto proxyFromSameFactory1 = proxyFactory_->buildProxy<commonapi::tests::TestInterfaceProxy>(serviceAddress_);
ASSERT_TRUE((bool)proxyFromSameFactory1);
- auto proxyFromOtherFactory = proxyFactory2_->buildProxy<commonapi::tests::TestInterfaceProxy>(serviceAddress4_);
+ auto proxyFromOtherFactory = proxyFactory2_->buildProxy<commonapi::tests::TestInterfaceProxy>(serviceAddress_);
ASSERT_TRUE((bool)proxyFromOtherFactory);
auto stub = std::make_shared<SelectiveBroadcastSender>();
- bool serviceRegistered = servicePublisher_->registerService(stub, serviceAddress4_, stubFactory_);
+ bool serviceRegistered = servicePublisher_->registerService(stub, serviceAddress_, stubFactory_);
for (unsigned int i = 0; !serviceRegistered && i < 100; ++i) {
- serviceRegistered = servicePublisher_->registerService(stub, serviceAddress4_, stubFactory_);
+ serviceRegistered = servicePublisher_->registerService(stub, serviceAddress_, stubFactory_);
usleep(10000);
}
ASSERT_TRUE(serviceRegistered);