diff options
Diffstat (limited to 'implementation/routing/src/routing_manager_impl.cpp')
-rw-r--r-- | implementation/routing/src/routing_manager_impl.cpp | 130 |
1 files changed, 111 insertions, 19 deletions
diff --git a/implementation/routing/src/routing_manager_impl.cpp b/implementation/routing/src/routing_manager_impl.cpp index ff1c67e..13ec332 100644 --- a/implementation/routing/src/routing_manager_impl.cpp +++ b/implementation/routing/src/routing_manager_impl.cpp @@ -175,7 +175,8 @@ void routing_manager_impl::release_service(client_t _client, service_t _service, } void routing_manager_impl::subscribe(client_t _client, service_t _service, - instance_t _instance, eventgroup_t _eventgroup, major_version_t _major) { + instance_t _instance, eventgroup_t _eventgroup, major_version_t _major, + subscription_type_e _subscription_type) { if (discovery_) { if (!host_->on_subscription(_service, _instance, _eventgroup, _client, true)) { VSOMEIP_INFO << "Subscription request for eventgroup " << _eventgroup @@ -192,7 +193,7 @@ void routing_manager_impl::subscribe(client_t _client, service_t _service, subscriber = _client; } discovery_->subscribe(_service, _instance, _eventgroup, - _major, DEFAULT_TTL, subscriber); + _major, DEFAULT_TTL, subscriber, _subscription_type); } else { send_subscribe(_client, _service, _instance, _eventgroup, _major); @@ -323,7 +324,6 @@ bool routing_manager_impl::send(client_t _client, const byte_t *_data, } else { std::shared_ptr<serviceinfo> its_info(find_service(its_service, _instance)); if (its_info) { - its_target = its_info->get_endpoint(_reliable); if (is_notification && !is_service_discovery) { method_t its_method = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_METHOD_POS_MIN], _data[VSOMEIP_METHOD_POS_MAX]); @@ -347,19 +347,26 @@ bool routing_manager_impl::send(client_t _client, const byte_t *_data, send_local(its_local_target, _client, _data, _size, _instance, _flush, _reliable); } } - - if (its_target) { + // we need both endpoints as clients can subscribe to events via TCP and UDP + std::shared_ptr<endpoint> its_unreliable_target = its_info->get_endpoint(false); + std::shared_ptr<endpoint> its_reliable_target = its_info->get_endpoint(true); + if (its_unreliable_target || its_reliable_target) { // remote auto its_eventgroup = find_eventgroup(its_service, _instance, its_group); if (its_eventgroup) { for (auto its_remote : its_eventgroup->get_targets()) { - its_target->send_to(its_remote, _data, _size); + if(its_remote->is_reliable() && its_reliable_target) { + its_reliable_target->send_to(its_remote, _data, _size); + } else if(its_unreliable_target) { + its_unreliable_target->send_to(its_remote, _data, _size); + } } } } } } } else { + its_target = its_info->get_endpoint(_reliable); if (its_target) { is_sent = its_target->send(_data, _size, _flush); } else { @@ -437,7 +444,7 @@ bool routing_manager_impl::send_to( void routing_manager_impl::register_event(client_t _client, service_t _service, instance_t _instance, - event_t _event, std::set<eventgroup_t> _eventgroups, + event_t _event, const std::set<eventgroup_t> &_eventgroups, bool _is_field, bool _is_provided) { (void)_client; @@ -461,12 +468,18 @@ void routing_manager_impl::register_event(client_t _client, its_event->set_event(_event); its_event->set_field(_is_field); its_event->set_provided(_is_provided); + std::shared_ptr<serviceinfo> its_service = find_service(_service, _instance); + if (its_service) { + its_event->set_version(its_service->get_major()); + } if (_eventgroups.size() == 0) { // No eventgroup specified - _eventgroups.insert(_event); + std::set<eventgroup_t> its_eventgroups; + its_eventgroups.insert(_event); + its_event->set_eventgroups(its_eventgroups); + } else { + its_event->set_eventgroups(_eventgroups); } - - its_event->set_eventgroups(_eventgroups); } its_event->add_ref(); @@ -578,8 +591,14 @@ void routing_manager_impl::on_message(const byte_t *_data, length_t _size, its_service = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_SERVICE_POS_MIN], _data[VSOMEIP_SERVICE_POS_MAX]); if (its_service == VSOMEIP_SD_SERVICE) { - if (discovery_) - discovery_->on_message(_data, _size); + if (discovery_) { + boost::asio::ip::address its_address; + if (_receiver->get_remote_address(its_address)) { + discovery_->on_message(_data, _size, its_address); + } else { + VSOMEIP_ERROR << "Ignored SD message from unknown address."; + } + } } else { instance_t its_instance = find_instance(its_service, _receiver); return_code_e return_code = check_error(_data, _size, its_instance); @@ -661,11 +680,13 @@ void routing_manager_impl::on_message( its_session = VSOMEIP_BYTES_TO_WORD( _data[VSOMEIP_SESSION_POS_MIN], _data[VSOMEIP_SESSION_POS_MAX]); + major_version_t its_version = _data[VSOMEIP_INTERFACE_VERSION_POS]; its_response->set_service(_service); its_response->set_method(its_method); its_response->set_client(its_client); its_response->set_session(its_session); + its_response->set_interface_version(its_version); if (its_event->is_field()) { its_response->set_message_type( @@ -677,10 +698,13 @@ void routing_manager_impl::on_message( std::lock_guard<std::mutex> its_lock(serialize_mutex_); if (serializer_->serialize(its_response.get())) { + // always pass reliable = false, but this won't be used, as + // the event is sent out via TCP or UDP dependent on which + // L4Proto the subscriber passed in the endpoint option in + // its subscription to the eventgroup send(its_client, serializer_->get_data(), serializer_->get_size(), - _instance, - true, its_event->is_reliable()); + _instance, true, false); } else { VSOMEIP_ERROR << "routing_manager_impl::on_message: serialization error."; } @@ -1167,7 +1191,9 @@ std::shared_ptr<endpoint> routing_manager_impl::create_server_endpoint( configuration_->get_message_size_reliable( its_unicast.to_string(), _port)); if (configuration_->has_enabled_magic_cookies( - its_unicast.to_string(), _port)) { + its_unicast.to_string(), _port) || + configuration_->has_enabled_magic_cookies( + "local", _port)) { its_endpoint->enable_magic_cookies(); } } else { @@ -1637,6 +1663,65 @@ ttl_t routing_manager_impl::update_routing_info(ttl_t _elapsed) { return its_smallest_ttl; } +void routing_manager_impl::expire_services(const boost::asio::ip::address &_address) { + std::map<service_t, + std::map<instance_t, + std::pair<bool, bool> > > its_expired_offers; + + for (auto &s : services_) { + for (auto &i : s.second) { + bool is_gone(false); + boost::asio::ip::address its_address; + std::shared_ptr<endpoint> its_endpoint = i.second->get_endpoint(true); + if (its_endpoint) { + if (its_endpoint->get_remote_address(its_address)) { + is_gone = (its_address == _address); + } + } else { + its_endpoint = i.second->get_endpoint(false); + if (its_endpoint) { + if (its_endpoint->get_remote_address(its_address)) { + is_gone = (its_address == _address); + } + } + } + + if (is_gone) { + if (discovery_) + discovery_->unsubscribe_all(s.first, i.first); + its_expired_offers[s.first][i.first] = { + i.second->get_endpoint(true) != nullptr, + i.second->get_endpoint(false) != nullptr + }; + } + } + } + + for (auto &s : its_expired_offers) { + for (auto &i : s.second) { + del_routing_info(s.first, i.first, i.second.first, i.second.second); + } + } +} + +void routing_manager_impl::expire_subscriptions(const boost::asio::ip::address &_address) { + for (auto &its_service : eventgroups_) { + for (auto &its_instance : its_service.second) { + for (auto &its_eventgroup : its_instance.second) { + std::set<std::shared_ptr<endpoint_definition>> its_invalid_targets; + for (auto &its_target : its_eventgroup.second->get_targets()) { + if (its_target->get_address() == _address) + its_invalid_targets.insert(its_target); + } + + for (auto &its_target : its_invalid_targets) { + its_eventgroup.second->remove_target(its_target); + } + } + } + } +} + void routing_manager_impl::init_routing_info() { VSOMEIP_INFO<< "Service Discovery disabled. Using static routing information."; for (auto i : configuration_->get_remote_services()) { @@ -2089,10 +2174,17 @@ return_code_e routing_manager_impl::check_error(const byte_t *_data, length_t _s << std::hex << its_service; return return_code_e::E_UNKNOWN_SERVICE; } - // TODO: Check interface version handling?! - if (_data[VSOMEIP_INTERFACE_VERSION_POS] != 0x0) { - // Interface is currently set to zero always! - return return_code_e::E_WRONG_INTERFACE_VERSION; + // Check interface version of service/instance + auto found_service = services_.find(its_service); + if (found_service != services_.end()) { + auto found_instance = found_service->second.find(_instance); + if (found_instance != found_service->second.end()) { + auto its_info = found_instance->second; + major_version_t its_version = _data[VSOMEIP_INTERFACE_VERSION_POS]; + if (its_version != its_info->get_major()) { + return return_code_e::E_WRONG_INTERFACE_VERSION; + } + } } if (_data[VSOMEIP_RETURN_CODE_POS] != static_cast<byte_t> (return_code_e::E_OK)) { // Request calls must to have return code E_OK set! |