diff options
author | Thomas Haller <thaller@redhat.com> | 2020-01-30 18:21:00 +0100 |
---|---|---|
committer | Thomas Haller <thaller@redhat.com> | 2020-01-31 01:03:45 +0100 |
commit | 46a952234a7c5bb9b2d2a3e5119a396be1cd0f4f (patch) | |
tree | 137a9c5faf54d61bab4f52029ea5a8afcd48d9b2 | |
parent | b6ce3c7163150852976b3530576a977810373753 (diff) | |
download | NetworkManager-th/libnm-test.tar.gz |
TESTth/libnm-test
-rwxr-xr-x | libnm-test.py | 136 | ||||
-rw-r--r-- | libnm/libnm.ver | 1 | ||||
-rw-r--r-- | libnm/nm-active-connection.c | 20 | ||||
-rw-r--r-- | libnm/nm-client.c | 2 | ||||
-rw-r--r-- | libnm/nm-client.h | 2 | ||||
-rw-r--r-- | libnm/nm-libnm-utils.c | 10 | ||||
-rw-r--r-- | network_test_base.py | 360 | ||||
-rw-r--r-- | nm.py | 573 |
8 files changed, 1103 insertions, 1 deletions
diff --git a/libnm-test.py b/libnm-test.py new file mode 100755 index 0000000000..df7ddea3b7 --- /dev/null +++ b/libnm-test.py @@ -0,0 +1,136 @@ +#!/bin/python3 + +import sys +import uuid +import socket + +import gi +gi.require_version('NM', '1.0') +from gi.repository import GLib, NM + +mainloop = GLib.MainLoop.new(None, False) + +PROFILE_NAME = 'x-libnm-test' + +DEVICE_NAME = 'eth0' + +def log(msg): + NM.utils_log(f'{msg}') + +def die(msg): + log(f"DIE: {msg}") + sys.exit(1) + +def nmc_create(): + nmc = NM.Client() + def cb(source, res): + assert(source == nmc) + try: + source.init_finish(res) + except Exception as e: + die("failed to initialize NMClient: %s" % (e)) + mainloop.quit() + nmc.init_async(0, None, cb) + mainloop.run() + return nmc + +def create_profile(name): + + profile = NM.SimpleConnection.new() + + s_con = NM.SettingConnection.new() + s_con.set_property(NM.SETTING_CONNECTION_ID, name) + s_con.set_property(NM.SETTING_CONNECTION_UUID, str(uuid.uuid4())) + s_con.set_property(NM.SETTING_CONNECTION_TYPE, "802-3-ethernet") + + s_wired = NM.SettingWired.new() + + s_ip4 = NM.SettingIP4Config.new() + s_ip4.set_property(NM.SETTING_IP_CONFIG_METHOD, "manual") + s_ip4.add_address(NM.IPAddress.new(socket.AF_INET, "192.168.33.5", 24)) + + s_ip6 = NM.SettingIP6Config.new() + s_ip6.set_property(NM.SETTING_IP_CONFIG_METHOD, "disabled") + + profile.add_setting(s_con) + profile.add_setting(s_ip4) + profile.add_setting(s_ip6) + profile.add_setting(s_wired) + + return profile + +def find_device(nmc, **kwargs): + best = None + for d in nmc.get_devices(): + if 'iface' in kwargs: + if d.get_iface() != kwargs['iface']: + continue + if best is None: + best = d + elif kwargs.get('unique', True): + raise Exception("Cannot find unique device for parameters %s" % (kwargs)) + return best + +def add_and_activate(nmc, device, profile): + res = {} + def cb(source, result): + assert(source == nmc) + try: + res['ac'] = source.add_and_activate_connection_finish(result); + log("add and activate finished and got %s" % (res['ac'].get_path())) + c = res['ac'].get_connection() + log(' has %s' % (c)) + if not c: + log("No connection when add-and-activate completes."); + #res['ac'].handler_disconnect(100000) + except Exception as e: + log("add and activate finished and failed with %s" % (e)) + res['error'] = e + mainloop.quit() + log("add and activate '%s' (%s) starting..." % (profile.get_id(), profile.get_uuid())) + nmc.add_and_activate_connection_async(profile, + device, + None, + None, + cb) + mainloop.run() + if 'error' in res: + raise res['error'] + return res['ac'] + +nmc = nmc_create() + +def cb(source, con): + log(f'connection {con.get_id()}, {con.get_uuid()} added ({con.get_path()})') +nmc.connect('connection-added', cb) + +device = find_device(nmc, iface = DEVICE_NAME) +assert(device) + +profile = create_profile(PROFILE_NAME) + +ac = add_and_activate(nmc, device, profile) +assert(ac) +con = ac.get_connection() + +if not con: + log('ac has no connection yet. Wait...') + + def cb(source, pspec): + assert(source == ac) + assert(pspec.name == 'connection') + assert(ac.get_connection()) + log('event!') + mainloop.quit() + + signal_id = ac.connect('notify::connection', cb) + + mainloop.run() + + ac.handler_disconnect(signal_id) + con = ac.get_connection() + if not con: + log('ac has still no connection') + sys.exit(1) + +log('ac has connection %s (%s). Done...' % (con.get_id(), con.get_uuid())) diff --git a/libnm/libnm.ver b/libnm/libnm.ver index 457b55a798..1d5a5f8785 100644 --- a/libnm/libnm.ver +++ b/libnm/libnm.ver @@ -1676,4 +1676,5 @@ global: nm_setting_vrf_get_table; nm_setting_vrf_get_type; nm_setting_vrf_new; + nm_utils_log; } libnm_1_22_0; diff --git a/libnm/nm-active-connection.c b/libnm/nm-active-connection.c index 076cfd8f01..6abb5e4520 100644 --- a/libnm/nm-active-connection.c +++ b/libnm/nm-active-connection.c @@ -96,9 +96,16 @@ G_DEFINE_TYPE (NMActiveConnection, nm_active_connection, NM_TYPE_OBJECT); NMRemoteConnection * nm_active_connection_get_connection (NMActiveConnection *connection) { + NMRemoteConnection *con; + g_return_val_if_fail (NM_IS_ACTIVE_CONNECTION (connection), NULL); - return nml_dbus_property_o_get_obj (&NM_ACTIVE_CONNECTION_GET_PRIVATE (connection)->property_o[PROPERTY_O_IDX_CONNECTION]); + con = nml_dbus_property_o_get_obj (&NM_ACTIVE_CONNECTION_GET_PRIVATE (connection)->property_o[PROPERTY_O_IDX_CONNECTION]); + + g_printerr (">>>>>>>>>> request ac.connection: %s -> %s\n", + nm_object_get_path ((gpointer) connection), + !con ? "(null)" : nm_object_get_path ((gpointer) con)); + return con; } /** @@ -425,6 +432,7 @@ _nm_active_connection_state_changed_commit (NMActiveConnection *self, _notify_event_state_changed, g_object_ref (self)); } + /*****************************************************************************/ static gboolean @@ -432,6 +440,16 @@ is_ready (NMObject *nmobj) { NMActiveConnectionPrivate *priv = NM_ACTIVE_CONNECTION_GET_PRIVATE (nmobj); + NMLDBusPropertyO *p; + + p = &priv->property_o[PROPERTY_O_IDX_CONNECTION]; + + g_printerr (">>>>>>>>> is_ready[%s]: is-ready=%d, is-ready-fully=%d, obj=%p\n", + nm_object_get_path ((gpointer) nmobj), + nml_dbus_property_o_is_ready (p), + nml_dbus_property_o_is_ready_fully (p), + nml_dbus_property_o_get_obj (p)); + /* Usually, we want not expose our NMObject instances until they are fully initialized. * For NMRemoteSetting this means to wait until GetSettings() returns. * diff --git a/libnm/nm-client.c b/libnm/nm-client.c index a93a81c353..cf4758de7e 100644 --- a/libnm/nm-client.c +++ b/libnm/nm-client.c @@ -933,6 +933,8 @@ _nm_client_notify_event_emit (NMClient *self) while ((base = c_list_first_entry (&priv->queue_notify_lst_head, NMObjectBase, queue_notify_lst))) { c_list_unlink (&base->queue_notify_lst); + if (NM_IS_OBJECT (base)) + g_printerr (">>>>>>> thaw object %s\n", nm_object_get_path ((gpointer) base)); g_object_thaw_notify (G_OBJECT (base)); g_object_unref (base); } diff --git a/libnm/nm-client.h b/libnm/nm-client.h index 9c2fdff9f1..56c5249069 100644 --- a/libnm/nm-client.h +++ b/libnm/nm-client.h @@ -478,6 +478,8 @@ gboolean nm_client_reload_finish (NMClient *client, GAsyncResult *result, GError **error); +void nm_utils_log (const char *msg); + G_END_DECLS #endif /* __NM_CLIENT_H__ */ diff --git a/libnm/nm-libnm-utils.c b/libnm/nm-libnm-utils.c index f04a7419f5..a9a4ea93b7 100644 --- a/libnm/nm-libnm-utils.c +++ b/libnm/nm-libnm-utils.c @@ -39,6 +39,16 @@ _nml_dbus_log_level_init (void) return l; } +/** + * nm_utils_log: + * @msg: string + */ +void +nm_utils_log (const char *msg) +{ + g_printerr (">>>>> %s\n", msg); +} + void _nml_dbus_log (NMLDBusLogLevel level, const char *fmt, diff --git a/network_test_base.py b/network_test_base.py new file mode 100644 index 0000000000..9ef8dec3f8 --- /dev/null +++ b/network_test_base.py @@ -0,0 +1,360 @@ +''' +Base class for network related tests. + +This provides fake wifi devices with mac80211_hwsim and hostapd, test ethernet +devices with veth, utility functions to start wpasupplicant, dnsmasq, get/set +rfkill status, and some utility functions. +''' + +__author__ = 'Martin Pitt <martin.pitt@ubuntu.com>' +__copyright__ = '(C) 2013 Canonical Ltd.' +__license__ = 'GPL v2 or later' + +import sys +import os +import os.path +import time +import tempfile +import subprocess +import re +import unittest +import traceback +import functools +from glob import glob + +# check availability of programs, and cleanly skip test if they are not +# available +for program in ['wpa_supplicant', 'hostapd', 'dnsmasq', 'dhclient']: + if subprocess.call(['which', program], stdout=subprocess.PIPE) != 0: + sys.stderr.write('%s is required for this test suite, but not available. Skipping\n' % program) + sys.exit(0) + + +class NetworkTestBase(unittest.TestCase): + '''Common functionality for network test cases + + setUp() creates two test wlan devices, one for a simulated access point + (self.dev_w_ap), the other for a simulated client device + (self.dev_w_client), and two test ethernet devices (self.dev_e_ap and + self.dev_e_client). + + Each test should call self.setup_ap() or self.setup_eth() with the desired + configuration. + ''' + @classmethod + def setUpClass(klass): + # ensure we have this so that iw works + subprocess.check_call(['modprobe', 'cfg80211']) + + # set regulatory domain "EU", so that we can use 80211.a 5 GHz channels + out = subprocess.check_output(['iw', 'reg', 'get'], universal_newlines=True) + m = re.match('^(?:global\n)?country (\S+):', out) + assert m + klass.orig_country = m.group(1) + subprocess.check_call(['iw', 'reg', 'set', 'EU']) + + @classmethod + def tearDownClass(klass): + subprocess.check_call(['iw', 'reg', 'set', klass.orig_country]) + os.remove('/run/udev/rules.d/99-nm-veth-test.rules') + + @classmethod + def create_devices(klass): + '''Create Access Point and Client devices with mac80211_hwsim and veth''' + + klass.dev_e_ap = 'veth42' + klass.dev_e_client = 'eth42' + + if os.path.exists('/sys/module/mac80211_hwsim'): + raise SystemError('mac80211_hwsim module already loaded') + if os.path.exists('/sys/class/net/' + klass.dev_e_client): + raise SystemError('%s interface already exists' % klass.dev_e_client) + + # ensure NM can manage our fake eths + os.makedirs('/run/udev/rules.d', exist_ok=True) + with open('/run/udev/rules.d/99-nm-veth-test.rules', 'w') as f: + f.write('ENV{ID_NET_DRIVER}=="veth", ENV{INTERFACE}=="%s", ENV{NM_UNMANAGED}="0"\n' % klass.dev_e_client) + subprocess.check_call(['udevadm', 'control', '--reload']) + + # create virtual ethernet devs + subprocess.check_call(['ip', 'link', 'add', 'name', klass.dev_e_client, 'type', + 'veth', 'peer', 'name', klass.dev_e_ap]) + + # create virtual wlan devs + before_wlan = set([c for c in os.listdir('/sys/class/net') if c.startswith('wlan')]) + subprocess.check_call(['modprobe', 'mac80211_hwsim']) + # wait 5 seconds for fake devices to appear + timeout = 50 + while timeout > 0: + after_wlan = set([c for c in os.listdir('/sys/class/net') if c.startswith('wlan')]) + if len(after_wlan) - len(before_wlan) >= 2: + break + timeout -= 1 + time.sleep(0.1) + else: + raise SystemError('timed out waiting for fake devices to appear') + + devs = list(after_wlan - before_wlan) + klass.dev_w_ap = devs[0] + klass.dev_w_client = devs[1] + + # determine and store MAC addresses + with open('/sys/class/net/%s/address' % klass.dev_w_ap) as f: + klass.mac_w_ap = f.read().strip().upper() + with open('/sys/class/net/%s/address' % klass.dev_w_client) as f: + klass.mac_w_client = f.read().strip().upper() + with open('/sys/class/net/%s/address' % klass.dev_e_ap) as f: + klass.mac_e_ap = f.read().strip().upper() + with open('/sys/class/net/%s/address' % klass.dev_e_client) as f: + klass.mac_e_client = f.read().strip().upper() + #print('Created fake devices: AP: %s, client: %s' % (klass.dev_w_ap, klass.dev_w_client)) + + @classmethod + def shutdown_devices(klass): + '''Remove test wlan devices''' + + subprocess.check_call(['rmmod', 'mac80211_hwsim']) + subprocess.check_call(['ip', 'link', 'del', 'dev', klass.dev_e_ap]) + klass.dev_w_ap = None + klass.dev_w_client = None + klass.dev_e_ap = None + klass.dev_e_client = None + + @classmethod + def get_rfkill(klass, interface): + '''Get rfkill status of an interface. + + Returns whether the interface is blocked, i. e. "True" for blocked, + "False" for enabled. + ''' + with open(klass._rfkill_attribute(interface)) as f: + val = f.read() + return val == '1' + + @classmethod + def set_rfkill(klass, interface, block): + '''Set rfkill status of an interface + + Use block==True for disabling ("killswitching") an interface, + block==False to re-enable. + ''' + with open(klass._rfkill_attribute(interface), 'w') as f: + f.write(block and '1' or '0') + + def run(self, result=None): + '''Show log files on failed tests''' + + if result: + orig_err_fail = len(result.errors) + len(result.failures) + super().run(result) + if hasattr(self, 'workdir'): + logs = glob(os.path.join(self.workdir, '*.log')) + if result and len(result.errors) + len(result.failures) > orig_err_fail: + for log_file in logs: + with open(log_file) as f: + print('\n----- %s -----\n%s\n------\n' + % (os.path.basename(log_file), f.read())) + + # clean up log files, so that we don't see ones from previous tests + for log_file in logs: + os.unlink(log_file) + + def setUp(self): + '''Create test devices and workdir''' + + self.create_devices() + self.addCleanup(self.shutdown_devices) + self.workdir_obj = tempfile.TemporaryDirectory() + self.workdir = self.workdir_obj.name + + # create static entropy file to avoid draining/blocking on /dev/random + self.entropy_file = os.path.join(self.workdir, 'entropy') + with open(self.entropy_file, 'wb') as f: + f.write(b'012345678901234567890') + + def setup_ap(self, hostapd_conf, ipv6_mode): + '''Set up simulated access point + + On self.dev_w_ap, run hostapd with given configuration. Setup dnsmasq + according to ipv6_mode, see start_dnsmasq(). + + This is torn down automatically at the end of the test. + ''' + # give our AP an IP + subprocess.check_call(['ip', 'a', 'flush', 'dev', self.dev_w_ap]) + if ipv6_mode is not None: + subprocess.check_call(['ip', 'a', 'add', '2600::1/64', 'dev', self.dev_w_ap]) + else: + subprocess.check_call(['ip', 'a', 'add', '192.168.5.1/24', 'dev', self.dev_w_ap]) + + self.start_hostapd(hostapd_conf) + self.start_dnsmasq(ipv6_mode, self.dev_w_ap) + + def setup_eth(self, ipv6_mode, start_dnsmasq=True): + '''Set up simulated ethernet router + + On self.dev_e_ap, run dnsmasq according to ipv6_mode, see + start_dnsmasq(). + + This is torn down automatically at the end of the test. + ''' + # give our router an IP + subprocess.check_call(['ip', 'a', 'flush', 'dev', self.dev_e_ap]) + if ipv6_mode is not None: + subprocess.check_call(['ip', 'a', 'add', '2600::1/64', 'dev', self.dev_e_ap]) + else: + subprocess.check_call(['ip', 'a', 'add', '192.168.5.1/24', 'dev', self.dev_e_ap]) + subprocess.check_call(['ip', 'link', 'set', self.dev_e_ap, 'up']) + # we don't really want to up the client iface already, but veth doesn't + # work otherwise (no link detected) + subprocess.check_call(['ip', 'link', 'set', self.dev_e_client, 'up']) + + if start_dnsmasq: + self.start_dnsmasq(ipv6_mode, self.dev_e_ap) + + def start_wpasupp(self, conf): + '''Start wpa_supplicant on client interface''' + + w_conf = os.path.join(self.workdir, 'wpasupplicant.conf') + with open(w_conf, 'w') as f: + f.write('ctrl_interface=%s\nnetwork={\n%s\n}\n' % (self.workdir, conf)) + log = os.path.join(self.workdir, 'wpasupp.log') + p = subprocess.Popen(['wpa_supplicant', '-Dwext', '-i', self.dev_w_client, + '-e', self.entropy_file, '-c', w_conf, '-f', log], + stderr=subprocess.PIPE) + self.addCleanup(p.wait) + self.addCleanup(p.terminate) + # TODO: why does this sometimes take so long? + self.poll_text(log, 'CTRL-EVENT-CONNECTED', timeout=200) + + def wrap_process(self, fn, *args, **kwargs): + '''Run a test method in a separate process. + + Run test method fn(*args, **kwargs) in a child process. If that raises + any exception, it gets propagated to the main process and + wrap_process() fails with that exception. + ''' + # exception from subprocess is propagated through this file + exc_path = os.path.join(self.workdir, 'exc') + try: + os.unlink(exc_path) + except OSError: + pass + + pid = os.fork() + + # run the actual test in the child + if pid == 0: + # short-circuit tearDownClass(), as this will be done by the parent + # process + self.addCleanup(os._exit, 0) + try: + fn(*args, **kwargs) + except: + with open(exc_path, 'w') as f: + f.write(traceback.format_exc()) + raise + else: + # get success/failure state from child + os.waitpid(pid, 0) + # propagate exception + if os.path.exists(exc_path): + with open(exc_path) as f: + self.fail(f.read()) + + # + # Internal implementation details + # + + @classmethod + def poll_text(klass, logpath, string, timeout=50): + '''Poll log file for a given string with a timeout. + + Timeout is given in deciseconds. + ''' + log = '' + while timeout > 0: + if os.path.exists(logpath): + break + timeout -= 1 + time.sleep(0.1) + assert timeout > 0, 'Timed out waiting for file %s to appear' % logpath + + with open(logpath) as f: + while timeout > 0: + line = f.readline() + if line: + log += line + if string in line: + break + continue + timeout -= 1 + time.sleep(0.1) + + assert timeout > 0, 'Timed out waiting for "%s":\n------------\n%s\n-------\n' % (string, log) + + def start_hostapd(self, conf): + hostapd_conf = os.path.join(self.workdir, 'hostapd.conf') + with open(hostapd_conf, 'w') as f: + f.write('interface=%s\ndriver=nl80211\n' % self.dev_w_ap) + f.write(conf) + + log = os.path.join(self.workdir, 'hostapd.log') + p = subprocess.Popen(['hostapd', '-e', self.entropy_file, '-f', log, hostapd_conf], + stdout=subprocess.PIPE) + self.addCleanup(p.wait) + self.addCleanup(p.terminate) + self.poll_text(log, '' + self.dev_w_ap + ': AP-ENABLED') + + def start_dnsmasq(self, ipv6_mode, iface): + '''Start dnsmasq. + + If ipv6_mode is None, IPv4 is set up with DHCP. If it is not None, it + must be a valid dnsmasq mode, i. e. a combination of "ra-only", + "slaac", "ra-stateless", and "ra-names". See dnsmasq(8). + ''' + if ipv6_mode is None: + dhcp_range = '192.168.5.10,192.168.5.200' + else: + dhcp_range = '2600::10,2600::20' + if ipv6_mode: + dhcp_range += ',' + ipv6_mode + + self.dnsmasq_log = os.path.join(self.workdir, 'dnsmasq.log') + lease_file = os.path.join(self.workdir, 'dnsmasq.leases') + + p = subprocess.Popen(['dnsmasq', '--keep-in-foreground', '--log-queries', + '--log-facility=' + self.dnsmasq_log, + '--conf-file=/dev/null', + '--dhcp-leasefile=' + lease_file, + '--bind-interfaces', + '--interface=' + iface, + '--except-interface=lo', + '--enable-ra', + '--dhcp-range=' + dhcp_range]) + self.addCleanup(p.wait) + self.addCleanup(p.terminate) + + if ipv6_mode is not None: + self.poll_text(self.dnsmasq_log, 'IPv6 router advertisement enabled') + else: + self.poll_text(self.dnsmasq_log, 'DHCP, IP range') + + @classmethod + def _rfkill_attribute(klass, interface): + '''Return the path to interface's rfkill soft toggle in sysfs.''' + + g = glob('/sys/class/net/%s/phy80211/rfkill*/soft' % interface) + assert len(g) == 1, 'Did not find exactly one "soft" rfkill attribute for %s: %s' % ( + interface, str(g)) + return g[0] + + +def run_in_subprocess(fn): + '''Decorator for running fn in a child process''' + + @functools.wraps(fn) + def wrapped(*args, **kwargs): + # args[0] is self + args[0].wrap_process(fn, *args, **kwargs) + return wrapped @@ -0,0 +1,573 @@ +#!/usr/bin/python3 +# Test NetworkManager on simulated network devices +# For an interactive shell test, run "nm ColdplugWifi.shell", see below + +__author__ = 'Martin Pitt <martin.pitt@ubuntu.com>' +__copyright__ = '(C) 2013 Canonical Ltd.' +__license__ = 'GPL v2 or later' + +import sys +import os +import os.path +import re +import time +import subprocess +import socket +import netaddr +import unittest +import ctypes + +try: + from dbusmock import DBusTestCase +except ImportError: + DBusTestCase = object # dummy so that the class declaration works + +import gi + +gi.require_version('NM', '1.0') +from gi.repository import NM, GLib, Gio + +sys.path.append(os.path.dirname(__file__)) +import network_test_base + +SSID = 'fake net' + +# If True, NetworkManager logs directly to stdout, to watch logs in real time +#NM_LOG_STDOUT = os.getenv('NM_LOG_STDOUT', False) +NM_LOG_STDOUT = False + +# avoid accidentally destroying any real config +os.environ['GSETTINGS_BACKEND'] = 'memory' + +# we currently get a lot of WARNINGs/CRITICALs from GI (leaked objects from +# previous test runs/main loops?) Redirect them to stdout, to avoid failing +# autopkgtests +os.dup2(sys.stdout.fileno(), sys.stderr.fileno()) + + +class NetworkManagerTest(network_test_base.NetworkTestBase): + '''Provide common functionality for NM tests''' + + def start_nm(self, wait_iface=None, auto_connect=True): + '''Start NetworkManager and initialize client object + + If wait_iface is given, wait until NM recognizes that interface. + Otherwise, just wait until NM has initialized (for coldplug mode). + + If auto_connect is False, set the "no-auto-default=*" option to avoid + auto-connecting to wired devices. + ''' + # mount tmpfses over system directories, to avoid destroying the + # production configuration, and isolating tests from each other + if not os.path.exists('/run/NetworkManager'): + os.mkdir('/run/NetworkManager') + for d in ['/etc/NetworkManager', '/var/lib/NetworkManager', + '/run/NetworkManager']: + subprocess.check_call(['mount', '-n', '-t', 'tmpfs', 'none', d]) + self.addCleanup(subprocess.call, ['umount', d]) + os.mkdir('/etc/NetworkManager/system-connections') + + # create local configuration; this allows us to have full control, and + # we also need to blacklist the AP device so that NM does not tear it + # down; we also blacklist any existing real interface to avoid + # interfering with it, and for getting predictable results + blacklist = '' + for iface in os.listdir('/sys/class/net'): + if iface == "bonding_masters": + continue + if iface != self.dev_w_client and iface != self.dev_e_client: + with open('/sys/class/net/%s/address' % iface) as f: + if blacklist: + blacklist += ';' + blacklist += 'mac:%s' % f.read().strip() + + conf = os.path.join(self.workdir, 'NetworkManager.conf') + extra_main = '' + if not auto_connect: + extra_main += 'no-auto-default=*\n' + + with open(conf, 'w') as f: + f.write('[main]\nplugins=keyfile\n%s\n[keyfile]\nunmanaged-devices=%s\n' % + (extra_main, blacklist)) + + if NM_LOG_STDOUT: + f_log = None + else: + log = os.path.join(self.workdir, 'NetworkManager.log') + f_log = os.open(log, os.O_CREAT | os.O_WRONLY | os.O_SYNC) + + # build NM command line + argv = ['NetworkManager', '--log-level=debug', '--debug', '--config=' + conf] + # allow specifying extra arguments + argv += os.environ.get('NM_TEST_DAEMON_ARGS', '').strip().split() + + p = subprocess.Popen(argv, stdout=f_log, stderr=subprocess.STDOUT) + # automatically terminate process at end of test case + self.addCleanup(p.wait) + self.addCleanup(p.terminate) + self.addCleanup(self.shutdown_connections) + + if NM_LOG_STDOUT: + # let it initialize, then print a marker + time.sleep(1) + print('******* NM initialized *********\n\n') + else: + self.addCleanup(os.close, f_log) + + # this should be fast, give it 2 s to initialize + if wait_iface: + self.poll_text(log, 'manager: (%s): new' % wait_iface, timeout=100) + + self.nmclient = NM.Client.new() + self.assertTrue(self.nmclient.networking_get_enabled()) + + # FIXME: This certainly ought to be true, but isn't + #self.assertTrue(self.nmclient.get_manager_running()) + + # determine device objects + for d in self.nmclient.get_devices(): + if d.props.interface == self.dev_w_ap: + self.assertEqual(d.get_device_type(), NM.DeviceType.WIFI) + self.assertEqual(d.get_driver(), 'mac80211_hwsim') + self.assertEqual(d.get_hw_address(), self.mac_w_ap) + self.nmdev_w_ap = d + elif d.props.interface == self.dev_w_client: + self.assertEqual(d.get_device_type(), NM.DeviceType.WIFI) + self.assertEqual(d.get_driver(), 'mac80211_hwsim') + # NM ≥ 1.4 randomizes MAC addresses by default, so we can't + # test for equality, just make sure it's not our AP + self.assertNotEqual(d.get_hw_address(), self.mac_w_ap) + self.nmdev_w = d + elif d.props.interface == self.dev_e_client: + self.assertEqual(d.get_device_type(), NM.DeviceType.VETH) + self.assertEqual(d.get_driver(), 'veth') + self.assertEqual(d.get_hw_address(), self.mac_e_client) + self.nmdev_e = d + + self.assertTrue(hasattr(self, 'nmdev_w_ap'), 'Could not determine wifi AP NM device') + self.assertTrue(hasattr(self, 'nmdev_w'), 'Could not determine wifi client NM device') + self.assertTrue(hasattr(self, 'nmdev_e'), 'Could not determine eth client NM device') + + self.process_glib_events() + + def shutdown_connections(self): + '''Shut down all active NM connections.''' + + if NM_LOG_STDOUT: + print('\n\n******* Shutting down NM connections *********') + + # remove all created connections + for active_conn in self.nmclient.get_active_connections(): + self.nmclient.deactivate_connection(active_conn) + self.assertEventually(lambda: self.nmclient.get_active_connections() == [], + timeout=20) + + # verify that NM properly deconfigures the devices + self.assert_iface_down(self.dev_w_client) + self.assert_iface_down(self.dev_e_client) + + @classmethod + def process_glib_events(klass): + '''Process pending GLib main loop events''' + + context = GLib.MainContext.default() + while context.iteration(False): + pass + + def assertEventually(self, condition, message=None, timeout=50): + '''Assert that condition function eventually returns True. + + timeout is in deciseconds, defaulting to 50 (5 seconds). message is + printed on failure. + ''' + while timeout >= 0: + self.process_glib_events() + if condition(): + break + timeout -= 1 + time.sleep(0.1) + else: + self.fail(message or 'timed out waiting for ' + str(condition)) + + def assert_iface_down(self, iface): + '''Assert that client interface is down''' + + out = subprocess.check_output(['ip', 'a', 'show', 'dev', iface], + universal_newlines=True) + self.assertNotIn('inet 192', out) + self.assertNotIn('inet6 2600', out) + + if iface == self.dev_w_client: + out = subprocess.check_output(['iw', 'dev', iface, 'link'], + universal_newlines=True) + self.assertIn('Not connected', out) + + # but AP device should never be touched by NM + out = subprocess.check_output(['ip', 'a', 'show', 'dev', self.dev_w_ap], + universal_newlines=True) + self.assertIn('state UP', out) + + def assert_iface_up(self, iface, expected_ip_a=None, unexpected_ip_a=None): + '''Assert that client interface is up''' + + out = subprocess.check_output(['ip', 'a', 'show', 'dev', iface], + universal_newlines=True) + self.assertIn('state UP', out) + if expected_ip_a: + for r in expected_ip_a: + self.assertRegex(out, r) + if unexpected_ip_a: + for r in unexpected_ip_a: + self.assertNotRegex(out, r) + + if iface == self.dev_w_client: + out = subprocess.check_output(['iw', 'dev', iface, 'link'], + universal_newlines=True) + self.assertIn('Connected to ' + self.mac_w_ap, out) + self.assertIn('SSID: ' + SSID, out) + + def wait_ap(self, timeout): + '''Wait for AccessPoint NM object to appear, and return it''' + + self.assertEventually(lambda: len(self.nmdev_w.get_access_points()) > 0, + 'timed out waiting for AP to be detected', + timeout=timeout) + + return self.nmdev_w.get_access_points()[0] + + def connect_to_ap(self, ap, secret, ipv6_mode, ip6_privacy): + '''Connect to an NMAccessPoint. + + secret should be None for open networks, and a string with the password + for WEP/WPA. + + ip6_privacy is a NM.SettingIP6ConfigPrivacy flag. + + Return (NMConnection, NMActiveConnection) objects. + ''' + + ip4_method = NM.SETTING_IP4_CONFIG_METHOD_DISABLED + ip6_method = NM.SETTING_IP6_CONFIG_METHOD_IGNORE + if ipv6_mode is None: + ip4_method = NM.SETTING_IP4_CONFIG_METHOD_AUTO + else: + ip6_method = NM.SETTING_IP6_CONFIG_METHOD_AUTO + + # If we have a secret, supply it to the new connection right away; + # adding it afterwards with update_secrets() does not work, and we + # can't implement a SecretAgent as get_secrets() would need to build a + # map of a map of gpointers to gpointers which is too much for PyGI + partial_conn = NM.SimpleConnection.new() + partial_conn.add_setting(NM.SettingIP4Config(method=ip4_method)) + if secret: + partial_conn.add_setting(NM.SettingWirelessSecurity.new()) + # FIXME: needs update for other auth types + partial_conn.update_secrets(NM.SETTING_WIRELESS_SECURITY_SETTING_NAME, + GLib.Variant('a{sv}', { + 'psk': GLib.Variant('s', secret) + })) + if ip6_privacy is not None: + partial_conn.add_setting(NM.SettingIP6Config(ip6_privacy=ip6_privacy, + method=ip6_method)) + + ml = GLib.MainLoop() + self.cb_conn = None + self.cancel = Gio.Cancellable() + self.timeout_tag = 0 + + def add_activate_cb(client, res, data): + if (self.timeout_tag > 0): + GLib.source_remove(self.timeout_tag) + self.timeout_tag = 0 + try: + self.cb_conn = \ + self.nmclient.add_and_activate_connection_finish(res) + + except gi.repository.GLib.Error as e: + # Check if the error is "Operation was cancelled" + if (e.domain != "g-io-error-quark" or e.code != 19): + self.fail("add_and_activate_connection failed: %s (%s, %d)" % + (e.message, e.domain, e.code)) + ml.quit() + + def timeout_cb(): + self.timeout_tag = -1 + self.cancel.cancel() + ml.quit() + return GLib.SOURCE_REMOVE + + self.nmclient.add_and_activate_connection_async(partial_conn, self.nmdev_w, ap.get_path(), self.cancel, add_activate_cb, None) + self.timeout_tag = GLib.timeout_add_seconds(300, timeout_cb) + ml.run() + if (self.timeout_tag < 0): + self.timeout_tag = 0 + self.fail('Main loop for adding connection timed out!') + self.assertNotEqual(self.cb_conn, None) + active_conn = self.cb_conn + self.cb_conn = None + + conn = self.conn_from_active_conn(active_conn) + self.assertTrue(conn.verify()) + + # verify need_secrets() + needed_secrets = conn.need_secrets() + if secret is None: + self.assertEqual(needed_secrets, (None, [])) + else: + self.assertEqual(needed_secrets[0], NM.SETTING_WIRELESS_SECURITY_SETTING_NAME) + self.assertEqual(type(needed_secrets[1]), list) + self.assertGreaterEqual(len(needed_secrets[1]), 1) + # FIXME: needs update for other auth types + self.assertIn(needed_secrets[1][0], [NM.SETTING_WIRELESS_SECURITY_PSK]) + + # we are usually ACTIVATING at this point; wait for completion + # TODO: 5s is not enough, argh slow DHCP client + self.assertEventually(lambda: active_conn.get_state() == NM.ActiveConnectionState.ACTIVATED, + 'timed out waiting for %s to get activated' % active_conn.get_connection(), + timeout=600) + self.assertEqual(self.nmdev_w.get_state(), NM.DeviceState.ACTIVATED) + return (conn, active_conn) + + def conn_from_active_conn(self, active_conn): + '''Get NMConnection object for an NMActiveConnection object''' + + # this sometimes takes a second try, when the corresponding + # NMConnection object is not yet available + tries = 3 + while tries > 0: + self.process_glib_events() + print("active_conn", active_conn, "\n") + print("state", active_conn.props.state, "\n") + print("get_path", active_conn.get_path(), "\n") +# time.sleep(1) + path = active_conn.get_connection().get_path() + for dev in active_conn.get_devices(): + for c in dev.get_available_connections(): + if c.get_path() == path: + return c + time.sleep(0.1) + tries -= 1 + + self.fail('Could not find NMConnection object for %s' % path) + + def check_low_level_config(self, iface, ipv6_mode, ip6_privacy): + '''Check actual hardware state with ip/iw after being connected''' + + # list of expected regexps in "ip a" output + expected_ip_a = [] + unexpected_ip_a = [] + + if ipv6_mode is not None: + if ipv6_mode in ('', 'slaac'): + # has global address from our DHCP server + expected_ip_a.append('inet6 2600::[0-9a-f]+/') + else: + # has address with our prefix and MAC + expected_ip_a.append('inet6 2600::[0-9a-f:]+/64 scope global (?:tentative )?(?:mngtmpaddr )?(?:noprefixroute )?dynamic') + # has address with our prefix and random IP (Privacy + # Extension), if requested + priv_re = 'inet6 2600:[0-9a-f:]+/64 scope global temporary (?:tentative )?(?:mngtmpaddr )?dynamic' + if ip6_privacy in (NM.SettingIP6ConfigPrivacy.PREFER_TEMP_ADDR, + NM.SettingIP6ConfigPrivacy.PREFER_PUBLIC_ADDR): + expected_ip_a.append(priv_re) + else: + # FIXME: add a negative test here + pass + #unexpected_ip_a.append(priv_re) + + # has a link-local address + expected_ip_a.append('inet6 fe80::[0-9a-f:]+/64 scope link') + else: + expected_ip_a.append('inet 192.168.5.\d+/24') + + self.assert_iface_up(iface, expected_ip_a, unexpected_ip_a) + + +class ColdplugWifi(NetworkManagerTest): + '''Wifi: In these tests NM starts after setting up the AP''' + + # not run by default; run "nm-wifi ColdplugWifi.shell" to get this + @network_test_base.run_in_subprocess + def shell(self): + '''Start AP and NM, then run a shell (for debugging)''' + + self.setup_ap('hw_mode=b\nchannel=1\nssid=' + SSID, None) + self.start_nm(self.dev_w_client) + print(''' + +client interface: %s, access point interface: %s, AP SSID: "%s" + +You can now run commands like "nmcli dev" or "nmcli dev wifi connect '%s'". +Logs are in '%s'. When done, exit the shell. + +''' % (self.dev_w_client, self.dev_w_ap, SSID, SSID, self.workdir)) + subprocess.call(['bash', '-i']) + + @network_test_base.run_in_subprocess + def test_open_b_ip4(self): + '''Open network, 802.11b, IPv4''' + + self.do_test('hw_mode=b\nchannel=1\nssid=' + SSID, None, 11000) + + # + # Common test code + # + + # libnm-glib has a lot of internal persistent state (private D-BUS + # connections and such); as it is very brittle and hard to track down + # all remaining references to any NM* object after a test, we rather + # run each test in a separate subprocess + @network_test_base.run_in_subprocess + def do_test(self, hostapd_conf, ipv6_mode, expected_max_bitrate, + secret=None, ip6_privacy=None): + '''Actual test code, parameterized for the particular test case''' + + self.setup_ap(hostapd_conf, ipv6_mode) + self.start_nm(self.dev_w_client) + + # on coldplug we expect the AP to be picked out fast + ap = self.wait_ap(timeout=100) + self.assertTrue(ap.get_path().startswith('/org/freedesktop/NetworkManager')) + self.assertEqual(ap.get_mode(), getattr(NM, '80211Mode').INFRA) + self.assertEqual(ap.get_max_bitrate(), expected_max_bitrate) + #self.assertEqual(ap.get_flags(), ) + + # should not auto-connect + self.assertEqual(self.nmclient.get_active_connections(), []) + + # connect to that AP + (conn, active_conn) = self.connect_to_ap(ap, secret, ipv6_mode, ip6_privacy) + + # check NMActiveConnection object + self.assertIn(active_conn.get_uuid(), [c.get_uuid() for c in self.nmclient.get_active_connections()]) + self.assertEqual([d.get_udi() for d in active_conn.get_devices()], [self.nmdev_w.get_udi()]) + + # check corresponding NMConnection object + wireless_setting = conn.get_setting_wireless() + self.assertEqual(wireless_setting.get_ssid().get_data(), SSID.encode()) + self.assertEqual(wireless_setting.get_hidden(), False) + if secret: + self.assertEqual(conn.get_setting_wireless_security().get_name(), NM.SETTING_WIRELESS_SECURITY_SETTING_NAME) + else: + self.assertEqual(conn.get_setting_wireless_security(), None) + # for debugging + #conn.dump() + + # for IPv6, check privacy setting + if ipv6_mode is not None and ip6_privacy != NM.SettingIP6ConfigPrivacy.UNKNOWN: + assert ip6_privacy is not None, 'for IPv6 tests you need to specify ip6_privacy flag' + ip6_setting = conn.get_setting_ip6_config() + self.assertEqual(ip6_setting.props.ip6_privacy, ip6_privacy) + + self.check_low_level_config(self.dev_w_client, ipv6_mode, ip6_privacy) + +@unittest.skipIf(DBusTestCase is object, + 'WARNING: python-dbusmock not installed, skipping suspend tests; get it from https://pypi.python.org/pypi/python-dbusmock') +class Suspend(NetworkManagerTest, DBusTestCase): + '''These tests run under a mock logind on a private system D-BUS''' + + @classmethod + def setUpClass(klass): + klass.start_system_bus() + NetworkManagerTest.setUpClass() + + @classmethod + def tearDownClass(klass): + NetworkManagerTest.tearDownClass() + DBusTestCase.tearDownClass() + + def setUp(self): + NetworkManagerTest.setUp(self) + + # start mock polkit and logind processes, so that we can + # intercept/control suspend + (p_polkit, self.obj_polkit) = self.spawn_server_template('polkitd', {}, stdout=subprocess.PIPE) + # by default we are not concerned about restricting access in the tests + self.obj_polkit.AllowUnknown(True) + self.addCleanup(p_polkit.wait) + self.addCleanup(p_polkit.terminate) + + (p_logind, self.obj_logind) = self.spawn_server_template('logind', {}, stdout=subprocess.PIPE) + self.addCleanup(p_logind.wait) + self.addCleanup(p_logind.terminate) + + # we have to manually start wpa_supplicant, as D-BUS activation does + # not happen for the fake D-BUS + log = os.path.join(self.workdir, 'wpasupplicant.log') + p_wpasupp = subprocess.Popen(['wpa_supplicant', '-u', '-d', '-e', '-K', + self.entropy_file, '-f', log]) + self.addCleanup(p_wpasupp.wait) + self.addCleanup(p_wpasupp.terminate) + + def fixme_test_active_ip4(self): + '''suspend during active IPv4 connection''' + + self.do_test('hw_mode=b\nchannel=1\nssid=' + SSID, None, + ['inet 192.168.5.\d+/24']) + + def fixme_test_active_ip6(self): + '''suspend during active IPv6 connection''' + + self.do_test('hw_mode=b\nchannel=1\nssid=' + SSID, 'ra-only', + ['inet6 2600::']) + + # + # Common test code + # + + @network_test_base.run_in_subprocess + def do_test(self, hostapd_conf, ipv6_mode, expected_ip_a): + '''Actual test code, parameterized for the particular test case''' + + self.setup_ap(hostapd_conf, ipv6_mode) + self.start_nm(self.dev_w_client) + ap = self.wait_ap(timeout=1800) + (conn, active_conn) = self.connect_to_ap(ap, None, ipv6_mode, None) + + # send logind signal that we are about to suspend + self.obj_logind.EmitSignal('', 'PrepareForSleep', 'b', [True]) + + # disabling should be fast, give it one second + self.assertEventually(lambda: self.nmdev_w.get_state() == NM.DeviceState.UNMANAGED, + timeout=10) + self.assert_iface_down(self.dev_w_client) + + # send logind signal that we resumed + self.obj_logind.EmitSignal('', 'PrepareForSleep', 'b', [False]) + + # this involves DHCP, use same timeout as for regular connection + self.assertEventually(lambda: self.nmdev_w.get_state() == NM.DeviceState.ACTIVATED, + timeout=100) + + # dev_w_client should be back up + self.assert_iface_up(self.dev_w_client, expected_ip_a) + + +def setUpModule(): + # AppArmor currently does not allow us to access the system D-BUS from an + # unshared file system. Hack the policy to allow that until that gets fixed + # properly. See https://launchpad.net/bugs/1244157 + # unshare the mount namespace, so that our tmpfs mounts are guaranteed to get + # cleaned up, and don't influence the production system + libc6 = ctypes.cdll.LoadLibrary('libc.so.6') + assert libc6.unshare(ctypes.c_int(0x00020000)) == 0, 'failed to unshare mount namespace' + + # stop system-wide NetworkManager to avoid interfering with tests + nm_running = subprocess.call('service network-manager stop 2>&1', shell=True) == 0 + + +def tearDownModule(): + subprocess.call('dhclient eth0', shell=True) + subprocess.call('sleep 10', shell=True) + + +if __name__ == '__main__': + # avoid unintelligible error messages, and breaking "make check" when not being + # root + if os.getuid() != 0: + sys.stderr.write('This integration test suite needs to be run as root\n') + sys.exit(1) + + # write to stdout, not stderr + runner = unittest.TextTestRunner(stream=sys.stdout, verbosity=2) + unittest.main(testRunner=runner) |