diff options
25 files changed, 2421 insertions, 1822 deletions
diff --git a/qpid/extras/dispatch/CMakeLists.txt b/qpid/extras/dispatch/CMakeLists.txt index ae0e126be7..365e5d6bf7 100644 --- a/qpid/extras/dispatch/CMakeLists.txt +++ b/qpid/extras/dispatch/CMakeLists.txt @@ -61,13 +61,6 @@ if (PYTHONLIBS_FOUND) OUTPUT_STRIP_TRAILING_WHITESPACE) endif (PYTHONLIBS_FOUND) -include_directories( - ${CMAKE_CURRENT_SOURCE_DIR}/include - ${CMAKE_CURRENT_SOURCE_DIR}/src - ${proton_include} - ${PYTHON_INCLUDE_PATH} - ) - ## ## Find dependencies ## @@ -76,6 +69,13 @@ find_library(pthread_lib pthread) find_library(rt_lib rt) find_path(proton_include proton/driver.h) +include_directories( + ${CMAKE_CURRENT_SOURCE_DIR}/include + ${CMAKE_CURRENT_SOURCE_DIR}/src + ${proton_include} + ${PYTHON_INCLUDE_PATH} + ) + set(CMAKE_C_FLAGS "-pthread -Wall -Werror -std=gnu99") set(CATCH_UNDEFINED "-Wl,--no-undefined") @@ -85,6 +85,8 @@ set(CATCH_UNDEFINED "-Wl,--no-undefined") set(server_SOURCES src/agent.c src/alloc.c + src/amqp.c + src/bitmask.c src/buffer.c src/compose.c src/config.c diff --git a/qpid/extras/dispatch/include/qpid/dispatch.h b/qpid/extras/dispatch/include/qpid/dispatch.h index 72b3456099..6b3d6a963f 100644 --- a/qpid/extras/dispatch/include/qpid/dispatch.h +++ b/qpid/extras/dispatch/include/qpid/dispatch.h @@ -20,6 +20,7 @@ */ #include <qpid/dispatch/alloc.h> +#include <qpid/dispatch/bitmask.h> #include <qpid/dispatch/buffer.h> #include <qpid/dispatch/ctools.h> #include <qpid/dispatch/hash.h> diff --git a/qpid/extras/dispatch/include/qpid/dispatch/amqp.h b/qpid/extras/dispatch/include/qpid/dispatch/amqp.h index 9babeb930d..c27eec6589 100644 --- a/qpid/extras/dispatch/include/qpid/dispatch/amqp.h +++ b/qpid/extras/dispatch/include/qpid/dispatch/amqp.h @@ -76,5 +76,23 @@ #define DX_AMQP_ARRAY8 0xe0 #define DX_AMQP_ARRAY32 0xf0 +/** + * Delivery Annotation Headers + */ +const char * const DX_DA_INGRESS; // Ingress Router +const char * const DX_DA_TRACE; // Trace +const char * const DX_DA_TO; // To-Override + +/** + * Link Terminus Capabilities + */ +const char * const DX_CAPABILITY_ROUTER; + +/** + * Miscellaneous Strings + */ +const char * const DX_INTERNODE_LINK_NAME_1; +const char * const DX_INTERNODE_LINK_NAME_2; + #endif diff --git a/qpid/extras/dispatch/include/qpid/dispatch/bitmask.h b/qpid/extras/dispatch/include/qpid/dispatch/bitmask.h new file mode 100644 index 0000000000..fe436bc19b --- /dev/null +++ b/qpid/extras/dispatch/include/qpid/dispatch/bitmask.h @@ -0,0 +1,37 @@ +#ifndef __dispatch_bitmask_h__ +#define __dispatch_bitmask_h__ 1 +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +typedef struct dx_bitmask_t dx_bitmask_t; + +int dx_bitmask_width(); +dx_bitmask_t *dx_bitmask(int initial); +void dx_bitmask_free(dx_bitmask_t *b); +void dx_bitmask_set_all(dx_bitmask_t *b); +void dx_bitmask_clear_all(dx_bitmask_t *b); +void dx_bitmask_set_bit(dx_bitmask_t *b, int bitnum); +void dx_bitmask_clear_bit(dx_bitmask_t *b, int bitnum); +int dx_bitmask_value(dx_bitmask_t *b, int bitnum); +int dx_bitmask_first_set(dx_bitmask_t *b, int *bitnum); + + + +#endif + diff --git a/qpid/extras/dispatch/include/qpid/dispatch/router.h b/qpid/extras/dispatch/include/qpid/dispatch/router.h index c900d93615..1dadc8d119 100644 --- a/qpid/extras/dispatch/include/qpid/dispatch/router.h +++ b/qpid/extras/dispatch/include/qpid/dispatch/router.h @@ -27,7 +27,7 @@ typedef struct dx_address_t dx_address_t; -typedef void (*dx_router_message_cb)(void *context, dx_message_t *msg); +typedef void (*dx_router_message_cb)(void *context, dx_message_t *msg, int link_id); const char *dx_router_id(const dx_dispatch_t *dx); diff --git a/qpid/extras/dispatch/python/qpid/dispatch/router/adapter.py b/qpid/extras/dispatch/python/qpid/dispatch/router/adapter.py index 7f7f6c9e8e..d21f834751 100644 --- a/qpid/extras/dispatch/python/qpid/dispatch/router/adapter.py +++ b/qpid/extras/dispatch/python/qpid/dispatch/router/adapter.py @@ -18,82 +18,82 @@ # try: - from dispatch import * + from dispatch import * except ImportError: - from ..stubs import * + from ..stubs import * ENTRY_OLD = 1 ENTRY_CURRENT = 2 ENTRY_NEW = 3 class AdapterEngine(object): - """ - This module is responsible for managing the Adapter's key bindings (list of address-subject:next-hop). - Key binding lists are kept in disjoint key-classes that can come from different parts of the router - (i.e. topological keys for inter-router communication and mobile keys for end users). - - For each key-class, a mirror copy of what the adapter has is kept internally. This allows changes to the - routing tables to be efficiently communicated to the adapter in the form of table deltas. - """ - def __init__(self, container): - self.container = container - self.id = self.container.id - self.area = self.container.area - self.key_classes = {} # map [key_class] => (addr-key, next-hop) - - - def tick(self, now): """ - There is no periodic processing needed for this module. + This module is responsible for managing the Adapter's key bindings (list of address-subject:next-hop). + Key binding lists are kept in disjoint key-classes that can come from different parts of the router + (i.e. topological keys for inter-router communication and mobile keys for end users). + + For each key-class, a mirror copy of what the adapter has is kept internally. This allows changes to the + routing tables to be efficiently communicated to the adapter in the form of table deltas. """ - pass - - - def remote_routes_changed(self, key_class, new_table): - old_table = [] - if key_class in self.key_classes: - old_table = self.key_classes[key_class] - - # flag all of the old entries - old_flags = {} - for a,b in old_table: - old_flags[(a,b)] = ENTRY_OLD - - # flag the new entries - new_flags = {} - for a,b in new_table: - new_flags[(a,b)] = ENTRY_NEW - - # calculate the differences from old to new - for a,b in new_table: - if old_table.count((a,b)) > 0: - old_flags[(a,b)] = ENTRY_CURRENT - new_flags[(a,b)] = ENTRY_CURRENT - - # make to_add and to_delete lists - to_add = [] - to_delete = [] - for (a,b),f in old_flags.items(): - if f == ENTRY_OLD: - to_delete.append((a,b)) - for (a,b),f in new_flags.items(): - if f == ENTRY_NEW: - to_add.append((a,b)) - - # set the routing table to the new contents - self.key_classes[key_class] = new_table - - # update the adapter's routing tables - # Note: Do deletions before adds to avoid overlapping routes that may cause - # messages to be duplicated. It's better to have gaps in the routing - # tables momentarily because unroutable messages are stored for retry. - for a,b in to_delete: - self.container.router_adapter.remote_unbind(a, b) - for a,b in to_add: - self.container.router_adapter.remote_bind(a, b) - - self.container.log(LOG_INFO, "New Routing Table (class=%s):" % key_class) - for a,b in new_table: - self.container.log(LOG_INFO, " %s => %s" % (a, b)) + def __init__(self, container): + self.container = container + self.id = self.container.id + self.area = self.container.area + self.key_classes = {} # map [key_class] => (addr-key, next-hop) + + + def tick(self, now): + """ + There is no periodic processing needed for this module. + """ + pass + + + def remote_routes_changed(self, key_class, new_table): + old_table = [] + if key_class in self.key_classes: + old_table = self.key_classes[key_class] + + # flag all of the old entries + old_flags = {} + for a,b in old_table: + old_flags[(a,b)] = ENTRY_OLD + + # flag the new entries + new_flags = {} + for a,b in new_table: + new_flags[(a,b)] = ENTRY_NEW + + # calculate the differences from old to new + for a,b in new_table: + if old_table.count((a,b)) > 0: + old_flags[(a,b)] = ENTRY_CURRENT + new_flags[(a,b)] = ENTRY_CURRENT + + # make to_add and to_delete lists + to_add = [] + to_delete = [] + for (a,b),f in old_flags.items(): + if f == ENTRY_OLD: + to_delete.append((a,b)) + for (a,b),f in new_flags.items(): + if f == ENTRY_NEW: + to_add.append((a,b)) + + # set the routing table to the new contents + self.key_classes[key_class] = new_table + + # update the adapter's routing tables + # Note: Do deletions before adds to avoid overlapping routes that may cause + # messages to be duplicated. It's better to have gaps in the routing + # tables momentarily because unroutable messages are stored for retry. + for a,b in to_delete: + self.container.router_adapter.remote_unbind(a, b) + for a,b in to_add: + self.container.router_adapter.remote_bind(a, b) + + self.container.log(LOG_INFO, "New Routing Table (class=%s):" % key_class) + for a,b in new_table: + self.container.log(LOG_INFO, " %s => %s" % (a, b)) diff --git a/qpid/extras/dispatch/python/qpid/dispatch/router/binding.py b/qpid/extras/dispatch/python/qpid/dispatch/router/binding.py index a37b585e3e..db62f6e8a5 100644 --- a/qpid/extras/dispatch/python/qpid/dispatch/router/binding.py +++ b/qpid/extras/dispatch/python/qpid/dispatch/router/binding.py @@ -18,116 +18,117 @@ # try: - from dispatch import * + from dispatch import * except ImportError: - from ..stubs import * + from ..stubs import * class BindingEngine(object): - """ - This module is responsible for responding to two different events: + """ + This module is responsible for responding to two different events: 1) The learning of new remote mobile addresses 2) The change of topology (i.e. different next-hops for remote routers) - When these occur, this module converts the mobile routing table (address => router) - to a next-hop routing table (address => next-hop), compresses the keys in case there - are wild-card overlaps, and notifies outbound of changes in the "mobile-key" address class. - """ - def __init__(self, container): - self.container = container - self.id = self.container.id - self.area = self.container.area - self.current_keys = {} - - - def tick(self, now): - pass - - - def mobile_keys_changed(self, keys): - self.current_keys = keys - next_hop_keys = self._convert_ids_to_next_hops(keys) - routing_table = self._compress_keys(next_hop_keys) - self.container.remote_routes_changed('mobile-key', routing_table) - - - def next_hops_changed(self): - next_hop_keys = self._convert_ids_to_next_hops(self.current_keys) - routing_table = self._compress_keys(next_hop_keys) - self.container.remote_routes_changed('mobile-key', routing_table) - - - def _convert_ids_to_next_hops(self, keys): - next_hops = self.container.get_next_hops() - new_keys = {} - for _id, value in keys.items(): - if _id in next_hops: - next_hop = next_hops[_id] - if next_hop not in new_keys: - new_keys[next_hop] = [] - new_keys[next_hop].extend(value) - return new_keys - - - def _compress_keys(self, keys): - trees = {} - for _id, key_list in keys.items(): - trees[_id] = TopicElementList() - for key in key_list: - trees[_id].add_key(key) - routing_table = [] - for _id, tree in trees.items(): - tree_keys = tree.get_list() - for tk in tree_keys: - routing_table.append((tk, _id)) - return routing_table + When these occur, this module converts the mobile routing table (address => router) + to a next-hop routing table (address => next-hop), compresses the keys in case there + are wild-card overlaps, and notifies outbound of changes in the "mobile-key" address class. + """ + def __init__(self, container): + self.container = container + self.id = self.container.id + self.area = self.container.area + self.current_keys = {} + + + def tick(self, now): + pass + + + def mobile_keys_changed(self, keys): + self.current_keys = keys + next_hop_keys = self._convert_ids_to_next_hops(keys) + routing_table = self._compress_keys(next_hop_keys) + self.container.remote_routes_changed('mobile-key', routing_table) + + + def next_hops_changed(self): + next_hop_keys = self._convert_ids_to_next_hops(self.current_keys) + routing_table = self._compress_keys(next_hop_keys) + self.container.remote_routes_changed('mobile-key', routing_table) + + + def _convert_ids_to_next_hops(self, keys): + next_hops = self.container.get_next_hops() + new_keys = {} + for _id, value in keys.items(): + if _id in next_hops: + next_hop = next_hops[_id] + if next_hop not in new_keys: + new_keys[next_hop] = [] + new_keys[next_hop].extend(value) + return new_keys + + + def _compress_keys(self, keys): + trees = {} + for _id, key_list in keys.items(): + trees[_id] = TopicElementList() + for key in key_list: + trees[_id].add_key(key) + routing_table = [] + for _id, tree in trees.items(): + tree_keys = tree.get_list() + for tk in tree_keys: + routing_table.append((tk, _id)) + return routing_table class TopicElementList(object): - """ - """ - def __init__(self): - self.elements = {} # map text => (terminal, sub-list) - - def __repr__(self): - return "%r" % self.elements - - def add_key(self, key): - self.add_tokens(key.split('.')) - - def add_tokens(self, tokens): - first = tokens.pop(0) - terminal = len(tokens) == 0 - - if terminal and first == '#': - ## Optimization #1A (A.B.C.D followed by A.B.#) - self.elements = {'#':(True, TopicElementList())} - return - - if '#' in self.elements: - _t,_el = self.elements['#'] - if _t: - ## Optimization #1B (A.B.# followed by A.B.C.D) - return - - if first not in self.elements: - self.elements[first] = (terminal, TopicElementList()) - else: - _t,_el = self.elements[first] - if terminal and not _t: - self.elements[first] = (terminal, _el) - - if not terminal: - _t,_el = self.elements[first] - _el.add_tokens(tokens) - - def get_list(self): - keys = [] - for token, (_t,_el) in self.elements.items(): - if _t: keys.append(token) - _el.build_list(token, keys) - return keys - - def build_list(self, prefix, keys): - for token, (_t,_el) in self.elements.items(): - if _t: keys.append("%s.%s" % (prefix, token)) - _el.build_list("%s.%s" % (prefix, token), keys) + """ + """ + def __init__(self): + self.elements = {} # map text => (terminal, sub-list) + + def __repr__(self): + return "%r" % self.elements + + def add_key(self, key): + self.add_tokens(key.split('.')) + + def add_tokens(self, tokens): + first = tokens.pop(0) + terminal = len(tokens) == 0 + + if terminal and first == '#': + ## Optimization #1A (A.B.C.D followed by A.B.#) + self.elements = {'#':(True, TopicElementList())} + return + + if '#' in self.elements: + _t,_el = self.elements['#'] + if _t: + ## Optimization #1B (A.B.# followed by A.B.C.D) + return + + if first not in self.elements: + self.elements[first] = (terminal, TopicElementList()) + else: + _t,_el = self.elements[first] + if terminal and not _t: + self.elements[first] = (terminal, _el) + + if not terminal: + _t,_el = self.elements[first] + _el.add_tokens(tokens) + + def get_list(self): + keys = [] + for token, (_t,_el) in self.elements.items(): + if _t: keys.append(token) + _el.build_list(token, keys) + return keys + + def build_list(self, prefix, keys): + for token, (_t,_el) in self.elements.items(): + if _t: keys.append("%s.%s" % (prefix, token)) + _el.build_list("%s.%s" % (prefix, token), keys) + diff --git a/qpid/extras/dispatch/python/qpid/dispatch/router/configuration.py b/qpid/extras/dispatch/python/qpid/dispatch/router/configuration.py index f87d2ee7d2..e0fd060b82 100644 --- a/qpid/extras/dispatch/python/qpid/dispatch/router/configuration.py +++ b/qpid/extras/dispatch/python/qpid/dispatch/router/configuration.py @@ -18,30 +18,30 @@ # class Configuration(object): - """ - This module manages and holds the configuration and tuning parameters for a router. - """ - def __init__(self, overrides={}): - ## - ## Load default values - ## - self.values = { 'hello_interval' : 1.0, - 'hello_max_age' : 3.0, - 'ra_interval' : 30.0, - 'remote_ls_max_age' : 60.0, - 'mobile_addr_max_age' : 60.0 } + """ + This module manages and holds the configuration and tuning parameters for a router. + """ + def __init__(self, overrides={}): + ## + ## Load default values + ## + self.values = { 'hello_interval' : 1.0, + 'hello_max_age' : 3.0, + 'ra_interval' : 30.0, + 'remote_ls_max_age' : 60.0, + 'mobile_addr_max_age' : 60.0 } - ## - ## Apply supplied overrides - ## - for k, v in overrides.items(): - self.values[k] = v + ## + ## Apply supplied overrides + ## + for k, v in overrides.items(): + self.values[k] = v - def __getattr__(self, key): - if key in self.values: - return self.values[key] - raise KeyError + def __getattr__(self, key): + if key in self.values: + return self.values[key] + raise KeyError - def __repr__(self): - return "%r" % self.values + def __repr__(self): + return "%r" % self.values diff --git a/qpid/extras/dispatch/python/qpid/dispatch/router/data.py b/qpid/extras/dispatch/python/qpid/dispatch/router/data.py index 89e347a29e..812369768f 100644 --- a/qpid/extras/dispatch/python/qpid/dispatch/router/data.py +++ b/qpid/extras/dispatch/python/qpid/dispatch/router/data.py @@ -19,257 +19,257 @@ try: - from dispatch import * + from dispatch import * except ImportError: - from ..stubs import * + from ..stubs import * def getMandatory(data, key, cls=None): - """ - Get the value mapped to the requested key. If it's not present, raise an exception. - """ - if key in data: - value = data[key] - if cls and value.__class__ != cls: - raise Exception("Protocol field has wrong data type: '%s' type=%r expected=%r" % (key, value.__class__, cls)) - return value - raise Exception("Mandatory protocol field missing: '%s'" % key) + """ + Get the value mapped to the requested key. If it's not present, raise an exception. + """ + if key in data: + value = data[key] + if cls and value.__class__ != cls: + raise Exception("Protocol field has wrong data type: '%s' type=%r expected=%r" % (key, value.__class__, cls)) + return value + raise Exception("Mandatory protocol field missing: '%s'" % key) def getOptional(data, key, default=None, cls=None): - """ - Get the value mapped to the requested key. If it's not present, return the default value. - """ - if key in data: - value = data[key] - if cls and value.__class__ != cls: - raise Exception("Protocol field has wrong data type: '%s' type=%r expected=%r" % (key, value.__class__, cls)) - return value - return default + """ + Get the value mapped to the requested key. If it's not present, return the default value. + """ + if key in data: + value = data[key] + if cls and value.__class__ != cls: + raise Exception("Protocol field has wrong data type: '%s' type=%r expected=%r" % (key, value.__class__, cls)) + return value + return default class LinkState(object): - """ - The link-state of a single router. The link state consists of a list of neighbor routers reachable from - the reporting router. The link-state-sequence number is incremented each time the link state changes. - """ - def __init__(self, body, _id=None, _area=None, _ls_seq=None, _peers=None): - self.last_seen = 0 - if body: - self.id = getMandatory(body, 'id', str) - self.area = getMandatory(body, 'area', str) - self.ls_seq = getMandatory(body, 'ls_seq', long) - self.peers = getMandatory(body, 'peers', list) - else: - self.id = _id - self.area = _area - self.ls_seq = long(_ls_seq) - self.peers = _peers - - def __repr__(self): - return "LS(id=%s area=%s ls_seq=%d peers=%r)" % (self.id, self.area, self.ls_seq, self.peers) - - def to_dict(self): - return {'id' : self.id, - 'area' : self.area, - 'ls_seq' : self.ls_seq, - 'peers' : self.peers} - - def add_peer(self, _id): - if self.peers.count(_id) == 0: - self.peers.append(_id) - return True - return False - - def del_peer(self, _id): - if self.peers.count(_id) > 0: - self.peers.remove(_id) - return True - return False - - def bump_sequence(self): - self.ls_seq += 1 + """ + The link-state of a single router. The link state consists of a list of neighbor routers reachable from + the reporting router. The link-state-sequence number is incremented each time the link state changes. + """ + def __init__(self, body, _id=None, _area=None, _ls_seq=None, _peers=None): + self.last_seen = 0 + if body: + self.id = getMandatory(body, 'id', str) + self.area = getMandatory(body, 'area', str) + self.ls_seq = getMandatory(body, 'ls_seq', long) + self.peers = getMandatory(body, 'peers', list) + else: + self.id = _id + self.area = _area + self.ls_seq = long(_ls_seq) + self.peers = _peers + + def __repr__(self): + return "LS(id=%s area=%s ls_seq=%d peers=%r)" % (self.id, self.area, self.ls_seq, self.peers) + + def to_dict(self): + return {'id' : self.id, + 'area' : self.area, + 'ls_seq' : self.ls_seq, + 'peers' : self.peers} + + def add_peer(self, _id): + if self.peers.count(_id) == 0: + self.peers.append(_id) + return True + return False + + def del_peer(self, _id): + if self.peers.count(_id) > 0: + self.peers.remove(_id) + return True + return False + + def bump_sequence(self): + self.ls_seq += 1 class MessageHELLO(object): - """ - HELLO Message - scope: neighbors only - HELLO messages travel at most one hop - This message is used by directly connected routers to determine with whom they have - bidirectional connectivity. - """ - def __init__(self, body, _id=None, _area=None, _seen_peers=None): - if body: - self.id = getMandatory(body, 'id', str) - self.area = getMandatory(body, 'area', str) - self.seen_peers = getMandatory(body, 'seen', list) - else: - self.id = _id - self.area = _area - self.seen_peers = _seen_peers - - def __repr__(self): - return "HELLO(id=%s area=%s seen=%r)" % (self.id, self.area, self.seen_peers) - - def get_opcode(self): - return 'HELLO' - - def to_dict(self): - return {'id' : self.id, - 'area' : self.area, - 'seen' : self.seen_peers} - - def is_seen(self, _id): - return self.seen_peers.count(_id) > 0 + """ + HELLO Message + scope: neighbors only - HELLO messages travel at most one hop + This message is used by directly connected routers to determine with whom they have + bidirectional connectivity. + """ + def __init__(self, body, _id=None, _area=None, _seen_peers=None): + if body: + self.id = getMandatory(body, 'id', str) + self.area = getMandatory(body, 'area', str) + self.seen_peers = getMandatory(body, 'seen', list) + else: + self.id = _id + self.area = _area + self.seen_peers = _seen_peers + + def __repr__(self): + return "HELLO(id=%s area=%s seen=%r)" % (self.id, self.area, self.seen_peers) + + def get_opcode(self): + return 'HELLO' + + def to_dict(self): + return {'id' : self.id, + 'area' : self.area, + 'seen' : self.seen_peers} + + def is_seen(self, _id): + return self.seen_peers.count(_id) > 0 class MessageRA(object): - """ - Router Advertisement (RA) Message - scope: all routers in the area and all designated routers - This message is sent periodically to indicate the originating router's sequence numbers - for link-state and mobile-address-state. - """ - def __init__(self, body, _id=None, _area=None, _ls_seq=None, _mobile_seq=None): - if body: - self.id = getMandatory(body, 'id', str) - self.area = getMandatory(body, 'area', str) - self.ls_seq = getMandatory(body, 'ls_seq', long) - self.mobile_seq = getMandatory(body, 'mobile_seq', long) - else: - self.id = _id - self.area = _area - self.ls_seq = long(_ls_seq) - self.mobile_seq = long(_mobile_seq) - - def get_opcode(self): - return 'RA' - - def __repr__(self): - return "RA(id=%s area=%s ls_seq=%d mobile_seq=%d)" % \ - (self.id, self.area, self.ls_seq, self.mobile_seq) - - def to_dict(self): - return {'id' : self.id, - 'area' : self.area, - 'ls_seq' : self.ls_seq, - 'mobile_seq' : self.mobile_seq} + """ + Router Advertisement (RA) Message + scope: all routers in the area and all designated routers + This message is sent periodically to indicate the originating router's sequence numbers + for link-state and mobile-address-state. + """ + def __init__(self, body, _id=None, _area=None, _ls_seq=None, _mobile_seq=None): + if body: + self.id = getMandatory(body, 'id', str) + self.area = getMandatory(body, 'area', str) + self.ls_seq = getMandatory(body, 'ls_seq', long) + self.mobile_seq = getMandatory(body, 'mobile_seq', long) + else: + self.id = _id + self.area = _area + self.ls_seq = long(_ls_seq) + self.mobile_seq = long(_mobile_seq) + + def get_opcode(self): + return 'RA' + + def __repr__(self): + return "RA(id=%s area=%s ls_seq=%d mobile_seq=%d)" % \ + (self.id, self.area, self.ls_seq, self.mobile_seq) + + def to_dict(self): + return {'id' : self.id, + 'area' : self.area, + 'ls_seq' : self.ls_seq, + 'mobile_seq' : self.mobile_seq} class MessageLSU(object): - """ - """ - def __init__(self, body, _id=None, _area=None, _ls_seq=None, _ls=None): - if body: - self.id = getMandatory(body, 'id', str) - self.area = getMandatory(body, 'area', str) - self.ls_seq = getMandatory(body, 'ls_seq', long) - self.ls = LinkState(getMandatory(body, 'ls', dict)) - else: - self.id = _id - self.area = _area - self.ls_seq = long(_ls_seq) - self.ls = _ls - - def get_opcode(self): - return 'LSU' - - def __repr__(self): - return "LSU(id=%s area=%s ls_seq=%d ls=%r)" % \ - (self.id, self.area, self.ls_seq, self.ls) - - def to_dict(self): - return {'id' : self.id, - 'area' : self.area, - 'ls_seq' : self.ls_seq, - 'ls' : self.ls.to_dict()} + """ + """ + def __init__(self, body, _id=None, _area=None, _ls_seq=None, _ls=None): + if body: + self.id = getMandatory(body, 'id', str) + self.area = getMandatory(body, 'area', str) + self.ls_seq = getMandatory(body, 'ls_seq', long) + self.ls = LinkState(getMandatory(body, 'ls', dict)) + else: + self.id = _id + self.area = _area + self.ls_seq = long(_ls_seq) + self.ls = _ls + + def get_opcode(self): + return 'LSU' + + def __repr__(self): + return "LSU(id=%s area=%s ls_seq=%d ls=%r)" % \ + (self.id, self.area, self.ls_seq, self.ls) + + def to_dict(self): + return {'id' : self.id, + 'area' : self.area, + 'ls_seq' : self.ls_seq, + 'ls' : self.ls.to_dict()} class MessageLSR(object): - """ - """ - def __init__(self, body, _id=None, _area=None): - if body: - self.id = getMandatory(body, 'id', str) - self.area = getMandatory(body, 'area', str) - else: - self.id = _id - self.area = _area + """ + """ + def __init__(self, body, _id=None, _area=None): + if body: + self.id = getMandatory(body, 'id', str) + self.area = getMandatory(body, 'area', str) + else: + self.id = _id + self.area = _area - def get_opcode(self): - return 'LSR' + def get_opcode(self): + return 'LSR' - def __repr__(self): - return "LSR(id=%s area=%s)" % (self.id, self.area) + def __repr__(self): + return "LSR(id=%s area=%s)" % (self.id, self.area) - def to_dict(self): - return {'id' : self.id, - 'area' : self.area} + def to_dict(self): + return {'id' : self.id, + 'area' : self.area} class MessageMAU(object): - """ - """ - def __init__(self, body, _id=None, _area=None, _seq=None, _add_list=None, _del_list=None, _exist_list=None): - if body: - self.id = getMandatory(body, 'id', str) - self.area = getMandatory(body, 'area', str) - self.mobile_seq = getMandatory(body, 'mobile_seq', long) - self.add_list = getOptional(body, 'add', None, list) - self.del_list = getOptional(body, 'del', None, list) - self.exist_list = getOptional(body, 'exist', None, list) - else: - self.id = _id - self.area = _area - self.mobile_seq = long(_seq) - self.add_list = _add_list - self.del_list = _del_list - self.exist_list = _exist_list - - def get_opcode(self): - return 'MAU' - - def __repr__(self): - _add = '' - _del = '' - _exist = '' - if self.add_list: _add = ' add=%r' % self.add_list - if self.del_list: _del = ' del=%r' % self.del_list - if self.exist_list: _exist = ' exist=%r' % self.exist_list - return "MAU(id=%s area=%s mobile_seq=%d%s%s%s)" % \ - (self.id, self.area, self.mobile_seq, _add, _del, _exist) - - def to_dict(self): - body = { 'id' : self.id, - 'area' : self.area, - 'mobile_seq' : self.mobile_seq } - if self.add_list: body['add'] = self.add_list - if self.del_list: body['del'] = self.del_list - if self.exist_list: body['exist'] = self.exist_list - return body + """ + """ + def __init__(self, body, _id=None, _area=None, _seq=None, _add_list=None, _del_list=None, _exist_list=None): + if body: + self.id = getMandatory(body, 'id', str) + self.area = getMandatory(body, 'area', str) + self.mobile_seq = getMandatory(body, 'mobile_seq', long) + self.add_list = getOptional(body, 'add', None, list) + self.del_list = getOptional(body, 'del', None, list) + self.exist_list = getOptional(body, 'exist', None, list) + else: + self.id = _id + self.area = _area + self.mobile_seq = long(_seq) + self.add_list = _add_list + self.del_list = _del_list + self.exist_list = _exist_list + + def get_opcode(self): + return 'MAU' + + def __repr__(self): + _add = '' + _del = '' + _exist = '' + if self.add_list: _add = ' add=%r' % self.add_list + if self.del_list: _del = ' del=%r' % self.del_list + if self.exist_list: _exist = ' exist=%r' % self.exist_list + return "MAU(id=%s area=%s mobile_seq=%d%s%s%s)" % \ + (self.id, self.area, self.mobile_seq, _add, _del, _exist) + + def to_dict(self): + body = { 'id' : self.id, + 'area' : self.area, + 'mobile_seq' : self.mobile_seq } + if self.add_list: body['add'] = self.add_list + if self.del_list: body['del'] = self.del_list + if self.exist_list: body['exist'] = self.exist_list + return body class MessageMAR(object): - """ - """ - def __init__(self, body, _id=None, _area=None, _have_seq=None): - if body: - self.id = getMandatory(body, 'id', str) - self.area = getMandatory(body, 'area', str) - self.have_seq = getMandatory(body, 'have_seq', long) - else: - self.id = _id - self.area = _area - self.have_seq = long(_have_seq) - - def get_opcode(self): - return 'MAR' - - def __repr__(self): - return "MAR(id=%s area=%s have_seq=%d)" % (self.id, self.area, self.have_seq) - - def to_dict(self): - return {'id' : self.id, - 'area' : self.area, - 'have_seq' : self.have_seq} + """ + """ + def __init__(self, body, _id=None, _area=None, _have_seq=None): + if body: + self.id = getMandatory(body, 'id', str) + self.area = getMandatory(body, 'area', str) + self.have_seq = getMandatory(body, 'have_seq', long) + else: + self.id = _id + self.area = _area + self.have_seq = long(_have_seq) + + def get_opcode(self): + return 'MAR' + + def __repr__(self): + return "MAR(id=%s area=%s have_seq=%d)" % (self.id, self.area, self.have_seq) + + def to_dict(self): + return {'id' : self.id, + 'area' : self.area, + 'have_seq' : self.have_seq} diff --git a/qpid/extras/dispatch/python/qpid/dispatch/router/link.py b/qpid/extras/dispatch/python/qpid/dispatch/router/link.py index fb23177e2f..46e03379b4 100644 --- a/qpid/extras/dispatch/python/qpid/dispatch/router/link.py +++ b/qpid/extras/dispatch/python/qpid/dispatch/router/link.py @@ -21,123 +21,123 @@ from data import MessageRA, MessageLSU, MessageLSR from time import time try: - from dispatch import * + from dispatch import * except ImportError: - from ..stubs import * + from ..stubs import * class LinkStateEngine(object): - """ - This module is responsible for running the Link State protocol and maintaining the set - of link states that are gathered from the domain. It notifies outbound when changes to - the link-state-collection are detected. - """ - def __init__(self, container): - self.container = container - self.id = self.container.id - self.area = self.container.area - self.ra_interval = self.container.config.ra_interval - self.remote_ls_max_age = self.container.config.remote_ls_max_age - self.last_ra_time = 0 - self.collection = {} - self.collection_changed = False - self.mobile_seq = 0 - self.needed_lsrs = {} - - - def tick(self, now): - self._expire_ls(now) - self._send_lsrs() - - if now - self.last_ra_time >= self.ra_interval: - self.last_ra_time = now - self._send_ra() - - if self.collection_changed: - self.collection_changed = False - self.container.log(LOG_INFO, "New Link-State Collection:") - for a,b in self.collection.items(): - self.container.log(LOG_INFO, " %s => %r" % (a, b.peers)) - self.container.ls_collection_changed(self.collection) - - - def handle_ra(self, msg, now): - if msg.id == self.id: - return - if msg.id in self.collection: - ls = self.collection[msg.id] - ls.last_seen = now - if ls.ls_seq < msg.ls_seq: - self.needed_lsrs[(msg.area, msg.id)] = None - else: - self.needed_lsrs[(msg.area, msg.id)] = None - - - def handle_lsu(self, msg, now): - if msg.id == self.id: - return - if msg.id in self.collection: - ls = self.collection[msg.id] - if ls.ls_seq < msg.ls_seq: - ls = msg.ls - self.collection[msg.id] = ls + """ + This module is responsible for running the Link State protocol and maintaining the set + of link states that are gathered from the domain. It notifies outbound when changes to + the link-state-collection are detected. + """ + def __init__(self, container): + self.container = container + self.id = self.container.id + self.area = self.container.area + self.ra_interval = self.container.config.ra_interval + self.remote_ls_max_age = self.container.config.remote_ls_max_age + self.last_ra_time = 0 + self.collection = {} + self.collection_changed = False + self.mobile_seq = 0 + self.needed_lsrs = {} + + + def tick(self, now): + self._expire_ls(now) + self._send_lsrs() + + if now - self.last_ra_time >= self.ra_interval: + self.last_ra_time = now + self._send_ra() + + if self.collection_changed: + self.collection_changed = False + self.container.log(LOG_INFO, "New Link-State Collection:") + for a,b in self.collection.items(): + self.container.log(LOG_INFO, " %s => %r" % (a, b.peers)) + self.container.ls_collection_changed(self.collection) + + + def handle_ra(self, msg, now): + if msg.id == self.id: + return + if msg.id in self.collection: + ls = self.collection[msg.id] + ls.last_seen = now + if ls.ls_seq < msg.ls_seq: + self.needed_lsrs[(msg.area, msg.id)] = None + else: + self.needed_lsrs[(msg.area, msg.id)] = None + + + def handle_lsu(self, msg, now): + if msg.id == self.id: + return + if msg.id in self.collection: + ls = self.collection[msg.id] + if ls.ls_seq < msg.ls_seq: + ls = msg.ls + self.collection[msg.id] = ls + self.collection_changed = True + ls.last_seen = now + else: + ls = msg.ls + self.collection[msg.id] = ls + self.collection_changed = True + ls.last_seen = now + self.container.new_node(msg.id) + self.container.log(LOG_INFO, "Learned link-state from new router: %s" % msg.id) + # Schedule LSRs for any routers referenced in this LS that we don't know about + for _id in msg.ls.peers: + if _id not in self.collection: + self.needed_lsrs[(msg.area, _id)] = None + + + def handle_lsr(self, msg, now): + if msg.id == self.id: + return + if self.id not in self.collection: + return + my_ls = self.collection[self.id] + self.container.send('_topo/%s/%s' % (msg.area, msg.id), MessageLSU(None, self.id, self.area, my_ls.ls_seq, my_ls)) + + + def new_local_link_state(self, link_state): + self.collection[self.id] = link_state self.collection_changed = True - ls.last_seen = now - else: - ls = msg.ls - self.collection[msg.id] = ls - self.collection_changed = True - ls.last_seen = now - self.container.new_node(msg.id) - self.container.log(LOG_INFO, "Learned link-state from new router: %s" % msg.id) - # Schedule LSRs for any routers referenced in this LS that we don't know about - for _id in msg.ls.peers: - if _id not in self.collection: - self.needed_lsrs[(msg.area, _id)] = None - - - def handle_lsr(self, msg, now): - if msg.id == self.id: - return - if self.id not in self.collection: - return - my_ls = self.collection[self.id] - self.container.send('_topo/%s/%s' % (msg.area, msg.id), MessageLSU(None, self.id, self.area, my_ls.ls_seq, my_ls)) - - - def new_local_link_state(self, link_state): - self.collection[self.id] = link_state - self.collection_changed = True - self._send_ra() - - - def set_mobile_sequence(self, seq): - self.mobile_seq = seq - - - def get_collection(self): - return self.collection - - - def _expire_ls(self, now): - to_delete = [] - for key, ls in self.collection.items(): - if key != self.id and now - ls.last_seen > self.remote_ls_max_age: - to_delete.append(key) - for key in to_delete: - ls = self.collection.pop(key) - self.collection_changed = True - self.container.lost_node(key) - self.container.log(LOG_INFO, "Expired link-state from router: %s" % key) - - - def _send_lsrs(self): - for (_area, _id) in self.needed_lsrs.keys(): - self.container.send('_topo/%s/%s' % (_area, _id), MessageLSR(None, self.id, self.area)) - self.needed_lsrs = {} - - - def _send_ra(self): - ls_seq = 0 - if self.id in self.collection: - ls_seq = self.collection[self.id].ls_seq - self.container.send('_topo/%s/all' % self.area, MessageRA(None, self.id, self.area, ls_seq, self.mobile_seq)) + self._send_ra() + + + def set_mobile_sequence(self, seq): + self.mobile_seq = seq + + + def get_collection(self): + return self.collection + + + def _expire_ls(self, now): + to_delete = [] + for key, ls in self.collection.items(): + if key != self.id and now - ls.last_seen > self.remote_ls_max_age: + to_delete.append(key) + for key in to_delete: + ls = self.collection.pop(key) + self.collection_changed = True + self.container.lost_node(key) + self.container.log(LOG_INFO, "Expired link-state from router: %s" % key) + + + def _send_lsrs(self): + for (_area, _id) in self.needed_lsrs.keys(): + self.container.send('_topo/%s/%s' % (_area, _id), MessageLSR(None, self.id, self.area)) + self.needed_lsrs = {} + + + def _send_ra(self): + ls_seq = 0 + if self.id in self.collection: + ls_seq = self.collection[self.id].ls_seq + self.container.send('_topo/%s/all' % self.area, MessageRA(None, self.id, self.area, ls_seq, self.mobile_seq)) diff --git a/qpid/extras/dispatch/python/qpid/dispatch/router/mobile.py b/qpid/extras/dispatch/python/qpid/dispatch/router/mobile.py index ce80f38780..5413dd38a8 100644 --- a/qpid/extras/dispatch/python/qpid/dispatch/router/mobile.py +++ b/qpid/extras/dispatch/python/qpid/dispatch/router/mobile.py @@ -20,169 +20,169 @@ from data import MessageRA, MessageMAR, MessageMAU try: - from dispatch import * + from dispatch import * except ImportError: - from ..stubs import * + from ..stubs import * class MobileAddressEngine(object): - """ - This module is responsible for maintaining an up-to-date list of mobile addresses in the domain. - It runs the Mobile-Address protocol and generates an un-optimized routing table for mobile addresses. - Note that this routing table maps from the mobile address to the remote router where that address - is directly bound. - """ - def __init__(self, container): - self.container = container - self.id = self.container.id - self.area = self.container.area - self.mobile_addr_max_age = self.container.config.mobile_addr_max_age - self.mobile_seq = 0 - self.local_keys = [] - self.added_keys = [] - self.deleted_keys = [] - self.remote_lists = {} # map router_id => (sequence, list of keys) - self.remote_last_seen = {} # map router_id => time of last seen advertizement/update - self.remote_changed = False - self.needed_mars = {} - - - def tick(self, now): - self._expire_remotes(now) - self._send_mars() - - ## - ## If local keys have changed, collect the changes and send a MAU with the diffs - ## Note: it is important that the differential-MAU be sent before a RA is sent - ## - if len(self.added_keys) > 0 or len(self.deleted_keys) > 0: - self.mobile_seq += 1 - self.container.send('_topo.%s.all' % self.area, - MessageMAU(None, self.id, self.area, self.mobile_seq, self.added_keys, self.deleted_keys)) - self.local_keys.extend(self.added_keys) - for key in self.deleted_keys: - self.local_keys.remove(key) - self.added_keys = [] - self.deleted_keys = [] - self.container.mobile_sequence_changed(self.mobile_seq) - - ## - ## If remotes have changed, start the process of updating local bindings - ## - if self.remote_changed: - self.remote_changed = False - self._update_remote_keys() - - - def add_local_address(self, key): """ + This module is responsible for maintaining an up-to-date list of mobile addresses in the domain. + It runs the Mobile-Address protocol and generates an un-optimized routing table for mobile addresses. + Note that this routing table maps from the mobile address to the remote router where that address + is directly bound. """ - if self.local_keys.count(key) == 0: - if self.added_keys.count(key) == 0: - self.added_keys.append(key) - else: - if self.deleted_keys.count(key) > 0: - self.deleted_keys.remove(key) - - - def del_local_address(self, key): - """ - """ - if self.local_keys.count(key) > 0: - if self.deleted_keys.count(key) == 0: - self.deleted_keys.append(key) - else: - if self.added_keys.count(key) > 0: - self.added_keys.remove(key) - - - def handle_ra(self, msg, now): - if msg.id == self.id: - return - - if msg.mobile_seq == 0: - return - - if msg.id in self.remote_lists: - _seq, _list = self.remote_lists[msg.id] - self.remote_last_seen[msg.id] = now - if _seq < msg.mobile_seq: - self.needed_mars[(msg.id, msg.area, _seq)] = None - else: - self.needed_mars[(msg.id, msg.area, 0)] = None - - - def handle_mau(self, msg, now): - ## - ## If the MAU is differential, we can only use it if its sequence is exactly one greater - ## than our stored sequence. If not, we will ignore the content and schedule a MAR. - ## - ## If the MAU is absolute, we can use it in all cases. - ## - if msg.id == self.id: - return - - if msg.exist_list: - ## - ## Absolute MAU - ## - if msg.id in self.remote_lists: - _seq, _list = self.remote_lists[msg.id] - if _seq >= msg.mobile_seq: # ignore duplicates - return - self.remote_lists[msg.id] = (msg.mobile_seq, msg.exist_list) - self.remote_last_seen[msg.id] = now - self.remote_changed = True - else: - ## - ## Differential MAU - ## - if msg.id in self.remote_lists: - _seq, _list = self.remote_lists[msg.id] - if _seq == msg.mobile_seq: # ignore duplicates - return - self.remote_last_seen[msg.id] = now - if _seq + 1 == msg.mobile_seq: - ## - ## This is one greater than our stored value, incorporate the deltas - ## - if msg.add_list and msg.add_list.__class__ == list: - _list.extend(msg.add_list) - if msg.del_list and msg.del_list.__class__ == list: - for key in msg.del_list: - _list.remove(key) - self.remote_lists[msg.id] = (msg.mobile_seq, _list) - self.remote_changed = True + def __init__(self, container): + self.container = container + self.id = self.container.id + self.area = self.container.area + self.mobile_addr_max_age = self.container.config.mobile_addr_max_age + self.mobile_seq = 0 + self.local_keys = [] + self.added_keys = [] + self.deleted_keys = [] + self.remote_lists = {} # map router_id => (sequence, list of keys) + self.remote_last_seen = {} # map router_id => time of last seen advertizement/update + self.remote_changed = False + self.needed_mars = {} + + + def tick(self, now): + self._expire_remotes(now) + self._send_mars() + + ## + ## If local keys have changed, collect the changes and send a MAU with the diffs + ## Note: it is important that the differential-MAU be sent before a RA is sent + ## + if len(self.added_keys) > 0 or len(self.deleted_keys) > 0: + self.mobile_seq += 1 + self.container.send('_topo.%s.all' % self.area, + MessageMAU(None, self.id, self.area, self.mobile_seq, self.added_keys, self.deleted_keys)) + self.local_keys.extend(self.added_keys) + for key in self.deleted_keys: + self.local_keys.remove(key) + self.added_keys = [] + self.deleted_keys = [] + self.container.mobile_sequence_changed(self.mobile_seq) + + ## + ## If remotes have changed, start the process of updating local bindings + ## + if self.remote_changed: + self.remote_changed = False + self._update_remote_keys() + + + def add_local_address(self, key): + """ + """ + if self.local_keys.count(key) == 0: + if self.added_keys.count(key) == 0: + self.added_keys.append(key) else: - self.needed_mars[(msg.id, msg.area, _seq)] = None - else: - self.needed_mars[(msg.id, msg.area, 0)] = None - + if self.deleted_keys.count(key) > 0: + self.deleted_keys.remove(key) - def handle_mar(self, msg, now): - if msg.id == self.id: - return - if msg.have_seq < self.mobile_seq: - self.container.send('_topo.%s.%s' % (msg.area, msg.id), - MessageMAU(None, self.id, self.area, self.mobile_seq, None, None, self.local_keys)) - - def _update_remote_keys(self): - keys = {} - for _id,(seq,key_list) in self.remote_lists.items(): - keys[_id] = key_list - self.container.mobile_keys_changed(keys) + def del_local_address(self, key): + """ + """ + if self.local_keys.count(key) > 0: + if self.deleted_keys.count(key) == 0: + self.deleted_keys.append(key) + else: + if self.added_keys.count(key) > 0: + self.added_keys.remove(key) - def _expire_remotes(self, now): - for _id, t in self.remote_last_seen.items(): - if now - t > self.mobile_addr_max_age: - self.remote_lists.pop(_id) - self.remote_last_seen.pop(_id) - self.remote_changed = True + def handle_ra(self, msg, now): + if msg.id == self.id: + return + if msg.mobile_seq == 0: + return - def _send_mars(self): - for _id, _area, _seq in self.needed_mars.keys(): - self.container.send('_topo.%s.%s' % (_area, _id), MessageMAR(None, self.id, self.area, _seq)) - self.needed_mars = {} + if msg.id in self.remote_lists: + _seq, _list = self.remote_lists[msg.id] + self.remote_last_seen[msg.id] = now + if _seq < msg.mobile_seq: + self.needed_mars[(msg.id, msg.area, _seq)] = None + else: + self.needed_mars[(msg.id, msg.area, 0)] = None + + + def handle_mau(self, msg, now): + ## + ## If the MAU is differential, we can only use it if its sequence is exactly one greater + ## than our stored sequence. If not, we will ignore the content and schedule a MAR. + ## + ## If the MAU is absolute, we can use it in all cases. + ## + if msg.id == self.id: + return + + if msg.exist_list: + ## + ## Absolute MAU + ## + if msg.id in self.remote_lists: + _seq, _list = self.remote_lists[msg.id] + if _seq >= msg.mobile_seq: # ignore duplicates + return + self.remote_lists[msg.id] = (msg.mobile_seq, msg.exist_list) + self.remote_last_seen[msg.id] = now + self.remote_changed = True + else: + ## + ## Differential MAU + ## + if msg.id in self.remote_lists: + _seq, _list = self.remote_lists[msg.id] + if _seq == msg.mobile_seq: # ignore duplicates + return + self.remote_last_seen[msg.id] = now + if _seq + 1 == msg.mobile_seq: + ## + ## This is one greater than our stored value, incorporate the deltas + ## + if msg.add_list and msg.add_list.__class__ == list: + _list.extend(msg.add_list) + if msg.del_list and msg.del_list.__class__ == list: + for key in msg.del_list: + _list.remove(key) + self.remote_lists[msg.id] = (msg.mobile_seq, _list) + self.remote_changed = True + else: + self.needed_mars[(msg.id, msg.area, _seq)] = None + else: + self.needed_mars[(msg.id, msg.area, 0)] = None + + + def handle_mar(self, msg, now): + if msg.id == self.id: + return + if msg.have_seq < self.mobile_seq: + self.container.send('_topo.%s.%s' % (msg.area, msg.id), + MessageMAU(None, self.id, self.area, self.mobile_seq, None, None, self.local_keys)) + + + def _update_remote_keys(self): + keys = {} + for _id,(seq,key_list) in self.remote_lists.items(): + keys[_id] = key_list + self.container.mobile_keys_changed(keys) + + + def _expire_remotes(self, now): + for _id, t in self.remote_last_seen.items(): + if now - t > self.mobile_addr_max_age: + self.remote_lists.pop(_id) + self.remote_last_seen.pop(_id) + self.remote_changed = True + + + def _send_mars(self): + for _id, _area, _seq in self.needed_mars.keys(): + self.container.send('_topo.%s.%s' % (_area, _id), MessageMAR(None, self.id, self.area, _seq)) + self.needed_mars = {} diff --git a/qpid/extras/dispatch/python/qpid/dispatch/router/neighbor.py b/qpid/extras/dispatch/python/qpid/dispatch/router/neighbor.py index ea8bacd660..6a16049065 100644 --- a/qpid/extras/dispatch/python/qpid/dispatch/router/neighbor.py +++ b/qpid/extras/dispatch/python/qpid/dispatch/router/neighbor.py @@ -21,63 +21,63 @@ from data import LinkState, MessageHELLO from time import time try: - from dispatch import * + from dispatch import * except ImportError: - from ..stubs import * + from ..stubs import * class NeighborEngine(object): - """ - This module is responsible for maintaining this router's link-state. It runs the HELLO protocol - with the router's neighbors and notifies outbound when the list of neighbors-in-good-standing (the - link-state) changes. - """ - def __init__(self, container): - self.container = container - self.id = self.container.id - self.area = self.container.area - self.last_hello_time = 0.0 - self.hello_interval = container.config.hello_interval - self.hello_max_age = container.config.hello_max_age - self.hellos = {} - self.link_state_changed = False - self.link_state = LinkState(None, self.id, self.area, 0, []) + """ + This module is responsible for maintaining this router's link-state. It runs the HELLO protocol + with the router's neighbors and notifies outbound when the list of neighbors-in-good-standing (the + link-state) changes. + """ + def __init__(self, container): + self.container = container + self.id = self.container.id + self.area = self.container.area + self.last_hello_time = 0.0 + self.hello_interval = container.config.hello_interval + self.hello_max_age = container.config.hello_max_age + self.hellos = {} + self.link_state_changed = False + self.link_state = LinkState(None, self.id, self.area, 0, []) - def tick(self, now): - self._expire_hellos(now) + def tick(self, now): + self._expire_hellos(now) - if now - self.last_hello_time >= self.hello_interval: - self.last_hello_time = now - self.container.send('_local/qdxrouter', MessageHELLO(None, self.id, self.area, self.hellos.keys())) + if now - self.last_hello_time >= self.hello_interval: + self.last_hello_time = now + self.container.send('_local/qdxrouter', MessageHELLO(None, self.id, self.area, self.hellos.keys())) - if self.link_state_changed: - self.link_state_changed = False - self.link_state.bump_sequence() - self.container.local_link_state_changed(self.link_state) + if self.link_state_changed: + self.link_state_changed = False + self.link_state.bump_sequence() + self.container.local_link_state_changed(self.link_state) - def handle_hello(self, msg, now): - if msg.id == self.id: - return - self.hellos[msg.id] = now - if msg.is_seen(self.id): - if self.link_state.add_peer(msg.id): - self.link_state_changed = True - self.container.new_neighbor(msg.id) - self.container.log(LOG_INFO, "New neighbor established: %s" % msg.id) - ## - ## TODO - Use this function to detect area boundaries - ## + def handle_hello(self, msg, now, link_id): + if msg.id == self.id: + return + self.hellos[msg.id] = now + if msg.is_seen(self.id): + if self.link_state.add_peer(msg.id): + self.link_state_changed = True + self.container.new_neighbor(msg.id, link_id) + self.container.log(LOG_INFO, "New neighbor established: %s on link: %d" % (msg.id, link_id)) + ## + ## TODO - Use this function to detect area boundaries + ## - def _expire_hellos(self, now): - to_delete = [] - for key, last_seen in self.hellos.items(): - if now - last_seen > self.hello_max_age: - to_delete.append(key) - for key in to_delete: - self.hellos.pop(key) - if self.link_state.del_peer(key): - self.link_state_changed = True - self.container.lost_neighbor(key) - self.container.log(LOG_INFO, "Neighbor lost: %s" % key) + def _expire_hellos(self, now): + to_delete = [] + for key, last_seen in self.hellos.items(): + if now - last_seen > self.hello_max_age: + to_delete.append(key) + for key in to_delete: + self.hellos.pop(key) + if self.link_state.del_peer(key): + self.link_state_changed = True + self.container.lost_neighbor(key) + self.container.log(LOG_INFO, "Neighbor lost: %s" % key) diff --git a/qpid/extras/dispatch/python/qpid/dispatch/router/node.py b/qpid/extras/dispatch/python/qpid/dispatch/router/node.py index c90f7f4232..433ce3ab1e 100644 --- a/qpid/extras/dispatch/python/qpid/dispatch/router/node.py +++ b/qpid/extras/dispatch/python/qpid/dispatch/router/node.py @@ -18,86 +18,131 @@ # try: - from dispatch import * + from dispatch import * except ImportError: - from ..stubs import * + from ..stubs import * class NodeTracker(object): - """ - This module is responsible for tracking the set of router nodes that are known to this - router. It tracks whether they are neighbor or remote and whether they are reachable. - """ - def __init__(self, container): - self.container = container - self.id = self.container.id - self.area = self.container.area - self.nodes = {} # id => RemoteNode - - - def tick(self, now): - pass - - - def new_neighbor(self, node_id): - if node_id not in self.nodes: - self.nodes[node_id] = RemoteNode(node_id) - self.nodes[node_id].set_neighbor() - self._notify(self.nodes[node_id]) - - - def lost_neighbor(self, node_id): - node = self.nodes[node_id] - node.clear_neighbor() - self._notify(node) - if node.to_delete(): - self.nodes.pop(node_id) - - - def new_node(self, node_id): - if node_id not in self.nodes: - self.nodes[node_id] = RemoteNode(node_id) - self.nodes[node_id].set_remote() - self._notify(self.nodes[node_id]) - - - def lost_node(self, node_id): - node = self.nodes[node_id] - node.clear_remote() - self._notify(node) - if node.to_delete(): - self.nodes.pop(node_id) - - - def _notify(self, node): - if node.to_delete(): - self.container.adapter.node_updated("R%s" % node.id, 0, 0) - else: - is_neighbor = 0 - if node.neighbor: - is_neighbor = 1 - self.container.adapter.node_updated("R%s" % node.id, 1, is_neighbor) + """ + This module is responsible for tracking the set of router nodes that are known to this + router. It tracks whether they are neighbor or remote and whether they are reachable. + + This module is also responsible for assigning a unique mask bit value to each router. + The mask bit is used in the main router to represent sets of valid destinations for addresses. + """ + def __init__(self, container, max_routers): + self.container = container + self.max_routers = max_routers + self.nodes = {} # id => RemoteNode + self.maskbits = [] + self.next_maskbit = 0 + for i in range(max_routers): + self.maskbits.append(None) + + + def tick(self, now): + pass + + + def new_neighbor(self, node_id, link_maskbit): + """ + A node, designated by node_id, has been discovered as a neighbor over a link with + a maskbit of link_maskbit. + """ + if node_id not in self.nodes: + self.nodes[node_id] = RemoteNode(node_id, self._allocate_maskbit()) + self.nodes[node_id].set_neighbor(link_maskbit) + self._notify(self.nodes[node_id]) + + + def lost_neighbor(self, node_id): + """ + We have lost contact with a neighboring node node_id. + """ + node = self.nodes[node_id] + node.clear_neighbor() + self._notify(node) + if node.to_delete(): + self._free_maskbit(node.maskbit) + self.nodes.pop(node_id) + + + def new_node(self, node_id): + """ + A node, designated by node_id, has been discovered through the an advertisement from a + remote peer. + """ + if node_id not in self.nodes: + self.nodes[node_id] = RemoteNode(node_id, self._allocate_maskbit()) + self.nodes[node_id].set_remote() + self._notify(self.nodes[node_id]) + + + def lost_node(self, node_id): + """ + A remote node, node_id, has not been heard from for too long and is being deemed lost. + """ + node = self.nodes[node_id] + node.clear_remote() + self._notify(node) + if node.to_delete(): + self._free_maskbit(node.maskbit) + self.nodes.pop(node_id) + + + def _allocate_maskbit(self): + if self.next_maskbit == None: + raise Exception("Exceeded Maximum Router Count") + result = self.next_maskbit + self.next_maskbit = None + self.maskbits[result] = True + for n in range(result + 1, self.max_routers): + if self.maskbits[n] == None: + self.next_maskbit = n + break + return result + + + def _free_maskbit(self, i): + self.maskbits[i] = None + if self.next_maskbit == None or i < self.next_maskbit: + self.next_maskbit = i + + + def _notify(self, node): + if node.to_delete(): + self.container.node_updated("R%s" % node.id, 0, 0, 0, 0) + else: + is_neighbor = 0 + if node.neighbor: + is_neighbor = 1 + self.container.node_updated("R%s" % node.id, 1, is_neighbor, node.link_maskbit, node.maskbit) class RemoteNode(object): - def __init__(self, node_id): - self.id = node_id - self.neighbor = None - self.remote = None + def __init__(self, node_id, maskbit): + self.id = node_id + self.neighbor = None + self.link_maskbit = None + self.maskbit = maskbit + self.remote = None - def set_neighbor(self): - self.neighbor = True + def set_neighbor(self, link_maskbit): + self.neighbor = True + self.link_maskbit = link_maskbit - def set_remote(self): - self.remote = True + def set_remote(self): + self.remote = True - def clear_neighbor(self): - self.neighbor = None + def clear_neighbor(self): + self.neighbor = None + self.link_maskbit = None - def clear_remote(self): - self.remote = None + def clear_remote(self): + self.remote = None - def to_delete(self): - return self.neighbor or self.remote + def to_delete(self): + return self.neighbor == None and self.remote == None diff --git a/qpid/extras/dispatch/python/qpid/dispatch/router/path.py b/qpid/extras/dispatch/python/qpid/dispatch/router/path.py index c051dbe7fc..9357ed1847 100644 --- a/qpid/extras/dispatch/python/qpid/dispatch/router/path.py +++ b/qpid/extras/dispatch/python/qpid/dispatch/router/path.py @@ -18,185 +18,185 @@ # try: - from dispatch import * + from dispatch import * except ImportError: - from ..stubs import * + from ..stubs import * class PathEngine(object): - """ - This module is responsible for computing the next-hop for every router/area in the domain - based on the collection of link states that have been gathered. - """ - def __init__(self, container): - self.container = container - self.id = self.container.id - self.area = self.container.area - self.recalculate = False - self.collection = None - - - def tick(self, now_unused): - if self.recalculate: - self.recalculate = False - self._calculate_routes() - - - def ls_collection_changed(self, collection): - self.recalculate = True - self.collection = collection - - - def _calculate_tree_from_root(self, root): - ## - ## Make a copy of the current collection of link-states that contains - ## an empty link-state for nodes that are known-peers but are not in the - ## collection currently. This is needed to establish routes to those nodes - ## so we can trade link-state information with them. - ## - link_states = {} - for _id, ls in self.collection.items(): - link_states[_id] = ls.peers - for p in ls.peers: - if p not in link_states: - link_states[p] = [] - - ## - ## Setup Dijkstra's Algorithm - ## - cost = {} - prev = {} - for _id in link_states: - cost[_id] = None # infinite - prev[_id] = None # undefined - cost[root] = 0 # no cost to the root node - unresolved = NodeSet(cost) - - ## - ## Process unresolved nodes until lowest cost paths to all reachable nodes have been found. - ## - while not unresolved.empty(): - u = unresolved.lowest_cost() - if cost[u] == None: - # There are no more reachable nodes in unresolved - break - for v in link_states[u]: - if unresolved.contains(v): - alt = cost[u] + 1 # TODO - Use link cost instead of 1 - if cost[v] == None or alt < cost[v]: - cost[v] = alt - prev[v] = u - unresolved.set_cost(v, alt) - - ## - ## Remove unreachable nodes from the map. Note that this will also remove the - ## root node (has no previous node) from the map. - ## - for u, val in prev.items(): - if not val: - prev.pop(u) - - ## - ## Return previous-node map. This is a map of all reachable, remote nodes to - ## their predecessor node. - ## - return prev - - - def _calculate_routes(self): - ## - ## Generate the shortest-path tree with the local node as root - ## - prev = self._calculate_tree_from_root(self.id) - nodes = prev.keys() - - ## - ## Distill the path tree into a map of next hops for each node - ## - next_hops = {} - while len(nodes) > 0: - u = nodes[0] # pick any destination - path = [u] - nodes.remove(u) - v = prev[u] - while v != self.id: # build a list of nodes in the path back to the root - if v in nodes: - path.append(v) - nodes.remove(v) - u = v - v = prev[u] - for w in path: # mark each node in the path as reachable via the next hop - next_hops[w] = u - - ## - ## TODO - Calculate the tree from each origin, determine the set of origins-per-dest - ## for which the path from origin to dest passes through us. This is the set - ## of valid origins for forwarding to the destination. - ## - - self.container.next_hops_changed(next_hops) + """ + This module is responsible for computing the next-hop for every router/area in the domain + based on the collection of link states that have been gathered. + """ + def __init__(self, container): + self.container = container + self.id = self.container.id + self.area = self.container.area + self.recalculate = False + self.collection = None -class NodeSet(object): - """ - This data structure is an ordered list of node IDs, sorted in increasing order by their cost. - Equal cost nodes are secondarily sorted by their ID in order to provide deterministic and - repeatable ordering. - """ - def __init__(self, cost_map): - self.nodes = [] - for _id, cost in cost_map.items(): - ## - ## Assume that nodes are either unreachable (cost = None) or local (cost = 0) - ## during this initialization. - ## - if cost == 0: - self.nodes.insert(0, (_id, cost)) - else: + def tick(self, now_unused): + if self.recalculate: + self.recalculate = False + self._calculate_routes() + + + def ls_collection_changed(self, collection): + self.recalculate = True + self.collection = collection + + + def _calculate_tree_from_root(self, root): ## - ## There is no need to sort unreachable nodes by ID + ## Make a copy of the current collection of link-states that contains + ## an empty link-state for nodes that are known-peers but are not in the + ## collection currently. This is needed to establish routes to those nodes + ## so we can trade link-state information with them. ## - self.nodes.append((_id, cost)) + link_states = {} + for _id, ls in self.collection.items(): + link_states[_id] = ls.peers + for p in ls.peers: + if p not in link_states: + link_states[p] = [] + ## + ## Setup Dijkstra's Algorithm + ## + cost = {} + prev = {} + for _id in link_states: + cost[_id] = None # infinite + prev[_id] = None # undefined + cost[root] = 0 # no cost to the root node + unresolved = NodeSet(cost) - def __repr__(self): - return self.nodes.__repr__() + ## + ## Process unresolved nodes until lowest cost paths to all reachable nodes have been found. + ## + while not unresolved.empty(): + u = unresolved.lowest_cost() + if cost[u] == None: + # There are no more reachable nodes in unresolved + break + for v in link_states[u]: + if unresolved.contains(v): + alt = cost[u] + 1 # TODO - Use link cost instead of 1 + if cost[v] == None or alt < cost[v]: + cost[v] = alt + prev[v] = u + unresolved.set_cost(v, alt) + ## + ## Remove unreachable nodes from the map. Note that this will also remove the + ## root node (has no previous node) from the map. + ## + for u, val in prev.items(): + if not val: + prev.pop(u) - def empty(self): - return len(self.nodes) == 0 + ## + ## Return previous-node map. This is a map of all reachable, remote nodes to + ## their predecessor node. + ## + return prev - def contains(self, _id): - for a, b in self.nodes: - if a == _id: - return True - return False + def _calculate_routes(self): + ## + ## Generate the shortest-path tree with the local node as root + ## + prev = self._calculate_tree_from_root(self.id) + nodes = prev.keys() + ## + ## Distill the path tree into a map of next hops for each node + ## + next_hops = {} + while len(nodes) > 0: + u = nodes[0] # pick any destination + path = [u] + nodes.remove(u) + v = prev[u] + while v != self.id: # build a list of nodes in the path back to the root + if v in nodes: + path.append(v) + nodes.remove(v) + u = v + v = prev[u] + for w in path: # mark each node in the path as reachable via the next hop + next_hops[w] = u - def lowest_cost(self): - """ - Remove and return the lowest cost node ID. - """ - _id, cost = self.nodes.pop(0) - return _id + ## + ## TODO - Calculate the tree from each origin, determine the set of origins-per-dest + ## for which the path from origin to dest passes through us. This is the set + ## of valid origins for forwarding to the destination. + ## + self.container.next_hops_changed(next_hops) - def set_cost(self, _id, new_cost): +class NodeSet(object): """ - Set the cost for an ID in the NodeSet and re-insert the ID so that the list - remains sorted in increasing cost order. + This data structure is an ordered list of node IDs, sorted in increasing order by their cost. + Equal cost nodes are secondarily sorted by their ID in order to provide deterministic and + repeatable ordering. """ - index = 0 - for i, c in self.nodes: - if i == _id: - break - index += 1 - self.nodes.pop(index) - - index = 0 - for i, c in self.nodes: - if c == None or new_cost < c or (new_cost == c and _id < i): - break - index += 1 - - self.nodes.insert(index, (_id, new_cost)) + def __init__(self, cost_map): + self.nodes = [] + for _id, cost in cost_map.items(): + ## + ## Assume that nodes are either unreachable (cost = None) or local (cost = 0) + ## during this initialization. + ## + if cost == 0: + self.nodes.insert(0, (_id, cost)) + else: + ## + ## There is no need to sort unreachable nodes by ID + ## + self.nodes.append((_id, cost)) + + + def __repr__(self): + return self.nodes.__repr__() + + + def empty(self): + return len(self.nodes) == 0 + + + def contains(self, _id): + for a, b in self.nodes: + if a == _id: + return True + return False + + + def lowest_cost(self): + """ + Remove and return the lowest cost node ID. + """ + _id, cost = self.nodes.pop(0) + return _id + + + def set_cost(self, _id, new_cost): + """ + Set the cost for an ID in the NodeSet and re-insert the ID so that the list + remains sorted in increasing cost order. + """ + index = 0 + for i, c in self.nodes: + if i == _id: + break + index += 1 + self.nodes.pop(index) + + index = 0 + for i, c in self.nodes: + if c == None or new_cost < c or (new_cost == c and _id < i): + break + index += 1 + + self.nodes.insert(index, (_id, new_cost)) + diff --git a/qpid/extras/dispatch/python/qpid/dispatch/router/router_engine.py b/qpid/extras/dispatch/python/qpid/dispatch/router/router_engine.py index 18b48379c5..82f51184a9 100644 --- a/qpid/extras/dispatch/python/qpid/dispatch/router/router_engine.py +++ b/qpid/extras/dispatch/python/qpid/dispatch/router/router_engine.py @@ -36,245 +36,249 @@ from node import NodeTracker ## (i.e. we are in a test bench, etc.), load the stub versions. ## try: - from dispatch import * + from dispatch import * except ImportError: - from ..stubs import * + from ..stubs import * class RouterEngine: - """ - """ - - def __init__(self, router_adapter, router_id=None, area='area', config_override={}): - """ - Initialize an instance of a router for a domain. - """ - ## - ## Record important information about this router instance - ## - self.domain = "domain" - self.router_adapter = router_adapter - self.log_adapter = LogAdapter("dispatch.router") - self.io_adapter = IoAdapter(self, "qdxrouter") - - if router_id: - self.id = router_id - else: - self.id = str(uuid4()) - self.area = area - self.log(LOG_INFO, "Router Engine Instantiated: area=%s id=%s" % (self.area, self.id)) - - ## - ## Setup configuration - ## - self.config = Configuration(config_override) - self.log(LOG_INFO, "Config: %r" % self.config) - - ## - ## Launch the sub-module engines - ## - self.neighbor_engine = NeighborEngine(self) - self.link_state_engine = LinkStateEngine(self) - self.path_engine = PathEngine(self) - self.mobile_address_engine = MobileAddressEngine(self) - self.routing_table_engine = RoutingTableEngine(self) - self.binding_engine = BindingEngine(self) - self.adapter_engine = AdapterEngine(self) - self.node_tracker = NodeTracker(self) - - - - ##======================================================================================== - ## Adapter Entry Points - invoked from the adapter - ##======================================================================================== - def getId(self): - """ - Return the router's ID - """ - return self.id - - - def addLocalAddress(self, key): - """ - """ - try: - if key.find('_topo') == 0 or key.find('_local') == 0: - return - self.mobile_address_engine.add_local_address(key) - except Exception, e: - self.log(LOG_ERROR, "Exception in new-address processing: exception=%r" % e) - - def delLocalAddress(self, key): - """ - """ - try: - if key.find('_topo') == 0 or key.find('_local') == 0: - return - self.mobile_address_engine.del_local_address(key) - except Exception, e: - self.log(LOG_ERROR, "Exception in del-address processing: exception=%r" % e) - - - def handleTimerTick(self): - """ - """ - try: - now = time() - self.neighbor_engine.tick(now) - self.link_state_engine.tick(now) - self.path_engine.tick(now) - self.mobile_address_engine.tick(now) - self.routing_table_engine.tick(now) - self.binding_engine.tick(now) - self.adapter_engine.tick(now) - self.node_tracker.tick(now) - except Exception, e: - self.log(LOG_ERROR, "Exception in timer processing: exception=%r" % e) - - - def handleControlMessage(self, opcode, body): - """ - """ - try: - now = time() - if opcode == 'HELLO': - msg = MessageHELLO(body) - self.log(LOG_TRACE, "RCVD: %r" % msg) - self.neighbor_engine.handle_hello(msg, now) - - elif opcode == 'RA': - msg = MessageRA(body) - self.log(LOG_TRACE, "RCVD: %r" % msg) - self.link_state_engine.handle_ra(msg, now) - self.mobile_address_engine.handle_ra(msg, now) - - elif opcode == 'LSU': - msg = MessageLSU(body) - self.log(LOG_TRACE, "RCVD: %r" % msg) - self.link_state_engine.handle_lsu(msg, now) - - elif opcode == 'LSR': - msg = MessageLSR(body) - self.log(LOG_TRACE, "RCVD: %r" % msg) - self.link_state_engine.handle_lsr(msg, now) - - elif opcode == 'MAU': - msg = MessageMAU(body) - self.log(LOG_TRACE, "RCVD: %r" % msg) - self.mobile_address_engine.handle_mau(msg, now) - - elif opcode == 'MAR': - msg = MessageMAR(body) - self.log(LOG_TRACE, "RCVD: %r" % msg) - self.mobile_address_engine.handle_mar(msg, now) - - except Exception, e: - self.log(LOG_ERROR, "Exception in message processing: opcode=%s body=%r exception=%r" % (opcode, body, e)) - - - def receive(self, message_properties, body): - """ - This is the IoAdapter message-receive handler - """ - try: - self.handleControlMessage(message_properties['opcode'], body) - except Exception, e: - self.log(LOG_ERROR, "Exception in raw message processing: properties=%r body=%r exception=%r" % - (message_properties, body, e)) - - def getRouterData(self, kind): - """ """ - if kind == 'help': - return { 'help' : "Get list of supported values for kind", - 'link-state' : "This router's link state", - 'link-state-set' : "The set of link states from known routers", - 'next-hops' : "Next hops to each known router", - 'topo-table' : "Topological routing table", - 'mobile-table' : "Mobile key routing table" - } - if kind == 'link-state' : return self.neighbor_engine.link_state.to_dict() - if kind == 'next-hops' : return self.routing_table_engine.next_hops - if kind == 'topo-table' : return {'table': self.adapter_engine.key_classes['topological']} - if kind == 'mobile-table' : return {'table': self.adapter_engine.key_classes['mobile-key']} - if kind == 'link-state-set' : - copy = {} - for _id,_ls in self.link_state_engine.collection.items(): - copy[_id] = _ls.to_dict() - return copy - - return {'notice':'Use kind="help" to get a list of possibilities'} - - - ##======================================================================================== - ## Adapter Calls - outbound calls to Dispatch - ##======================================================================================== - def log(self, level, text): """ - Emit a log message to the host's event log - """ - self.log_adapter.log(level, text) - - - def send(self, dest, msg): - """ - Send a control message to another router. - """ - app_props = {'opcode' : msg.get_opcode() } - self.io_adapter.send(dest, app_props, msg.to_dict()) - self.log(LOG_TRACE, "SENT: %r dest=%s" % (msg, dest)) - - - def node_updated(self, addr, reachable, neighbor): - """ - """ - self.router_adapter(addr, reachable, neighbor) - - - ##======================================================================================== - ## Interconnect between the Sub-Modules - ##======================================================================================== - def local_link_state_changed(self, link_state): - self.log(LOG_DEBUG, "Event: local_link_state_changed: %r" % link_state) - self.link_state_engine.new_local_link_state(link_state) - - def ls_collection_changed(self, collection): - self.log(LOG_DEBUG, "Event: ls_collection_changed: %r" % collection) - self.path_engine.ls_collection_changed(collection) - - def next_hops_changed(self, next_hop_table): - self.log(LOG_DEBUG, "Event: next_hops_changed: %r" % next_hop_table) - self.routing_table_engine.next_hops_changed(next_hop_table) - self.binding_engine.next_hops_changed() - - def mobile_sequence_changed(self, mobile_seq): - self.log(LOG_DEBUG, "Event: mobile_sequence_changed: %d" % mobile_seq) - self.link_state_engine.set_mobile_sequence(mobile_seq) - - def mobile_keys_changed(self, keys): - self.log(LOG_DEBUG, "Event: mobile_keys_changed: %r" % keys) - self.binding_engine.mobile_keys_changed(keys) - - def get_next_hops(self): - return self.routing_table_engine.get_next_hops() - - def remote_routes_changed(self, key_class, routes): - self.log(LOG_DEBUG, "Event: remote_routes_changed: class=%s routes=%r" % (key_class, routes)) - self.adapter_engine.remote_routes_changed(key_class, routes) - - def new_neighbor(self, rid): - self.log(LOG_DEBUG, "Event: new_neighbor: id=%s" % rid) - self.node_tracker.new_neighbor(rid) - - def lost_neighbor(self, rid): - self.log(LOG_DEBUG, "Event: lost_neighbor: id=%s" % rid) - self.node_tracker.lost_neighbor(rid) - - def new_node(self, rid): - self.log(LOG_DEBUG, "Event: new_node: id=%s" % rid) - self.node_tracker.new_node(rid) - - def lost_node(self, rid): - self.log(LOG_DEBUG, "Event: lost_node: id=%s" % rid) - self.node_tracker.lost_node(rid) + def __init__(self, router_adapter, router_id, area, max_routers, config_override={}): + """ + Initialize an instance of a router for a domain. + """ + ## + ## Record important information about this router instance + ## + self.domain = "domain" + self.router_adapter = router_adapter + self.log_adapter = LogAdapter("dispatch.router") + self.io_adapter = IoAdapter(self, "qdxrouter") + self.max_routers = max_routers + self.id = router_id + self.area = area + self.log(LOG_INFO, "Router Engine Instantiated: area=%s id=%s max_routers=%d" % + (self.area, self.id, self.max_routers)) + + ## + ## Setup configuration + ## + self.config = Configuration(config_override) + self.log(LOG_INFO, "Config: %r" % self.config) + + ## + ## Launch the sub-module engines + ## + self.neighbor_engine = NeighborEngine(self) + self.link_state_engine = LinkStateEngine(self) + self.path_engine = PathEngine(self) + self.mobile_address_engine = MobileAddressEngine(self) + self.routing_table_engine = RoutingTableEngine(self) + self.binding_engine = BindingEngine(self) + self.adapter_engine = AdapterEngine(self) + self.node_tracker = NodeTracker(self, self.max_routers) + + + + ##======================================================================================== + ## Adapter Entry Points - invoked from the adapter + ##======================================================================================== + def getId(self): + """ + Return the router's ID + """ + return self.id + + + def addLocalAddress(self, key): + """ + """ + try: + if key.find('_topo') == 0 or key.find('_local') == 0: + return + self.mobile_address_engine.add_local_address(key) + except Exception, e: + self.log(LOG_ERROR, "Exception in new-address processing: exception=%r" % e) + + + def delLocalAddress(self, key): + """ + """ + try: + if key.find('_topo') == 0 or key.find('_local') == 0: + return + self.mobile_address_engine.del_local_address(key) + except Exception, e: + self.log(LOG_ERROR, "Exception in del-address processing: exception=%r" % e) + + + def handleTimerTick(self): + """ + """ + try: + now = time() + self.neighbor_engine.tick(now) + self.link_state_engine.tick(now) + self.path_engine.tick(now) + self.mobile_address_engine.tick(now) + self.routing_table_engine.tick(now) + self.binding_engine.tick(now) + self.adapter_engine.tick(now) + self.node_tracker.tick(now) + except Exception, e: + self.log(LOG_ERROR, "Exception in timer processing: exception=%r" % e) + + + def handleControlMessage(self, opcode, body, link_id): + """ + """ + try: + now = time() + if opcode == 'HELLO': + msg = MessageHELLO(body) + self.log(LOG_TRACE, "RCVD: %r" % msg) + self.neighbor_engine.handle_hello(msg, now, link_id) + + elif opcode == 'RA': + msg = MessageRA(body) + self.log(LOG_TRACE, "RCVD: %r" % msg) + self.link_state_engine.handle_ra(msg, now) + self.mobile_address_engine.handle_ra(msg, now) + + elif opcode == 'LSU': + msg = MessageLSU(body) + self.log(LOG_TRACE, "RCVD: %r" % msg) + self.link_state_engine.handle_lsu(msg, now) + + elif opcode == 'LSR': + msg = MessageLSR(body) + self.log(LOG_TRACE, "RCVD: %r" % msg) + self.link_state_engine.handle_lsr(msg, now) + + elif opcode == 'MAU': + msg = MessageMAU(body) + self.log(LOG_TRACE, "RCVD: %r" % msg) + self.mobile_address_engine.handle_mau(msg, now) + + elif opcode == 'MAR': + msg = MessageMAR(body) + self.log(LOG_TRACE, "RCVD: %r" % msg) + self.mobile_address_engine.handle_mar(msg, now) + + except Exception, e: + self.log(LOG_ERROR, "Exception in message processing: opcode=%s body=%r exception=%r" % (opcode, body, e)) + + + def receive(self, message_properties, body, link_id): + """ + This is the IoAdapter message-receive handler + """ + try: + self.handleControlMessage(message_properties['opcode'], body, link_id) + except Exception, e: + self.log(LOG_ERROR, "Exception in raw message processing: properties=%r body=%r exception=%r" % + (message_properties, body, e)) + + + def getRouterData(self, kind): + """ + """ + if kind == 'help': + return { 'help' : "Get list of supported values for kind", + 'link-state' : "This router's link state", + 'link-state-set' : "The set of link states from known routers", + 'next-hops' : "Next hops to each known router", + 'topo-table' : "Topological routing table", + 'mobile-table' : "Mobile key routing table" + } + if kind == 'link-state' : return self.neighbor_engine.link_state.to_dict() + if kind == 'next-hops' : return self.routing_table_engine.next_hops + if kind == 'topo-table' : return {'table': self.adapter_engine.key_classes['topological']} + if kind == 'mobile-table' : return {'table': self.adapter_engine.key_classes['mobile-key']} + if kind == 'link-state-set' : + copy = {} + for _id,_ls in self.link_state_engine.collection.items(): + copy[_id] = _ls.to_dict() + return copy + + return {'notice':'Use kind="help" to get a list of possibilities'} + + + ##======================================================================================== + ## Adapter Calls - outbound calls to Dispatch + ##======================================================================================== + def log(self, level, text): + """ + Emit a log message to the host's event log + """ + self.log_adapter.log(level, text) + + + def send(self, dest, msg): + """ + Send a control message to another router. + """ + app_props = {'opcode' : msg.get_opcode() } + self.io_adapter.send(dest, app_props, msg.to_dict()) + self.log(LOG_TRACE, "SENT: %r dest=%s" % (msg, dest)) + + + def node_updated(self, addr, reachable, neighbor): + """ + """ + self.router_adapter(addr, reachable, neighbor) + + + ##======================================================================================== + ## Interconnect between the Sub-Modules + ##======================================================================================== + def local_link_state_changed(self, link_state): + self.log(LOG_DEBUG, "Event: local_link_state_changed: %r" % link_state) + self.link_state_engine.new_local_link_state(link_state) + + def ls_collection_changed(self, collection): + self.log(LOG_DEBUG, "Event: ls_collection_changed: %r" % collection) + self.path_engine.ls_collection_changed(collection) + + def next_hops_changed(self, next_hop_table): + self.log(LOG_DEBUG, "Event: next_hops_changed: %r" % next_hop_table) + self.routing_table_engine.next_hops_changed(next_hop_table) + self.binding_engine.next_hops_changed() + + def mobile_sequence_changed(self, mobile_seq): + self.log(LOG_DEBUG, "Event: mobile_sequence_changed: %d" % mobile_seq) + self.link_state_engine.set_mobile_sequence(mobile_seq) + + def mobile_keys_changed(self, keys): + self.log(LOG_DEBUG, "Event: mobile_keys_changed: %r" % keys) + self.binding_engine.mobile_keys_changed(keys) + + def get_next_hops(self): + return self.routing_table_engine.get_next_hops() + + def remote_routes_changed(self, key_class, routes): + self.log(LOG_DEBUG, "Event: remote_routes_changed: class=%s routes=%r" % (key_class, routes)) + self.adapter_engine.remote_routes_changed(key_class, routes) + + def new_neighbor(self, rid, link_id): + self.log(LOG_DEBUG, "Event: new_neighbor: id=%s link_id=%d" % (rid, link_id)) + self.node_tracker.new_neighbor(rid, link_id) + + def lost_neighbor(self, rid): + self.log(LOG_DEBUG, "Event: lost_neighbor: id=%s" % rid) + self.node_tracker.lost_neighbor(rid) + + def new_node(self, rid): + self.log(LOG_DEBUG, "Event: new_node: id=%s" % rid) + self.node_tracker.new_node(rid) + + def lost_node(self, rid): + self.log(LOG_DEBUG, "Event: lost_node: id=%s" % rid) + self.node_tracker.lost_node(rid) + + def node_updated(self, address, reachable, neighbor, link_bit, router_bit): + self.log(LOG_DEBUG, "Event: node_updated: address=%s, reachable=%r, neighbor=%r, link_bit=%d, router_bit=%d" % \ + (address, reachable, neighbor, link_bit, router_bit)) + self.router_adapter.node_updataed(address, reachable, neighbor, link_bit, router_bit) diff --git a/qpid/extras/dispatch/python/qpid/dispatch/router/routing.py b/qpid/extras/dispatch/python/qpid/dispatch/router/routing.py index 8030582177..39b34bbc9b 100644 --- a/qpid/extras/dispatch/python/qpid/dispatch/router/routing.py +++ b/qpid/extras/dispatch/python/qpid/dispatch/router/routing.py @@ -18,39 +18,39 @@ # try: - from dispatch import * + from dispatch import * except ImportError: - from ..stubs import * + from ..stubs import * class RoutingTableEngine(object): - """ - This module is responsible for converting the set of next hops to remote routers to a routing - table in the "topological" address class. - """ - def __init__(self, container): - self.container = container - self.id = self.container.id - self.area = self.container.area - self.next_hops = {} + """ + This module is responsible for converting the set of next hops to remote routers to a routing + table in the "topological" address class. + """ + def __init__(self, container): + self.container = container + self.id = self.container.id + self.area = self.container.area + self.next_hops = {} - def tick(self, now): - pass + def tick(self, now): + pass - def next_hops_changed(self, next_hops): - # Convert next_hops into routing table - self.next_hops = next_hops - new_table = [] - for _id, next_hop in next_hops.items(): - new_table.append(('_topo.%s.%s.#' % (self.area, _id), next_hop)) - pair = ('_topo.%s.all' % (self.area), next_hop) - if new_table.count(pair) == 0: - new_table.append(pair) + def next_hops_changed(self, next_hops): + # Convert next_hops into routing table + self.next_hops = next_hops + new_table = [] + for _id, next_hop in next_hops.items(): + new_table.append(('_topo.%s.%s.#' % (self.area, _id), next_hop)) + pair = ('_topo.%s.all' % (self.area), next_hop) + if new_table.count(pair) == 0: + new_table.append(pair) - self.container.remote_routes_changed('topological', new_table) + self.container.remote_routes_changed('topological', new_table) - def get_next_hops(self): - return self.next_hops + def get_next_hops(self): + return self.next_hops diff --git a/qpid/extras/dispatch/router/src/main.c b/qpid/extras/dispatch/router/src/main.c index 62e3a2beb6..e0d6849d1b 100644 --- a/qpid/extras/dispatch/router/src/main.c +++ b/qpid/extras/dispatch/router/src/main.c @@ -117,7 +117,7 @@ int main(int argc, char **argv) } } - dx_log_set_mask(0xFFFFFFFF); + dx_log_set_mask(0xFFFFFFFE); dispatch = dx_dispatch(config_path); diff --git a/qpid/extras/dispatch/src/agent.c b/qpid/extras/dispatch/src/agent.c index 557410910c..2063b8fcc4 100644 --- a/qpid/extras/dispatch/src/agent.c +++ b/qpid/extras/dispatch/src/agent.c @@ -240,7 +240,7 @@ static void dx_agent_deferred_handler(void *context) } -static void dx_agent_rx_handler(void *context, dx_message_t *msg) +static void dx_agent_rx_handler(void *context, dx_message_t *msg, int unused_link_id) { dx_agent_t *agent = (dx_agent_t*) context; dx_message_t *copy = dx_message_copy(msg); diff --git a/qpid/extras/dispatch/src/amqp.c b/qpid/extras/dispatch/src/amqp.c new file mode 100644 index 0000000000..6a8545b757 --- /dev/null +++ b/qpid/extras/dispatch/src/amqp.c @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <qpid/dispatch/amqp.h> + +const char * const DX_DA_INGRESS = "qdx.ingress"; +const char * const DX_DA_TRACE = "qdx.trace"; +const char * const DX_DA_TO = "qdx.to"; + +const char * const DX_CAPABILITY_ROUTER = "qdx.router"; + +const char * const DX_INTERNODE_LINK_NAME_1 = "qdx.internode.1"; +const char * const DX_INTERNODE_LINK_NAME_2 = "qdx.internode.2"; + diff --git a/qpid/extras/dispatch/src/bitmask.c b/qpid/extras/dispatch/src/bitmask.c new file mode 100644 index 0000000000..5341f8d83a --- /dev/null +++ b/qpid/extras/dispatch/src/bitmask.c @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <qpid/dispatch/bitmask.h> +#include <qpid/dispatch/alloc.h> +#include <assert.h> + +#define DX_BITMASK_LONGS 16 +#define DX_BITMASK_BITS (DX_BITMASK_LONGS * 64) + +struct dx_bitmask_t { + uint64_t array[DX_BITMASK_LONGS]; + int first_set; +}; + +ALLOC_DECLARE(dx_bitmask_t); +ALLOC_DEFINE(dx_bitmask_t); + +#define MASK_INDEX(num) (num / 64) +#define MASK_ONEHOT(num) (1 << (num % 64)) +#define FIRST_NONE -1 +#define FIRST_UNKNOWN -2 + + +int dx_bitmask_width() +{ + return DX_BITMASK_BITS; +} + + +dx_bitmask_t *dx_bitmask(int initial) +{ + dx_bitmask_t *b = new_dx_bitmask_t(); + if (initial) + dx_bitmask_set_all(b); + else + dx_bitmask_clear_all(b); + return b; +} + + +void dx_bitmask_free(dx_bitmask_t *b) +{ + free_dx_bitmask_t(b); +} + + +void dx_bitmask_set_all(dx_bitmask_t *b) +{ + for (int i = 0; i < DX_BITMASK_LONGS; i++) + b->array[i] = 0xFFFFFFFFFFFFFFFF; + b->first_set = 0; +} + + +void dx_bitmask_clear_all(dx_bitmask_t *b) +{ + for (int i = 0; i < DX_BITMASK_LONGS; i++) + b->array[i] = 0; + b->first_set = FIRST_NONE; +} + + +void dx_bitmask_set_bit(dx_bitmask_t *b, int bitnum) +{ + assert(bitnum < DX_BITMASK_BITS); + b->array[MASK_INDEX(bitnum)] |= MASK_ONEHOT(bitnum); + if (b->first_set > bitnum) + b->first_set = bitnum; +} + + +void dx_bitmask_clear_bit(dx_bitmask_t *b, int bitnum) +{ + assert(bitnum < DX_BITMASK_BITS); + b->array[MASK_INDEX(bitnum)] &= ~(MASK_ONEHOT(bitnum)); + if (b->first_set == bitnum) + b->first_set = FIRST_UNKNOWN; +} + + +int dx_bitmask_value(dx_bitmask_t *b, int bitnum) +{ + return (b->array[MASK_INDEX(bitnum)] & MASK_ONEHOT(bitnum)) ? 1 : 0; +} + + +int dx_bitmask_first_set(dx_bitmask_t *b, int *bitnum) +{ + if (b->first_set == FIRST_UNKNOWN) { + b->first_set = FIRST_NONE; + for (int i = 0; i < DX_BITMASK_LONGS; i++) + if (b->array[i]) { + for (int j = 0; j < 64; j++) + if ((1 << j) & b->array[i]) { + b->first_set = i * 64 + j; + break; + } + break; + } + } + + if (b->first_set == FIRST_NONE) + return 0; + *bitnum = b->first_set; + return 1; +} + diff --git a/qpid/extras/dispatch/src/message_private.h b/qpid/extras/dispatch/src/message_private.h index 27b81bbb4c..c57cea5f0d 100644 --- a/qpid/extras/dispatch/src/message_private.h +++ b/qpid/extras/dispatch/src/message_private.h @@ -67,7 +67,7 @@ typedef struct { sys_mutex_t *lock; uint32_t ref_count; // The number of messages referencing this dx_buffer_list_t buffers; // The buffer chain containing the message - dx_buffer_list_t new_delivery_annotations; // The buffer chain containing the new delivery annotations + dx_buffer_list_t new_delivery_annotations; // The buffer chain containing the new delivery annotations (MOVE TO MSG_PVT) dx_field_location_t section_message_header; // The message header list dx_field_location_t section_delivery_annotation; // The delivery annotation map dx_field_location_t section_message_annotation; // The message annotation map diff --git a/qpid/extras/dispatch/src/python_embedded.c b/qpid/extras/dispatch/src/python_embedded.c index 0b0cc11025..2db8f583de 100644 --- a/qpid/extras/dispatch/src/python_embedded.c +++ b/qpid/extras/dispatch/src/python_embedded.c @@ -402,7 +402,7 @@ typedef struct { } IoAdapter; -static void dx_io_rx_handler(void *context, dx_message_t *msg) +static void dx_io_rx_handler(void *context, dx_message_t *msg, int link_id) { IoAdapter *self = (IoAdapter*) context; @@ -454,9 +454,10 @@ static void dx_io_rx_handler(void *context, dx_message_t *msg) PyObject *pAP = dx_field_to_py(ap_map); PyObject *pBody = dx_field_to_py(body_map); - PyObject *pArgs = PyTuple_New(2); + PyObject *pArgs = PyTuple_New(3); PyTuple_SetItem(pArgs, 0, pAP); PyTuple_SetItem(pArgs, 1, pBody); + PyTuple_SetItem(pArgs, 2, PyInt_FromLong((long) link_id)); PyObject *pValue = PyObject_CallObject(self->handler_rx_call, pArgs); Py_DECREF(pArgs); @@ -507,10 +508,10 @@ static PyObject* dx_python_send(PyObject *self, PyObject *args) field = dx_compose(DX_PERFORMATIVE_DELIVERY_ANNOTATIONS, field); dx_compose_start_map(field); - dx_compose_insert_string(field, "qdx.ingress"); + dx_compose_insert_string(field, DX_DA_INGRESS); dx_compose_insert_string(field, dx_router_id(ioa->dx)); - dx_compose_insert_string(field, "qdx.trace"); + dx_compose_insert_string(field, DX_DA_TRACE); dx_compose_start_list(field); dx_compose_insert_string(field, dx_router_id(ioa->dx)); dx_compose_end_list(field); diff --git a/qpid/extras/dispatch/src/router_node.c b/qpid/extras/dispatch/src/router_node.c index d2704c4bd5..dc63a45451 100644 --- a/qpid/extras/dispatch/src/router_node.c +++ b/qpid/extras/dispatch/src/router_node.c @@ -21,17 +21,18 @@ #include <stdio.h> #include <string.h> #include <stdbool.h> +#include <stdlib.h> #include <qpid/dispatch.h> #include "dispatch_private.h" +#include "router_private.h" static char *module = "ROUTER"; static void dx_router_python_setup(dx_router_t *router); static void dx_pyrouter_tick(dx_router_t *router); -static char *router_address = "_local/qdxrouter"; -static char *local_prefix = "_local/"; -//static char *topo_prefix = "_topo/"; +static char *local_prefix = "_local/"; +static char *topo_prefix = "_topo/"; /** * Address Types and Processing: @@ -48,86 +49,82 @@ static char *local_prefix = "_local/"; * <mobile> M<mobile> forward handler */ +ALLOC_DEFINE(dx_routed_event_t); +ALLOC_DEFINE(dx_router_link_t); +ALLOC_DEFINE(dx_router_node_t); +ALLOC_DEFINE(dx_router_ref_t); +ALLOC_DEFINE(dx_router_link_ref_t); +ALLOC_DEFINE(dx_address_t); -typedef struct dx_router_link_t dx_router_link_t; -typedef struct dx_router_node_t dx_router_node_t; +static void dx_router_add_link_ref_LH(dx_router_link_ref_list_t *ref_list, dx_router_link_t *link) +{ + dx_router_link_ref_t *ref = new_dx_router_link_ref_t(); + DEQ_ITEM_INIT(ref); + ref->link = link; + link->ref = ref; + DEQ_INSERT_TAIL(*ref_list, ref); +} -typedef enum { - DX_LINK_ENDPOINT, // A link to a connected endpoint - DX_LINK_ROUTER, // A link to a peer router in the same area - DX_LINK_AREA // A link to a peer router in a different area (area boundary) -} dx_link_type_t; +static void dx_router_del_link_ref_LH(dx_router_link_ref_list_t *ref_list, dx_router_link_t *link) +{ + if (link->ref) { + DEQ_REMOVE(*ref_list, link->ref); + free_dx_router_link_ref_t(link->ref); + link->ref = 0; + } +} -typedef struct dx_routed_event_t { - DEQ_LINKS(struct dx_routed_event_t); - dx_delivery_t *delivery; - dx_message_t *message; - bool settle; - uint64_t disposition; -} dx_routed_event_t; -ALLOC_DECLARE(dx_routed_event_t); -ALLOC_DEFINE(dx_routed_event_t); -DEQ_DECLARE(dx_routed_event_t, dx_routed_event_list_t); - - -struct dx_router_link_t { - DEQ_LINKS(dx_router_link_t); - dx_direction_t link_direction; - dx_link_type_t link_type; - dx_address_t *owning_addr; // [ref] Address record that owns this link - dx_link_t *link; // [own] Link pointer - dx_router_link_t *connected_link; // [ref] If this is a link-route, reference the connected link - dx_router_link_t *peer_link; // [ref] If this is a bidirectional link-route, reference the peer link - dx_routed_event_list_t event_fifo; // FIFO of outgoing delivery/link events (no messages) - dx_routed_event_list_t msg_fifo; // FIFO of outgoing message deliveries -}; +/** + * Check an address to see if it no longer has any associated destinations. + * Depending on its policy, the address may be eligible for being closed out + * (i.e. Logging its terminal statistics and freeing its resources). + */ +static void dx_router_check_addr_LH(dx_address_t *addr) +{ + // TODO +} -ALLOC_DECLARE(dx_router_link_t); -ALLOC_DEFINE(dx_router_link_t); -DEQ_DECLARE(dx_router_link_t, dx_router_link_list_t); - -struct dx_router_node_t { - DEQ_LINKS(dx_router_node_t); - const char *id; - dx_router_node_t *next_hop; // Next hop node _if_ this is not a neighbor node - dx_router_link_t *peer_link; // Outgoing link _if_ this is a neighbor node - // list of valid origins (pointers to router_node) - (bit masks?) -}; -ALLOC_DECLARE(dx_router_node_t); -ALLOC_DEFINE(dx_router_node_t); -DEQ_DECLARE(dx_router_node_t, dx_router_node_list_t); +/** + * Determine whether a terminus has router capability + */ +static int dx_router_terminus_is_router(pn_terminus_t *term) +{ + pn_data_t *cap = pn_terminus_capabilities(term); + if (cap && pn_data_type(cap) == PN_SYMBOL) { + pn_bytes_t sym = pn_data_get_symbol(cap); + if (sym.size == strlen(DX_CAPABILITY_ROUTER) && + strcmp(sym.start, DX_CAPABILITY_ROUTER) == 0) + return 1; + } -struct dx_address_t { - dx_router_message_cb handler; // In-Process Consumer - void *handler_context; - dx_router_link_list_t rlinks; // Locally-Connected Consumers - dx_router_node_list_t rnodes; // Remotely-Connected Consumers -}; + return 0; +} -ALLOC_DECLARE(dx_address_t); -ALLOC_DEFINE(dx_address_t); +static void dx_router_generate_temp_addr(dx_router_t *router, char *buffer, size_t length) +{ + static const char *table = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789+_"; + char discriminator[11]; + long int rnd = random(); + int idx; + + for (idx = 0; idx < 10; idx++) + discriminator[idx] = table[(rnd >> (idx * 6)) & 63]; + discriminator[idx] = '\0'; -struct dx_router_t { - dx_dispatch_t *dx; - const char *router_area; - const char *router_id; - dx_node_t *node; - dx_router_link_list_t in_links; - dx_router_node_list_t routers; - dx_message_list_t in_fifo; - sys_mutex_t *lock; - dx_timer_t *timer; - hash_t *out_hash; - uint64_t dtag; - PyObject *pyRouter; - PyObject *pyTick; -}; + snprintf(buffer, length, "amqp:/%s%s/%s/temp.%s", topo_prefix, router->router_area, router->router_id, discriminator); +} + + +static int dx_router_find_mask_bit(dx_link_t *link) +{ + return 0; // TODO +} /** @@ -191,7 +188,7 @@ static int router_writable_link_handler(void* context, dx_link_t *link) DEQ_REMOVE_HEAD(to_send); // - // Get a delivery for the send. This will be the current deliver on the link. + // Get a delivery for the send. This will be the current delivery on the link. // tag++; delivery = dx_delivery(link, pn_dtag((char*) &tag, 8)); @@ -259,8 +256,8 @@ static void router_annotate_message(dx_router_t *router, dx_message_t *msg) dx_parsed_field_t *ingress = 0; if (in_da) { - trace = dx_parse_value_by_key(in_da, "qdx.trace"); - ingress = dx_parse_value_by_key(in_da, "qdx.ingress"); + trace = dx_parse_value_by_key(in_da, DX_DA_TRACE); + ingress = dx_parse_value_by_key(in_da, DX_DA_INGRESS); } dx_compose_start_map(out_da); @@ -269,7 +266,7 @@ static void router_annotate_message(dx_router_t *router, dx_message_t *msg) // If there is a trace field, append this router's ID to the trace. // if (trace && dx_parse_is_list(trace)) { - dx_compose_insert_string(out_da, "qdx.trace"); + dx_compose_insert_string(out_da, DX_DA_TRACE); dx_compose_start_list(out_da); uint32_t idx = 0; @@ -289,7 +286,7 @@ static void router_annotate_message(dx_router_t *router, dx_message_t *msg) // If there is no ingress field, annotate the ingress as this router else // keep the original field. // - dx_compose_insert_string(out_da, "qdx.ingress"); + dx_compose_insert_string(out_da, DX_DA_INGRESS); if (ingress && dx_parse_is_scalar(ingress)) { dx_field_iterator_t *iter = dx_parse_raw(ingress); dx_compose_insert_string_iterator(out_da, iter); @@ -380,7 +377,7 @@ static void router_rx_handler(void* context, dx_link_t *link, dx_delivery_t *del if (iter) { dx_field_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH); - hash_retrieve(router->out_hash, iter, (void*) &addr); + hash_retrieve(router->addr_hash, iter, (void*) &addr); dx_field_iterator_reset_view(iter, ITER_VIEW_NO_HOST); int is_local = dx_field_iterator_prefix(iter, local_prefix); dx_field_iterator_free(iter); @@ -415,33 +412,34 @@ static void router_rx_handler(void* context, dx_link_t *link, dx_delivery_t *del // // Forward to all of the local links receiving this address. // - dx_router_link_t *dest_link = DEQ_HEAD(addr->rlinks); - while (dest_link) { + dx_router_link_ref_t *dest_link_ref = DEQ_HEAD(addr->rlinks); + while (dest_link_ref) { dx_routed_event_t *re = new_dx_routed_event_t(); DEQ_ITEM_INIT(re); re->delivery = 0; re->message = dx_message_copy(msg); re->settle = 0; re->disposition = 0; - DEQ_INSERT_TAIL(dest_link->msg_fifo, re); + DEQ_INSERT_TAIL(dest_link_ref->link->msg_fifo, re); fanout++; if (fanout == 1 && !dx_delivery_settled(delivery)) re->delivery = delivery; - dx_link_activate(dest_link->link); - dest_link = DEQ_NEXT(dest_link); + dx_link_activate(dest_link_ref->link->link); + dest_link_ref = DEQ_NEXT(dest_link_ref); } // // Forward to the next-hops for remote destinations. // - dx_router_node_t *dest_node = DEQ_HEAD(addr->rnodes); - while (dest_node) { - if (dest_node->next_hop) - dest_link = dest_node->next_hop->peer_link; + dx_router_ref_t *dest_node_ref = DEQ_HEAD(addr->rnodes); + dx_router_link_t *dest_link; + while (dest_node_ref) { + if (dest_node_ref->router->next_hop) + dest_link = dest_node_ref->router->next_hop->peer_link; else - dest_link = dest_node->peer_link; + dest_link = dest_node_ref->router->peer_link; if (dest_link) { dx_routed_event_t *re = new_dx_routed_event_t(); DEQ_ITEM_INIT(re); @@ -457,7 +455,7 @@ static void router_rx_handler(void* context, dx_link_t *link, dx_delivery_t *del dx_link_activate(dest_link->link); } - dest_node = DEQ_NEXT(dest_node); + dest_node_ref = DEQ_NEXT(dest_node_ref); } } } @@ -487,7 +485,7 @@ static void router_rx_handler(void* context, dx_link_t *link, dx_delivery_t *del // Invoke the in-process handler now that the lock is released. // if (handler) - handler(handler_context, in_process_copy); + handler(handler_context, in_process_copy, rlink->mask_bit); } @@ -541,25 +539,28 @@ static int router_incoming_link_handler(void* context, dx_link_t *link) dx_router_t *router = (dx_router_t*) context; dx_router_link_t *rlink = new_dx_router_link_t(); pn_link_t *pn_link = dx_link_pn(link); + int is_router = dx_router_terminus_is_router(dx_link_remote_source(link)); DEQ_ITEM_INIT(rlink); + rlink->mask_bit = is_router ? dx_router_find_mask_bit(link) : 0; + rlink->link_type = is_router ? DX_LINK_ROUTER : DX_LINK_ENDPOINT; rlink->link_direction = DX_INCOMING; - rlink->link_type = DX_LINK_ENDPOINT; rlink->owning_addr = 0; rlink->link = link; rlink->connected_link = 0; rlink->peer_link = 0; + rlink->ref = 0; DEQ_INIT(rlink->event_fifo); DEQ_INIT(rlink->msg_fifo); dx_link_set_context(link, rlink); sys_mutex_lock(router->lock); - DEQ_INSERT_TAIL(router->in_links, rlink); + DEQ_INSERT_TAIL(router->links, rlink); sys_mutex_unlock(router->lock); - pn_terminus_copy(pn_link_source(pn_link), pn_link_remote_source(pn_link)); - pn_terminus_copy(pn_link_target(pn_link), pn_link_remote_target(pn_link)); + pn_terminus_copy(dx_link_source(link), dx_link_remote_source(link)); + pn_terminus_copy(dx_link_target(link), dx_link_remote_target(link)); pn_link_flow(pn_link, 1000); pn_link_open(pn_link); @@ -579,52 +580,90 @@ static int router_outgoing_link_handler(void* context, dx_link_t *link) { dx_router_t *router = (dx_router_t*) context; pn_link_t *pn_link = dx_link_pn(link); - const char *r_tgt = pn_terminus_get_address(pn_link_remote_target(pn_link)); + const char *r_src = pn_terminus_get_address(dx_link_remote_source(link)); + int is_dynamic = pn_terminus_is_dynamic(dx_link_remote_source(link)); + int is_router = dx_router_terminus_is_router(dx_link_remote_target(link)); - if (!r_tgt) { + // + // If this link is not a router link and it has no source address, we can't + // accept it. + // + if (r_src == 0 && !is_router && !is_dynamic) { pn_link_close(pn_link); return 0; } - dx_field_iterator_t *iter = dx_field_iterator_string(r_tgt, ITER_VIEW_NO_HOST); - dx_router_link_t *rlink = new_dx_router_link_t(); - - int is_router = dx_field_iterator_equal(iter, (unsigned char*) router_address); - + // + // Create a router_link record for this link. Some of the fields will be + // modified in the different cases below. + // + dx_router_link_t *rlink = new_dx_router_link_t(); DEQ_ITEM_INIT(rlink); - rlink->link_direction = DX_OUTGOING; + rlink->mask_bit = is_router ? dx_router_find_mask_bit(link) : 0; rlink->link_type = is_router ? DX_LINK_ROUTER : DX_LINK_ENDPOINT; + rlink->link_direction = DX_OUTGOING; + rlink->owning_addr = 0; rlink->link = link; rlink->connected_link = 0; rlink->peer_link = 0; + rlink->ref = 0; DEQ_INIT(rlink->event_fifo); DEQ_INIT(rlink->msg_fifo); dx_link_set_context(link, rlink); - - dx_field_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH); - dx_address_t *addr; + pn_terminus_copy(dx_link_source(link), dx_link_remote_source(link)); + pn_terminus_copy(dx_link_target(link), dx_link_remote_target(link)); sys_mutex_lock(router->lock); - hash_retrieve(router->out_hash, iter, (void**) &addr); - if (!addr) { - addr = new_dx_address_t(); - addr->handler = 0; - addr->handler_context = 0; - DEQ_INIT(addr->rlinks); - DEQ_INIT(addr->rnodes); - hash_insert(router->out_hash, iter, addr); + + if (is_router) { + // + // If this is a router link, put it in the router_address link-list. + // + dx_router_add_link_ref_LH(&router->router_addr->rlinks, rlink); + rlink->owning_addr = router->router_addr; + + } else { + // + // If this is an endpoint link, check the source. If it is dynamic, we will + // assign it an ephemeral and routable address. If it has a non-dymanic + // address, that address needs to be set up in the address list. + // + dx_field_iterator_t *iter; + char temp_addr[1000]; + dx_address_t *addr; + + if (is_dynamic) { + dx_router_generate_temp_addr(router, temp_addr, 1000); + iter = dx_field_iterator_string(temp_addr, ITER_VIEW_ADDRESS_HASH); + pn_terminus_set_address(dx_link_source(link), temp_addr); + dx_log(module, LOG_INFO, "Assigned temporary routable address: %s", temp_addr); + } else { + iter = dx_field_iterator_string(r_src, ITER_VIEW_ADDRESS_HASH); + dx_log(module, LOG_INFO, "Registered local address: %s", r_src); + } + + hash_retrieve(router->addr_hash, iter, (void**) &addr); + if (!addr) { + addr = new_dx_address_t(); + DEQ_ITEM_INIT(addr); + addr->handler = 0; + addr->handler_context = 0; + DEQ_INIT(addr->rlinks); + DEQ_INIT(addr->rnodes); + hash_insert(router->addr_hash, iter, addr); + DEQ_INSERT_TAIL(router->addrs, addr); + } + dx_field_iterator_free(iter); + + rlink->owning_addr = addr; + dx_router_add_link_ref_LH(&addr->rlinks, rlink); } - dx_field_iterator_free(iter); - rlink->owning_addr = addr; - DEQ_INSERT_TAIL(addr->rlinks, rlink); + DEQ_INSERT_TAIL(router->links, rlink); + sys_mutex_unlock(router->lock); - pn_terminus_copy(pn_link_source(pn_link), pn_link_remote_source(pn_link)); - pn_terminus_copy(pn_link_target(pn_link), pn_link_remote_target(pn_link)); pn_link_open(pn_link); - sys_mutex_unlock(router->lock); - dx_log(module, LOG_TRACE, "Registered new local address: %s", r_tgt); return 0; } @@ -634,40 +673,37 @@ static int router_outgoing_link_handler(void* context, dx_link_t *link) */ static int router_link_detach_handler(void* context, dx_link_t *link, int closed) { - dx_router_t *router = (dx_router_t*) context; - pn_link_t *pn_link = dx_link_pn(link); - dx_router_link_t *rlink = (dx_router_link_t*) dx_link_get_context(link); - const char *r_tgt = pn_terminus_get_address(pn_link_remote_target(pn_link)); + dx_router_t *router = (dx_router_t*) context; + dx_router_link_t *rlink = (dx_router_link_t*) dx_link_get_context(link); if (!rlink) return 0; sys_mutex_lock(router->lock); - if (pn_link_is_sender(pn_link)) { - DEQ_REMOVE(rlink->owning_addr->rlinks, rlink); - - if ((rlink->owning_addr->handler == 0) && - (DEQ_SIZE(rlink->owning_addr->rlinks) == 0) && - (DEQ_SIZE(rlink->owning_addr->rnodes) == 0)) { - dx_field_iterator_t *iter = dx_field_iterator_string(r_tgt, ITER_VIEW_ADDRESS_HASH); - dx_address_t *addr; - if (iter) { - hash_retrieve(router->out_hash, iter, (void**) &addr); - if (addr == rlink->owning_addr) { - hash_remove(router->out_hash, iter); - free_dx_router_link_t(rlink); - free_dx_address_t(addr); - dx_log(module, LOG_TRACE, "Removed local address: %s", r_tgt); - } - dx_field_iterator_free(iter); - } - } - } else { - DEQ_REMOVE(router->in_links, rlink); - free_dx_router_link_t(rlink); + + // + // If the link is outgoing, we must disassociate it from its address. + // + if (rlink->link_direction == DX_OUTGOING && rlink->owning_addr) { + dx_router_del_link_ref_LH(&rlink->owning_addr->rlinks, rlink); + dx_router_check_addr_LH(rlink->owning_addr); } + // + // If this is an incoming inter-router link, we must free the mask_bit. + // + if (rlink->link_type == DX_LINK_ROUTER && rlink->link_direction == DX_INCOMING) + dx_bitmask_set_bit(router->neighbor_free_mask, rlink->mask_bit); + + // + // Remove the link from the master list-of-links. + // + DEQ_REMOVE(router->links, rlink); sys_mutex_unlock(router->lock); + + // TODO - wrap the free to handle the recursive items + free_dx_router_link_t(rlink); + return 0; } @@ -683,24 +719,37 @@ static void router_outbound_open_handler(void *type_context, dx_connection_t *co // Ignore otherwise dx_router_t *router = (dx_router_t*) type_context; - dx_field_iterator_t *aiter = dx_field_iterator_string(router_address, ITER_VIEW_ADDRESS_HASH); dx_link_t *sender; dx_link_t *receiver; dx_router_link_t *rlink; + int mask_bit = 0; + size_t clen = strlen(DX_CAPABILITY_ROUTER); + + // + // Allocate a mask bit to designate the pair of links connected to the neighbor router + // + sys_mutex_lock(router->lock); + if (dx_bitmask_first_set(router->neighbor_free_mask, &mask_bit)) { + dx_bitmask_clear_bit(router->neighbor_free_mask, mask_bit); + } else { + sys_mutex_unlock(router->lock); + dx_log(module, LOG_CRITICAL, "Exceeded maximum inter-router link count"); + return; + } // - // Create an incoming link and put it in the in-links collection. The address - // of the remote source of this link is '_local/qdxrouter'. + // Create an incoming link with router source capability // - receiver = dx_link(router->node, conn, DX_INCOMING, "inter-router-rx"); - pn_terminus_set_address(dx_link_remote_source(receiver), router_address); - pn_terminus_set_address(dx_link_target(receiver), router_address); + receiver = dx_link(router->node, conn, DX_INCOMING, DX_INTERNODE_LINK_NAME_1); + // TODO - We don't want to have to cast away the constness of the literal string here! + // See PROTON-429 + pn_data_put_symbol(pn_terminus_capabilities(dx_link_target(receiver)), pn_bytes(clen, (char*) DX_CAPABILITY_ROUTER)); rlink = new_dx_router_link_t(); - DEQ_ITEM_INIT(rlink); - rlink->link_direction = DX_INCOMING; + rlink->mask_bit = mask_bit; rlink->link_type = DX_LINK_ROUTER; + rlink->link_direction = DX_INCOMING; rlink->owning_addr = 0; rlink->link = receiver; rlink->connected_link = 0; @@ -709,53 +758,40 @@ static void router_outbound_open_handler(void *type_context, dx_connection_t *co DEQ_INIT(rlink->msg_fifo); dx_link_set_context(receiver, rlink); - - sys_mutex_lock(router->lock); - DEQ_INSERT_TAIL(router->in_links, rlink); - sys_mutex_unlock(router->lock); + DEQ_INSERT_TAIL(router->links, rlink); // - // Create an outgoing link with a local source of '_local/qdxrouter' and place - // it in the routing table. + // Create an outgoing link with router target capability // - sender = dx_link(router->node, conn, DX_OUTGOING, "inter-router-tx"); - pn_terminus_set_address(dx_link_remote_target(sender), router_address); - pn_terminus_set_address(dx_link_source(sender), router_address); + sender = dx_link(router->node, conn, DX_OUTGOING, DX_INTERNODE_LINK_NAME_2); + // TODO - We don't want to have to cast away the constness of the literal string here! + // See PROTON-429 + pn_data_put_symbol(pn_terminus_capabilities(dx_link_target(receiver)), pn_bytes(clen, (char *) DX_CAPABILITY_ROUTER)); rlink = new_dx_router_link_t(); - DEQ_ITEM_INIT(rlink); - rlink->link_direction = DX_OUTGOING; + rlink->mask_bit = mask_bit; rlink->link_type = DX_LINK_ROUTER; + rlink->link_direction = DX_OUTGOING; + rlink->owning_addr = router->router_addr; rlink->link = sender; rlink->connected_link = 0; rlink->peer_link = 0; DEQ_INIT(rlink->event_fifo); DEQ_INIT(rlink->msg_fifo); - dx_link_set_context(sender, rlink); - - dx_address_t *addr; - - sys_mutex_lock(router->lock); - hash_retrieve(router->out_hash, aiter, (void**) &addr); - if (!addr) { - addr = new_dx_address_t(); - addr->handler = 0; - addr->handler_context = 0; - DEQ_INIT(addr->rlinks); - DEQ_INIT(addr->rnodes); - hash_insert(router->out_hash, aiter, addr); - } + // + // Add the new outgoing link to the router_address's list of links. + // + dx_router_add_link_ref_LH(&router->router_addr->rlinks, rlink); - rlink->owning_addr = addr; - DEQ_INSERT_TAIL(addr->rlinks, rlink); + dx_link_set_context(sender, rlink); + DEQ_INSERT_TAIL(router->links, rlink); sys_mutex_unlock(router->lock); pn_link_open(dx_link_pn(receiver)); pn_link_open(dx_link_pn(sender)); pn_link_flow(dx_link_pn(receiver), 1000); - dx_field_iterator_free(aiter); } @@ -767,7 +803,6 @@ static void dx_router_timer_handler(void *context) // Periodic processing. // dx_pyrouter_tick(router); - dx_timer_schedule(router->timer, 1000); } @@ -797,20 +832,29 @@ dx_router_t *dx_router(dx_dispatch_t *dx, const char *area, const char *id) router_node.type_context = router; + dx->router = router; router->dx = dx; router->router_area = area; router->router_id = id; router->node = dx_container_set_default_node_type(dx, &router_node, (void*) router, DX_DIST_BOTH); - DEQ_INIT(router->in_links); + DEQ_INIT(router->addrs); + router->addr_hash = hash(10, 32, 0); + + DEQ_INIT(router->links); DEQ_INIT(router->routers); - DEQ_INIT(router->in_fifo); - router->lock = sys_mutex(); - router->timer = dx_timer(dx, dx_router_timer_handler, (void*) router); - router->out_hash = hash(10, 32, 0); - router->dtag = 1; - router->pyRouter = 0; - router->pyTick = 0; + router->neighbor_free_mask = dx_bitmask(1); + router->lock = sys_mutex(); + router->timer = dx_timer(dx, dx_router_timer_handler, (void*) router); + router->dtag = 1; + router->pyRouter = 0; + router->pyTick = 0; + + // + // Create an address for all of the routers in the topology. It will be registered + // locally later in the initialization sequence. + // + router->router_addr = dx_router_register_address(dx, "qdxrouter", 0, 0); // // Inform the field iterator module of this router's id and area. The field iterator @@ -824,7 +868,6 @@ dx_router_t *dx_router(dx_dispatch_t *dx, const char *area, const char *id) dx_python_start(); dx_log(module, LOG_INFO, "Router started, area=%s id=%s", area, id); - return router; } @@ -869,14 +912,17 @@ dx_address_t *dx_router_register_address(dx_dispatch_t *dx, iter = dx_field_iterator_string(addr_string, ITER_VIEW_NO_HOST); sys_mutex_lock(router->lock); - hash_retrieve(router->out_hash, iter, (void**) &addr); + hash_retrieve(router->addr_hash, iter, (void**) &addr); if (!addr) { addr = new_dx_address_t(); + DEQ_ITEM_INIT(addr); addr->handler = 0; addr->handler_context = 0; DEQ_INIT(addr->rlinks); DEQ_INIT(addr->rnodes); - hash_insert(router->out_hash, iter, addr); + hash_insert(router->addr_hash, iter, addr); + DEQ_ITEM_INIT(addr); + DEQ_INSERT_TAIL(router->addrs, addr); } dx_field_iterator_free(iter); @@ -885,7 +931,8 @@ dx_address_t *dx_router_register_address(dx_dispatch_t *dx, sys_mutex_unlock(router->lock); - dx_log(module, LOG_TRACE, "In-Process Address Registered: %s", address); + if (handler) + dx_log(module, LOG_INFO, "In-Process Address Registered: %s", address); return addr; } @@ -905,34 +952,35 @@ void dx_router_send(dx_dispatch_t *dx, dx_field_iterator_reset_view(address, ITER_VIEW_ADDRESS_HASH); sys_mutex_lock(router->lock); - hash_retrieve(router->out_hash, address, (void*) &addr); + hash_retrieve(router->addr_hash, address, (void*) &addr); if (addr) { // // Forward to all of the local links receiving this address. // - dx_router_link_t *dest_link = DEQ_HEAD(addr->rlinks); - while (dest_link) { + dx_router_link_ref_t *dest_link_ref = DEQ_HEAD(addr->rlinks); + while (dest_link_ref) { dx_routed_event_t *re = new_dx_routed_event_t(); DEQ_ITEM_INIT(re); re->delivery = 0; re->message = dx_message_copy(msg); re->settle = 0; re->disposition = 0; - DEQ_INSERT_TAIL(dest_link->msg_fifo, re); + DEQ_INSERT_TAIL(dest_link_ref->link->msg_fifo, re); - dx_link_activate(dest_link->link); - dest_link = DEQ_NEXT(dest_link); + dx_link_activate(dest_link_ref->link->link); + dest_link_ref = DEQ_NEXT(dest_link_ref); } // // Forward to the next-hops for remote destinations. // - dx_router_node_t *dest_node = DEQ_HEAD(addr->rnodes); - while (dest_node) { - if (dest_node->next_hop) - dest_link = dest_node->next_hop->peer_link; + dx_router_ref_t *dest_node_ref = DEQ_HEAD(addr->rnodes); + dx_router_link_t *dest_link; + while (dest_node_ref) { + if (dest_node_ref->router->next_hop) + dest_link = dest_node_ref->router->next_hop->peer_link; else - dest_link = dest_node->peer_link; + dest_link = dest_node_ref->router->peer_link; if (dest_link) { dx_routed_event_t *re = new_dx_routed_event_t(); DEQ_ITEM_INIT(re); @@ -943,7 +991,7 @@ void dx_router_send(dx_dispatch_t *dx, DEQ_INSERT_TAIL(dest_link->msg_fifo, re); dx_link_activate(dest_link->link); } - dest_node = DEQ_NEXT(dest_node); + dest_node_ref = DEQ_NEXT(dest_node_ref); } } sys_mutex_unlock(router->lock); // TOINVESTIGATE Move this higher? @@ -977,8 +1025,11 @@ static PyObject* dx_router_node_updated(PyObject *self, PyObject *args) const char *address; int is_reachable; int is_neighbor; + int link_maskbit; + int router_maskbit; - if (!PyArg_ParseTuple(args, "sii", &address, &is_reachable, &is_neighbor)) + if (!PyArg_ParseTuple(args, "siiii", &address, &is_reachable, &is_neighbor, + &link_maskbit, &router_maskbit)) return 0; // TODO @@ -1099,6 +1150,7 @@ static void dx_router_python_setup(dx_router_t *router) PyObject* pName; PyObject* pId; PyObject* pArea; + PyObject* pMaxRouters; PyObject* pModule; PyObject* pClass; PyObject* pArgs; @@ -1126,7 +1178,7 @@ static void dx_router_python_setup(dx_router_t *router) // // Constructor Arguments for RouterEngine // - pArgs = PyTuple_New(3); + pArgs = PyTuple_New(4); // arg 0: adapter instance PyTuple_SetItem(pArgs, 0, adapterInstance); @@ -1135,10 +1187,14 @@ static void dx_router_python_setup(dx_router_t *router) pId = PyString_FromString(router->router_id); PyTuple_SetItem(pArgs, 1, pId); - // arg 2: area id + // arg 2: area_id pArea = PyString_FromString(router->router_area); PyTuple_SetItem(pArgs, 2, pArea); + // arg 3: max_routers + pMaxRouters = PyInt_FromLong((long) dx_bitmask_width()); + PyTuple_SetItem(pArgs, 3, pMaxRouters); + // // Instantiate the router // diff --git a/qpid/extras/dispatch/src/router_private.h b/qpid/extras/dispatch/src/router_private.h new file mode 100644 index 0000000000..8e481375ab --- /dev/null +++ b/qpid/extras/dispatch/src/router_private.h @@ -0,0 +1,128 @@ +#ifndef __router_private_h__ +#define __router_private_h__ 1 +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +typedef struct dx_router_link_t dx_router_link_t; +typedef struct dx_router_node_t dx_router_node_t; +typedef struct dx_router_ref_t dx_router_ref_t; +typedef struct dx_router_link_ref_t dx_router_link_ref_t; + + +typedef enum { + DX_LINK_ENDPOINT, // A link to a connected endpoint + DX_LINK_ROUTER, // A link to a peer router in the same area + DX_LINK_AREA // A link to a peer router in a different area (area boundary) +} dx_link_type_t; + + +typedef struct dx_routed_event_t { + DEQ_LINKS(struct dx_routed_event_t); + dx_delivery_t *delivery; + dx_message_t *message; + bool settle; + uint64_t disposition; +} dx_routed_event_t; + +ALLOC_DECLARE(dx_routed_event_t); +DEQ_DECLARE(dx_routed_event_t, dx_routed_event_list_t); + + +struct dx_router_link_t { + DEQ_LINKS(dx_router_link_t); + int mask_bit; // Unique mask bit if this is an inter-router link + dx_link_type_t link_type; + dx_direction_t link_direction; + dx_address_t *owning_addr; // [ref] Address record that owns this link + dx_link_t *link; // [own] Link pointer + dx_router_link_t *connected_link; // [ref] If this is a link-route, reference the connected link + dx_router_link_t *peer_link; // [ref] If this is a bidirectional link-route, reference the peer link + dx_router_link_ref_t *ref; // Pointer to a containing reference object + dx_routed_event_list_t event_fifo; // FIFO of outgoing delivery/link events (no messages) + dx_routed_event_list_t msg_fifo; // FIFO of outgoing message deliveries +}; + +ALLOC_DECLARE(dx_router_link_t); +DEQ_DECLARE(dx_router_link_t, dx_router_link_list_t); + +struct dx_router_node_t { + DEQ_LINKS(dx_router_node_t); + const char *id; + int mask_bit; + dx_router_node_t *next_hop; // Next hop node _if_ this is not a neighbor node + dx_router_link_t *peer_link; // Outgoing link _if_ this is a neighbor node + dx_bitmask_t *valid_origins; +}; + +ALLOC_DECLARE(dx_router_node_t); +DEQ_DECLARE(dx_router_node_t, dx_router_node_list_t); + +struct dx_router_ref_t { + DEQ_LINKS(dx_router_ref_t); + dx_router_node_t *router; +}; + +ALLOC_DECLARE(dx_router_ref_t); +DEQ_DECLARE(dx_router_ref_t, dx_router_ref_list_t); + + +struct dx_router_link_ref_t { + DEQ_LINKS(dx_router_link_ref_t); + dx_router_link_t *link; +}; + +ALLOC_DECLARE(dx_router_link_ref_t); +DEQ_DECLARE(dx_router_link_ref_t, dx_router_link_ref_list_t); + + +struct dx_address_t { + DEQ_LINKS(dx_address_t); + dx_router_message_cb handler; // In-Process Consumer + void *handler_context; // In-Process Consumer context + dx_router_link_ref_list_t rlinks; // Locally-Connected Consumers + dx_router_ref_list_t rnodes; // Remotely-Connected Consumers +}; + +ALLOC_DECLARE(dx_address_t); +DEQ_DECLARE(dx_address_t, dx_address_list_t); + + +struct dx_router_t { + dx_dispatch_t *dx; + const char *router_area; + const char *router_id; + dx_node_t *node; + + dx_address_list_t addrs; + hash_t *addr_hash; + dx_address_t *router_addr; + + dx_router_link_list_t links; + dx_router_node_list_t routers; + + dx_bitmask_t *neighbor_free_mask; + sys_mutex_t *lock; + dx_timer_t *timer; + uint64_t dtag; + + PyObject *pyRouter; + PyObject *pyTick; +}; + +#endif diff --git a/qpid/extras/dispatch/tests/router_engine_test.py b/qpid/extras/dispatch/tests/router_engine_test.py index 605e7568d3..7e3940fda4 100644 --- a/qpid/extras/dispatch/tests/router_engine_test.py +++ b/qpid/extras/dispatch/tests/router_engine_test.py @@ -18,393 +18,545 @@ # import unittest -from qpid.dispatch.router.router_engine import NeighborEngine, PathEngine, Configuration +from qpid.dispatch.router.router_engine import NeighborEngine, PathEngine, Configuration, NodeTracker from qpid.dispatch.router.data import LinkState, MessageHELLO class Adapter(object): - def __init__(self, domain): - self._domain = domain + def __init__(self, domain): + self._domain = domain - def log(self, level, text): - print "Adapter.log(%d): domain=%s, text=%s" % (level, self._domain, text) + def log(self, level, text): + print "Adapter.log(%d): domain=%s, text=%s" % (level, self._domain, text) - def send(self, dest, opcode, body): - print "Adapter.send: domain=%s, dest=%s, opcode=%s, body=%s" % (self._domain, dest, opcode, body) - def remote_bind(self, subject, peer): - print "Adapter.remote_bind: subject=%s, peer=%s" % (subject, peer) + def send(self, dest, opcode, body): + print "Adapter.send: domain=%s, dest=%s, opcode=%s, body=%s" % (self._domain, dest, opcode, body) - def remote_unbind(self, subject, peer): - print "Adapter.remote_unbind: subject=%s, peer=%s" % (subject, peer) + def remote_bind(self, subject, peer): + print "Adapter.remote_bind: subject=%s, peer=%s" % (subject, peer) + def remote_unbind(self, subject, peer): + print "Adapter.remote_unbind: subject=%s, peer=%s" % (subject, peer) + + def node_updated(self, address, reachable, neighbor, link_bit, router_bit): + print "Adapter.node_updated: address=%s, reachable=%r, neighbor=%r, link_bit=%d, router_bit=%d" % \ + (address, reachable, neighbor, link_bit, router_bit) -class DataTest(unittest.TestCase): - def test_link_state(self): - ls = LinkState(None, 'R1', 'area', 1, ['R2', 'R3']) - self.assertEqual(ls.id, 'R1') - self.assertEqual(ls.area, 'area') - self.assertEqual(ls.ls_seq, 1) - self.assertEqual(ls.peers, ['R2', 'R3']) - ls.bump_sequence() - self.assertEqual(ls.id, 'R1') - self.assertEqual(ls.area, 'area') - self.assertEqual(ls.ls_seq, 2) - self.assertEqual(ls.peers, ['R2', 'R3']) - - result = ls.add_peer('R4') - self.assertTrue(result) - self.assertEqual(ls.peers, ['R2', 'R3', 'R4']) - result = ls.add_peer('R2') - self.assertFalse(result) - self.assertEqual(ls.peers, ['R2', 'R3', 'R4']) - - result = ls.del_peer('R3') - self.assertTrue(result) - self.assertEqual(ls.peers, ['R2', 'R4']) - result = ls.del_peer('R5') - self.assertFalse(result) - self.assertEqual(ls.peers, ['R2', 'R4']) - - encoded = ls.to_dict() - new_ls = LinkState(encoded) - self.assertEqual(new_ls.id, 'R1') - self.assertEqual(new_ls.area, 'area') - self.assertEqual(new_ls.ls_seq, 2) - self.assertEqual(new_ls.peers, ['R2', 'R4']) - - - def test_hello_message(self): - msg1 = MessageHELLO(None, 'R1', 'area', ['R2', 'R3', 'R4']) - self.assertEqual(msg1.get_opcode(), "HELLO") - self.assertEqual(msg1.id, 'R1') - self.assertEqual(msg1.area, 'area') - self.assertEqual(msg1.seen_peers, ['R2', 'R3', 'R4']) - encoded = msg1.to_dict() - msg2 = MessageHELLO(encoded) - self.assertEqual(msg2.get_opcode(), "HELLO") - self.assertEqual(msg2.id, 'R1') - self.assertEqual(msg2.area, 'area') - self.assertEqual(msg2.seen_peers, ['R2', 'R3', 'R4']) - self.assertTrue(msg2.is_seen('R3')) - self.assertFalse(msg2.is_seen('R9')) +class DataTest(unittest.TestCase): + def test_link_state(self): + ls = LinkState(None, 'R1', 'area', 1, ['R2', 'R3']) + self.assertEqual(ls.id, 'R1') + self.assertEqual(ls.area, 'area') + self.assertEqual(ls.ls_seq, 1) + self.assertEqual(ls.peers, ['R2', 'R3']) + ls.bump_sequence() + self.assertEqual(ls.id, 'R1') + self.assertEqual(ls.area, 'area') + self.assertEqual(ls.ls_seq, 2) + self.assertEqual(ls.peers, ['R2', 'R3']) + + result = ls.add_peer('R4') + self.assertTrue(result) + self.assertEqual(ls.peers, ['R2', 'R3', 'R4']) + result = ls.add_peer('R2') + self.assertFalse(result) + self.assertEqual(ls.peers, ['R2', 'R3', 'R4']) + + result = ls.del_peer('R3') + self.assertTrue(result) + self.assertEqual(ls.peers, ['R2', 'R4']) + result = ls.del_peer('R5') + self.assertFalse(result) + self.assertEqual(ls.peers, ['R2', 'R4']) + + encoded = ls.to_dict() + new_ls = LinkState(encoded) + self.assertEqual(new_ls.id, 'R1') + self.assertEqual(new_ls.area, 'area') + self.assertEqual(new_ls.ls_seq, 2) + self.assertEqual(new_ls.peers, ['R2', 'R4']) + + + def test_hello_message(self): + msg1 = MessageHELLO(None, 'R1', 'area', ['R2', 'R3', 'R4']) + self.assertEqual(msg1.get_opcode(), "HELLO") + self.assertEqual(msg1.id, 'R1') + self.assertEqual(msg1.area, 'area') + self.assertEqual(msg1.seen_peers, ['R2', 'R3', 'R4']) + encoded = msg1.to_dict() + msg2 = MessageHELLO(encoded) + self.assertEqual(msg2.get_opcode(), "HELLO") + self.assertEqual(msg2.id, 'R1') + self.assertEqual(msg2.area, 'area') + self.assertEqual(msg2.seen_peers, ['R2', 'R3', 'R4']) + self.assertTrue(msg2.is_seen('R3')) + self.assertFalse(msg2.is_seen('R9')) + + +class NodeTrackerTest(unittest.TestCase): + def log(self, level, text): + pass + + def node_updated(self, address, reachable, neighbor, link_bit, router_bit): + self.address = address + self.reachable = reachable + self.neighbor = neighbor + self.link_bit = link_bit + self.router_bit = router_bit + + def reset(self): + self.address = None + self.reachable = None + self.neighbor = None + self.link_bit = None + self.router_bit = None + + def test_node_tracker_limits(self): + tracker = NodeTracker(self, 5) + + self.reset() + tracker.new_neighbor('A', 1) + self.assertEqual(self.address, 'RA') + self.assertTrue(self.reachable) + self.assertTrue(self.neighbor) + self.assertEqual(self.link_bit, 1) + self.assertEqual(self.router_bit, 0) + + self.reset() + tracker.new_neighbor('B', 5) + self.assertEqual(self.address, 'RB') + self.assertTrue(self.reachable) + self.assertTrue(self.neighbor) + self.assertEqual(self.link_bit, 5) + self.assertEqual(self.router_bit, 1) + + self.reset() + tracker.new_neighbor('C', 6) + self.assertEqual(self.address, 'RC') + self.assertTrue(self.reachable) + self.assertTrue(self.neighbor) + self.assertEqual(self.link_bit, 6) + self.assertEqual(self.router_bit, 2) + + self.reset() + tracker.new_neighbor('D', 7) + self.assertEqual(self.address, 'RD') + self.assertTrue(self.reachable) + self.assertTrue(self.neighbor) + self.assertEqual(self.link_bit, 7) + self.assertEqual(self.router_bit, 3) + + self.reset() + tracker.new_neighbor('E', 8) + self.assertEqual(self.address, 'RE') + self.assertTrue(self.reachable) + self.assertTrue(self.neighbor) + self.assertEqual(self.link_bit, 8) + self.assertEqual(self.router_bit, 4) + + self.reset() + try: + tracker.new_neighbor('F', 9) + AssertFalse("We shouldn't be here") + except: + pass + + self.reset() + tracker.lost_neighbor('C') + self.assertEqual(self.address, 'RC') + self.assertFalse(self.reachable) + + self.reset() + tracker.new_neighbor('F', 9) + self.assertEqual(self.address, 'RF') + self.assertTrue(self.reachable) + self.assertTrue(self.neighbor) + self.assertEqual(self.link_bit, 9) + self.assertEqual(self.router_bit, 2) + + + def test_node_tracker_remote_neighbor(self): + tracker = NodeTracker(self, 5) + + self.reset() + tracker.new_node('A') + self.assertEqual(self.address, 'RA') + self.assertTrue(self.reachable) + self.assertFalse(self.neighbor) + self.assertFalse(self.link_bit) + self.assertEqual(self.router_bit, 0) + + self.reset() + tracker.new_neighbor('A', 3) + self.assertEqual(self.address, 'RA') + self.assertTrue(self.reachable) + self.assertTrue(self.neighbor) + self.assertEqual(self.link_bit, 3) + self.assertEqual(self.router_bit, 0) + + self.reset() + tracker.lost_node('A') + self.assertEqual(self.address, 'RA') + self.assertTrue(self.reachable) + self.assertTrue(self.neighbor) + self.assertEqual(self.link_bit, 3) + self.assertEqual(self.router_bit, 0) + + self.reset() + tracker.lost_neighbor('A') + self.assertEqual(self.address, 'RA') + self.assertFalse(self.reachable) + + + def test_node_tracker_neighbor_remote(self): + tracker = NodeTracker(self, 5) + + self.reset() + tracker.new_neighbor('A', 3) + self.assertEqual(self.address, 'RA') + self.assertTrue(self.reachable) + self.assertTrue(self.neighbor) + self.assertEqual(self.link_bit, 3) + self.assertEqual(self.router_bit, 0) + + self.reset() + tracker.new_node('A') + self.assertEqual(self.address, 'RA') + self.assertTrue(self.reachable) + self.assertTrue(self.neighbor) + self.assertEqual(self.link_bit, 3) + self.assertEqual(self.router_bit, 0) + + self.reset() + tracker.lost_neighbor('A') + self.assertEqual(self.address, 'RA') + self.assertTrue(self.reachable) + self.assertFalse(self.neighbor) + self.assertFalse(self.link_bit) + self.assertEqual(self.router_bit, 0) + + self.reset() + tracker.lost_node('A') + self.assertEqual(self.address, 'RA') + self.assertFalse(self.reachable) class NeighborTest(unittest.TestCase): - def log(self, level, text): - pass - - def send(self, dest, msg): - self.sent.append((dest, msg)) - - def local_link_state_changed(self, link_state): - self.local_link_state = link_state - - def new_neighbor(self, rid): - self.neighbors[rid] = None - - def lost_neighbor(self, rid): - self.neighbors.pop(rid) - - def setUp(self): - self.sent = [] - self.local_link_state = None - self.id = "R1" - self.area = "area" - self.config = Configuration() - self.neighbors = {} - - def test_hello_sent(self): - self.sent = [] - self.local_link_state = None - self.engine = NeighborEngine(self) - self.engine.tick(0.5) - self.assertEqual(self.sent, []) - self.engine.tick(1.5) - self.assertEqual(len(self.sent), 1) - dest, msg = self.sent.pop(0) - self.assertEqual(dest, "_local/qdxrouter") - self.assertEqual(msg.get_opcode(), "HELLO") - self.assertEqual(msg.id, self.id) - self.assertEqual(msg.area, self.area) - self.assertEqual(msg.seen_peers, []) - self.assertEqual(self.local_link_state, None) - - def test_sees_peer(self): - self.sent = [] - self.local_link_state = None - self.engine = NeighborEngine(self) - self.engine.handle_hello(MessageHELLO(None, 'R2', 'area', []), 2.0) - self.engine.tick(5.0) - self.assertEqual(len(self.sent), 1) - dest, msg = self.sent.pop(0) - self.assertEqual(msg.seen_peers, ['R2']) - - def test_establish_peer(self): - self.sent = [] - self.local_link_state = None - self.engine = NeighborEngine(self) - self.engine.handle_hello(MessageHELLO(None, 'R2', 'area', ['R1']), 0.5) - self.engine.tick(1.0) - self.engine.tick(2.0) - self.engine.tick(3.0) - self.assertEqual(self.local_link_state.id, 'R1') - self.assertEqual(self.local_link_state.area, 'area') - self.assertEqual(self.local_link_state.ls_seq, 1) - self.assertEqual(self.local_link_state.peers, ['R2']) - - def test_establish_multiple_peers(self): - self.sent = [] - self.local_link_state = None - self.engine = NeighborEngine(self) - self.engine.handle_hello(MessageHELLO(None, 'R2', 'area', ['R1']), 0.5) - self.engine.tick(1.0) - self.engine.handle_hello(MessageHELLO(None, 'R3', 'area', ['R1', 'R2']), 1.5) - self.engine.tick(2.0) - self.engine.handle_hello(MessageHELLO(None, 'R4', 'area', ['R1']), 2.5) - self.engine.handle_hello(MessageHELLO(None, 'R5', 'area', ['R2']), 2.5) - self.engine.handle_hello(MessageHELLO(None, 'R6', 'area', ['R1']), 2.5) - self.engine.tick(3.0) - self.assertEqual(self.local_link_state.id, 'R1') - self.assertEqual(self.local_link_state.area, 'area') - self.assertEqual(self.local_link_state.ls_seq, 3) - self.local_link_state.peers.sort() - self.assertEqual(self.local_link_state.peers, ['R2', 'R3', 'R4', 'R6']) - - def test_timeout_peer(self): - self.sent = [] - self.local_link_state = None - self.engine = NeighborEngine(self) - self.engine.handle_hello(MessageHELLO(None, 'R2', 'area', ['R3', 'R1']), 2.0) - self.engine.tick(5.0) - self.engine.tick(17.1) - self.assertEqual(self.local_link_state.id, 'R1') - self.assertEqual(self.local_link_state.area, 'area') - self.assertEqual(self.local_link_state.ls_seq, 2) - self.assertEqual(self.local_link_state.peers, []) + def log(self, level, text): + pass + + def send(self, dest, msg): + self.sent.append((dest, msg)) + + def local_link_state_changed(self, link_state): + self.local_link_state = link_state + + def new_neighbor(self, rid, lbit): + self.neighbors[rid] = None + + def lost_neighbor(self, rid): + self.neighbors.pop(rid) + + def setUp(self): + self.sent = [] + self.local_link_state = None + self.id = "R1" + self.area = "area" + self.config = Configuration() + self.neighbors = {} + + def test_hello_sent(self): + self.sent = [] + self.local_link_state = None + self.engine = NeighborEngine(self) + self.engine.tick(0.5) + self.assertEqual(self.sent, []) + self.engine.tick(1.5) + self.assertEqual(len(self.sent), 1) + dest, msg = self.sent.pop(0) + self.assertEqual(dest, "_local/qdxrouter") + self.assertEqual(msg.get_opcode(), "HELLO") + self.assertEqual(msg.id, self.id) + self.assertEqual(msg.area, self.area) + self.assertEqual(msg.seen_peers, []) + self.assertEqual(self.local_link_state, None) + + def test_sees_peer(self): + self.sent = [] + self.local_link_state = None + self.engine = NeighborEngine(self) + self.engine.handle_hello(MessageHELLO(None, 'R2', 'area', []), 2.0, 0) + self.engine.tick(5.0) + self.assertEqual(len(self.sent), 1) + dest, msg = self.sent.pop(0) + self.assertEqual(msg.seen_peers, ['R2']) + + def test_establish_peer(self): + self.sent = [] + self.local_link_state = None + self.engine = NeighborEngine(self) + self.engine.handle_hello(MessageHELLO(None, 'R2', 'area', ['R1']), 0.5, 0) + self.engine.tick(1.0) + self.engine.tick(2.0) + self.engine.tick(3.0) + self.assertEqual(self.local_link_state.id, 'R1') + self.assertEqual(self.local_link_state.area, 'area') + self.assertEqual(self.local_link_state.ls_seq, 1) + self.assertEqual(self.local_link_state.peers, ['R2']) + + def test_establish_multiple_peers(self): + self.sent = [] + self.local_link_state = None + self.engine = NeighborEngine(self) + self.engine.handle_hello(MessageHELLO(None, 'R2', 'area', ['R1']), 0.5, 0) + self.engine.tick(1.0) + self.engine.handle_hello(MessageHELLO(None, 'R3', 'area', ['R1', 'R2']), 1.5, 0) + self.engine.tick(2.0) + self.engine.handle_hello(MessageHELLO(None, 'R4', 'area', ['R1']), 2.5, 0) + self.engine.handle_hello(MessageHELLO(None, 'R5', 'area', ['R2']), 2.5, 0) + self.engine.handle_hello(MessageHELLO(None, 'R6', 'area', ['R1']), 2.5, 0) + self.engine.tick(3.0) + self.assertEqual(self.local_link_state.id, 'R1') + self.assertEqual(self.local_link_state.area, 'area') + self.assertEqual(self.local_link_state.ls_seq, 3) + self.local_link_state.peers.sort() + self.assertEqual(self.local_link_state.peers, ['R2', 'R3', 'R4', 'R6']) + + def test_timeout_peer(self): + self.sent = [] + self.local_link_state = None + self.engine = NeighborEngine(self) + self.engine.handle_hello(MessageHELLO(None, 'R2', 'area', ['R3', 'R1']), 2.0, 0) + self.engine.tick(5.0) + self.engine.tick(17.1) + self.assertEqual(self.local_link_state.id, 'R1') + self.assertEqual(self.local_link_state.area, 'area') + self.assertEqual(self.local_link_state.ls_seq, 2) + self.assertEqual(self.local_link_state.peers, []) class PathTest(unittest.TestCase): - def setUp(self): - self.id = 'R1' - self.area = 'area' - self.next_hops = None - self.engine = PathEngine(self) - - def log(self, level, text): - pass - - def next_hops_changed(self, nh): - self.next_hops = nh - - def test_topology1(self): - """ - - +====+ +----+ +----+ - | R1 |------| R2 |------| R3 | - +====+ +----+ +----+ - - """ - collection = { 'R1': LinkState(None, 'R1', 'area', 1, ['R2']), - 'R2': LinkState(None, 'R2', 'area', 1, ['R1', 'R3']), - 'R3': LinkState(None, 'R3', 'area', 1, ['R2']) } - self.engine.ls_collection_changed(collection) - self.engine.tick(1.0) - self.assertEqual(len(self.next_hops), 2) - self.assertEqual(self.next_hops['R2'], 'R2') - self.assertEqual(self.next_hops['R3'], 'R2') - - def test_topology2(self): - """ - - +====+ +----+ +----+ - | R1 |------| R2 |------| R4 | - +====+ +----+ +----+ - | | - +----+ +----+ +----+ - | R3 |------| R5 |------| R6 | - +----+ +----+ +----+ - - """ - collection = { 'R1': LinkState(None, 'R1', 'area', 1, ['R2']), - 'R2': LinkState(None, 'R2', 'area', 1, ['R1', 'R3', 'R4']), - 'R3': LinkState(None, 'R3', 'area', 1, ['R2', 'R5']), - 'R4': LinkState(None, 'R4', 'area', 1, ['R2', 'R5']), - 'R5': LinkState(None, 'R5', 'area', 1, ['R3', 'R4', 'R6']), - 'R6': LinkState(None, 'R6', 'area', 1, ['R5']) } - self.engine.ls_collection_changed(collection) - self.engine.tick(1.0) - self.assertEqual(len(self.next_hops), 5) - self.assertEqual(self.next_hops['R2'], 'R2') - self.assertEqual(self.next_hops['R3'], 'R2') - self.assertEqual(self.next_hops['R4'], 'R2') - self.assertEqual(self.next_hops['R5'], 'R2') - self.assertEqual(self.next_hops['R6'], 'R2') - - def test_topology3(self): - """ - - +----+ +----+ +----+ - | R2 |------| R3 |------| R4 | - +----+ +----+ +----+ - | | - +====+ +----+ +----+ - | R1 |------| R5 |------| R6 | - +====+ +----+ +----+ - - """ - collection = { 'R2': LinkState(None, 'R2', 'area', 1, ['R3']), - 'R3': LinkState(None, 'R3', 'area', 1, ['R1', 'R2', 'R4']), - 'R4': LinkState(None, 'R4', 'area', 1, ['R3', 'R5']), - 'R1': LinkState(None, 'R1', 'area', 1, ['R3', 'R5']), - 'R5': LinkState(None, 'R5', 'area', 1, ['R1', 'R4', 'R6']), - 'R6': LinkState(None, 'R6', 'area', 1, ['R5']) } - self.engine.ls_collection_changed(collection) - self.engine.tick(1.0) - self.assertEqual(len(self.next_hops), 5) - self.assertEqual(self.next_hops['R2'], 'R3') - self.assertEqual(self.next_hops['R3'], 'R3') - self.assertEqual(self.next_hops['R4'], 'R3') - self.assertEqual(self.next_hops['R5'], 'R5') - self.assertEqual(self.next_hops['R6'], 'R5') - - def test_topology4(self): - """ - - +----+ +----+ +----+ - | R2 |------| R3 |------| R4 | - +----+ +----+ +----+ - | | - +====+ +----+ +----+ - | R1 |------| R5 |------| R6 |------ R7 (no ls from R7) - +====+ +----+ +----+ - - """ - collection = { 'R2': LinkState(None, 'R2', 'area', 1, ['R3']), - 'R3': LinkState(None, 'R3', 'area', 1, ['R1', 'R2', 'R4']), - 'R4': LinkState(None, 'R4', 'area', 1, ['R3', 'R5']), - 'R1': LinkState(None, 'R1', 'area', 1, ['R3', 'R5']), - 'R5': LinkState(None, 'R5', 'area', 1, ['R1', 'R4', 'R6']), - 'R6': LinkState(None, 'R6', 'area', 1, ['R5', 'R7']) } - self.engine.ls_collection_changed(collection) - self.engine.tick(1.0) - self.assertEqual(len(self.next_hops), 6) - self.assertEqual(self.next_hops['R2'], 'R3') - self.assertEqual(self.next_hops['R3'], 'R3') - self.assertEqual(self.next_hops['R4'], 'R3') - self.assertEqual(self.next_hops['R5'], 'R5') - self.assertEqual(self.next_hops['R6'], 'R5') - self.assertEqual(self.next_hops['R7'], 'R5') - - def test_topology5(self): - """ - - +----+ +----+ +----+ - | R2 |------| R3 |------| R4 | - +----+ +----+ +----+ - | | | - | +====+ +----+ +----+ - +--------| R1 |------| R5 |------| R6 |------ R7 (no ls from R7) - +====+ +----+ +----+ - - """ - collection = { 'R2': LinkState(None, 'R2', 'area', 1, ['R3', 'R1']), - 'R3': LinkState(None, 'R3', 'area', 1, ['R1', 'R2', 'R4']), - 'R4': LinkState(None, 'R4', 'area', 1, ['R3', 'R5']), - 'R1': LinkState(None, 'R1', 'area', 1, ['R3', 'R5', 'R2']), - 'R5': LinkState(None, 'R5', 'area', 1, ['R1', 'R4', 'R6']), - 'R6': LinkState(None, 'R6', 'area', 1, ['R5', 'R7']) } - self.engine.ls_collection_changed(collection) - self.engine.tick(1.0) - self.assertEqual(len(self.next_hops), 6) - self.assertEqual(self.next_hops['R2'], 'R2') - self.assertEqual(self.next_hops['R3'], 'R3') - self.assertEqual(self.next_hops['R4'], 'R3') - self.assertEqual(self.next_hops['R5'], 'R5') - self.assertEqual(self.next_hops['R6'], 'R5') - self.assertEqual(self.next_hops['R7'], 'R5') - - def test_topology5_with_asymmetry1(self): - """ - - +----+ +----+ +----+ - | R2 |------| R3 |------| R4 | - +----+ +----+ +----+ - ^ | | - ^ +====+ +----+ +----+ - +-<-<-<--| R1 |------| R5 |------| R6 |------ R7 (no ls from R7) - +====+ +----+ +----+ - - """ - collection = { 'R2': LinkState(None, 'R2', 'area', 1, ['R3']), - 'R3': LinkState(None, 'R3', 'area', 1, ['R1', 'R2', 'R4']), - 'R4': LinkState(None, 'R4', 'area', 1, ['R3', 'R5']), - 'R1': LinkState(None, 'R1', 'area', 1, ['R3', 'R5', 'R2']), - 'R5': LinkState(None, 'R5', 'area', 1, ['R1', 'R4', 'R6']), - 'R6': LinkState(None, 'R6', 'area', 1, ['R5', 'R7']) } - self.engine.ls_collection_changed(collection) - self.engine.tick(1.0) - self.assertEqual(len(self.next_hops), 6) - self.assertEqual(self.next_hops['R2'], 'R2') - self.assertEqual(self.next_hops['R3'], 'R3') - self.assertEqual(self.next_hops['R4'], 'R3') - self.assertEqual(self.next_hops['R5'], 'R5') - self.assertEqual(self.next_hops['R6'], 'R5') - self.assertEqual(self.next_hops['R7'], 'R5') - - def test_topology5_with_asymmetry2(self): - """ - - +----+ +----+ +----+ - | R2 |------| R3 |------| R4 | - +----+ +----+ +----+ - v | | - v +====+ +----+ +----+ - +->->->->| R1 |------| R5 |------| R6 |------ R7 (no ls from R7) - +====+ +----+ +----+ - - """ - collection = { 'R2': LinkState(None, 'R2', 'area', 1, ['R3', 'R1']), - 'R3': LinkState(None, 'R3', 'area', 1, ['R1', 'R2', 'R4']), - 'R4': LinkState(None, 'R4', 'area', 1, ['R3', 'R5']), - 'R1': LinkState(None, 'R1', 'area', 1, ['R3', 'R5']), - 'R5': LinkState(None, 'R5', 'area', 1, ['R1', 'R4', 'R6']), - 'R6': LinkState(None, 'R6', 'area', 1, ['R5', 'R7']) } - self.engine.ls_collection_changed(collection) - self.engine.tick(1.0) - self.assertEqual(len(self.next_hops), 6) - self.assertEqual(self.next_hops['R2'], 'R3') - self.assertEqual(self.next_hops['R3'], 'R3') - self.assertEqual(self.next_hops['R4'], 'R3') - self.assertEqual(self.next_hops['R5'], 'R5') - self.assertEqual(self.next_hops['R6'], 'R5') - self.assertEqual(self.next_hops['R7'], 'R5') - - def test_topology5_with_asymmetry3(self): - """ - - +----+ +----+ +----+ - | R2 |------| R3 |------| R4 | - +----+ +----+ +----+ - v | | - v +====+ +----+ +----+ - +->->->->| R1 |------| R5 |<-<-<-| R6 |------ R7 (no ls from R7) - +====+ +----+ +----+ - - """ - collection = { 'R2': LinkState(None, 'R2', 'area', 1, ['R3', 'R1']), - 'R3': LinkState(None, 'R3', 'area', 1, ['R1', 'R2', 'R4']), - 'R4': LinkState(None, 'R4', 'area', 1, ['R3', 'R5']), - 'R1': LinkState(None, 'R1', 'area', 1, ['R3', 'R5']), - 'R5': LinkState(None, 'R5', 'area', 1, ['R1', 'R4']), - 'R6': LinkState(None, 'R6', 'area', 1, ['R5', 'R7']) } - self.engine.ls_collection_changed(collection) - self.engine.tick(1.0) - self.assertEqual(len(self.next_hops), 4) - self.assertEqual(self.next_hops['R2'], 'R3') - self.assertEqual(self.next_hops['R3'], 'R3') - self.assertEqual(self.next_hops['R4'], 'R3') - self.assertEqual(self.next_hops['R5'], 'R5') + def setUp(self): + self.id = 'R1' + self.area = 'area' + self.next_hops = None + self.engine = PathEngine(self) + + def log(self, level, text): + pass + + def next_hops_changed(self, nh): + self.next_hops = nh + + def test_topology1(self): + """ + + +====+ +----+ +----+ + | R1 |------| R2 |------| R3 | + +====+ +----+ +----+ + + """ + collection = { 'R1': LinkState(None, 'R1', 'area', 1, ['R2']), + 'R2': LinkState(None, 'R2', 'area', 1, ['R1', 'R3']), + 'R3': LinkState(None, 'R3', 'area', 1, ['R2']) } + self.engine.ls_collection_changed(collection) + self.engine.tick(1.0) + self.assertEqual(len(self.next_hops), 2) + self.assertEqual(self.next_hops['R2'], 'R2') + self.assertEqual(self.next_hops['R3'], 'R2') + + def test_topology2(self): + """ + + +====+ +----+ +----+ + | R1 |------| R2 |------| R4 | + +====+ +----+ +----+ + | | + +----+ +----+ +----+ + | R3 |------| R5 |------| R6 | + +----+ +----+ +----+ + + """ + collection = { 'R1': LinkState(None, 'R1', 'area', 1, ['R2']), + 'R2': LinkState(None, 'R2', 'area', 1, ['R1', 'R3', 'R4']), + 'R3': LinkState(None, 'R3', 'area', 1, ['R2', 'R5']), + 'R4': LinkState(None, 'R4', 'area', 1, ['R2', 'R5']), + 'R5': LinkState(None, 'R5', 'area', 1, ['R3', 'R4', 'R6']), + 'R6': LinkState(None, 'R6', 'area', 1, ['R5']) } + self.engine.ls_collection_changed(collection) + self.engine.tick(1.0) + self.assertEqual(len(self.next_hops), 5) + self.assertEqual(self.next_hops['R2'], 'R2') + self.assertEqual(self.next_hops['R3'], 'R2') + self.assertEqual(self.next_hops['R4'], 'R2') + self.assertEqual(self.next_hops['R5'], 'R2') + self.assertEqual(self.next_hops['R6'], 'R2') + + def test_topology3(self): + """ + + +----+ +----+ +----+ + | R2 |------| R3 |------| R4 | + +----+ +----+ +----+ + | | + +====+ +----+ +----+ + | R1 |------| R5 |------| R6 | + +====+ +----+ +----+ + + """ + collection = { 'R2': LinkState(None, 'R2', 'area', 1, ['R3']), + 'R3': LinkState(None, 'R3', 'area', 1, ['R1', 'R2', 'R4']), + 'R4': LinkState(None, 'R4', 'area', 1, ['R3', 'R5']), + 'R1': LinkState(None, 'R1', 'area', 1, ['R3', 'R5']), + 'R5': LinkState(None, 'R5', 'area', 1, ['R1', 'R4', 'R6']), + 'R6': LinkState(None, 'R6', 'area', 1, ['R5']) } + self.engine.ls_collection_changed(collection) + self.engine.tick(1.0) + self.assertEqual(len(self.next_hops), 5) + self.assertEqual(self.next_hops['R2'], 'R3') + self.assertEqual(self.next_hops['R3'], 'R3') + self.assertEqual(self.next_hops['R4'], 'R3') + self.assertEqual(self.next_hops['R5'], 'R5') + self.assertEqual(self.next_hops['R6'], 'R5') + + def test_topology4(self): + """ + + +----+ +----+ +----+ + | R2 |------| R3 |------| R4 | + +----+ +----+ +----+ + | | + +====+ +----+ +----+ + | R1 |------| R5 |------| R6 |------ R7 (no ls from R7) + +====+ +----+ +----+ + + """ + collection = { 'R2': LinkState(None, 'R2', 'area', 1, ['R3']), + 'R3': LinkState(None, 'R3', 'area', 1, ['R1', 'R2', 'R4']), + 'R4': LinkState(None, 'R4', 'area', 1, ['R3', 'R5']), + 'R1': LinkState(None, 'R1', 'area', 1, ['R3', 'R5']), + 'R5': LinkState(None, 'R5', 'area', 1, ['R1', 'R4', 'R6']), + 'R6': LinkState(None, 'R6', 'area', 1, ['R5', 'R7']) } + self.engine.ls_collection_changed(collection) + self.engine.tick(1.0) + self.assertEqual(len(self.next_hops), 6) + self.assertEqual(self.next_hops['R2'], 'R3') + self.assertEqual(self.next_hops['R3'], 'R3') + self.assertEqual(self.next_hops['R4'], 'R3') + self.assertEqual(self.next_hops['R5'], 'R5') + self.assertEqual(self.next_hops['R6'], 'R5') + self.assertEqual(self.next_hops['R7'], 'R5') + + def test_topology5(self): + """ + + +----+ +----+ +----+ + | R2 |------| R3 |------| R4 | + +----+ +----+ +----+ + | | | + | +====+ +----+ +----+ + +--------| R1 |------| R5 |------| R6 |------ R7 (no ls from R7) + +====+ +----+ +----+ + + """ + collection = { 'R2': LinkState(None, 'R2', 'area', 1, ['R3', 'R1']), + 'R3': LinkState(None, 'R3', 'area', 1, ['R1', 'R2', 'R4']), + 'R4': LinkState(None, 'R4', 'area', 1, ['R3', 'R5']), + 'R1': LinkState(None, 'R1', 'area', 1, ['R3', 'R5', 'R2']), + 'R5': LinkState(None, 'R5', 'area', 1, ['R1', 'R4', 'R6']), + 'R6': LinkState(None, 'R6', 'area', 1, ['R5', 'R7']) } + self.engine.ls_collection_changed(collection) + self.engine.tick(1.0) + self.assertEqual(len(self.next_hops), 6) + self.assertEqual(self.next_hops['R2'], 'R2') + self.assertEqual(self.next_hops['R3'], 'R3') + self.assertEqual(self.next_hops['R4'], 'R3') + self.assertEqual(self.next_hops['R5'], 'R5') + self.assertEqual(self.next_hops['R6'], 'R5') + self.assertEqual(self.next_hops['R7'], 'R5') + + def test_topology5_with_asymmetry1(self): + """ + + +----+ +----+ +----+ + | R2 |------| R3 |------| R4 | + +----+ +----+ +----+ + ^ | | + ^ +====+ +----+ +----+ + +-<-<-<--| R1 |------| R5 |------| R6 |------ R7 (no ls from R7) + +====+ +----+ +----+ + + """ + collection = { 'R2': LinkState(None, 'R2', 'area', 1, ['R3']), + 'R3': LinkState(None, 'R3', 'area', 1, ['R1', 'R2', 'R4']), + 'R4': LinkState(None, 'R4', 'area', 1, ['R3', 'R5']), + 'R1': LinkState(None, 'R1', 'area', 1, ['R3', 'R5', 'R2']), + 'R5': LinkState(None, 'R5', 'area', 1, ['R1', 'R4', 'R6']), + 'R6': LinkState(None, 'R6', 'area', 1, ['R5', 'R7']) } + self.engine.ls_collection_changed(collection) + self.engine.tick(1.0) + self.assertEqual(len(self.next_hops), 6) + self.assertEqual(self.next_hops['R2'], 'R2') + self.assertEqual(self.next_hops['R3'], 'R3') + self.assertEqual(self.next_hops['R4'], 'R3') + self.assertEqual(self.next_hops['R5'], 'R5') + self.assertEqual(self.next_hops['R6'], 'R5') + self.assertEqual(self.next_hops['R7'], 'R5') + + def test_topology5_with_asymmetry2(self): + """ + + +----+ +----+ +----+ + | R2 |------| R3 |------| R4 | + +----+ +----+ +----+ + v | | + v +====+ +----+ +----+ + +->->->->| R1 |------| R5 |------| R6 |------ R7 (no ls from R7) + +====+ +----+ +----+ + + """ + collection = { 'R2': LinkState(None, 'R2', 'area', 1, ['R3', 'R1']), + 'R3': LinkState(None, 'R3', 'area', 1, ['R1', 'R2', 'R4']), + 'R4': LinkState(None, 'R4', 'area', 1, ['R3', 'R5']), + 'R1': LinkState(None, 'R1', 'area', 1, ['R3', 'R5']), + 'R5': LinkState(None, 'R5', 'area', 1, ['R1', 'R4', 'R6']), + 'R6': LinkState(None, 'R6', 'area', 1, ['R5', 'R7']) } + self.engine.ls_collection_changed(collection) + self.engine.tick(1.0) + self.assertEqual(len(self.next_hops), 6) + self.assertEqual(self.next_hops['R2'], 'R3') + self.assertEqual(self.next_hops['R3'], 'R3') + self.assertEqual(self.next_hops['R4'], 'R3') + self.assertEqual(self.next_hops['R5'], 'R5') + self.assertEqual(self.next_hops['R6'], 'R5') + self.assertEqual(self.next_hops['R7'], 'R5') + + def test_topology5_with_asymmetry3(self): + """ + + +----+ +----+ +----+ + | R2 |------| R3 |------| R4 | + +----+ +----+ +----+ + v | | + v +====+ +----+ +----+ + +->->->->| R1 |------| R5 |<-<-<-| R6 |------ R7 (no ls from R7) + +====+ +----+ +----+ + + """ + collection = { 'R2': LinkState(None, 'R2', 'area', 1, ['R3', 'R1']), + 'R3': LinkState(None, 'R3', 'area', 1, ['R1', 'R2', 'R4']), + 'R4': LinkState(None, 'R4', 'area', 1, ['R3', 'R5']), + 'R1': LinkState(None, 'R1', 'area', 1, ['R3', 'R5']), + 'R5': LinkState(None, 'R5', 'area', 1, ['R1', 'R4']), + 'R6': LinkState(None, 'R6', 'area', 1, ['R5', 'R7']) } + self.engine.ls_collection_changed(collection) + self.engine.tick(1.0) + self.assertEqual(len(self.next_hops), 4) + self.assertEqual(self.next_hops['R2'], 'R3') + self.assertEqual(self.next_hops['R3'], 'R3') + self.assertEqual(self.next_hops['R4'], 'R3') + self.assertEqual(self.next_hops['R5'], 'R5') if __name__ == '__main__': - unittest.main() + unittest.main() |
