diff options
author | Thomas Haller <thaller@redhat.com> | 2020-01-30 18:21:00 +0100 |
---|---|---|
committer | Thomas Haller <thaller@redhat.com> | 2020-01-31 01:03:45 +0100 |
commit | 46a952234a7c5bb9b2d2a3e5119a396be1cd0f4f (patch) | |
tree | 137a9c5faf54d61bab4f52029ea5a8afcd48d9b2 /nm.py | |
parent | b6ce3c7163150852976b3530576a977810373753 (diff) | |
download | NetworkManager-th/libnm-test.tar.gz |
TESTth/libnm-test
Diffstat (limited to 'nm.py')
-rw-r--r-- | nm.py | 573 |
1 files changed, 573 insertions, 0 deletions
@@ -0,0 +1,573 @@ +#!/usr/bin/python3 +# Test NetworkManager on simulated network devices +# For an interactive shell test, run "nm ColdplugWifi.shell", see below + +__author__ = 'Martin Pitt <martin.pitt@ubuntu.com>' +__copyright__ = '(C) 2013 Canonical Ltd.' +__license__ = 'GPL v2 or later' + +import sys +import os +import os.path +import re +import time +import subprocess +import socket +import netaddr +import unittest +import ctypes + +try: + from dbusmock import DBusTestCase +except ImportError: + DBusTestCase = object # dummy so that the class declaration works + +import gi + +gi.require_version('NM', '1.0') +from gi.repository import NM, GLib, Gio + +sys.path.append(os.path.dirname(__file__)) +import network_test_base + +SSID = 'fake net' + +# If True, NetworkManager logs directly to stdout, to watch logs in real time +#NM_LOG_STDOUT = os.getenv('NM_LOG_STDOUT', False) +NM_LOG_STDOUT = False + +# avoid accidentally destroying any real config +os.environ['GSETTINGS_BACKEND'] = 'memory' + +# we currently get a lot of WARNINGs/CRITICALs from GI (leaked objects from +# previous test runs/main loops?) Redirect them to stdout, to avoid failing +# autopkgtests +os.dup2(sys.stdout.fileno(), sys.stderr.fileno()) + + +class NetworkManagerTest(network_test_base.NetworkTestBase): + '''Provide common functionality for NM tests''' + + def start_nm(self, wait_iface=None, auto_connect=True): + '''Start NetworkManager and initialize client object + + If wait_iface is given, wait until NM recognizes that interface. + Otherwise, just wait until NM has initialized (for coldplug mode). + + If auto_connect is False, set the "no-auto-default=*" option to avoid + auto-connecting to wired devices. + ''' + # mount tmpfses over system directories, to avoid destroying the + # production configuration, and isolating tests from each other + if not os.path.exists('/run/NetworkManager'): + os.mkdir('/run/NetworkManager') + for d in ['/etc/NetworkManager', '/var/lib/NetworkManager', + '/run/NetworkManager']: + subprocess.check_call(['mount', '-n', '-t', 'tmpfs', 'none', d]) + self.addCleanup(subprocess.call, ['umount', d]) + os.mkdir('/etc/NetworkManager/system-connections') + + # create local configuration; this allows us to have full control, and + # we also need to blacklist the AP device so that NM does not tear it + # down; we also blacklist any existing real interface to avoid + # interfering with it, and for getting predictable results + blacklist = '' + for iface in os.listdir('/sys/class/net'): + if iface == "bonding_masters": + continue + if iface != self.dev_w_client and iface != self.dev_e_client: + with open('/sys/class/net/%s/address' % iface) as f: + if blacklist: + blacklist += ';' + blacklist += 'mac:%s' % f.read().strip() + + conf = os.path.join(self.workdir, 'NetworkManager.conf') + extra_main = '' + if not auto_connect: + extra_main += 'no-auto-default=*\n' + + with open(conf, 'w') as f: + f.write('[main]\nplugins=keyfile\n%s\n[keyfile]\nunmanaged-devices=%s\n' % + (extra_main, blacklist)) + + if NM_LOG_STDOUT: + f_log = None + else: + log = os.path.join(self.workdir, 'NetworkManager.log') + f_log = os.open(log, os.O_CREAT | os.O_WRONLY | os.O_SYNC) + + # build NM command line + argv = ['NetworkManager', '--log-level=debug', '--debug', '--config=' + conf] + # allow specifying extra arguments + argv += os.environ.get('NM_TEST_DAEMON_ARGS', '').strip().split() + + p = subprocess.Popen(argv, stdout=f_log, stderr=subprocess.STDOUT) + # automatically terminate process at end of test case + self.addCleanup(p.wait) + self.addCleanup(p.terminate) + self.addCleanup(self.shutdown_connections) + + if NM_LOG_STDOUT: + # let it initialize, then print a marker + time.sleep(1) + print('******* NM initialized *********\n\n') + else: + self.addCleanup(os.close, f_log) + + # this should be fast, give it 2 s to initialize + if wait_iface: + self.poll_text(log, 'manager: (%s): new' % wait_iface, timeout=100) + + self.nmclient = NM.Client.new() + self.assertTrue(self.nmclient.networking_get_enabled()) + + # FIXME: This certainly ought to be true, but isn't + #self.assertTrue(self.nmclient.get_manager_running()) + + # determine device objects + for d in self.nmclient.get_devices(): + if d.props.interface == self.dev_w_ap: + self.assertEqual(d.get_device_type(), NM.DeviceType.WIFI) + self.assertEqual(d.get_driver(), 'mac80211_hwsim') + self.assertEqual(d.get_hw_address(), self.mac_w_ap) + self.nmdev_w_ap = d + elif d.props.interface == self.dev_w_client: + self.assertEqual(d.get_device_type(), NM.DeviceType.WIFI) + self.assertEqual(d.get_driver(), 'mac80211_hwsim') + # NM ≥ 1.4 randomizes MAC addresses by default, so we can't + # test for equality, just make sure it's not our AP + self.assertNotEqual(d.get_hw_address(), self.mac_w_ap) + self.nmdev_w = d + elif d.props.interface == self.dev_e_client: + self.assertEqual(d.get_device_type(), NM.DeviceType.VETH) + self.assertEqual(d.get_driver(), 'veth') + self.assertEqual(d.get_hw_address(), self.mac_e_client) + self.nmdev_e = d + + self.assertTrue(hasattr(self, 'nmdev_w_ap'), 'Could not determine wifi AP NM device') + self.assertTrue(hasattr(self, 'nmdev_w'), 'Could not determine wifi client NM device') + self.assertTrue(hasattr(self, 'nmdev_e'), 'Could not determine eth client NM device') + + self.process_glib_events() + + def shutdown_connections(self): + '''Shut down all active NM connections.''' + + if NM_LOG_STDOUT: + print('\n\n******* Shutting down NM connections *********') + + # remove all created connections + for active_conn in self.nmclient.get_active_connections(): + self.nmclient.deactivate_connection(active_conn) + self.assertEventually(lambda: self.nmclient.get_active_connections() == [], + timeout=20) + + # verify that NM properly deconfigures the devices + self.assert_iface_down(self.dev_w_client) + self.assert_iface_down(self.dev_e_client) + + @classmethod + def process_glib_events(klass): + '''Process pending GLib main loop events''' + + context = GLib.MainContext.default() + while context.iteration(False): + pass + + def assertEventually(self, condition, message=None, timeout=50): + '''Assert that condition function eventually returns True. + + timeout is in deciseconds, defaulting to 50 (5 seconds). message is + printed on failure. + ''' + while timeout >= 0: + self.process_glib_events() + if condition(): + break + timeout -= 1 + time.sleep(0.1) + else: + self.fail(message or 'timed out waiting for ' + str(condition)) + + def assert_iface_down(self, iface): + '''Assert that client interface is down''' + + out = subprocess.check_output(['ip', 'a', 'show', 'dev', iface], + universal_newlines=True) + self.assertNotIn('inet 192', out) + self.assertNotIn('inet6 2600', out) + + if iface == self.dev_w_client: + out = subprocess.check_output(['iw', 'dev', iface, 'link'], + universal_newlines=True) + self.assertIn('Not connected', out) + + # but AP device should never be touched by NM + out = subprocess.check_output(['ip', 'a', 'show', 'dev', self.dev_w_ap], + universal_newlines=True) + self.assertIn('state UP', out) + + def assert_iface_up(self, iface, expected_ip_a=None, unexpected_ip_a=None): + '''Assert that client interface is up''' + + out = subprocess.check_output(['ip', 'a', 'show', 'dev', iface], + universal_newlines=True) + self.assertIn('state UP', out) + if expected_ip_a: + for r in expected_ip_a: + self.assertRegex(out, r) + if unexpected_ip_a: + for r in unexpected_ip_a: + self.assertNotRegex(out, r) + + if iface == self.dev_w_client: + out = subprocess.check_output(['iw', 'dev', iface, 'link'], + universal_newlines=True) + self.assertIn('Connected to ' + self.mac_w_ap, out) + self.assertIn('SSID: ' + SSID, out) + + def wait_ap(self, timeout): + '''Wait for AccessPoint NM object to appear, and return it''' + + self.assertEventually(lambda: len(self.nmdev_w.get_access_points()) > 0, + 'timed out waiting for AP to be detected', + timeout=timeout) + + return self.nmdev_w.get_access_points()[0] + + def connect_to_ap(self, ap, secret, ipv6_mode, ip6_privacy): + '''Connect to an NMAccessPoint. + + secret should be None for open networks, and a string with the password + for WEP/WPA. + + ip6_privacy is a NM.SettingIP6ConfigPrivacy flag. + + Return (NMConnection, NMActiveConnection) objects. + ''' + + ip4_method = NM.SETTING_IP4_CONFIG_METHOD_DISABLED + ip6_method = NM.SETTING_IP6_CONFIG_METHOD_IGNORE + if ipv6_mode is None: + ip4_method = NM.SETTING_IP4_CONFIG_METHOD_AUTO + else: + ip6_method = NM.SETTING_IP6_CONFIG_METHOD_AUTO + + # If we have a secret, supply it to the new connection right away; + # adding it afterwards with update_secrets() does not work, and we + # can't implement a SecretAgent as get_secrets() would need to build a + # map of a map of gpointers to gpointers which is too much for PyGI + partial_conn = NM.SimpleConnection.new() + partial_conn.add_setting(NM.SettingIP4Config(method=ip4_method)) + if secret: + partial_conn.add_setting(NM.SettingWirelessSecurity.new()) + # FIXME: needs update for other auth types + partial_conn.update_secrets(NM.SETTING_WIRELESS_SECURITY_SETTING_NAME, + GLib.Variant('a{sv}', { + 'psk': GLib.Variant('s', secret) + })) + if ip6_privacy is not None: + partial_conn.add_setting(NM.SettingIP6Config(ip6_privacy=ip6_privacy, + method=ip6_method)) + + ml = GLib.MainLoop() + self.cb_conn = None + self.cancel = Gio.Cancellable() + self.timeout_tag = 0 + + def add_activate_cb(client, res, data): + if (self.timeout_tag > 0): + GLib.source_remove(self.timeout_tag) + self.timeout_tag = 0 + try: + self.cb_conn = \ + self.nmclient.add_and_activate_connection_finish(res) + + except gi.repository.GLib.Error as e: + # Check if the error is "Operation was cancelled" + if (e.domain != "g-io-error-quark" or e.code != 19): + self.fail("add_and_activate_connection failed: %s (%s, %d)" % + (e.message, e.domain, e.code)) + ml.quit() + + def timeout_cb(): + self.timeout_tag = -1 + self.cancel.cancel() + ml.quit() + return GLib.SOURCE_REMOVE + + self.nmclient.add_and_activate_connection_async(partial_conn, self.nmdev_w, ap.get_path(), self.cancel, add_activate_cb, None) + self.timeout_tag = GLib.timeout_add_seconds(300, timeout_cb) + ml.run() + if (self.timeout_tag < 0): + self.timeout_tag = 0 + self.fail('Main loop for adding connection timed out!') + self.assertNotEqual(self.cb_conn, None) + active_conn = self.cb_conn + self.cb_conn = None + + conn = self.conn_from_active_conn(active_conn) + self.assertTrue(conn.verify()) + + # verify need_secrets() + needed_secrets = conn.need_secrets() + if secret is None: + self.assertEqual(needed_secrets, (None, [])) + else: + self.assertEqual(needed_secrets[0], NM.SETTING_WIRELESS_SECURITY_SETTING_NAME) + self.assertEqual(type(needed_secrets[1]), list) + self.assertGreaterEqual(len(needed_secrets[1]), 1) + # FIXME: needs update for other auth types + self.assertIn(needed_secrets[1][0], [NM.SETTING_WIRELESS_SECURITY_PSK]) + + # we are usually ACTIVATING at this point; wait for completion + # TODO: 5s is not enough, argh slow DHCP client + self.assertEventually(lambda: active_conn.get_state() == NM.ActiveConnectionState.ACTIVATED, + 'timed out waiting for %s to get activated' % active_conn.get_connection(), + timeout=600) + self.assertEqual(self.nmdev_w.get_state(), NM.DeviceState.ACTIVATED) + return (conn, active_conn) + + def conn_from_active_conn(self, active_conn): + '''Get NMConnection object for an NMActiveConnection object''' + + # this sometimes takes a second try, when the corresponding + # NMConnection object is not yet available + tries = 3 + while tries > 0: + self.process_glib_events() + print("active_conn", active_conn, "\n") + print("state", active_conn.props.state, "\n") + print("get_path", active_conn.get_path(), "\n") +# time.sleep(1) + path = active_conn.get_connection().get_path() + for dev in active_conn.get_devices(): + for c in dev.get_available_connections(): + if c.get_path() == path: + return c + time.sleep(0.1) + tries -= 1 + + self.fail('Could not find NMConnection object for %s' % path) + + def check_low_level_config(self, iface, ipv6_mode, ip6_privacy): + '''Check actual hardware state with ip/iw after being connected''' + + # list of expected regexps in "ip a" output + expected_ip_a = [] + unexpected_ip_a = [] + + if ipv6_mode is not None: + if ipv6_mode in ('', 'slaac'): + # has global address from our DHCP server + expected_ip_a.append('inet6 2600::[0-9a-f]+/') + else: + # has address with our prefix and MAC + expected_ip_a.append('inet6 2600::[0-9a-f:]+/64 scope global (?:tentative )?(?:mngtmpaddr )?(?:noprefixroute )?dynamic') + # has address with our prefix and random IP (Privacy + # Extension), if requested + priv_re = 'inet6 2600:[0-9a-f:]+/64 scope global temporary (?:tentative )?(?:mngtmpaddr )?dynamic' + if ip6_privacy in (NM.SettingIP6ConfigPrivacy.PREFER_TEMP_ADDR, + NM.SettingIP6ConfigPrivacy.PREFER_PUBLIC_ADDR): + expected_ip_a.append(priv_re) + else: + # FIXME: add a negative test here + pass + #unexpected_ip_a.append(priv_re) + + # has a link-local address + expected_ip_a.append('inet6 fe80::[0-9a-f:]+/64 scope link') + else: + expected_ip_a.append('inet 192.168.5.\d+/24') + + self.assert_iface_up(iface, expected_ip_a, unexpected_ip_a) + + +class ColdplugWifi(NetworkManagerTest): + '''Wifi: In these tests NM starts after setting up the AP''' + + # not run by default; run "nm-wifi ColdplugWifi.shell" to get this + @network_test_base.run_in_subprocess + def shell(self): + '''Start AP and NM, then run a shell (for debugging)''' + + self.setup_ap('hw_mode=b\nchannel=1\nssid=' + SSID, None) + self.start_nm(self.dev_w_client) + print(''' + +client interface: %s, access point interface: %s, AP SSID: "%s" + +You can now run commands like "nmcli dev" or "nmcli dev wifi connect '%s'". +Logs are in '%s'. When done, exit the shell. + +''' % (self.dev_w_client, self.dev_w_ap, SSID, SSID, self.workdir)) + subprocess.call(['bash', '-i']) + + @network_test_base.run_in_subprocess + def test_open_b_ip4(self): + '''Open network, 802.11b, IPv4''' + + self.do_test('hw_mode=b\nchannel=1\nssid=' + SSID, None, 11000) + + # + # Common test code + # + + # libnm-glib has a lot of internal persistent state (private D-BUS + # connections and such); as it is very brittle and hard to track down + # all remaining references to any NM* object after a test, we rather + # run each test in a separate subprocess + @network_test_base.run_in_subprocess + def do_test(self, hostapd_conf, ipv6_mode, expected_max_bitrate, + secret=None, ip6_privacy=None): + '''Actual test code, parameterized for the particular test case''' + + self.setup_ap(hostapd_conf, ipv6_mode) + self.start_nm(self.dev_w_client) + + # on coldplug we expect the AP to be picked out fast + ap = self.wait_ap(timeout=100) + self.assertTrue(ap.get_path().startswith('/org/freedesktop/NetworkManager')) + self.assertEqual(ap.get_mode(), getattr(NM, '80211Mode').INFRA) + self.assertEqual(ap.get_max_bitrate(), expected_max_bitrate) + #self.assertEqual(ap.get_flags(), ) + + # should not auto-connect + self.assertEqual(self.nmclient.get_active_connections(), []) + + # connect to that AP + (conn, active_conn) = self.connect_to_ap(ap, secret, ipv6_mode, ip6_privacy) + + # check NMActiveConnection object + self.assertIn(active_conn.get_uuid(), [c.get_uuid() for c in self.nmclient.get_active_connections()]) + self.assertEqual([d.get_udi() for d in active_conn.get_devices()], [self.nmdev_w.get_udi()]) + + # check corresponding NMConnection object + wireless_setting = conn.get_setting_wireless() + self.assertEqual(wireless_setting.get_ssid().get_data(), SSID.encode()) + self.assertEqual(wireless_setting.get_hidden(), False) + if secret: + self.assertEqual(conn.get_setting_wireless_security().get_name(), NM.SETTING_WIRELESS_SECURITY_SETTING_NAME) + else: + self.assertEqual(conn.get_setting_wireless_security(), None) + # for debugging + #conn.dump() + + # for IPv6, check privacy setting + if ipv6_mode is not None and ip6_privacy != NM.SettingIP6ConfigPrivacy.UNKNOWN: + assert ip6_privacy is not None, 'for IPv6 tests you need to specify ip6_privacy flag' + ip6_setting = conn.get_setting_ip6_config() + self.assertEqual(ip6_setting.props.ip6_privacy, ip6_privacy) + + self.check_low_level_config(self.dev_w_client, ipv6_mode, ip6_privacy) + +@unittest.skipIf(DBusTestCase is object, + 'WARNING: python-dbusmock not installed, skipping suspend tests; get it from https://pypi.python.org/pypi/python-dbusmock') +class Suspend(NetworkManagerTest, DBusTestCase): + '''These tests run under a mock logind on a private system D-BUS''' + + @classmethod + def setUpClass(klass): + klass.start_system_bus() + NetworkManagerTest.setUpClass() + + @classmethod + def tearDownClass(klass): + NetworkManagerTest.tearDownClass() + DBusTestCase.tearDownClass() + + def setUp(self): + NetworkManagerTest.setUp(self) + + # start mock polkit and logind processes, so that we can + # intercept/control suspend + (p_polkit, self.obj_polkit) = self.spawn_server_template('polkitd', {}, stdout=subprocess.PIPE) + # by default we are not concerned about restricting access in the tests + self.obj_polkit.AllowUnknown(True) + self.addCleanup(p_polkit.wait) + self.addCleanup(p_polkit.terminate) + + (p_logind, self.obj_logind) = self.spawn_server_template('logind', {}, stdout=subprocess.PIPE) + self.addCleanup(p_logind.wait) + self.addCleanup(p_logind.terminate) + + # we have to manually start wpa_supplicant, as D-BUS activation does + # not happen for the fake D-BUS + log = os.path.join(self.workdir, 'wpasupplicant.log') + p_wpasupp = subprocess.Popen(['wpa_supplicant', '-u', '-d', '-e', '-K', + self.entropy_file, '-f', log]) + self.addCleanup(p_wpasupp.wait) + self.addCleanup(p_wpasupp.terminate) + + def fixme_test_active_ip4(self): + '''suspend during active IPv4 connection''' + + self.do_test('hw_mode=b\nchannel=1\nssid=' + SSID, None, + ['inet 192.168.5.\d+/24']) + + def fixme_test_active_ip6(self): + '''suspend during active IPv6 connection''' + + self.do_test('hw_mode=b\nchannel=1\nssid=' + SSID, 'ra-only', + ['inet6 2600::']) + + # + # Common test code + # + + @network_test_base.run_in_subprocess + def do_test(self, hostapd_conf, ipv6_mode, expected_ip_a): + '''Actual test code, parameterized for the particular test case''' + + self.setup_ap(hostapd_conf, ipv6_mode) + self.start_nm(self.dev_w_client) + ap = self.wait_ap(timeout=1800) + (conn, active_conn) = self.connect_to_ap(ap, None, ipv6_mode, None) + + # send logind signal that we are about to suspend + self.obj_logind.EmitSignal('', 'PrepareForSleep', 'b', [True]) + + # disabling should be fast, give it one second + self.assertEventually(lambda: self.nmdev_w.get_state() == NM.DeviceState.UNMANAGED, + timeout=10) + self.assert_iface_down(self.dev_w_client) + + # send logind signal that we resumed + self.obj_logind.EmitSignal('', 'PrepareForSleep', 'b', [False]) + + # this involves DHCP, use same timeout as for regular connection + self.assertEventually(lambda: self.nmdev_w.get_state() == NM.DeviceState.ACTIVATED, + timeout=100) + + # dev_w_client should be back up + self.assert_iface_up(self.dev_w_client, expected_ip_a) + + +def setUpModule(): + # AppArmor currently does not allow us to access the system D-BUS from an + # unshared file system. Hack the policy to allow that until that gets fixed + # properly. See https://launchpad.net/bugs/1244157 + # unshare the mount namespace, so that our tmpfs mounts are guaranteed to get + # cleaned up, and don't influence the production system + libc6 = ctypes.cdll.LoadLibrary('libc.so.6') + assert libc6.unshare(ctypes.c_int(0x00020000)) == 0, 'failed to unshare mount namespace' + + # stop system-wide NetworkManager to avoid interfering with tests + nm_running = subprocess.call('service network-manager stop 2>&1', shell=True) == 0 + + +def tearDownModule(): + subprocess.call('dhclient eth0', shell=True) + subprocess.call('sleep 10', shell=True) + + +if __name__ == '__main__': + # avoid unintelligible error messages, and breaking "make check" when not being + # root + if os.getuid() != 0: + sys.stderr.write('This integration test suite needs to be run as root\n') + sys.exit(1) + + # write to stdout, not stderr + runner = unittest.TextTestRunner(stream=sys.stdout, verbosity=2) + unittest.main(testRunner=runner) |