diff options
author | Juergen Gehring <juergen.gehring@bmw.de> | 2018-01-25 00:40:05 -0800 |
---|---|---|
committer | Juergen Gehring <juergen.gehring@bmw.de> | 2018-01-25 00:40:05 -0800 |
commit | 79fd5f7a34ed33392f71fa914a60b2e68b28de68 (patch) | |
tree | 5ea93513d0173ffe6dea57545cc5b28db591f082 /implementation/service_discovery | |
parent | 5c43d511bd5b5e15eca521c4c71dfa69c6f1c90f (diff) | |
download | vSomeIP-79fd5f7a34ed33392f71fa914a60b2e68b28de68.tar.gz |
vsomeip 2.10.02.10.0
Diffstat (limited to 'implementation/service_discovery')
10 files changed, 706 insertions, 152 deletions
diff --git a/implementation/service_discovery/include/entry_impl.hpp b/implementation/service_discovery/include/entry_impl.hpp index fabca3b..db12a1e 100755 --- a/implementation/service_discovery/include/entry_impl.hpp +++ b/implementation/service_discovery/include/entry_impl.hpp @@ -68,6 +68,8 @@ protected: std::vector<uint8_t> options_[VSOMEIP_MAX_OPTION_RUN];
uint8_t num_options_[VSOMEIP_MAX_OPTION_RUN];
+ std::uint8_t index1_;
+ std::uint8_t index2_;
entry_impl();
entry_impl(const entry_impl &entry_);
diff --git a/implementation/service_discovery/include/eventgroupentry_impl.hpp b/implementation/service_discovery/include/eventgroupentry_impl.hpp index d9a6571..877cdbf 100755 --- a/implementation/service_discovery/include/eventgroupentry_impl.hpp +++ b/implementation/service_discovery/include/eventgroupentry_impl.hpp @@ -7,6 +7,7 @@ #define VSOMEIP_SD_EVENTGROUPENTRY_IMPL_HPP
#include "entry_impl.hpp"
+#include "../../endpoints/include/endpoint_definition.hpp"
namespace vsomeip {
namespace sd {
@@ -29,6 +30,24 @@ public: bool serialize(vsomeip::serializer *_to) const;
bool deserialize(vsomeip::deserializer *_from);
+ bool operator==(const eventgroupentry_impl& _other) const {
+ return !(ttl_ != _other.ttl_ ||
+ service_ != _other.service_ ||
+ instance_ != _other.instance_ ||
+ eventgroup_ != _other.eventgroup_ ||
+ index1_ != _other.index1_ ||
+ index2_ != _other.index2_ ||
+ num_options_[0] != _other.num_options_[0] ||
+ num_options_[1] != _other.num_options_[1] ||
+ major_version_ != _other.major_version_ ||
+ counter_ != _other.counter_);
+ }
+
+ bool is_matching_subscribe(const eventgroupentry_impl& _other) const;
+
+ void add_target(const std::shared_ptr<endpoint_definition> &_target);
+ std::shared_ptr<endpoint_definition> get_target(bool _reliable) const;
+
private:
eventgroup_t eventgroup_;
uint16_t reserved_;
@@ -36,6 +55,9 @@ private: // counter field to differentiate parallel subscriptions on same event group
// 4Bit only (max 16. parralel subscriptions)
uint8_t counter_;
+
+ std::shared_ptr<endpoint_definition> target_reliable_;
+ std::shared_ptr<endpoint_definition> target_unreliable_;
};
} // namespace sd
diff --git a/implementation/service_discovery/include/message_impl.hpp b/implementation/service_discovery/include/message_impl.hpp index 1d658a5..f679e9c 100755 --- a/implementation/service_discovery/include/message_impl.hpp +++ b/implementation/service_discovery/include/message_impl.hpp @@ -8,11 +8,14 @@ #include <memory>
#include <vector>
+#include <atomic>
+#include <mutex>
#include <vsomeip/message.hpp>
#include "../include/primitive_types.hpp"
#include "../../message/include/message_base_impl.hpp"
+#include "../../endpoints/include/endpoint_definition.hpp"
# if _MSC_VER >= 1300
/*
@@ -40,6 +43,14 @@ class protection_option_impl; class message_impl: public vsomeip::message, public vsomeip::message_base_impl {
public:
+ typedef std::vector<std::shared_ptr<entry_impl>> entries_t;
+ typedef std::vector<std::shared_ptr<option_impl>> options_t;
+ struct forced_initial_events_t {
+ std::shared_ptr<vsomeip::endpoint_definition> target_;
+ vsomeip::service_t service_;
+ vsomeip::instance_t instance_;
+ vsomeip::eventgroup_t eventgroup_;
+ };
message_impl();
virtual ~message_impl();
@@ -61,8 +72,8 @@ public: std::shared_ptr<load_balancing_option_impl> create_load_balancing_option();
std::shared_ptr<protection_option_impl> create_protection_option();
- const std::vector<std::shared_ptr<entry_impl> > & get_entries() const;
- const std::vector<std::shared_ptr<option_impl> > & get_options() const;
+ const entries_t & get_entries() const;
+ const options_t & get_options() const;
int16_t get_option_index(const std::shared_ptr<option_impl> &_option) const;
uint32_t get_options_length();
@@ -75,6 +86,18 @@ public: length_t get_someip_length() const;
+ std::uint8_t get_number_required_acks() const;
+ std::uint8_t get_number_contained_acks() const;
+ void set_number_required_acks(std::uint8_t _required_acks);
+ void increase_number_required_acks(std::uint8_t _amount = 1);
+ void decrease_number_required_acks(std::uint8_t _amount = 1);
+ void increase_number_contained_acks();
+ bool all_required_acks_contained() const;
+ std::unique_lock<std::mutex> get_message_lock();
+
+ void forced_initial_events_add(forced_initial_events_t _entry);
+ const std::vector<forced_initial_events_t> forced_initial_events_get();
+
private:
entry_impl * deserialize_entry(vsomeip::deserializer *_from);
option_impl * deserialize_option(vsomeip::deserializer *_from);
@@ -83,8 +106,14 @@ private: flags_t flags_;
uint32_t options_length_;
- std::vector<std::shared_ptr<entry_impl> > entries_;
- std::vector<std::shared_ptr<option_impl> > options_;
+ entries_t entries_;
+ options_t options_;
+ std::atomic<std::uint8_t> number_required_acks_;
+ std::atomic<std::uint8_t> number_contained_acks_;
+ std::mutex message_mutex_;
+
+ std::mutex forced_initial_events_mutex_;
+ std::vector<forced_initial_events_t> forced_initial_events_info_;
};
} // namespace sd
diff --git a/implementation/service_discovery/include/service_discovery.hpp b/implementation/service_discovery/include/service_discovery.hpp index 075129b..caec99e 100644 --- a/implementation/service_discovery/include/service_discovery.hpp +++ b/implementation/service_discovery/include/service_discovery.hpp @@ -65,6 +65,10 @@ public: std::shared_ptr<serviceinfo> _info) = 0; virtual void set_diagnosis_mode(const bool _activate) = 0; + virtual void remote_subscription_acknowledge( + service_t _service, instance_t _instance, eventgroup_t _eventgroup, + client_t _client, bool _accepted, + const std::shared_ptr<sd_message_identifier_t> &_sd_message_id) = 0; }; } // namespace sd diff --git a/implementation/service_discovery/include/service_discovery_host.hpp b/implementation/service_discovery/include/service_discovery_host.hpp index 492c920..f6ab65d 100644 --- a/implementation/service_discovery/include/service_discovery_host.hpp +++ b/implementation/service_discovery/include/service_discovery_host.hpp @@ -15,6 +15,8 @@ #include "../../routing/include/types.hpp" +#include <vsomeip/message.hpp> + namespace vsomeip { class configuration; @@ -57,12 +59,6 @@ public: virtual std::chrono::milliseconds update_routing_info( std::chrono::milliseconds _elapsed) = 0; - virtual void on_subscribe(service_t _service, instance_t _instance, - eventgroup_t _eventgroup, - std::shared_ptr<endpoint_definition> _subscriber, - std::shared_ptr<endpoint_definition> _target, - const std::chrono::steady_clock::time_point &_expiration) = 0; - virtual void on_unsubscribe(service_t _service, instance_t _instance, eventgroup_t _eventgroup, std::shared_ptr<endpoint_definition> _target) = 0; @@ -72,7 +68,7 @@ public: virtual void on_subscribe_ack(client_t _client, service_t _service, instance_t _instance, eventgroup_t _eventgroup, - event_t _event) = 0; + event_t _event, pending_subscription_id_t _subscription_id) = 0; virtual std::shared_ptr<endpoint> find_or_create_remote_client( service_t _service, instance_t _instance, @@ -81,13 +77,16 @@ public: virtual void expire_subscriptions(const boost::asio::ip::address &_address) = 0; virtual void expire_services(const boost::asio::ip::address &_address) = 0; - virtual bool on_subscribe_accepted(service_t _service, instance_t _instance, - eventgroup_t _eventgroup, std::shared_ptr<endpoint_definition> _target, - const std::chrono::steady_clock::time_point &_expiration) = 0; + virtual remote_subscription_state_e on_remote_subscription( + service_t _service, instance_t _instance, eventgroup_t _eventgroup, + const std::shared_ptr<endpoint_definition> &_subscriber, + const std::shared_ptr<endpoint_definition> &_target, + ttl_t _ttl, client_t *_client, + const std::shared_ptr<sd_message_identifier_t> &_sd_message_id) = 0; virtual void on_subscribe_nack(client_t _client, service_t _service, instance_t _instance, eventgroup_t _eventgroup, - event_t _event) = 0; + event_t _event, pending_subscription_id_t _subscription_id) = 0; virtual bool has_identified(client_t _client, service_t _service, instance_t _instance, bool _reliable) = 0; @@ -98,6 +97,10 @@ public: service_t _service, instance_t _instance) const = 0; virtual std::map<instance_t, std::shared_ptr<serviceinfo>> get_offered_service_instances( service_t _service) const = 0; + + virtual void send_initial_events(service_t _service, instance_t _instance, + eventgroup_t _eventgroup, + const std::shared_ptr<endpoint_definition>& _subscriber) = 0; }; } // namespace sd diff --git a/implementation/service_discovery/include/service_discovery_impl.hpp b/implementation/service_discovery/include/service_discovery_impl.hpp index 5f28135..68b159f 100644 --- a/implementation/service_discovery/include/service_discovery_impl.hpp +++ b/implementation/service_discovery/include/service_discovery_impl.hpp @@ -12,6 +12,7 @@ #include <set> #include <forward_list> #include <atomic> +#include <tuple> #include <boost/asio/steady_timer.hpp> @@ -35,19 +36,20 @@ class eventgroupentry_impl; class option_impl; class request; class serviceentry_impl; -class service_discovery_fsm; class service_discovery_host; class subscription; typedef std::map<service_t, std::map<instance_t, std::shared_ptr<request> > > requests_t; -struct accepted_subscriber_t { - std::shared_ptr < endpoint_definition > subscriber; - std::shared_ptr < endpoint_definition > target; - std::chrono::steady_clock::time_point its_expiration; - vsomeip::service_t service_id; - vsomeip::instance_t instance_id; - vsomeip::eventgroup_t eventgroup_; +struct subscriber_t { + std::shared_ptr<endpoint_definition> subscriber; + std::shared_ptr<endpoint_definition> target; + std::shared_ptr<sd_message_identifier_t> response_message_id_; + std::shared_ptr<eventgroupinfo> eventgroupinfo_; + vsomeip::ttl_t ttl_; + vsomeip::major_version_t major_; + std::uint16_t reserved_; + std::uint8_t counter_; }; class service_discovery_impl: public service_discovery, @@ -93,6 +95,10 @@ public: void set_diagnosis_mode(const bool _activate); + void remote_subscription_acknowledge( + service_t _service, instance_t _instance, eventgroup_t _eventgroup, + client_t _client, bool _acknowledged, + const std::shared_ptr<sd_message_identifier_t> &_sd_message_id); private: std::pair<session_t, bool> get_session(const boost::asio::ip::address &_address); void increment_session(const boost::asio::ip::address &_address); @@ -123,7 +129,9 @@ private: std::shared_ptr<subscription> &_subscription); void insert_subscription_ack(std::shared_ptr<message_impl> &_message, service_t _service, instance_t _instance, eventgroup_t _eventgroup, - std::shared_ptr<eventgroupinfo> &_info, ttl_t _ttl, uint8_t _counter, major_version_t _major, uint16_t _reserved); + const std::shared_ptr<eventgroupinfo> &_info, ttl_t _ttl, + uint8_t _counter, major_version_t _major, uint16_t _reserved, + const std::shared_ptr<endpoint_definition> &_target = nullptr); void insert_subscription_nack(std::shared_ptr<message_impl> &_message, service_t _service, instance_t _instance, eventgroup_t _eventgroup, uint8_t _counter, major_version_t _major, uint16_t _reserved); @@ -152,8 +160,10 @@ private: std::shared_ptr<eventgroupentry_impl> &_entry, const std::vector<std::shared_ptr<option_impl> > &_options, std::shared_ptr < message_impl > &its_message_response, - std::vector <accepted_subscriber_t> &accepted_subscribers, - const boost::asio::ip::address &_destination); + const boost::asio::ip::address &_destination, + const std::shared_ptr<sd_message_identifier_t> &_message_id, + bool _is_stop_subscribe_subscribe, + bool _force_initial_events); void handle_eventgroup_subscription(service_t _service, instance_t _instance, eventgroup_t _eventgroup, major_version_t _major, ttl_t _ttl, uint8_t _counter, uint16_t _reserved, @@ -162,7 +172,9 @@ private: const boost::asio::ip::address &_second_address, uint16_t _second_port, bool _is_second_reliable, std::shared_ptr < message_impl > &its_message, - std::vector <accepted_subscriber_t> &accepted_subscribers); + const std::shared_ptr<sd_message_identifier_t> &_message_id, + bool _is_stop_subscribe_subscribe, + bool _force_initial_events); void handle_eventgroup_subscription_ack(service_t _service, instance_t _instance, eventgroup_t _eventgroup, major_version_t _major, ttl_t _ttl, uint8_t _counter, @@ -182,7 +194,9 @@ private: void check_ttl(const boost::system::error_code &_error); void start_subscription_expiration_timer(); + void start_subscription_expiration_timer_unlocked(); void stop_subscription_expiration_timer(); + void stop_subscription_expiration_timer_unlocked(); void expire_subscriptions(const boost::system::error_code &_error); bool check_ipv4_address(boost::asio::ip::address its_address); @@ -279,6 +293,27 @@ private: bool check_source_address(const boost::asio::ip::address &its_source_address) const; + void update_subscription_expiration_timer(const std::shared_ptr<message_impl> &_message); + + void remote_subscription_acknowledge_subscriber( + service_t _service, instance_t _instance, eventgroup_t _eventgroup, + const std::shared_ptr<subscriber_t> &_subscriber, bool _acknowledged); + + void remote_subscription_not_acknowledge_subscriber( + service_t _service, instance_t _instance, eventgroup_t _eventgroup, + const std::shared_ptr<subscriber_t> &_subscriber, bool _acknowledged); + + void remote_subscription_not_acknowledge_all(service_t _service, instance_t _instance); + + void remote_subscription_not_acknowledge_all(); + + void remote_subscription_remove( + service_t _service, instance_t _instance, eventgroup_t _eventgroup, + const std::shared_ptr<endpoint_definition> &_subscriber); + + bool check_stop_subscribe_subscribe(message_impl::entries_t::const_iterator _iter, + message_impl::entries_t::const_iterator _end) const; + private: boost::asio::io_service &io_; service_discovery_host *host_; @@ -317,6 +352,7 @@ private: ttl_t ttl_; // TTL handling for subscriptions done by other hosts + std::mutex subscription_expiration_timer_mutex_; boost::asio::steady_timer subscription_expiration_timer_; std::chrono::steady_clock::time_point next_subscription_expiration_; @@ -358,6 +394,13 @@ private: boost::asio::ip::address current_remote_address_; std::atomic<bool> is_diagnosis_; + + std::mutex pending_remote_subscriptions_mutex_; + std::map<service_t, + std::map<instance_t, + std::map<eventgroup_t, + std::map<client_t, std::vector<std::shared_ptr<subscriber_t>>>>>> pending_remote_subscriptions_; + std::mutex response_mutex_; }; } // namespace sd diff --git a/implementation/service_discovery/src/entry_impl.cpp b/implementation/service_discovery/src/entry_impl.cpp index d647bf5..ecab203 100755 --- a/implementation/service_discovery/src/entry_impl.cpp +++ b/implementation/service_discovery/src/entry_impl.cpp @@ -23,6 +23,8 @@ entry_impl::entry_impl() { ttl_ = 0x0;
num_options_[0] = 0;
num_options_[1] = 0;
+ index1_ = 0;
+ index2_ = 0;
}
entry_impl::entry_impl(const entry_impl &_entry) {
@@ -33,6 +35,8 @@ entry_impl::entry_impl(const entry_impl &_entry) { ttl_ = _entry.ttl_;
num_options_[0] = _entry.num_options_[0];
num_options_[1] = _entry.num_options_[1];
+ index1_ = _entry.index1_;
+ index2_ = _entry.index2_;
}
entry_impl::~entry_impl() {
@@ -139,11 +143,9 @@ bool entry_impl::deserialize(vsomeip::deserializer *_from) { is_successful = is_successful && _from->deserialize(its_type);
type_ = static_cast<entry_type_e>(its_type);
- uint8_t its_index1(0);
- is_successful = is_successful && _from->deserialize(its_index1);
+ is_successful = is_successful && _from->deserialize(index1_);
- uint8_t its_index2(0);
- is_successful = is_successful && _from->deserialize(its_index2);
+ is_successful = is_successful && _from->deserialize(index2_);
uint8_t its_numbers(0);
is_successful = is_successful && _from->deserialize(its_numbers);
@@ -151,10 +153,10 @@ bool entry_impl::deserialize(vsomeip::deserializer *_from) { num_options_[0] = uint8_t(its_numbers >> 4);
num_options_[1] = uint8_t(its_numbers & 0xF);
- for (uint16_t i = its_index1; i < its_index1 + num_options_[0]; ++i)
+ for (uint16_t i = index1_; i < index1_ + num_options_[0]; ++i)
options_[0].push_back((uint8_t)(i));
- for (uint16_t i = its_index2; i < its_index2 + num_options_[1]; ++i)
+ for (uint16_t i = index2_; i < index2_ + num_options_[1]; ++i)
options_[1].push_back((uint8_t)(i));
uint16_t its_id(0);
diff --git a/implementation/service_discovery/src/eventgroupentry_impl.cpp b/implementation/service_discovery/src/eventgroupentry_impl.cpp index 29716d7..1f9dd4a 100755 --- a/implementation/service_discovery/src/eventgroupentry_impl.cpp +++ b/implementation/service_discovery/src/eventgroupentry_impl.cpp @@ -115,5 +115,34 @@ bool eventgroupentry_impl::deserialize(vsomeip::deserializer *_from) { return is_successful;
}
+bool eventgroupentry_impl::is_matching_subscribe(
+ const eventgroupentry_impl& _other) const {
+ return !(ttl_ != 0
+ || _other.ttl_ == 0
+ || service_ != _other.service_
+ || instance_ != _other.instance_
+ || eventgroup_ != _other.eventgroup_
+ || index1_ != _other.index1_
+ || index2_ != _other.index2_
+ || num_options_[0] != _other.num_options_[0]
+ || num_options_[1] != _other.num_options_[1]
+ || major_version_ != _other.major_version_
+ || counter_ != _other.counter_);
+}
+
+void eventgroupentry_impl::add_target(
+ const std::shared_ptr<endpoint_definition> &_target) {
+ if (_target->is_reliable()) {
+ target_reliable_ = _target;
+ } else {
+ target_unreliable_ = _target;
+ }
+}
+
+std::shared_ptr<endpoint_definition> eventgroupentry_impl::get_target(
+ bool _reliable) const {
+ return _reliable ? target_reliable_ : target_unreliable_;
+}
+
} // namespace sd
} // namespace vsomeip
diff --git a/implementation/service_discovery/src/message_impl.cpp b/implementation/service_discovery/src/message_impl.cpp index d4d3931..d23bf31 100755 --- a/implementation/service_discovery/src/message_impl.cpp +++ b/implementation/service_discovery/src/message_impl.cpp @@ -26,12 +26,14 @@ namespace vsomeip {
namespace sd {
-message_impl::message_impl() {
+message_impl::message_impl() :
+ flags_(0x0),
+ options_length_(0x0),
+ number_required_acks_(0x0),
+ number_contained_acks_(0x0) {
header_.service_ = 0xFFFF;
header_.method_ = 0x8100;
header_.protocol_version_ = 0x01;
- flags_ = 0x00;
- options_length_ = 0x0000;
}
message_impl::~message_impl() {
@@ -149,11 +151,11 @@ std::shared_ptr<protection_option_impl> message_impl::create_protection_option() return its_option;
}
-const std::vector<std::shared_ptr<entry_impl> > & message_impl::get_entries() const {
+const message_impl::entries_t & message_impl::get_entries() const {
return entries_;
}
-const std::vector<std::shared_ptr<option_impl> > & message_impl::get_options() const {
+const message_impl::options_t & message_impl::get_options() const {
return options_;
}
@@ -236,7 +238,31 @@ bool message_impl::deserialize(vsomeip::deserializer *_from) { while (is_successful && _from->get_remaining()) {
std::shared_ptr < entry_impl > its_entry(deserialize_entry(_from));
if (its_entry) {
- entries_.push_back(its_entry);
+ if (its_entry->get_type() == entry_type_e::SUBSCRIBE_EVENTGROUP) {
+ bool is_unique(true);
+ for (const auto& e : entries_) {
+ if (e->get_type() == entry_type_e::SUBSCRIBE_EVENTGROUP &&
+ *(static_cast<eventgroupentry_impl*>(e.get())) ==
+ *(static_cast<eventgroupentry_impl*>(its_entry.get()))) {
+ is_unique = false;
+ break;
+ }
+ }
+ if (is_unique) {
+ entries_.push_back(its_entry);
+ if (its_entry->get_ttl() > 0) {
+ const std::uint8_t num_options =
+ static_cast<std::uint8_t>(
+ its_entry->get_num_options(1) +
+ its_entry->get_num_options(2));
+ number_required_acks_ =
+ static_cast<std::uint8_t>(number_required_acks_
+ + num_options);
+ }
+ }
+ } else {
+ entries_.push_back(its_entry);
+ }
} else {
is_successful = false;
}
@@ -366,5 +392,48 @@ length_t message_impl::get_someip_length() const { return header_.length_;
}
+std::uint8_t message_impl::get_number_required_acks() const {
+ return number_required_acks_;
+}
+
+std::uint8_t message_impl::get_number_contained_acks() const {
+ return number_contained_acks_;
+}
+
+void message_impl::set_number_required_acks(std::uint8_t _required_acks) {
+ number_required_acks_ = _required_acks;
+}
+
+void message_impl::increase_number_required_acks(std::uint8_t _amount) {
+ number_required_acks_ += _amount;
+}
+
+void message_impl::decrease_number_required_acks(std::uint8_t _amount) {
+ number_required_acks_ -= _amount;
+}
+
+void message_impl::increase_number_contained_acks() {
+ number_contained_acks_++;
+}
+
+bool message_impl::all_required_acks_contained() const {
+ return number_contained_acks_ == number_required_acks_;
+}
+
+std::unique_lock<std::mutex> message_impl::get_message_lock() {
+ return std::unique_lock<std::mutex>(message_mutex_);
+}
+
+void message_impl::forced_initial_events_add(forced_initial_events_t _entry) {
+ std::lock_guard<std::mutex> its_lock(forced_initial_events_mutex_);
+ forced_initial_events_info_.push_back(_entry);
+}
+
+const std::vector<message_impl::forced_initial_events_t>
+message_impl::forced_initial_events_get() {
+ std::lock_guard<std::mutex> its_lock(forced_initial_events_mutex_);
+ return forced_initial_events_info_;
+}
+
} // namespace sd
} // namespace vsomeip
diff --git a/implementation/service_discovery/src/service_discovery_impl.cpp b/implementation/service_discovery/src/service_discovery_impl.cpp index d4a8964..ca9aa5a 100644 --- a/implementation/service_discovery/src/service_discovery_impl.cpp +++ b/implementation/service_discovery/src/service_discovery_impl.cpp @@ -1000,8 +1000,11 @@ void service_discovery_impl::insert_nack_subscription_on_resubscribe(std::shared void service_discovery_impl::insert_subscription_ack( std::shared_ptr<message_impl> &_message, service_t _service, instance_t _instance, eventgroup_t _eventgroup, - std::shared_ptr<eventgroupinfo> &_info, ttl_t _ttl, uint8_t _counter, major_version_t _major, uint16_t _reserved) { - + const std::shared_ptr<eventgroupinfo> &_info, ttl_t _ttl, + uint8_t _counter, major_version_t _major, uint16_t _reserved, + const std::shared_ptr<endpoint_definition> &_target) { + std::unique_lock<std::mutex> its_lock(_message->get_message_lock()); + _message->increase_number_contained_acks(); for (auto its_entry : _message->get_entries()) { if (its_entry->is_eventgroup_entry()) { std::shared_ptr < eventgroupentry_impl > its_eventgroup_entry = @@ -1015,6 +1018,17 @@ void service_discovery_impl::insert_subscription_ack( && its_eventgroup_entry->get_reserved() == _reserved && its_eventgroup_entry->get_counter() == _counter && its_eventgroup_entry->get_ttl() == _ttl) { + if (_target) { + if (_target->is_reliable()) { + if (!its_eventgroup_entry->get_target(true)) { + its_eventgroup_entry->add_target(_target); + } + } else { + if (!its_eventgroup_entry->get_target(false)) { + its_eventgroup_entry->add_target(_target); + } + } + } return; } } @@ -1031,6 +1045,9 @@ void service_discovery_impl::insert_subscription_ack( its_entry->set_counter(_counter); // SWS_SD_00315 its_entry->set_ttl(_ttl); + if (_target) { + its_entry->add_target(_target); + } boost::asio::ip::address its_address; uint16_t its_port; @@ -1043,6 +1060,7 @@ void service_discovery_impl::insert_subscription_nack( std::shared_ptr<message_impl> &_message, service_t _service, instance_t _instance, eventgroup_t _eventgroup, uint8_t _counter, major_version_t _major, uint16_t _reserved) { + std::unique_lock<std::mutex> its_lock(_message->get_message_lock()); std::shared_ptr < eventgroupentry_impl > its_entry = _message->create_eventgroup_entry(); // SWS_SD_00316 and SWS_SD_00385 @@ -1055,6 +1073,7 @@ void service_discovery_impl::insert_subscription_nack( its_entry->set_counter(_counter); // SWS_SD_00432 its_entry->set_ttl(0x0); + _message->increase_number_contained_acks(); } bool service_discovery_impl::send(bool _is_announcing) { @@ -1064,6 +1083,7 @@ bool service_discovery_impl::send(bool _is_announcing) { std::shared_ptr < message_impl > its_message; if(_is_announcing) { + remote_subscription_not_acknowledge_all(); its_message = its_runtime->create_message(); its_messages.push_back(its_message); @@ -1127,32 +1147,62 @@ void service_discovery_impl::on_message(const byte_t *_data, length_t _length, std::shared_ptr < message_impl > its_message_response = its_runtime->create_message(); - std::vector <accepted_subscriber_t> accepted_subscribers; - for (auto its_entry : its_message->get_entries()) { - if (its_entry->is_service_entry()) { + const std::uint8_t its_required_acks = + its_message->get_number_required_acks(); + its_message_response->set_number_required_acks(its_required_acks); + std::shared_ptr<sd_message_identifier_t> its_message_id = + std::make_shared<sd_message_identifier_t>( + its_message->get_session(), _sender, _destination, + its_message_response); + + const message_impl::entries_t& its_entries = its_message->get_entries(); + const message_impl::entries_t::const_iterator its_end = its_entries.end(); + bool is_stop_subscribe_subscribe(false); + + for (auto iter = its_entries.begin(); iter != its_end; iter++) { + if ((*iter)->is_service_entry()) { std::shared_ptr < serviceentry_impl > its_service_entry = std::dynamic_pointer_cast < serviceentry_impl - > (its_entry); + > (*iter); bool its_unicast_flag = its_message->get_unicast_flag(); process_serviceentry(its_service_entry, its_options, its_unicast_flag ); } else { std::shared_ptr < eventgroupentry_impl > its_eventgroup_entry = std::dynamic_pointer_cast < eventgroupentry_impl - > (its_entry); - process_eventgroupentry( its_eventgroup_entry, its_options, its_message_response, accepted_subscribers, _destination); + > (*iter); + bool force_initial_events(false); + if (is_stop_subscribe_subscribe) { + is_stop_subscribe_subscribe = false; + force_initial_events = true; + } + is_stop_subscribe_subscribe = check_stop_subscribe_subscribe(iter, its_end); + process_eventgroupentry(its_eventgroup_entry, its_options, + its_message_response, _destination, + its_message_id, is_stop_subscribe_subscribe, force_initial_events); + } + } + + // send answer directly if SubscribeEventgroup entries were (n)acked + if (its_required_acks || its_message_response->get_number_required_acks() > 0) { + bool sent(false); + { + std::lock_guard<std::mutex> its_lock(response_mutex_); + if (its_message_response->all_required_acks_contained()) { + update_subscription_expiration_timer(its_message_response); + serialize_and_send(its_message_response, _sender); + // set required acks to zero to mark message as sent + its_message_response->set_number_required_acks(0); + sent = true; + } + } + if (sent) { + for (const auto &fie : its_message_response->forced_initial_events_get()) { + host_->send_initial_events(fie.service_, fie.instance_, + fie.eventgroup_, fie.target_); + } } } - - //send ACK / NACK if present - if( 0 < its_message_response->get_entries().size() && its_message_response ) { - serialize_and_send(its_message_response, _sender); - } - - for( const auto &a : accepted_subscribers) { - host_->on_subscribe(a.service_id, a.instance_id, a.eventgroup_, a.subscriber, a.target, a.its_expiration); - } - accepted_subscribers.clear(); start_ttl_timer(); } else { VSOMEIP_ERROR << "service_discovery_impl::on_message: deserialization error."; @@ -1658,8 +1708,10 @@ void service_discovery_impl::process_eventgroupentry( std::shared_ptr<eventgroupentry_impl> &_entry, const std::vector<std::shared_ptr<option_impl> > &_options, std::shared_ptr < message_impl > &its_message_response, - std::vector <accepted_subscriber_t> &accepted_subscribers, - const boost::asio::ip::address &_destination) { + const boost::asio::ip::address &_destination, + const std::shared_ptr<sd_message_identifier_t> &_message_id, + bool _is_stop_subscribe_subscribe, + bool _force_initial_events) { service_t its_service = _entry->get_service(); instance_t its_instance = _entry->get_instance(); eventgroup_t its_eventgroup = _entry->get_eventgroup(); @@ -1691,6 +1743,9 @@ void service_discovery_impl::process_eventgroupentry( && _entry->get_num_options(2) == 0) { VSOMEIP_ERROR << "Invalid number of options in SubscribeEventGroup entry"; if(its_ttl > 0) { + // increase number of required acks by one as number required acks + // is calculated based on the number of referenced options + its_message_response->increase_number_required_acks(); insert_subscription_nack(its_message_response, its_service, its_instance, its_eventgroup, its_counter, its_major, its_reserved); } @@ -1712,6 +1767,14 @@ void service_discovery_impl::process_eventgroupentry( VSOMEIP_ERROR << "Fewer options in SD message than " "referenced in EventGroup entry or malformed option received"; if(its_ttl > 0) { + const std::uint8_t num_overreferenced_options = + static_cast<std::uint8_t>(_entry->get_num_options(1) + + _entry->get_num_options(2) - _options.size()); + if (its_message_response->get_number_required_acks() - + num_overreferenced_options > 0) { + its_message_response->decrease_number_required_acks( + num_overreferenced_options); + } insert_subscription_nack(its_message_response, its_service, its_instance, its_eventgroup, its_counter, its_major, its_reserved); } @@ -1895,12 +1958,14 @@ void service_discovery_impl::process_eventgroupentry( } break; case option_type_e::CONFIGURATION: { + its_message_response->decrease_number_required_acks(); break; } case option_type_e::UNKNOWN: default: VSOMEIP_WARNING << "Unsupported eventgroup option"; if(its_ttl > 0) { + its_message_response->decrease_number_required_acks(); insert_subscription_nack(its_message_response, its_service, its_instance, its_eventgroup, its_counter, its_major, its_reserved); return; @@ -1914,7 +1979,8 @@ void service_discovery_impl::process_eventgroupentry( handle_eventgroup_subscription(its_service, its_instance, its_eventgroup, its_major, its_ttl, its_counter, its_reserved, its_first_address, its_first_port, is_first_reliable, - its_second_address, its_second_port, is_second_reliable, its_message_response, accepted_subscribers); + its_second_address, its_second_port, is_second_reliable, its_message_response, + _message_id, _is_stop_subscribe_subscribe, _force_initial_events); } else { if( entry_type_e::SUBSCRIBE_EVENTGROUP_ACK == its_type) { //this type is used for ACK and NACK messages if(its_ttl > 0) { @@ -1934,7 +2000,8 @@ void service_discovery_impl::handle_eventgroup_subscription(service_t _service, const boost::asio::ip::address &_first_address, uint16_t _first_port, bool _is_first_reliable, const boost::asio::ip::address &_second_address, uint16_t _second_port, bool _is_second_reliable, std::shared_ptr < message_impl > &its_message, - std::vector <accepted_subscriber_t> &accepted_subscribers) { + const std::shared_ptr<sd_message_identifier_t> &_message_id, + bool _is_stop_subscribe_subscribe, bool _force_initial_events) { if (its_message) { bool has_reliable_events(false); @@ -1966,30 +2033,38 @@ void service_discovery_impl::handle_eventgroup_subscription(service_t _service, if (reliablility_nack && _ttl > 0) { insert_subscription_nack(its_message, _service, _instance, _eventgroup, _counter, _major, _reserved); - VSOMEIP_WARNING << "Subscription for service/instance " - << std::hex << _service << "/" << _instance - << " not valid: Event configuration does not match the provided endpoint options"; + boost::system::error_code ec; + VSOMEIP_WARNING << "Subscription for [" + << std::hex << std::setw(4) << std::setfill('0') << _service << "." + << std::hex << std::setw(4) << std::setfill('0') << _instance << "." + << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]" + << " not valid: Event configuration does not match the provided " + << "endpoint options: " + << _first_address.to_string(ec) << ":" << std::dec << _first_port << " " + << _second_address.to_string(ec) << ":" << std::dec << _second_port; return; } std::shared_ptr < eventgroupinfo > its_info = host_->find_eventgroup( _service, _instance, _eventgroup); - bool is_nack(false); - std::shared_ptr < endpoint_definition > its_first_subscriber, - its_second_subscriber; - std::shared_ptr < endpoint_definition > its_first_target, - its_second_target; + struct subscriber_target_t { + std::shared_ptr<endpoint_definition> subscriber_; + std::shared_ptr<endpoint_definition> target_; + }; + std::array<subscriber_target_t, 2> its_targets = + { subscriber_target_t(), subscriber_target_t() }; // Could not find eventgroup or wrong version if (!its_info || _major != its_info->get_major()) { // Create a temporary info object with TTL=0 --> send NACK if( its_info && (_major != its_info->get_major())) { VSOMEIP_ERROR << "Requested major version:[" << (uint32_t) _major - << "] in subscription to service:[" << _service - << "] instance:[" << _instance - << "] eventgroup:[" << _eventgroup - << "], does not match with services major version:[" << (uint32_t) its_info->get_major() + << "] in subscription to service: [" + << std::hex << std::setw(4) << std::setfill('0') << _service << "." + << std::hex << std::setw(4) << std::setfill('0') << _instance << "." + << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]" + << " does not match with services major version:[" << (uint32_t) its_info->get_major() << "]"; } else { VSOMEIP_ERROR << "Requested eventgroup:[" << _eventgroup @@ -2006,117 +2081,125 @@ void service_discovery_impl::handle_eventgroup_subscription(service_t _service, boost::asio::ip::address its_first_address, its_second_address; uint16_t its_first_port, its_second_port; if (ILLEGAL_PORT != _first_port) { - its_first_subscriber = endpoint_definition::get( + its_targets[0].subscriber_ = endpoint_definition::get( _first_address, _first_port, _is_first_reliable, _service, _instance); if (!_is_first_reliable && its_info->get_multicast(its_first_address, its_first_port)) { // udp multicast - its_first_target = endpoint_definition::get( + its_targets[0].target_ = endpoint_definition::get( its_first_address, its_first_port, false, _service, _instance); } else if(_is_first_reliable) { // tcp unicast - its_first_target = its_first_subscriber; + its_targets[0].target_ = its_targets[0].subscriber_; // check if TCP connection is established by client - if( !is_tcp_connected(_service, _instance, its_first_target) && _ttl > 0) { + if(_ttl > 0 && !is_tcp_connected(_service, _instance, its_targets[0].target_)) { insert_subscription_nack(its_message, _service, _instance, _eventgroup, _counter, _major, _reserved); - VSOMEIP_ERROR << "TCP connection to target1: [" << its_first_target->get_address().to_string() - << ":" << its_first_target->get_port() - << "] not established for subscription to service:[" << _service - << "] instance:[" << _instance - << "] eventgroup:[" << _eventgroup << "]"; + VSOMEIP_ERROR << "TCP connection to target1: [" + << its_targets[0].target_->get_address().to_string() + << ":" << its_targets[0].target_->get_port() + << "] not established for subscription to: [" + << std::hex << std::setw(4) << std::setfill('0') << _service << "." + << std::hex << std::setw(4) << std::setfill('0') << _instance << "." + << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]"; + if (ILLEGAL_PORT != _second_port) { + its_message->decrease_number_required_acks(); + } return; } } else { // udp unicast - its_first_target = its_first_subscriber; + its_targets[0].target_ = its_targets[0].subscriber_; } } if (ILLEGAL_PORT != _second_port) { - its_second_subscriber = endpoint_definition::get( + its_targets[1].subscriber_ = endpoint_definition::get( _second_address, _second_port, _is_second_reliable, _service, _instance); if (!_is_second_reliable && its_info->get_multicast(its_second_address, its_second_port)) { // udp multicast - its_second_target = endpoint_definition::get( + its_targets[1].target_ = endpoint_definition::get( its_second_address, its_second_port, false, _service, _instance); } else if (_is_second_reliable) { // tcp unicast - its_second_target = its_second_subscriber; + its_targets[1].target_ = its_targets[1].subscriber_; // check if TCP connection is established by client - if(_ttl > 0 && !is_tcp_connected(_service, _instance, its_second_target)) { + if(_ttl > 0 && !is_tcp_connected(_service, _instance, its_targets[1].target_)) { insert_subscription_nack(its_message, _service, _instance, _eventgroup, _counter, _major, _reserved); - VSOMEIP_ERROR << "TCP connection to target2 : [" << its_second_target->get_address().to_string() - << ":" << its_second_target->get_port() - << "] not established for subscription to service:[" << _service - << "] instance:[" << _instance - << "] eventgroup:[" << _eventgroup << "]"; + VSOMEIP_ERROR << "TCP connection to target2 : [" + << its_targets[1].target_->get_address().to_string() + << ":" << its_targets[1].target_->get_port() + << "] not established for subscription to: [" + << std::hex << std::setw(4) << std::setfill('0') << _service << "." + << std::hex << std::setw(4) << std::setfill('0') << _instance << "." + << std::hex << std::setw(4) << std::setfill('0') << _eventgroup << "]"; + if (ILLEGAL_PORT != _first_port) { + its_message->decrease_number_required_acks(); + } return; } } else { // udp unicast - its_second_target = its_second_subscriber; + its_targets[1].target_ = its_targets[1].subscriber_; } } } if (_ttl == 0) { // --> unsubscribe - if (its_first_subscriber) { - host_->on_unsubscribe(_service, _instance, _eventgroup, its_first_subscriber); - } - if (its_second_subscriber) { - host_->on_unsubscribe(_service, _instance, _eventgroup, its_second_subscriber); + for (const auto &target : its_targets) { + if (target.subscriber_) { + remote_subscription_remove(_service, _instance, _eventgroup, target.subscriber_); + if (!_is_stop_subscribe_subscribe) { + host_->on_unsubscribe(_service, _instance, _eventgroup, target.subscriber_); + } + } } return; } - std::chrono::steady_clock::time_point its_expiration - = std::chrono::steady_clock::now() + std::chrono::seconds(_ttl); - - if (its_first_target) { - if(!host_->on_subscribe_accepted(_service, _instance, _eventgroup, - its_first_subscriber, its_expiration)) { - is_nack = true; - insert_subscription_nack(its_message, _service, _instance, _eventgroup, - _counter, _major, _reserved); - } - } - if (its_second_subscriber) { - if(!host_->on_subscribe_accepted(_service, _instance, _eventgroup, - its_second_subscriber, its_expiration)) { - is_nack = true; - insert_subscription_nack(its_message, _service, _instance, _eventgroup, - _counter, _major, _reserved); - } - } - - if (!is_nack) - { - insert_subscription_ack(its_message, _service, _instance, _eventgroup, - its_info, _ttl, _counter, _major, _reserved); - - if (its_expiration < next_subscription_expiration_) { - stop_subscription_expiration_timer(); - next_subscription_expiration_ = its_expiration; - start_subscription_expiration_timer(); - } - - if (its_first_target && its_first_subscriber) { - accepted_subscriber_t subscriber_; - subscriber_.service_id = _service; - subscriber_.instance_id = _instance; - subscriber_.eventgroup_ = _eventgroup; - subscriber_.subscriber = its_first_subscriber; - subscriber_.target = its_first_target; - subscriber_.its_expiration = its_expiration; - - accepted_subscribers.push_back(subscriber_); - } - if (its_second_target && its_second_subscriber) { - accepted_subscriber_t subscriber_; - subscriber_.service_id = _service; - subscriber_.instance_id = _instance; - subscriber_.eventgroup_ = _eventgroup; - subscriber_.subscriber = its_second_subscriber; - subscriber_.target = its_second_target; - subscriber_.its_expiration = its_expiration; - - accepted_subscribers.push_back(subscriber_); + std::lock_guard<std::mutex> its_lock(pending_remote_subscriptions_mutex_); + for (const auto &target : its_targets) { + if (target.target_) { + client_t its_subscribing_remote_client = VSOMEIP_ROUTING_CLIENT; + switch (host_->on_remote_subscription(_service, _instance, + _eventgroup, target.subscriber_, target.target_, + _ttl, &its_subscribing_remote_client, + _message_id)) { + case remote_subscription_state_e::SUBSCRIPTION_ACKED: + insert_subscription_ack(its_message, _service, + _instance, _eventgroup, its_info, _ttl, + _counter, _major, _reserved); + if (_force_initial_events) { + // processing subscription of StopSubscribe/Subscribe + // sequence + its_message->forced_initial_events_add( + message_impl::forced_initial_events_t( + { target.subscriber_, _service, _instance, + _eventgroup })); + } + break; + case remote_subscription_state_e::SUBSCRIPTION_NACKED: + case remote_subscription_state_e::SUBSCRIPTION_ERROR: + insert_subscription_nack(its_message, _service, + _instance, _eventgroup, _counter, _major, + _reserved); + break; + case remote_subscription_state_e::SUBSCRIPTION_PENDING: + if (target.target_ && target.subscriber_) { + std::shared_ptr<subscriber_t> subscriber_ = + std::make_shared<subscriber_t>(); + subscriber_->subscriber = target.subscriber_; + subscriber_->target = target.target_; + subscriber_->response_message_id_ = _message_id; + subscriber_->eventgroupinfo_ = its_info; + subscriber_->ttl_ = _ttl; + subscriber_->major_ = _major; + subscriber_->reserved_ = _reserved; + subscriber_->counter_ = _counter; + + pending_remote_subscriptions_[_service] + [_instance] + [_eventgroup] + [its_subscribing_remote_client].push_back(subscriber_); + } + default: + break; + } } } } @@ -2137,7 +2220,7 @@ void service_discovery_impl::handle_eventgroup_subscription_nack(service_t _serv // Deliver nack nackedClient = client.first; host_->on_subscribe_nack(client.first, _service, - _instance, _eventgroup, ANY_EVENT); + _instance, _eventgroup, ANY_EVENT, DEFAULT_SUBSCRIPTION); break; } } @@ -2179,7 +2262,7 @@ void service_discovery_impl::handle_eventgroup_subscription_ack( if (its_client.second->get_counter() == _counter) { its_client.second->set_acknowledged(true); host_->on_subscribe_ack(its_client.first, _service, - _instance, _eventgroup, ANY_EVENT); + _instance, _eventgroup, ANY_EVENT, DEFAULT_SUBSCRIPTION); } if (_address.is_multicast()) { host_->on_subscribe_ack(_service, _instance, _address, @@ -2389,6 +2472,11 @@ void service_discovery_impl::send_subscriptions(service_t _service, instance_t _ } void service_discovery_impl::start_subscription_expiration_timer() { + std::lock_guard<std::mutex> its_lock(subscription_expiration_timer_mutex_); + start_subscription_expiration_timer_unlocked(); +} + +void service_discovery_impl::start_subscription_expiration_timer_unlocked() { subscription_expiration_timer_.expires_at(next_subscription_expiration_); subscription_expiration_timer_.async_wait( std::bind(&service_discovery_impl::expire_subscriptions, @@ -2397,6 +2485,11 @@ void service_discovery_impl::start_subscription_expiration_timer() { } void service_discovery_impl::stop_subscription_expiration_timer() { + std::lock_guard<std::mutex> its_lock(subscription_expiration_timer_mutex_); + stop_subscription_expiration_timer_unlocked(); +} + +void service_discovery_impl::stop_subscription_expiration_timer_unlocked() { subscription_expiration_timer_.cancel(); } @@ -2913,6 +3006,8 @@ void service_discovery_impl::stop_offer_service( if(_info->is_in_mainphase() || stop_offer_required) { send_stop_offer(_service, _instance, _info); } + // sent out NACKs for all pending subscriptions + remote_subscription_not_acknowledge_all(_service, _instance); } bool service_discovery_impl::send_stop_offer( @@ -3014,5 +3109,261 @@ void service_discovery_impl::set_diagnosis_mode(const bool _activate) { is_diagnosis_ = _activate; } +void service_discovery_impl::remote_subscription_acknowledge( + service_t _service, instance_t _instance, eventgroup_t _eventgroup, + client_t _client, bool _acknowledged, + const std::shared_ptr<sd_message_identifier_t> &_sd_message_id) { + std::shared_ptr<subscriber_t> its_subscriber; + { + std::lock_guard<std::mutex> its_lock(pending_remote_subscriptions_mutex_); + const auto its_service = pending_remote_subscriptions_.find(_service); + if (its_service != pending_remote_subscriptions_.end()) { + const auto its_instance = its_service->second.find(_instance); + if (its_instance != its_service->second.end()) { + const auto its_eventgroup = its_instance->second.find(_eventgroup); + if (its_eventgroup != its_instance->second.end()) { + const auto its_client = its_eventgroup->second.find(_client); + if (its_client != its_eventgroup->second.end()) { + for (auto iter = its_client->second.begin(); + iter != its_client->second.end();) { + if ((*iter)->response_message_id_ == _sd_message_id) { + its_subscriber = *iter; + iter = its_client->second.erase(iter); + break; + } else { + iter++; + } + } + + // delete if necessary + if (!its_client->second.size()) { + its_eventgroup->second.erase(its_client); + if (!its_eventgroup->second.size()) { + its_instance->second.erase(its_eventgroup); + if (!its_instance->second.size()) { + its_service->second.erase(its_instance); + if (!its_service->second.size()) { + pending_remote_subscriptions_.erase( + its_service); + } + } + } + } + } + } + } + } + } + if (its_subscriber) { + remote_subscription_acknowledge_subscriber(_service, _instance, + _eventgroup, its_subscriber, _acknowledged); + } +} + +void service_discovery_impl::update_subscription_expiration_timer( + const std::shared_ptr<message_impl> &_message) { + std::lock_guard<std::mutex> its_lock(subscription_expiration_timer_mutex_); + const std::chrono::steady_clock::time_point now = + std::chrono::steady_clock::now(); + stop_subscription_expiration_timer_unlocked(); + for (const auto &entry : _message->get_entries()) { + if (entry->get_type() == entry_type_e::SUBSCRIBE_EVENTGROUP_ACK + && entry->get_ttl()) { + const std::chrono::steady_clock::time_point its_expiration = now + + std::chrono::seconds(entry->get_ttl()); + if (its_expiration < next_subscription_expiration_) { + next_subscription_expiration_ = its_expiration; + } + } + } + start_subscription_expiration_timer_unlocked(); +} + +void service_discovery_impl::remote_subscription_acknowledge_subscriber( + service_t _service, instance_t _instance, eventgroup_t _eventgroup, + const std::shared_ptr<subscriber_t> &_subscriber, bool _acknowledged) { + std::shared_ptr<message_impl> its_response = _subscriber->response_message_id_->response_; + std::vector<std::tuple<service_t, instance_t, eventgroup_t, + std::shared_ptr<endpoint_definition>>> its_acks; + bool sent(false); + { + std::lock_guard<std::mutex> its_lock(response_mutex_); + if (_acknowledged) { + insert_subscription_ack(its_response, _service, _instance, + _eventgroup, _subscriber->eventgroupinfo_, + _subscriber->ttl_, _subscriber->counter_, + _subscriber->major_, _subscriber->reserved_, _subscriber->subscriber); + } else { + insert_subscription_nack(its_response, _service, _instance, + _eventgroup, _subscriber->counter_, + _subscriber->major_, _subscriber->reserved_); + } + + if (its_response->all_required_acks_contained()) { + update_subscription_expiration_timer(its_response); + serialize_and_send(its_response, _subscriber->response_message_id_->sender_); + // set required acks to zero to mark message as sent + its_response->set_number_required_acks(0); + sent = true; + } + } + if (sent) { + for (const auto &e : its_response->get_entries()) { + if (e->get_type() == entry_type_e::SUBSCRIBE_EVENTGROUP_ACK + && e->get_ttl() > 0) { + const std::shared_ptr<eventgroupentry_impl> casted_e = + std::static_pointer_cast<eventgroupentry_impl>(e); + const std::shared_ptr<endpoint_definition> its_reliable = + casted_e->get_target(true); + if (its_reliable) { + its_acks.push_back( + std::make_tuple(e->get_service(), e->get_instance(), + casted_e->get_eventgroup(), its_reliable)); + } + const std::shared_ptr<endpoint_definition> its_unreliable = + casted_e->get_target(false); + if (its_unreliable) { + its_acks.push_back( + std::make_tuple(e->get_service(), e->get_instance(), + casted_e->get_eventgroup(), + its_unreliable)); + } + } + } + for (const auto& ack_tuple : its_acks) { + host_->send_initial_events(std::get<0>(ack_tuple), + std::get<1>(ack_tuple), std::get<2>(ack_tuple), + std::get<3>(ack_tuple)); + } + for (const auto &fie : its_response->forced_initial_events_get()) { + host_->send_initial_events(fie.service_, fie.instance_, + fie.eventgroup_, fie.target_); + } + } +} + +void service_discovery_impl::remote_subscription_not_acknowledge_all( + service_t _service, instance_t _instance) { + std::map<eventgroup_t, std::vector<std::shared_ptr<subscriber_t>>>its_pending_subscriptions; + { + std::lock_guard<std::mutex> its_lock(pending_remote_subscriptions_mutex_); + const auto its_service = pending_remote_subscriptions_.find(_service); + if (its_service != pending_remote_subscriptions_.end()) { + const auto its_instance = its_service->second.find(_instance); + if (its_instance != its_service->second.end()) { + for (const auto &its_eventgroup : its_instance->second) { + for (const auto &its_client : its_eventgroup.second) { + its_pending_subscriptions[its_eventgroup.first].insert( + its_pending_subscriptions[its_eventgroup.first].end(), + its_client.second.begin(), + its_client.second.end()); + } + } + // delete everything from this service instance + its_service->second.erase(its_instance); + if (!its_service->second.size()) { + pending_remote_subscriptions_.erase(its_service); + } + } + } + } + for (const auto &eg : its_pending_subscriptions) { + for (const auto &its_subscriber : eg.second) { + remote_subscription_acknowledge_subscriber(_service, _instance, + eg.first, its_subscriber, false); + } + } +} + +void service_discovery_impl::remote_subscription_not_acknowledge_all() { + std::map<service_t, + std::map<instance_t, + std::map<eventgroup_t, std::vector<std::shared_ptr<subscriber_t>>>>> to_be_nacked; + { + std::lock_guard<std::mutex> its_lock(pending_remote_subscriptions_mutex_); + for (const auto &its_service : pending_remote_subscriptions_) { + for (const auto &its_instance : its_service.second) { + for (const auto &its_eventgroup : its_instance.second) { + for (const auto &its_client : its_eventgroup.second) { + to_be_nacked[its_service.first] + [its_instance.first] + [its_eventgroup.first].insert( + to_be_nacked[its_service.first][its_instance.first][its_eventgroup.first].end(), + its_client.second.begin(), + its_client.second.end()); + } + } + } + } + pending_remote_subscriptions_.clear(); + } + for (const auto &s : to_be_nacked) { + for (const auto &i : s.second) { + for (const auto &eg : i.second) { + for (const auto &sub : eg.second) { + remote_subscription_acknowledge_subscriber(s.first, i.first, + eg.first, sub, false); + } + } + } + } +} + +void service_discovery_impl::remote_subscription_remove( + service_t _service, instance_t _instance, eventgroup_t _eventgroup, + const std::shared_ptr<endpoint_definition> &_subscriber) { + std::lock_guard<std::mutex> its_lock(pending_remote_subscriptions_mutex_); + const auto its_service = pending_remote_subscriptions_.find(_service); + if (its_service != pending_remote_subscriptions_.end()) { + const auto its_instance = its_service->second.find(_instance); + if (its_instance != its_service->second.end()) { + const auto its_eventgroup = its_instance->second.find(_eventgroup); + if (its_eventgroup != its_instance->second.end()) { + for (auto client_iter = its_eventgroup->second.begin(); + client_iter != its_eventgroup->second.end(); ) { + for (auto subscriber_iter = client_iter->second.begin(); + subscriber_iter != client_iter->second.end();) { + if ((*subscriber_iter)->subscriber == _subscriber) { + (*subscriber_iter)->response_message_id_->response_->decrease_number_required_acks(); + subscriber_iter = client_iter->second.erase( + subscriber_iter); + } else { + ++subscriber_iter; + } + } + if (!client_iter->second.size()) { + client_iter = its_eventgroup->second.erase(client_iter); + } else { + ++client_iter; + } + } + if (!its_eventgroup->second.size()) { + its_instance->second.erase(its_eventgroup); + if (!its_service->second.size()) { + its_service->second.erase(its_instance); + if (!its_service->second.size()) { + pending_remote_subscriptions_.erase(its_service); + } + } + } + } + } + } +} + +bool service_discovery_impl::check_stop_subscribe_subscribe( + message_impl::entries_t::const_iterator _iter, + message_impl::entries_t::const_iterator _end) const { + const message_impl::entries_t::const_iterator its_next = std::next(_iter); + if ((*_iter)->get_type() != entry_type_e::SUBSCRIBE_EVENTGROUP + || its_next == _end + || (*its_next)->get_type() != entry_type_e::STOP_SUBSCRIBE_EVENTGROUP) { + return false; + } + + return (*static_cast<eventgroupentry_impl*>(_iter->get())).is_matching_subscribe( + *(static_cast<eventgroupentry_impl*>(its_next->get()))); +} + } // namespace sd } // namespace vsomeip |