summaryrefslogtreecommitdiff
path: root/implementation/service_discovery
diff options
context:
space:
mode:
authorJuergen Gehring <juergen.gehring@bmw.de>2018-01-25 00:40:05 -0800
committerJuergen Gehring <juergen.gehring@bmw.de>2018-01-25 00:40:05 -0800
commit79fd5f7a34ed33392f71fa914a60b2e68b28de68 (patch)
tree5ea93513d0173ffe6dea57545cc5b28db591f082 /implementation/service_discovery
parent5c43d511bd5b5e15eca521c4c71dfa69c6f1c90f (diff)
downloadvSomeIP-79fd5f7a34ed33392f71fa914a60b2e68b28de68.tar.gz
vsomeip 2.10.02.10.0
Diffstat (limited to 'implementation/service_discovery')
-rwxr-xr-ximplementation/service_discovery/include/entry_impl.hpp2
-rwxr-xr-ximplementation/service_discovery/include/eventgroupentry_impl.hpp22
-rwxr-xr-ximplementation/service_discovery/include/message_impl.hpp37
-rw-r--r--implementation/service_discovery/include/service_discovery.hpp4
-rw-r--r--implementation/service_discovery/include/service_discovery_host.hpp25
-rw-r--r--implementation/service_discovery/include/service_discovery_impl.hpp67
-rwxr-xr-ximplementation/service_discovery/src/entry_impl.cpp14
-rwxr-xr-ximplementation/service_discovery/src/eventgroupentry_impl.cpp29
-rwxr-xr-ximplementation/service_discovery/src/message_impl.cpp81
-rw-r--r--implementation/service_discovery/src/service_discovery_impl.cpp577
10 files changed, 706 insertions, 152 deletions
diff --git a/implementation/service_discovery/include/entry_impl.hpp b/implementation/service_discovery/include/entry_impl.hpp
index fabca3b..db12a1e 100755
--- a/implementation/service_discovery/include/entry_impl.hpp
+++ b/implementation/service_discovery/include/entry_impl.hpp
@@ -68,6 +68,8 @@ protected:
std::vector<uint8_t> options_[VSOMEIP_MAX_OPTION_RUN];
uint8_t num_options_[VSOMEIP_MAX_OPTION_RUN];
+ std::uint8_t index1_;
+ std::uint8_t index2_;
entry_impl();
entry_impl(const entry_impl &entry_);
diff --git a/implementation/service_discovery/include/eventgroupentry_impl.hpp b/implementation/service_discovery/include/eventgroupentry_impl.hpp
index d9a6571..877cdbf 100755
--- a/implementation/service_discovery/include/eventgroupentry_impl.hpp
+++ b/implementation/service_discovery/include/eventgroupentry_impl.hpp
@@ -7,6 +7,7 @@
#define VSOMEIP_SD_EVENTGROUPENTRY_IMPL_HPP
#include "entry_impl.hpp"
+#include "../../endpoints/include/endpoint_definition.hpp"
namespace vsomeip {
namespace sd {
@@ -29,6 +30,24 @@ public:
bool serialize(vsomeip::serializer *_to) const;
bool deserialize(vsomeip::deserializer *_from);
+ bool operator==(const eventgroupentry_impl& _other) const {
+ return !(ttl_ != _other.ttl_ ||
+ service_ != _other.service_ ||
+ instance_ != _other.instance_ ||
+ eventgroup_ != _other.eventgroup_ ||
+ index1_ != _other.index1_ ||
+ index2_ != _other.index2_ ||
+ num_options_[0] != _other.num_options_[0] ||
+ num_options_[1] != _other.num_options_[1] ||
+ major_version_ != _other.major_version_ ||
+ counter_ != _other.counter_);
+ }
+
+ bool is_matching_subscribe(const eventgroupentry_impl& _other) const;
+
+ void add_target(const std::shared_ptr<endpoint_definition> &_target);
+ std::shared_ptr<endpoint_definition> get_target(bool _reliable) const;
+
private:
eventgroup_t eventgroup_;
uint16_t reserved_;
@@ -36,6 +55,9 @@ private:
// counter field to differentiate parallel subscriptions on same event group
// 4Bit only (max 16. parralel subscriptions)
uint8_t counter_;
+
+ std::shared_ptr<endpoint_definition> target_reliable_;
+ std::shared_ptr<endpoint_definition> target_unreliable_;
};
} // namespace sd
diff --git a/implementation/service_discovery/include/message_impl.hpp b/implementation/service_discovery/include/message_impl.hpp
index 1d658a5..f679e9c 100755
--- a/implementation/service_discovery/include/message_impl.hpp
+++ b/implementation/service_discovery/include/message_impl.hpp
@@ -8,11 +8,14 @@
#include <memory>
#include <vector>
+#include <atomic>
+#include <mutex>
#include <vsomeip/message.hpp>
#include "../include/primitive_types.hpp"
#include "../../message/include/message_base_impl.hpp"
+#include "../../endpoints/include/endpoint_definition.hpp"
# if _MSC_VER >= 1300
/*
@@ -40,6 +43,14 @@ class protection_option_impl;
class message_impl: public vsomeip::message, public vsomeip::message_base_impl {
public:
+ typedef std::vector<std::shared_ptr<entry_impl>> entries_t;
+ typedef std::vector<std::shared_ptr<option_impl>> options_t;
+ struct forced_initial_events_t {
+ std::shared_ptr<vsomeip::endpoint_definition> target_;
+ vsomeip::service_t service_;
+ vsomeip::instance_t instance_;
+ vsomeip::eventgroup_t eventgroup_;
+ };
message_impl();
virtual ~message_impl();
@@ -61,8 +72,8 @@ public:
std::shared_ptr<load_balancing_option_impl> create_load_balancing_option();
std::shared_ptr<protection_option_impl> create_protection_option();
- const std::vector<std::shared_ptr<entry_impl> > & get_entries() const;
- const std::vector<std::shared_ptr<option_impl> > & get_options() const;
+ const entries_t & get_entries() const;
+ const options_t & get_options() const;
int16_t get_option_index(const std::shared_ptr<option_impl> &_option) const;
uint32_t get_options_length();
@@ -75,6 +86,18 @@ public:
length_t get_someip_length() const;
+ std::uint8_t get_number_required_acks() const;
+ std::uint8_t get_number_contained_acks() const;
+ void set_number_required_acks(std::uint8_t _required_acks);
+ void increase_number_required_acks(std::uint8_t _amount = 1);
+ void decrease_number_required_acks(std::uint8_t _amount = 1);
+ void increase_number_contained_acks();
+ bool all_required_acks_contained() const;
+ std::unique_lock<std::mutex> get_message_lock();
+
+ void forced_initial_events_add(forced_initial_events_t _entry);
+ const std::vector<forced_initial_events_t> forced_initial_events_get();
+
private:
entry_impl * deserialize_entry(vsomeip::deserializer *_from);
option_impl * deserialize_option(vsomeip::deserializer *_from);
@@ -83,8 +106,14 @@ private:
flags_t flags_;
uint32_t options_length_;
- std::vector<std::shared_ptr<entry_impl> > entries_;
- std::vector<std::shared_ptr<option_impl> > options_;
+ entries_t entries_;
+ options_t options_;
+ std::atomic<std::uint8_t> number_required_acks_;
+ std::atomic<std::uint8_t> number_contained_acks_;
+ std::mutex message_mutex_;
+
+ std::mutex forced_initial_events_mutex_;
+ std::vector<forced_initial_events_t> forced_initial_events_info_;
};
} // namespace sd
diff --git a/implementation/service_discovery/include/service_discovery.hpp b/implementation/service_discovery/include/service_discovery.hpp
index 075129b..caec99e 100644
--- a/implementation/service_discovery/include/service_discovery.hpp
+++ b/implementation/service_discovery/include/service_discovery.hpp
@@ -65,6 +65,10 @@ public:
std::shared_ptr<serviceinfo> _info) = 0;
virtual void set_diagnosis_mode(const bool _activate) = 0;
+ virtual void remote_subscription_acknowledge(
+ service_t _service, instance_t _instance, eventgroup_t _eventgroup,
+ client_t _client, bool _accepted,
+ const std::shared_ptr<sd_message_identifier_t> &_sd_message_id) = 0;
};
} // namespace sd
diff --git a/implementation/service_discovery/include/service_discovery_host.hpp b/implementation/service_discovery/include/service_discovery_host.hpp
index 492c920..f6ab65d 100644
--- a/implementation/service_discovery/include/service_discovery_host.hpp
+++ b/implementation/service_discovery/include/service_discovery_host.hpp
@@ -15,6 +15,8 @@
#include "../../routing/include/types.hpp"
+#include <vsomeip/message.hpp>
+
namespace vsomeip {
class configuration;
@@ -57,12 +59,6 @@ public:
virtual std::chrono::milliseconds update_routing_info(
std::chrono::milliseconds _elapsed) = 0;
- virtual void on_subscribe(service_t _service, instance_t _instance,
- eventgroup_t _eventgroup,
- std::shared_ptr<endpoint_definition> _subscriber,
- std::shared_ptr<endpoint_definition> _target,
- const std::chrono::steady_clock::time_point &_expiration) = 0;
-
virtual void on_unsubscribe(service_t _service, instance_t _instance,
eventgroup_t _eventgroup,
std::shared_ptr<endpoint_definition> _target) = 0;
@@ -72,7 +68,7 @@ public:
virtual void on_subscribe_ack(client_t _client,
service_t _service, instance_t _instance, eventgroup_t _eventgroup,
- event_t _event) = 0;
+ event_t _event, pending_subscription_id_t _subscription_id) = 0;
virtual std::shared_ptr<endpoint> find_or_create_remote_client(
service_t _service, instance_t _instance,
@@ -81,13 +77,16 @@ public:
virtual void expire_subscriptions(const boost::asio::ip::address &_address) = 0;
virtual void expire_services(const boost::asio::ip::address &_address) = 0;
- virtual bool on_subscribe_accepted(service_t _service, instance_t _instance,
- eventgroup_t _eventgroup, std::shared_ptr<endpoint_definition> _target,
- const std::chrono::steady_clock::time_point &_expiration) = 0;
+ virtual remote_subscription_state_e on_remote_subscription(
+ service_t _service, instance_t _instance, eventgroup_t _eventgroup,
+ const std::shared_ptr<endpoint_definition> &_subscriber,
+ const std::shared_ptr<endpoint_definition> &_target,
+ ttl_t _ttl, client_t *_client,
+ const std::shared_ptr<sd_message_identifier_t> &_sd_message_id) = 0;
virtual void on_subscribe_nack(client_t _client,
service_t _service, instance_t _instance, eventgroup_t _eventgroup,
- event_t _event) = 0;
+ event_t _event, pending_subscription_id_t _subscription_id) = 0;
virtual bool has_identified(client_t _client, service_t _service,
instance_t _instance, bool _reliable) = 0;
@@ -98,6 +97,10 @@ public:
service_t _service, instance_t _instance) const = 0;
virtual std::map<instance_t, std::shared_ptr<serviceinfo>> get_offered_service_instances(
service_t _service) const = 0;
+
+ virtual void send_initial_events(service_t _service, instance_t _instance,
+ eventgroup_t _eventgroup,
+ const std::shared_ptr<endpoint_definition>& _subscriber) = 0;
};
} // namespace sd
diff --git a/implementation/service_discovery/include/service_discovery_impl.hpp b/implementation/service_discovery/include/service_discovery_impl.hpp
index 5f28135..68b159f 100644
--- a/implementation/service_discovery/include/service_discovery_impl.hpp
+++ b/implementation/service_discovery/include/service_discovery_impl.hpp
@@ -12,6 +12,7 @@
#include <set>
#include <forward_list>
#include <atomic>
+#include <tuple>
#include <boost/asio/steady_timer.hpp>
@@ -35,19 +36,20 @@ class eventgroupentry_impl;
class option_impl;
class request;
class serviceentry_impl;
-class service_discovery_fsm;
class service_discovery_host;
class subscription;
typedef std::map<service_t, std::map<instance_t, std::shared_ptr<request> > > requests_t;
-struct accepted_subscriber_t {
- std::shared_ptr < endpoint_definition > subscriber;
- std::shared_ptr < endpoint_definition > target;
- std::chrono::steady_clock::time_point its_expiration;
- vsomeip::service_t service_id;
- vsomeip::instance_t instance_id;
- vsomeip::eventgroup_t eventgroup_;
+struct subscriber_t {
+ std::shared_ptr<endpoint_definition> subscriber;
+ std::shared_ptr<endpoint_definition> target;
+ std::shared_ptr<sd_message_identifier_t> response_message_id_;
+ std::shared_ptr<eventgroupinfo> eventgroupinfo_;
+ vsomeip::ttl_t ttl_;
+ vsomeip::major_version_t major_;
+ std::uint16_t reserved_;
+ std::uint8_t counter_;
};
class service_discovery_impl: public service_discovery,
@@ -93,6 +95,10 @@ public:
void set_diagnosis_mode(const bool _activate);
+ void remote_subscription_acknowledge(
+ service_t _service, instance_t _instance, eventgroup_t _eventgroup,
+ client_t _client, bool _acknowledged,
+ const std::shared_ptr<sd_message_identifier_t> &_sd_message_id);
private:
std::pair<session_t, bool> get_session(const boost::asio::ip::address &_address);
void increment_session(const boost::asio::ip::address &_address);
@@ -123,7 +129,9 @@ private:
std::shared_ptr<subscription> &_subscription);
void insert_subscription_ack(std::shared_ptr<message_impl> &_message,
service_t _service, instance_t _instance, eventgroup_t _eventgroup,
- std::shared_ptr<eventgroupinfo> &_info, ttl_t _ttl, uint8_t _counter, major_version_t _major, uint16_t _reserved);
+ const std::shared_ptr<eventgroupinfo> &_info, ttl_t _ttl,
+ uint8_t _counter, major_version_t _major, uint16_t _reserved,
+ const std::shared_ptr<endpoint_definition> &_target = nullptr);
void insert_subscription_nack(std::shared_ptr<message_impl> &_message, service_t _service,
instance_t _instance, eventgroup_t _eventgroup,
uint8_t _counter, major_version_t _major, uint16_t _reserved);
@@ -152,8 +160,10 @@ private:
std::shared_ptr<eventgroupentry_impl> &_entry,
const std::vector<std::shared_ptr<option_impl> > &_options,
std::shared_ptr < message_impl > &its_message_response,
- std::vector <accepted_subscriber_t> &accepted_subscribers,
- const boost::asio::ip::address &_destination);
+ const boost::asio::ip::address &_destination,
+ const std::shared_ptr<sd_message_identifier_t> &_message_id,
+ bool _is_stop_subscribe_subscribe,
+ bool _force_initial_events);
void handle_eventgroup_subscription(service_t _service,
instance_t _instance, eventgroup_t _eventgroup,
major_version_t _major, ttl_t _ttl, uint8_t _counter, uint16_t _reserved,
@@ -162,7 +172,9 @@ private:
const boost::asio::ip::address &_second_address, uint16_t _second_port,
bool _is_second_reliable,
std::shared_ptr < message_impl > &its_message,
- std::vector <accepted_subscriber_t> &accepted_subscribers);
+ const std::shared_ptr<sd_message_identifier_t> &_message_id,
+ bool _is_stop_subscribe_subscribe,
+ bool _force_initial_events);
void handle_eventgroup_subscription_ack(service_t _service,
instance_t _instance, eventgroup_t _eventgroup,
major_version_t _major, ttl_t _ttl, uint8_t _counter,
@@ -182,7 +194,9 @@ private:
void check_ttl(const boost::system::error_code &_error);
void start_subscription_expiration_timer();
+ void start_subscription_expiration_timer_unlocked();
void stop_subscription_expiration_timer();
+ void stop_subscription_expiration_timer_unlocked();
void expire_subscriptions(const boost::system::error_code &_error);
bool check_ipv4_address(boost::asio::ip::address its_address);
@@ -279,6 +293,27 @@ private:
bool check_source_address(const boost::asio::ip::address &its_source_address) const;
+ void update_subscription_expiration_timer(const std::shared_ptr<message_impl> &_message);
+
+ void remote_subscription_acknowledge_subscriber(
+ service_t _service, instance_t _instance, eventgroup_t _eventgroup,
+ const std::shared_ptr<subscriber_t> &_subscriber, bool _acknowledged);
+
+ void remote_subscription_not_acknowledge_subscriber(
+ service_t _service, instance_t _instance, eventgroup_t _eventgroup,
+ const std::shared_ptr<subscriber_t> &_subscriber, bool _acknowledged);
+
+ void remote_subscription_not_acknowledge_all(service_t _service, instance_t _instance);
+
+ void remote_subscription_not_acknowledge_all();
+
+ void remote_subscription_remove(
+ service_t _service, instance_t _instance, eventgroup_t _eventgroup,
+ const std::shared_ptr<endpoint_definition> &_subscriber);
+
+ bool check_stop_subscribe_subscribe(message_impl::entries_t::const_iterator _iter,
+ message_impl::entries_t::const_iterator _end) const;
+
private:
boost::asio::io_service &io_;
service_discovery_host *host_;
@@ -317,6 +352,7 @@ private:
ttl_t ttl_;
// TTL handling for subscriptions done by other hosts
+ std::mutex subscription_expiration_timer_mutex_;
boost::asio::steady_timer subscription_expiration_timer_;
std::chrono::steady_clock::time_point next_subscription_expiration_;
@@ -358,6 +394,13 @@ private:
boost::asio::ip::address current_remote_address_;
std::atomic<bool> is_diagnosis_;
+
+ std::mutex pending_remote_subscriptions_mutex_;
+ std::map<service_t,
+ std::map<instance_t,
+ std::map<eventgroup_t,
+ std::map<client_t, std::vector<std::shared_ptr<subscriber_t>>>>>> pending_remote_subscriptions_;
+ std::mutex response_mutex_;
};
} // namespace sd
diff --git a/implementation/service_discovery/src/entry_impl.cpp b/implementation/service_discovery/src/entry_impl.cpp
index d647bf5..ecab203 100755
--- a/implementation/service_discovery/src/entry_impl.cpp
+++ b/implementation/service_discovery/src/entry_impl.cpp
@@ -23,6 +23,8 @@ entry_impl::entry_impl() {
ttl_ = 0x0;
num_options_[0] = 0;
num_options_[1] = 0;
+ index1_ = 0;
+ index2_ = 0;
}
entry_impl::entry_impl(const entry_impl &_entry) {
@@ -33,6 +35,8 @@ entry_impl::entry_impl(const entry_impl &_entry) {
ttl_ = _entry.ttl_;
num_options_[0] = _entry.num_options_[0];
num_options_[1] = _entry.num_options_[1];
+ index1_ = _entry.index1_;
+ index2_ = _entry.index2_;
}
entry_impl::~entry_impl() {
@@ -139,11 +143,9 @@ bool entry_impl::deserialize(vsomeip::deserializer *_from) {
is_successful = is_successful && _from->deserialize(its_type);
type_ = static_cast<entry_type_e>(its_type);
- uint8_t its_index1(0);
- is_successful = is_successful && _from->deserialize(its_index1);
+ is_successful = is_successful && _from->deserialize(index1_);
- uint8_t its_index2(0);
- is_successful = is_successful && _from->deserialize(its_index2);
+ is_successful = is_successful && _from->deserialize(index2_);
uint8_t its_numbers(0);
is_successful = is_successful && _from->deserialize(its_numbers);
@@ -151,10 +153,10 @@ bool entry_impl::deserialize(vsomeip::deserializer *_from) {
num_options_[0] = uint8_t(its_numbers >> 4);
num_options_[1] = uint8_t(its_numbers & 0xF);
- for (uint16_t i = its_index1; i < its_index1 + num_options_[0]; ++i)
+ for (uint16_t i = index1_; i < index1_ + num_options_[0]; ++i)
options_[0].push_back((uint8_t)(i));
- for (uint16_t i = its_index2; i < its_index2 + num_options_[1]; ++i)
+ for (uint16_t i = index2_; i < index2_ + num_options_[1]; ++i)
options_[1].push_back((uint8_t)(i));
uint16_t its_id(0);
diff --git a/implementation/service_discovery/src/eventgroupentry_impl.cpp b/implementation/service_discovery/src/eventgroupentry_impl.cpp
index 29716d7..1f9dd4a 100755
--- a/implementation/service_discovery/src/eventgroupentry_impl.cpp
+++ b/implementation/service_discovery/src/eventgroupentry_impl.cpp
@@ -115,5 +115,34 @@ bool eventgroupentry_impl::deserialize(vsomeip::deserializer *_from) {
return is_successful;
}
+bool eventgroupentry_impl::is_matching_subscribe(
+ const eventgroupentry_impl& _other) const {
+ return !(ttl_ != 0
+ || _other.ttl_ == 0
+ || service_ != _other.service_
+ || instance_ != _other.instance_
+ || eventgroup_ != _other.eventgroup_
+ || index1_ != _other.index1_
+ || index2_ != _other.index2_
+ || num_options_[0] != _other.num_options_[0]
+ || num_options_[1] != _other.num_options_[1]
+ || major_version_ != _other.major_version_
+ || counter_ != _other.counter_);
+}
+
+void eventgroupentry_impl::add_target(
+ const std::shared_ptr<endpoint_definition> &_target) {
+ if (_target->is_reliable()) {
+ target_reliable_ = _target;
+ } else {
+ target_unreliable_ = _target;
+ }
+}
+
+std::shared_ptr<endpoint_definition> eventgroupentry_impl::get_target(
+ bool _reliable) const {
+ return _reliable ? target_reliable_ : target_unreliable_;
+}
+
} // namespace sd
} // namespace vsomeip
diff --git a/implementation/service_discovery/src/message_impl.cpp b/implementation/service_discovery/src/message_impl.cpp
index d4d3931..d23bf31 100755
--- a/implementation/service_discovery/src/message_impl.cpp
+++ b/implementation/service_discovery/src/message_impl.cpp
@@ -26,12 +26,14 @@
namespace vsomeip {
namespace sd {
-message_impl::message_impl() {
+message_impl::message_impl() :
+ flags_(0x0),
+ options_length_(0x0),
+ number_required_acks_(0x0),
+ number_contained_acks_(0x0) {
header_.service_ = 0xFFFF;
header_.method_ = 0x8100;
header_.protocol_version_ = 0x01;
- flags_ = 0x00;
- options_length_ = 0x0000;
}
message_impl::~message_impl() {
@@ -149,11 +151,11 @@ std::shared_ptr<protection_option_impl> message_impl::create_protection_option()
return its_option;
}
-const std::vector<std::shared_ptr<entry_impl> > & message_impl::get_entries() const {
+const message_impl::entries_t & message_impl::get_entries() const {
return entries_;
}
-const std::vector<std::shared_ptr<option_impl> > & message_impl::get_options() const {
+const message_impl::options_t & message_impl::get_options() const {
return options_;
}
@@ -236,7 +238,31 @@ bool message_impl::deserialize(vsomeip::deserializer *_from) {
while (is_successful && _from->get_remaining()) {
std::shared_ptr < entry_impl > its_entry(deserialize_entry(_from));
if (its_entry) {
- entries_.push_back(its_entry);
+ if (its_entry->get_type() == entry_type_e::SUBSCRIBE_EVENTGROUP) {
+ bool is_unique(true);
+ for (const auto& e : entries_) {
+ if (e->get_type() == entry_type_e::SUBSCRIBE_EVENTGROUP &&
+ *(static_cast<eventgroupentry_impl*>(e.get())) ==
+ *(static_cast<eventgroupentry_impl*>(its_entry.get()))) {
+ is_unique = false;
+ break;
+ }
+ }
+ if (is_unique) {
+ entries_.push_back(its_entry);
+ if (its_entry->get_ttl() > 0) {
+ const std::uint8_t num_options =
+ static_cast<std::uint8_t>(
+ its_entry->get_num_options(1) +
+ its_entry->get_num_options(2));
+ number_required_acks_ =
+ static_cast<std::uint8_t>(number_required_acks_
+ + num_options);
+ }
+ }
+ } else {
+ entries_.push_back(its_entry);
+ }
} else {
is_successful = false;
}
@@ -366,5 +392,48 @@ length_t message_impl::get_someip_length() const {
return header_.length_;
}
+std::uint8_t message_impl::get_number_required_acks() const {
+ return number_required_acks_;
+}
+
+std::uint8_t message_impl::get_number_contained_acks() const {
+ return number_contained_acks_;
+}
+
+void message_impl::set_number_required_acks(std::uint8_t _required_acks) {
+ number_required_acks_ = _required_acks;
+}
+
+void message_impl::increase_number_required_acks(std::uint8_t _amount) {
+ number_required_acks_ += _amount;
+}
+
+void message_impl::decrease_number_required_acks(std::uint8_t _amount) {
+ number_required_acks_ -= _amount;
+}
+
+void message_impl::increase_number_contained_acks() {
+ number_contained_acks_++;
+}
+
+bool message_impl::all_required_acks_contained() const {
+ return number_contained_acks_ == number_required_acks_;
+}
+
+std::unique_lock<std::mutex> message_impl::get_message_lock() {
+ return std::unique_lock<std::mutex>(message_mutex_);
+}
+
+void message_impl::forced_initial_events_add(forced_initial_events_t _entry) {
+ std::lock_guard<std::mutex> its_lock(forced_initial_events_mutex_);
+ forced_initial_events_info_.push_back(_entry);
+}
+
+const std::vector<message_impl::forced_initial_events_t>
+message_impl::forced_initial_events_get() {
+ std::lock_guard<std::mutex> its_lock(forced_initial_events_mutex_);
+ return forced_initial_events_info_;
+}
+
} // namespace sd
} // namespace vsomeip
diff --git a/implementation/service_discovery/src/service_discovery_impl.cpp b/implementation/service_discovery/src/service_discovery_impl.cpp
index d4a8964..ca9aa5a 100644
--- a/implementation/service_discovery/src/service_discovery_impl.cpp
+++ b/implementation/service_discovery/src/service_discovery_impl.cpp
@@ -1000,8 +1000,11 @@ void service_discovery_impl::insert_nack_subscription_on_resubscribe(std::shared
void service_discovery_impl::insert_subscription_ack(
std::shared_ptr<message_impl> &_message, service_t _service,
instance_t _instance, eventgroup_t _eventgroup,
- std::shared_ptr<eventgroupinfo> &_info, ttl_t _ttl, uint8_t _counter, major_version_t _major, uint16_t _reserved) {
-
+ const std::shared_ptr<eventgroupinfo> &_info, ttl_t _ttl,
+ uint8_t _counter, major_version_t _major, uint16_t _reserved,
+ const std::shared_ptr<endpoint_definition> &_target) {
+ std::unique_lock<std::mutex> its_lock(_message->get_message_lock());
+ _message->increase_number_contained_acks();
for (auto its_entry : _message->get_entries()) {
if (its_entry->is_eventgroup_entry()) {
std::shared_ptr < eventgroupentry_impl > its_eventgroup_entry =
@@ -1015,6 +1018,17 @@ void service_discovery_impl::insert_subscription_ack(
&& its_eventgroup_entry->get_reserved() == _reserved
&& its_eventgroup_entry->get_counter() == _counter
&& its_eventgroup_entry->get_ttl() == _ttl) {
+ if (_target) {
+ if (_target->is_reliable()) {
+ if (!its_eventgroup_entry->get_target(true)) {
+ its_eventgroup_entry->add_target(_target);
+ }
+ } else {
+ if (!its_eventgroup_entry->get_target(false)) {
+ its_eventgroup_entry->add_target(_target);
+ }
+ }
+ }
return;
}
}
@@ -1031,6 +1045,9 @@ void service_discovery_impl::insert_subscription_ack(
its_entry->set_counter(_counter);
// SWS_SD_00315
its_entry->set_ttl(_ttl);
+ if (_target) {
+ its_entry->add_target(_target);
+ }
boost::asio::ip::address its_address;
uint16_t its_port;
@@ -1043,6 +1060,7 @@ void service_discovery_impl::insert_subscription_nack(
std::shared_ptr<message_impl> &_message, service_t _service,
instance_t _instance, eventgroup_t _eventgroup,
uint8_t _counter, major_version_t _major, uint16_t _reserved) {
+ std::unique_lock<std::mutex> its_lock(_message->get_message_lock());
std::shared_ptr < eventgroupentry_impl > its_entry =
_message->create_eventgroup_entry();
// SWS_SD_00316 and SWS_SD_00385
@@ -1055,6 +1073,7 @@ void service_discovery_impl::insert_subscription_nack(
its_entry->set_counter(_counter);
// SWS_SD_00432
its_entry->set_ttl(0x0);
+ _message->increase_number_contained_acks();
}
bool service_discovery_impl::send(bool _is_announcing) {
@@ -1064,6 +1083,7 @@ bool service_discovery_impl::send(bool _is_announcing) {
std::shared_ptr < message_impl > its_message;
if(_is_announcing) {
+ remote_subscription_not_acknowledge_all();
its_message = its_runtime->create_message();
its_messages.push_back(its_message);
@@ -1127,32 +1147,62 @@ void service_discovery_impl::on_message(const byte_t *_data, length_t _length,
std::shared_ptr < message_impl > its_message_response
= its_runtime->create_message();
- std::vector <accepted_subscriber_t> accepted_subscribers;
- for (auto its_entry : its_message->get_entries()) {
- if (its_entry->is_service_entry()) {
+ const std::uint8_t its_required_acks =
+ its_message->get_number_required_acks();
+ its_message_response->set_number_required_acks(its_required_acks);
+ std::shared_ptr<sd_message_identifier_t> its_message_id =
+ std::make_shared<sd_message_identifier_t>(
+ its_message->get_session(), _sender, _destination,
+ its_message_response);
+
+ const message_impl::entries_t& its_entries = its_message->get_entries();
+ const message_impl::entries_t::const_iterator its_end = its_entries.end();
+ bool is_stop_subscribe_subscribe(false);
+
+ for (auto iter = its_entries.begin(); iter != its_end; iter++) {
+ if ((*iter)->is_service_entry()) {
std::shared_ptr < serviceentry_impl > its_service_entry =
std::dynamic_pointer_cast < serviceentry_impl
- > (its_entry);
+ > (*iter);
bool its_unicast_flag = its_message->get_unicast_flag();
process_serviceentry(its_service_entry, its_options, its_unicast_flag );
} else {
std::shared_ptr < eventgroupentry_impl > its_eventgroup_entry =
std::dynamic_pointer_cast < eventgroupentry_impl
- > (its_entry);
- process_eventgroupentry( its_eventgroup_entry, its_options, its_message_response, accepted_subscribers, _destination);
+ > (*iter);
+ bool force_initial_events(false);
+ if (is_stop_subscribe_subscribe) {
+ is_stop_subscribe_subscribe = false;
+ force_initial_events = true;
+ }
+ is_stop_subscribe_subscribe = check_stop_subscribe_subscribe(iter, its_end);
+ process_eventgroupentry(its_eventgroup_entry, its_options,
+ its_message_response, _destination,
+ its_message_id, is_stop_subscribe_subscribe, force_initial_events);
+ }
+ }
+
+ // send answer directly if SubscribeEventgroup entries were (n)acked
+ if (its_required_acks || its_message_response->get_number_required_acks() > 0) {
+ bool sent(false);
+ {
+ std::lock_guard<std::mutex> its_lock(response_mutex_);
+ if (its_message_response->all_required_acks_contained()) {
+ update_subscription_expiration_timer(its_message_response);
+ serialize_and_send(its_message_response, _sender);
+ // set required acks to zero to mark message as sent
+ its_message_response->set_number_required_acks(0);
+ sent = true;
+ }
+ }
+ if (sent) {
+ for (const auto &fie : its_message_response->forced_initial_events_get()) {
+ host_->send_initial_events(fie.service_, fie.instance_,
+ fie.eventgroup_, fie.target_);
+ }
}
}
-
- //send ACK / NACK if present
- if( 0 < its_message_response->get_entries().size() && its_message_response ) {
- serialize_and_send(its_message_response, _sender);
- }
-
- for( const auto &a : accepted_subscribers) {
- host_->on_subscribe(a.service_id, a.instance_id, a.eventgroup_, a.subscriber, a.target, a.its_expiration);
- }
- accepted_subscribers.clear();
start_ttl_timer();
} else {
VSOMEIP_ERROR << "service_discovery_impl::on_message: deserialization error.";
@@ -1658,8 +1708,10 @@ void service_discovery_impl::process_eventgroupentry(
std::shared_ptr<eventgroupentry_impl> &_entry,
const std::vector<std::shared_ptr<option_impl> > &_options,
std::shared_ptr < message_impl > &its_message_response,
- std::vector <accepted_subscriber_t> &accepted_subscribers,
- const boost::asio::ip::address &_destination) {
+ const boost::asio::ip::address &_destination,
+ const std::shared_ptr<sd_message_identifier_t> &_message_id,
+ bool _is_stop_subscribe_subscribe,
+ bool _force_initial_events) {
service_t its_service = _entry->get_service();
instance_t its_instance = _entry->get_instance();
eventgroup_t its_eventgroup = _entry->get_eventgroup();
@@ -1691,6 +1743,9 @@ void service_discovery_impl::process_eventgroupentry(
&& _entry->get_num_options(2) == 0) {
VSOMEIP_ERROR << "Invalid number of options in SubscribeEventGroup entry";
if(its_ttl > 0) {
+ // increase number of required acks by one as number required acks
+ // is calculated based on the number of referenced options
+ its_message_response->increase_number_required_acks();
insert_subscription_nack(its_message_response, its_service, its_instance,
its_eventgroup, its_counter, its_major, its_reserved);
}
@@ -1712,6 +1767,14 @@ void service_discovery_impl::process_eventgroupentry(
VSOMEIP_ERROR << "Fewer options in SD message than "
"referenced in EventGroup entry or malformed option received";
if(its_ttl > 0) {
+ const std::uint8_t num_overreferenced_options =
+ static_cast<std::uint8_t>(_entry->get_num_options(1) +
+ _entry->get_num_options(2) - _options.size());
+ if (its_message_response->get_number_required_acks() -
+ num_overreferenced_options > 0) {
+ its_message_response->decrease_number_required_acks(
+ num_overreferenced_options);
+ }
insert_subscription_nack(its_message_response, its_service, its_instance,
its_eventgroup, its_counter, its_major, its_reserved);
}
@@ -1895,12 +1958,14 @@ void service_discovery_impl::process_eventgroupentry(
}
break;
case option_type_e::CONFIGURATION: {
+ its_message_response->decrease_number_required_acks();
break;
}
case option_type_e::UNKNOWN:
default:
VSOMEIP_WARNING << "Unsupported eventgroup option";
if(its_ttl > 0) {
+ its_message_response->decrease_number_required_acks();
insert_subscription_nack(its_message_response, its_service, its_instance,
its_eventgroup, its_counter, its_major, its_reserved);
return;
@@ -1914,7 +1979,8 @@ void service_discovery_impl::process_eventgroupentry(
handle_eventgroup_subscription(its_service, its_instance,
its_eventgroup, its_major, its_ttl, its_counter, its_reserved,
its_first_address, its_first_port, is_first_reliable,
- its_second_address, its_second_port, is_second_reliable, its_message_response, accepted_subscribers);
+ its_second_address, its_second_port, is_second_reliable, its_message_response,
+ _message_id, _is_stop_subscribe_subscribe, _force_initial_events);
} else {
if( entry_type_e::SUBSCRIBE_EVENTGROUP_ACK == its_type) { //this type is used for ACK and NACK messages
if(its_ttl > 0) {
@@ -1934,7 +2000,8 @@ void service_discovery_impl::handle_eventgroup_subscription(service_t _service,
const boost::asio::ip::address &_first_address, uint16_t _first_port, bool _is_first_reliable,
const boost::asio::ip::address &_second_address, uint16_t _second_port, bool _is_second_reliable,
std::shared_ptr < message_impl > &its_message,
- std::vector <accepted_subscriber_t> &accepted_subscribers) {
+ const std::shared_ptr<sd_message_identifier_t> &_message_id,
+ bool _is_stop_subscribe_subscribe, bool _force_initial_events) {
if (its_message) {
bool has_reliable_events(false);
@@ -1966,30 +2033,38 @@ void service_discovery_impl::handle_eventgroup_subscription(service_t _service,
if (reliablility_nack && _ttl > 0) {
insert_subscription_nack(its_message, _service, _instance,
_eventgroup, _counter, _major, _reserved);
- VSOMEIP_WARNING << "Subscription for service/instance "
- << std::hex << _service << "/" << _instance
- << " not valid: Event configuration does not match the provided endpoint options";
+ boost::system::error_code ec;
+ VSOMEIP_WARNING << "Subscription for ["
+ << std::hex << std::setw(4) << std::setfill('0') << _service << "."
+ << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
+ << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]"
+ << " not valid: Event configuration does not match the provided "
+ << "endpoint options: "
+ << _first_address.to_string(ec) << ":" << std::dec << _first_port << " "
+ << _second_address.to_string(ec) << ":" << std::dec << _second_port;
return;
}
std::shared_ptr < eventgroupinfo > its_info = host_->find_eventgroup(
_service, _instance, _eventgroup);
- bool is_nack(false);
- std::shared_ptr < endpoint_definition > its_first_subscriber,
- its_second_subscriber;
- std::shared_ptr < endpoint_definition > its_first_target,
- its_second_target;
+ struct subscriber_target_t {
+ std::shared_ptr<endpoint_definition> subscriber_;
+ std::shared_ptr<endpoint_definition> target_;
+ };
+ std::array<subscriber_target_t, 2> its_targets =
+ { subscriber_target_t(), subscriber_target_t() };
// Could not find eventgroup or wrong version
if (!its_info || _major != its_info->get_major()) {
// Create a temporary info object with TTL=0 --> send NACK
if( its_info && (_major != its_info->get_major())) {
VSOMEIP_ERROR << "Requested major version:[" << (uint32_t) _major
- << "] in subscription to service:[" << _service
- << "] instance:[" << _instance
- << "] eventgroup:[" << _eventgroup
- << "], does not match with services major version:[" << (uint32_t) its_info->get_major()
+ << "] in subscription to service: ["
+ << std::hex << std::setw(4) << std::setfill('0') << _service << "."
+ << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
+ << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]"
+ << " does not match with services major version:[" << (uint32_t) its_info->get_major()
<< "]";
} else {
VSOMEIP_ERROR << "Requested eventgroup:[" << _eventgroup
@@ -2006,117 +2081,125 @@ void service_discovery_impl::handle_eventgroup_subscription(service_t _service,
boost::asio::ip::address its_first_address, its_second_address;
uint16_t its_first_port, its_second_port;
if (ILLEGAL_PORT != _first_port) {
- its_first_subscriber = endpoint_definition::get(
+ its_targets[0].subscriber_ = endpoint_definition::get(
_first_address, _first_port, _is_first_reliable, _service, _instance);
if (!_is_first_reliable &&
its_info->get_multicast(its_first_address, its_first_port)) { // udp multicast
- its_first_target = endpoint_definition::get(
+ its_targets[0].target_ = endpoint_definition::get(
its_first_address, its_first_port, false, _service, _instance);
} else if(_is_first_reliable) { // tcp unicast
- its_first_target = its_first_subscriber;
+ its_targets[0].target_ = its_targets[0].subscriber_;
// check if TCP connection is established by client
- if( !is_tcp_connected(_service, _instance, its_first_target) && _ttl > 0) {
+ if(_ttl > 0 && !is_tcp_connected(_service, _instance, its_targets[0].target_)) {
insert_subscription_nack(its_message, _service, _instance,
_eventgroup, _counter, _major, _reserved);
- VSOMEIP_ERROR << "TCP connection to target1: [" << its_first_target->get_address().to_string()
- << ":" << its_first_target->get_port()
- << "] not established for subscription to service:[" << _service
- << "] instance:[" << _instance
- << "] eventgroup:[" << _eventgroup << "]";
+ VSOMEIP_ERROR << "TCP connection to target1: ["
+ << its_targets[0].target_->get_address().to_string()
+ << ":" << its_targets[0].target_->get_port()
+ << "] not established for subscription to: ["
+ << std::hex << std::setw(4) << std::setfill('0') << _service << "."
+ << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
+ << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]";
+ if (ILLEGAL_PORT != _second_port) {
+ its_message->decrease_number_required_acks();
+ }
return;
}
} else { // udp unicast
- its_first_target = its_first_subscriber;
+ its_targets[0].target_ = its_targets[0].subscriber_;
}
}
if (ILLEGAL_PORT != _second_port) {
- its_second_subscriber = endpoint_definition::get(
+ its_targets[1].subscriber_ = endpoint_definition::get(
_second_address, _second_port, _is_second_reliable, _service, _instance);
if (!_is_second_reliable &&
its_info->get_multicast(its_second_address, its_second_port)) { // udp multicast
- its_second_target = endpoint_definition::get(
+ its_targets[1].target_ = endpoint_definition::get(
its_second_address, its_second_port, false, _service, _instance);
} else if (_is_second_reliable) { // tcp unicast
- its_second_target = its_second_subscriber;
+ its_targets[1].target_ = its_targets[1].subscriber_;
// check if TCP connection is established by client
- if(_ttl > 0 && !is_tcp_connected(_service, _instance, its_second_target)) {
+ if(_ttl > 0 && !is_tcp_connected(_service, _instance, its_targets[1].target_)) {
insert_subscription_nack(its_message, _service, _instance,
_eventgroup, _counter, _major, _reserved);
- VSOMEIP_ERROR << "TCP connection to target2 : [" << its_second_target->get_address().to_string()
- << ":" << its_second_target->get_port()
- << "] not established for subscription to service:[" << _service
- << "] instance:[" << _instance
- << "] eventgroup:[" << _eventgroup << "]";
+ VSOMEIP_ERROR << "TCP connection to target2 : ["
+ << its_targets[1].target_->get_address().to_string()
+ << ":" << its_targets[1].target_->get_port()
+ << "] not established for subscription to: ["
+ << std::hex << std::setw(4) << std::setfill('0') << _service << "."
+ << std::hex << std::setw(4) << std::setfill('0') << _instance << "."
+ << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]";
+ if (ILLEGAL_PORT != _first_port) {
+ its_message->decrease_number_required_acks();
+ }
return;
}
} else { // udp unicast
- its_second_target = its_second_subscriber;
+ its_targets[1].target_ = its_targets[1].subscriber_;
}
}
}
if (_ttl == 0) { // --> unsubscribe
- if (its_first_subscriber) {
- host_->on_unsubscribe(_service, _instance, _eventgroup, its_first_subscriber);
- }
- if (its_second_subscriber) {
- host_->on_unsubscribe(_service, _instance, _eventgroup, its_second_subscriber);
+ for (const auto &target : its_targets) {
+ if (target.subscriber_) {
+ remote_subscription_remove(_service, _instance, _eventgroup, target.subscriber_);
+ if (!_is_stop_subscribe_subscribe) {
+ host_->on_unsubscribe(_service, _instance, _eventgroup, target.subscriber_);
+ }
+ }
}
return;
}
- std::chrono::steady_clock::time_point its_expiration
- = std::chrono::steady_clock::now() + std::chrono::seconds(_ttl);
-
- if (its_first_target) {
- if(!host_->on_subscribe_accepted(_service, _instance, _eventgroup,
- its_first_subscriber, its_expiration)) {
- is_nack = true;
- insert_subscription_nack(its_message, _service, _instance, _eventgroup,
- _counter, _major, _reserved);
- }
- }
- if (its_second_subscriber) {
- if(!host_->on_subscribe_accepted(_service, _instance, _eventgroup,
- its_second_subscriber, its_expiration)) {
- is_nack = true;
- insert_subscription_nack(its_message, _service, _instance, _eventgroup,
- _counter, _major, _reserved);
- }
- }
-
- if (!is_nack)
- {
- insert_subscription_ack(its_message, _service, _instance, _eventgroup,
- its_info, _ttl, _counter, _major, _reserved);
-
- if (its_expiration < next_subscription_expiration_) {
- stop_subscription_expiration_timer();
- next_subscription_expiration_ = its_expiration;
- start_subscription_expiration_timer();
- }
-
- if (its_first_target && its_first_subscriber) {
- accepted_subscriber_t subscriber_;
- subscriber_.service_id = _service;
- subscriber_.instance_id = _instance;
- subscriber_.eventgroup_ = _eventgroup;
- subscriber_.subscriber = its_first_subscriber;
- subscriber_.target = its_first_target;
- subscriber_.its_expiration = its_expiration;
-
- accepted_subscribers.push_back(subscriber_);
- }
- if (its_second_target && its_second_subscriber) {
- accepted_subscriber_t subscriber_;
- subscriber_.service_id = _service;
- subscriber_.instance_id = _instance;
- subscriber_.eventgroup_ = _eventgroup;
- subscriber_.subscriber = its_second_subscriber;
- subscriber_.target = its_second_target;
- subscriber_.its_expiration = its_expiration;
-
- accepted_subscribers.push_back(subscriber_);
+ std::lock_guard<std::mutex> its_lock(pending_remote_subscriptions_mutex_);
+ for (const auto &target : its_targets) {
+ if (target.target_) {
+ client_t its_subscribing_remote_client = VSOMEIP_ROUTING_CLIENT;
+ switch (host_->on_remote_subscription(_service, _instance,
+ _eventgroup, target.subscriber_, target.target_,
+ _ttl, &its_subscribing_remote_client,
+ _message_id)) {
+ case remote_subscription_state_e::SUBSCRIPTION_ACKED:
+ insert_subscription_ack(its_message, _service,
+ _instance, _eventgroup, its_info, _ttl,
+ _counter, _major, _reserved);
+ if (_force_initial_events) {
+ // processing subscription of StopSubscribe/Subscribe
+ // sequence
+ its_message->forced_initial_events_add(
+ message_impl::forced_initial_events_t(
+ { target.subscriber_, _service, _instance,
+ _eventgroup }));
+ }
+ break;
+ case remote_subscription_state_e::SUBSCRIPTION_NACKED:
+ case remote_subscription_state_e::SUBSCRIPTION_ERROR:
+ insert_subscription_nack(its_message, _service,
+ _instance, _eventgroup, _counter, _major,
+ _reserved);
+ break;
+ case remote_subscription_state_e::SUBSCRIPTION_PENDING:
+ if (target.target_ && target.subscriber_) {
+ std::shared_ptr<subscriber_t> subscriber_ =
+ std::make_shared<subscriber_t>();
+ subscriber_->subscriber = target.subscriber_;
+ subscriber_->target = target.target_;
+ subscriber_->response_message_id_ = _message_id;
+ subscriber_->eventgroupinfo_ = its_info;
+ subscriber_->ttl_ = _ttl;
+ subscriber_->major_ = _major;
+ subscriber_->reserved_ = _reserved;
+ subscriber_->counter_ = _counter;
+
+ pending_remote_subscriptions_[_service]
+ [_instance]
+ [_eventgroup]
+ [its_subscribing_remote_client].push_back(subscriber_);
+ }
+ default:
+ break;
+ }
}
}
}
@@ -2137,7 +2220,7 @@ void service_discovery_impl::handle_eventgroup_subscription_nack(service_t _serv
// Deliver nack
nackedClient = client.first;
host_->on_subscribe_nack(client.first, _service,
- _instance, _eventgroup, ANY_EVENT);
+ _instance, _eventgroup, ANY_EVENT, DEFAULT_SUBSCRIPTION);
break;
}
}
@@ -2179,7 +2262,7 @@ void service_discovery_impl::handle_eventgroup_subscription_ack(
if (its_client.second->get_counter() == _counter) {
its_client.second->set_acknowledged(true);
host_->on_subscribe_ack(its_client.first, _service,
- _instance, _eventgroup, ANY_EVENT);
+ _instance, _eventgroup, ANY_EVENT, DEFAULT_SUBSCRIPTION);
}
if (_address.is_multicast()) {
host_->on_subscribe_ack(_service, _instance, _address,
@@ -2389,6 +2472,11 @@ void service_discovery_impl::send_subscriptions(service_t _service, instance_t _
}
void service_discovery_impl::start_subscription_expiration_timer() {
+ std::lock_guard<std::mutex> its_lock(subscription_expiration_timer_mutex_);
+ start_subscription_expiration_timer_unlocked();
+}
+
+void service_discovery_impl::start_subscription_expiration_timer_unlocked() {
subscription_expiration_timer_.expires_at(next_subscription_expiration_);
subscription_expiration_timer_.async_wait(
std::bind(&service_discovery_impl::expire_subscriptions,
@@ -2397,6 +2485,11 @@ void service_discovery_impl::start_subscription_expiration_timer() {
}
void service_discovery_impl::stop_subscription_expiration_timer() {
+ std::lock_guard<std::mutex> its_lock(subscription_expiration_timer_mutex_);
+ stop_subscription_expiration_timer_unlocked();
+}
+
+void service_discovery_impl::stop_subscription_expiration_timer_unlocked() {
subscription_expiration_timer_.cancel();
}
@@ -2913,6 +3006,8 @@ void service_discovery_impl::stop_offer_service(
if(_info->is_in_mainphase() || stop_offer_required) {
send_stop_offer(_service, _instance, _info);
}
+ // sent out NACKs for all pending subscriptions
+ remote_subscription_not_acknowledge_all(_service, _instance);
}
bool service_discovery_impl::send_stop_offer(
@@ -3014,5 +3109,261 @@ void service_discovery_impl::set_diagnosis_mode(const bool _activate) {
is_diagnosis_ = _activate;
}
+void service_discovery_impl::remote_subscription_acknowledge(
+ service_t _service, instance_t _instance, eventgroup_t _eventgroup,
+ client_t _client, bool _acknowledged,
+ const std::shared_ptr<sd_message_identifier_t> &_sd_message_id) {
+ std::shared_ptr<subscriber_t> its_subscriber;
+ {
+ std::lock_guard<std::mutex> its_lock(pending_remote_subscriptions_mutex_);
+ const auto its_service = pending_remote_subscriptions_.find(_service);
+ if (its_service != pending_remote_subscriptions_.end()) {
+ const auto its_instance = its_service->second.find(_instance);
+ if (its_instance != its_service->second.end()) {
+ const auto its_eventgroup = its_instance->second.find(_eventgroup);
+ if (its_eventgroup != its_instance->second.end()) {
+ const auto its_client = its_eventgroup->second.find(_client);
+ if (its_client != its_eventgroup->second.end()) {
+ for (auto iter = its_client->second.begin();
+ iter != its_client->second.end();) {
+ if ((*iter)->response_message_id_ == _sd_message_id) {
+ its_subscriber = *iter;
+ iter = its_client->second.erase(iter);
+ break;
+ } else {
+ iter++;
+ }
+ }
+
+ // delete if necessary
+ if (!its_client->second.size()) {
+ its_eventgroup->second.erase(its_client);
+ if (!its_eventgroup->second.size()) {
+ its_instance->second.erase(its_eventgroup);
+ if (!its_instance->second.size()) {
+ its_service->second.erase(its_instance);
+ if (!its_service->second.size()) {
+ pending_remote_subscriptions_.erase(
+ its_service);
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ if (its_subscriber) {
+ remote_subscription_acknowledge_subscriber(_service, _instance,
+ _eventgroup, its_subscriber, _acknowledged);
+ }
+}
+
+void service_discovery_impl::update_subscription_expiration_timer(
+ const std::shared_ptr<message_impl> &_message) {
+ std::lock_guard<std::mutex> its_lock(subscription_expiration_timer_mutex_);
+ const std::chrono::steady_clock::time_point now =
+ std::chrono::steady_clock::now();
+ stop_subscription_expiration_timer_unlocked();
+ for (const auto &entry : _message->get_entries()) {
+ if (entry->get_type() == entry_type_e::SUBSCRIBE_EVENTGROUP_ACK
+ && entry->get_ttl()) {
+ const std::chrono::steady_clock::time_point its_expiration = now
+ + std::chrono::seconds(entry->get_ttl());
+ if (its_expiration < next_subscription_expiration_) {
+ next_subscription_expiration_ = its_expiration;
+ }
+ }
+ }
+ start_subscription_expiration_timer_unlocked();
+}
+
+void service_discovery_impl::remote_subscription_acknowledge_subscriber(
+ service_t _service, instance_t _instance, eventgroup_t _eventgroup,
+ const std::shared_ptr<subscriber_t> &_subscriber, bool _acknowledged) {
+ std::shared_ptr<message_impl> its_response = _subscriber->response_message_id_->response_;
+ std::vector<std::tuple<service_t, instance_t, eventgroup_t,
+ std::shared_ptr<endpoint_definition>>> its_acks;
+ bool sent(false);
+ {
+ std::lock_guard<std::mutex> its_lock(response_mutex_);
+ if (_acknowledged) {
+ insert_subscription_ack(its_response, _service, _instance,
+ _eventgroup, _subscriber->eventgroupinfo_,
+ _subscriber->ttl_, _subscriber->counter_,
+ _subscriber->major_, _subscriber->reserved_, _subscriber->subscriber);
+ } else {
+ insert_subscription_nack(its_response, _service, _instance,
+ _eventgroup, _subscriber->counter_,
+ _subscriber->major_, _subscriber->reserved_);
+ }
+
+ if (its_response->all_required_acks_contained()) {
+ update_subscription_expiration_timer(its_response);
+ serialize_and_send(its_response, _subscriber->response_message_id_->sender_);
+ // set required acks to zero to mark message as sent
+ its_response->set_number_required_acks(0);
+ sent = true;
+ }
+ }
+ if (sent) {
+ for (const auto &e : its_response->get_entries()) {
+ if (e->get_type() == entry_type_e::SUBSCRIBE_EVENTGROUP_ACK
+ && e->get_ttl() > 0) {
+ const std::shared_ptr<eventgroupentry_impl> casted_e =
+ std::static_pointer_cast<eventgroupentry_impl>(e);
+ const std::shared_ptr<endpoint_definition> its_reliable =
+ casted_e->get_target(true);
+ if (its_reliable) {
+ its_acks.push_back(
+ std::make_tuple(e->get_service(), e->get_instance(),
+ casted_e->get_eventgroup(), its_reliable));
+ }
+ const std::shared_ptr<endpoint_definition> its_unreliable =
+ casted_e->get_target(false);
+ if (its_unreliable) {
+ its_acks.push_back(
+ std::make_tuple(e->get_service(), e->get_instance(),
+ casted_e->get_eventgroup(),
+ its_unreliable));
+ }
+ }
+ }
+ for (const auto& ack_tuple : its_acks) {
+ host_->send_initial_events(std::get<0>(ack_tuple),
+ std::get<1>(ack_tuple), std::get<2>(ack_tuple),
+ std::get<3>(ack_tuple));
+ }
+ for (const auto &fie : its_response->forced_initial_events_get()) {
+ host_->send_initial_events(fie.service_, fie.instance_,
+ fie.eventgroup_, fie.target_);
+ }
+ }
+}
+
+void service_discovery_impl::remote_subscription_not_acknowledge_all(
+ service_t _service, instance_t _instance) {
+ std::map<eventgroup_t, std::vector<std::shared_ptr<subscriber_t>>>its_pending_subscriptions;
+ {
+ std::lock_guard<std::mutex> its_lock(pending_remote_subscriptions_mutex_);
+ const auto its_service = pending_remote_subscriptions_.find(_service);
+ if (its_service != pending_remote_subscriptions_.end()) {
+ const auto its_instance = its_service->second.find(_instance);
+ if (its_instance != its_service->second.end()) {
+ for (const auto &its_eventgroup : its_instance->second) {
+ for (const auto &its_client : its_eventgroup.second) {
+ its_pending_subscriptions[its_eventgroup.first].insert(
+ its_pending_subscriptions[its_eventgroup.first].end(),
+ its_client.second.begin(),
+ its_client.second.end());
+ }
+ }
+ // delete everything from this service instance
+ its_service->second.erase(its_instance);
+ if (!its_service->second.size()) {
+ pending_remote_subscriptions_.erase(its_service);
+ }
+ }
+ }
+ }
+ for (const auto &eg : its_pending_subscriptions) {
+ for (const auto &its_subscriber : eg.second) {
+ remote_subscription_acknowledge_subscriber(_service, _instance,
+ eg.first, its_subscriber, false);
+ }
+ }
+}
+
+void service_discovery_impl::remote_subscription_not_acknowledge_all() {
+ std::map<service_t,
+ std::map<instance_t,
+ std::map<eventgroup_t, std::vector<std::shared_ptr<subscriber_t>>>>> to_be_nacked;
+ {
+ std::lock_guard<std::mutex> its_lock(pending_remote_subscriptions_mutex_);
+ for (const auto &its_service : pending_remote_subscriptions_) {
+ for (const auto &its_instance : its_service.second) {
+ for (const auto &its_eventgroup : its_instance.second) {
+ for (const auto &its_client : its_eventgroup.second) {
+ to_be_nacked[its_service.first]
+ [its_instance.first]
+ [its_eventgroup.first].insert(
+ to_be_nacked[its_service.first][its_instance.first][its_eventgroup.first].end(),
+ its_client.second.begin(),
+ its_client.second.end());
+ }
+ }
+ }
+ }
+ pending_remote_subscriptions_.clear();
+ }
+ for (const auto &s : to_be_nacked) {
+ for (const auto &i : s.second) {
+ for (const auto &eg : i.second) {
+ for (const auto &sub : eg.second) {
+ remote_subscription_acknowledge_subscriber(s.first, i.first,
+ eg.first, sub, false);
+ }
+ }
+ }
+ }
+}
+
+void service_discovery_impl::remote_subscription_remove(
+ service_t _service, instance_t _instance, eventgroup_t _eventgroup,
+ const std::shared_ptr<endpoint_definition> &_subscriber) {
+ std::lock_guard<std::mutex> its_lock(pending_remote_subscriptions_mutex_);
+ const auto its_service = pending_remote_subscriptions_.find(_service);
+ if (its_service != pending_remote_subscriptions_.end()) {
+ const auto its_instance = its_service->second.find(_instance);
+ if (its_instance != its_service->second.end()) {
+ const auto its_eventgroup = its_instance->second.find(_eventgroup);
+ if (its_eventgroup != its_instance->second.end()) {
+ for (auto client_iter = its_eventgroup->second.begin();
+ client_iter != its_eventgroup->second.end(); ) {
+ for (auto subscriber_iter = client_iter->second.begin();
+ subscriber_iter != client_iter->second.end();) {
+ if ((*subscriber_iter)->subscriber == _subscriber) {
+ (*subscriber_iter)->response_message_id_->response_->decrease_number_required_acks();
+ subscriber_iter = client_iter->second.erase(
+ subscriber_iter);
+ } else {
+ ++subscriber_iter;
+ }
+ }
+ if (!client_iter->second.size()) {
+ client_iter = its_eventgroup->second.erase(client_iter);
+ } else {
+ ++client_iter;
+ }
+ }
+ if (!its_eventgroup->second.size()) {
+ its_instance->second.erase(its_eventgroup);
+ if (!its_service->second.size()) {
+ its_service->second.erase(its_instance);
+ if (!its_service->second.size()) {
+ pending_remote_subscriptions_.erase(its_service);
+ }
+ }
+ }
+ }
+ }
+ }
+}
+
+bool service_discovery_impl::check_stop_subscribe_subscribe(
+ message_impl::entries_t::const_iterator _iter,
+ message_impl::entries_t::const_iterator _end) const {
+ const message_impl::entries_t::const_iterator its_next = std::next(_iter);
+ if ((*_iter)->get_type() != entry_type_e::SUBSCRIBE_EVENTGROUP
+ || its_next == _end
+ || (*its_next)->get_type() != entry_type_e::STOP_SUBSCRIBE_EVENTGROUP) {
+ return false;
+ }
+
+ return (*static_cast<eventgroupentry_impl*>(_iter->get())).is_matching_subscribe(
+ *(static_cast<eventgroupentry_impl*>(its_next->get())));
+}
+
} // namespace sd
} // namespace vsomeip