summaryrefslogtreecommitdiff
path: root/implementation/routing/src/routing_manager_impl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'implementation/routing/src/routing_manager_impl.cpp')
-rw-r--r--implementation/routing/src/routing_manager_impl.cpp130
1 files changed, 111 insertions, 19 deletions
diff --git a/implementation/routing/src/routing_manager_impl.cpp b/implementation/routing/src/routing_manager_impl.cpp
index ff1c67e..13ec332 100644
--- a/implementation/routing/src/routing_manager_impl.cpp
+++ b/implementation/routing/src/routing_manager_impl.cpp
@@ -175,7 +175,8 @@ void routing_manager_impl::release_service(client_t _client, service_t _service,
}
void routing_manager_impl::subscribe(client_t _client, service_t _service,
- instance_t _instance, eventgroup_t _eventgroup, major_version_t _major) {
+ instance_t _instance, eventgroup_t _eventgroup, major_version_t _major,
+ subscription_type_e _subscription_type) {
if (discovery_) {
if (!host_->on_subscription(_service, _instance, _eventgroup, _client, true)) {
VSOMEIP_INFO << "Subscription request for eventgroup " << _eventgroup
@@ -192,7 +193,7 @@ void routing_manager_impl::subscribe(client_t _client, service_t _service,
subscriber = _client;
}
discovery_->subscribe(_service, _instance, _eventgroup,
- _major, DEFAULT_TTL, subscriber);
+ _major, DEFAULT_TTL, subscriber, _subscription_type);
} else {
send_subscribe(_client, _service, _instance, _eventgroup, _major);
@@ -323,7 +324,6 @@ bool routing_manager_impl::send(client_t _client, const byte_t *_data,
} else {
std::shared_ptr<serviceinfo> its_info(find_service(its_service, _instance));
if (its_info) {
- its_target = its_info->get_endpoint(_reliable);
if (is_notification && !is_service_discovery) {
method_t its_method = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_METHOD_POS_MIN],
_data[VSOMEIP_METHOD_POS_MAX]);
@@ -347,19 +347,26 @@ bool routing_manager_impl::send(client_t _client, const byte_t *_data,
send_local(its_local_target, _client, _data, _size, _instance, _flush, _reliable);
}
}
-
- if (its_target) {
+ // we need both endpoints as clients can subscribe to events via TCP and UDP
+ std::shared_ptr<endpoint> its_unreliable_target = its_info->get_endpoint(false);
+ std::shared_ptr<endpoint> its_reliable_target = its_info->get_endpoint(true);
+ if (its_unreliable_target || its_reliable_target) {
// remote
auto its_eventgroup = find_eventgroup(its_service, _instance, its_group);
if (its_eventgroup) {
for (auto its_remote : its_eventgroup->get_targets()) {
- its_target->send_to(its_remote, _data, _size);
+ if(its_remote->is_reliable() && its_reliable_target) {
+ its_reliable_target->send_to(its_remote, _data, _size);
+ } else if(its_unreliable_target) {
+ its_unreliable_target->send_to(its_remote, _data, _size);
+ }
}
}
}
}
}
} else {
+ its_target = its_info->get_endpoint(_reliable);
if (its_target) {
is_sent = its_target->send(_data, _size, _flush);
} else {
@@ -437,7 +444,7 @@ bool routing_manager_impl::send_to(
void routing_manager_impl::register_event(client_t _client,
service_t _service, instance_t _instance,
- event_t _event, std::set<eventgroup_t> _eventgroups,
+ event_t _event, const std::set<eventgroup_t> &_eventgroups,
bool _is_field, bool _is_provided) {
(void)_client;
@@ -461,12 +468,18 @@ void routing_manager_impl::register_event(client_t _client,
its_event->set_event(_event);
its_event->set_field(_is_field);
its_event->set_provided(_is_provided);
+ std::shared_ptr<serviceinfo> its_service = find_service(_service, _instance);
+ if (its_service) {
+ its_event->set_version(its_service->get_major());
+ }
if (_eventgroups.size() == 0) { // No eventgroup specified
- _eventgroups.insert(_event);
+ std::set<eventgroup_t> its_eventgroups;
+ its_eventgroups.insert(_event);
+ its_event->set_eventgroups(its_eventgroups);
+ } else {
+ its_event->set_eventgroups(_eventgroups);
}
-
- its_event->set_eventgroups(_eventgroups);
}
its_event->add_ref();
@@ -578,8 +591,14 @@ void routing_manager_impl::on_message(const byte_t *_data, length_t _size,
its_service = VSOMEIP_BYTES_TO_WORD(_data[VSOMEIP_SERVICE_POS_MIN],
_data[VSOMEIP_SERVICE_POS_MAX]);
if (its_service == VSOMEIP_SD_SERVICE) {
- if (discovery_)
- discovery_->on_message(_data, _size);
+ if (discovery_) {
+ boost::asio::ip::address its_address;
+ if (_receiver->get_remote_address(its_address)) {
+ discovery_->on_message(_data, _size, its_address);
+ } else {
+ VSOMEIP_ERROR << "Ignored SD message from unknown address.";
+ }
+ }
} else {
instance_t its_instance = find_instance(its_service, _receiver);
return_code_e return_code = check_error(_data, _size, its_instance);
@@ -661,11 +680,13 @@ void routing_manager_impl::on_message(
its_session = VSOMEIP_BYTES_TO_WORD(
_data[VSOMEIP_SESSION_POS_MIN],
_data[VSOMEIP_SESSION_POS_MAX]);
+ major_version_t its_version = _data[VSOMEIP_INTERFACE_VERSION_POS];
its_response->set_service(_service);
its_response->set_method(its_method);
its_response->set_client(its_client);
its_response->set_session(its_session);
+ its_response->set_interface_version(its_version);
if (its_event->is_field()) {
its_response->set_message_type(
@@ -677,10 +698,13 @@ void routing_manager_impl::on_message(
std::lock_guard<std::mutex> its_lock(serialize_mutex_);
if (serializer_->serialize(its_response.get())) {
+ // always pass reliable = false, but this won't be used, as
+ // the event is sent out via TCP or UDP dependent on which
+ // L4Proto the subscriber passed in the endpoint option in
+ // its subscription to the eventgroup
send(its_client,
serializer_->get_data(), serializer_->get_size(),
- _instance,
- true, its_event->is_reliable());
+ _instance, true, false);
} else {
VSOMEIP_ERROR << "routing_manager_impl::on_message: serialization error.";
}
@@ -1167,7 +1191,9 @@ std::shared_ptr<endpoint> routing_manager_impl::create_server_endpoint(
configuration_->get_message_size_reliable(
its_unicast.to_string(), _port));
if (configuration_->has_enabled_magic_cookies(
- its_unicast.to_string(), _port)) {
+ its_unicast.to_string(), _port) ||
+ configuration_->has_enabled_magic_cookies(
+ "local", _port)) {
its_endpoint->enable_magic_cookies();
}
} else {
@@ -1637,6 +1663,65 @@ ttl_t routing_manager_impl::update_routing_info(ttl_t _elapsed) {
return its_smallest_ttl;
}
+void routing_manager_impl::expire_services(const boost::asio::ip::address &_address) {
+ std::map<service_t,
+ std::map<instance_t,
+ std::pair<bool, bool> > > its_expired_offers;
+
+ for (auto &s : services_) {
+ for (auto &i : s.second) {
+ bool is_gone(false);
+ boost::asio::ip::address its_address;
+ std::shared_ptr<endpoint> its_endpoint = i.second->get_endpoint(true);
+ if (its_endpoint) {
+ if (its_endpoint->get_remote_address(its_address)) {
+ is_gone = (its_address == _address);
+ }
+ } else {
+ its_endpoint = i.second->get_endpoint(false);
+ if (its_endpoint) {
+ if (its_endpoint->get_remote_address(its_address)) {
+ is_gone = (its_address == _address);
+ }
+ }
+ }
+
+ if (is_gone) {
+ if (discovery_)
+ discovery_->unsubscribe_all(s.first, i.first);
+ its_expired_offers[s.first][i.first] = {
+ i.second->get_endpoint(true) != nullptr,
+ i.second->get_endpoint(false) != nullptr
+ };
+ }
+ }
+ }
+
+ for (auto &s : its_expired_offers) {
+ for (auto &i : s.second) {
+ del_routing_info(s.first, i.first, i.second.first, i.second.second);
+ }
+ }
+}
+
+void routing_manager_impl::expire_subscriptions(const boost::asio::ip::address &_address) {
+ for (auto &its_service : eventgroups_) {
+ for (auto &its_instance : its_service.second) {
+ for (auto &its_eventgroup : its_instance.second) {
+ std::set<std::shared_ptr<endpoint_definition>> its_invalid_targets;
+ for (auto &its_target : its_eventgroup.second->get_targets()) {
+ if (its_target->get_address() == _address)
+ its_invalid_targets.insert(its_target);
+ }
+
+ for (auto &its_target : its_invalid_targets) {
+ its_eventgroup.second->remove_target(its_target);
+ }
+ }
+ }
+ }
+}
+
void routing_manager_impl::init_routing_info() {
VSOMEIP_INFO<< "Service Discovery disabled. Using static routing information.";
for (auto i : configuration_->get_remote_services()) {
@@ -2089,10 +2174,17 @@ return_code_e routing_manager_impl::check_error(const byte_t *_data, length_t _s
<< std::hex << its_service;
return return_code_e::E_UNKNOWN_SERVICE;
}
- // TODO: Check interface version handling?!
- if (_data[VSOMEIP_INTERFACE_VERSION_POS] != 0x0) {
- // Interface is currently set to zero always!
- return return_code_e::E_WRONG_INTERFACE_VERSION;
+ // Check interface version of service/instance
+ auto found_service = services_.find(its_service);
+ if (found_service != services_.end()) {
+ auto found_instance = found_service->second.find(_instance);
+ if (found_instance != found_service->second.end()) {
+ auto its_info = found_instance->second;
+ major_version_t its_version = _data[VSOMEIP_INTERFACE_VERSION_POS];
+ if (its_version != its_info->get_major()) {
+ return return_code_e::E_WRONG_INTERFACE_VERSION;
+ }
+ }
}
if (_data[VSOMEIP_RETURN_CODE_POS] != static_cast<byte_t> (return_code_e::E_OK)) {
// Request calls must to have return code E_OK set!