diff options
author | Martin Schwenke <martin@meltin.net> | 2011-07-21 21:33:40 +1000 |
---|---|---|
committer | Martin Schwenke <martin@meltin.net> | 2011-07-29 14:32:07 +1000 |
commit | 2acf892e6e8eab5e1cffde71761ca6936d88cbb0 (patch) | |
tree | 538400d5e8318866fae05020bb1adc1c00f9c895 /ctdb/tests/takeover | |
parent | c3bdf4a0a140895aa894d494f73debc7a22e3f32 (diff) | |
download | samba-2acf892e6e8eab5e1cffde71761ca6936d88cbb0.tar.gz |
IP allocation simulation - add LCP2 algorithm.
Add -L/--lcp2 option and implement LCP2 algorithm as an alternative to
the basic non-deterministic algorithm.
Existing examples will break if used with LCP2 since it needs real IP
addresses.
Signed-off-by: Martin Schwenke <martin@meltin.net>
(This used to be ctdb commit 22b14e1a887f0479cc75ed9027af5cc24797f217)
Diffstat (limited to 'ctdb/tests/takeover')
-rwxr-xr-x | ctdb/tests/takeover/ctdb_takeover.py | 351 |
1 files changed, 290 insertions, 61 deletions
diff --git a/ctdb/tests/takeover/ctdb_takeover.py b/ctdb/tests/takeover/ctdb_takeover.py index a82f34cce19..00966c3f08d 100755 --- a/ctdb/tests/takeover/ctdb_takeover.py +++ b/ctdb/tests/takeover/ctdb_takeover.py @@ -2,7 +2,7 @@ # ctdb ip takeover code -# Copyright (C) Martin Schwenke 2010 +# Copyright (C) Martin Schwenke, Ronnie Sahlberg 2010, 2011 # Based on original CTDB C code: # @@ -30,6 +30,10 @@ from optparse import OptionParser import copy import random +# For parsing IP addresses +import socket +import struct + options = None def process_args(extra_options=[]): @@ -44,6 +48,9 @@ def process_args(extra_options=[]): parser.add_option("--ni", action="store_true", dest="no_ip_failback", default=False, help="turn on no_ip_failback") + parser.add_option("-L", "--lcp2", + action="store_true", dest="lcp2", default=False, + help="use LCP2 IP rebalancing algorithm [default: %default]") parser.add_option("-b", "--balance", action="store_true", dest="balance", default=False, help="show (im)balance information after each event") @@ -58,7 +65,7 @@ def process_args(extra_options=[]): help="print information and actions taken to stdout") parser.add_option("-r", "--retries", action="store", type="int", dest="retries", default=5, - help="number of retry loops for rebalancing [default: %default]") + help="number of retry loops for rebalancing non-deterministic failback [default: %default]") parser.add_option("-i", "--iterations", action="store", type="int", dest="iterations", default=1000, @@ -76,7 +83,7 @@ def process_args(extra_options=[]): parser.add_option("-x", "--exit", action="store_true", dest="exit", default=False, help="exit on the 1st gratuitous IP move") - + (options, args) = parser.parse_args() if len(args) != 0: @@ -120,12 +127,68 @@ def debug_print(t): if t != []: print "\n".join([str(i) for i in t]) +def ip_to_list_of_ints(ip): + # Be lazy... but only expose errors in IPv4 addresses, since + # they'll be more commonly used. :-) + try: + l = socket.inet_pton(socket.AF_INET6, ip) + except: + l = socket.inet_pton(socket.AF_INET, ip) + + return map(lambda x: struct.unpack('B', x)[0], l) + +def ip_distance(ip1, ip2): + """Calculate the distance between 2 IPs. + + This is the length of the longtest common prefix between the IPs. + It is calculated by XOR-ing the 2 IPs together and counting the + number of leading zeroes.""" + + distance = 0 + for (o1, o2) in zip(ip_to_list_of_ints(ip1), ip_to_list_of_ints(ip2)): + # XOR this pair of octets + x = o1 ^ o2 + # count number leading zeroes + if x == 0: + distance += 8 + else: + # bin() gives minimal length '0bNNN' string + distance += (8 - (len(bin(x)) - 2)) + break + + return distance + +def ip_distance_2_sum(ip, ips): + """Calculate the IP distance for the given IP relative to IPs. + + This could be made more efficient by insering ip_distance_2 into + the loop in this function. However, that would result in some + loss of clarity and also will not be necessary in a C + implemntation.""" + + sum = 0 + for i in ips: + sum += ip_distance(ip, i) ** 2 + + return sum + +def imbalance_metric(ips): + """Return the imbalance metric for a group of IPs. + + This is the sum of squares of the IP distances between each pair of IPs.""" + if len(ips) > 1: + (h, t) = (ips[0], ips[1:]) + return ip_distance_2_sum(h, t) + imbalance_metric(t) + else: + return 0 + class Node(object): def __init__(self, public_addresses): self.public_addresses = set(public_addresses) self.current_addresses = set() self.healthy = True + self.imbalance = -1 def can_node_serve_ip(self, ip): return ip in self.public_addresses @@ -133,6 +196,18 @@ class Node(object): def node_ip_coverage(self): return len(self.current_addresses) + def set_imbalance(self, imbalance=-1): + """Set the imbalance metric to the given value. If none given + then calculate it.""" + + if imbalance != -1: + self.imbalance = imbalance + else: + self.imbalance = imbalance_metric(list(self.current_addresses)) + + def get_imbalance(self): + return self.imbalance + class Cluster(object): def __init__(self): self.nodes = [] @@ -191,7 +266,7 @@ class Cluster(object): verbose_print(pnn) verbose_end() - + def unhealthy(self, *pnns): verbose_begin("UNHEALTHY") @@ -269,7 +344,7 @@ class Cluster(object): if minnode == -1 or num < minnum: minnode = i minnum = num - + if maxnode == -1: continue @@ -310,7 +385,7 @@ class Cluster(object): (prefix, ip, old, new)) return (ip_moves, grat_ip_moves, details) - + def find_takeover_node(self, ip): pnn = -1 @@ -341,6 +416,190 @@ class Cluster(object): verbose_print("%s -> %d" % (ip, pnn)) return True + def basic_allocate_unassigned(self): + + assigned = set([ip for n in self.nodes for ip in n.current_addresses]) + unassigned = sorted(list(self.all_public_ips - assigned)) + + for ip in unassigned: + self.find_takeover_node(ip) + + def basic_failback(self, retries_l): + + assigned = sorted([ip + for n in self.nodes + for ip in n.current_addresses]) + for ip in assigned: + + maxnode = -1 + minnode = -1 + for (i, n) in enumerate(self.nodes): + if not n.healthy: + continue + + if not n.can_node_serve_ip(ip): + continue + + num = n.node_ip_coverage() + + if maxnode == -1: + maxnode = i + maxnum = num + else: + if num > maxnum: + maxnode = i + maxnum = num + if minnode == -1: + minnode = i + minnum = num + else: + if num < minnum: + minnode = i + minnum = num + + if maxnode == -1: + print "Could not find maxnode. May not be able to serve ip", ip + continue + + #if self.deterministic_public_ips: + # continue + + if maxnum > minnum + 1 and retries_l[0] < options.retries: + # Remove the 1st ip from maxnode + t = sorted(list(self.nodes[maxnode].current_addresses)) + realloc = t[0] + verbose_print("%s <- %d" % (realloc, maxnode)) + self.nodes[maxnode].current_addresses.remove(realloc) + # Redo the outer loop. + retries_l[0] += 1 + return True + + return False + + + def lcp2_allocate_unassigned(self): + + # Assign as many unassigned addresses as possible. Keep + # selecting the optimal assignment until we don't manage to + # assign anything. + assigned = set([ip for n in self.nodes for ip in n.current_addresses]) + unassigned = sorted(list(self.all_public_ips - assigned)) + + should_loop = True + while len(unassigned) > 0 and should_loop: + should_loop = False + + debug_begin(" CONSIDERING MOVES (UNASSIGNED)") + + minnode = -1 + mindsum = 0 + minip = None + + for ip in unassigned: + for dstnode in range(len(self.nodes)): + if self.nodes[dstnode].can_node_serve_ip(ip) and \ + self.nodes[dstnode].healthy: + dstdsum = ip_distance_2_sum(ip, self.nodes[dstnode].current_addresses) + dstimbl = self.nodes[dstnode].get_imbalance() + dstdsum + debug_print(" %s -> %d [+%d]" % \ + (ip, + dstnode, + dstimbl - self.nodes[dstnode].get_imbalance())) + + if (minnode == -1) or (dstdsum < mindsum): + minnode = dstnode + minimbl = dstimbl + mindsum = dstdsum + minip = ip + should_loop = True + debug_end() + + if minnode != -1: + self.nodes[minnode].current_addresses.add(minip) + self.nodes[minnode].set_imbalance(self.nodes[minnode].get_imbalance() + mindsum) + verbose_print("%s -> %d [+%d]" % (minip, minnode, mindsum)) + unassigned.remove(minip) + + for ip in unassigned: + verbose_print("Could not find node to take over public address %s" % ip) + + def lcp2_failback(self, targets): + + # Get the node with the highest imbalance metric. + srcnode = -1 + maximbl = 0 + for (pnn, n) in enumerate(self.nodes): + b = n.get_imbalance() + if (srcnode == -1) or (b > maximbl): + srcnode = pnn + maximbl = b + + # This means that all nodes had 0 or 1 addresses, so can't + # be imbalanced. + if maximbl == 0: + return False + + # We'll need this a few times... + ips = self.nodes[srcnode].current_addresses + + # Find an IP and destination node that best reduces imbalance. + optimum = None + debug_begin(" CONSIDERING MOVES FROM %d [%d]" % (srcnode, maximbl)) + for ip in ips: + # What is this IP address costing the source node? + srcdsum = ip_distance_2_sum(ip, ips - set([ip])) + srcimbl = maximbl - srcdsum + + # Consider this IP address would cost each potential + # destination node. Destination nodes are limited to + # those that are newly healthy, since we don't want to + # do gratuitous failover of IPs just to make minor + # balance improvements. + for dstnode in targets: + if self.nodes[dstnode].can_node_serve_ip(ip) and \ + self.nodes[dstnode].healthy: + dstdsum = ip_distance_2_sum(ip, self.nodes[dstnode].current_addresses) + dstimbl = self.nodes[dstnode].get_imbalance() + dstdsum + debug_print(" %d [%d] -> %s -> %d [+%d]" % \ + (srcnode, + srcimbl - self.nodes[srcnode].get_imbalance(), + ip, + dstnode, + dstimbl - self.nodes[dstnode].get_imbalance())) + + if (dstimbl < maximbl) and (dstdsum < srcdsum): + if optimum is None: + optimum = (ip, srcnode, srcimbl, dstnode, dstimbl) + else: + (x, sn, si, dn, di) = optimum + if (srcimbl + dstimbl) < (si + di): + optimum = (ip, srcnode, srcimbl, dstnode, dstimbl) + debug_end() + + if optimum is not None: + # We found a move that makes things better... + (ip, srcnode, srcimbl, dstnode, dstimbl) = optimum + ini_srcimbl = self.nodes[srcnode].get_imbalance() + ini_dstimbl = self.nodes[dstnode].get_imbalance() + + self.nodes[srcnode].current_addresses.remove(ip) + self.nodes[srcnode].set_imbalance(srcimbl) + + self.nodes[dstnode].current_addresses.add(ip) + self.nodes[dstnode].set_imbalance(dstimbl) + + verbose_print("%d [%d] -> %s -> %d [+%d]" % \ + (srcnode, + srcimbl - ini_srcimbl, + ip, + dstnode, + dstimbl - ini_dstimbl)) + + return True + + return False + + def ctdb_takeover_run(self): self.events += 1 @@ -374,69 +633,39 @@ class Cluster(object): for ip in n.current_addresses - n.public_addresses]) n.current_addresses &= n.public_addresses - # We'll only retry the balancing act up to 5 times. - retries = 0 + if options.lcp2: + newly_healthy = [pnn for (pnn, n) in enumerate(self.nodes) + if len(n.current_addresses) == 0 and n.healthy] + for n in self.nodes: + n.set_imbalance() + + # We'll only retry the balancing act up to options.retries + # times (for the basic non-deterministic algorithm). This + # nonsense gives us a reference on the retries count in + # Python. It will be easier in C. :-) + # For LCP2 we reassignas many IPs from heavily "loaded" nodes + # to nodes that are newly healthy, looping until we fail to + # reassign an IP. + retries_l = [0] should_loop = True while should_loop: should_loop = False - assigned = set([ip for n in self.nodes for ip in n.current_addresses]) - unassigned = sorted(list(self.all_public_ips - assigned)) - - for ip in unassigned: - self.find_takeover_node(ip) + if options.lcp2: + self.lcp2_allocate_unassigned() + else: + self.basic_allocate_unassigned() - if self.no_ip_failback: + if self.no_ip_failback or self.deterministic_public_ips: break - assigned = sorted([ip - for n in self.nodes - for ip in n.current_addresses]) - for ip in assigned: - - maxnode = -1 - minnode = -1 - for (i, n) in enumerate(self.nodes): - if not n.healthy: - continue - - if not n.can_node_serve_ip(ip): - continue - - num = n.node_ip_coverage() - - if maxnode == -1: - maxnode = i - maxnum = num - else: - if num > maxnum: - maxnode = i - maxnum = num - if minnode == -1: - minnode = i - minnum = num - else: - if num < minnum: - minnode = i - minnum = num - - if maxnode == -1: - print "Could not maxnode. May not be able to serve ip", ip - continue - - if self.deterministic_public_ips: - continue - - if maxnum > minnum + 1 and retries < options.retries: - # Remove the 1st ip from maxnode - t = sorted(list(self.nodes[maxnode].current_addresses)) - realloc = t[0] - verbose_print("%s <- %d" % (realloc, maxnode)) - self.nodes[maxnode].current_addresses.remove(realloc) - retries += 1 - # Redo the outer loop. - should_loop = True + if options.lcp2: + if len(newly_healthy) == 0: break + should_loop = self.lcp2_failback(newly_healthy) + else: + should_loop = self.basic_failback(retries_l) + def recover(self): verbose_begin("TAKEOVER") |