summaryrefslogtreecommitdiff
path: root/implementation/service_discovery/src/service_discovery_impl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'implementation/service_discovery/src/service_discovery_impl.cpp')
-rw-r--r--implementation/service_discovery/src/service_discovery_impl.cpp191
1 files changed, 150 insertions, 41 deletions
diff --git a/implementation/service_discovery/src/service_discovery_impl.cpp b/implementation/service_discovery/src/service_discovery_impl.cpp
index 38975fb..b5493b8 100644
--- a/implementation/service_discovery/src/service_discovery_impl.cpp
+++ b/implementation/service_discovery/src/service_discovery_impl.cpp
@@ -124,7 +124,8 @@ void service_discovery_impl::release_service(service_t _service,
}
void service_discovery_impl::subscribe(service_t _service, instance_t _instance,
- eventgroup_t _eventgroup, major_version_t _major, ttl_t _ttl, client_t _client) {
+ eventgroup_t _eventgroup, major_version_t _major, ttl_t _ttl, client_t _client,
+ subscription_type_e _subscription_type) {
std::lock_guard<std::mutex> its_lock(subscribed_mutex_);
auto found_service = subscribed_.find(_service);
@@ -148,31 +149,18 @@ void service_discovery_impl::subscribe(service_t _service, instance_t _instance,
}
}
- // New subscription
- std::shared_ptr < endpoint > its_reliable = host_->find_or_create_remote_client(
- _service, _instance, true, _client);
- std::shared_ptr < endpoint > its_unreliable = host_->find_or_create_remote_client(
- _service, _instance, false, _client);
- std::shared_ptr < subscription > its_subscription = std::make_shared
- < subscription > (_major, _ttl, its_reliable, its_unreliable);
- subscribed_[_service][_instance][_eventgroup][_client] = its_subscription;
-
+ std::shared_ptr < endpoint > its_unreliable;
+ std::shared_ptr < endpoint > its_reliable;
bool has_address(false);
boost::asio::ip::address its_address;
- std::shared_ptr<endpoint> its_endpoint
- = host_->find_or_create_remote_client(_service, _instance, false, _client);
+ get_subscription_endpoints(_subscription_type, its_unreliable, its_reliable,
+ &its_address, &has_address, _service, _instance, _client);
- if (its_endpoint) {
- has_address = its_endpoint->get_remote_address(its_address);
- its_subscription->set_endpoint(its_endpoint, false);
- }
-
- its_endpoint = host_->find_or_create_remote_client(_service, _instance, true, _client);
- if (its_endpoint) {
- has_address = has_address || its_endpoint->get_remote_address(its_address);
- its_subscription->set_endpoint(its_endpoint, true);
- }
+ // New subscription
+ std::shared_ptr < subscription > its_subscription = std::make_shared
+ < subscription > (_major, _ttl, its_reliable, its_unreliable, _subscription_type);
+ subscribed_[_service][_instance][_eventgroup][_client] = its_subscription;
if (has_address) {
std::shared_ptr<runtime> its_runtime = runtime_.lock();
@@ -191,6 +179,69 @@ void service_discovery_impl::subscribe(service_t _service, instance_t _instance,
}
}
+void service_discovery_impl::get_subscription_endpoints(
+ subscription_type_e _subscription_type,
+ std::shared_ptr<endpoint>& _unreliable,
+ std::shared_ptr<endpoint>& _reliable, boost::asio::ip::address* _address,
+ bool* _has_address,
+ service_t _service, instance_t _instance, client_t _client) const {
+ switch (_subscription_type) {
+ case subscription_type_e::SU_RELIABLE_AND_UNRELIABLE:
+ _reliable = host_->find_or_create_remote_client(_service, _instance,
+ true, _client);
+ _unreliable = host_->find_or_create_remote_client(_service,
+ _instance, false, _client);
+ if (_unreliable) {
+ *_has_address = _unreliable->get_remote_address(*_address);
+ }
+ if (_reliable) {
+ *_has_address = *_has_address
+ || _reliable->get_remote_address(*_address);
+ }
+ break;
+ case subscription_type_e::SU_PREFER_UNRELIABLE:
+ _unreliable = host_->find_or_create_remote_client(_service,
+ _instance, false, _client);
+ if (_unreliable) {
+ *_has_address = _unreliable->get_remote_address(*_address);
+ } else {
+ _reliable = host_->find_or_create_remote_client(_service,
+ _instance, true, _client);
+ if (_reliable) {
+ *_has_address = _reliable->get_remote_address(*_address);
+ }
+ }
+ break;
+ case subscription_type_e::SU_PREFER_RELIABLE:
+ _reliable = host_->find_or_create_remote_client(_service,
+ _instance, true, _client);
+ if (_reliable) {
+ *_has_address = _reliable->get_remote_address(*_address);
+ } else {
+ _unreliable = host_->find_or_create_remote_client(_service,
+ _instance, false, _client);
+ if (_unreliable) {
+ *_has_address = _unreliable->get_remote_address(*_address);
+ }
+ }
+ break;
+ case subscription_type_e::SU_UNRELIABLE:
+ _unreliable = host_->find_or_create_remote_client(_service,
+ _instance,
+ false, _client);
+ if (_unreliable) {
+ *_has_address = _unreliable->get_remote_address(*_address);
+ }
+ break;
+ case subscription_type_e::SU_RELIABLE:
+ _reliable = host_->find_or_create_remote_client(_service, _instance,
+ true, _client);
+ if (_reliable) {
+ *_has_address = _reliable->get_remote_address(*_address);
+ }
+ }
+}
+
void service_discovery_impl::unsubscribe(service_t _service,
instance_t _instance, eventgroup_t _eventgroup, client_t _client) {
@@ -234,7 +285,10 @@ void service_discovery_impl::unsubscribe(service_t _service,
insert_subscription(its_message, _service, _instance, _eventgroup,
its_subscription);
- its_message->set_session(get_session(its_address));
+
+ std::pair<session_t, bool> its_session = get_session(its_address);
+ its_message->set_session(its_session.first);
+ its_message->set_reboot_flag(its_session.second);
serialize_and_send(its_message, its_address);
}
@@ -256,12 +310,12 @@ void service_discovery_impl::unsubscribe_all(service_t _service, instance_t _ins
}
}
-session_t service_discovery_impl::get_session(
+std::pair<session_t, bool> service_discovery_impl::get_session(
const boost::asio::ip::address &_address) {
- session_t its_session;
+ std::pair<session_t, bool> its_session;
auto found_session = sessions_.find(_address);
if (found_session == sessions_.end()) {
- its_session = sessions_[_address] = 1;
+ its_session = sessions_[_address] = { 1, true };
} else {
its_session = found_session->second;
}
@@ -272,14 +326,44 @@ void service_discovery_impl::increment_session(
const boost::asio::ip::address &_address) {
auto found_session = sessions_.find(_address);
if (found_session != sessions_.end()) {
- found_session->second++;
- if (0 == found_session->second) {
- found_session->second++;
- // TODO: what about the reboot flag?
+ found_session->second.first++;
+ if (found_session->second.first == 0) { // Wrap
+ found_session->second = { 1, false };
}
}
}
+bool service_discovery_impl::is_reboot(
+ const boost::asio::ip::address &_address,
+ bool _reboot_flag, session_t _session) {
+
+ bool result(false);
+#ifdef VSOMEIP_TODO
+ // Reboot detection: Either the flag has changed from false to true,
+ // or the session identifier overrun while the flag is true
+ if (reboots_.find(_address) == reboots_.end()) {
+ if (_reboot_flag) {
+ reboots_.insert(_address);
+ result = true;
+ }
+ } else {
+ auto its_last_session = sessions_receiving_.find(_address);
+ if(its_last_session != sessions_receiving_.end()) {
+ if (_reboot_flag && its_last_session->second >= _session) {
+ result = true;
+ }
+ }
+ }
+
+ sessions_receiving_[_address] = _session;
+#else
+ (void)_address;
+ (void)_reboot_flag;
+ (void)_session;
+#endif
+ return result;
+}
+
void service_discovery_impl::insert_option(
std::shared_ptr<message_impl> &_message,
std::shared_ptr<entry_impl> _entry,
@@ -319,6 +403,9 @@ void service_discovery_impl::insert_option(
if (its_option) {
its_option->set_address(its_address);
its_option->set_port(_port);
+ its_option->set_layer_four_protocol(
+ _is_reliable ? layer_four_protocol_e::TCP :
+ layer_four_protocol_e::UDP);
_entry->assign_option(its_option, 1);
}
} else {
@@ -328,6 +415,9 @@ void service_discovery_impl::insert_option(
if (its_option) {
its_option->set_address(its_address);
its_option->set_port(_port);
+ its_option->set_layer_four_protocol(
+ _is_reliable ? layer_four_protocol_e::TCP :
+ layer_four_protocol_e::UDP);
_entry->assign_option(its_option, 1);
}
}
@@ -456,7 +546,9 @@ void service_discovery_impl::send(bool _is_announcing) {
// Serialize and send
if (its_message->get_entries().size() > 0) {
- its_message->set_session(get_session(unicast_));
+ std::pair<session_t, bool> its_session = get_session(unicast_);
+ its_message->set_session(its_session.first);
+ its_message->set_reboot_flag(its_session.second);
if (host_->send(VSOMEIP_SD_CLIENT, its_message, true)) {
increment_session (unicast_);
}
@@ -464,7 +556,8 @@ void service_discovery_impl::send(bool _is_announcing) {
}
// Interface endpoint_host
-void service_discovery_impl::on_message(const byte_t *_data, length_t _length) {
+void service_discovery_impl::on_message(const byte_t *_data, length_t _length,
+ const boost::asio::ip::address &_sender) {
#if 0
std::stringstream msg;
msg << "sdi::on_message: ";
@@ -480,6 +573,13 @@ void service_discovery_impl::on_message(const byte_t *_data, length_t _length) {
if(!check_static_header_fields(its_message)) {
return;
}
+ // Expire all subscriptions / services in case of reboot
+ if (is_reboot(_sender,
+ its_message->get_reboot_flag(), its_message->get_session())) {
+ host_->expire_subscriptions(_sender);
+ host_->expire_services(_sender);
+ }
+
ttl_t expired = stop_ttl_timer();
smallest_ttl_ = host_->update_routing_info(expired);
@@ -637,12 +737,18 @@ void service_discovery_impl::process_offerservice_serviceentry(
for (auto its_client : its_eventgroup.second) {
std::shared_ptr<subscription> its_subscription(its_client.second);
if (its_subscription->is_acknowledged()) {
- its_subscription->set_endpoint(
- host_->find_or_create_remote_client(_service,
- _instance, true, its_client.first), true);
- its_subscription->set_endpoint(
- host_->find_or_create_remote_client(_service,
- _instance, false, its_client.first), false);
+ std::shared_ptr<endpoint> its_unreliable;
+ std::shared_ptr<endpoint> its_reliable;
+ bool has_address(false);
+ boost::asio::ip::address its_address;
+ get_subscription_endpoints(
+ its_client.second->get_subscription_type(),
+ its_unreliable, its_reliable, &its_address,
+ &has_address, _service, _instance,
+ its_client.first);
+ its_subscription->set_endpoint(its_reliable, true);
+ its_subscription->set_endpoint(its_unreliable,
+ false);
// TODO: consume major & ttl
insert_subscription(its_message,
@@ -657,7 +763,7 @@ void service_discovery_impl::process_offerservice_serviceentry(
if (0 < its_message->get_entries().size()) {
std::shared_ptr<endpoint_definition> its_target;
- session_t its_session(0);
+ std::pair<session_t, bool> its_session;
if (_reliable_port != ILLEGAL_PORT) {
its_target = endpoint_definition::get(
_reliable_address, port_, reliable_);
@@ -669,7 +775,8 @@ void service_discovery_impl::process_offerservice_serviceentry(
}
if (its_target) {
- its_message->set_session(its_session);
+ its_message->set_session(its_session.first);
+ its_message->set_reboot_flag(its_session.second);
serializer_->serialize(its_message.get());
if (host_->send_to(its_target,
serializer_->get_data(),
@@ -1050,7 +1157,9 @@ void service_discovery_impl::handle_eventgroup_subscription_ack(
void service_discovery_impl::serialize_and_send(
std::shared_ptr<message_impl> _message,
const boost::asio::ip::address &_address) {
- _message->set_session(get_session(_address));
+ std::pair<session_t, bool> its_session = get_session(_address);
+ _message->set_session(its_session.first);
+ _message->set_reboot_flag(its_session.second);
if(!serializer_->serialize(_message.get())) {
VSOMEIP_ERROR << "service_discovery_impl::serialize_and_send: serialization error.";
return;