diff options
Diffstat (limited to 'implementation/service_discovery/src/service_discovery_impl.cpp')
-rw-r--r-- | implementation/service_discovery/src/service_discovery_impl.cpp | 191 |
1 files changed, 150 insertions, 41 deletions
diff --git a/implementation/service_discovery/src/service_discovery_impl.cpp b/implementation/service_discovery/src/service_discovery_impl.cpp index 38975fb..b5493b8 100644 --- a/implementation/service_discovery/src/service_discovery_impl.cpp +++ b/implementation/service_discovery/src/service_discovery_impl.cpp @@ -124,7 +124,8 @@ void service_discovery_impl::release_service(service_t _service, } void service_discovery_impl::subscribe(service_t _service, instance_t _instance, - eventgroup_t _eventgroup, major_version_t _major, ttl_t _ttl, client_t _client) { + eventgroup_t _eventgroup, major_version_t _major, ttl_t _ttl, client_t _client, + subscription_type_e _subscription_type) { std::lock_guard<std::mutex> its_lock(subscribed_mutex_); auto found_service = subscribed_.find(_service); @@ -148,31 +149,18 @@ void service_discovery_impl::subscribe(service_t _service, instance_t _instance, } } - // New subscription - std::shared_ptr < endpoint > its_reliable = host_->find_or_create_remote_client( - _service, _instance, true, _client); - std::shared_ptr < endpoint > its_unreliable = host_->find_or_create_remote_client( - _service, _instance, false, _client); - std::shared_ptr < subscription > its_subscription = std::make_shared - < subscription > (_major, _ttl, its_reliable, its_unreliable); - subscribed_[_service][_instance][_eventgroup][_client] = its_subscription; - + std::shared_ptr < endpoint > its_unreliable; + std::shared_ptr < endpoint > its_reliable; bool has_address(false); boost::asio::ip::address its_address; - std::shared_ptr<endpoint> its_endpoint - = host_->find_or_create_remote_client(_service, _instance, false, _client); + get_subscription_endpoints(_subscription_type, its_unreliable, its_reliable, + &its_address, &has_address, _service, _instance, _client); - if (its_endpoint) { - has_address = its_endpoint->get_remote_address(its_address); - its_subscription->set_endpoint(its_endpoint, false); - } - - its_endpoint = host_->find_or_create_remote_client(_service, _instance, true, _client); - if (its_endpoint) { - has_address = has_address || its_endpoint->get_remote_address(its_address); - its_subscription->set_endpoint(its_endpoint, true); - } + // New subscription + std::shared_ptr < subscription > its_subscription = std::make_shared + < subscription > (_major, _ttl, its_reliable, its_unreliable, _subscription_type); + subscribed_[_service][_instance][_eventgroup][_client] = its_subscription; if (has_address) { std::shared_ptr<runtime> its_runtime = runtime_.lock(); @@ -191,6 +179,69 @@ void service_discovery_impl::subscribe(service_t _service, instance_t _instance, } } +void service_discovery_impl::get_subscription_endpoints( + subscription_type_e _subscription_type, + std::shared_ptr<endpoint>& _unreliable, + std::shared_ptr<endpoint>& _reliable, boost::asio::ip::address* _address, + bool* _has_address, + service_t _service, instance_t _instance, client_t _client) const { + switch (_subscription_type) { + case subscription_type_e::SU_RELIABLE_AND_UNRELIABLE: + _reliable = host_->find_or_create_remote_client(_service, _instance, + true, _client); + _unreliable = host_->find_or_create_remote_client(_service, + _instance, false, _client); + if (_unreliable) { + *_has_address = _unreliable->get_remote_address(*_address); + } + if (_reliable) { + *_has_address = *_has_address + || _reliable->get_remote_address(*_address); + } + break; + case subscription_type_e::SU_PREFER_UNRELIABLE: + _unreliable = host_->find_or_create_remote_client(_service, + _instance, false, _client); + if (_unreliable) { + *_has_address = _unreliable->get_remote_address(*_address); + } else { + _reliable = host_->find_or_create_remote_client(_service, + _instance, true, _client); + if (_reliable) { + *_has_address = _reliable->get_remote_address(*_address); + } + } + break; + case subscription_type_e::SU_PREFER_RELIABLE: + _reliable = host_->find_or_create_remote_client(_service, + _instance, true, _client); + if (_reliable) { + *_has_address = _reliable->get_remote_address(*_address); + } else { + _unreliable = host_->find_or_create_remote_client(_service, + _instance, false, _client); + if (_unreliable) { + *_has_address = _unreliable->get_remote_address(*_address); + } + } + break; + case subscription_type_e::SU_UNRELIABLE: + _unreliable = host_->find_or_create_remote_client(_service, + _instance, + false, _client); + if (_unreliable) { + *_has_address = _unreliable->get_remote_address(*_address); + } + break; + case subscription_type_e::SU_RELIABLE: + _reliable = host_->find_or_create_remote_client(_service, _instance, + true, _client); + if (_reliable) { + *_has_address = _reliable->get_remote_address(*_address); + } + } +} + void service_discovery_impl::unsubscribe(service_t _service, instance_t _instance, eventgroup_t _eventgroup, client_t _client) { @@ -234,7 +285,10 @@ void service_discovery_impl::unsubscribe(service_t _service, insert_subscription(its_message, _service, _instance, _eventgroup, its_subscription); - its_message->set_session(get_session(its_address)); + + std::pair<session_t, bool> its_session = get_session(its_address); + its_message->set_session(its_session.first); + its_message->set_reboot_flag(its_session.second); serialize_and_send(its_message, its_address); } @@ -256,12 +310,12 @@ void service_discovery_impl::unsubscribe_all(service_t _service, instance_t _ins } } -session_t service_discovery_impl::get_session( +std::pair<session_t, bool> service_discovery_impl::get_session( const boost::asio::ip::address &_address) { - session_t its_session; + std::pair<session_t, bool> its_session; auto found_session = sessions_.find(_address); if (found_session == sessions_.end()) { - its_session = sessions_[_address] = 1; + its_session = sessions_[_address] = { 1, true }; } else { its_session = found_session->second; } @@ -272,14 +326,44 @@ void service_discovery_impl::increment_session( const boost::asio::ip::address &_address) { auto found_session = sessions_.find(_address); if (found_session != sessions_.end()) { - found_session->second++; - if (0 == found_session->second) { - found_session->second++; - // TODO: what about the reboot flag? + found_session->second.first++; + if (found_session->second.first == 0) { // Wrap + found_session->second = { 1, false }; } } } +bool service_discovery_impl::is_reboot( + const boost::asio::ip::address &_address, + bool _reboot_flag, session_t _session) { + + bool result(false); +#ifdef VSOMEIP_TODO + // Reboot detection: Either the flag has changed from false to true, + // or the session identifier overrun while the flag is true + if (reboots_.find(_address) == reboots_.end()) { + if (_reboot_flag) { + reboots_.insert(_address); + result = true; + } + } else { + auto its_last_session = sessions_receiving_.find(_address); + if(its_last_session != sessions_receiving_.end()) { + if (_reboot_flag && its_last_session->second >= _session) { + result = true; + } + } + } + + sessions_receiving_[_address] = _session; +#else + (void)_address; + (void)_reboot_flag; + (void)_session; +#endif + return result; +} + void service_discovery_impl::insert_option( std::shared_ptr<message_impl> &_message, std::shared_ptr<entry_impl> _entry, @@ -319,6 +403,9 @@ void service_discovery_impl::insert_option( if (its_option) { its_option->set_address(its_address); its_option->set_port(_port); + its_option->set_layer_four_protocol( + _is_reliable ? layer_four_protocol_e::TCP : + layer_four_protocol_e::UDP); _entry->assign_option(its_option, 1); } } else { @@ -328,6 +415,9 @@ void service_discovery_impl::insert_option( if (its_option) { its_option->set_address(its_address); its_option->set_port(_port); + its_option->set_layer_four_protocol( + _is_reliable ? layer_four_protocol_e::TCP : + layer_four_protocol_e::UDP); _entry->assign_option(its_option, 1); } } @@ -456,7 +546,9 @@ void service_discovery_impl::send(bool _is_announcing) { // Serialize and send if (its_message->get_entries().size() > 0) { - its_message->set_session(get_session(unicast_)); + std::pair<session_t, bool> its_session = get_session(unicast_); + its_message->set_session(its_session.first); + its_message->set_reboot_flag(its_session.second); if (host_->send(VSOMEIP_SD_CLIENT, its_message, true)) { increment_session (unicast_); } @@ -464,7 +556,8 @@ void service_discovery_impl::send(bool _is_announcing) { } // Interface endpoint_host -void service_discovery_impl::on_message(const byte_t *_data, length_t _length) { +void service_discovery_impl::on_message(const byte_t *_data, length_t _length, + const boost::asio::ip::address &_sender) { #if 0 std::stringstream msg; msg << "sdi::on_message: "; @@ -480,6 +573,13 @@ void service_discovery_impl::on_message(const byte_t *_data, length_t _length) { if(!check_static_header_fields(its_message)) { return; } + // Expire all subscriptions / services in case of reboot + if (is_reboot(_sender, + its_message->get_reboot_flag(), its_message->get_session())) { + host_->expire_subscriptions(_sender); + host_->expire_services(_sender); + } + ttl_t expired = stop_ttl_timer(); smallest_ttl_ = host_->update_routing_info(expired); @@ -637,12 +737,18 @@ void service_discovery_impl::process_offerservice_serviceentry( for (auto its_client : its_eventgroup.second) { std::shared_ptr<subscription> its_subscription(its_client.second); if (its_subscription->is_acknowledged()) { - its_subscription->set_endpoint( - host_->find_or_create_remote_client(_service, - _instance, true, its_client.first), true); - its_subscription->set_endpoint( - host_->find_or_create_remote_client(_service, - _instance, false, its_client.first), false); + std::shared_ptr<endpoint> its_unreliable; + std::shared_ptr<endpoint> its_reliable; + bool has_address(false); + boost::asio::ip::address its_address; + get_subscription_endpoints( + its_client.second->get_subscription_type(), + its_unreliable, its_reliable, &its_address, + &has_address, _service, _instance, + its_client.first); + its_subscription->set_endpoint(its_reliable, true); + its_subscription->set_endpoint(its_unreliable, + false); // TODO: consume major & ttl insert_subscription(its_message, @@ -657,7 +763,7 @@ void service_discovery_impl::process_offerservice_serviceentry( if (0 < its_message->get_entries().size()) { std::shared_ptr<endpoint_definition> its_target; - session_t its_session(0); + std::pair<session_t, bool> its_session; if (_reliable_port != ILLEGAL_PORT) { its_target = endpoint_definition::get( _reliable_address, port_, reliable_); @@ -669,7 +775,8 @@ void service_discovery_impl::process_offerservice_serviceentry( } if (its_target) { - its_message->set_session(its_session); + its_message->set_session(its_session.first); + its_message->set_reboot_flag(its_session.second); serializer_->serialize(its_message.get()); if (host_->send_to(its_target, serializer_->get_data(), @@ -1050,7 +1157,9 @@ void service_discovery_impl::handle_eventgroup_subscription_ack( void service_discovery_impl::serialize_and_send( std::shared_ptr<message_impl> _message, const boost::asio::ip::address &_address) { - _message->set_session(get_session(_address)); + std::pair<session_t, bool> its_session = get_session(_address); + _message->set_session(its_session.first); + _message->set_reboot_flag(its_session.second); if(!serializer_->serialize(_message.get())) { VSOMEIP_ERROR << "service_discovery_impl::serialize_and_send: serialization error."; return; |