summaryrefslogtreecommitdiff
path: root/include/CommonAPI/Event.hpp
diff options
context:
space:
mode:
Diffstat (limited to 'include/CommonAPI/Event.hpp')
-rw-r--r--include/CommonAPI/Event.hpp93
1 files changed, 57 insertions, 36 deletions
diff --git a/include/CommonAPI/Event.hpp b/include/CommonAPI/Event.hpp
index d2be59d..1d638e6 100644
--- a/include/CommonAPI/Event.hpp
+++ b/include/CommonAPI/Event.hpp
@@ -1,10 +1,10 @@
-// Copyright (C) 2013-2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
+// Copyright (C) 2013-2020 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
#if !defined (COMMONAPI_INTERNAL_COMPILATION)
-#error "Only <CommonAPI/CommonAPI.h> can be included directly, this file may disappear or change contents."
+#error "Only <CommonAPI/CommonAPI.hpp> can be included directly, this file may disappear or change contents."
#endif
#ifndef COMMONAPI_EVENT_HPP_
@@ -97,8 +97,8 @@ private:
ListenersMap pendingSubscriptions_;
SubscriptionsSet pendingUnsubscriptions_;
- std::recursive_mutex mutex_;
- std::mutex abi_placeholder_;
+ std::mutex notificationMutex_;
+ std::mutex subscriptionMutex_;
};
template<typename ... Arguments_>
@@ -107,14 +107,12 @@ typename Event<Arguments_...>::Subscription Event<Arguments_...>::subscribe(List
bool isFirstListener;
Listeners listeners;
- {
- std::lock_guard<std::recursive_mutex> itsLock(mutex_);
- subscription = nextSubscription_++;
- isFirstListener = (0 == pendingSubscriptions_.size()) && (pendingUnsubscriptions_.size() == subscriptions_.size());
- listener = std::move(listener);
- listeners = std::make_tuple(listener, std::move(errorListener));
- pendingSubscriptions_[subscription] = std::move(listeners);
- }
+ subscriptionMutex_.lock();
+ subscription = nextSubscription_++;
+ isFirstListener = (0 == pendingSubscriptions_.size()) && (pendingUnsubscriptions_.size() == subscriptions_.size());
+ listeners = std::make_tuple(listener, std::move(errorListener));
+ pendingSubscriptions_[subscription] = std::move(listeners);
+ subscriptionMutex_.unlock();
if (isFirstListener) {
if (!pendingUnsubscriptions_.empty())
@@ -132,31 +130,30 @@ void Event<Arguments_...>::unsubscribe(const Subscription subscription) {
bool hasUnsubscribed(false);
Listener listener;
- {
- std::lock_guard<std::recursive_mutex> itsLock(mutex_);
- auto listenerIterator = subscriptions_.find(subscription);
- if (subscriptions_.end() != listenerIterator) {
- if (pendingUnsubscriptions_.end() == pendingUnsubscriptions_.find(subscription)) {
- if (0 == pendingSubscriptions_.erase(subscription)) {
- pendingUnsubscriptions_.insert(subscription);
- listener = std::get<0>(listenerIterator->second);
- hasUnsubscribed = true;
- }
- isLastListener = (pendingUnsubscriptions_.size() == subscriptions_.size());
+ subscriptionMutex_.lock();
+ auto listenerIterator = subscriptions_.find(subscription);
+ if (subscriptions_.end() != listenerIterator) {
+ if (pendingUnsubscriptions_.end() == pendingUnsubscriptions_.find(subscription)) {
+ if (0 == pendingSubscriptions_.erase(subscription)) {
+ pendingUnsubscriptions_.insert(subscription);
+ listener = std::get<0>(listenerIterator->second);
+ hasUnsubscribed = true;
}
+ isLastListener = (pendingUnsubscriptions_.size() == subscriptions_.size());
}
- else {
- listenerIterator = pendingSubscriptions_.find(subscription);
- if (pendingSubscriptions_.end() != listenerIterator) {
- listener = std::get<0>(listenerIterator->second);
- if (0 != pendingSubscriptions_.erase(subscription)) {
- isLastListener = (pendingUnsubscriptions_.size() == subscriptions_.size());
- hasUnsubscribed = true;
- }
+ }
+ else {
+ listenerIterator = pendingSubscriptions_.find(subscription);
+ if (pendingSubscriptions_.end() != listenerIterator) {
+ listener = std::get<0>(listenerIterator->second);
+ if (0 != pendingSubscriptions_.erase(subscription)) {
+ isLastListener = (pendingUnsubscriptions_.size() == subscriptions_.size());
+ hasUnsubscribed = true;
}
}
- isLastListener = isLastListener && (0 == pendingSubscriptions_.size());
}
+ isLastListener = isLastListener && (0 == pendingSubscriptions_.size());
+ subscriptionMutex_.unlock();
if (hasUnsubscribed) {
onListenerRemoved(listener, subscription);
@@ -168,7 +165,9 @@ void Event<Arguments_...>::unsubscribe(const Subscription subscription) {
template<typename ... Arguments_>
void Event<Arguments_...>::notifyListeners(const Arguments_&... eventArguments) {
- std::lock_guard<std::recursive_mutex> itsLock(mutex_);
+
+ notificationMutex_.lock();
+ subscriptionMutex_.lock();
for (auto iterator = pendingUnsubscriptions_.begin();
iterator != pendingUnsubscriptions_.end();
iterator++) {
@@ -183,14 +182,19 @@ void Event<Arguments_...>::notifyListeners(const Arguments_&... eventArguments)
}
pendingSubscriptions_.clear();
+ subscriptionMutex_.unlock();
for (auto iterator = subscriptions_.begin(); iterator != subscriptions_.end(); iterator++) {
(std::get<0>(iterator->second))(eventArguments...);
}
+
+ notificationMutex_.unlock();
}
template<typename ... Arguments_>
void Event<Arguments_...>::notifySpecificListener(const Subscription subscription, const Arguments_&... eventArguments) {
- std::lock_guard<std::recursive_mutex> itsLock(mutex_);
+
+ notificationMutex_.lock();
+ subscriptionMutex_.lock();
for (auto iterator = pendingUnsubscriptions_.begin();
iterator != pendingUnsubscriptions_.end();
iterator++) {
@@ -206,16 +210,22 @@ void Event<Arguments_...>::notifySpecificListener(const Subscription subscriptio
}
pendingSubscriptions_.clear();
+
+ subscriptionMutex_.unlock();
for (auto iterator = subscriptions_.begin(); iterator != subscriptions_.end(); iterator++) {
if (subscription == iterator->first) {
(std::get<0>(iterator->second))(eventArguments...);
}
}
+
+ notificationMutex_.unlock();
}
template<typename ... Arguments_>
void Event<Arguments_...>::notifySpecificError(const Subscription subscription, const CallStatus status) {
- std::lock_guard<std::recursive_mutex> itsLock(mutex_);
+
+ notificationMutex_.lock();
+ subscriptionMutex_.lock();
for (auto iterator = pendingUnsubscriptions_.begin();
iterator != pendingUnsubscriptions_.end();
iterator++) {
@@ -230,6 +240,7 @@ void Event<Arguments_...>::notifySpecificError(const Subscription subscription,
}
pendingSubscriptions_.clear();
+ subscriptionMutex_.unlock();
for (auto iterator = subscriptions_.begin(); iterator != subscriptions_.end(); iterator++) {
if (subscription == iterator->first) {
ErrorListener listener = std::get<1>(iterator->second);
@@ -239,7 +250,10 @@ void Event<Arguments_...>::notifySpecificError(const Subscription subscription,
}
}
+ notificationMutex_.unlock();
+
if (status != CommonAPI::CallStatus::SUCCESS) {
+ subscriptionMutex_.lock();
auto listenerIterator = subscriptions_.find(subscription);
if (subscriptions_.end() != listenerIterator) {
if (pendingUnsubscriptions_.end() == pendingUnsubscriptions_.find(subscription)) {
@@ -254,12 +268,15 @@ void Event<Arguments_...>::notifySpecificError(const Subscription subscription,
pendingSubscriptions_.erase(subscription);
}
}
+ subscriptionMutex_.unlock();
}
}
template<typename ... Arguments_>
void Event<Arguments_...>::notifyErrorListeners(const CallStatus status) {
- std::lock_guard<std::recursive_mutex> itsLock(mutex_);
+
+ notificationMutex_.lock();
+ subscriptionMutex_.lock();
for (auto iterator = pendingUnsubscriptions_.begin();
iterator != pendingUnsubscriptions_.end();
iterator++) {
@@ -274,12 +291,16 @@ void Event<Arguments_...>::notifyErrorListeners(const CallStatus status) {
}
pendingSubscriptions_.clear();
+ subscriptionMutex_.unlock();
+
for (auto iterator = subscriptions_.begin(); iterator != subscriptions_.end(); iterator++) {
ErrorListener listener = std::get<1>(iterator->second);
if (listener) {
listener(status);
}
}
+
+ notificationMutex_.unlock();
}