summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--qpid/extras/dispatch/CMakeLists.txt16
-rw-r--r--qpid/extras/dispatch/include/qpid/dispatch.h1
-rw-r--r--qpid/extras/dispatch/include/qpid/dispatch/amqp.h18
-rw-r--r--qpid/extras/dispatch/include/qpid/dispatch/bitmask.h37
-rw-r--r--qpid/extras/dispatch/include/qpid/dispatch/router.h2
-rw-r--r--qpid/extras/dispatch/python/qpid/dispatch/router/adapter.py136
-rw-r--r--qpid/extras/dispatch/python/qpid/dispatch/router/binding.py209
-rw-r--r--qpid/extras/dispatch/python/qpid/dispatch/router/configuration.py46
-rw-r--r--qpid/extras/dispatch/python/qpid/dispatch/router/data.py444
-rw-r--r--qpid/extras/dispatch/python/qpid/dispatch/router/link.py232
-rw-r--r--qpid/extras/dispatch/python/qpid/dispatch/router/mobile.py306
-rw-r--r--qpid/extras/dispatch/python/qpid/dispatch/router/neighbor.py98
-rw-r--r--qpid/extras/dispatch/python/qpid/dispatch/router/node.py183
-rw-r--r--qpid/extras/dispatch/python/qpid/dispatch/router/path.py326
-rw-r--r--qpid/extras/dispatch/python/qpid/dispatch/router/router_engine.py474
-rw-r--r--qpid/extras/dispatch/python/qpid/dispatch/router/routing.py50
-rw-r--r--qpid/extras/dispatch/router/src/main.c2
-rw-r--r--qpid/extras/dispatch/src/agent.c2
-rw-r--r--qpid/extras/dispatch/src/amqp.c30
-rw-r--r--qpid/extras/dispatch/src/bitmask.c124
-rw-r--r--qpid/extras/dispatch/src/message_private.h2
-rw-r--r--qpid/extras/dispatch/src/python_embedded.c9
-rw-r--r--qpid/extras/dispatch/src/router_node.c472
-rw-r--r--qpid/extras/dispatch/src/router_private.h128
-rw-r--r--qpid/extras/dispatch/tests/router_engine_test.py896
25 files changed, 2421 insertions, 1822 deletions
diff --git a/qpid/extras/dispatch/CMakeLists.txt b/qpid/extras/dispatch/CMakeLists.txt
index ae0e126be7..365e5d6bf7 100644
--- a/qpid/extras/dispatch/CMakeLists.txt
+++ b/qpid/extras/dispatch/CMakeLists.txt
@@ -61,13 +61,6 @@ if (PYTHONLIBS_FOUND)
OUTPUT_STRIP_TRAILING_WHITESPACE)
endif (PYTHONLIBS_FOUND)
-include_directories(
- ${CMAKE_CURRENT_SOURCE_DIR}/include
- ${CMAKE_CURRENT_SOURCE_DIR}/src
- ${proton_include}
- ${PYTHON_INCLUDE_PATH}
- )
-
##
## Find dependencies
##
@@ -76,6 +69,13 @@ find_library(pthread_lib pthread)
find_library(rt_lib rt)
find_path(proton_include proton/driver.h)
+include_directories(
+ ${CMAKE_CURRENT_SOURCE_DIR}/include
+ ${CMAKE_CURRENT_SOURCE_DIR}/src
+ ${proton_include}
+ ${PYTHON_INCLUDE_PATH}
+ )
+
set(CMAKE_C_FLAGS "-pthread -Wall -Werror -std=gnu99")
set(CATCH_UNDEFINED "-Wl,--no-undefined")
@@ -85,6 +85,8 @@ set(CATCH_UNDEFINED "-Wl,--no-undefined")
set(server_SOURCES
src/agent.c
src/alloc.c
+ src/amqp.c
+ src/bitmask.c
src/buffer.c
src/compose.c
src/config.c
diff --git a/qpid/extras/dispatch/include/qpid/dispatch.h b/qpid/extras/dispatch/include/qpid/dispatch.h
index 72b3456099..6b3d6a963f 100644
--- a/qpid/extras/dispatch/include/qpid/dispatch.h
+++ b/qpid/extras/dispatch/include/qpid/dispatch.h
@@ -20,6 +20,7 @@
*/
#include <qpid/dispatch/alloc.h>
+#include <qpid/dispatch/bitmask.h>
#include <qpid/dispatch/buffer.h>
#include <qpid/dispatch/ctools.h>
#include <qpid/dispatch/hash.h>
diff --git a/qpid/extras/dispatch/include/qpid/dispatch/amqp.h b/qpid/extras/dispatch/include/qpid/dispatch/amqp.h
index 9babeb930d..c27eec6589 100644
--- a/qpid/extras/dispatch/include/qpid/dispatch/amqp.h
+++ b/qpid/extras/dispatch/include/qpid/dispatch/amqp.h
@@ -76,5 +76,23 @@
#define DX_AMQP_ARRAY8 0xe0
#define DX_AMQP_ARRAY32 0xf0
+/**
+ * Delivery Annotation Headers
+ */
+const char * const DX_DA_INGRESS; // Ingress Router
+const char * const DX_DA_TRACE; // Trace
+const char * const DX_DA_TO; // To-Override
+
+/**
+ * Link Terminus Capabilities
+ */
+const char * const DX_CAPABILITY_ROUTER;
+
+/**
+ * Miscellaneous Strings
+ */
+const char * const DX_INTERNODE_LINK_NAME_1;
+const char * const DX_INTERNODE_LINK_NAME_2;
+
#endif
diff --git a/qpid/extras/dispatch/include/qpid/dispatch/bitmask.h b/qpid/extras/dispatch/include/qpid/dispatch/bitmask.h
new file mode 100644
index 0000000000..fe436bc19b
--- /dev/null
+++ b/qpid/extras/dispatch/include/qpid/dispatch/bitmask.h
@@ -0,0 +1,37 @@
+#ifndef __dispatch_bitmask_h__
+#define __dispatch_bitmask_h__ 1
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+typedef struct dx_bitmask_t dx_bitmask_t;
+
+int dx_bitmask_width();
+dx_bitmask_t *dx_bitmask(int initial);
+void dx_bitmask_free(dx_bitmask_t *b);
+void dx_bitmask_set_all(dx_bitmask_t *b);
+void dx_bitmask_clear_all(dx_bitmask_t *b);
+void dx_bitmask_set_bit(dx_bitmask_t *b, int bitnum);
+void dx_bitmask_clear_bit(dx_bitmask_t *b, int bitnum);
+int dx_bitmask_value(dx_bitmask_t *b, int bitnum);
+int dx_bitmask_first_set(dx_bitmask_t *b, int *bitnum);
+
+
+
+#endif
+
diff --git a/qpid/extras/dispatch/include/qpid/dispatch/router.h b/qpid/extras/dispatch/include/qpid/dispatch/router.h
index c900d93615..1dadc8d119 100644
--- a/qpid/extras/dispatch/include/qpid/dispatch/router.h
+++ b/qpid/extras/dispatch/include/qpid/dispatch/router.h
@@ -27,7 +27,7 @@
typedef struct dx_address_t dx_address_t;
-typedef void (*dx_router_message_cb)(void *context, dx_message_t *msg);
+typedef void (*dx_router_message_cb)(void *context, dx_message_t *msg, int link_id);
const char *dx_router_id(const dx_dispatch_t *dx);
diff --git a/qpid/extras/dispatch/python/qpid/dispatch/router/adapter.py b/qpid/extras/dispatch/python/qpid/dispatch/router/adapter.py
index 7f7f6c9e8e..d21f834751 100644
--- a/qpid/extras/dispatch/python/qpid/dispatch/router/adapter.py
+++ b/qpid/extras/dispatch/python/qpid/dispatch/router/adapter.py
@@ -18,82 +18,82 @@
#
try:
- from dispatch import *
+ from dispatch import *
except ImportError:
- from ..stubs import *
+ from ..stubs import *
ENTRY_OLD = 1
ENTRY_CURRENT = 2
ENTRY_NEW = 3
class AdapterEngine(object):
- """
- This module is responsible for managing the Adapter's key bindings (list of address-subject:next-hop).
- Key binding lists are kept in disjoint key-classes that can come from different parts of the router
- (i.e. topological keys for inter-router communication and mobile keys for end users).
-
- For each key-class, a mirror copy of what the adapter has is kept internally. This allows changes to the
- routing tables to be efficiently communicated to the adapter in the form of table deltas.
- """
- def __init__(self, container):
- self.container = container
- self.id = self.container.id
- self.area = self.container.area
- self.key_classes = {} # map [key_class] => (addr-key, next-hop)
-
-
- def tick(self, now):
"""
- There is no periodic processing needed for this module.
+ This module is responsible for managing the Adapter's key bindings (list of address-subject:next-hop).
+ Key binding lists are kept in disjoint key-classes that can come from different parts of the router
+ (i.e. topological keys for inter-router communication and mobile keys for end users).
+
+ For each key-class, a mirror copy of what the adapter has is kept internally. This allows changes to the
+ routing tables to be efficiently communicated to the adapter in the form of table deltas.
"""
- pass
-
-
- def remote_routes_changed(self, key_class, new_table):
- old_table = []
- if key_class in self.key_classes:
- old_table = self.key_classes[key_class]
-
- # flag all of the old entries
- old_flags = {}
- for a,b in old_table:
- old_flags[(a,b)] = ENTRY_OLD
-
- # flag the new entries
- new_flags = {}
- for a,b in new_table:
- new_flags[(a,b)] = ENTRY_NEW
-
- # calculate the differences from old to new
- for a,b in new_table:
- if old_table.count((a,b)) > 0:
- old_flags[(a,b)] = ENTRY_CURRENT
- new_flags[(a,b)] = ENTRY_CURRENT
-
- # make to_add and to_delete lists
- to_add = []
- to_delete = []
- for (a,b),f in old_flags.items():
- if f == ENTRY_OLD:
- to_delete.append((a,b))
- for (a,b),f in new_flags.items():
- if f == ENTRY_NEW:
- to_add.append((a,b))
-
- # set the routing table to the new contents
- self.key_classes[key_class] = new_table
-
- # update the adapter's routing tables
- # Note: Do deletions before adds to avoid overlapping routes that may cause
- # messages to be duplicated. It's better to have gaps in the routing
- # tables momentarily because unroutable messages are stored for retry.
- for a,b in to_delete:
- self.container.router_adapter.remote_unbind(a, b)
- for a,b in to_add:
- self.container.router_adapter.remote_bind(a, b)
-
- self.container.log(LOG_INFO, "New Routing Table (class=%s):" % key_class)
- for a,b in new_table:
- self.container.log(LOG_INFO, " %s => %s" % (a, b))
+ def __init__(self, container):
+ self.container = container
+ self.id = self.container.id
+ self.area = self.container.area
+ self.key_classes = {} # map [key_class] => (addr-key, next-hop)
+
+
+ def tick(self, now):
+ """
+ There is no periodic processing needed for this module.
+ """
+ pass
+
+
+ def remote_routes_changed(self, key_class, new_table):
+ old_table = []
+ if key_class in self.key_classes:
+ old_table = self.key_classes[key_class]
+
+ # flag all of the old entries
+ old_flags = {}
+ for a,b in old_table:
+ old_flags[(a,b)] = ENTRY_OLD
+
+ # flag the new entries
+ new_flags = {}
+ for a,b in new_table:
+ new_flags[(a,b)] = ENTRY_NEW
+
+ # calculate the differences from old to new
+ for a,b in new_table:
+ if old_table.count((a,b)) > 0:
+ old_flags[(a,b)] = ENTRY_CURRENT
+ new_flags[(a,b)] = ENTRY_CURRENT
+
+ # make to_add and to_delete lists
+ to_add = []
+ to_delete = []
+ for (a,b),f in old_flags.items():
+ if f == ENTRY_OLD:
+ to_delete.append((a,b))
+ for (a,b),f in new_flags.items():
+ if f == ENTRY_NEW:
+ to_add.append((a,b))
+
+ # set the routing table to the new contents
+ self.key_classes[key_class] = new_table
+
+ # update the adapter's routing tables
+ # Note: Do deletions before adds to avoid overlapping routes that may cause
+ # messages to be duplicated. It's better to have gaps in the routing
+ # tables momentarily because unroutable messages are stored for retry.
+ for a,b in to_delete:
+ self.container.router_adapter.remote_unbind(a, b)
+ for a,b in to_add:
+ self.container.router_adapter.remote_bind(a, b)
+
+ self.container.log(LOG_INFO, "New Routing Table (class=%s):" % key_class)
+ for a,b in new_table:
+ self.container.log(LOG_INFO, " %s => %s" % (a, b))
diff --git a/qpid/extras/dispatch/python/qpid/dispatch/router/binding.py b/qpid/extras/dispatch/python/qpid/dispatch/router/binding.py
index a37b585e3e..db62f6e8a5 100644
--- a/qpid/extras/dispatch/python/qpid/dispatch/router/binding.py
+++ b/qpid/extras/dispatch/python/qpid/dispatch/router/binding.py
@@ -18,116 +18,117 @@
#
try:
- from dispatch import *
+ from dispatch import *
except ImportError:
- from ..stubs import *
+ from ..stubs import *
class BindingEngine(object):
- """
- This module is responsible for responding to two different events:
+ """
+ This module is responsible for responding to two different events:
1) The learning of new remote mobile addresses
2) The change of topology (i.e. different next-hops for remote routers)
- When these occur, this module converts the mobile routing table (address => router)
- to a next-hop routing table (address => next-hop), compresses the keys in case there
- are wild-card overlaps, and notifies outbound of changes in the "mobile-key" address class.
- """
- def __init__(self, container):
- self.container = container
- self.id = self.container.id
- self.area = self.container.area
- self.current_keys = {}
-
-
- def tick(self, now):
- pass
-
-
- def mobile_keys_changed(self, keys):
- self.current_keys = keys
- next_hop_keys = self._convert_ids_to_next_hops(keys)
- routing_table = self._compress_keys(next_hop_keys)
- self.container.remote_routes_changed('mobile-key', routing_table)
-
-
- def next_hops_changed(self):
- next_hop_keys = self._convert_ids_to_next_hops(self.current_keys)
- routing_table = self._compress_keys(next_hop_keys)
- self.container.remote_routes_changed('mobile-key', routing_table)
-
-
- def _convert_ids_to_next_hops(self, keys):
- next_hops = self.container.get_next_hops()
- new_keys = {}
- for _id, value in keys.items():
- if _id in next_hops:
- next_hop = next_hops[_id]
- if next_hop not in new_keys:
- new_keys[next_hop] = []
- new_keys[next_hop].extend(value)
- return new_keys
-
-
- def _compress_keys(self, keys):
- trees = {}
- for _id, key_list in keys.items():
- trees[_id] = TopicElementList()
- for key in key_list:
- trees[_id].add_key(key)
- routing_table = []
- for _id, tree in trees.items():
- tree_keys = tree.get_list()
- for tk in tree_keys:
- routing_table.append((tk, _id))
- return routing_table
+ When these occur, this module converts the mobile routing table (address => router)
+ to a next-hop routing table (address => next-hop), compresses the keys in case there
+ are wild-card overlaps, and notifies outbound of changes in the "mobile-key" address class.
+ """
+ def __init__(self, container):
+ self.container = container
+ self.id = self.container.id
+ self.area = self.container.area
+ self.current_keys = {}
+
+
+ def tick(self, now):
+ pass
+
+
+ def mobile_keys_changed(self, keys):
+ self.current_keys = keys
+ next_hop_keys = self._convert_ids_to_next_hops(keys)
+ routing_table = self._compress_keys(next_hop_keys)
+ self.container.remote_routes_changed('mobile-key', routing_table)
+
+
+ def next_hops_changed(self):
+ next_hop_keys = self._convert_ids_to_next_hops(self.current_keys)
+ routing_table = self._compress_keys(next_hop_keys)
+ self.container.remote_routes_changed('mobile-key', routing_table)
+
+
+ def _convert_ids_to_next_hops(self, keys):
+ next_hops = self.container.get_next_hops()
+ new_keys = {}
+ for _id, value in keys.items():
+ if _id in next_hops:
+ next_hop = next_hops[_id]
+ if next_hop not in new_keys:
+ new_keys[next_hop] = []
+ new_keys[next_hop].extend(value)
+ return new_keys
+
+
+ def _compress_keys(self, keys):
+ trees = {}
+ for _id, key_list in keys.items():
+ trees[_id] = TopicElementList()
+ for key in key_list:
+ trees[_id].add_key(key)
+ routing_table = []
+ for _id, tree in trees.items():
+ tree_keys = tree.get_list()
+ for tk in tree_keys:
+ routing_table.append((tk, _id))
+ return routing_table
class TopicElementList(object):
- """
- """
- def __init__(self):
- self.elements = {} # map text => (terminal, sub-list)
-
- def __repr__(self):
- return "%r" % self.elements
-
- def add_key(self, key):
- self.add_tokens(key.split('.'))
-
- def add_tokens(self, tokens):
- first = tokens.pop(0)
- terminal = len(tokens) == 0
-
- if terminal and first == '#':
- ## Optimization #1A (A.B.C.D followed by A.B.#)
- self.elements = {'#':(True, TopicElementList())}
- return
-
- if '#' in self.elements:
- _t,_el = self.elements['#']
- if _t:
- ## Optimization #1B (A.B.# followed by A.B.C.D)
- return
-
- if first not in self.elements:
- self.elements[first] = (terminal, TopicElementList())
- else:
- _t,_el = self.elements[first]
- if terminal and not _t:
- self.elements[first] = (terminal, _el)
-
- if not terminal:
- _t,_el = self.elements[first]
- _el.add_tokens(tokens)
-
- def get_list(self):
- keys = []
- for token, (_t,_el) in self.elements.items():
- if _t: keys.append(token)
- _el.build_list(token, keys)
- return keys
-
- def build_list(self, prefix, keys):
- for token, (_t,_el) in self.elements.items():
- if _t: keys.append("%s.%s" % (prefix, token))
- _el.build_list("%s.%s" % (prefix, token), keys)
+ """
+ """
+ def __init__(self):
+ self.elements = {} # map text => (terminal, sub-list)
+
+ def __repr__(self):
+ return "%r" % self.elements
+
+ def add_key(self, key):
+ self.add_tokens(key.split('.'))
+
+ def add_tokens(self, tokens):
+ first = tokens.pop(0)
+ terminal = len(tokens) == 0
+
+ if terminal and first == '#':
+ ## Optimization #1A (A.B.C.D followed by A.B.#)
+ self.elements = {'#':(True, TopicElementList())}
+ return
+
+ if '#' in self.elements:
+ _t,_el = self.elements['#']
+ if _t:
+ ## Optimization #1B (A.B.# followed by A.B.C.D)
+ return
+
+ if first not in self.elements:
+ self.elements[first] = (terminal, TopicElementList())
+ else:
+ _t,_el = self.elements[first]
+ if terminal and not _t:
+ self.elements[first] = (terminal, _el)
+
+ if not terminal:
+ _t,_el = self.elements[first]
+ _el.add_tokens(tokens)
+
+ def get_list(self):
+ keys = []
+ for token, (_t,_el) in self.elements.items():
+ if _t: keys.append(token)
+ _el.build_list(token, keys)
+ return keys
+
+ def build_list(self, prefix, keys):
+ for token, (_t,_el) in self.elements.items():
+ if _t: keys.append("%s.%s" % (prefix, token))
+ _el.build_list("%s.%s" % (prefix, token), keys)
+
diff --git a/qpid/extras/dispatch/python/qpid/dispatch/router/configuration.py b/qpid/extras/dispatch/python/qpid/dispatch/router/configuration.py
index f87d2ee7d2..e0fd060b82 100644
--- a/qpid/extras/dispatch/python/qpid/dispatch/router/configuration.py
+++ b/qpid/extras/dispatch/python/qpid/dispatch/router/configuration.py
@@ -18,30 +18,30 @@
#
class Configuration(object):
- """
- This module manages and holds the configuration and tuning parameters for a router.
- """
- def __init__(self, overrides={}):
- ##
- ## Load default values
- ##
- self.values = { 'hello_interval' : 1.0,
- 'hello_max_age' : 3.0,
- 'ra_interval' : 30.0,
- 'remote_ls_max_age' : 60.0,
- 'mobile_addr_max_age' : 60.0 }
+ """
+ This module manages and holds the configuration and tuning parameters for a router.
+ """
+ def __init__(self, overrides={}):
+ ##
+ ## Load default values
+ ##
+ self.values = { 'hello_interval' : 1.0,
+ 'hello_max_age' : 3.0,
+ 'ra_interval' : 30.0,
+ 'remote_ls_max_age' : 60.0,
+ 'mobile_addr_max_age' : 60.0 }
- ##
- ## Apply supplied overrides
- ##
- for k, v in overrides.items():
- self.values[k] = v
+ ##
+ ## Apply supplied overrides
+ ##
+ for k, v in overrides.items():
+ self.values[k] = v
- def __getattr__(self, key):
- if key in self.values:
- return self.values[key]
- raise KeyError
+ def __getattr__(self, key):
+ if key in self.values:
+ return self.values[key]
+ raise KeyError
- def __repr__(self):
- return "%r" % self.values
+ def __repr__(self):
+ return "%r" % self.values
diff --git a/qpid/extras/dispatch/python/qpid/dispatch/router/data.py b/qpid/extras/dispatch/python/qpid/dispatch/router/data.py
index 89e347a29e..812369768f 100644
--- a/qpid/extras/dispatch/python/qpid/dispatch/router/data.py
+++ b/qpid/extras/dispatch/python/qpid/dispatch/router/data.py
@@ -19,257 +19,257 @@
try:
- from dispatch import *
+ from dispatch import *
except ImportError:
- from ..stubs import *
+ from ..stubs import *
def getMandatory(data, key, cls=None):
- """
- Get the value mapped to the requested key. If it's not present, raise an exception.
- """
- if key in data:
- value = data[key]
- if cls and value.__class__ != cls:
- raise Exception("Protocol field has wrong data type: '%s' type=%r expected=%r" % (key, value.__class__, cls))
- return value
- raise Exception("Mandatory protocol field missing: '%s'" % key)
+ """
+ Get the value mapped to the requested key. If it's not present, raise an exception.
+ """
+ if key in data:
+ value = data[key]
+ if cls and value.__class__ != cls:
+ raise Exception("Protocol field has wrong data type: '%s' type=%r expected=%r" % (key, value.__class__, cls))
+ return value
+ raise Exception("Mandatory protocol field missing: '%s'" % key)
def getOptional(data, key, default=None, cls=None):
- """
- Get the value mapped to the requested key. If it's not present, return the default value.
- """
- if key in data:
- value = data[key]
- if cls and value.__class__ != cls:
- raise Exception("Protocol field has wrong data type: '%s' type=%r expected=%r" % (key, value.__class__, cls))
- return value
- return default
+ """
+ Get the value mapped to the requested key. If it's not present, return the default value.
+ """
+ if key in data:
+ value = data[key]
+ if cls and value.__class__ != cls:
+ raise Exception("Protocol field has wrong data type: '%s' type=%r expected=%r" % (key, value.__class__, cls))
+ return value
+ return default
class LinkState(object):
- """
- The link-state of a single router. The link state consists of a list of neighbor routers reachable from
- the reporting router. The link-state-sequence number is incremented each time the link state changes.
- """
- def __init__(self, body, _id=None, _area=None, _ls_seq=None, _peers=None):
- self.last_seen = 0
- if body:
- self.id = getMandatory(body, 'id', str)
- self.area = getMandatory(body, 'area', str)
- self.ls_seq = getMandatory(body, 'ls_seq', long)
- self.peers = getMandatory(body, 'peers', list)
- else:
- self.id = _id
- self.area = _area
- self.ls_seq = long(_ls_seq)
- self.peers = _peers
-
- def __repr__(self):
- return "LS(id=%s area=%s ls_seq=%d peers=%r)" % (self.id, self.area, self.ls_seq, self.peers)
-
- def to_dict(self):
- return {'id' : self.id,
- 'area' : self.area,
- 'ls_seq' : self.ls_seq,
- 'peers' : self.peers}
-
- def add_peer(self, _id):
- if self.peers.count(_id) == 0:
- self.peers.append(_id)
- return True
- return False
-
- def del_peer(self, _id):
- if self.peers.count(_id) > 0:
- self.peers.remove(_id)
- return True
- return False
-
- def bump_sequence(self):
- self.ls_seq += 1
+ """
+ The link-state of a single router. The link state consists of a list of neighbor routers reachable from
+ the reporting router. The link-state-sequence number is incremented each time the link state changes.
+ """
+ def __init__(self, body, _id=None, _area=None, _ls_seq=None, _peers=None):
+ self.last_seen = 0
+ if body:
+ self.id = getMandatory(body, 'id', str)
+ self.area = getMandatory(body, 'area', str)
+ self.ls_seq = getMandatory(body, 'ls_seq', long)
+ self.peers = getMandatory(body, 'peers', list)
+ else:
+ self.id = _id
+ self.area = _area
+ self.ls_seq = long(_ls_seq)
+ self.peers = _peers
+
+ def __repr__(self):
+ return "LS(id=%s area=%s ls_seq=%d peers=%r)" % (self.id, self.area, self.ls_seq, self.peers)
+
+ def to_dict(self):
+ return {'id' : self.id,
+ 'area' : self.area,
+ 'ls_seq' : self.ls_seq,
+ 'peers' : self.peers}
+
+ def add_peer(self, _id):
+ if self.peers.count(_id) == 0:
+ self.peers.append(_id)
+ return True
+ return False
+
+ def del_peer(self, _id):
+ if self.peers.count(_id) > 0:
+ self.peers.remove(_id)
+ return True
+ return False
+
+ def bump_sequence(self):
+ self.ls_seq += 1
class MessageHELLO(object):
- """
- HELLO Message
- scope: neighbors only - HELLO messages travel at most one hop
- This message is used by directly connected routers to determine with whom they have
- bidirectional connectivity.
- """
- def __init__(self, body, _id=None, _area=None, _seen_peers=None):
- if body:
- self.id = getMandatory(body, 'id', str)
- self.area = getMandatory(body, 'area', str)
- self.seen_peers = getMandatory(body, 'seen', list)
- else:
- self.id = _id
- self.area = _area
- self.seen_peers = _seen_peers
-
- def __repr__(self):
- return "HELLO(id=%s area=%s seen=%r)" % (self.id, self.area, self.seen_peers)
-
- def get_opcode(self):
- return 'HELLO'
-
- def to_dict(self):
- return {'id' : self.id,
- 'area' : self.area,
- 'seen' : self.seen_peers}
-
- def is_seen(self, _id):
- return self.seen_peers.count(_id) > 0
+ """
+ HELLO Message
+ scope: neighbors only - HELLO messages travel at most one hop
+ This message is used by directly connected routers to determine with whom they have
+ bidirectional connectivity.
+ """
+ def __init__(self, body, _id=None, _area=None, _seen_peers=None):
+ if body:
+ self.id = getMandatory(body, 'id', str)
+ self.area = getMandatory(body, 'area', str)
+ self.seen_peers = getMandatory(body, 'seen', list)
+ else:
+ self.id = _id
+ self.area = _area
+ self.seen_peers = _seen_peers
+
+ def __repr__(self):
+ return "HELLO(id=%s area=%s seen=%r)" % (self.id, self.area, self.seen_peers)
+
+ def get_opcode(self):
+ return 'HELLO'
+
+ def to_dict(self):
+ return {'id' : self.id,
+ 'area' : self.area,
+ 'seen' : self.seen_peers}
+
+ def is_seen(self, _id):
+ return self.seen_peers.count(_id) > 0
class MessageRA(object):
- """
- Router Advertisement (RA) Message
- scope: all routers in the area and all designated routers
- This message is sent periodically to indicate the originating router's sequence numbers
- for link-state and mobile-address-state.
- """
- def __init__(self, body, _id=None, _area=None, _ls_seq=None, _mobile_seq=None):
- if body:
- self.id = getMandatory(body, 'id', str)
- self.area = getMandatory(body, 'area', str)
- self.ls_seq = getMandatory(body, 'ls_seq', long)
- self.mobile_seq = getMandatory(body, 'mobile_seq', long)
- else:
- self.id = _id
- self.area = _area
- self.ls_seq = long(_ls_seq)
- self.mobile_seq = long(_mobile_seq)
-
- def get_opcode(self):
- return 'RA'
-
- def __repr__(self):
- return "RA(id=%s area=%s ls_seq=%d mobile_seq=%d)" % \
- (self.id, self.area, self.ls_seq, self.mobile_seq)
-
- def to_dict(self):
- return {'id' : self.id,
- 'area' : self.area,
- 'ls_seq' : self.ls_seq,
- 'mobile_seq' : self.mobile_seq}
+ """
+ Router Advertisement (RA) Message
+ scope: all routers in the area and all designated routers
+ This message is sent periodically to indicate the originating router's sequence numbers
+ for link-state and mobile-address-state.
+ """
+ def __init__(self, body, _id=None, _area=None, _ls_seq=None, _mobile_seq=None):
+ if body:
+ self.id = getMandatory(body, 'id', str)
+ self.area = getMandatory(body, 'area', str)
+ self.ls_seq = getMandatory(body, 'ls_seq', long)
+ self.mobile_seq = getMandatory(body, 'mobile_seq', long)
+ else:
+ self.id = _id
+ self.area = _area
+ self.ls_seq = long(_ls_seq)
+ self.mobile_seq = long(_mobile_seq)
+
+ def get_opcode(self):
+ return 'RA'
+
+ def __repr__(self):
+ return "RA(id=%s area=%s ls_seq=%d mobile_seq=%d)" % \
+ (self.id, self.area, self.ls_seq, self.mobile_seq)
+
+ def to_dict(self):
+ return {'id' : self.id,
+ 'area' : self.area,
+ 'ls_seq' : self.ls_seq,
+ 'mobile_seq' : self.mobile_seq}
class MessageLSU(object):
- """
- """
- def __init__(self, body, _id=None, _area=None, _ls_seq=None, _ls=None):
- if body:
- self.id = getMandatory(body, 'id', str)
- self.area = getMandatory(body, 'area', str)
- self.ls_seq = getMandatory(body, 'ls_seq', long)
- self.ls = LinkState(getMandatory(body, 'ls', dict))
- else:
- self.id = _id
- self.area = _area
- self.ls_seq = long(_ls_seq)
- self.ls = _ls
-
- def get_opcode(self):
- return 'LSU'
-
- def __repr__(self):
- return "LSU(id=%s area=%s ls_seq=%d ls=%r)" % \
- (self.id, self.area, self.ls_seq, self.ls)
-
- def to_dict(self):
- return {'id' : self.id,
- 'area' : self.area,
- 'ls_seq' : self.ls_seq,
- 'ls' : self.ls.to_dict()}
+ """
+ """
+ def __init__(self, body, _id=None, _area=None, _ls_seq=None, _ls=None):
+ if body:
+ self.id = getMandatory(body, 'id', str)
+ self.area = getMandatory(body, 'area', str)
+ self.ls_seq = getMandatory(body, 'ls_seq', long)
+ self.ls = LinkState(getMandatory(body, 'ls', dict))
+ else:
+ self.id = _id
+ self.area = _area
+ self.ls_seq = long(_ls_seq)
+ self.ls = _ls
+
+ def get_opcode(self):
+ return 'LSU'
+
+ def __repr__(self):
+ return "LSU(id=%s area=%s ls_seq=%d ls=%r)" % \
+ (self.id, self.area, self.ls_seq, self.ls)
+
+ def to_dict(self):
+ return {'id' : self.id,
+ 'area' : self.area,
+ 'ls_seq' : self.ls_seq,
+ 'ls' : self.ls.to_dict()}
class MessageLSR(object):
- """
- """
- def __init__(self, body, _id=None, _area=None):
- if body:
- self.id = getMandatory(body, 'id', str)
- self.area = getMandatory(body, 'area', str)
- else:
- self.id = _id
- self.area = _area
+ """
+ """
+ def __init__(self, body, _id=None, _area=None):
+ if body:
+ self.id = getMandatory(body, 'id', str)
+ self.area = getMandatory(body, 'area', str)
+ else:
+ self.id = _id
+ self.area = _area
- def get_opcode(self):
- return 'LSR'
+ def get_opcode(self):
+ return 'LSR'
- def __repr__(self):
- return "LSR(id=%s area=%s)" % (self.id, self.area)
+ def __repr__(self):
+ return "LSR(id=%s area=%s)" % (self.id, self.area)
- def to_dict(self):
- return {'id' : self.id,
- 'area' : self.area}
+ def to_dict(self):
+ return {'id' : self.id,
+ 'area' : self.area}
class MessageMAU(object):
- """
- """
- def __init__(self, body, _id=None, _area=None, _seq=None, _add_list=None, _del_list=None, _exist_list=None):
- if body:
- self.id = getMandatory(body, 'id', str)
- self.area = getMandatory(body, 'area', str)
- self.mobile_seq = getMandatory(body, 'mobile_seq', long)
- self.add_list = getOptional(body, 'add', None, list)
- self.del_list = getOptional(body, 'del', None, list)
- self.exist_list = getOptional(body, 'exist', None, list)
- else:
- self.id = _id
- self.area = _area
- self.mobile_seq = long(_seq)
- self.add_list = _add_list
- self.del_list = _del_list
- self.exist_list = _exist_list
-
- def get_opcode(self):
- return 'MAU'
-
- def __repr__(self):
- _add = ''
- _del = ''
- _exist = ''
- if self.add_list: _add = ' add=%r' % self.add_list
- if self.del_list: _del = ' del=%r' % self.del_list
- if self.exist_list: _exist = ' exist=%r' % self.exist_list
- return "MAU(id=%s area=%s mobile_seq=%d%s%s%s)" % \
- (self.id, self.area, self.mobile_seq, _add, _del, _exist)
-
- def to_dict(self):
- body = { 'id' : self.id,
- 'area' : self.area,
- 'mobile_seq' : self.mobile_seq }
- if self.add_list: body['add'] = self.add_list
- if self.del_list: body['del'] = self.del_list
- if self.exist_list: body['exist'] = self.exist_list
- return body
+ """
+ """
+ def __init__(self, body, _id=None, _area=None, _seq=None, _add_list=None, _del_list=None, _exist_list=None):
+ if body:
+ self.id = getMandatory(body, 'id', str)
+ self.area = getMandatory(body, 'area', str)
+ self.mobile_seq = getMandatory(body, 'mobile_seq', long)
+ self.add_list = getOptional(body, 'add', None, list)
+ self.del_list = getOptional(body, 'del', None, list)
+ self.exist_list = getOptional(body, 'exist', None, list)
+ else:
+ self.id = _id
+ self.area = _area
+ self.mobile_seq = long(_seq)
+ self.add_list = _add_list
+ self.del_list = _del_list
+ self.exist_list = _exist_list
+
+ def get_opcode(self):
+ return 'MAU'
+
+ def __repr__(self):
+ _add = ''
+ _del = ''
+ _exist = ''
+ if self.add_list: _add = ' add=%r' % self.add_list
+ if self.del_list: _del = ' del=%r' % self.del_list
+ if self.exist_list: _exist = ' exist=%r' % self.exist_list
+ return "MAU(id=%s area=%s mobile_seq=%d%s%s%s)" % \
+ (self.id, self.area, self.mobile_seq, _add, _del, _exist)
+
+ def to_dict(self):
+ body = { 'id' : self.id,
+ 'area' : self.area,
+ 'mobile_seq' : self.mobile_seq }
+ if self.add_list: body['add'] = self.add_list
+ if self.del_list: body['del'] = self.del_list
+ if self.exist_list: body['exist'] = self.exist_list
+ return body
class MessageMAR(object):
- """
- """
- def __init__(self, body, _id=None, _area=None, _have_seq=None):
- if body:
- self.id = getMandatory(body, 'id', str)
- self.area = getMandatory(body, 'area', str)
- self.have_seq = getMandatory(body, 'have_seq', long)
- else:
- self.id = _id
- self.area = _area
- self.have_seq = long(_have_seq)
-
- def get_opcode(self):
- return 'MAR'
-
- def __repr__(self):
- return "MAR(id=%s area=%s have_seq=%d)" % (self.id, self.area, self.have_seq)
-
- def to_dict(self):
- return {'id' : self.id,
- 'area' : self.area,
- 'have_seq' : self.have_seq}
+ """
+ """
+ def __init__(self, body, _id=None, _area=None, _have_seq=None):
+ if body:
+ self.id = getMandatory(body, 'id', str)
+ self.area = getMandatory(body, 'area', str)
+ self.have_seq = getMandatory(body, 'have_seq', long)
+ else:
+ self.id = _id
+ self.area = _area
+ self.have_seq = long(_have_seq)
+
+ def get_opcode(self):
+ return 'MAR'
+
+ def __repr__(self):
+ return "MAR(id=%s area=%s have_seq=%d)" % (self.id, self.area, self.have_seq)
+
+ def to_dict(self):
+ return {'id' : self.id,
+ 'area' : self.area,
+ 'have_seq' : self.have_seq}
diff --git a/qpid/extras/dispatch/python/qpid/dispatch/router/link.py b/qpid/extras/dispatch/python/qpid/dispatch/router/link.py
index fb23177e2f..46e03379b4 100644
--- a/qpid/extras/dispatch/python/qpid/dispatch/router/link.py
+++ b/qpid/extras/dispatch/python/qpid/dispatch/router/link.py
@@ -21,123 +21,123 @@ from data import MessageRA, MessageLSU, MessageLSR
from time import time
try:
- from dispatch import *
+ from dispatch import *
except ImportError:
- from ..stubs import *
+ from ..stubs import *
class LinkStateEngine(object):
- """
- This module is responsible for running the Link State protocol and maintaining the set
- of link states that are gathered from the domain. It notifies outbound when changes to
- the link-state-collection are detected.
- """
- def __init__(self, container):
- self.container = container
- self.id = self.container.id
- self.area = self.container.area
- self.ra_interval = self.container.config.ra_interval
- self.remote_ls_max_age = self.container.config.remote_ls_max_age
- self.last_ra_time = 0
- self.collection = {}
- self.collection_changed = False
- self.mobile_seq = 0
- self.needed_lsrs = {}
-
-
- def tick(self, now):
- self._expire_ls(now)
- self._send_lsrs()
-
- if now - self.last_ra_time >= self.ra_interval:
- self.last_ra_time = now
- self._send_ra()
-
- if self.collection_changed:
- self.collection_changed = False
- self.container.log(LOG_INFO, "New Link-State Collection:")
- for a,b in self.collection.items():
- self.container.log(LOG_INFO, " %s => %r" % (a, b.peers))
- self.container.ls_collection_changed(self.collection)
-
-
- def handle_ra(self, msg, now):
- if msg.id == self.id:
- return
- if msg.id in self.collection:
- ls = self.collection[msg.id]
- ls.last_seen = now
- if ls.ls_seq < msg.ls_seq:
- self.needed_lsrs[(msg.area, msg.id)] = None
- else:
- self.needed_lsrs[(msg.area, msg.id)] = None
-
-
- def handle_lsu(self, msg, now):
- if msg.id == self.id:
- return
- if msg.id in self.collection:
- ls = self.collection[msg.id]
- if ls.ls_seq < msg.ls_seq:
- ls = msg.ls
- self.collection[msg.id] = ls
+ """
+ This module is responsible for running the Link State protocol and maintaining the set
+ of link states that are gathered from the domain. It notifies outbound when changes to
+ the link-state-collection are detected.
+ """
+ def __init__(self, container):
+ self.container = container
+ self.id = self.container.id
+ self.area = self.container.area
+ self.ra_interval = self.container.config.ra_interval
+ self.remote_ls_max_age = self.container.config.remote_ls_max_age
+ self.last_ra_time = 0
+ self.collection = {}
+ self.collection_changed = False
+ self.mobile_seq = 0
+ self.needed_lsrs = {}
+
+
+ def tick(self, now):
+ self._expire_ls(now)
+ self._send_lsrs()
+
+ if now - self.last_ra_time >= self.ra_interval:
+ self.last_ra_time = now
+ self._send_ra()
+
+ if self.collection_changed:
+ self.collection_changed = False
+ self.container.log(LOG_INFO, "New Link-State Collection:")
+ for a,b in self.collection.items():
+ self.container.log(LOG_INFO, " %s => %r" % (a, b.peers))
+ self.container.ls_collection_changed(self.collection)
+
+
+ def handle_ra(self, msg, now):
+ if msg.id == self.id:
+ return
+ if msg.id in self.collection:
+ ls = self.collection[msg.id]
+ ls.last_seen = now
+ if ls.ls_seq < msg.ls_seq:
+ self.needed_lsrs[(msg.area, msg.id)] = None
+ else:
+ self.needed_lsrs[(msg.area, msg.id)] = None
+
+
+ def handle_lsu(self, msg, now):
+ if msg.id == self.id:
+ return
+ if msg.id in self.collection:
+ ls = self.collection[msg.id]
+ if ls.ls_seq < msg.ls_seq:
+ ls = msg.ls
+ self.collection[msg.id] = ls
+ self.collection_changed = True
+ ls.last_seen = now
+ else:
+ ls = msg.ls
+ self.collection[msg.id] = ls
+ self.collection_changed = True
+ ls.last_seen = now
+ self.container.new_node(msg.id)
+ self.container.log(LOG_INFO, "Learned link-state from new router: %s" % msg.id)
+ # Schedule LSRs for any routers referenced in this LS that we don't know about
+ for _id in msg.ls.peers:
+ if _id not in self.collection:
+ self.needed_lsrs[(msg.area, _id)] = None
+
+
+ def handle_lsr(self, msg, now):
+ if msg.id == self.id:
+ return
+ if self.id not in self.collection:
+ return
+ my_ls = self.collection[self.id]
+ self.container.send('_topo/%s/%s' % (msg.area, msg.id), MessageLSU(None, self.id, self.area, my_ls.ls_seq, my_ls))
+
+
+ def new_local_link_state(self, link_state):
+ self.collection[self.id] = link_state
self.collection_changed = True
- ls.last_seen = now
- else:
- ls = msg.ls
- self.collection[msg.id] = ls
- self.collection_changed = True
- ls.last_seen = now
- self.container.new_node(msg.id)
- self.container.log(LOG_INFO, "Learned link-state from new router: %s" % msg.id)
- # Schedule LSRs for any routers referenced in this LS that we don't know about
- for _id in msg.ls.peers:
- if _id not in self.collection:
- self.needed_lsrs[(msg.area, _id)] = None
-
-
- def handle_lsr(self, msg, now):
- if msg.id == self.id:
- return
- if self.id not in self.collection:
- return
- my_ls = self.collection[self.id]
- self.container.send('_topo/%s/%s' % (msg.area, msg.id), MessageLSU(None, self.id, self.area, my_ls.ls_seq, my_ls))
-
-
- def new_local_link_state(self, link_state):
- self.collection[self.id] = link_state
- self.collection_changed = True
- self._send_ra()
-
-
- def set_mobile_sequence(self, seq):
- self.mobile_seq = seq
-
-
- def get_collection(self):
- return self.collection
-
-
- def _expire_ls(self, now):
- to_delete = []
- for key, ls in self.collection.items():
- if key != self.id and now - ls.last_seen > self.remote_ls_max_age:
- to_delete.append(key)
- for key in to_delete:
- ls = self.collection.pop(key)
- self.collection_changed = True
- self.container.lost_node(key)
- self.container.log(LOG_INFO, "Expired link-state from router: %s" % key)
-
-
- def _send_lsrs(self):
- for (_area, _id) in self.needed_lsrs.keys():
- self.container.send('_topo/%s/%s' % (_area, _id), MessageLSR(None, self.id, self.area))
- self.needed_lsrs = {}
-
-
- def _send_ra(self):
- ls_seq = 0
- if self.id in self.collection:
- ls_seq = self.collection[self.id].ls_seq
- self.container.send('_topo/%s/all' % self.area, MessageRA(None, self.id, self.area, ls_seq, self.mobile_seq))
+ self._send_ra()
+
+
+ def set_mobile_sequence(self, seq):
+ self.mobile_seq = seq
+
+
+ def get_collection(self):
+ return self.collection
+
+
+ def _expire_ls(self, now):
+ to_delete = []
+ for key, ls in self.collection.items():
+ if key != self.id and now - ls.last_seen > self.remote_ls_max_age:
+ to_delete.append(key)
+ for key in to_delete:
+ ls = self.collection.pop(key)
+ self.collection_changed = True
+ self.container.lost_node(key)
+ self.container.log(LOG_INFO, "Expired link-state from router: %s" % key)
+
+
+ def _send_lsrs(self):
+ for (_area, _id) in self.needed_lsrs.keys():
+ self.container.send('_topo/%s/%s' % (_area, _id), MessageLSR(None, self.id, self.area))
+ self.needed_lsrs = {}
+
+
+ def _send_ra(self):
+ ls_seq = 0
+ if self.id in self.collection:
+ ls_seq = self.collection[self.id].ls_seq
+ self.container.send('_topo/%s/all' % self.area, MessageRA(None, self.id, self.area, ls_seq, self.mobile_seq))
diff --git a/qpid/extras/dispatch/python/qpid/dispatch/router/mobile.py b/qpid/extras/dispatch/python/qpid/dispatch/router/mobile.py
index ce80f38780..5413dd38a8 100644
--- a/qpid/extras/dispatch/python/qpid/dispatch/router/mobile.py
+++ b/qpid/extras/dispatch/python/qpid/dispatch/router/mobile.py
@@ -20,169 +20,169 @@
from data import MessageRA, MessageMAR, MessageMAU
try:
- from dispatch import *
+ from dispatch import *
except ImportError:
- from ..stubs import *
+ from ..stubs import *
class MobileAddressEngine(object):
- """
- This module is responsible for maintaining an up-to-date list of mobile addresses in the domain.
- It runs the Mobile-Address protocol and generates an un-optimized routing table for mobile addresses.
- Note that this routing table maps from the mobile address to the remote router where that address
- is directly bound.
- """
- def __init__(self, container):
- self.container = container
- self.id = self.container.id
- self.area = self.container.area
- self.mobile_addr_max_age = self.container.config.mobile_addr_max_age
- self.mobile_seq = 0
- self.local_keys = []
- self.added_keys = []
- self.deleted_keys = []
- self.remote_lists = {} # map router_id => (sequence, list of keys)
- self.remote_last_seen = {} # map router_id => time of last seen advertizement/update
- self.remote_changed = False
- self.needed_mars = {}
-
-
- def tick(self, now):
- self._expire_remotes(now)
- self._send_mars()
-
- ##
- ## If local keys have changed, collect the changes and send a MAU with the diffs
- ## Note: it is important that the differential-MAU be sent before a RA is sent
- ##
- if len(self.added_keys) > 0 or len(self.deleted_keys) > 0:
- self.mobile_seq += 1
- self.container.send('_topo.%s.all' % self.area,
- MessageMAU(None, self.id, self.area, self.mobile_seq, self.added_keys, self.deleted_keys))
- self.local_keys.extend(self.added_keys)
- for key in self.deleted_keys:
- self.local_keys.remove(key)
- self.added_keys = []
- self.deleted_keys = []
- self.container.mobile_sequence_changed(self.mobile_seq)
-
- ##
- ## If remotes have changed, start the process of updating local bindings
- ##
- if self.remote_changed:
- self.remote_changed = False
- self._update_remote_keys()
-
-
- def add_local_address(self, key):
"""
+ This module is responsible for maintaining an up-to-date list of mobile addresses in the domain.
+ It runs the Mobile-Address protocol and generates an un-optimized routing table for mobile addresses.
+ Note that this routing table maps from the mobile address to the remote router where that address
+ is directly bound.
"""
- if self.local_keys.count(key) == 0:
- if self.added_keys.count(key) == 0:
- self.added_keys.append(key)
- else:
- if self.deleted_keys.count(key) > 0:
- self.deleted_keys.remove(key)
-
-
- def del_local_address(self, key):
- """
- """
- if self.local_keys.count(key) > 0:
- if self.deleted_keys.count(key) == 0:
- self.deleted_keys.append(key)
- else:
- if self.added_keys.count(key) > 0:
- self.added_keys.remove(key)
-
-
- def handle_ra(self, msg, now):
- if msg.id == self.id:
- return
-
- if msg.mobile_seq == 0:
- return
-
- if msg.id in self.remote_lists:
- _seq, _list = self.remote_lists[msg.id]
- self.remote_last_seen[msg.id] = now
- if _seq < msg.mobile_seq:
- self.needed_mars[(msg.id, msg.area, _seq)] = None
- else:
- self.needed_mars[(msg.id, msg.area, 0)] = None
-
-
- def handle_mau(self, msg, now):
- ##
- ## If the MAU is differential, we can only use it if its sequence is exactly one greater
- ## than our stored sequence. If not, we will ignore the content and schedule a MAR.
- ##
- ## If the MAU is absolute, we can use it in all cases.
- ##
- if msg.id == self.id:
- return
-
- if msg.exist_list:
- ##
- ## Absolute MAU
- ##
- if msg.id in self.remote_lists:
- _seq, _list = self.remote_lists[msg.id]
- if _seq >= msg.mobile_seq: # ignore duplicates
- return
- self.remote_lists[msg.id] = (msg.mobile_seq, msg.exist_list)
- self.remote_last_seen[msg.id] = now
- self.remote_changed = True
- else:
- ##
- ## Differential MAU
- ##
- if msg.id in self.remote_lists:
- _seq, _list = self.remote_lists[msg.id]
- if _seq == msg.mobile_seq: # ignore duplicates
- return
- self.remote_last_seen[msg.id] = now
- if _seq + 1 == msg.mobile_seq:
- ##
- ## This is one greater than our stored value, incorporate the deltas
- ##
- if msg.add_list and msg.add_list.__class__ == list:
- _list.extend(msg.add_list)
- if msg.del_list and msg.del_list.__class__ == list:
- for key in msg.del_list:
- _list.remove(key)
- self.remote_lists[msg.id] = (msg.mobile_seq, _list)
- self.remote_changed = True
+ def __init__(self, container):
+ self.container = container
+ self.id = self.container.id
+ self.area = self.container.area
+ self.mobile_addr_max_age = self.container.config.mobile_addr_max_age
+ self.mobile_seq = 0
+ self.local_keys = []
+ self.added_keys = []
+ self.deleted_keys = []
+ self.remote_lists = {} # map router_id => (sequence, list of keys)
+ self.remote_last_seen = {} # map router_id => time of last seen advertizement/update
+ self.remote_changed = False
+ self.needed_mars = {}
+
+
+ def tick(self, now):
+ self._expire_remotes(now)
+ self._send_mars()
+
+ ##
+ ## If local keys have changed, collect the changes and send a MAU with the diffs
+ ## Note: it is important that the differential-MAU be sent before a RA is sent
+ ##
+ if len(self.added_keys) > 0 or len(self.deleted_keys) > 0:
+ self.mobile_seq += 1
+ self.container.send('_topo.%s.all' % self.area,
+ MessageMAU(None, self.id, self.area, self.mobile_seq, self.added_keys, self.deleted_keys))
+ self.local_keys.extend(self.added_keys)
+ for key in self.deleted_keys:
+ self.local_keys.remove(key)
+ self.added_keys = []
+ self.deleted_keys = []
+ self.container.mobile_sequence_changed(self.mobile_seq)
+
+ ##
+ ## If remotes have changed, start the process of updating local bindings
+ ##
+ if self.remote_changed:
+ self.remote_changed = False
+ self._update_remote_keys()
+
+
+ def add_local_address(self, key):
+ """
+ """
+ if self.local_keys.count(key) == 0:
+ if self.added_keys.count(key) == 0:
+ self.added_keys.append(key)
else:
- self.needed_mars[(msg.id, msg.area, _seq)] = None
- else:
- self.needed_mars[(msg.id, msg.area, 0)] = None
-
+ if self.deleted_keys.count(key) > 0:
+ self.deleted_keys.remove(key)
- def handle_mar(self, msg, now):
- if msg.id == self.id:
- return
- if msg.have_seq < self.mobile_seq:
- self.container.send('_topo.%s.%s' % (msg.area, msg.id),
- MessageMAU(None, self.id, self.area, self.mobile_seq, None, None, self.local_keys))
-
- def _update_remote_keys(self):
- keys = {}
- for _id,(seq,key_list) in self.remote_lists.items():
- keys[_id] = key_list
- self.container.mobile_keys_changed(keys)
+ def del_local_address(self, key):
+ """
+ """
+ if self.local_keys.count(key) > 0:
+ if self.deleted_keys.count(key) == 0:
+ self.deleted_keys.append(key)
+ else:
+ if self.added_keys.count(key) > 0:
+ self.added_keys.remove(key)
- def _expire_remotes(self, now):
- for _id, t in self.remote_last_seen.items():
- if now - t > self.mobile_addr_max_age:
- self.remote_lists.pop(_id)
- self.remote_last_seen.pop(_id)
- self.remote_changed = True
+ def handle_ra(self, msg, now):
+ if msg.id == self.id:
+ return
+ if msg.mobile_seq == 0:
+ return
- def _send_mars(self):
- for _id, _area, _seq in self.needed_mars.keys():
- self.container.send('_topo.%s.%s' % (_area, _id), MessageMAR(None, self.id, self.area, _seq))
- self.needed_mars = {}
+ if msg.id in self.remote_lists:
+ _seq, _list = self.remote_lists[msg.id]
+ self.remote_last_seen[msg.id] = now
+ if _seq < msg.mobile_seq:
+ self.needed_mars[(msg.id, msg.area, _seq)] = None
+ else:
+ self.needed_mars[(msg.id, msg.area, 0)] = None
+
+
+ def handle_mau(self, msg, now):
+ ##
+ ## If the MAU is differential, we can only use it if its sequence is exactly one greater
+ ## than our stored sequence. If not, we will ignore the content and schedule a MAR.
+ ##
+ ## If the MAU is absolute, we can use it in all cases.
+ ##
+ if msg.id == self.id:
+ return
+
+ if msg.exist_list:
+ ##
+ ## Absolute MAU
+ ##
+ if msg.id in self.remote_lists:
+ _seq, _list = self.remote_lists[msg.id]
+ if _seq >= msg.mobile_seq: # ignore duplicates
+ return
+ self.remote_lists[msg.id] = (msg.mobile_seq, msg.exist_list)
+ self.remote_last_seen[msg.id] = now
+ self.remote_changed = True
+ else:
+ ##
+ ## Differential MAU
+ ##
+ if msg.id in self.remote_lists:
+ _seq, _list = self.remote_lists[msg.id]
+ if _seq == msg.mobile_seq: # ignore duplicates
+ return
+ self.remote_last_seen[msg.id] = now
+ if _seq + 1 == msg.mobile_seq:
+ ##
+ ## This is one greater than our stored value, incorporate the deltas
+ ##
+ if msg.add_list and msg.add_list.__class__ == list:
+ _list.extend(msg.add_list)
+ if msg.del_list and msg.del_list.__class__ == list:
+ for key in msg.del_list:
+ _list.remove(key)
+ self.remote_lists[msg.id] = (msg.mobile_seq, _list)
+ self.remote_changed = True
+ else:
+ self.needed_mars[(msg.id, msg.area, _seq)] = None
+ else:
+ self.needed_mars[(msg.id, msg.area, 0)] = None
+
+
+ def handle_mar(self, msg, now):
+ if msg.id == self.id:
+ return
+ if msg.have_seq < self.mobile_seq:
+ self.container.send('_topo.%s.%s' % (msg.area, msg.id),
+ MessageMAU(None, self.id, self.area, self.mobile_seq, None, None, self.local_keys))
+
+
+ def _update_remote_keys(self):
+ keys = {}
+ for _id,(seq,key_list) in self.remote_lists.items():
+ keys[_id] = key_list
+ self.container.mobile_keys_changed(keys)
+
+
+ def _expire_remotes(self, now):
+ for _id, t in self.remote_last_seen.items():
+ if now - t > self.mobile_addr_max_age:
+ self.remote_lists.pop(_id)
+ self.remote_last_seen.pop(_id)
+ self.remote_changed = True
+
+
+ def _send_mars(self):
+ for _id, _area, _seq in self.needed_mars.keys():
+ self.container.send('_topo.%s.%s' % (_area, _id), MessageMAR(None, self.id, self.area, _seq))
+ self.needed_mars = {}
diff --git a/qpid/extras/dispatch/python/qpid/dispatch/router/neighbor.py b/qpid/extras/dispatch/python/qpid/dispatch/router/neighbor.py
index ea8bacd660..6a16049065 100644
--- a/qpid/extras/dispatch/python/qpid/dispatch/router/neighbor.py
+++ b/qpid/extras/dispatch/python/qpid/dispatch/router/neighbor.py
@@ -21,63 +21,63 @@ from data import LinkState, MessageHELLO
from time import time
try:
- from dispatch import *
+ from dispatch import *
except ImportError:
- from ..stubs import *
+ from ..stubs import *
class NeighborEngine(object):
- """
- This module is responsible for maintaining this router's link-state. It runs the HELLO protocol
- with the router's neighbors and notifies outbound when the list of neighbors-in-good-standing (the
- link-state) changes.
- """
- def __init__(self, container):
- self.container = container
- self.id = self.container.id
- self.area = self.container.area
- self.last_hello_time = 0.0
- self.hello_interval = container.config.hello_interval
- self.hello_max_age = container.config.hello_max_age
- self.hellos = {}
- self.link_state_changed = False
- self.link_state = LinkState(None, self.id, self.area, 0, [])
+ """
+ This module is responsible for maintaining this router's link-state. It runs the HELLO protocol
+ with the router's neighbors and notifies outbound when the list of neighbors-in-good-standing (the
+ link-state) changes.
+ """
+ def __init__(self, container):
+ self.container = container
+ self.id = self.container.id
+ self.area = self.container.area
+ self.last_hello_time = 0.0
+ self.hello_interval = container.config.hello_interval
+ self.hello_max_age = container.config.hello_max_age
+ self.hellos = {}
+ self.link_state_changed = False
+ self.link_state = LinkState(None, self.id, self.area, 0, [])
- def tick(self, now):
- self._expire_hellos(now)
+ def tick(self, now):
+ self._expire_hellos(now)
- if now - self.last_hello_time >= self.hello_interval:
- self.last_hello_time = now
- self.container.send('_local/qdxrouter', MessageHELLO(None, self.id, self.area, self.hellos.keys()))
+ if now - self.last_hello_time >= self.hello_interval:
+ self.last_hello_time = now
+ self.container.send('_local/qdxrouter', MessageHELLO(None, self.id, self.area, self.hellos.keys()))
- if self.link_state_changed:
- self.link_state_changed = False
- self.link_state.bump_sequence()
- self.container.local_link_state_changed(self.link_state)
+ if self.link_state_changed:
+ self.link_state_changed = False
+ self.link_state.bump_sequence()
+ self.container.local_link_state_changed(self.link_state)
- def handle_hello(self, msg, now):
- if msg.id == self.id:
- return
- self.hellos[msg.id] = now
- if msg.is_seen(self.id):
- if self.link_state.add_peer(msg.id):
- self.link_state_changed = True
- self.container.new_neighbor(msg.id)
- self.container.log(LOG_INFO, "New neighbor established: %s" % msg.id)
- ##
- ## TODO - Use this function to detect area boundaries
- ##
+ def handle_hello(self, msg, now, link_id):
+ if msg.id == self.id:
+ return
+ self.hellos[msg.id] = now
+ if msg.is_seen(self.id):
+ if self.link_state.add_peer(msg.id):
+ self.link_state_changed = True
+ self.container.new_neighbor(msg.id, link_id)
+ self.container.log(LOG_INFO, "New neighbor established: %s on link: %d" % (msg.id, link_id))
+ ##
+ ## TODO - Use this function to detect area boundaries
+ ##
- def _expire_hellos(self, now):
- to_delete = []
- for key, last_seen in self.hellos.items():
- if now - last_seen > self.hello_max_age:
- to_delete.append(key)
- for key in to_delete:
- self.hellos.pop(key)
- if self.link_state.del_peer(key):
- self.link_state_changed = True
- self.container.lost_neighbor(key)
- self.container.log(LOG_INFO, "Neighbor lost: %s" % key)
+ def _expire_hellos(self, now):
+ to_delete = []
+ for key, last_seen in self.hellos.items():
+ if now - last_seen > self.hello_max_age:
+ to_delete.append(key)
+ for key in to_delete:
+ self.hellos.pop(key)
+ if self.link_state.del_peer(key):
+ self.link_state_changed = True
+ self.container.lost_neighbor(key)
+ self.container.log(LOG_INFO, "Neighbor lost: %s" % key)
diff --git a/qpid/extras/dispatch/python/qpid/dispatch/router/node.py b/qpid/extras/dispatch/python/qpid/dispatch/router/node.py
index c90f7f4232..433ce3ab1e 100644
--- a/qpid/extras/dispatch/python/qpid/dispatch/router/node.py
+++ b/qpid/extras/dispatch/python/qpid/dispatch/router/node.py
@@ -18,86 +18,131 @@
#
try:
- from dispatch import *
+ from dispatch import *
except ImportError:
- from ..stubs import *
+ from ..stubs import *
class NodeTracker(object):
- """
- This module is responsible for tracking the set of router nodes that are known to this
- router. It tracks whether they are neighbor or remote and whether they are reachable.
- """
- def __init__(self, container):
- self.container = container
- self.id = self.container.id
- self.area = self.container.area
- self.nodes = {} # id => RemoteNode
-
-
- def tick(self, now):
- pass
-
-
- def new_neighbor(self, node_id):
- if node_id not in self.nodes:
- self.nodes[node_id] = RemoteNode(node_id)
- self.nodes[node_id].set_neighbor()
- self._notify(self.nodes[node_id])
-
-
- def lost_neighbor(self, node_id):
- node = self.nodes[node_id]
- node.clear_neighbor()
- self._notify(node)
- if node.to_delete():
- self.nodes.pop(node_id)
-
-
- def new_node(self, node_id):
- if node_id not in self.nodes:
- self.nodes[node_id] = RemoteNode(node_id)
- self.nodes[node_id].set_remote()
- self._notify(self.nodes[node_id])
-
-
- def lost_node(self, node_id):
- node = self.nodes[node_id]
- node.clear_remote()
- self._notify(node)
- if node.to_delete():
- self.nodes.pop(node_id)
-
-
- def _notify(self, node):
- if node.to_delete():
- self.container.adapter.node_updated("R%s" % node.id, 0, 0)
- else:
- is_neighbor = 0
- if node.neighbor:
- is_neighbor = 1
- self.container.adapter.node_updated("R%s" % node.id, 1, is_neighbor)
+ """
+ This module is responsible for tracking the set of router nodes that are known to this
+ router. It tracks whether they are neighbor or remote and whether they are reachable.
+
+ This module is also responsible for assigning a unique mask bit value to each router.
+ The mask bit is used in the main router to represent sets of valid destinations for addresses.
+ """
+ def __init__(self, container, max_routers):
+ self.container = container
+ self.max_routers = max_routers
+ self.nodes = {} # id => RemoteNode
+ self.maskbits = []
+ self.next_maskbit = 0
+ for i in range(max_routers):
+ self.maskbits.append(None)
+
+
+ def tick(self, now):
+ pass
+
+
+ def new_neighbor(self, node_id, link_maskbit):
+ """
+ A node, designated by node_id, has been discovered as a neighbor over a link with
+ a maskbit of link_maskbit.
+ """
+ if node_id not in self.nodes:
+ self.nodes[node_id] = RemoteNode(node_id, self._allocate_maskbit())
+ self.nodes[node_id].set_neighbor(link_maskbit)
+ self._notify(self.nodes[node_id])
+
+
+ def lost_neighbor(self, node_id):
+ """
+ We have lost contact with a neighboring node node_id.
+ """
+ node = self.nodes[node_id]
+ node.clear_neighbor()
+ self._notify(node)
+ if node.to_delete():
+ self._free_maskbit(node.maskbit)
+ self.nodes.pop(node_id)
+
+
+ def new_node(self, node_id):
+ """
+ A node, designated by node_id, has been discovered through the an advertisement from a
+ remote peer.
+ """
+ if node_id not in self.nodes:
+ self.nodes[node_id] = RemoteNode(node_id, self._allocate_maskbit())
+ self.nodes[node_id].set_remote()
+ self._notify(self.nodes[node_id])
+
+
+ def lost_node(self, node_id):
+ """
+ A remote node, node_id, has not been heard from for too long and is being deemed lost.
+ """
+ node = self.nodes[node_id]
+ node.clear_remote()
+ self._notify(node)
+ if node.to_delete():
+ self._free_maskbit(node.maskbit)
+ self.nodes.pop(node_id)
+
+
+ def _allocate_maskbit(self):
+ if self.next_maskbit == None:
+ raise Exception("Exceeded Maximum Router Count")
+ result = self.next_maskbit
+ self.next_maskbit = None
+ self.maskbits[result] = True
+ for n in range(result + 1, self.max_routers):
+ if self.maskbits[n] == None:
+ self.next_maskbit = n
+ break
+ return result
+
+
+ def _free_maskbit(self, i):
+ self.maskbits[i] = None
+ if self.next_maskbit == None or i < self.next_maskbit:
+ self.next_maskbit = i
+
+
+ def _notify(self, node):
+ if node.to_delete():
+ self.container.node_updated("R%s" % node.id, 0, 0, 0, 0)
+ else:
+ is_neighbor = 0
+ if node.neighbor:
+ is_neighbor = 1
+ self.container.node_updated("R%s" % node.id, 1, is_neighbor, node.link_maskbit, node.maskbit)
class RemoteNode(object):
- def __init__(self, node_id):
- self.id = node_id
- self.neighbor = None
- self.remote = None
+ def __init__(self, node_id, maskbit):
+ self.id = node_id
+ self.neighbor = None
+ self.link_maskbit = None
+ self.maskbit = maskbit
+ self.remote = None
- def set_neighbor(self):
- self.neighbor = True
+ def set_neighbor(self, link_maskbit):
+ self.neighbor = True
+ self.link_maskbit = link_maskbit
- def set_remote(self):
- self.remote = True
+ def set_remote(self):
+ self.remote = True
- def clear_neighbor(self):
- self.neighbor = None
+ def clear_neighbor(self):
+ self.neighbor = None
+ self.link_maskbit = None
- def clear_remote(self):
- self.remote = None
+ def clear_remote(self):
+ self.remote = None
- def to_delete(self):
- return self.neighbor or self.remote
+ def to_delete(self):
+ return self.neighbor == None and self.remote == None
diff --git a/qpid/extras/dispatch/python/qpid/dispatch/router/path.py b/qpid/extras/dispatch/python/qpid/dispatch/router/path.py
index c051dbe7fc..9357ed1847 100644
--- a/qpid/extras/dispatch/python/qpid/dispatch/router/path.py
+++ b/qpid/extras/dispatch/python/qpid/dispatch/router/path.py
@@ -18,185 +18,185 @@
#
try:
- from dispatch import *
+ from dispatch import *
except ImportError:
- from ..stubs import *
+ from ..stubs import *
class PathEngine(object):
- """
- This module is responsible for computing the next-hop for every router/area in the domain
- based on the collection of link states that have been gathered.
- """
- def __init__(self, container):
- self.container = container
- self.id = self.container.id
- self.area = self.container.area
- self.recalculate = False
- self.collection = None
-
-
- def tick(self, now_unused):
- if self.recalculate:
- self.recalculate = False
- self._calculate_routes()
-
-
- def ls_collection_changed(self, collection):
- self.recalculate = True
- self.collection = collection
-
-
- def _calculate_tree_from_root(self, root):
- ##
- ## Make a copy of the current collection of link-states that contains
- ## an empty link-state for nodes that are known-peers but are not in the
- ## collection currently. This is needed to establish routes to those nodes
- ## so we can trade link-state information with them.
- ##
- link_states = {}
- for _id, ls in self.collection.items():
- link_states[_id] = ls.peers
- for p in ls.peers:
- if p not in link_states:
- link_states[p] = []
-
- ##
- ## Setup Dijkstra's Algorithm
- ##
- cost = {}
- prev = {}
- for _id in link_states:
- cost[_id] = None # infinite
- prev[_id] = None # undefined
- cost[root] = 0 # no cost to the root node
- unresolved = NodeSet(cost)
-
- ##
- ## Process unresolved nodes until lowest cost paths to all reachable nodes have been found.
- ##
- while not unresolved.empty():
- u = unresolved.lowest_cost()
- if cost[u] == None:
- # There are no more reachable nodes in unresolved
- break
- for v in link_states[u]:
- if unresolved.contains(v):
- alt = cost[u] + 1 # TODO - Use link cost instead of 1
- if cost[v] == None or alt < cost[v]:
- cost[v] = alt
- prev[v] = u
- unresolved.set_cost(v, alt)
-
- ##
- ## Remove unreachable nodes from the map. Note that this will also remove the
- ## root node (has no previous node) from the map.
- ##
- for u, val in prev.items():
- if not val:
- prev.pop(u)
-
- ##
- ## Return previous-node map. This is a map of all reachable, remote nodes to
- ## their predecessor node.
- ##
- return prev
-
-
- def _calculate_routes(self):
- ##
- ## Generate the shortest-path tree with the local node as root
- ##
- prev = self._calculate_tree_from_root(self.id)
- nodes = prev.keys()
-
- ##
- ## Distill the path tree into a map of next hops for each node
- ##
- next_hops = {}
- while len(nodes) > 0:
- u = nodes[0] # pick any destination
- path = [u]
- nodes.remove(u)
- v = prev[u]
- while v != self.id: # build a list of nodes in the path back to the root
- if v in nodes:
- path.append(v)
- nodes.remove(v)
- u = v
- v = prev[u]
- for w in path: # mark each node in the path as reachable via the next hop
- next_hops[w] = u
-
- ##
- ## TODO - Calculate the tree from each origin, determine the set of origins-per-dest
- ## for which the path from origin to dest passes through us. This is the set
- ## of valid origins for forwarding to the destination.
- ##
-
- self.container.next_hops_changed(next_hops)
+ """
+ This module is responsible for computing the next-hop for every router/area in the domain
+ based on the collection of link states that have been gathered.
+ """
+ def __init__(self, container):
+ self.container = container
+ self.id = self.container.id
+ self.area = self.container.area
+ self.recalculate = False
+ self.collection = None
-class NodeSet(object):
- """
- This data structure is an ordered list of node IDs, sorted in increasing order by their cost.
- Equal cost nodes are secondarily sorted by their ID in order to provide deterministic and
- repeatable ordering.
- """
- def __init__(self, cost_map):
- self.nodes = []
- for _id, cost in cost_map.items():
- ##
- ## Assume that nodes are either unreachable (cost = None) or local (cost = 0)
- ## during this initialization.
- ##
- if cost == 0:
- self.nodes.insert(0, (_id, cost))
- else:
+ def tick(self, now_unused):
+ if self.recalculate:
+ self.recalculate = False
+ self._calculate_routes()
+
+
+ def ls_collection_changed(self, collection):
+ self.recalculate = True
+ self.collection = collection
+
+
+ def _calculate_tree_from_root(self, root):
##
- ## There is no need to sort unreachable nodes by ID
+ ## Make a copy of the current collection of link-states that contains
+ ## an empty link-state for nodes that are known-peers but are not in the
+ ## collection currently. This is needed to establish routes to those nodes
+ ## so we can trade link-state information with them.
##
- self.nodes.append((_id, cost))
+ link_states = {}
+ for _id, ls in self.collection.items():
+ link_states[_id] = ls.peers
+ for p in ls.peers:
+ if p not in link_states:
+ link_states[p] = []
+ ##
+ ## Setup Dijkstra's Algorithm
+ ##
+ cost = {}
+ prev = {}
+ for _id in link_states:
+ cost[_id] = None # infinite
+ prev[_id] = None # undefined
+ cost[root] = 0 # no cost to the root node
+ unresolved = NodeSet(cost)
- def __repr__(self):
- return self.nodes.__repr__()
+ ##
+ ## Process unresolved nodes until lowest cost paths to all reachable nodes have been found.
+ ##
+ while not unresolved.empty():
+ u = unresolved.lowest_cost()
+ if cost[u] == None:
+ # There are no more reachable nodes in unresolved
+ break
+ for v in link_states[u]:
+ if unresolved.contains(v):
+ alt = cost[u] + 1 # TODO - Use link cost instead of 1
+ if cost[v] == None or alt < cost[v]:
+ cost[v] = alt
+ prev[v] = u
+ unresolved.set_cost(v, alt)
+ ##
+ ## Remove unreachable nodes from the map. Note that this will also remove the
+ ## root node (has no previous node) from the map.
+ ##
+ for u, val in prev.items():
+ if not val:
+ prev.pop(u)
- def empty(self):
- return len(self.nodes) == 0
+ ##
+ ## Return previous-node map. This is a map of all reachable, remote nodes to
+ ## their predecessor node.
+ ##
+ return prev
- def contains(self, _id):
- for a, b in self.nodes:
- if a == _id:
- return True
- return False
+ def _calculate_routes(self):
+ ##
+ ## Generate the shortest-path tree with the local node as root
+ ##
+ prev = self._calculate_tree_from_root(self.id)
+ nodes = prev.keys()
+ ##
+ ## Distill the path tree into a map of next hops for each node
+ ##
+ next_hops = {}
+ while len(nodes) > 0:
+ u = nodes[0] # pick any destination
+ path = [u]
+ nodes.remove(u)
+ v = prev[u]
+ while v != self.id: # build a list of nodes in the path back to the root
+ if v in nodes:
+ path.append(v)
+ nodes.remove(v)
+ u = v
+ v = prev[u]
+ for w in path: # mark each node in the path as reachable via the next hop
+ next_hops[w] = u
- def lowest_cost(self):
- """
- Remove and return the lowest cost node ID.
- """
- _id, cost = self.nodes.pop(0)
- return _id
+ ##
+ ## TODO - Calculate the tree from each origin, determine the set of origins-per-dest
+ ## for which the path from origin to dest passes through us. This is the set
+ ## of valid origins for forwarding to the destination.
+ ##
+ self.container.next_hops_changed(next_hops)
- def set_cost(self, _id, new_cost):
+class NodeSet(object):
"""
- Set the cost for an ID in the NodeSet and re-insert the ID so that the list
- remains sorted in increasing cost order.
+ This data structure is an ordered list of node IDs, sorted in increasing order by their cost.
+ Equal cost nodes are secondarily sorted by their ID in order to provide deterministic and
+ repeatable ordering.
"""
- index = 0
- for i, c in self.nodes:
- if i == _id:
- break
- index += 1
- self.nodes.pop(index)
-
- index = 0
- for i, c in self.nodes:
- if c == None or new_cost < c or (new_cost == c and _id < i):
- break
- index += 1
-
- self.nodes.insert(index, (_id, new_cost))
+ def __init__(self, cost_map):
+ self.nodes = []
+ for _id, cost in cost_map.items():
+ ##
+ ## Assume that nodes are either unreachable (cost = None) or local (cost = 0)
+ ## during this initialization.
+ ##
+ if cost == 0:
+ self.nodes.insert(0, (_id, cost))
+ else:
+ ##
+ ## There is no need to sort unreachable nodes by ID
+ ##
+ self.nodes.append((_id, cost))
+
+
+ def __repr__(self):
+ return self.nodes.__repr__()
+
+
+ def empty(self):
+ return len(self.nodes) == 0
+
+
+ def contains(self, _id):
+ for a, b in self.nodes:
+ if a == _id:
+ return True
+ return False
+
+
+ def lowest_cost(self):
+ """
+ Remove and return the lowest cost node ID.
+ """
+ _id, cost = self.nodes.pop(0)
+ return _id
+
+
+ def set_cost(self, _id, new_cost):
+ """
+ Set the cost for an ID in the NodeSet and re-insert the ID so that the list
+ remains sorted in increasing cost order.
+ """
+ index = 0
+ for i, c in self.nodes:
+ if i == _id:
+ break
+ index += 1
+ self.nodes.pop(index)
+
+ index = 0
+ for i, c in self.nodes:
+ if c == None or new_cost < c or (new_cost == c and _id < i):
+ break
+ index += 1
+
+ self.nodes.insert(index, (_id, new_cost))
+
diff --git a/qpid/extras/dispatch/python/qpid/dispatch/router/router_engine.py b/qpid/extras/dispatch/python/qpid/dispatch/router/router_engine.py
index 18b48379c5..82f51184a9 100644
--- a/qpid/extras/dispatch/python/qpid/dispatch/router/router_engine.py
+++ b/qpid/extras/dispatch/python/qpid/dispatch/router/router_engine.py
@@ -36,245 +36,249 @@ from node import NodeTracker
## (i.e. we are in a test bench, etc.), load the stub versions.
##
try:
- from dispatch import *
+ from dispatch import *
except ImportError:
- from ..stubs import *
+ from ..stubs import *
class RouterEngine:
- """
- """
-
- def __init__(self, router_adapter, router_id=None, area='area', config_override={}):
- """
- Initialize an instance of a router for a domain.
- """
- ##
- ## Record important information about this router instance
- ##
- self.domain = "domain"
- self.router_adapter = router_adapter
- self.log_adapter = LogAdapter("dispatch.router")
- self.io_adapter = IoAdapter(self, "qdxrouter")
-
- if router_id:
- self.id = router_id
- else:
- self.id = str(uuid4())
- self.area = area
- self.log(LOG_INFO, "Router Engine Instantiated: area=%s id=%s" % (self.area, self.id))
-
- ##
- ## Setup configuration
- ##
- self.config = Configuration(config_override)
- self.log(LOG_INFO, "Config: %r" % self.config)
-
- ##
- ## Launch the sub-module engines
- ##
- self.neighbor_engine = NeighborEngine(self)
- self.link_state_engine = LinkStateEngine(self)
- self.path_engine = PathEngine(self)
- self.mobile_address_engine = MobileAddressEngine(self)
- self.routing_table_engine = RoutingTableEngine(self)
- self.binding_engine = BindingEngine(self)
- self.adapter_engine = AdapterEngine(self)
- self.node_tracker = NodeTracker(self)
-
-
-
- ##========================================================================================
- ## Adapter Entry Points - invoked from the adapter
- ##========================================================================================
- def getId(self):
- """
- Return the router's ID
- """
- return self.id
-
-
- def addLocalAddress(self, key):
- """
- """
- try:
- if key.find('_topo') == 0 or key.find('_local') == 0:
- return
- self.mobile_address_engine.add_local_address(key)
- except Exception, e:
- self.log(LOG_ERROR, "Exception in new-address processing: exception=%r" % e)
-
- def delLocalAddress(self, key):
- """
- """
- try:
- if key.find('_topo') == 0 or key.find('_local') == 0:
- return
- self.mobile_address_engine.del_local_address(key)
- except Exception, e:
- self.log(LOG_ERROR, "Exception in del-address processing: exception=%r" % e)
-
-
- def handleTimerTick(self):
- """
- """
- try:
- now = time()
- self.neighbor_engine.tick(now)
- self.link_state_engine.tick(now)
- self.path_engine.tick(now)
- self.mobile_address_engine.tick(now)
- self.routing_table_engine.tick(now)
- self.binding_engine.tick(now)
- self.adapter_engine.tick(now)
- self.node_tracker.tick(now)
- except Exception, e:
- self.log(LOG_ERROR, "Exception in timer processing: exception=%r" % e)
-
-
- def handleControlMessage(self, opcode, body):
- """
- """
- try:
- now = time()
- if opcode == 'HELLO':
- msg = MessageHELLO(body)
- self.log(LOG_TRACE, "RCVD: %r" % msg)
- self.neighbor_engine.handle_hello(msg, now)
-
- elif opcode == 'RA':
- msg = MessageRA(body)
- self.log(LOG_TRACE, "RCVD: %r" % msg)
- self.link_state_engine.handle_ra(msg, now)
- self.mobile_address_engine.handle_ra(msg, now)
-
- elif opcode == 'LSU':
- msg = MessageLSU(body)
- self.log(LOG_TRACE, "RCVD: %r" % msg)
- self.link_state_engine.handle_lsu(msg, now)
-
- elif opcode == 'LSR':
- msg = MessageLSR(body)
- self.log(LOG_TRACE, "RCVD: %r" % msg)
- self.link_state_engine.handle_lsr(msg, now)
-
- elif opcode == 'MAU':
- msg = MessageMAU(body)
- self.log(LOG_TRACE, "RCVD: %r" % msg)
- self.mobile_address_engine.handle_mau(msg, now)
-
- elif opcode == 'MAR':
- msg = MessageMAR(body)
- self.log(LOG_TRACE, "RCVD: %r" % msg)
- self.mobile_address_engine.handle_mar(msg, now)
-
- except Exception, e:
- self.log(LOG_ERROR, "Exception in message processing: opcode=%s body=%r exception=%r" % (opcode, body, e))
-
-
- def receive(self, message_properties, body):
- """
- This is the IoAdapter message-receive handler
- """
- try:
- self.handleControlMessage(message_properties['opcode'], body)
- except Exception, e:
- self.log(LOG_ERROR, "Exception in raw message processing: properties=%r body=%r exception=%r" %
- (message_properties, body, e))
-
- def getRouterData(self, kind):
- """
"""
- if kind == 'help':
- return { 'help' : "Get list of supported values for kind",
- 'link-state' : "This router's link state",
- 'link-state-set' : "The set of link states from known routers",
- 'next-hops' : "Next hops to each known router",
- 'topo-table' : "Topological routing table",
- 'mobile-table' : "Mobile key routing table"
- }
- if kind == 'link-state' : return self.neighbor_engine.link_state.to_dict()
- if kind == 'next-hops' : return self.routing_table_engine.next_hops
- if kind == 'topo-table' : return {'table': self.adapter_engine.key_classes['topological']}
- if kind == 'mobile-table' : return {'table': self.adapter_engine.key_classes['mobile-key']}
- if kind == 'link-state-set' :
- copy = {}
- for _id,_ls in self.link_state_engine.collection.items():
- copy[_id] = _ls.to_dict()
- return copy
-
- return {'notice':'Use kind="help" to get a list of possibilities'}
-
-
- ##========================================================================================
- ## Adapter Calls - outbound calls to Dispatch
- ##========================================================================================
- def log(self, level, text):
"""
- Emit a log message to the host's event log
- """
- self.log_adapter.log(level, text)
-
-
- def send(self, dest, msg):
- """
- Send a control message to another router.
- """
- app_props = {'opcode' : msg.get_opcode() }
- self.io_adapter.send(dest, app_props, msg.to_dict())
- self.log(LOG_TRACE, "SENT: %r dest=%s" % (msg, dest))
-
-
- def node_updated(self, addr, reachable, neighbor):
- """
- """
- self.router_adapter(addr, reachable, neighbor)
-
-
- ##========================================================================================
- ## Interconnect between the Sub-Modules
- ##========================================================================================
- def local_link_state_changed(self, link_state):
- self.log(LOG_DEBUG, "Event: local_link_state_changed: %r" % link_state)
- self.link_state_engine.new_local_link_state(link_state)
-
- def ls_collection_changed(self, collection):
- self.log(LOG_DEBUG, "Event: ls_collection_changed: %r" % collection)
- self.path_engine.ls_collection_changed(collection)
-
- def next_hops_changed(self, next_hop_table):
- self.log(LOG_DEBUG, "Event: next_hops_changed: %r" % next_hop_table)
- self.routing_table_engine.next_hops_changed(next_hop_table)
- self.binding_engine.next_hops_changed()
-
- def mobile_sequence_changed(self, mobile_seq):
- self.log(LOG_DEBUG, "Event: mobile_sequence_changed: %d" % mobile_seq)
- self.link_state_engine.set_mobile_sequence(mobile_seq)
-
- def mobile_keys_changed(self, keys):
- self.log(LOG_DEBUG, "Event: mobile_keys_changed: %r" % keys)
- self.binding_engine.mobile_keys_changed(keys)
-
- def get_next_hops(self):
- return self.routing_table_engine.get_next_hops()
-
- def remote_routes_changed(self, key_class, routes):
- self.log(LOG_DEBUG, "Event: remote_routes_changed: class=%s routes=%r" % (key_class, routes))
- self.adapter_engine.remote_routes_changed(key_class, routes)
-
- def new_neighbor(self, rid):
- self.log(LOG_DEBUG, "Event: new_neighbor: id=%s" % rid)
- self.node_tracker.new_neighbor(rid)
-
- def lost_neighbor(self, rid):
- self.log(LOG_DEBUG, "Event: lost_neighbor: id=%s" % rid)
- self.node_tracker.lost_neighbor(rid)
-
- def new_node(self, rid):
- self.log(LOG_DEBUG, "Event: new_node: id=%s" % rid)
- self.node_tracker.new_node(rid)
-
- def lost_node(self, rid):
- self.log(LOG_DEBUG, "Event: lost_node: id=%s" % rid)
- self.node_tracker.lost_node(rid)
+ def __init__(self, router_adapter, router_id, area, max_routers, config_override={}):
+ """
+ Initialize an instance of a router for a domain.
+ """
+ ##
+ ## Record important information about this router instance
+ ##
+ self.domain = "domain"
+ self.router_adapter = router_adapter
+ self.log_adapter = LogAdapter("dispatch.router")
+ self.io_adapter = IoAdapter(self, "qdxrouter")
+ self.max_routers = max_routers
+ self.id = router_id
+ self.area = area
+ self.log(LOG_INFO, "Router Engine Instantiated: area=%s id=%s max_routers=%d" %
+ (self.area, self.id, self.max_routers))
+
+ ##
+ ## Setup configuration
+ ##
+ self.config = Configuration(config_override)
+ self.log(LOG_INFO, "Config: %r" % self.config)
+
+ ##
+ ## Launch the sub-module engines
+ ##
+ self.neighbor_engine = NeighborEngine(self)
+ self.link_state_engine = LinkStateEngine(self)
+ self.path_engine = PathEngine(self)
+ self.mobile_address_engine = MobileAddressEngine(self)
+ self.routing_table_engine = RoutingTableEngine(self)
+ self.binding_engine = BindingEngine(self)
+ self.adapter_engine = AdapterEngine(self)
+ self.node_tracker = NodeTracker(self, self.max_routers)
+
+
+
+ ##========================================================================================
+ ## Adapter Entry Points - invoked from the adapter
+ ##========================================================================================
+ def getId(self):
+ """
+ Return the router's ID
+ """
+ return self.id
+
+
+ def addLocalAddress(self, key):
+ """
+ """
+ try:
+ if key.find('_topo') == 0 or key.find('_local') == 0:
+ return
+ self.mobile_address_engine.add_local_address(key)
+ except Exception, e:
+ self.log(LOG_ERROR, "Exception in new-address processing: exception=%r" % e)
+
+
+ def delLocalAddress(self, key):
+ """
+ """
+ try:
+ if key.find('_topo') == 0 or key.find('_local') == 0:
+ return
+ self.mobile_address_engine.del_local_address(key)
+ except Exception, e:
+ self.log(LOG_ERROR, "Exception in del-address processing: exception=%r" % e)
+
+
+ def handleTimerTick(self):
+ """
+ """
+ try:
+ now = time()
+ self.neighbor_engine.tick(now)
+ self.link_state_engine.tick(now)
+ self.path_engine.tick(now)
+ self.mobile_address_engine.tick(now)
+ self.routing_table_engine.tick(now)
+ self.binding_engine.tick(now)
+ self.adapter_engine.tick(now)
+ self.node_tracker.tick(now)
+ except Exception, e:
+ self.log(LOG_ERROR, "Exception in timer processing: exception=%r" % e)
+
+
+ def handleControlMessage(self, opcode, body, link_id):
+ """
+ """
+ try:
+ now = time()
+ if opcode == 'HELLO':
+ msg = MessageHELLO(body)
+ self.log(LOG_TRACE, "RCVD: %r" % msg)
+ self.neighbor_engine.handle_hello(msg, now, link_id)
+
+ elif opcode == 'RA':
+ msg = MessageRA(body)
+ self.log(LOG_TRACE, "RCVD: %r" % msg)
+ self.link_state_engine.handle_ra(msg, now)
+ self.mobile_address_engine.handle_ra(msg, now)
+
+ elif opcode == 'LSU':
+ msg = MessageLSU(body)
+ self.log(LOG_TRACE, "RCVD: %r" % msg)
+ self.link_state_engine.handle_lsu(msg, now)
+
+ elif opcode == 'LSR':
+ msg = MessageLSR(body)
+ self.log(LOG_TRACE, "RCVD: %r" % msg)
+ self.link_state_engine.handle_lsr(msg, now)
+
+ elif opcode == 'MAU':
+ msg = MessageMAU(body)
+ self.log(LOG_TRACE, "RCVD: %r" % msg)
+ self.mobile_address_engine.handle_mau(msg, now)
+
+ elif opcode == 'MAR':
+ msg = MessageMAR(body)
+ self.log(LOG_TRACE, "RCVD: %r" % msg)
+ self.mobile_address_engine.handle_mar(msg, now)
+
+ except Exception, e:
+ self.log(LOG_ERROR, "Exception in message processing: opcode=%s body=%r exception=%r" % (opcode, body, e))
+
+
+ def receive(self, message_properties, body, link_id):
+ """
+ This is the IoAdapter message-receive handler
+ """
+ try:
+ self.handleControlMessage(message_properties['opcode'], body, link_id)
+ except Exception, e:
+ self.log(LOG_ERROR, "Exception in raw message processing: properties=%r body=%r exception=%r" %
+ (message_properties, body, e))
+
+
+ def getRouterData(self, kind):
+ """
+ """
+ if kind == 'help':
+ return { 'help' : "Get list of supported values for kind",
+ 'link-state' : "This router's link state",
+ 'link-state-set' : "The set of link states from known routers",
+ 'next-hops' : "Next hops to each known router",
+ 'topo-table' : "Topological routing table",
+ 'mobile-table' : "Mobile key routing table"
+ }
+ if kind == 'link-state' : return self.neighbor_engine.link_state.to_dict()
+ if kind == 'next-hops' : return self.routing_table_engine.next_hops
+ if kind == 'topo-table' : return {'table': self.adapter_engine.key_classes['topological']}
+ if kind == 'mobile-table' : return {'table': self.adapter_engine.key_classes['mobile-key']}
+ if kind == 'link-state-set' :
+ copy = {}
+ for _id,_ls in self.link_state_engine.collection.items():
+ copy[_id] = _ls.to_dict()
+ return copy
+
+ return {'notice':'Use kind="help" to get a list of possibilities'}
+
+
+ ##========================================================================================
+ ## Adapter Calls - outbound calls to Dispatch
+ ##========================================================================================
+ def log(self, level, text):
+ """
+ Emit a log message to the host's event log
+ """
+ self.log_adapter.log(level, text)
+
+
+ def send(self, dest, msg):
+ """
+ Send a control message to another router.
+ """
+ app_props = {'opcode' : msg.get_opcode() }
+ self.io_adapter.send(dest, app_props, msg.to_dict())
+ self.log(LOG_TRACE, "SENT: %r dest=%s" % (msg, dest))
+
+
+ def node_updated(self, addr, reachable, neighbor):
+ """
+ """
+ self.router_adapter(addr, reachable, neighbor)
+
+
+ ##========================================================================================
+ ## Interconnect between the Sub-Modules
+ ##========================================================================================
+ def local_link_state_changed(self, link_state):
+ self.log(LOG_DEBUG, "Event: local_link_state_changed: %r" % link_state)
+ self.link_state_engine.new_local_link_state(link_state)
+
+ def ls_collection_changed(self, collection):
+ self.log(LOG_DEBUG, "Event: ls_collection_changed: %r" % collection)
+ self.path_engine.ls_collection_changed(collection)
+
+ def next_hops_changed(self, next_hop_table):
+ self.log(LOG_DEBUG, "Event: next_hops_changed: %r" % next_hop_table)
+ self.routing_table_engine.next_hops_changed(next_hop_table)
+ self.binding_engine.next_hops_changed()
+
+ def mobile_sequence_changed(self, mobile_seq):
+ self.log(LOG_DEBUG, "Event: mobile_sequence_changed: %d" % mobile_seq)
+ self.link_state_engine.set_mobile_sequence(mobile_seq)
+
+ def mobile_keys_changed(self, keys):
+ self.log(LOG_DEBUG, "Event: mobile_keys_changed: %r" % keys)
+ self.binding_engine.mobile_keys_changed(keys)
+
+ def get_next_hops(self):
+ return self.routing_table_engine.get_next_hops()
+
+ def remote_routes_changed(self, key_class, routes):
+ self.log(LOG_DEBUG, "Event: remote_routes_changed: class=%s routes=%r" % (key_class, routes))
+ self.adapter_engine.remote_routes_changed(key_class, routes)
+
+ def new_neighbor(self, rid, link_id):
+ self.log(LOG_DEBUG, "Event: new_neighbor: id=%s link_id=%d" % (rid, link_id))
+ self.node_tracker.new_neighbor(rid, link_id)
+
+ def lost_neighbor(self, rid):
+ self.log(LOG_DEBUG, "Event: lost_neighbor: id=%s" % rid)
+ self.node_tracker.lost_neighbor(rid)
+
+ def new_node(self, rid):
+ self.log(LOG_DEBUG, "Event: new_node: id=%s" % rid)
+ self.node_tracker.new_node(rid)
+
+ def lost_node(self, rid):
+ self.log(LOG_DEBUG, "Event: lost_node: id=%s" % rid)
+ self.node_tracker.lost_node(rid)
+
+ def node_updated(self, address, reachable, neighbor, link_bit, router_bit):
+ self.log(LOG_DEBUG, "Event: node_updated: address=%s, reachable=%r, neighbor=%r, link_bit=%d, router_bit=%d" % \
+ (address, reachable, neighbor, link_bit, router_bit))
+ self.router_adapter.node_updataed(address, reachable, neighbor, link_bit, router_bit)
diff --git a/qpid/extras/dispatch/python/qpid/dispatch/router/routing.py b/qpid/extras/dispatch/python/qpid/dispatch/router/routing.py
index 8030582177..39b34bbc9b 100644
--- a/qpid/extras/dispatch/python/qpid/dispatch/router/routing.py
+++ b/qpid/extras/dispatch/python/qpid/dispatch/router/routing.py
@@ -18,39 +18,39 @@
#
try:
- from dispatch import *
+ from dispatch import *
except ImportError:
- from ..stubs import *
+ from ..stubs import *
class RoutingTableEngine(object):
- """
- This module is responsible for converting the set of next hops to remote routers to a routing
- table in the "topological" address class.
- """
- def __init__(self, container):
- self.container = container
- self.id = self.container.id
- self.area = self.container.area
- self.next_hops = {}
+ """
+ This module is responsible for converting the set of next hops to remote routers to a routing
+ table in the "topological" address class.
+ """
+ def __init__(self, container):
+ self.container = container
+ self.id = self.container.id
+ self.area = self.container.area
+ self.next_hops = {}
- def tick(self, now):
- pass
+ def tick(self, now):
+ pass
- def next_hops_changed(self, next_hops):
- # Convert next_hops into routing table
- self.next_hops = next_hops
- new_table = []
- for _id, next_hop in next_hops.items():
- new_table.append(('_topo.%s.%s.#' % (self.area, _id), next_hop))
- pair = ('_topo.%s.all' % (self.area), next_hop)
- if new_table.count(pair) == 0:
- new_table.append(pair)
+ def next_hops_changed(self, next_hops):
+ # Convert next_hops into routing table
+ self.next_hops = next_hops
+ new_table = []
+ for _id, next_hop in next_hops.items():
+ new_table.append(('_topo.%s.%s.#' % (self.area, _id), next_hop))
+ pair = ('_topo.%s.all' % (self.area), next_hop)
+ if new_table.count(pair) == 0:
+ new_table.append(pair)
- self.container.remote_routes_changed('topological', new_table)
+ self.container.remote_routes_changed('topological', new_table)
- def get_next_hops(self):
- return self.next_hops
+ def get_next_hops(self):
+ return self.next_hops
diff --git a/qpid/extras/dispatch/router/src/main.c b/qpid/extras/dispatch/router/src/main.c
index 62e3a2beb6..e0d6849d1b 100644
--- a/qpid/extras/dispatch/router/src/main.c
+++ b/qpid/extras/dispatch/router/src/main.c
@@ -117,7 +117,7 @@ int main(int argc, char **argv)
}
}
- dx_log_set_mask(0xFFFFFFFF);
+ dx_log_set_mask(0xFFFFFFFE);
dispatch = dx_dispatch(config_path);
diff --git a/qpid/extras/dispatch/src/agent.c b/qpid/extras/dispatch/src/agent.c
index 557410910c..2063b8fcc4 100644
--- a/qpid/extras/dispatch/src/agent.c
+++ b/qpid/extras/dispatch/src/agent.c
@@ -240,7 +240,7 @@ static void dx_agent_deferred_handler(void *context)
}
-static void dx_agent_rx_handler(void *context, dx_message_t *msg)
+static void dx_agent_rx_handler(void *context, dx_message_t *msg, int unused_link_id)
{
dx_agent_t *agent = (dx_agent_t*) context;
dx_message_t *copy = dx_message_copy(msg);
diff --git a/qpid/extras/dispatch/src/amqp.c b/qpid/extras/dispatch/src/amqp.c
new file mode 100644
index 0000000000..6a8545b757
--- /dev/null
+++ b/qpid/extras/dispatch/src/amqp.c
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/dispatch/amqp.h>
+
+const char * const DX_DA_INGRESS = "qdx.ingress";
+const char * const DX_DA_TRACE = "qdx.trace";
+const char * const DX_DA_TO = "qdx.to";
+
+const char * const DX_CAPABILITY_ROUTER = "qdx.router";
+
+const char * const DX_INTERNODE_LINK_NAME_1 = "qdx.internode.1";
+const char * const DX_INTERNODE_LINK_NAME_2 = "qdx.internode.2";
+
diff --git a/qpid/extras/dispatch/src/bitmask.c b/qpid/extras/dispatch/src/bitmask.c
new file mode 100644
index 0000000000..5341f8d83a
--- /dev/null
+++ b/qpid/extras/dispatch/src/bitmask.c
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/dispatch/bitmask.h>
+#include <qpid/dispatch/alloc.h>
+#include <assert.h>
+
+#define DX_BITMASK_LONGS 16
+#define DX_BITMASK_BITS (DX_BITMASK_LONGS * 64)
+
+struct dx_bitmask_t {
+ uint64_t array[DX_BITMASK_LONGS];
+ int first_set;
+};
+
+ALLOC_DECLARE(dx_bitmask_t);
+ALLOC_DEFINE(dx_bitmask_t);
+
+#define MASK_INDEX(num) (num / 64)
+#define MASK_ONEHOT(num) (1 << (num % 64))
+#define FIRST_NONE -1
+#define FIRST_UNKNOWN -2
+
+
+int dx_bitmask_width()
+{
+ return DX_BITMASK_BITS;
+}
+
+
+dx_bitmask_t *dx_bitmask(int initial)
+{
+ dx_bitmask_t *b = new_dx_bitmask_t();
+ if (initial)
+ dx_bitmask_set_all(b);
+ else
+ dx_bitmask_clear_all(b);
+ return b;
+}
+
+
+void dx_bitmask_free(dx_bitmask_t *b)
+{
+ free_dx_bitmask_t(b);
+}
+
+
+void dx_bitmask_set_all(dx_bitmask_t *b)
+{
+ for (int i = 0; i < DX_BITMASK_LONGS; i++)
+ b->array[i] = 0xFFFFFFFFFFFFFFFF;
+ b->first_set = 0;
+}
+
+
+void dx_bitmask_clear_all(dx_bitmask_t *b)
+{
+ for (int i = 0; i < DX_BITMASK_LONGS; i++)
+ b->array[i] = 0;
+ b->first_set = FIRST_NONE;
+}
+
+
+void dx_bitmask_set_bit(dx_bitmask_t *b, int bitnum)
+{
+ assert(bitnum < DX_BITMASK_BITS);
+ b->array[MASK_INDEX(bitnum)] |= MASK_ONEHOT(bitnum);
+ if (b->first_set > bitnum)
+ b->first_set = bitnum;
+}
+
+
+void dx_bitmask_clear_bit(dx_bitmask_t *b, int bitnum)
+{
+ assert(bitnum < DX_BITMASK_BITS);
+ b->array[MASK_INDEX(bitnum)] &= ~(MASK_ONEHOT(bitnum));
+ if (b->first_set == bitnum)
+ b->first_set = FIRST_UNKNOWN;
+}
+
+
+int dx_bitmask_value(dx_bitmask_t *b, int bitnum)
+{
+ return (b->array[MASK_INDEX(bitnum)] & MASK_ONEHOT(bitnum)) ? 1 : 0;
+}
+
+
+int dx_bitmask_first_set(dx_bitmask_t *b, int *bitnum)
+{
+ if (b->first_set == FIRST_UNKNOWN) {
+ b->first_set = FIRST_NONE;
+ for (int i = 0; i < DX_BITMASK_LONGS; i++)
+ if (b->array[i]) {
+ for (int j = 0; j < 64; j++)
+ if ((1 << j) & b->array[i]) {
+ b->first_set = i * 64 + j;
+ break;
+ }
+ break;
+ }
+ }
+
+ if (b->first_set == FIRST_NONE)
+ return 0;
+ *bitnum = b->first_set;
+ return 1;
+}
+
diff --git a/qpid/extras/dispatch/src/message_private.h b/qpid/extras/dispatch/src/message_private.h
index 27b81bbb4c..c57cea5f0d 100644
--- a/qpid/extras/dispatch/src/message_private.h
+++ b/qpid/extras/dispatch/src/message_private.h
@@ -67,7 +67,7 @@ typedef struct {
sys_mutex_t *lock;
uint32_t ref_count; // The number of messages referencing this
dx_buffer_list_t buffers; // The buffer chain containing the message
- dx_buffer_list_t new_delivery_annotations; // The buffer chain containing the new delivery annotations
+ dx_buffer_list_t new_delivery_annotations; // The buffer chain containing the new delivery annotations (MOVE TO MSG_PVT)
dx_field_location_t section_message_header; // The message header list
dx_field_location_t section_delivery_annotation; // The delivery annotation map
dx_field_location_t section_message_annotation; // The message annotation map
diff --git a/qpid/extras/dispatch/src/python_embedded.c b/qpid/extras/dispatch/src/python_embedded.c
index 0b0cc11025..2db8f583de 100644
--- a/qpid/extras/dispatch/src/python_embedded.c
+++ b/qpid/extras/dispatch/src/python_embedded.c
@@ -402,7 +402,7 @@ typedef struct {
} IoAdapter;
-static void dx_io_rx_handler(void *context, dx_message_t *msg)
+static void dx_io_rx_handler(void *context, dx_message_t *msg, int link_id)
{
IoAdapter *self = (IoAdapter*) context;
@@ -454,9 +454,10 @@ static void dx_io_rx_handler(void *context, dx_message_t *msg)
PyObject *pAP = dx_field_to_py(ap_map);
PyObject *pBody = dx_field_to_py(body_map);
- PyObject *pArgs = PyTuple_New(2);
+ PyObject *pArgs = PyTuple_New(3);
PyTuple_SetItem(pArgs, 0, pAP);
PyTuple_SetItem(pArgs, 1, pBody);
+ PyTuple_SetItem(pArgs, 2, PyInt_FromLong((long) link_id));
PyObject *pValue = PyObject_CallObject(self->handler_rx_call, pArgs);
Py_DECREF(pArgs);
@@ -507,10 +508,10 @@ static PyObject* dx_python_send(PyObject *self, PyObject *args)
field = dx_compose(DX_PERFORMATIVE_DELIVERY_ANNOTATIONS, field);
dx_compose_start_map(field);
- dx_compose_insert_string(field, "qdx.ingress");
+ dx_compose_insert_string(field, DX_DA_INGRESS);
dx_compose_insert_string(field, dx_router_id(ioa->dx));
- dx_compose_insert_string(field, "qdx.trace");
+ dx_compose_insert_string(field, DX_DA_TRACE);
dx_compose_start_list(field);
dx_compose_insert_string(field, dx_router_id(ioa->dx));
dx_compose_end_list(field);
diff --git a/qpid/extras/dispatch/src/router_node.c b/qpid/extras/dispatch/src/router_node.c
index d2704c4bd5..dc63a45451 100644
--- a/qpid/extras/dispatch/src/router_node.c
+++ b/qpid/extras/dispatch/src/router_node.c
@@ -21,17 +21,18 @@
#include <stdio.h>
#include <string.h>
#include <stdbool.h>
+#include <stdlib.h>
#include <qpid/dispatch.h>
#include "dispatch_private.h"
+#include "router_private.h"
static char *module = "ROUTER";
static void dx_router_python_setup(dx_router_t *router);
static void dx_pyrouter_tick(dx_router_t *router);
-static char *router_address = "_local/qdxrouter";
-static char *local_prefix = "_local/";
-//static char *topo_prefix = "_topo/";
+static char *local_prefix = "_local/";
+static char *topo_prefix = "_topo/";
/**
* Address Types and Processing:
@@ -48,86 +49,82 @@ static char *local_prefix = "_local/";
* <mobile> M<mobile> forward handler
*/
+ALLOC_DEFINE(dx_routed_event_t);
+ALLOC_DEFINE(dx_router_link_t);
+ALLOC_DEFINE(dx_router_node_t);
+ALLOC_DEFINE(dx_router_ref_t);
+ALLOC_DEFINE(dx_router_link_ref_t);
+ALLOC_DEFINE(dx_address_t);
-typedef struct dx_router_link_t dx_router_link_t;
-typedef struct dx_router_node_t dx_router_node_t;
+static void dx_router_add_link_ref_LH(dx_router_link_ref_list_t *ref_list, dx_router_link_t *link)
+{
+ dx_router_link_ref_t *ref = new_dx_router_link_ref_t();
+ DEQ_ITEM_INIT(ref);
+ ref->link = link;
+ link->ref = ref;
+ DEQ_INSERT_TAIL(*ref_list, ref);
+}
-typedef enum {
- DX_LINK_ENDPOINT, // A link to a connected endpoint
- DX_LINK_ROUTER, // A link to a peer router in the same area
- DX_LINK_AREA // A link to a peer router in a different area (area boundary)
-} dx_link_type_t;
+static void dx_router_del_link_ref_LH(dx_router_link_ref_list_t *ref_list, dx_router_link_t *link)
+{
+ if (link->ref) {
+ DEQ_REMOVE(*ref_list, link->ref);
+ free_dx_router_link_ref_t(link->ref);
+ link->ref = 0;
+ }
+}
-typedef struct dx_routed_event_t {
- DEQ_LINKS(struct dx_routed_event_t);
- dx_delivery_t *delivery;
- dx_message_t *message;
- bool settle;
- uint64_t disposition;
-} dx_routed_event_t;
-ALLOC_DECLARE(dx_routed_event_t);
-ALLOC_DEFINE(dx_routed_event_t);
-DEQ_DECLARE(dx_routed_event_t, dx_routed_event_list_t);
-
-
-struct dx_router_link_t {
- DEQ_LINKS(dx_router_link_t);
- dx_direction_t link_direction;
- dx_link_type_t link_type;
- dx_address_t *owning_addr; // [ref] Address record that owns this link
- dx_link_t *link; // [own] Link pointer
- dx_router_link_t *connected_link; // [ref] If this is a link-route, reference the connected link
- dx_router_link_t *peer_link; // [ref] If this is a bidirectional link-route, reference the peer link
- dx_routed_event_list_t event_fifo; // FIFO of outgoing delivery/link events (no messages)
- dx_routed_event_list_t msg_fifo; // FIFO of outgoing message deliveries
-};
+/**
+ * Check an address to see if it no longer has any associated destinations.
+ * Depending on its policy, the address may be eligible for being closed out
+ * (i.e. Logging its terminal statistics and freeing its resources).
+ */
+static void dx_router_check_addr_LH(dx_address_t *addr)
+{
+ // TODO
+}
-ALLOC_DECLARE(dx_router_link_t);
-ALLOC_DEFINE(dx_router_link_t);
-DEQ_DECLARE(dx_router_link_t, dx_router_link_list_t);
-
-struct dx_router_node_t {
- DEQ_LINKS(dx_router_node_t);
- const char *id;
- dx_router_node_t *next_hop; // Next hop node _if_ this is not a neighbor node
- dx_router_link_t *peer_link; // Outgoing link _if_ this is a neighbor node
- // list of valid origins (pointers to router_node) - (bit masks?)
-};
-ALLOC_DECLARE(dx_router_node_t);
-ALLOC_DEFINE(dx_router_node_t);
-DEQ_DECLARE(dx_router_node_t, dx_router_node_list_t);
+/**
+ * Determine whether a terminus has router capability
+ */
+static int dx_router_terminus_is_router(pn_terminus_t *term)
+{
+ pn_data_t *cap = pn_terminus_capabilities(term);
+ if (cap && pn_data_type(cap) == PN_SYMBOL) {
+ pn_bytes_t sym = pn_data_get_symbol(cap);
+ if (sym.size == strlen(DX_CAPABILITY_ROUTER) &&
+ strcmp(sym.start, DX_CAPABILITY_ROUTER) == 0)
+ return 1;
+ }
-struct dx_address_t {
- dx_router_message_cb handler; // In-Process Consumer
- void *handler_context;
- dx_router_link_list_t rlinks; // Locally-Connected Consumers
- dx_router_node_list_t rnodes; // Remotely-Connected Consumers
-};
+ return 0;
+}
-ALLOC_DECLARE(dx_address_t);
-ALLOC_DEFINE(dx_address_t);
+static void dx_router_generate_temp_addr(dx_router_t *router, char *buffer, size_t length)
+{
+ static const char *table = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789+_";
+ char discriminator[11];
+ long int rnd = random();
+ int idx;
+
+ for (idx = 0; idx < 10; idx++)
+ discriminator[idx] = table[(rnd >> (idx * 6)) & 63];
+ discriminator[idx] = '\0';
-struct dx_router_t {
- dx_dispatch_t *dx;
- const char *router_area;
- const char *router_id;
- dx_node_t *node;
- dx_router_link_list_t in_links;
- dx_router_node_list_t routers;
- dx_message_list_t in_fifo;
- sys_mutex_t *lock;
- dx_timer_t *timer;
- hash_t *out_hash;
- uint64_t dtag;
- PyObject *pyRouter;
- PyObject *pyTick;
-};
+ snprintf(buffer, length, "amqp:/%s%s/%s/temp.%s", topo_prefix, router->router_area, router->router_id, discriminator);
+}
+
+
+static int dx_router_find_mask_bit(dx_link_t *link)
+{
+ return 0; // TODO
+}
/**
@@ -191,7 +188,7 @@ static int router_writable_link_handler(void* context, dx_link_t *link)
DEQ_REMOVE_HEAD(to_send);
//
- // Get a delivery for the send. This will be the current deliver on the link.
+ // Get a delivery for the send. This will be the current delivery on the link.
//
tag++;
delivery = dx_delivery(link, pn_dtag((char*) &tag, 8));
@@ -259,8 +256,8 @@ static void router_annotate_message(dx_router_t *router, dx_message_t *msg)
dx_parsed_field_t *ingress = 0;
if (in_da) {
- trace = dx_parse_value_by_key(in_da, "qdx.trace");
- ingress = dx_parse_value_by_key(in_da, "qdx.ingress");
+ trace = dx_parse_value_by_key(in_da, DX_DA_TRACE);
+ ingress = dx_parse_value_by_key(in_da, DX_DA_INGRESS);
}
dx_compose_start_map(out_da);
@@ -269,7 +266,7 @@ static void router_annotate_message(dx_router_t *router, dx_message_t *msg)
// If there is a trace field, append this router's ID to the trace.
//
if (trace && dx_parse_is_list(trace)) {
- dx_compose_insert_string(out_da, "qdx.trace");
+ dx_compose_insert_string(out_da, DX_DA_TRACE);
dx_compose_start_list(out_da);
uint32_t idx = 0;
@@ -289,7 +286,7 @@ static void router_annotate_message(dx_router_t *router, dx_message_t *msg)
// If there is no ingress field, annotate the ingress as this router else
// keep the original field.
//
- dx_compose_insert_string(out_da, "qdx.ingress");
+ dx_compose_insert_string(out_da, DX_DA_INGRESS);
if (ingress && dx_parse_is_scalar(ingress)) {
dx_field_iterator_t *iter = dx_parse_raw(ingress);
dx_compose_insert_string_iterator(out_da, iter);
@@ -380,7 +377,7 @@ static void router_rx_handler(void* context, dx_link_t *link, dx_delivery_t *del
if (iter) {
dx_field_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH);
- hash_retrieve(router->out_hash, iter, (void*) &addr);
+ hash_retrieve(router->addr_hash, iter, (void*) &addr);
dx_field_iterator_reset_view(iter, ITER_VIEW_NO_HOST);
int is_local = dx_field_iterator_prefix(iter, local_prefix);
dx_field_iterator_free(iter);
@@ -415,33 +412,34 @@ static void router_rx_handler(void* context, dx_link_t *link, dx_delivery_t *del
//
// Forward to all of the local links receiving this address.
//
- dx_router_link_t *dest_link = DEQ_HEAD(addr->rlinks);
- while (dest_link) {
+ dx_router_link_ref_t *dest_link_ref = DEQ_HEAD(addr->rlinks);
+ while (dest_link_ref) {
dx_routed_event_t *re = new_dx_routed_event_t();
DEQ_ITEM_INIT(re);
re->delivery = 0;
re->message = dx_message_copy(msg);
re->settle = 0;
re->disposition = 0;
- DEQ_INSERT_TAIL(dest_link->msg_fifo, re);
+ DEQ_INSERT_TAIL(dest_link_ref->link->msg_fifo, re);
fanout++;
if (fanout == 1 && !dx_delivery_settled(delivery))
re->delivery = delivery;
- dx_link_activate(dest_link->link);
- dest_link = DEQ_NEXT(dest_link);
+ dx_link_activate(dest_link_ref->link->link);
+ dest_link_ref = DEQ_NEXT(dest_link_ref);
}
//
// Forward to the next-hops for remote destinations.
//
- dx_router_node_t *dest_node = DEQ_HEAD(addr->rnodes);
- while (dest_node) {
- if (dest_node->next_hop)
- dest_link = dest_node->next_hop->peer_link;
+ dx_router_ref_t *dest_node_ref = DEQ_HEAD(addr->rnodes);
+ dx_router_link_t *dest_link;
+ while (dest_node_ref) {
+ if (dest_node_ref->router->next_hop)
+ dest_link = dest_node_ref->router->next_hop->peer_link;
else
- dest_link = dest_node->peer_link;
+ dest_link = dest_node_ref->router->peer_link;
if (dest_link) {
dx_routed_event_t *re = new_dx_routed_event_t();
DEQ_ITEM_INIT(re);
@@ -457,7 +455,7 @@ static void router_rx_handler(void* context, dx_link_t *link, dx_delivery_t *del
dx_link_activate(dest_link->link);
}
- dest_node = DEQ_NEXT(dest_node);
+ dest_node_ref = DEQ_NEXT(dest_node_ref);
}
}
}
@@ -487,7 +485,7 @@ static void router_rx_handler(void* context, dx_link_t *link, dx_delivery_t *del
// Invoke the in-process handler now that the lock is released.
//
if (handler)
- handler(handler_context, in_process_copy);
+ handler(handler_context, in_process_copy, rlink->mask_bit);
}
@@ -541,25 +539,28 @@ static int router_incoming_link_handler(void* context, dx_link_t *link)
dx_router_t *router = (dx_router_t*) context;
dx_router_link_t *rlink = new_dx_router_link_t();
pn_link_t *pn_link = dx_link_pn(link);
+ int is_router = dx_router_terminus_is_router(dx_link_remote_source(link));
DEQ_ITEM_INIT(rlink);
+ rlink->mask_bit = is_router ? dx_router_find_mask_bit(link) : 0;
+ rlink->link_type = is_router ? DX_LINK_ROUTER : DX_LINK_ENDPOINT;
rlink->link_direction = DX_INCOMING;
- rlink->link_type = DX_LINK_ENDPOINT;
rlink->owning_addr = 0;
rlink->link = link;
rlink->connected_link = 0;
rlink->peer_link = 0;
+ rlink->ref = 0;
DEQ_INIT(rlink->event_fifo);
DEQ_INIT(rlink->msg_fifo);
dx_link_set_context(link, rlink);
sys_mutex_lock(router->lock);
- DEQ_INSERT_TAIL(router->in_links, rlink);
+ DEQ_INSERT_TAIL(router->links, rlink);
sys_mutex_unlock(router->lock);
- pn_terminus_copy(pn_link_source(pn_link), pn_link_remote_source(pn_link));
- pn_terminus_copy(pn_link_target(pn_link), pn_link_remote_target(pn_link));
+ pn_terminus_copy(dx_link_source(link), dx_link_remote_source(link));
+ pn_terminus_copy(dx_link_target(link), dx_link_remote_target(link));
pn_link_flow(pn_link, 1000);
pn_link_open(pn_link);
@@ -579,52 +580,90 @@ static int router_outgoing_link_handler(void* context, dx_link_t *link)
{
dx_router_t *router = (dx_router_t*) context;
pn_link_t *pn_link = dx_link_pn(link);
- const char *r_tgt = pn_terminus_get_address(pn_link_remote_target(pn_link));
+ const char *r_src = pn_terminus_get_address(dx_link_remote_source(link));
+ int is_dynamic = pn_terminus_is_dynamic(dx_link_remote_source(link));
+ int is_router = dx_router_terminus_is_router(dx_link_remote_target(link));
- if (!r_tgt) {
+ //
+ // If this link is not a router link and it has no source address, we can't
+ // accept it.
+ //
+ if (r_src == 0 && !is_router && !is_dynamic) {
pn_link_close(pn_link);
return 0;
}
- dx_field_iterator_t *iter = dx_field_iterator_string(r_tgt, ITER_VIEW_NO_HOST);
- dx_router_link_t *rlink = new_dx_router_link_t();
-
- int is_router = dx_field_iterator_equal(iter, (unsigned char*) router_address);
-
+ //
+ // Create a router_link record for this link. Some of the fields will be
+ // modified in the different cases below.
+ //
+ dx_router_link_t *rlink = new_dx_router_link_t();
DEQ_ITEM_INIT(rlink);
- rlink->link_direction = DX_OUTGOING;
+ rlink->mask_bit = is_router ? dx_router_find_mask_bit(link) : 0;
rlink->link_type = is_router ? DX_LINK_ROUTER : DX_LINK_ENDPOINT;
+ rlink->link_direction = DX_OUTGOING;
+ rlink->owning_addr = 0;
rlink->link = link;
rlink->connected_link = 0;
rlink->peer_link = 0;
+ rlink->ref = 0;
DEQ_INIT(rlink->event_fifo);
DEQ_INIT(rlink->msg_fifo);
dx_link_set_context(link, rlink);
-
- dx_field_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH);
- dx_address_t *addr;
+ pn_terminus_copy(dx_link_source(link), dx_link_remote_source(link));
+ pn_terminus_copy(dx_link_target(link), dx_link_remote_target(link));
sys_mutex_lock(router->lock);
- hash_retrieve(router->out_hash, iter, (void**) &addr);
- if (!addr) {
- addr = new_dx_address_t();
- addr->handler = 0;
- addr->handler_context = 0;
- DEQ_INIT(addr->rlinks);
- DEQ_INIT(addr->rnodes);
- hash_insert(router->out_hash, iter, addr);
+
+ if (is_router) {
+ //
+ // If this is a router link, put it in the router_address link-list.
+ //
+ dx_router_add_link_ref_LH(&router->router_addr->rlinks, rlink);
+ rlink->owning_addr = router->router_addr;
+
+ } else {
+ //
+ // If this is an endpoint link, check the source. If it is dynamic, we will
+ // assign it an ephemeral and routable address. If it has a non-dymanic
+ // address, that address needs to be set up in the address list.
+ //
+ dx_field_iterator_t *iter;
+ char temp_addr[1000];
+ dx_address_t *addr;
+
+ if (is_dynamic) {
+ dx_router_generate_temp_addr(router, temp_addr, 1000);
+ iter = dx_field_iterator_string(temp_addr, ITER_VIEW_ADDRESS_HASH);
+ pn_terminus_set_address(dx_link_source(link), temp_addr);
+ dx_log(module, LOG_INFO, "Assigned temporary routable address: %s", temp_addr);
+ } else {
+ iter = dx_field_iterator_string(r_src, ITER_VIEW_ADDRESS_HASH);
+ dx_log(module, LOG_INFO, "Registered local address: %s", r_src);
+ }
+
+ hash_retrieve(router->addr_hash, iter, (void**) &addr);
+ if (!addr) {
+ addr = new_dx_address_t();
+ DEQ_ITEM_INIT(addr);
+ addr->handler = 0;
+ addr->handler_context = 0;
+ DEQ_INIT(addr->rlinks);
+ DEQ_INIT(addr->rnodes);
+ hash_insert(router->addr_hash, iter, addr);
+ DEQ_INSERT_TAIL(router->addrs, addr);
+ }
+ dx_field_iterator_free(iter);
+
+ rlink->owning_addr = addr;
+ dx_router_add_link_ref_LH(&addr->rlinks, rlink);
}
- dx_field_iterator_free(iter);
- rlink->owning_addr = addr;
- DEQ_INSERT_TAIL(addr->rlinks, rlink);
+ DEQ_INSERT_TAIL(router->links, rlink);
+ sys_mutex_unlock(router->lock);
- pn_terminus_copy(pn_link_source(pn_link), pn_link_remote_source(pn_link));
- pn_terminus_copy(pn_link_target(pn_link), pn_link_remote_target(pn_link));
pn_link_open(pn_link);
- sys_mutex_unlock(router->lock);
- dx_log(module, LOG_TRACE, "Registered new local address: %s", r_tgt);
return 0;
}
@@ -634,40 +673,37 @@ static int router_outgoing_link_handler(void* context, dx_link_t *link)
*/
static int router_link_detach_handler(void* context, dx_link_t *link, int closed)
{
- dx_router_t *router = (dx_router_t*) context;
- pn_link_t *pn_link = dx_link_pn(link);
- dx_router_link_t *rlink = (dx_router_link_t*) dx_link_get_context(link);
- const char *r_tgt = pn_terminus_get_address(pn_link_remote_target(pn_link));
+ dx_router_t *router = (dx_router_t*) context;
+ dx_router_link_t *rlink = (dx_router_link_t*) dx_link_get_context(link);
if (!rlink)
return 0;
sys_mutex_lock(router->lock);
- if (pn_link_is_sender(pn_link)) {
- DEQ_REMOVE(rlink->owning_addr->rlinks, rlink);
-
- if ((rlink->owning_addr->handler == 0) &&
- (DEQ_SIZE(rlink->owning_addr->rlinks) == 0) &&
- (DEQ_SIZE(rlink->owning_addr->rnodes) == 0)) {
- dx_field_iterator_t *iter = dx_field_iterator_string(r_tgt, ITER_VIEW_ADDRESS_HASH);
- dx_address_t *addr;
- if (iter) {
- hash_retrieve(router->out_hash, iter, (void**) &addr);
- if (addr == rlink->owning_addr) {
- hash_remove(router->out_hash, iter);
- free_dx_router_link_t(rlink);
- free_dx_address_t(addr);
- dx_log(module, LOG_TRACE, "Removed local address: %s", r_tgt);
- }
- dx_field_iterator_free(iter);
- }
- }
- } else {
- DEQ_REMOVE(router->in_links, rlink);
- free_dx_router_link_t(rlink);
+
+ //
+ // If the link is outgoing, we must disassociate it from its address.
+ //
+ if (rlink->link_direction == DX_OUTGOING && rlink->owning_addr) {
+ dx_router_del_link_ref_LH(&rlink->owning_addr->rlinks, rlink);
+ dx_router_check_addr_LH(rlink->owning_addr);
}
+ //
+ // If this is an incoming inter-router link, we must free the mask_bit.
+ //
+ if (rlink->link_type == DX_LINK_ROUTER && rlink->link_direction == DX_INCOMING)
+ dx_bitmask_set_bit(router->neighbor_free_mask, rlink->mask_bit);
+
+ //
+ // Remove the link from the master list-of-links.
+ //
+ DEQ_REMOVE(router->links, rlink);
sys_mutex_unlock(router->lock);
+
+ // TODO - wrap the free to handle the recursive items
+ free_dx_router_link_t(rlink);
+
return 0;
}
@@ -683,24 +719,37 @@ static void router_outbound_open_handler(void *type_context, dx_connection_t *co
// Ignore otherwise
dx_router_t *router = (dx_router_t*) type_context;
- dx_field_iterator_t *aiter = dx_field_iterator_string(router_address, ITER_VIEW_ADDRESS_HASH);
dx_link_t *sender;
dx_link_t *receiver;
dx_router_link_t *rlink;
+ int mask_bit = 0;
+ size_t clen = strlen(DX_CAPABILITY_ROUTER);
+
+ //
+ // Allocate a mask bit to designate the pair of links connected to the neighbor router
+ //
+ sys_mutex_lock(router->lock);
+ if (dx_bitmask_first_set(router->neighbor_free_mask, &mask_bit)) {
+ dx_bitmask_clear_bit(router->neighbor_free_mask, mask_bit);
+ } else {
+ sys_mutex_unlock(router->lock);
+ dx_log(module, LOG_CRITICAL, "Exceeded maximum inter-router link count");
+ return;
+ }
//
- // Create an incoming link and put it in the in-links collection. The address
- // of the remote source of this link is '_local/qdxrouter'.
+ // Create an incoming link with router source capability
//
- receiver = dx_link(router->node, conn, DX_INCOMING, "inter-router-rx");
- pn_terminus_set_address(dx_link_remote_source(receiver), router_address);
- pn_terminus_set_address(dx_link_target(receiver), router_address);
+ receiver = dx_link(router->node, conn, DX_INCOMING, DX_INTERNODE_LINK_NAME_1);
+ // TODO - We don't want to have to cast away the constness of the literal string here!
+ // See PROTON-429
+ pn_data_put_symbol(pn_terminus_capabilities(dx_link_target(receiver)), pn_bytes(clen, (char*) DX_CAPABILITY_ROUTER));
rlink = new_dx_router_link_t();
-
DEQ_ITEM_INIT(rlink);
- rlink->link_direction = DX_INCOMING;
+ rlink->mask_bit = mask_bit;
rlink->link_type = DX_LINK_ROUTER;
+ rlink->link_direction = DX_INCOMING;
rlink->owning_addr = 0;
rlink->link = receiver;
rlink->connected_link = 0;
@@ -709,53 +758,40 @@ static void router_outbound_open_handler(void *type_context, dx_connection_t *co
DEQ_INIT(rlink->msg_fifo);
dx_link_set_context(receiver, rlink);
-
- sys_mutex_lock(router->lock);
- DEQ_INSERT_TAIL(router->in_links, rlink);
- sys_mutex_unlock(router->lock);
+ DEQ_INSERT_TAIL(router->links, rlink);
//
- // Create an outgoing link with a local source of '_local/qdxrouter' and place
- // it in the routing table.
+ // Create an outgoing link with router target capability
//
- sender = dx_link(router->node, conn, DX_OUTGOING, "inter-router-tx");
- pn_terminus_set_address(dx_link_remote_target(sender), router_address);
- pn_terminus_set_address(dx_link_source(sender), router_address);
+ sender = dx_link(router->node, conn, DX_OUTGOING, DX_INTERNODE_LINK_NAME_2);
+ // TODO - We don't want to have to cast away the constness of the literal string here!
+ // See PROTON-429
+ pn_data_put_symbol(pn_terminus_capabilities(dx_link_target(receiver)), pn_bytes(clen, (char *) DX_CAPABILITY_ROUTER));
rlink = new_dx_router_link_t();
-
DEQ_ITEM_INIT(rlink);
- rlink->link_direction = DX_OUTGOING;
+ rlink->mask_bit = mask_bit;
rlink->link_type = DX_LINK_ROUTER;
+ rlink->link_direction = DX_OUTGOING;
+ rlink->owning_addr = router->router_addr;
rlink->link = sender;
rlink->connected_link = 0;
rlink->peer_link = 0;
DEQ_INIT(rlink->event_fifo);
DEQ_INIT(rlink->msg_fifo);
- dx_link_set_context(sender, rlink);
-
- dx_address_t *addr;
-
- sys_mutex_lock(router->lock);
- hash_retrieve(router->out_hash, aiter, (void**) &addr);
- if (!addr) {
- addr = new_dx_address_t();
- addr->handler = 0;
- addr->handler_context = 0;
- DEQ_INIT(addr->rlinks);
- DEQ_INIT(addr->rnodes);
- hash_insert(router->out_hash, aiter, addr);
- }
+ //
+ // Add the new outgoing link to the router_address's list of links.
+ //
+ dx_router_add_link_ref_LH(&router->router_addr->rlinks, rlink);
- rlink->owning_addr = addr;
- DEQ_INSERT_TAIL(addr->rlinks, rlink);
+ dx_link_set_context(sender, rlink);
+ DEQ_INSERT_TAIL(router->links, rlink);
sys_mutex_unlock(router->lock);
pn_link_open(dx_link_pn(receiver));
pn_link_open(dx_link_pn(sender));
pn_link_flow(dx_link_pn(receiver), 1000);
- dx_field_iterator_free(aiter);
}
@@ -767,7 +803,6 @@ static void dx_router_timer_handler(void *context)
// Periodic processing.
//
dx_pyrouter_tick(router);
-
dx_timer_schedule(router->timer, 1000);
}
@@ -797,20 +832,29 @@ dx_router_t *dx_router(dx_dispatch_t *dx, const char *area, const char *id)
router_node.type_context = router;
+ dx->router = router;
router->dx = dx;
router->router_area = area;
router->router_id = id;
router->node = dx_container_set_default_node_type(dx, &router_node, (void*) router, DX_DIST_BOTH);
- DEQ_INIT(router->in_links);
+ DEQ_INIT(router->addrs);
+ router->addr_hash = hash(10, 32, 0);
+
+ DEQ_INIT(router->links);
DEQ_INIT(router->routers);
- DEQ_INIT(router->in_fifo);
- router->lock = sys_mutex();
- router->timer = dx_timer(dx, dx_router_timer_handler, (void*) router);
- router->out_hash = hash(10, 32, 0);
- router->dtag = 1;
- router->pyRouter = 0;
- router->pyTick = 0;
+ router->neighbor_free_mask = dx_bitmask(1);
+ router->lock = sys_mutex();
+ router->timer = dx_timer(dx, dx_router_timer_handler, (void*) router);
+ router->dtag = 1;
+ router->pyRouter = 0;
+ router->pyTick = 0;
+
+ //
+ // Create an address for all of the routers in the topology. It will be registered
+ // locally later in the initialization sequence.
+ //
+ router->router_addr = dx_router_register_address(dx, "qdxrouter", 0, 0);
//
// Inform the field iterator module of this router's id and area. The field iterator
@@ -824,7 +868,6 @@ dx_router_t *dx_router(dx_dispatch_t *dx, const char *area, const char *id)
dx_python_start();
dx_log(module, LOG_INFO, "Router started, area=%s id=%s", area, id);
-
return router;
}
@@ -869,14 +912,17 @@ dx_address_t *dx_router_register_address(dx_dispatch_t *dx,
iter = dx_field_iterator_string(addr_string, ITER_VIEW_NO_HOST);
sys_mutex_lock(router->lock);
- hash_retrieve(router->out_hash, iter, (void**) &addr);
+ hash_retrieve(router->addr_hash, iter, (void**) &addr);
if (!addr) {
addr = new_dx_address_t();
+ DEQ_ITEM_INIT(addr);
addr->handler = 0;
addr->handler_context = 0;
DEQ_INIT(addr->rlinks);
DEQ_INIT(addr->rnodes);
- hash_insert(router->out_hash, iter, addr);
+ hash_insert(router->addr_hash, iter, addr);
+ DEQ_ITEM_INIT(addr);
+ DEQ_INSERT_TAIL(router->addrs, addr);
}
dx_field_iterator_free(iter);
@@ -885,7 +931,8 @@ dx_address_t *dx_router_register_address(dx_dispatch_t *dx,
sys_mutex_unlock(router->lock);
- dx_log(module, LOG_TRACE, "In-Process Address Registered: %s", address);
+ if (handler)
+ dx_log(module, LOG_INFO, "In-Process Address Registered: %s", address);
return addr;
}
@@ -905,34 +952,35 @@ void dx_router_send(dx_dispatch_t *dx,
dx_field_iterator_reset_view(address, ITER_VIEW_ADDRESS_HASH);
sys_mutex_lock(router->lock);
- hash_retrieve(router->out_hash, address, (void*) &addr);
+ hash_retrieve(router->addr_hash, address, (void*) &addr);
if (addr) {
//
// Forward to all of the local links receiving this address.
//
- dx_router_link_t *dest_link = DEQ_HEAD(addr->rlinks);
- while (dest_link) {
+ dx_router_link_ref_t *dest_link_ref = DEQ_HEAD(addr->rlinks);
+ while (dest_link_ref) {
dx_routed_event_t *re = new_dx_routed_event_t();
DEQ_ITEM_INIT(re);
re->delivery = 0;
re->message = dx_message_copy(msg);
re->settle = 0;
re->disposition = 0;
- DEQ_INSERT_TAIL(dest_link->msg_fifo, re);
+ DEQ_INSERT_TAIL(dest_link_ref->link->msg_fifo, re);
- dx_link_activate(dest_link->link);
- dest_link = DEQ_NEXT(dest_link);
+ dx_link_activate(dest_link_ref->link->link);
+ dest_link_ref = DEQ_NEXT(dest_link_ref);
}
//
// Forward to the next-hops for remote destinations.
//
- dx_router_node_t *dest_node = DEQ_HEAD(addr->rnodes);
- while (dest_node) {
- if (dest_node->next_hop)
- dest_link = dest_node->next_hop->peer_link;
+ dx_router_ref_t *dest_node_ref = DEQ_HEAD(addr->rnodes);
+ dx_router_link_t *dest_link;
+ while (dest_node_ref) {
+ if (dest_node_ref->router->next_hop)
+ dest_link = dest_node_ref->router->next_hop->peer_link;
else
- dest_link = dest_node->peer_link;
+ dest_link = dest_node_ref->router->peer_link;
if (dest_link) {
dx_routed_event_t *re = new_dx_routed_event_t();
DEQ_ITEM_INIT(re);
@@ -943,7 +991,7 @@ void dx_router_send(dx_dispatch_t *dx,
DEQ_INSERT_TAIL(dest_link->msg_fifo, re);
dx_link_activate(dest_link->link);
}
- dest_node = DEQ_NEXT(dest_node);
+ dest_node_ref = DEQ_NEXT(dest_node_ref);
}
}
sys_mutex_unlock(router->lock); // TOINVESTIGATE Move this higher?
@@ -977,8 +1025,11 @@ static PyObject* dx_router_node_updated(PyObject *self, PyObject *args)
const char *address;
int is_reachable;
int is_neighbor;
+ int link_maskbit;
+ int router_maskbit;
- if (!PyArg_ParseTuple(args, "sii", &address, &is_reachable, &is_neighbor))
+ if (!PyArg_ParseTuple(args, "siiii", &address, &is_reachable, &is_neighbor,
+ &link_maskbit, &router_maskbit))
return 0;
// TODO
@@ -1099,6 +1150,7 @@ static void dx_router_python_setup(dx_router_t *router)
PyObject* pName;
PyObject* pId;
PyObject* pArea;
+ PyObject* pMaxRouters;
PyObject* pModule;
PyObject* pClass;
PyObject* pArgs;
@@ -1126,7 +1178,7 @@ static void dx_router_python_setup(dx_router_t *router)
//
// Constructor Arguments for RouterEngine
//
- pArgs = PyTuple_New(3);
+ pArgs = PyTuple_New(4);
// arg 0: adapter instance
PyTuple_SetItem(pArgs, 0, adapterInstance);
@@ -1135,10 +1187,14 @@ static void dx_router_python_setup(dx_router_t *router)
pId = PyString_FromString(router->router_id);
PyTuple_SetItem(pArgs, 1, pId);
- // arg 2: area id
+ // arg 2: area_id
pArea = PyString_FromString(router->router_area);
PyTuple_SetItem(pArgs, 2, pArea);
+ // arg 3: max_routers
+ pMaxRouters = PyInt_FromLong((long) dx_bitmask_width());
+ PyTuple_SetItem(pArgs, 3, pMaxRouters);
+
//
// Instantiate the router
//
diff --git a/qpid/extras/dispatch/src/router_private.h b/qpid/extras/dispatch/src/router_private.h
new file mode 100644
index 0000000000..8e481375ab
--- /dev/null
+++ b/qpid/extras/dispatch/src/router_private.h
@@ -0,0 +1,128 @@
+#ifndef __router_private_h__
+#define __router_private_h__ 1
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+typedef struct dx_router_link_t dx_router_link_t;
+typedef struct dx_router_node_t dx_router_node_t;
+typedef struct dx_router_ref_t dx_router_ref_t;
+typedef struct dx_router_link_ref_t dx_router_link_ref_t;
+
+
+typedef enum {
+ DX_LINK_ENDPOINT, // A link to a connected endpoint
+ DX_LINK_ROUTER, // A link to a peer router in the same area
+ DX_LINK_AREA // A link to a peer router in a different area (area boundary)
+} dx_link_type_t;
+
+
+typedef struct dx_routed_event_t {
+ DEQ_LINKS(struct dx_routed_event_t);
+ dx_delivery_t *delivery;
+ dx_message_t *message;
+ bool settle;
+ uint64_t disposition;
+} dx_routed_event_t;
+
+ALLOC_DECLARE(dx_routed_event_t);
+DEQ_DECLARE(dx_routed_event_t, dx_routed_event_list_t);
+
+
+struct dx_router_link_t {
+ DEQ_LINKS(dx_router_link_t);
+ int mask_bit; // Unique mask bit if this is an inter-router link
+ dx_link_type_t link_type;
+ dx_direction_t link_direction;
+ dx_address_t *owning_addr; // [ref] Address record that owns this link
+ dx_link_t *link; // [own] Link pointer
+ dx_router_link_t *connected_link; // [ref] If this is a link-route, reference the connected link
+ dx_router_link_t *peer_link; // [ref] If this is a bidirectional link-route, reference the peer link
+ dx_router_link_ref_t *ref; // Pointer to a containing reference object
+ dx_routed_event_list_t event_fifo; // FIFO of outgoing delivery/link events (no messages)
+ dx_routed_event_list_t msg_fifo; // FIFO of outgoing message deliveries
+};
+
+ALLOC_DECLARE(dx_router_link_t);
+DEQ_DECLARE(dx_router_link_t, dx_router_link_list_t);
+
+struct dx_router_node_t {
+ DEQ_LINKS(dx_router_node_t);
+ const char *id;
+ int mask_bit;
+ dx_router_node_t *next_hop; // Next hop node _if_ this is not a neighbor node
+ dx_router_link_t *peer_link; // Outgoing link _if_ this is a neighbor node
+ dx_bitmask_t *valid_origins;
+};
+
+ALLOC_DECLARE(dx_router_node_t);
+DEQ_DECLARE(dx_router_node_t, dx_router_node_list_t);
+
+struct dx_router_ref_t {
+ DEQ_LINKS(dx_router_ref_t);
+ dx_router_node_t *router;
+};
+
+ALLOC_DECLARE(dx_router_ref_t);
+DEQ_DECLARE(dx_router_ref_t, dx_router_ref_list_t);
+
+
+struct dx_router_link_ref_t {
+ DEQ_LINKS(dx_router_link_ref_t);
+ dx_router_link_t *link;
+};
+
+ALLOC_DECLARE(dx_router_link_ref_t);
+DEQ_DECLARE(dx_router_link_ref_t, dx_router_link_ref_list_t);
+
+
+struct dx_address_t {
+ DEQ_LINKS(dx_address_t);
+ dx_router_message_cb handler; // In-Process Consumer
+ void *handler_context; // In-Process Consumer context
+ dx_router_link_ref_list_t rlinks; // Locally-Connected Consumers
+ dx_router_ref_list_t rnodes; // Remotely-Connected Consumers
+};
+
+ALLOC_DECLARE(dx_address_t);
+DEQ_DECLARE(dx_address_t, dx_address_list_t);
+
+
+struct dx_router_t {
+ dx_dispatch_t *dx;
+ const char *router_area;
+ const char *router_id;
+ dx_node_t *node;
+
+ dx_address_list_t addrs;
+ hash_t *addr_hash;
+ dx_address_t *router_addr;
+
+ dx_router_link_list_t links;
+ dx_router_node_list_t routers;
+
+ dx_bitmask_t *neighbor_free_mask;
+ sys_mutex_t *lock;
+ dx_timer_t *timer;
+ uint64_t dtag;
+
+ PyObject *pyRouter;
+ PyObject *pyTick;
+};
+
+#endif
diff --git a/qpid/extras/dispatch/tests/router_engine_test.py b/qpid/extras/dispatch/tests/router_engine_test.py
index 605e7568d3..7e3940fda4 100644
--- a/qpid/extras/dispatch/tests/router_engine_test.py
+++ b/qpid/extras/dispatch/tests/router_engine_test.py
@@ -18,393 +18,545 @@
#
import unittest
-from qpid.dispatch.router.router_engine import NeighborEngine, PathEngine, Configuration
+from qpid.dispatch.router.router_engine import NeighborEngine, PathEngine, Configuration, NodeTracker
from qpid.dispatch.router.data import LinkState, MessageHELLO
class Adapter(object):
- def __init__(self, domain):
- self._domain = domain
+ def __init__(self, domain):
+ self._domain = domain
- def log(self, level, text):
- print "Adapter.log(%d): domain=%s, text=%s" % (level, self._domain, text)
+ def log(self, level, text):
+ print "Adapter.log(%d): domain=%s, text=%s" % (level, self._domain, text)
- def send(self, dest, opcode, body):
- print "Adapter.send: domain=%s, dest=%s, opcode=%s, body=%s" % (self._domain, dest, opcode, body)
- def remote_bind(self, subject, peer):
- print "Adapter.remote_bind: subject=%s, peer=%s" % (subject, peer)
+ def send(self, dest, opcode, body):
+ print "Adapter.send: domain=%s, dest=%s, opcode=%s, body=%s" % (self._domain, dest, opcode, body)
- def remote_unbind(self, subject, peer):
- print "Adapter.remote_unbind: subject=%s, peer=%s" % (subject, peer)
+ def remote_bind(self, subject, peer):
+ print "Adapter.remote_bind: subject=%s, peer=%s" % (subject, peer)
+ def remote_unbind(self, subject, peer):
+ print "Adapter.remote_unbind: subject=%s, peer=%s" % (subject, peer)
+
+ def node_updated(self, address, reachable, neighbor, link_bit, router_bit):
+ print "Adapter.node_updated: address=%s, reachable=%r, neighbor=%r, link_bit=%d, router_bit=%d" % \
+ (address, reachable, neighbor, link_bit, router_bit)
-class DataTest(unittest.TestCase):
- def test_link_state(self):
- ls = LinkState(None, 'R1', 'area', 1, ['R2', 'R3'])
- self.assertEqual(ls.id, 'R1')
- self.assertEqual(ls.area, 'area')
- self.assertEqual(ls.ls_seq, 1)
- self.assertEqual(ls.peers, ['R2', 'R3'])
- ls.bump_sequence()
- self.assertEqual(ls.id, 'R1')
- self.assertEqual(ls.area, 'area')
- self.assertEqual(ls.ls_seq, 2)
- self.assertEqual(ls.peers, ['R2', 'R3'])
-
- result = ls.add_peer('R4')
- self.assertTrue(result)
- self.assertEqual(ls.peers, ['R2', 'R3', 'R4'])
- result = ls.add_peer('R2')
- self.assertFalse(result)
- self.assertEqual(ls.peers, ['R2', 'R3', 'R4'])
-
- result = ls.del_peer('R3')
- self.assertTrue(result)
- self.assertEqual(ls.peers, ['R2', 'R4'])
- result = ls.del_peer('R5')
- self.assertFalse(result)
- self.assertEqual(ls.peers, ['R2', 'R4'])
-
- encoded = ls.to_dict()
- new_ls = LinkState(encoded)
- self.assertEqual(new_ls.id, 'R1')
- self.assertEqual(new_ls.area, 'area')
- self.assertEqual(new_ls.ls_seq, 2)
- self.assertEqual(new_ls.peers, ['R2', 'R4'])
-
-
- def test_hello_message(self):
- msg1 = MessageHELLO(None, 'R1', 'area', ['R2', 'R3', 'R4'])
- self.assertEqual(msg1.get_opcode(), "HELLO")
- self.assertEqual(msg1.id, 'R1')
- self.assertEqual(msg1.area, 'area')
- self.assertEqual(msg1.seen_peers, ['R2', 'R3', 'R4'])
- encoded = msg1.to_dict()
- msg2 = MessageHELLO(encoded)
- self.assertEqual(msg2.get_opcode(), "HELLO")
- self.assertEqual(msg2.id, 'R1')
- self.assertEqual(msg2.area, 'area')
- self.assertEqual(msg2.seen_peers, ['R2', 'R3', 'R4'])
- self.assertTrue(msg2.is_seen('R3'))
- self.assertFalse(msg2.is_seen('R9'))
+class DataTest(unittest.TestCase):
+ def test_link_state(self):
+ ls = LinkState(None, 'R1', 'area', 1, ['R2', 'R3'])
+ self.assertEqual(ls.id, 'R1')
+ self.assertEqual(ls.area, 'area')
+ self.assertEqual(ls.ls_seq, 1)
+ self.assertEqual(ls.peers, ['R2', 'R3'])
+ ls.bump_sequence()
+ self.assertEqual(ls.id, 'R1')
+ self.assertEqual(ls.area, 'area')
+ self.assertEqual(ls.ls_seq, 2)
+ self.assertEqual(ls.peers, ['R2', 'R3'])
+
+ result = ls.add_peer('R4')
+ self.assertTrue(result)
+ self.assertEqual(ls.peers, ['R2', 'R3', 'R4'])
+ result = ls.add_peer('R2')
+ self.assertFalse(result)
+ self.assertEqual(ls.peers, ['R2', 'R3', 'R4'])
+
+ result = ls.del_peer('R3')
+ self.assertTrue(result)
+ self.assertEqual(ls.peers, ['R2', 'R4'])
+ result = ls.del_peer('R5')
+ self.assertFalse(result)
+ self.assertEqual(ls.peers, ['R2', 'R4'])
+
+ encoded = ls.to_dict()
+ new_ls = LinkState(encoded)
+ self.assertEqual(new_ls.id, 'R1')
+ self.assertEqual(new_ls.area, 'area')
+ self.assertEqual(new_ls.ls_seq, 2)
+ self.assertEqual(new_ls.peers, ['R2', 'R4'])
+
+
+ def test_hello_message(self):
+ msg1 = MessageHELLO(None, 'R1', 'area', ['R2', 'R3', 'R4'])
+ self.assertEqual(msg1.get_opcode(), "HELLO")
+ self.assertEqual(msg1.id, 'R1')
+ self.assertEqual(msg1.area, 'area')
+ self.assertEqual(msg1.seen_peers, ['R2', 'R3', 'R4'])
+ encoded = msg1.to_dict()
+ msg2 = MessageHELLO(encoded)
+ self.assertEqual(msg2.get_opcode(), "HELLO")
+ self.assertEqual(msg2.id, 'R1')
+ self.assertEqual(msg2.area, 'area')
+ self.assertEqual(msg2.seen_peers, ['R2', 'R3', 'R4'])
+ self.assertTrue(msg2.is_seen('R3'))
+ self.assertFalse(msg2.is_seen('R9'))
+
+
+class NodeTrackerTest(unittest.TestCase):
+ def log(self, level, text):
+ pass
+
+ def node_updated(self, address, reachable, neighbor, link_bit, router_bit):
+ self.address = address
+ self.reachable = reachable
+ self.neighbor = neighbor
+ self.link_bit = link_bit
+ self.router_bit = router_bit
+
+ def reset(self):
+ self.address = None
+ self.reachable = None
+ self.neighbor = None
+ self.link_bit = None
+ self.router_bit = None
+
+ def test_node_tracker_limits(self):
+ tracker = NodeTracker(self, 5)
+
+ self.reset()
+ tracker.new_neighbor('A', 1)
+ self.assertEqual(self.address, 'RA')
+ self.assertTrue(self.reachable)
+ self.assertTrue(self.neighbor)
+ self.assertEqual(self.link_bit, 1)
+ self.assertEqual(self.router_bit, 0)
+
+ self.reset()
+ tracker.new_neighbor('B', 5)
+ self.assertEqual(self.address, 'RB')
+ self.assertTrue(self.reachable)
+ self.assertTrue(self.neighbor)
+ self.assertEqual(self.link_bit, 5)
+ self.assertEqual(self.router_bit, 1)
+
+ self.reset()
+ tracker.new_neighbor('C', 6)
+ self.assertEqual(self.address, 'RC')
+ self.assertTrue(self.reachable)
+ self.assertTrue(self.neighbor)
+ self.assertEqual(self.link_bit, 6)
+ self.assertEqual(self.router_bit, 2)
+
+ self.reset()
+ tracker.new_neighbor('D', 7)
+ self.assertEqual(self.address, 'RD')
+ self.assertTrue(self.reachable)
+ self.assertTrue(self.neighbor)
+ self.assertEqual(self.link_bit, 7)
+ self.assertEqual(self.router_bit, 3)
+
+ self.reset()
+ tracker.new_neighbor('E', 8)
+ self.assertEqual(self.address, 'RE')
+ self.assertTrue(self.reachable)
+ self.assertTrue(self.neighbor)
+ self.assertEqual(self.link_bit, 8)
+ self.assertEqual(self.router_bit, 4)
+
+ self.reset()
+ try:
+ tracker.new_neighbor('F', 9)
+ AssertFalse("We shouldn't be here")
+ except:
+ pass
+
+ self.reset()
+ tracker.lost_neighbor('C')
+ self.assertEqual(self.address, 'RC')
+ self.assertFalse(self.reachable)
+
+ self.reset()
+ tracker.new_neighbor('F', 9)
+ self.assertEqual(self.address, 'RF')
+ self.assertTrue(self.reachable)
+ self.assertTrue(self.neighbor)
+ self.assertEqual(self.link_bit, 9)
+ self.assertEqual(self.router_bit, 2)
+
+
+ def test_node_tracker_remote_neighbor(self):
+ tracker = NodeTracker(self, 5)
+
+ self.reset()
+ tracker.new_node('A')
+ self.assertEqual(self.address, 'RA')
+ self.assertTrue(self.reachable)
+ self.assertFalse(self.neighbor)
+ self.assertFalse(self.link_bit)
+ self.assertEqual(self.router_bit, 0)
+
+ self.reset()
+ tracker.new_neighbor('A', 3)
+ self.assertEqual(self.address, 'RA')
+ self.assertTrue(self.reachable)
+ self.assertTrue(self.neighbor)
+ self.assertEqual(self.link_bit, 3)
+ self.assertEqual(self.router_bit, 0)
+
+ self.reset()
+ tracker.lost_node('A')
+ self.assertEqual(self.address, 'RA')
+ self.assertTrue(self.reachable)
+ self.assertTrue(self.neighbor)
+ self.assertEqual(self.link_bit, 3)
+ self.assertEqual(self.router_bit, 0)
+
+ self.reset()
+ tracker.lost_neighbor('A')
+ self.assertEqual(self.address, 'RA')
+ self.assertFalse(self.reachable)
+
+
+ def test_node_tracker_neighbor_remote(self):
+ tracker = NodeTracker(self, 5)
+
+ self.reset()
+ tracker.new_neighbor('A', 3)
+ self.assertEqual(self.address, 'RA')
+ self.assertTrue(self.reachable)
+ self.assertTrue(self.neighbor)
+ self.assertEqual(self.link_bit, 3)
+ self.assertEqual(self.router_bit, 0)
+
+ self.reset()
+ tracker.new_node('A')
+ self.assertEqual(self.address, 'RA')
+ self.assertTrue(self.reachable)
+ self.assertTrue(self.neighbor)
+ self.assertEqual(self.link_bit, 3)
+ self.assertEqual(self.router_bit, 0)
+
+ self.reset()
+ tracker.lost_neighbor('A')
+ self.assertEqual(self.address, 'RA')
+ self.assertTrue(self.reachable)
+ self.assertFalse(self.neighbor)
+ self.assertFalse(self.link_bit)
+ self.assertEqual(self.router_bit, 0)
+
+ self.reset()
+ tracker.lost_node('A')
+ self.assertEqual(self.address, 'RA')
+ self.assertFalse(self.reachable)
class NeighborTest(unittest.TestCase):
- def log(self, level, text):
- pass
-
- def send(self, dest, msg):
- self.sent.append((dest, msg))
-
- def local_link_state_changed(self, link_state):
- self.local_link_state = link_state
-
- def new_neighbor(self, rid):
- self.neighbors[rid] = None
-
- def lost_neighbor(self, rid):
- self.neighbors.pop(rid)
-
- def setUp(self):
- self.sent = []
- self.local_link_state = None
- self.id = "R1"
- self.area = "area"
- self.config = Configuration()
- self.neighbors = {}
-
- def test_hello_sent(self):
- self.sent = []
- self.local_link_state = None
- self.engine = NeighborEngine(self)
- self.engine.tick(0.5)
- self.assertEqual(self.sent, [])
- self.engine.tick(1.5)
- self.assertEqual(len(self.sent), 1)
- dest, msg = self.sent.pop(0)
- self.assertEqual(dest, "_local/qdxrouter")
- self.assertEqual(msg.get_opcode(), "HELLO")
- self.assertEqual(msg.id, self.id)
- self.assertEqual(msg.area, self.area)
- self.assertEqual(msg.seen_peers, [])
- self.assertEqual(self.local_link_state, None)
-
- def test_sees_peer(self):
- self.sent = []
- self.local_link_state = None
- self.engine = NeighborEngine(self)
- self.engine.handle_hello(MessageHELLO(None, 'R2', 'area', []), 2.0)
- self.engine.tick(5.0)
- self.assertEqual(len(self.sent), 1)
- dest, msg = self.sent.pop(0)
- self.assertEqual(msg.seen_peers, ['R2'])
-
- def test_establish_peer(self):
- self.sent = []
- self.local_link_state = None
- self.engine = NeighborEngine(self)
- self.engine.handle_hello(MessageHELLO(None, 'R2', 'area', ['R1']), 0.5)
- self.engine.tick(1.0)
- self.engine.tick(2.0)
- self.engine.tick(3.0)
- self.assertEqual(self.local_link_state.id, 'R1')
- self.assertEqual(self.local_link_state.area, 'area')
- self.assertEqual(self.local_link_state.ls_seq, 1)
- self.assertEqual(self.local_link_state.peers, ['R2'])
-
- def test_establish_multiple_peers(self):
- self.sent = []
- self.local_link_state = None
- self.engine = NeighborEngine(self)
- self.engine.handle_hello(MessageHELLO(None, 'R2', 'area', ['R1']), 0.5)
- self.engine.tick(1.0)
- self.engine.handle_hello(MessageHELLO(None, 'R3', 'area', ['R1', 'R2']), 1.5)
- self.engine.tick(2.0)
- self.engine.handle_hello(MessageHELLO(None, 'R4', 'area', ['R1']), 2.5)
- self.engine.handle_hello(MessageHELLO(None, 'R5', 'area', ['R2']), 2.5)
- self.engine.handle_hello(MessageHELLO(None, 'R6', 'area', ['R1']), 2.5)
- self.engine.tick(3.0)
- self.assertEqual(self.local_link_state.id, 'R1')
- self.assertEqual(self.local_link_state.area, 'area')
- self.assertEqual(self.local_link_state.ls_seq, 3)
- self.local_link_state.peers.sort()
- self.assertEqual(self.local_link_state.peers, ['R2', 'R3', 'R4', 'R6'])
-
- def test_timeout_peer(self):
- self.sent = []
- self.local_link_state = None
- self.engine = NeighborEngine(self)
- self.engine.handle_hello(MessageHELLO(None, 'R2', 'area', ['R3', 'R1']), 2.0)
- self.engine.tick(5.0)
- self.engine.tick(17.1)
- self.assertEqual(self.local_link_state.id, 'R1')
- self.assertEqual(self.local_link_state.area, 'area')
- self.assertEqual(self.local_link_state.ls_seq, 2)
- self.assertEqual(self.local_link_state.peers, [])
+ def log(self, level, text):
+ pass
+
+ def send(self, dest, msg):
+ self.sent.append((dest, msg))
+
+ def local_link_state_changed(self, link_state):
+ self.local_link_state = link_state
+
+ def new_neighbor(self, rid, lbit):
+ self.neighbors[rid] = None
+
+ def lost_neighbor(self, rid):
+ self.neighbors.pop(rid)
+
+ def setUp(self):
+ self.sent = []
+ self.local_link_state = None
+ self.id = "R1"
+ self.area = "area"
+ self.config = Configuration()
+ self.neighbors = {}
+
+ def test_hello_sent(self):
+ self.sent = []
+ self.local_link_state = None
+ self.engine = NeighborEngine(self)
+ self.engine.tick(0.5)
+ self.assertEqual(self.sent, [])
+ self.engine.tick(1.5)
+ self.assertEqual(len(self.sent), 1)
+ dest, msg = self.sent.pop(0)
+ self.assertEqual(dest, "_local/qdxrouter")
+ self.assertEqual(msg.get_opcode(), "HELLO")
+ self.assertEqual(msg.id, self.id)
+ self.assertEqual(msg.area, self.area)
+ self.assertEqual(msg.seen_peers, [])
+ self.assertEqual(self.local_link_state, None)
+
+ def test_sees_peer(self):
+ self.sent = []
+ self.local_link_state = None
+ self.engine = NeighborEngine(self)
+ self.engine.handle_hello(MessageHELLO(None, 'R2', 'area', []), 2.0, 0)
+ self.engine.tick(5.0)
+ self.assertEqual(len(self.sent), 1)
+ dest, msg = self.sent.pop(0)
+ self.assertEqual(msg.seen_peers, ['R2'])
+
+ def test_establish_peer(self):
+ self.sent = []
+ self.local_link_state = None
+ self.engine = NeighborEngine(self)
+ self.engine.handle_hello(MessageHELLO(None, 'R2', 'area', ['R1']), 0.5, 0)
+ self.engine.tick(1.0)
+ self.engine.tick(2.0)
+ self.engine.tick(3.0)
+ self.assertEqual(self.local_link_state.id, 'R1')
+ self.assertEqual(self.local_link_state.area, 'area')
+ self.assertEqual(self.local_link_state.ls_seq, 1)
+ self.assertEqual(self.local_link_state.peers, ['R2'])
+
+ def test_establish_multiple_peers(self):
+ self.sent = []
+ self.local_link_state = None
+ self.engine = NeighborEngine(self)
+ self.engine.handle_hello(MessageHELLO(None, 'R2', 'area', ['R1']), 0.5, 0)
+ self.engine.tick(1.0)
+ self.engine.handle_hello(MessageHELLO(None, 'R3', 'area', ['R1', 'R2']), 1.5, 0)
+ self.engine.tick(2.0)
+ self.engine.handle_hello(MessageHELLO(None, 'R4', 'area', ['R1']), 2.5, 0)
+ self.engine.handle_hello(MessageHELLO(None, 'R5', 'area', ['R2']), 2.5, 0)
+ self.engine.handle_hello(MessageHELLO(None, 'R6', 'area', ['R1']), 2.5, 0)
+ self.engine.tick(3.0)
+ self.assertEqual(self.local_link_state.id, 'R1')
+ self.assertEqual(self.local_link_state.area, 'area')
+ self.assertEqual(self.local_link_state.ls_seq, 3)
+ self.local_link_state.peers.sort()
+ self.assertEqual(self.local_link_state.peers, ['R2', 'R3', 'R4', 'R6'])
+
+ def test_timeout_peer(self):
+ self.sent = []
+ self.local_link_state = None
+ self.engine = NeighborEngine(self)
+ self.engine.handle_hello(MessageHELLO(None, 'R2', 'area', ['R3', 'R1']), 2.0, 0)
+ self.engine.tick(5.0)
+ self.engine.tick(17.1)
+ self.assertEqual(self.local_link_state.id, 'R1')
+ self.assertEqual(self.local_link_state.area, 'area')
+ self.assertEqual(self.local_link_state.ls_seq, 2)
+ self.assertEqual(self.local_link_state.peers, [])
class PathTest(unittest.TestCase):
- def setUp(self):
- self.id = 'R1'
- self.area = 'area'
- self.next_hops = None
- self.engine = PathEngine(self)
-
- def log(self, level, text):
- pass
-
- def next_hops_changed(self, nh):
- self.next_hops = nh
-
- def test_topology1(self):
- """
-
- +====+ +----+ +----+
- | R1 |------| R2 |------| R3 |
- +====+ +----+ +----+
-
- """
- collection = { 'R1': LinkState(None, 'R1', 'area', 1, ['R2']),
- 'R2': LinkState(None, 'R2', 'area', 1, ['R1', 'R3']),
- 'R3': LinkState(None, 'R3', 'area', 1, ['R2']) }
- self.engine.ls_collection_changed(collection)
- self.engine.tick(1.0)
- self.assertEqual(len(self.next_hops), 2)
- self.assertEqual(self.next_hops['R2'], 'R2')
- self.assertEqual(self.next_hops['R3'], 'R2')
-
- def test_topology2(self):
- """
-
- +====+ +----+ +----+
- | R1 |------| R2 |------| R4 |
- +====+ +----+ +----+
- | |
- +----+ +----+ +----+
- | R3 |------| R5 |------| R6 |
- +----+ +----+ +----+
-
- """
- collection = { 'R1': LinkState(None, 'R1', 'area', 1, ['R2']),
- 'R2': LinkState(None, 'R2', 'area', 1, ['R1', 'R3', 'R4']),
- 'R3': LinkState(None, 'R3', 'area', 1, ['R2', 'R5']),
- 'R4': LinkState(None, 'R4', 'area', 1, ['R2', 'R5']),
- 'R5': LinkState(None, 'R5', 'area', 1, ['R3', 'R4', 'R6']),
- 'R6': LinkState(None, 'R6', 'area', 1, ['R5']) }
- self.engine.ls_collection_changed(collection)
- self.engine.tick(1.0)
- self.assertEqual(len(self.next_hops), 5)
- self.assertEqual(self.next_hops['R2'], 'R2')
- self.assertEqual(self.next_hops['R3'], 'R2')
- self.assertEqual(self.next_hops['R4'], 'R2')
- self.assertEqual(self.next_hops['R5'], 'R2')
- self.assertEqual(self.next_hops['R6'], 'R2')
-
- def test_topology3(self):
- """
-
- +----+ +----+ +----+
- | R2 |------| R3 |------| R4 |
- +----+ +----+ +----+
- | |
- +====+ +----+ +----+
- | R1 |------| R5 |------| R6 |
- +====+ +----+ +----+
-
- """
- collection = { 'R2': LinkState(None, 'R2', 'area', 1, ['R3']),
- 'R3': LinkState(None, 'R3', 'area', 1, ['R1', 'R2', 'R4']),
- 'R4': LinkState(None, 'R4', 'area', 1, ['R3', 'R5']),
- 'R1': LinkState(None, 'R1', 'area', 1, ['R3', 'R5']),
- 'R5': LinkState(None, 'R5', 'area', 1, ['R1', 'R4', 'R6']),
- 'R6': LinkState(None, 'R6', 'area', 1, ['R5']) }
- self.engine.ls_collection_changed(collection)
- self.engine.tick(1.0)
- self.assertEqual(len(self.next_hops), 5)
- self.assertEqual(self.next_hops['R2'], 'R3')
- self.assertEqual(self.next_hops['R3'], 'R3')
- self.assertEqual(self.next_hops['R4'], 'R3')
- self.assertEqual(self.next_hops['R5'], 'R5')
- self.assertEqual(self.next_hops['R6'], 'R5')
-
- def test_topology4(self):
- """
-
- +----+ +----+ +----+
- | R2 |------| R3 |------| R4 |
- +----+ +----+ +----+
- | |
- +====+ +----+ +----+
- | R1 |------| R5 |------| R6 |------ R7 (no ls from R7)
- +====+ +----+ +----+
-
- """
- collection = { 'R2': LinkState(None, 'R2', 'area', 1, ['R3']),
- 'R3': LinkState(None, 'R3', 'area', 1, ['R1', 'R2', 'R4']),
- 'R4': LinkState(None, 'R4', 'area', 1, ['R3', 'R5']),
- 'R1': LinkState(None, 'R1', 'area', 1, ['R3', 'R5']),
- 'R5': LinkState(None, 'R5', 'area', 1, ['R1', 'R4', 'R6']),
- 'R6': LinkState(None, 'R6', 'area', 1, ['R5', 'R7']) }
- self.engine.ls_collection_changed(collection)
- self.engine.tick(1.0)
- self.assertEqual(len(self.next_hops), 6)
- self.assertEqual(self.next_hops['R2'], 'R3')
- self.assertEqual(self.next_hops['R3'], 'R3')
- self.assertEqual(self.next_hops['R4'], 'R3')
- self.assertEqual(self.next_hops['R5'], 'R5')
- self.assertEqual(self.next_hops['R6'], 'R5')
- self.assertEqual(self.next_hops['R7'], 'R5')
-
- def test_topology5(self):
- """
-
- +----+ +----+ +----+
- | R2 |------| R3 |------| R4 |
- +----+ +----+ +----+
- | | |
- | +====+ +----+ +----+
- +--------| R1 |------| R5 |------| R6 |------ R7 (no ls from R7)
- +====+ +----+ +----+
-
- """
- collection = { 'R2': LinkState(None, 'R2', 'area', 1, ['R3', 'R1']),
- 'R3': LinkState(None, 'R3', 'area', 1, ['R1', 'R2', 'R4']),
- 'R4': LinkState(None, 'R4', 'area', 1, ['R3', 'R5']),
- 'R1': LinkState(None, 'R1', 'area', 1, ['R3', 'R5', 'R2']),
- 'R5': LinkState(None, 'R5', 'area', 1, ['R1', 'R4', 'R6']),
- 'R6': LinkState(None, 'R6', 'area', 1, ['R5', 'R7']) }
- self.engine.ls_collection_changed(collection)
- self.engine.tick(1.0)
- self.assertEqual(len(self.next_hops), 6)
- self.assertEqual(self.next_hops['R2'], 'R2')
- self.assertEqual(self.next_hops['R3'], 'R3')
- self.assertEqual(self.next_hops['R4'], 'R3')
- self.assertEqual(self.next_hops['R5'], 'R5')
- self.assertEqual(self.next_hops['R6'], 'R5')
- self.assertEqual(self.next_hops['R7'], 'R5')
-
- def test_topology5_with_asymmetry1(self):
- """
-
- +----+ +----+ +----+
- | R2 |------| R3 |------| R4 |
- +----+ +----+ +----+
- ^ | |
- ^ +====+ +----+ +----+
- +-<-<-<--| R1 |------| R5 |------| R6 |------ R7 (no ls from R7)
- +====+ +----+ +----+
-
- """
- collection = { 'R2': LinkState(None, 'R2', 'area', 1, ['R3']),
- 'R3': LinkState(None, 'R3', 'area', 1, ['R1', 'R2', 'R4']),
- 'R4': LinkState(None, 'R4', 'area', 1, ['R3', 'R5']),
- 'R1': LinkState(None, 'R1', 'area', 1, ['R3', 'R5', 'R2']),
- 'R5': LinkState(None, 'R5', 'area', 1, ['R1', 'R4', 'R6']),
- 'R6': LinkState(None, 'R6', 'area', 1, ['R5', 'R7']) }
- self.engine.ls_collection_changed(collection)
- self.engine.tick(1.0)
- self.assertEqual(len(self.next_hops), 6)
- self.assertEqual(self.next_hops['R2'], 'R2')
- self.assertEqual(self.next_hops['R3'], 'R3')
- self.assertEqual(self.next_hops['R4'], 'R3')
- self.assertEqual(self.next_hops['R5'], 'R5')
- self.assertEqual(self.next_hops['R6'], 'R5')
- self.assertEqual(self.next_hops['R7'], 'R5')
-
- def test_topology5_with_asymmetry2(self):
- """
-
- +----+ +----+ +----+
- | R2 |------| R3 |------| R4 |
- +----+ +----+ +----+
- v | |
- v +====+ +----+ +----+
- +->->->->| R1 |------| R5 |------| R6 |------ R7 (no ls from R7)
- +====+ +----+ +----+
-
- """
- collection = { 'R2': LinkState(None, 'R2', 'area', 1, ['R3', 'R1']),
- 'R3': LinkState(None, 'R3', 'area', 1, ['R1', 'R2', 'R4']),
- 'R4': LinkState(None, 'R4', 'area', 1, ['R3', 'R5']),
- 'R1': LinkState(None, 'R1', 'area', 1, ['R3', 'R5']),
- 'R5': LinkState(None, 'R5', 'area', 1, ['R1', 'R4', 'R6']),
- 'R6': LinkState(None, 'R6', 'area', 1, ['R5', 'R7']) }
- self.engine.ls_collection_changed(collection)
- self.engine.tick(1.0)
- self.assertEqual(len(self.next_hops), 6)
- self.assertEqual(self.next_hops['R2'], 'R3')
- self.assertEqual(self.next_hops['R3'], 'R3')
- self.assertEqual(self.next_hops['R4'], 'R3')
- self.assertEqual(self.next_hops['R5'], 'R5')
- self.assertEqual(self.next_hops['R6'], 'R5')
- self.assertEqual(self.next_hops['R7'], 'R5')
-
- def test_topology5_with_asymmetry3(self):
- """
-
- +----+ +----+ +----+
- | R2 |------| R3 |------| R4 |
- +----+ +----+ +----+
- v | |
- v +====+ +----+ +----+
- +->->->->| R1 |------| R5 |<-<-<-| R6 |------ R7 (no ls from R7)
- +====+ +----+ +----+
-
- """
- collection = { 'R2': LinkState(None, 'R2', 'area', 1, ['R3', 'R1']),
- 'R3': LinkState(None, 'R3', 'area', 1, ['R1', 'R2', 'R4']),
- 'R4': LinkState(None, 'R4', 'area', 1, ['R3', 'R5']),
- 'R1': LinkState(None, 'R1', 'area', 1, ['R3', 'R5']),
- 'R5': LinkState(None, 'R5', 'area', 1, ['R1', 'R4']),
- 'R6': LinkState(None, 'R6', 'area', 1, ['R5', 'R7']) }
- self.engine.ls_collection_changed(collection)
- self.engine.tick(1.0)
- self.assertEqual(len(self.next_hops), 4)
- self.assertEqual(self.next_hops['R2'], 'R3')
- self.assertEqual(self.next_hops['R3'], 'R3')
- self.assertEqual(self.next_hops['R4'], 'R3')
- self.assertEqual(self.next_hops['R5'], 'R5')
+ def setUp(self):
+ self.id = 'R1'
+ self.area = 'area'
+ self.next_hops = None
+ self.engine = PathEngine(self)
+
+ def log(self, level, text):
+ pass
+
+ def next_hops_changed(self, nh):
+ self.next_hops = nh
+
+ def test_topology1(self):
+ """
+
+ +====+ +----+ +----+
+ | R1 |------| R2 |------| R3 |
+ +====+ +----+ +----+
+
+ """
+ collection = { 'R1': LinkState(None, 'R1', 'area', 1, ['R2']),
+ 'R2': LinkState(None, 'R2', 'area', 1, ['R1', 'R3']),
+ 'R3': LinkState(None, 'R3', 'area', 1, ['R2']) }
+ self.engine.ls_collection_changed(collection)
+ self.engine.tick(1.0)
+ self.assertEqual(len(self.next_hops), 2)
+ self.assertEqual(self.next_hops['R2'], 'R2')
+ self.assertEqual(self.next_hops['R3'], 'R2')
+
+ def test_topology2(self):
+ """
+
+ +====+ +----+ +----+
+ | R1 |------| R2 |------| R4 |
+ +====+ +----+ +----+
+ | |
+ +----+ +----+ +----+
+ | R3 |------| R5 |------| R6 |
+ +----+ +----+ +----+
+
+ """
+ collection = { 'R1': LinkState(None, 'R1', 'area', 1, ['R2']),
+ 'R2': LinkState(None, 'R2', 'area', 1, ['R1', 'R3', 'R4']),
+ 'R3': LinkState(None, 'R3', 'area', 1, ['R2', 'R5']),
+ 'R4': LinkState(None, 'R4', 'area', 1, ['R2', 'R5']),
+ 'R5': LinkState(None, 'R5', 'area', 1, ['R3', 'R4', 'R6']),
+ 'R6': LinkState(None, 'R6', 'area', 1, ['R5']) }
+ self.engine.ls_collection_changed(collection)
+ self.engine.tick(1.0)
+ self.assertEqual(len(self.next_hops), 5)
+ self.assertEqual(self.next_hops['R2'], 'R2')
+ self.assertEqual(self.next_hops['R3'], 'R2')
+ self.assertEqual(self.next_hops['R4'], 'R2')
+ self.assertEqual(self.next_hops['R5'], 'R2')
+ self.assertEqual(self.next_hops['R6'], 'R2')
+
+ def test_topology3(self):
+ """
+
+ +----+ +----+ +----+
+ | R2 |------| R3 |------| R4 |
+ +----+ +----+ +----+
+ | |
+ +====+ +----+ +----+
+ | R1 |------| R5 |------| R6 |
+ +====+ +----+ +----+
+
+ """
+ collection = { 'R2': LinkState(None, 'R2', 'area', 1, ['R3']),
+ 'R3': LinkState(None, 'R3', 'area', 1, ['R1', 'R2', 'R4']),
+ 'R4': LinkState(None, 'R4', 'area', 1, ['R3', 'R5']),
+ 'R1': LinkState(None, 'R1', 'area', 1, ['R3', 'R5']),
+ 'R5': LinkState(None, 'R5', 'area', 1, ['R1', 'R4', 'R6']),
+ 'R6': LinkState(None, 'R6', 'area', 1, ['R5']) }
+ self.engine.ls_collection_changed(collection)
+ self.engine.tick(1.0)
+ self.assertEqual(len(self.next_hops), 5)
+ self.assertEqual(self.next_hops['R2'], 'R3')
+ self.assertEqual(self.next_hops['R3'], 'R3')
+ self.assertEqual(self.next_hops['R4'], 'R3')
+ self.assertEqual(self.next_hops['R5'], 'R5')
+ self.assertEqual(self.next_hops['R6'], 'R5')
+
+ def test_topology4(self):
+ """
+
+ +----+ +----+ +----+
+ | R2 |------| R3 |------| R4 |
+ +----+ +----+ +----+
+ | |
+ +====+ +----+ +----+
+ | R1 |------| R5 |------| R6 |------ R7 (no ls from R7)
+ +====+ +----+ +----+
+
+ """
+ collection = { 'R2': LinkState(None, 'R2', 'area', 1, ['R3']),
+ 'R3': LinkState(None, 'R3', 'area', 1, ['R1', 'R2', 'R4']),
+ 'R4': LinkState(None, 'R4', 'area', 1, ['R3', 'R5']),
+ 'R1': LinkState(None, 'R1', 'area', 1, ['R3', 'R5']),
+ 'R5': LinkState(None, 'R5', 'area', 1, ['R1', 'R4', 'R6']),
+ 'R6': LinkState(None, 'R6', 'area', 1, ['R5', 'R7']) }
+ self.engine.ls_collection_changed(collection)
+ self.engine.tick(1.0)
+ self.assertEqual(len(self.next_hops), 6)
+ self.assertEqual(self.next_hops['R2'], 'R3')
+ self.assertEqual(self.next_hops['R3'], 'R3')
+ self.assertEqual(self.next_hops['R4'], 'R3')
+ self.assertEqual(self.next_hops['R5'], 'R5')
+ self.assertEqual(self.next_hops['R6'], 'R5')
+ self.assertEqual(self.next_hops['R7'], 'R5')
+
+ def test_topology5(self):
+ """
+
+ +----+ +----+ +----+
+ | R2 |------| R3 |------| R4 |
+ +----+ +----+ +----+
+ | | |
+ | +====+ +----+ +----+
+ +--------| R1 |------| R5 |------| R6 |------ R7 (no ls from R7)
+ +====+ +----+ +----+
+
+ """
+ collection = { 'R2': LinkState(None, 'R2', 'area', 1, ['R3', 'R1']),
+ 'R3': LinkState(None, 'R3', 'area', 1, ['R1', 'R2', 'R4']),
+ 'R4': LinkState(None, 'R4', 'area', 1, ['R3', 'R5']),
+ 'R1': LinkState(None, 'R1', 'area', 1, ['R3', 'R5', 'R2']),
+ 'R5': LinkState(None, 'R5', 'area', 1, ['R1', 'R4', 'R6']),
+ 'R6': LinkState(None, 'R6', 'area', 1, ['R5', 'R7']) }
+ self.engine.ls_collection_changed(collection)
+ self.engine.tick(1.0)
+ self.assertEqual(len(self.next_hops), 6)
+ self.assertEqual(self.next_hops['R2'], 'R2')
+ self.assertEqual(self.next_hops['R3'], 'R3')
+ self.assertEqual(self.next_hops['R4'], 'R3')
+ self.assertEqual(self.next_hops['R5'], 'R5')
+ self.assertEqual(self.next_hops['R6'], 'R5')
+ self.assertEqual(self.next_hops['R7'], 'R5')
+
+ def test_topology5_with_asymmetry1(self):
+ """
+
+ +----+ +----+ +----+
+ | R2 |------| R3 |------| R4 |
+ +----+ +----+ +----+
+ ^ | |
+ ^ +====+ +----+ +----+
+ +-<-<-<--| R1 |------| R5 |------| R6 |------ R7 (no ls from R7)
+ +====+ +----+ +----+
+
+ """
+ collection = { 'R2': LinkState(None, 'R2', 'area', 1, ['R3']),
+ 'R3': LinkState(None, 'R3', 'area', 1, ['R1', 'R2', 'R4']),
+ 'R4': LinkState(None, 'R4', 'area', 1, ['R3', 'R5']),
+ 'R1': LinkState(None, 'R1', 'area', 1, ['R3', 'R5', 'R2']),
+ 'R5': LinkState(None, 'R5', 'area', 1, ['R1', 'R4', 'R6']),
+ 'R6': LinkState(None, 'R6', 'area', 1, ['R5', 'R7']) }
+ self.engine.ls_collection_changed(collection)
+ self.engine.tick(1.0)
+ self.assertEqual(len(self.next_hops), 6)
+ self.assertEqual(self.next_hops['R2'], 'R2')
+ self.assertEqual(self.next_hops['R3'], 'R3')
+ self.assertEqual(self.next_hops['R4'], 'R3')
+ self.assertEqual(self.next_hops['R5'], 'R5')
+ self.assertEqual(self.next_hops['R6'], 'R5')
+ self.assertEqual(self.next_hops['R7'], 'R5')
+
+ def test_topology5_with_asymmetry2(self):
+ """
+
+ +----+ +----+ +----+
+ | R2 |------| R3 |------| R4 |
+ +----+ +----+ +----+
+ v | |
+ v +====+ +----+ +----+
+ +->->->->| R1 |------| R5 |------| R6 |------ R7 (no ls from R7)
+ +====+ +----+ +----+
+
+ """
+ collection = { 'R2': LinkState(None, 'R2', 'area', 1, ['R3', 'R1']),
+ 'R3': LinkState(None, 'R3', 'area', 1, ['R1', 'R2', 'R4']),
+ 'R4': LinkState(None, 'R4', 'area', 1, ['R3', 'R5']),
+ 'R1': LinkState(None, 'R1', 'area', 1, ['R3', 'R5']),
+ 'R5': LinkState(None, 'R5', 'area', 1, ['R1', 'R4', 'R6']),
+ 'R6': LinkState(None, 'R6', 'area', 1, ['R5', 'R7']) }
+ self.engine.ls_collection_changed(collection)
+ self.engine.tick(1.0)
+ self.assertEqual(len(self.next_hops), 6)
+ self.assertEqual(self.next_hops['R2'], 'R3')
+ self.assertEqual(self.next_hops['R3'], 'R3')
+ self.assertEqual(self.next_hops['R4'], 'R3')
+ self.assertEqual(self.next_hops['R5'], 'R5')
+ self.assertEqual(self.next_hops['R6'], 'R5')
+ self.assertEqual(self.next_hops['R7'], 'R5')
+
+ def test_topology5_with_asymmetry3(self):
+ """
+
+ +----+ +----+ +----+
+ | R2 |------| R3 |------| R4 |
+ +----+ +----+ +----+
+ v | |
+ v +====+ +----+ +----+
+ +->->->->| R1 |------| R5 |<-<-<-| R6 |------ R7 (no ls from R7)
+ +====+ +----+ +----+
+
+ """
+ collection = { 'R2': LinkState(None, 'R2', 'area', 1, ['R3', 'R1']),
+ 'R3': LinkState(None, 'R3', 'area', 1, ['R1', 'R2', 'R4']),
+ 'R4': LinkState(None, 'R4', 'area', 1, ['R3', 'R5']),
+ 'R1': LinkState(None, 'R1', 'area', 1, ['R3', 'R5']),
+ 'R5': LinkState(None, 'R5', 'area', 1, ['R1', 'R4']),
+ 'R6': LinkState(None, 'R6', 'area', 1, ['R5', 'R7']) }
+ self.engine.ls_collection_changed(collection)
+ self.engine.tick(1.0)
+ self.assertEqual(len(self.next_hops), 4)
+ self.assertEqual(self.next_hops['R2'], 'R3')
+ self.assertEqual(self.next_hops['R3'], 'R3')
+ self.assertEqual(self.next_hops['R4'], 'R3')
+ self.assertEqual(self.next_hops['R5'], 'R5')
if __name__ == '__main__':
- unittest.main()
+ unittest.main()