summaryrefslogtreecommitdiff
path: root/nm.py
diff options
context:
space:
mode:
authorThomas Haller <thaller@redhat.com>2020-01-30 18:21:00 +0100
committerThomas Haller <thaller@redhat.com>2020-01-31 01:03:45 +0100
commit46a952234a7c5bb9b2d2a3e5119a396be1cd0f4f (patch)
tree137a9c5faf54d61bab4f52029ea5a8afcd48d9b2 /nm.py
parentb6ce3c7163150852976b3530576a977810373753 (diff)
downloadNetworkManager-th/libnm-test.tar.gz
Diffstat (limited to 'nm.py')
-rw-r--r--nm.py573
1 files changed, 573 insertions, 0 deletions
diff --git a/nm.py b/nm.py
new file mode 100644
index 0000000000..032c8577f4
--- /dev/null
+++ b/nm.py
@@ -0,0 +1,573 @@
+#!/usr/bin/python3
+# Test NetworkManager on simulated network devices
+# For an interactive shell test, run "nm ColdplugWifi.shell", see below
+
+__author__ = 'Martin Pitt <martin.pitt@ubuntu.com>'
+__copyright__ = '(C) 2013 Canonical Ltd.'
+__license__ = 'GPL v2 or later'
+
+import sys
+import os
+import os.path
+import re
+import time
+import subprocess
+import socket
+import netaddr
+import unittest
+import ctypes
+
+try:
+ from dbusmock import DBusTestCase
+except ImportError:
+ DBusTestCase = object # dummy so that the class declaration works
+
+import gi
+
+gi.require_version('NM', '1.0')
+from gi.repository import NM, GLib, Gio
+
+sys.path.append(os.path.dirname(__file__))
+import network_test_base
+
+SSID = 'fake net'
+
+# If True, NetworkManager logs directly to stdout, to watch logs in real time
+#NM_LOG_STDOUT = os.getenv('NM_LOG_STDOUT', False)
+NM_LOG_STDOUT = False
+
+# avoid accidentally destroying any real config
+os.environ['GSETTINGS_BACKEND'] = 'memory'
+
+# we currently get a lot of WARNINGs/CRITICALs from GI (leaked objects from
+# previous test runs/main loops?) Redirect them to stdout, to avoid failing
+# autopkgtests
+os.dup2(sys.stdout.fileno(), sys.stderr.fileno())
+
+
+class NetworkManagerTest(network_test_base.NetworkTestBase):
+ '''Provide common functionality for NM tests'''
+
+ def start_nm(self, wait_iface=None, auto_connect=True):
+ '''Start NetworkManager and initialize client object
+
+ If wait_iface is given, wait until NM recognizes that interface.
+ Otherwise, just wait until NM has initialized (for coldplug mode).
+
+ If auto_connect is False, set the "no-auto-default=*" option to avoid
+ auto-connecting to wired devices.
+ '''
+ # mount tmpfses over system directories, to avoid destroying the
+ # production configuration, and isolating tests from each other
+ if not os.path.exists('/run/NetworkManager'):
+ os.mkdir('/run/NetworkManager')
+ for d in ['/etc/NetworkManager', '/var/lib/NetworkManager',
+ '/run/NetworkManager']:
+ subprocess.check_call(['mount', '-n', '-t', 'tmpfs', 'none', d])
+ self.addCleanup(subprocess.call, ['umount', d])
+ os.mkdir('/etc/NetworkManager/system-connections')
+
+ # create local configuration; this allows us to have full control, and
+ # we also need to blacklist the AP device so that NM does not tear it
+ # down; we also blacklist any existing real interface to avoid
+ # interfering with it, and for getting predictable results
+ blacklist = ''
+ for iface in os.listdir('/sys/class/net'):
+ if iface == "bonding_masters":
+ continue
+ if iface != self.dev_w_client and iface != self.dev_e_client:
+ with open('/sys/class/net/%s/address' % iface) as f:
+ if blacklist:
+ blacklist += ';'
+ blacklist += 'mac:%s' % f.read().strip()
+
+ conf = os.path.join(self.workdir, 'NetworkManager.conf')
+ extra_main = ''
+ if not auto_connect:
+ extra_main += 'no-auto-default=*\n'
+
+ with open(conf, 'w') as f:
+ f.write('[main]\nplugins=keyfile\n%s\n[keyfile]\nunmanaged-devices=%s\n' %
+ (extra_main, blacklist))
+
+ if NM_LOG_STDOUT:
+ f_log = None
+ else:
+ log = os.path.join(self.workdir, 'NetworkManager.log')
+ f_log = os.open(log, os.O_CREAT | os.O_WRONLY | os.O_SYNC)
+
+ # build NM command line
+ argv = ['NetworkManager', '--log-level=debug', '--debug', '--config=' + conf]
+ # allow specifying extra arguments
+ argv += os.environ.get('NM_TEST_DAEMON_ARGS', '').strip().split()
+
+ p = subprocess.Popen(argv, stdout=f_log, stderr=subprocess.STDOUT)
+ # automatically terminate process at end of test case
+ self.addCleanup(p.wait)
+ self.addCleanup(p.terminate)
+ self.addCleanup(self.shutdown_connections)
+
+ if NM_LOG_STDOUT:
+ # let it initialize, then print a marker
+ time.sleep(1)
+ print('******* NM initialized *********\n\n')
+ else:
+ self.addCleanup(os.close, f_log)
+
+ # this should be fast, give it 2 s to initialize
+ if wait_iface:
+ self.poll_text(log, 'manager: (%s): new' % wait_iface, timeout=100)
+
+ self.nmclient = NM.Client.new()
+ self.assertTrue(self.nmclient.networking_get_enabled())
+
+ # FIXME: This certainly ought to be true, but isn't
+ #self.assertTrue(self.nmclient.get_manager_running())
+
+ # determine device objects
+ for d in self.nmclient.get_devices():
+ if d.props.interface == self.dev_w_ap:
+ self.assertEqual(d.get_device_type(), NM.DeviceType.WIFI)
+ self.assertEqual(d.get_driver(), 'mac80211_hwsim')
+ self.assertEqual(d.get_hw_address(), self.mac_w_ap)
+ self.nmdev_w_ap = d
+ elif d.props.interface == self.dev_w_client:
+ self.assertEqual(d.get_device_type(), NM.DeviceType.WIFI)
+ self.assertEqual(d.get_driver(), 'mac80211_hwsim')
+ # NM ≥ 1.4 randomizes MAC addresses by default, so we can't
+ # test for equality, just make sure it's not our AP
+ self.assertNotEqual(d.get_hw_address(), self.mac_w_ap)
+ self.nmdev_w = d
+ elif d.props.interface == self.dev_e_client:
+ self.assertEqual(d.get_device_type(), NM.DeviceType.VETH)
+ self.assertEqual(d.get_driver(), 'veth')
+ self.assertEqual(d.get_hw_address(), self.mac_e_client)
+ self.nmdev_e = d
+
+ self.assertTrue(hasattr(self, 'nmdev_w_ap'), 'Could not determine wifi AP NM device')
+ self.assertTrue(hasattr(self, 'nmdev_w'), 'Could not determine wifi client NM device')
+ self.assertTrue(hasattr(self, 'nmdev_e'), 'Could not determine eth client NM device')
+
+ self.process_glib_events()
+
+ def shutdown_connections(self):
+ '''Shut down all active NM connections.'''
+
+ if NM_LOG_STDOUT:
+ print('\n\n******* Shutting down NM connections *********')
+
+ # remove all created connections
+ for active_conn in self.nmclient.get_active_connections():
+ self.nmclient.deactivate_connection(active_conn)
+ self.assertEventually(lambda: self.nmclient.get_active_connections() == [],
+ timeout=20)
+
+ # verify that NM properly deconfigures the devices
+ self.assert_iface_down(self.dev_w_client)
+ self.assert_iface_down(self.dev_e_client)
+
+ @classmethod
+ def process_glib_events(klass):
+ '''Process pending GLib main loop events'''
+
+ context = GLib.MainContext.default()
+ while context.iteration(False):
+ pass
+
+ def assertEventually(self, condition, message=None, timeout=50):
+ '''Assert that condition function eventually returns True.
+
+ timeout is in deciseconds, defaulting to 50 (5 seconds). message is
+ printed on failure.
+ '''
+ while timeout >= 0:
+ self.process_glib_events()
+ if condition():
+ break
+ timeout -= 1
+ time.sleep(0.1)
+ else:
+ self.fail(message or 'timed out waiting for ' + str(condition))
+
+ def assert_iface_down(self, iface):
+ '''Assert that client interface is down'''
+
+ out = subprocess.check_output(['ip', 'a', 'show', 'dev', iface],
+ universal_newlines=True)
+ self.assertNotIn('inet 192', out)
+ self.assertNotIn('inet6 2600', out)
+
+ if iface == self.dev_w_client:
+ out = subprocess.check_output(['iw', 'dev', iface, 'link'],
+ universal_newlines=True)
+ self.assertIn('Not connected', out)
+
+ # but AP device should never be touched by NM
+ out = subprocess.check_output(['ip', 'a', 'show', 'dev', self.dev_w_ap],
+ universal_newlines=True)
+ self.assertIn('state UP', out)
+
+ def assert_iface_up(self, iface, expected_ip_a=None, unexpected_ip_a=None):
+ '''Assert that client interface is up'''
+
+ out = subprocess.check_output(['ip', 'a', 'show', 'dev', iface],
+ universal_newlines=True)
+ self.assertIn('state UP', out)
+ if expected_ip_a:
+ for r in expected_ip_a:
+ self.assertRegex(out, r)
+ if unexpected_ip_a:
+ for r in unexpected_ip_a:
+ self.assertNotRegex(out, r)
+
+ if iface == self.dev_w_client:
+ out = subprocess.check_output(['iw', 'dev', iface, 'link'],
+ universal_newlines=True)
+ self.assertIn('Connected to ' + self.mac_w_ap, out)
+ self.assertIn('SSID: ' + SSID, out)
+
+ def wait_ap(self, timeout):
+ '''Wait for AccessPoint NM object to appear, and return it'''
+
+ self.assertEventually(lambda: len(self.nmdev_w.get_access_points()) > 0,
+ 'timed out waiting for AP to be detected',
+ timeout=timeout)
+
+ return self.nmdev_w.get_access_points()[0]
+
+ def connect_to_ap(self, ap, secret, ipv6_mode, ip6_privacy):
+ '''Connect to an NMAccessPoint.
+
+ secret should be None for open networks, and a string with the password
+ for WEP/WPA.
+
+ ip6_privacy is a NM.SettingIP6ConfigPrivacy flag.
+
+ Return (NMConnection, NMActiveConnection) objects.
+ '''
+
+ ip4_method = NM.SETTING_IP4_CONFIG_METHOD_DISABLED
+ ip6_method = NM.SETTING_IP6_CONFIG_METHOD_IGNORE
+ if ipv6_mode is None:
+ ip4_method = NM.SETTING_IP4_CONFIG_METHOD_AUTO
+ else:
+ ip6_method = NM.SETTING_IP6_CONFIG_METHOD_AUTO
+
+ # If we have a secret, supply it to the new connection right away;
+ # adding it afterwards with update_secrets() does not work, and we
+ # can't implement a SecretAgent as get_secrets() would need to build a
+ # map of a map of gpointers to gpointers which is too much for PyGI
+ partial_conn = NM.SimpleConnection.new()
+ partial_conn.add_setting(NM.SettingIP4Config(method=ip4_method))
+ if secret:
+ partial_conn.add_setting(NM.SettingWirelessSecurity.new())
+ # FIXME: needs update for other auth types
+ partial_conn.update_secrets(NM.SETTING_WIRELESS_SECURITY_SETTING_NAME,
+ GLib.Variant('a{sv}', {
+ 'psk': GLib.Variant('s', secret)
+ }))
+ if ip6_privacy is not None:
+ partial_conn.add_setting(NM.SettingIP6Config(ip6_privacy=ip6_privacy,
+ method=ip6_method))
+
+ ml = GLib.MainLoop()
+ self.cb_conn = None
+ self.cancel = Gio.Cancellable()
+ self.timeout_tag = 0
+
+ def add_activate_cb(client, res, data):
+ if (self.timeout_tag > 0):
+ GLib.source_remove(self.timeout_tag)
+ self.timeout_tag = 0
+ try:
+ self.cb_conn = \
+ self.nmclient.add_and_activate_connection_finish(res)
+
+ except gi.repository.GLib.Error as e:
+ # Check if the error is "Operation was cancelled"
+ if (e.domain != "g-io-error-quark" or e.code != 19):
+ self.fail("add_and_activate_connection failed: %s (%s, %d)" %
+ (e.message, e.domain, e.code))
+ ml.quit()
+
+ def timeout_cb():
+ self.timeout_tag = -1
+ self.cancel.cancel()
+ ml.quit()
+ return GLib.SOURCE_REMOVE
+
+ self.nmclient.add_and_activate_connection_async(partial_conn, self.nmdev_w, ap.get_path(), self.cancel, add_activate_cb, None)
+ self.timeout_tag = GLib.timeout_add_seconds(300, timeout_cb)
+ ml.run()
+ if (self.timeout_tag < 0):
+ self.timeout_tag = 0
+ self.fail('Main loop for adding connection timed out!')
+ self.assertNotEqual(self.cb_conn, None)
+ active_conn = self.cb_conn
+ self.cb_conn = None
+
+ conn = self.conn_from_active_conn(active_conn)
+ self.assertTrue(conn.verify())
+
+ # verify need_secrets()
+ needed_secrets = conn.need_secrets()
+ if secret is None:
+ self.assertEqual(needed_secrets, (None, []))
+ else:
+ self.assertEqual(needed_secrets[0], NM.SETTING_WIRELESS_SECURITY_SETTING_NAME)
+ self.assertEqual(type(needed_secrets[1]), list)
+ self.assertGreaterEqual(len(needed_secrets[1]), 1)
+ # FIXME: needs update for other auth types
+ self.assertIn(needed_secrets[1][0], [NM.SETTING_WIRELESS_SECURITY_PSK])
+
+ # we are usually ACTIVATING at this point; wait for completion
+ # TODO: 5s is not enough, argh slow DHCP client
+ self.assertEventually(lambda: active_conn.get_state() == NM.ActiveConnectionState.ACTIVATED,
+ 'timed out waiting for %s to get activated' % active_conn.get_connection(),
+ timeout=600)
+ self.assertEqual(self.nmdev_w.get_state(), NM.DeviceState.ACTIVATED)
+ return (conn, active_conn)
+
+ def conn_from_active_conn(self, active_conn):
+ '''Get NMConnection object for an NMActiveConnection object'''
+
+ # this sometimes takes a second try, when the corresponding
+ # NMConnection object is not yet available
+ tries = 3
+ while tries > 0:
+ self.process_glib_events()
+ print("active_conn", active_conn, "\n")
+ print("state", active_conn.props.state, "\n")
+ print("get_path", active_conn.get_path(), "\n")
+# time.sleep(1)
+ path = active_conn.get_connection().get_path()
+ for dev in active_conn.get_devices():
+ for c in dev.get_available_connections():
+ if c.get_path() == path:
+ return c
+ time.sleep(0.1)
+ tries -= 1
+
+ self.fail('Could not find NMConnection object for %s' % path)
+
+ def check_low_level_config(self, iface, ipv6_mode, ip6_privacy):
+ '''Check actual hardware state with ip/iw after being connected'''
+
+ # list of expected regexps in "ip a" output
+ expected_ip_a = []
+ unexpected_ip_a = []
+
+ if ipv6_mode is not None:
+ if ipv6_mode in ('', 'slaac'):
+ # has global address from our DHCP server
+ expected_ip_a.append('inet6 2600::[0-9a-f]+/')
+ else:
+ # has address with our prefix and MAC
+ expected_ip_a.append('inet6 2600::[0-9a-f:]+/64 scope global (?:tentative )?(?:mngtmpaddr )?(?:noprefixroute )?dynamic')
+ # has address with our prefix and random IP (Privacy
+ # Extension), if requested
+ priv_re = 'inet6 2600:[0-9a-f:]+/64 scope global temporary (?:tentative )?(?:mngtmpaddr )?dynamic'
+ if ip6_privacy in (NM.SettingIP6ConfigPrivacy.PREFER_TEMP_ADDR,
+ NM.SettingIP6ConfigPrivacy.PREFER_PUBLIC_ADDR):
+ expected_ip_a.append(priv_re)
+ else:
+ # FIXME: add a negative test here
+ pass
+ #unexpected_ip_a.append(priv_re)
+
+ # has a link-local address
+ expected_ip_a.append('inet6 fe80::[0-9a-f:]+/64 scope link')
+ else:
+ expected_ip_a.append('inet 192.168.5.\d+/24')
+
+ self.assert_iface_up(iface, expected_ip_a, unexpected_ip_a)
+
+
+class ColdplugWifi(NetworkManagerTest):
+ '''Wifi: In these tests NM starts after setting up the AP'''
+
+ # not run by default; run "nm-wifi ColdplugWifi.shell" to get this
+ @network_test_base.run_in_subprocess
+ def shell(self):
+ '''Start AP and NM, then run a shell (for debugging)'''
+
+ self.setup_ap('hw_mode=b\nchannel=1\nssid=' + SSID, None)
+ self.start_nm(self.dev_w_client)
+ print('''
+
+client interface: %s, access point interface: %s, AP SSID: "%s"
+
+You can now run commands like "nmcli dev" or "nmcli dev wifi connect '%s'".
+Logs are in '%s'. When done, exit the shell.
+
+''' % (self.dev_w_client, self.dev_w_ap, SSID, SSID, self.workdir))
+ subprocess.call(['bash', '-i'])
+
+ @network_test_base.run_in_subprocess
+ def test_open_b_ip4(self):
+ '''Open network, 802.11b, IPv4'''
+
+ self.do_test('hw_mode=b\nchannel=1\nssid=' + SSID, None, 11000)
+
+ #
+ # Common test code
+ #
+
+ # libnm-glib has a lot of internal persistent state (private D-BUS
+ # connections and such); as it is very brittle and hard to track down
+ # all remaining references to any NM* object after a test, we rather
+ # run each test in a separate subprocess
+ @network_test_base.run_in_subprocess
+ def do_test(self, hostapd_conf, ipv6_mode, expected_max_bitrate,
+ secret=None, ip6_privacy=None):
+ '''Actual test code, parameterized for the particular test case'''
+
+ self.setup_ap(hostapd_conf, ipv6_mode)
+ self.start_nm(self.dev_w_client)
+
+ # on coldplug we expect the AP to be picked out fast
+ ap = self.wait_ap(timeout=100)
+ self.assertTrue(ap.get_path().startswith('/org/freedesktop/NetworkManager'))
+ self.assertEqual(ap.get_mode(), getattr(NM, '80211Mode').INFRA)
+ self.assertEqual(ap.get_max_bitrate(), expected_max_bitrate)
+ #self.assertEqual(ap.get_flags(), )
+
+ # should not auto-connect
+ self.assertEqual(self.nmclient.get_active_connections(), [])
+
+ # connect to that AP
+ (conn, active_conn) = self.connect_to_ap(ap, secret, ipv6_mode, ip6_privacy)
+
+ # check NMActiveConnection object
+ self.assertIn(active_conn.get_uuid(), [c.get_uuid() for c in self.nmclient.get_active_connections()])
+ self.assertEqual([d.get_udi() for d in active_conn.get_devices()], [self.nmdev_w.get_udi()])
+
+ # check corresponding NMConnection object
+ wireless_setting = conn.get_setting_wireless()
+ self.assertEqual(wireless_setting.get_ssid().get_data(), SSID.encode())
+ self.assertEqual(wireless_setting.get_hidden(), False)
+ if secret:
+ self.assertEqual(conn.get_setting_wireless_security().get_name(), NM.SETTING_WIRELESS_SECURITY_SETTING_NAME)
+ else:
+ self.assertEqual(conn.get_setting_wireless_security(), None)
+ # for debugging
+ #conn.dump()
+
+ # for IPv6, check privacy setting
+ if ipv6_mode is not None and ip6_privacy != NM.SettingIP6ConfigPrivacy.UNKNOWN:
+ assert ip6_privacy is not None, 'for IPv6 tests you need to specify ip6_privacy flag'
+ ip6_setting = conn.get_setting_ip6_config()
+ self.assertEqual(ip6_setting.props.ip6_privacy, ip6_privacy)
+
+ self.check_low_level_config(self.dev_w_client, ipv6_mode, ip6_privacy)
+
+@unittest.skipIf(DBusTestCase is object,
+ 'WARNING: python-dbusmock not installed, skipping suspend tests; get it from https://pypi.python.org/pypi/python-dbusmock')
+class Suspend(NetworkManagerTest, DBusTestCase):
+ '''These tests run under a mock logind on a private system D-BUS'''
+
+ @classmethod
+ def setUpClass(klass):
+ klass.start_system_bus()
+ NetworkManagerTest.setUpClass()
+
+ @classmethod
+ def tearDownClass(klass):
+ NetworkManagerTest.tearDownClass()
+ DBusTestCase.tearDownClass()
+
+ def setUp(self):
+ NetworkManagerTest.setUp(self)
+
+ # start mock polkit and logind processes, so that we can
+ # intercept/control suspend
+ (p_polkit, self.obj_polkit) = self.spawn_server_template('polkitd', {}, stdout=subprocess.PIPE)
+ # by default we are not concerned about restricting access in the tests
+ self.obj_polkit.AllowUnknown(True)
+ self.addCleanup(p_polkit.wait)
+ self.addCleanup(p_polkit.terminate)
+
+ (p_logind, self.obj_logind) = self.spawn_server_template('logind', {}, stdout=subprocess.PIPE)
+ self.addCleanup(p_logind.wait)
+ self.addCleanup(p_logind.terminate)
+
+ # we have to manually start wpa_supplicant, as D-BUS activation does
+ # not happen for the fake D-BUS
+ log = os.path.join(self.workdir, 'wpasupplicant.log')
+ p_wpasupp = subprocess.Popen(['wpa_supplicant', '-u', '-d', '-e', '-K',
+ self.entropy_file, '-f', log])
+ self.addCleanup(p_wpasupp.wait)
+ self.addCleanup(p_wpasupp.terminate)
+
+ def fixme_test_active_ip4(self):
+ '''suspend during active IPv4 connection'''
+
+ self.do_test('hw_mode=b\nchannel=1\nssid=' + SSID, None,
+ ['inet 192.168.5.\d+/24'])
+
+ def fixme_test_active_ip6(self):
+ '''suspend during active IPv6 connection'''
+
+ self.do_test('hw_mode=b\nchannel=1\nssid=' + SSID, 'ra-only',
+ ['inet6 2600::'])
+
+ #
+ # Common test code
+ #
+
+ @network_test_base.run_in_subprocess
+ def do_test(self, hostapd_conf, ipv6_mode, expected_ip_a):
+ '''Actual test code, parameterized for the particular test case'''
+
+ self.setup_ap(hostapd_conf, ipv6_mode)
+ self.start_nm(self.dev_w_client)
+ ap = self.wait_ap(timeout=1800)
+ (conn, active_conn) = self.connect_to_ap(ap, None, ipv6_mode, None)
+
+ # send logind signal that we are about to suspend
+ self.obj_logind.EmitSignal('', 'PrepareForSleep', 'b', [True])
+
+ # disabling should be fast, give it one second
+ self.assertEventually(lambda: self.nmdev_w.get_state() == NM.DeviceState.UNMANAGED,
+ timeout=10)
+ self.assert_iface_down(self.dev_w_client)
+
+ # send logind signal that we resumed
+ self.obj_logind.EmitSignal('', 'PrepareForSleep', 'b', [False])
+
+ # this involves DHCP, use same timeout as for regular connection
+ self.assertEventually(lambda: self.nmdev_w.get_state() == NM.DeviceState.ACTIVATED,
+ timeout=100)
+
+ # dev_w_client should be back up
+ self.assert_iface_up(self.dev_w_client, expected_ip_a)
+
+
+def setUpModule():
+ # AppArmor currently does not allow us to access the system D-BUS from an
+ # unshared file system. Hack the policy to allow that until that gets fixed
+ # properly. See https://launchpad.net/bugs/1244157
+ # unshare the mount namespace, so that our tmpfs mounts are guaranteed to get
+ # cleaned up, and don't influence the production system
+ libc6 = ctypes.cdll.LoadLibrary('libc.so.6')
+ assert libc6.unshare(ctypes.c_int(0x00020000)) == 0, 'failed to unshare mount namespace'
+
+ # stop system-wide NetworkManager to avoid interfering with tests
+ nm_running = subprocess.call('service network-manager stop 2>&1', shell=True) == 0
+
+
+def tearDownModule():
+ subprocess.call('dhclient eth0', shell=True)
+ subprocess.call('sleep 10', shell=True)
+
+
+if __name__ == '__main__':
+ # avoid unintelligible error messages, and breaking "make check" when not being
+ # root
+ if os.getuid() != 0:
+ sys.stderr.write('This integration test suite needs to be run as root\n')
+ sys.exit(1)
+
+ # write to stdout, not stderr
+ runner = unittest.TextTestRunner(stream=sys.stdout, verbosity=2)
+ unittest.main(testRunner=runner)