summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorThomas Haller <thaller@redhat.com>2020-01-30 18:21:00 +0100
committerThomas Haller <thaller@redhat.com>2020-01-31 01:03:45 +0100
commit46a952234a7c5bb9b2d2a3e5119a396be1cd0f4f (patch)
tree137a9c5faf54d61bab4f52029ea5a8afcd48d9b2
parentb6ce3c7163150852976b3530576a977810373753 (diff)
downloadNetworkManager-th/libnm-test.tar.gz
-rwxr-xr-xlibnm-test.py136
-rw-r--r--libnm/libnm.ver1
-rw-r--r--libnm/nm-active-connection.c20
-rw-r--r--libnm/nm-client.c2
-rw-r--r--libnm/nm-client.h2
-rw-r--r--libnm/nm-libnm-utils.c10
-rw-r--r--network_test_base.py360
-rw-r--r--nm.py573
8 files changed, 1103 insertions, 1 deletions
diff --git a/libnm-test.py b/libnm-test.py
new file mode 100755
index 0000000000..df7ddea3b7
--- /dev/null
+++ b/libnm-test.py
@@ -0,0 +1,136 @@
+#!/bin/python3
+
+import sys
+import uuid
+import socket
+
+import gi
+gi.require_version('NM', '1.0')
+from gi.repository import GLib, NM
+
+mainloop = GLib.MainLoop.new(None, False)
+
+PROFILE_NAME = 'x-libnm-test'
+
+DEVICE_NAME = 'eth0'
+
+def log(msg):
+ NM.utils_log(f'{msg}')
+
+def die(msg):
+ log(f"DIE: {msg}")
+ sys.exit(1)
+
+def nmc_create():
+ nmc = NM.Client()
+ def cb(source, res):
+ assert(source == nmc)
+ try:
+ source.init_finish(res)
+ except Exception as e:
+ die("failed to initialize NMClient: %s" % (e))
+ mainloop.quit()
+ nmc.init_async(0, None, cb)
+ mainloop.run()
+ return nmc
+
+def create_profile(name):
+
+ profile = NM.SimpleConnection.new()
+
+ s_con = NM.SettingConnection.new()
+ s_con.set_property(NM.SETTING_CONNECTION_ID, name)
+ s_con.set_property(NM.SETTING_CONNECTION_UUID, str(uuid.uuid4()))
+ s_con.set_property(NM.SETTING_CONNECTION_TYPE, "802-3-ethernet")
+
+ s_wired = NM.SettingWired.new()
+
+ s_ip4 = NM.SettingIP4Config.new()
+ s_ip4.set_property(NM.SETTING_IP_CONFIG_METHOD, "manual")
+ s_ip4.add_address(NM.IPAddress.new(socket.AF_INET, "192.168.33.5", 24))
+
+ s_ip6 = NM.SettingIP6Config.new()
+ s_ip6.set_property(NM.SETTING_IP_CONFIG_METHOD, "disabled")
+
+ profile.add_setting(s_con)
+ profile.add_setting(s_ip4)
+ profile.add_setting(s_ip6)
+ profile.add_setting(s_wired)
+
+ return profile
+
+def find_device(nmc, **kwargs):
+ best = None
+ for d in nmc.get_devices():
+ if 'iface' in kwargs:
+ if d.get_iface() != kwargs['iface']:
+ continue
+ if best is None:
+ best = d
+ elif kwargs.get('unique', True):
+ raise Exception("Cannot find unique device for parameters %s" % (kwargs))
+ return best
+
+def add_and_activate(nmc, device, profile):
+ res = {}
+ def cb(source, result):
+ assert(source == nmc)
+ try:
+ res['ac'] = source.add_and_activate_connection_finish(result);
+ log("add and activate finished and got %s" % (res['ac'].get_path()))
+ c = res['ac'].get_connection()
+ log(' has %s' % (c))
+ if not c:
+ log("No connection when add-and-activate completes.");
+ #res['ac'].handler_disconnect(100000)
+ except Exception as e:
+ log("add and activate finished and failed with %s" % (e))
+ res['error'] = e
+ mainloop.quit()
+ log("add and activate '%s' (%s) starting..." % (profile.get_id(), profile.get_uuid()))
+ nmc.add_and_activate_connection_async(profile,
+ device,
+ None,
+ None,
+ cb)
+ mainloop.run()
+ if 'error' in res:
+ raise res['error']
+ return res['ac']
+
+nmc = nmc_create()
+
+def cb(source, con):
+ log(f'connection {con.get_id()}, {con.get_uuid()} added ({con.get_path()})')
+nmc.connect('connection-added', cb)
+
+device = find_device(nmc, iface = DEVICE_NAME)
+assert(device)
+
+profile = create_profile(PROFILE_NAME)
+
+ac = add_and_activate(nmc, device, profile)
+assert(ac)
+con = ac.get_connection()
+
+if not con:
+ log('ac has no connection yet. Wait...')
+
+ def cb(source, pspec):
+ assert(source == ac)
+ assert(pspec.name == 'connection')
+ assert(ac.get_connection())
+ log('event!')
+ mainloop.quit()
+
+ signal_id = ac.connect('notify::connection', cb)
+
+ mainloop.run()
+
+ ac.handler_disconnect(signal_id)
+ con = ac.get_connection()
+ if not con:
+ log('ac has still no connection')
+ sys.exit(1)
+
+log('ac has connection %s (%s). Done...' % (con.get_id(), con.get_uuid()))
diff --git a/libnm/libnm.ver b/libnm/libnm.ver
index 457b55a798..1d5a5f8785 100644
--- a/libnm/libnm.ver
+++ b/libnm/libnm.ver
@@ -1676,4 +1676,5 @@ global:
nm_setting_vrf_get_table;
nm_setting_vrf_get_type;
nm_setting_vrf_new;
+ nm_utils_log;
} libnm_1_22_0;
diff --git a/libnm/nm-active-connection.c b/libnm/nm-active-connection.c
index 076cfd8f01..6abb5e4520 100644
--- a/libnm/nm-active-connection.c
+++ b/libnm/nm-active-connection.c
@@ -96,9 +96,16 @@ G_DEFINE_TYPE (NMActiveConnection, nm_active_connection, NM_TYPE_OBJECT);
NMRemoteConnection *
nm_active_connection_get_connection (NMActiveConnection *connection)
{
+ NMRemoteConnection *con;
+
g_return_val_if_fail (NM_IS_ACTIVE_CONNECTION (connection), NULL);
- return nml_dbus_property_o_get_obj (&NM_ACTIVE_CONNECTION_GET_PRIVATE (connection)->property_o[PROPERTY_O_IDX_CONNECTION]);
+ con = nml_dbus_property_o_get_obj (&NM_ACTIVE_CONNECTION_GET_PRIVATE (connection)->property_o[PROPERTY_O_IDX_CONNECTION]);
+
+ g_printerr (">>>>>>>>>> request ac.connection: %s -> %s\n",
+ nm_object_get_path ((gpointer) connection),
+ !con ? "(null)" : nm_object_get_path ((gpointer) con));
+ return con;
}
/**
@@ -425,6 +432,7 @@ _nm_active_connection_state_changed_commit (NMActiveConnection *self,
_notify_event_state_changed,
g_object_ref (self));
}
+
/*****************************************************************************/
static gboolean
@@ -432,6 +440,16 @@ is_ready (NMObject *nmobj)
{
NMActiveConnectionPrivate *priv = NM_ACTIVE_CONNECTION_GET_PRIVATE (nmobj);
+ NMLDBusPropertyO *p;
+
+ p = &priv->property_o[PROPERTY_O_IDX_CONNECTION];
+
+ g_printerr (">>>>>>>>> is_ready[%s]: is-ready=%d, is-ready-fully=%d, obj=%p\n",
+ nm_object_get_path ((gpointer) nmobj),
+ nml_dbus_property_o_is_ready (p),
+ nml_dbus_property_o_is_ready_fully (p),
+ nml_dbus_property_o_get_obj (p));
+
/* Usually, we want not expose our NMObject instances until they are fully initialized.
* For NMRemoteSetting this means to wait until GetSettings() returns.
*
diff --git a/libnm/nm-client.c b/libnm/nm-client.c
index a93a81c353..cf4758de7e 100644
--- a/libnm/nm-client.c
+++ b/libnm/nm-client.c
@@ -933,6 +933,8 @@ _nm_client_notify_event_emit (NMClient *self)
while ((base = c_list_first_entry (&priv->queue_notify_lst_head, NMObjectBase, queue_notify_lst))) {
c_list_unlink (&base->queue_notify_lst);
+ if (NM_IS_OBJECT (base))
+ g_printerr (">>>>>>> thaw object %s\n", nm_object_get_path ((gpointer) base));
g_object_thaw_notify (G_OBJECT (base));
g_object_unref (base);
}
diff --git a/libnm/nm-client.h b/libnm/nm-client.h
index 9c2fdff9f1..56c5249069 100644
--- a/libnm/nm-client.h
+++ b/libnm/nm-client.h
@@ -478,6 +478,8 @@ gboolean nm_client_reload_finish (NMClient *client,
GAsyncResult *result,
GError **error);
+void nm_utils_log (const char *msg);
+
G_END_DECLS
#endif /* __NM_CLIENT_H__ */
diff --git a/libnm/nm-libnm-utils.c b/libnm/nm-libnm-utils.c
index f04a7419f5..a9a4ea93b7 100644
--- a/libnm/nm-libnm-utils.c
+++ b/libnm/nm-libnm-utils.c
@@ -39,6 +39,16 @@ _nml_dbus_log_level_init (void)
return l;
}
+/**
+ * nm_utils_log:
+ * @msg: string
+ */
+void
+nm_utils_log (const char *msg)
+{
+ g_printerr (">>>>> %s\n", msg);
+}
+
void
_nml_dbus_log (NMLDBusLogLevel level,
const char *fmt,
diff --git a/network_test_base.py b/network_test_base.py
new file mode 100644
index 0000000000..9ef8dec3f8
--- /dev/null
+++ b/network_test_base.py
@@ -0,0 +1,360 @@
+'''
+Base class for network related tests.
+
+This provides fake wifi devices with mac80211_hwsim and hostapd, test ethernet
+devices with veth, utility functions to start wpasupplicant, dnsmasq, get/set
+rfkill status, and some utility functions.
+'''
+
+__author__ = 'Martin Pitt <martin.pitt@ubuntu.com>'
+__copyright__ = '(C) 2013 Canonical Ltd.'
+__license__ = 'GPL v2 or later'
+
+import sys
+import os
+import os.path
+import time
+import tempfile
+import subprocess
+import re
+import unittest
+import traceback
+import functools
+from glob import glob
+
+# check availability of programs, and cleanly skip test if they are not
+# available
+for program in ['wpa_supplicant', 'hostapd', 'dnsmasq', 'dhclient']:
+ if subprocess.call(['which', program], stdout=subprocess.PIPE) != 0:
+ sys.stderr.write('%s is required for this test suite, but not available. Skipping\n' % program)
+ sys.exit(0)
+
+
+class NetworkTestBase(unittest.TestCase):
+ '''Common functionality for network test cases
+
+ setUp() creates two test wlan devices, one for a simulated access point
+ (self.dev_w_ap), the other for a simulated client device
+ (self.dev_w_client), and two test ethernet devices (self.dev_e_ap and
+ self.dev_e_client).
+
+ Each test should call self.setup_ap() or self.setup_eth() with the desired
+ configuration.
+ '''
+ @classmethod
+ def setUpClass(klass):
+ # ensure we have this so that iw works
+ subprocess.check_call(['modprobe', 'cfg80211'])
+
+ # set regulatory domain "EU", so that we can use 80211.a 5 GHz channels
+ out = subprocess.check_output(['iw', 'reg', 'get'], universal_newlines=True)
+ m = re.match('^(?:global\n)?country (\S+):', out)
+ assert m
+ klass.orig_country = m.group(1)
+ subprocess.check_call(['iw', 'reg', 'set', 'EU'])
+
+ @classmethod
+ def tearDownClass(klass):
+ subprocess.check_call(['iw', 'reg', 'set', klass.orig_country])
+ os.remove('/run/udev/rules.d/99-nm-veth-test.rules')
+
+ @classmethod
+ def create_devices(klass):
+ '''Create Access Point and Client devices with mac80211_hwsim and veth'''
+
+ klass.dev_e_ap = 'veth42'
+ klass.dev_e_client = 'eth42'
+
+ if os.path.exists('/sys/module/mac80211_hwsim'):
+ raise SystemError('mac80211_hwsim module already loaded')
+ if os.path.exists('/sys/class/net/' + klass.dev_e_client):
+ raise SystemError('%s interface already exists' % klass.dev_e_client)
+
+ # ensure NM can manage our fake eths
+ os.makedirs('/run/udev/rules.d', exist_ok=True)
+ with open('/run/udev/rules.d/99-nm-veth-test.rules', 'w') as f:
+ f.write('ENV{ID_NET_DRIVER}=="veth", ENV{INTERFACE}=="%s", ENV{NM_UNMANAGED}="0"\n' % klass.dev_e_client)
+ subprocess.check_call(['udevadm', 'control', '--reload'])
+
+ # create virtual ethernet devs
+ subprocess.check_call(['ip', 'link', 'add', 'name', klass.dev_e_client, 'type',
+ 'veth', 'peer', 'name', klass.dev_e_ap])
+
+ # create virtual wlan devs
+ before_wlan = set([c for c in os.listdir('/sys/class/net') if c.startswith('wlan')])
+ subprocess.check_call(['modprobe', 'mac80211_hwsim'])
+ # wait 5 seconds for fake devices to appear
+ timeout = 50
+ while timeout > 0:
+ after_wlan = set([c for c in os.listdir('/sys/class/net') if c.startswith('wlan')])
+ if len(after_wlan) - len(before_wlan) >= 2:
+ break
+ timeout -= 1
+ time.sleep(0.1)
+ else:
+ raise SystemError('timed out waiting for fake devices to appear')
+
+ devs = list(after_wlan - before_wlan)
+ klass.dev_w_ap = devs[0]
+ klass.dev_w_client = devs[1]
+
+ # determine and store MAC addresses
+ with open('/sys/class/net/%s/address' % klass.dev_w_ap) as f:
+ klass.mac_w_ap = f.read().strip().upper()
+ with open('/sys/class/net/%s/address' % klass.dev_w_client) as f:
+ klass.mac_w_client = f.read().strip().upper()
+ with open('/sys/class/net/%s/address' % klass.dev_e_ap) as f:
+ klass.mac_e_ap = f.read().strip().upper()
+ with open('/sys/class/net/%s/address' % klass.dev_e_client) as f:
+ klass.mac_e_client = f.read().strip().upper()
+ #print('Created fake devices: AP: %s, client: %s' % (klass.dev_w_ap, klass.dev_w_client))
+
+ @classmethod
+ def shutdown_devices(klass):
+ '''Remove test wlan devices'''
+
+ subprocess.check_call(['rmmod', 'mac80211_hwsim'])
+ subprocess.check_call(['ip', 'link', 'del', 'dev', klass.dev_e_ap])
+ klass.dev_w_ap = None
+ klass.dev_w_client = None
+ klass.dev_e_ap = None
+ klass.dev_e_client = None
+
+ @classmethod
+ def get_rfkill(klass, interface):
+ '''Get rfkill status of an interface.
+
+ Returns whether the interface is blocked, i. e. "True" for blocked,
+ "False" for enabled.
+ '''
+ with open(klass._rfkill_attribute(interface)) as f:
+ val = f.read()
+ return val == '1'
+
+ @classmethod
+ def set_rfkill(klass, interface, block):
+ '''Set rfkill status of an interface
+
+ Use block==True for disabling ("killswitching") an interface,
+ block==False to re-enable.
+ '''
+ with open(klass._rfkill_attribute(interface), 'w') as f:
+ f.write(block and '1' or '0')
+
+ def run(self, result=None):
+ '''Show log files on failed tests'''
+
+ if result:
+ orig_err_fail = len(result.errors) + len(result.failures)
+ super().run(result)
+ if hasattr(self, 'workdir'):
+ logs = glob(os.path.join(self.workdir, '*.log'))
+ if result and len(result.errors) + len(result.failures) > orig_err_fail:
+ for log_file in logs:
+ with open(log_file) as f:
+ print('\n----- %s -----\n%s\n------\n'
+ % (os.path.basename(log_file), f.read()))
+
+ # clean up log files, so that we don't see ones from previous tests
+ for log_file in logs:
+ os.unlink(log_file)
+
+ def setUp(self):
+ '''Create test devices and workdir'''
+
+ self.create_devices()
+ self.addCleanup(self.shutdown_devices)
+ self.workdir_obj = tempfile.TemporaryDirectory()
+ self.workdir = self.workdir_obj.name
+
+ # create static entropy file to avoid draining/blocking on /dev/random
+ self.entropy_file = os.path.join(self.workdir, 'entropy')
+ with open(self.entropy_file, 'wb') as f:
+ f.write(b'012345678901234567890')
+
+ def setup_ap(self, hostapd_conf, ipv6_mode):
+ '''Set up simulated access point
+
+ On self.dev_w_ap, run hostapd with given configuration. Setup dnsmasq
+ according to ipv6_mode, see start_dnsmasq().
+
+ This is torn down automatically at the end of the test.
+ '''
+ # give our AP an IP
+ subprocess.check_call(['ip', 'a', 'flush', 'dev', self.dev_w_ap])
+ if ipv6_mode is not None:
+ subprocess.check_call(['ip', 'a', 'add', '2600::1/64', 'dev', self.dev_w_ap])
+ else:
+ subprocess.check_call(['ip', 'a', 'add', '192.168.5.1/24', 'dev', self.dev_w_ap])
+
+ self.start_hostapd(hostapd_conf)
+ self.start_dnsmasq(ipv6_mode, self.dev_w_ap)
+
+ def setup_eth(self, ipv6_mode, start_dnsmasq=True):
+ '''Set up simulated ethernet router
+
+ On self.dev_e_ap, run dnsmasq according to ipv6_mode, see
+ start_dnsmasq().
+
+ This is torn down automatically at the end of the test.
+ '''
+ # give our router an IP
+ subprocess.check_call(['ip', 'a', 'flush', 'dev', self.dev_e_ap])
+ if ipv6_mode is not None:
+ subprocess.check_call(['ip', 'a', 'add', '2600::1/64', 'dev', self.dev_e_ap])
+ else:
+ subprocess.check_call(['ip', 'a', 'add', '192.168.5.1/24', 'dev', self.dev_e_ap])
+ subprocess.check_call(['ip', 'link', 'set', self.dev_e_ap, 'up'])
+ # we don't really want to up the client iface already, but veth doesn't
+ # work otherwise (no link detected)
+ subprocess.check_call(['ip', 'link', 'set', self.dev_e_client, 'up'])
+
+ if start_dnsmasq:
+ self.start_dnsmasq(ipv6_mode, self.dev_e_ap)
+
+ def start_wpasupp(self, conf):
+ '''Start wpa_supplicant on client interface'''
+
+ w_conf = os.path.join(self.workdir, 'wpasupplicant.conf')
+ with open(w_conf, 'w') as f:
+ f.write('ctrl_interface=%s\nnetwork={\n%s\n}\n' % (self.workdir, conf))
+ log = os.path.join(self.workdir, 'wpasupp.log')
+ p = subprocess.Popen(['wpa_supplicant', '-Dwext', '-i', self.dev_w_client,
+ '-e', self.entropy_file, '-c', w_conf, '-f', log],
+ stderr=subprocess.PIPE)
+ self.addCleanup(p.wait)
+ self.addCleanup(p.terminate)
+ # TODO: why does this sometimes take so long?
+ self.poll_text(log, 'CTRL-EVENT-CONNECTED', timeout=200)
+
+ def wrap_process(self, fn, *args, **kwargs):
+ '''Run a test method in a separate process.
+
+ Run test method fn(*args, **kwargs) in a child process. If that raises
+ any exception, it gets propagated to the main process and
+ wrap_process() fails with that exception.
+ '''
+ # exception from subprocess is propagated through this file
+ exc_path = os.path.join(self.workdir, 'exc')
+ try:
+ os.unlink(exc_path)
+ except OSError:
+ pass
+
+ pid = os.fork()
+
+ # run the actual test in the child
+ if pid == 0:
+ # short-circuit tearDownClass(), as this will be done by the parent
+ # process
+ self.addCleanup(os._exit, 0)
+ try:
+ fn(*args, **kwargs)
+ except:
+ with open(exc_path, 'w') as f:
+ f.write(traceback.format_exc())
+ raise
+ else:
+ # get success/failure state from child
+ os.waitpid(pid, 0)
+ # propagate exception
+ if os.path.exists(exc_path):
+ with open(exc_path) as f:
+ self.fail(f.read())
+
+ #
+ # Internal implementation details
+ #
+
+ @classmethod
+ def poll_text(klass, logpath, string, timeout=50):
+ '''Poll log file for a given string with a timeout.
+
+ Timeout is given in deciseconds.
+ '''
+ log = ''
+ while timeout > 0:
+ if os.path.exists(logpath):
+ break
+ timeout -= 1
+ time.sleep(0.1)
+ assert timeout > 0, 'Timed out waiting for file %s to appear' % logpath
+
+ with open(logpath) as f:
+ while timeout > 0:
+ line = f.readline()
+ if line:
+ log += line
+ if string in line:
+ break
+ continue
+ timeout -= 1
+ time.sleep(0.1)
+
+ assert timeout > 0, 'Timed out waiting for "%s":\n------------\n%s\n-------\n' % (string, log)
+
+ def start_hostapd(self, conf):
+ hostapd_conf = os.path.join(self.workdir, 'hostapd.conf')
+ with open(hostapd_conf, 'w') as f:
+ f.write('interface=%s\ndriver=nl80211\n' % self.dev_w_ap)
+ f.write(conf)
+
+ log = os.path.join(self.workdir, 'hostapd.log')
+ p = subprocess.Popen(['hostapd', '-e', self.entropy_file, '-f', log, hostapd_conf],
+ stdout=subprocess.PIPE)
+ self.addCleanup(p.wait)
+ self.addCleanup(p.terminate)
+ self.poll_text(log, '' + self.dev_w_ap + ': AP-ENABLED')
+
+ def start_dnsmasq(self, ipv6_mode, iface):
+ '''Start dnsmasq.
+
+ If ipv6_mode is None, IPv4 is set up with DHCP. If it is not None, it
+ must be a valid dnsmasq mode, i. e. a combination of "ra-only",
+ "slaac", "ra-stateless", and "ra-names". See dnsmasq(8).
+ '''
+ if ipv6_mode is None:
+ dhcp_range = '192.168.5.10,192.168.5.200'
+ else:
+ dhcp_range = '2600::10,2600::20'
+ if ipv6_mode:
+ dhcp_range += ',' + ipv6_mode
+
+ self.dnsmasq_log = os.path.join(self.workdir, 'dnsmasq.log')
+ lease_file = os.path.join(self.workdir, 'dnsmasq.leases')
+
+ p = subprocess.Popen(['dnsmasq', '--keep-in-foreground', '--log-queries',
+ '--log-facility=' + self.dnsmasq_log,
+ '--conf-file=/dev/null',
+ '--dhcp-leasefile=' + lease_file,
+ '--bind-interfaces',
+ '--interface=' + iface,
+ '--except-interface=lo',
+ '--enable-ra',
+ '--dhcp-range=' + dhcp_range])
+ self.addCleanup(p.wait)
+ self.addCleanup(p.terminate)
+
+ if ipv6_mode is not None:
+ self.poll_text(self.dnsmasq_log, 'IPv6 router advertisement enabled')
+ else:
+ self.poll_text(self.dnsmasq_log, 'DHCP, IP range')
+
+ @classmethod
+ def _rfkill_attribute(klass, interface):
+ '''Return the path to interface's rfkill soft toggle in sysfs.'''
+
+ g = glob('/sys/class/net/%s/phy80211/rfkill*/soft' % interface)
+ assert len(g) == 1, 'Did not find exactly one "soft" rfkill attribute for %s: %s' % (
+ interface, str(g))
+ return g[0]
+
+
+def run_in_subprocess(fn):
+ '''Decorator for running fn in a child process'''
+
+ @functools.wraps(fn)
+ def wrapped(*args, **kwargs):
+ # args[0] is self
+ args[0].wrap_process(fn, *args, **kwargs)
+ return wrapped
diff --git a/nm.py b/nm.py
new file mode 100644
index 0000000000..032c8577f4
--- /dev/null
+++ b/nm.py
@@ -0,0 +1,573 @@
+#!/usr/bin/python3
+# Test NetworkManager on simulated network devices
+# For an interactive shell test, run "nm ColdplugWifi.shell", see below
+
+__author__ = 'Martin Pitt <martin.pitt@ubuntu.com>'
+__copyright__ = '(C) 2013 Canonical Ltd.'
+__license__ = 'GPL v2 or later'
+
+import sys
+import os
+import os.path
+import re
+import time
+import subprocess
+import socket
+import netaddr
+import unittest
+import ctypes
+
+try:
+ from dbusmock import DBusTestCase
+except ImportError:
+ DBusTestCase = object # dummy so that the class declaration works
+
+import gi
+
+gi.require_version('NM', '1.0')
+from gi.repository import NM, GLib, Gio
+
+sys.path.append(os.path.dirname(__file__))
+import network_test_base
+
+SSID = 'fake net'
+
+# If True, NetworkManager logs directly to stdout, to watch logs in real time
+#NM_LOG_STDOUT = os.getenv('NM_LOG_STDOUT', False)
+NM_LOG_STDOUT = False
+
+# avoid accidentally destroying any real config
+os.environ['GSETTINGS_BACKEND'] = 'memory'
+
+# we currently get a lot of WARNINGs/CRITICALs from GI (leaked objects from
+# previous test runs/main loops?) Redirect them to stdout, to avoid failing
+# autopkgtests
+os.dup2(sys.stdout.fileno(), sys.stderr.fileno())
+
+
+class NetworkManagerTest(network_test_base.NetworkTestBase):
+ '''Provide common functionality for NM tests'''
+
+ def start_nm(self, wait_iface=None, auto_connect=True):
+ '''Start NetworkManager and initialize client object
+
+ If wait_iface is given, wait until NM recognizes that interface.
+ Otherwise, just wait until NM has initialized (for coldplug mode).
+
+ If auto_connect is False, set the "no-auto-default=*" option to avoid
+ auto-connecting to wired devices.
+ '''
+ # mount tmpfses over system directories, to avoid destroying the
+ # production configuration, and isolating tests from each other
+ if not os.path.exists('/run/NetworkManager'):
+ os.mkdir('/run/NetworkManager')
+ for d in ['/etc/NetworkManager', '/var/lib/NetworkManager',
+ '/run/NetworkManager']:
+ subprocess.check_call(['mount', '-n', '-t', 'tmpfs', 'none', d])
+ self.addCleanup(subprocess.call, ['umount', d])
+ os.mkdir('/etc/NetworkManager/system-connections')
+
+ # create local configuration; this allows us to have full control, and
+ # we also need to blacklist the AP device so that NM does not tear it
+ # down; we also blacklist any existing real interface to avoid
+ # interfering with it, and for getting predictable results
+ blacklist = ''
+ for iface in os.listdir('/sys/class/net'):
+ if iface == "bonding_masters":
+ continue
+ if iface != self.dev_w_client and iface != self.dev_e_client:
+ with open('/sys/class/net/%s/address' % iface) as f:
+ if blacklist:
+ blacklist += ';'
+ blacklist += 'mac:%s' % f.read().strip()
+
+ conf = os.path.join(self.workdir, 'NetworkManager.conf')
+ extra_main = ''
+ if not auto_connect:
+ extra_main += 'no-auto-default=*\n'
+
+ with open(conf, 'w') as f:
+ f.write('[main]\nplugins=keyfile\n%s\n[keyfile]\nunmanaged-devices=%s\n' %
+ (extra_main, blacklist))
+
+ if NM_LOG_STDOUT:
+ f_log = None
+ else:
+ log = os.path.join(self.workdir, 'NetworkManager.log')
+ f_log = os.open(log, os.O_CREAT | os.O_WRONLY | os.O_SYNC)
+
+ # build NM command line
+ argv = ['NetworkManager', '--log-level=debug', '--debug', '--config=' + conf]
+ # allow specifying extra arguments
+ argv += os.environ.get('NM_TEST_DAEMON_ARGS', '').strip().split()
+
+ p = subprocess.Popen(argv, stdout=f_log, stderr=subprocess.STDOUT)
+ # automatically terminate process at end of test case
+ self.addCleanup(p.wait)
+ self.addCleanup(p.terminate)
+ self.addCleanup(self.shutdown_connections)
+
+ if NM_LOG_STDOUT:
+ # let it initialize, then print a marker
+ time.sleep(1)
+ print('******* NM initialized *********\n\n')
+ else:
+ self.addCleanup(os.close, f_log)
+
+ # this should be fast, give it 2 s to initialize
+ if wait_iface:
+ self.poll_text(log, 'manager: (%s): new' % wait_iface, timeout=100)
+
+ self.nmclient = NM.Client.new()
+ self.assertTrue(self.nmclient.networking_get_enabled())
+
+ # FIXME: This certainly ought to be true, but isn't
+ #self.assertTrue(self.nmclient.get_manager_running())
+
+ # determine device objects
+ for d in self.nmclient.get_devices():
+ if d.props.interface == self.dev_w_ap:
+ self.assertEqual(d.get_device_type(), NM.DeviceType.WIFI)
+ self.assertEqual(d.get_driver(), 'mac80211_hwsim')
+ self.assertEqual(d.get_hw_address(), self.mac_w_ap)
+ self.nmdev_w_ap = d
+ elif d.props.interface == self.dev_w_client:
+ self.assertEqual(d.get_device_type(), NM.DeviceType.WIFI)
+ self.assertEqual(d.get_driver(), 'mac80211_hwsim')
+ # NM ≥ 1.4 randomizes MAC addresses by default, so we can't
+ # test for equality, just make sure it's not our AP
+ self.assertNotEqual(d.get_hw_address(), self.mac_w_ap)
+ self.nmdev_w = d
+ elif d.props.interface == self.dev_e_client:
+ self.assertEqual(d.get_device_type(), NM.DeviceType.VETH)
+ self.assertEqual(d.get_driver(), 'veth')
+ self.assertEqual(d.get_hw_address(), self.mac_e_client)
+ self.nmdev_e = d
+
+ self.assertTrue(hasattr(self, 'nmdev_w_ap'), 'Could not determine wifi AP NM device')
+ self.assertTrue(hasattr(self, 'nmdev_w'), 'Could not determine wifi client NM device')
+ self.assertTrue(hasattr(self, 'nmdev_e'), 'Could not determine eth client NM device')
+
+ self.process_glib_events()
+
+ def shutdown_connections(self):
+ '''Shut down all active NM connections.'''
+
+ if NM_LOG_STDOUT:
+ print('\n\n******* Shutting down NM connections *********')
+
+ # remove all created connections
+ for active_conn in self.nmclient.get_active_connections():
+ self.nmclient.deactivate_connection(active_conn)
+ self.assertEventually(lambda: self.nmclient.get_active_connections() == [],
+ timeout=20)
+
+ # verify that NM properly deconfigures the devices
+ self.assert_iface_down(self.dev_w_client)
+ self.assert_iface_down(self.dev_e_client)
+
+ @classmethod
+ def process_glib_events(klass):
+ '''Process pending GLib main loop events'''
+
+ context = GLib.MainContext.default()
+ while context.iteration(False):
+ pass
+
+ def assertEventually(self, condition, message=None, timeout=50):
+ '''Assert that condition function eventually returns True.
+
+ timeout is in deciseconds, defaulting to 50 (5 seconds). message is
+ printed on failure.
+ '''
+ while timeout >= 0:
+ self.process_glib_events()
+ if condition():
+ break
+ timeout -= 1
+ time.sleep(0.1)
+ else:
+ self.fail(message or 'timed out waiting for ' + str(condition))
+
+ def assert_iface_down(self, iface):
+ '''Assert that client interface is down'''
+
+ out = subprocess.check_output(['ip', 'a', 'show', 'dev', iface],
+ universal_newlines=True)
+ self.assertNotIn('inet 192', out)
+ self.assertNotIn('inet6 2600', out)
+
+ if iface == self.dev_w_client:
+ out = subprocess.check_output(['iw', 'dev', iface, 'link'],
+ universal_newlines=True)
+ self.assertIn('Not connected', out)
+
+ # but AP device should never be touched by NM
+ out = subprocess.check_output(['ip', 'a', 'show', 'dev', self.dev_w_ap],
+ universal_newlines=True)
+ self.assertIn('state UP', out)
+
+ def assert_iface_up(self, iface, expected_ip_a=None, unexpected_ip_a=None):
+ '''Assert that client interface is up'''
+
+ out = subprocess.check_output(['ip', 'a', 'show', 'dev', iface],
+ universal_newlines=True)
+ self.assertIn('state UP', out)
+ if expected_ip_a:
+ for r in expected_ip_a:
+ self.assertRegex(out, r)
+ if unexpected_ip_a:
+ for r in unexpected_ip_a:
+ self.assertNotRegex(out, r)
+
+ if iface == self.dev_w_client:
+ out = subprocess.check_output(['iw', 'dev', iface, 'link'],
+ universal_newlines=True)
+ self.assertIn('Connected to ' + self.mac_w_ap, out)
+ self.assertIn('SSID: ' + SSID, out)
+
+ def wait_ap(self, timeout):
+ '''Wait for AccessPoint NM object to appear, and return it'''
+
+ self.assertEventually(lambda: len(self.nmdev_w.get_access_points()) > 0,
+ 'timed out waiting for AP to be detected',
+ timeout=timeout)
+
+ return self.nmdev_w.get_access_points()[0]
+
+ def connect_to_ap(self, ap, secret, ipv6_mode, ip6_privacy):
+ '''Connect to an NMAccessPoint.
+
+ secret should be None for open networks, and a string with the password
+ for WEP/WPA.
+
+ ip6_privacy is a NM.SettingIP6ConfigPrivacy flag.
+
+ Return (NMConnection, NMActiveConnection) objects.
+ '''
+
+ ip4_method = NM.SETTING_IP4_CONFIG_METHOD_DISABLED
+ ip6_method = NM.SETTING_IP6_CONFIG_METHOD_IGNORE
+ if ipv6_mode is None:
+ ip4_method = NM.SETTING_IP4_CONFIG_METHOD_AUTO
+ else:
+ ip6_method = NM.SETTING_IP6_CONFIG_METHOD_AUTO
+
+ # If we have a secret, supply it to the new connection right away;
+ # adding it afterwards with update_secrets() does not work, and we
+ # can't implement a SecretAgent as get_secrets() would need to build a
+ # map of a map of gpointers to gpointers which is too much for PyGI
+ partial_conn = NM.SimpleConnection.new()
+ partial_conn.add_setting(NM.SettingIP4Config(method=ip4_method))
+ if secret:
+ partial_conn.add_setting(NM.SettingWirelessSecurity.new())
+ # FIXME: needs update for other auth types
+ partial_conn.update_secrets(NM.SETTING_WIRELESS_SECURITY_SETTING_NAME,
+ GLib.Variant('a{sv}', {
+ 'psk': GLib.Variant('s', secret)
+ }))
+ if ip6_privacy is not None:
+ partial_conn.add_setting(NM.SettingIP6Config(ip6_privacy=ip6_privacy,
+ method=ip6_method))
+
+ ml = GLib.MainLoop()
+ self.cb_conn = None
+ self.cancel = Gio.Cancellable()
+ self.timeout_tag = 0
+
+ def add_activate_cb(client, res, data):
+ if (self.timeout_tag > 0):
+ GLib.source_remove(self.timeout_tag)
+ self.timeout_tag = 0
+ try:
+ self.cb_conn = \
+ self.nmclient.add_and_activate_connection_finish(res)
+
+ except gi.repository.GLib.Error as e:
+ # Check if the error is "Operation was cancelled"
+ if (e.domain != "g-io-error-quark" or e.code != 19):
+ self.fail("add_and_activate_connection failed: %s (%s, %d)" %
+ (e.message, e.domain, e.code))
+ ml.quit()
+
+ def timeout_cb():
+ self.timeout_tag = -1
+ self.cancel.cancel()
+ ml.quit()
+ return GLib.SOURCE_REMOVE
+
+ self.nmclient.add_and_activate_connection_async(partial_conn, self.nmdev_w, ap.get_path(), self.cancel, add_activate_cb, None)
+ self.timeout_tag = GLib.timeout_add_seconds(300, timeout_cb)
+ ml.run()
+ if (self.timeout_tag < 0):
+ self.timeout_tag = 0
+ self.fail('Main loop for adding connection timed out!')
+ self.assertNotEqual(self.cb_conn, None)
+ active_conn = self.cb_conn
+ self.cb_conn = None
+
+ conn = self.conn_from_active_conn(active_conn)
+ self.assertTrue(conn.verify())
+
+ # verify need_secrets()
+ needed_secrets = conn.need_secrets()
+ if secret is None:
+ self.assertEqual(needed_secrets, (None, []))
+ else:
+ self.assertEqual(needed_secrets[0], NM.SETTING_WIRELESS_SECURITY_SETTING_NAME)
+ self.assertEqual(type(needed_secrets[1]), list)
+ self.assertGreaterEqual(len(needed_secrets[1]), 1)
+ # FIXME: needs update for other auth types
+ self.assertIn(needed_secrets[1][0], [NM.SETTING_WIRELESS_SECURITY_PSK])
+
+ # we are usually ACTIVATING at this point; wait for completion
+ # TODO: 5s is not enough, argh slow DHCP client
+ self.assertEventually(lambda: active_conn.get_state() == NM.ActiveConnectionState.ACTIVATED,
+ 'timed out waiting for %s to get activated' % active_conn.get_connection(),
+ timeout=600)
+ self.assertEqual(self.nmdev_w.get_state(), NM.DeviceState.ACTIVATED)
+ return (conn, active_conn)
+
+ def conn_from_active_conn(self, active_conn):
+ '''Get NMConnection object for an NMActiveConnection object'''
+
+ # this sometimes takes a second try, when the corresponding
+ # NMConnection object is not yet available
+ tries = 3
+ while tries > 0:
+ self.process_glib_events()
+ print("active_conn", active_conn, "\n")
+ print("state", active_conn.props.state, "\n")
+ print("get_path", active_conn.get_path(), "\n")
+# time.sleep(1)
+ path = active_conn.get_connection().get_path()
+ for dev in active_conn.get_devices():
+ for c in dev.get_available_connections():
+ if c.get_path() == path:
+ return c
+ time.sleep(0.1)
+ tries -= 1
+
+ self.fail('Could not find NMConnection object for %s' % path)
+
+ def check_low_level_config(self, iface, ipv6_mode, ip6_privacy):
+ '''Check actual hardware state with ip/iw after being connected'''
+
+ # list of expected regexps in "ip a" output
+ expected_ip_a = []
+ unexpected_ip_a = []
+
+ if ipv6_mode is not None:
+ if ipv6_mode in ('', 'slaac'):
+ # has global address from our DHCP server
+ expected_ip_a.append('inet6 2600::[0-9a-f]+/')
+ else:
+ # has address with our prefix and MAC
+ expected_ip_a.append('inet6 2600::[0-9a-f:]+/64 scope global (?:tentative )?(?:mngtmpaddr )?(?:noprefixroute )?dynamic')
+ # has address with our prefix and random IP (Privacy
+ # Extension), if requested
+ priv_re = 'inet6 2600:[0-9a-f:]+/64 scope global temporary (?:tentative )?(?:mngtmpaddr )?dynamic'
+ if ip6_privacy in (NM.SettingIP6ConfigPrivacy.PREFER_TEMP_ADDR,
+ NM.SettingIP6ConfigPrivacy.PREFER_PUBLIC_ADDR):
+ expected_ip_a.append(priv_re)
+ else:
+ # FIXME: add a negative test here
+ pass
+ #unexpected_ip_a.append(priv_re)
+
+ # has a link-local address
+ expected_ip_a.append('inet6 fe80::[0-9a-f:]+/64 scope link')
+ else:
+ expected_ip_a.append('inet 192.168.5.\d+/24')
+
+ self.assert_iface_up(iface, expected_ip_a, unexpected_ip_a)
+
+
+class ColdplugWifi(NetworkManagerTest):
+ '''Wifi: In these tests NM starts after setting up the AP'''
+
+ # not run by default; run "nm-wifi ColdplugWifi.shell" to get this
+ @network_test_base.run_in_subprocess
+ def shell(self):
+ '''Start AP and NM, then run a shell (for debugging)'''
+
+ self.setup_ap('hw_mode=b\nchannel=1\nssid=' + SSID, None)
+ self.start_nm(self.dev_w_client)
+ print('''
+
+client interface: %s, access point interface: %s, AP SSID: "%s"
+
+You can now run commands like "nmcli dev" or "nmcli dev wifi connect '%s'".
+Logs are in '%s'. When done, exit the shell.
+
+''' % (self.dev_w_client, self.dev_w_ap, SSID, SSID, self.workdir))
+ subprocess.call(['bash', '-i'])
+
+ @network_test_base.run_in_subprocess
+ def test_open_b_ip4(self):
+ '''Open network, 802.11b, IPv4'''
+
+ self.do_test('hw_mode=b\nchannel=1\nssid=' + SSID, None, 11000)
+
+ #
+ # Common test code
+ #
+
+ # libnm-glib has a lot of internal persistent state (private D-BUS
+ # connections and such); as it is very brittle and hard to track down
+ # all remaining references to any NM* object after a test, we rather
+ # run each test in a separate subprocess
+ @network_test_base.run_in_subprocess
+ def do_test(self, hostapd_conf, ipv6_mode, expected_max_bitrate,
+ secret=None, ip6_privacy=None):
+ '''Actual test code, parameterized for the particular test case'''
+
+ self.setup_ap(hostapd_conf, ipv6_mode)
+ self.start_nm(self.dev_w_client)
+
+ # on coldplug we expect the AP to be picked out fast
+ ap = self.wait_ap(timeout=100)
+ self.assertTrue(ap.get_path().startswith('/org/freedesktop/NetworkManager'))
+ self.assertEqual(ap.get_mode(), getattr(NM, '80211Mode').INFRA)
+ self.assertEqual(ap.get_max_bitrate(), expected_max_bitrate)
+ #self.assertEqual(ap.get_flags(), )
+
+ # should not auto-connect
+ self.assertEqual(self.nmclient.get_active_connections(), [])
+
+ # connect to that AP
+ (conn, active_conn) = self.connect_to_ap(ap, secret, ipv6_mode, ip6_privacy)
+
+ # check NMActiveConnection object
+ self.assertIn(active_conn.get_uuid(), [c.get_uuid() for c in self.nmclient.get_active_connections()])
+ self.assertEqual([d.get_udi() for d in active_conn.get_devices()], [self.nmdev_w.get_udi()])
+
+ # check corresponding NMConnection object
+ wireless_setting = conn.get_setting_wireless()
+ self.assertEqual(wireless_setting.get_ssid().get_data(), SSID.encode())
+ self.assertEqual(wireless_setting.get_hidden(), False)
+ if secret:
+ self.assertEqual(conn.get_setting_wireless_security().get_name(), NM.SETTING_WIRELESS_SECURITY_SETTING_NAME)
+ else:
+ self.assertEqual(conn.get_setting_wireless_security(), None)
+ # for debugging
+ #conn.dump()
+
+ # for IPv6, check privacy setting
+ if ipv6_mode is not None and ip6_privacy != NM.SettingIP6ConfigPrivacy.UNKNOWN:
+ assert ip6_privacy is not None, 'for IPv6 tests you need to specify ip6_privacy flag'
+ ip6_setting = conn.get_setting_ip6_config()
+ self.assertEqual(ip6_setting.props.ip6_privacy, ip6_privacy)
+
+ self.check_low_level_config(self.dev_w_client, ipv6_mode, ip6_privacy)
+
+@unittest.skipIf(DBusTestCase is object,
+ 'WARNING: python-dbusmock not installed, skipping suspend tests; get it from https://pypi.python.org/pypi/python-dbusmock')
+class Suspend(NetworkManagerTest, DBusTestCase):
+ '''These tests run under a mock logind on a private system D-BUS'''
+
+ @classmethod
+ def setUpClass(klass):
+ klass.start_system_bus()
+ NetworkManagerTest.setUpClass()
+
+ @classmethod
+ def tearDownClass(klass):
+ NetworkManagerTest.tearDownClass()
+ DBusTestCase.tearDownClass()
+
+ def setUp(self):
+ NetworkManagerTest.setUp(self)
+
+ # start mock polkit and logind processes, so that we can
+ # intercept/control suspend
+ (p_polkit, self.obj_polkit) = self.spawn_server_template('polkitd', {}, stdout=subprocess.PIPE)
+ # by default we are not concerned about restricting access in the tests
+ self.obj_polkit.AllowUnknown(True)
+ self.addCleanup(p_polkit.wait)
+ self.addCleanup(p_polkit.terminate)
+
+ (p_logind, self.obj_logind) = self.spawn_server_template('logind', {}, stdout=subprocess.PIPE)
+ self.addCleanup(p_logind.wait)
+ self.addCleanup(p_logind.terminate)
+
+ # we have to manually start wpa_supplicant, as D-BUS activation does
+ # not happen for the fake D-BUS
+ log = os.path.join(self.workdir, 'wpasupplicant.log')
+ p_wpasupp = subprocess.Popen(['wpa_supplicant', '-u', '-d', '-e', '-K',
+ self.entropy_file, '-f', log])
+ self.addCleanup(p_wpasupp.wait)
+ self.addCleanup(p_wpasupp.terminate)
+
+ def fixme_test_active_ip4(self):
+ '''suspend during active IPv4 connection'''
+
+ self.do_test('hw_mode=b\nchannel=1\nssid=' + SSID, None,
+ ['inet 192.168.5.\d+/24'])
+
+ def fixme_test_active_ip6(self):
+ '''suspend during active IPv6 connection'''
+
+ self.do_test('hw_mode=b\nchannel=1\nssid=' + SSID, 'ra-only',
+ ['inet6 2600::'])
+
+ #
+ # Common test code
+ #
+
+ @network_test_base.run_in_subprocess
+ def do_test(self, hostapd_conf, ipv6_mode, expected_ip_a):
+ '''Actual test code, parameterized for the particular test case'''
+
+ self.setup_ap(hostapd_conf, ipv6_mode)
+ self.start_nm(self.dev_w_client)
+ ap = self.wait_ap(timeout=1800)
+ (conn, active_conn) = self.connect_to_ap(ap, None, ipv6_mode, None)
+
+ # send logind signal that we are about to suspend
+ self.obj_logind.EmitSignal('', 'PrepareForSleep', 'b', [True])
+
+ # disabling should be fast, give it one second
+ self.assertEventually(lambda: self.nmdev_w.get_state() == NM.DeviceState.UNMANAGED,
+ timeout=10)
+ self.assert_iface_down(self.dev_w_client)
+
+ # send logind signal that we resumed
+ self.obj_logind.EmitSignal('', 'PrepareForSleep', 'b', [False])
+
+ # this involves DHCP, use same timeout as for regular connection
+ self.assertEventually(lambda: self.nmdev_w.get_state() == NM.DeviceState.ACTIVATED,
+ timeout=100)
+
+ # dev_w_client should be back up
+ self.assert_iface_up(self.dev_w_client, expected_ip_a)
+
+
+def setUpModule():
+ # AppArmor currently does not allow us to access the system D-BUS from an
+ # unshared file system. Hack the policy to allow that until that gets fixed
+ # properly. See https://launchpad.net/bugs/1244157
+ # unshare the mount namespace, so that our tmpfs mounts are guaranteed to get
+ # cleaned up, and don't influence the production system
+ libc6 = ctypes.cdll.LoadLibrary('libc.so.6')
+ assert libc6.unshare(ctypes.c_int(0x00020000)) == 0, 'failed to unshare mount namespace'
+
+ # stop system-wide NetworkManager to avoid interfering with tests
+ nm_running = subprocess.call('service network-manager stop 2>&1', shell=True) == 0
+
+
+def tearDownModule():
+ subprocess.call('dhclient eth0', shell=True)
+ subprocess.call('sleep 10', shell=True)
+
+
+if __name__ == '__main__':
+ # avoid unintelligible error messages, and breaking "make check" when not being
+ # root
+ if os.getuid() != 0:
+ sys.stderr.write('This integration test suite needs to be run as root\n')
+ sys.exit(1)
+
+ # write to stdout, not stderr
+ runner = unittest.TextTestRunner(stream=sys.stdout, verbosity=2)
+ unittest.main(testRunner=runner)