#!/usr/bin/env python from __future__ import print_function import sys import gi from gi.repository import GLib try: gi.require_version("NM", "1.0") from gi.repository import NM except Exception as e: print("Cannot load gi.NM: %s" % (str(e))) sys.exit(77) import os import dbus import dbus.service import dbus.mainloop.glib import random import uuid import hashlib import socket import collections ############################################################################### _DEFAULT_ARG = object() ############################################################################### class Global: pass gl = None ############################################################################### class TestError(AssertionError): def __init__(self, message="Unspecified error", errors=None): AssertionError.__init__(self, message) self.errors = errors ############################################################################### class Util: PY3 = sys.version_info[0] == 3 @staticmethod def g_source_remove(source_id): if source_id is not None: GLib.source_remove(source_id) @staticmethod def addr_family_check(family, allow_af_unspec=False): if family == socket.AF_INET: return if family == socket.AF_INET6: return if allow_af_unspec and family == socket.AF_UNSPEC: return raise TestError("invalid address family %s" % (family)) @staticmethod def ip_addr_pton(addr, family=None): if addr is None: return (None, None) if family is not None and family is not socket.AF_UNSPEC: Util.addr_family_check(family) a = socket.inet_pton(family, addr) else: a = None family = None try: a = socket.inet_pton(socket.AF_INET, addr) family = socket.AF_INET except: a = socket.inet_pton(socket.AF_INET6, addr) family = socket.AF_INET6 if Util.PY3: a = tuple([int(c) for c in a]) else: a = tuple([ord(c) for c in a]) return (a, family) @staticmethod def ip_addr_ntop(addr, family=None): if Util.PY3: a = bytes(addr) else: a = "".join([chr(c) for c in addr]) if len(a) == 4: f = socket.AF_INET elif len(a) == 16: f = socket.AF_INET6 else: raise TestError("Invalid binary IP address '%s'" % (repr(addr))) if family is not None and f != family: raise TestError( "Unexpected address family. Expected %s but ip address was %s" % (family, repr(addr)) ) return socket.inet_ntop(f, a) @staticmethod def ip_addr_norm(addr, family=None): a, family = Util.ip_addr_pton(addr, family) return (Util.ip_addr_ntop(a, family), family) @staticmethod def ip4_addr_be32(addr): # return the IPv4 address as 32 bit integer in network byte order # (big endian). a, family = Util.ip_addr_pton(addr, socket.AF_INET) n = 0 for i in range(4): n = (n << 8) + a[i] return socket.htonl(n) @staticmethod def ip6_addr_ay(addr): return Util.ip_addr_pton(addr, socket.AF_INET6)[0] @staticmethod def ip_net_parse(net, family=None): parts = net.split("/") if len(parts) != 2: raise TestError( "Invalid IP network '%s' has not '/' for the prefix length" % (net) ) prefix = int(parts[1]) addr, family = Util.ip_addr_norm(parts[0], family) if family == socket.AF_INET: if prefix < 0 or prefix > 32: raise TestError("Invalid prefix length for IPv4 address '%s'" % (net)) else: if prefix < 0 or prefix > 128: raise TestError("Invalid prefix length for IPv4 address '%s'" % (net)) return (addr, prefix, family) class RandomSeed: def __init__(self, seed): self.cnt = 0 self.seed = str(seed) def _next(self): c = self.cnt self.cnt += 1 return self.seed + "-" + str(c) @staticmethod def wrap(seed): if seed is None: return None if isinstance(seed, Util.RandomSeed): return seed return Util.RandomSeed(seed) @staticmethod def get(seed, extra_seed=None): if seed is None: return None if isinstance(seed, Util.RandomSeed): seed = seed._next() else: seed = str(seed) if extra_seed is None: try: extra_seed = Util.RandomSeed._extra_seed except: extra_seed = os.environ.get( "NM_TEST_NETWORKMANAGER_SERVICE_SEED", "" ) Util.RandomSeed._extra_seed = extra_seed return extra_seed + seed @staticmethod def random_stream(seed, length=None): seed = Util.RandomSeed.wrap(seed) # generates a stream of integers, in the range [0..255] if seed is None: # without a seed, we generate new random numbers. while length is None or length > 0: yield random.randint(0, 255) if length is not None: length -= 1 return v = None while length is None or length > 0: if not v: s = Util.RandomSeed.get(seed) s = s.encode("utf8") v = hashlib.sha256(s).hexdigest() yield int(v[0:2], 16) v = v[2:] if length is not None: length -= 1 @staticmethod def random_int(seed, v_start=_DEFAULT_ARG, v_end=_DEFAULT_ARG): # - if neither start not end is give, return a number in the range # u32 range [0, 0xFFFFFFFF] # - if only start is given (the first argument), interpret it as # the range of the interval. That is, return random number in # range [0, start-1] # - if end and start is given, return a random number with this # range (inclusive!): [start, end] if v_end is _DEFAULT_ARG: # if only one edge is provided (no v_end), then the range # is [0, v_start[. That is, random_int(seed, 5), returns # values from 0 to 4. if v_start is _DEFAULT_ARG: # by default, return a 32u integer. v_end = 0x100000000 else: v_end = v_start v_start = 0 else: if v_start is _DEFAULT_ARG: raise TestError("Cannot specify end without start") # if a full range is provided, v_end is included. # random_int(seed, 0, 4) returns values from 0 to 4. v_end += 1 n = 0 span = v_end - v_start assert span > 0 for r in Util.random_stream(seed): n = n * 256 + r if n > span: break return v_start + (n % span) @staticmethod def random_bool(seed): return Util.random_int(seed, 0, 1) == 1 @staticmethod def random_subset(seed, all_set): all_set = list(all_set) result = [] seed = Util.RandomSeed.wrap(seed) for i in list( range(Util.random_int(Util.RandomSeed.get(seed), len(all_set) + 1)) ): idx = Util.random_int(Util.RandomSeed.get(seed), len(all_set)) result.append(all_set[idx]) del all_set[idx] return result @staticmethod def random_mac(seed): return "%02X:%02X:%02X:%02X:%02X:%02X" % tuple(Util.random_stream(seed, 6)) @staticmethod def random_ip(seed, net=None, family=None): if net is not None: mask, prefix, family = Util.ip_net_parse(net, family) a_mask, unused = Util.ip_addr_pton(mask, family) else: prefix = None Util.addr_family_check(family) if family == socket.AF_INET: l = 4 else: l = 16 a = tuple(Util.random_stream(seed, l)) if prefix is not None: a2 = [] for i in range(l): if prefix == 0: c = a[i] elif prefix >= 8: c = a_mask[i] prefix -= 8 else: c = 0xFF & (0xFF << (8 - prefix)) c = (a[i] & ~c) | (a_mask[i] & c) prefix = 0 a2.append(c) a = tuple(a2) return (Util.ip_addr_ntop(a, family), family) @staticmethod def eprint(*args, **kwargs): print(*args, file=sys.stderr, **kwargs) @staticmethod def variant_from_dbus(val): if isinstance(val, (dbus.String, str)): return GLib.Variant("s", str(val)) if isinstance(val, dbus.UInt32): return GLib.Variant("u", int(val)) if isinstance(val, dbus.UInt64): return GLib.Variant("t", int(val)) if isinstance(val, dbus.Int32): return GLib.Variant("i", int(val)) if isinstance(val, dbus.Boolean): return GLib.Variant("b", bool(val)) if isinstance(val, dbus.Byte): return GLib.Variant("y", int(val)) if isinstance(val, dbus.Array): try: if val.signature == "s": return GLib.Variant("as", [Util.variant_from_dbus(x) for x in val]) if val.signature == "b": return GLib.Variant("ab", [Util.variant_from_dbus(x) for x in val]) if val.signature == "y": return GLib.Variant("ay", [int(x) for x in val]) if val.signature == "u": return GLib.Variant("au", [int(x) for x in val]) if val.signature == "ay": return GLib.Variant("aay", [Util.variant_from_dbus(x) for x in val]) if val.signature == "au": return GLib.Variant("aau", [Util.variant_from_dbus(x) for x in val]) if val.signature == "a{sv}": return GLib.Variant( "aa{sv}", [ collections.OrderedDict( [ (str(k), Util.variant_from_dbus(v)) for k, v in addr.items() ] ) for addr in val ], ) if val.signature == "(ayuay)": return GLib.Variant( "a(ayuay)", [Util.variant_from_dbus(x) for x in val] ) if val.signature == "(ayuayu)": return GLib.Variant( "a(ayuayu)", [Util.variant_from_dbus(x) for x in val] ) except Exception as e: raise Exception( "Cannot convert array element to type '%s': %s" % (val.signature, e.message) ) if isinstance(val, dbus.Dictionary): if val.signature == "ss": return GLib.Variant( "a{ss}", collections.OrderedDict([(str(k), str(v)) for k, v in val.items()]), ) if val.signature == "sv": return GLib.Variant( "a{sv}", collections.OrderedDict( [(str(k), Util.variant_from_dbus(v)) for k, v in val.items()] ), ) if val.signature == "sa{sv}": c = collections.OrderedDict( [ ( str(key1), collections.OrderedDict( [ (str(key2), Util.variant_from_dbus(arr2)) for key2, arr2 in arr1.items() ] ), ) for key1, arr1 in val.items() ] ) return GLib.Variant("a{sa{sv}}", c) raise Exception("Unsupported type for value '%s'" % (repr(val))) ############################################################################### IFACE_DBUS = "org.freedesktop.DBus" IFACE_OBJECT_MANAGER = "org.freedesktop.DBus.ObjectManager" IFACE_CONNECTION = "org.freedesktop.NetworkManager.Settings.Connection" IFACE_DEVICE = "org.freedesktop.NetworkManager.Device" IFACE_WIFI = "org.freedesktop.NetworkManager.Device.Wireless" IFACE_TEST = "org.freedesktop.NetworkManager.LibnmGlibTest" IFACE_NM = "org.freedesktop.NetworkManager" IFACE_SETTINGS = "org.freedesktop.NetworkManager.Settings" IFACE_AGENT_MANAGER = "org.freedesktop.NetworkManager.AgentManager" IFACE_AGENT = "org.freedesktop.NetworkManager.SecretAgent" IFACE_WIRED = "org.freedesktop.NetworkManager.Device.Wired" IFACE_MODEM = "org.freedesktop.NetworkManager.Device.Modem" IFACE_VLAN = "org.freedesktop.NetworkManager.Device.Vlan" IFACE_WIFI_AP = "org.freedesktop.NetworkManager.AccessPoint" IFACE_ACTIVE_CONNECTION = "org.freedesktop.NetworkManager.Connection.Active" IFACE_VPN_CONNECTION = "org.freedesktop.NetworkManager.VPN.Connection" IFACE_DNS_MANAGER = "org.freedesktop.NetworkManager.DnsManager" IFACE_IP4_CONFIG = "org.freedesktop.NetworkManager.IP4Config" IFACE_IP6_CONFIG = "org.freedesktop.NetworkManager.IP6Config" IFACE_DHCP4_CONFIG = "org.freedesktop.NetworkManager.DHCP4Config" IFACE_DHCP6_CONFIG = "org.freedesktop.NetworkManager.DHCP6Config" ############################################################################### class BusErr: class UnknownInterfaceException(dbus.DBusException): def __init__(self, *args, **kwargs): self._dbus_error_name = "{}.UnknownInterface".format(IFACE_DBUS) dbus.DBusException.__init__(self, *args, **kwargs) class UnknownPropertyException(dbus.DBusException): def __init__(self, *args, **kwargs): self._dbus_error_name = "{}.UnknownProperty".format(IFACE_DBUS) dbus.DBusException.__init__(self, *args, **kwargs) class InvalidPropertyException(dbus.DBusException): def __init__(self, *args, **kwargs): self._dbus_error_name = "{}.InvalidProperty".format(IFACE_CONNECTION) dbus.DBusException.__init__(self, *args, **kwargs) class MissingPropertyException(dbus.DBusException): def __init__(self, *args, **kwargs): self._dbus_error_name = "{}.MissingProperty".format(IFACE_CONNECTION) dbus.DBusException.__init__(self, *args, **kwargs) class InvalidSettingException(dbus.DBusException): def __init__(self, *args, **kwargs): self._dbus_error_name = "{}.InvalidSetting".format(IFACE_CONNECTION) dbus.DBusException.__init__(self, *args, **kwargs) class MissingSettingException(dbus.DBusException): def __init__(self, *args, **kwargs): self._dbus_error_name = "{}.MissingSetting".format(IFACE_CONNECTION) dbus.DBusException.__init__(self, *args, **kwargs) class NotSoftwareException(dbus.DBusException): def __init__(self, *args, **kwargs): self._dbus_error_name = "{}.NotSoftware".format(IFACE_DEVICE) dbus.DBusException.__init__(self, *args, **kwargs) class ApNotFoundException(dbus.DBusException): def __init__(self, *args, **kwargs): self._dbus_error_name = "{}.AccessPointNotFound".format(IFACE_WIFI) dbus.DBusException.__init__(self, *args, **kwargs) class PermissionDeniedException(dbus.DBusException): def __init__(self, *args, **kwargs): self._dbus_error_name = "{}.PermissionDenied".format(IFACE_NM) dbus.DBusException.__init__(self, *args, **kwargs) class UnknownDeviceException(dbus.DBusException): def __init__(self, *args, **kwargs): self._dbus_error_name = "{}.UnknownDevice".format(IFACE_NM) dbus.DBusException.__init__(self, *args, **kwargs) class UnknownConnectionException(dbus.DBusException): def __init__(self, *args, **kwargs): self._dbus_error_name = "{}.UnknownConnection".format(IFACE_NM) dbus.DBusException.__init__(self, *args, **kwargs) class InvalidHostnameException(dbus.DBusException): def __init__(self, *args, **kwargs): self._dbus_error_name = "{}.InvalidHostname".format(IFACE_SETTINGS) dbus.DBusException.__init__(self, *args, **kwargs) class NoSecretsException(dbus.DBusException): def __init__(self, *args, **kwargs): self._dbus_error_name = "{}.NoSecrets".format(IFACE_AGENT_MANAGER) dbus.DBusException.__init__(self, *args, **kwargs) class UserCanceledException(dbus.DBusException): def __init__(self, *args, **kwargs): self._dbus_error_name = "{}.UserCanceled".format(IFACE_AGENT_MANAGER) dbus.DBusException.__init__(self, *args, **kwargs) @staticmethod def from_nmerror(e): try: domain, code = (e.domain, e.code) except: return None if domain == GLib.quark_to_string(NM.ConnectionError.quark()): if code == NM.ConnectionError.MISSINGSETTING: return BusErr.MissingSettingException(e.message) if code == NM.ConnectionError.INVALIDPROPERTY: return BusErr.InvalidPropertyException(e.message) return None @staticmethod def raise_nmerror(e): e2 = BusErr.from_nmerror(e) if e2 is not None: raise e2 raise e ############################################################################### class NmUtil: @staticmethod def con_hash_to_connection(con_hash, do_verify=False, do_normalize=False): x_con = [] for v_setting_name, v_setting in list(con_hash.items()): if isinstance(v_setting_name, (dbus.String, str)): v_setting_name = str(v_setting_name) else: raise Exception( "Expected string dict, but got '%s' key" % (v_setting_name) ) x_setting = [] for v_property_name, v_value in list(v_setting.items()): if isinstance(v_property_name, (dbus.String, str)): v_property_name = str(v_property_name) else: raise Exception( "Expected string dict, but got '%s' subkey under %s (%s)" % (v_property_name, v_setting_name, repr(con_hash)) ) try: v = Util.variant_from_dbus(v_value) except Exception as e: raise Exception( "Unsupported value %s.%s = %s (%s)" % (v_setting_name, v_property_name, v_value, str(e)) ) x_setting.append((v_property_name, v)) x_con.append((v_setting_name, collections.OrderedDict(x_setting))) x_con = GLib.Variant("a{sa{sv}}", collections.OrderedDict(x_con)) assert GLib.Variant.equal(x_con, Util.variant_from_dbus(con_hash)) try: con = NM.SimpleConnection.new_from_dbus(x_con) except: if do_verify: raise return None if do_normalize: try: con.normalize() except: if do_verify: raise if do_verify: con.verify() return con @staticmethod def con_hash_verify(con_hash, do_verify_strict=True): if NM.SETTING_CONNECTION_SETTING_NAME not in con_hash: raise BusErr.MissingSettingException("connection: setting is required") s_con = con_hash[NM.SETTING_CONNECTION_SETTING_NAME] if NM.SETTING_CONNECTION_TYPE not in s_con: raise BusErr.MissingPropertyException( "connection.type: property is required" ) if NM.SETTING_CONNECTION_UUID not in s_con: raise BusErr.MissingPropertyException( "connection.uuid: property is required" ) if NM.SETTING_CONNECTION_ID not in s_con: raise BusErr.MissingPropertyException("connection.id: property is required") if not do_verify_strict: return t = s_con[NM.SETTING_CONNECTION_TYPE] if t not in [ NM.SETTING_GSM_SETTING_NAME, NM.SETTING_VLAN_SETTING_NAME, NM.SETTING_VPN_SETTING_NAME, NM.SETTING_WIMAX_SETTING_NAME, NM.SETTING_WIRED_SETTING_NAME, NM.SETTING_WIRELESS_SETTING_NAME, ]: raise BusErr.InvalidPropertyException( 'connection.type: unsupported connection type "%s"' % (t) ) try: con_nm = NmUtil.con_hash_to_connection( con_hash, do_verify=True, do_normalize=True ) except Exception as e: BusErr.raise_nmerror(e) @staticmethod def con_hash_get_id(con_hash): if NM.SETTING_CONNECTION_SETTING_NAME in con_hash: s_con = con_hash[NM.SETTING_CONNECTION_SETTING_NAME] if NM.SETTING_CONNECTION_ID in s_con: return s_con[NM.SETTING_CONNECTION_ID] return None @staticmethod def con_hash_get_uuid(con_hash): if NM.SETTING_CONNECTION_SETTING_NAME in con_hash: s_con = con_hash[NM.SETTING_CONNECTION_SETTING_NAME] if NM.SETTING_CONNECTION_UUID in s_con: return s_con[NM.SETTING_CONNECTION_UUID] return None @staticmethod def con_hash_get_type(con_hash): if NM.SETTING_CONNECTION_SETTING_NAME in con_hash: s_con = con_hash[NM.SETTING_CONNECTION_SETTING_NAME] if NM.SETTING_CONNECTION_TYPE in s_con: return s_con[NM.SETTING_CONNECTION_TYPE] return None ############################################################################### class ExportedObj(dbus.service.Object): DBusInterface = collections.namedtuple("DBusInterface", ["dbus_iface", "props"]) @staticmethod def create_path(klass, path_prefix=None): if path_prefix is None: path_prefix = klass.path_prefix path = path_prefix + str(klass.path_counter_next) klass.path_counter_next += 1 return path @staticmethod def to_path_array(src): return dbus.Array( [ExportedObj.to_path(o) for o in src] if src else [], signature=dbus.Signature("o"), ) @staticmethod def to_path(src): if src: return dbus.ObjectPath(src.path) return dbus.ObjectPath("/") def __init__(self, object_path, ident=None): dbus.service.Object.__init__(self) self._dbus_ifaces = {} self.path = object_path # ident is an optional (unique) identifier for the instance. # The test driver may set it to reference to the object by # this identifier. For NetworkManager, the real ID of an # object on D-Bus is the object_path. But that is generated # by the stub server only after the test user created the # object. The ident parameter may be specified by the user # and thus can be hard-coded in the test. if ident is None: ident = object_path self.ident = ident def export(self): self.add_to_connection(gl.bus, self.path) gl.object_manager.add_object(self) def unexport(self): gl.object_manager.remove_object(self) self.remove_from_connection() def dbus_interface_add(self, dbus_iface, props): self._dbus_ifaces[dbus_iface] = ExportedObj.DBusInterface(dbus_iface, props) def _dbus_interface_get(self, dbus_iface): if dbus_iface not in self._dbus_ifaces: raise BusErr.UnknownInterfaceException() return self._dbus_ifaces[dbus_iface] def _dbus_interface_get_property(self, dbus_interface, propname=None): props = dbus_interface.props if propname is None: return props if propname not in props: raise BusErr.UnknownPropertyException() return props[propname] def _dbus_property_get(self, dbus_iface, propname=None): return self._dbus_interface_get_property( self._dbus_interface_get(dbus_iface), propname ) def _dbus_property_set( self, dbus_iface, propname, value, allow_detect_dbus_iface=False, dry_run=False, force_update=False, ): if allow_detect_dbus_iface and not dbus_iface: props = None for p, dbus_interface in self._dbus_ifaces.items(): if propname in dbus_interface.props: if props is not None: raise TestError( "Cannot uniquely find the property '%s' on object '%s'" % (propname, self.path) ) props = dbus_interface.props dbus_iface = p if props is None: raise TestError( "Cannot find the property '%s' on object '%s'" % (propname, self.path) ) else: try: dbus_interface = self._dbus_interface_get(dbus_iface) props = self._dbus_interface_get_property(dbus_interface) except: if dry_run: raise TestError( "No interface '%s' on '%s'" % (dbus_iface, self.path) ) raise if dry_run: if propname not in props: raise TestError( "No property '%s' on '%s' on '%s'" % (propname, dbus_iface, self.path) ) permission_granted = False if isinstance(self, ActiveConnection): if dbus_iface == IFACE_ACTIVE_CONNECTION: if propname == PRP_ACTIVE_CONNECTION_STATE: permission_granted = True elif dbus_iface == IFACE_VPN_CONNECTION: if propname == PRP_VPN_CONNECTION_VPN_STATE: permission_granted = True if not permission_granted: raise TestError( "Cannot set property '%s' on '%s' on '%s' via D-Bus" % (propname, dbus_iface, self.path) ) return assert propname in props if not force_update: if props[propname] == value: return props[propname] = value self._dbus_property_notify(dbus_iface, propname) def _dbus_property_notify(self, dbus_iface, propname): dbus_interface = self._dbus_interface_get(dbus_iface) prop = self._dbus_interface_get_property(dbus_interface, propname) if propname is not None: prop = {propname: prop} ExportedObj.PropertiesChanged(self, dbus_iface, prop, []) @dbus.service.signal(dbus.PROPERTIES_IFACE, signature="sa{sv}as") def PropertiesChanged(self, iface, changed, invalidated): pass @dbus.service.method( dbus_interface=dbus.PROPERTIES_IFACE, in_signature="s", out_signature="a{sv}" ) def GetAll(self, dbus_iface): return self._dbus_property_get(dbus_iface) @dbus.service.method( dbus_interface=dbus.PROPERTIES_IFACE, in_signature="ss", out_signature="v" ) def Get(self, dbus_iface, name): return self._dbus_property_get(dbus_iface, name) def get_managed_ifaces(self): my_ifaces = {} for iface in self._dbus_ifaces: my_ifaces[iface] = self._dbus_ifaces[iface].props return my_ifaces ############################################################################### PRP_DEVICE_UDI = "Udi" PRP_DEVICE_IFACE = "Interface" PRP_DEVICE_IPIFACE = "IpInterface" PRP_DEVICE_DRIVER = "Driver" PRP_DEVICE_STATE = "State" PRP_DEVICE_STATE_REASON = "StateReason" PRP_DEVICE_ACTIVE_CONNECTION = "ActiveConnection" PRP_DEVICE_IP4_CONFIG = "Ip4Config" PRP_DEVICE_IP6_CONFIG = "Ip6Config" PRP_DEVICE_DHCP4_CONFIG = "Dhcp4Config" PRP_DEVICE_DHCP6_CONFIG = "Dhcp6Config" PRP_DEVICE_MANAGED = "Managed" PRP_DEVICE_AUTOCONNECT = "Autoconnect" PRP_DEVICE_DEVICE_TYPE = "DeviceType" PRP_DEVICE_AVAILABLE_CONNECTIONS = "AvailableConnections" PRP_DEVICE_LLDP_NEIGHBORS = "LldpNeighbors" PRP_DEVICE_INTERFACE_FLAGS = "InterfaceFlags" class Device(ExportedObj): path_counter_next = 1 path_prefix = "/org/freedesktop/NetworkManager/Devices/" def __init__(self, iface, devtype, ident=None): if ident is None: ident = iface ExportedObj.__init__(self, ExportedObj.create_path(Device), ident) self.ip4_config = None self.ip6_config = None self.dhcp4_config = None self.dhcp6_config = None self.prp_state = NM.DeviceState.UNAVAILABLE if devtype == NM.DeviceType.MODEM: udi = "/org/freedesktop/ModemManager1/Modem/0" else: udi = "/sys/devices/virtual/%s" % iface props = { PRP_DEVICE_UDI: udi, PRP_DEVICE_IFACE: iface, PRP_DEVICE_IPIFACE: iface, PRP_DEVICE_DRIVER: "virtual", PRP_DEVICE_STATE: dbus.UInt32(self.prp_state), PRP_DEVICE_STATE_REASON: dbus.Struct( (dbus.UInt32(self.prp_state), dbus.UInt32(NM.DeviceStateReason.NONE)) ), PRP_DEVICE_ACTIVE_CONNECTION: ExportedObj.to_path(None), PRP_DEVICE_IP4_CONFIG: ExportedObj.to_path(self.ip4_config), PRP_DEVICE_IP6_CONFIG: ExportedObj.to_path(self.ip6_config), PRP_DEVICE_DHCP4_CONFIG: ExportedObj.to_path(self.dhcp4_config), PRP_DEVICE_DHCP6_CONFIG: ExportedObj.to_path(self.dhcp6_config), PRP_DEVICE_MANAGED: True, PRP_DEVICE_AUTOCONNECT: True, PRP_DEVICE_DEVICE_TYPE: dbus.UInt32(devtype), PRP_DEVICE_AVAILABLE_CONNECTIONS: ExportedObj.to_path_array([]), PRP_DEVICE_INTERFACE_FLAGS: dbus.UInt32(3), # up,lower-up PRP_DEVICE_LLDP_NEIGHBORS: dbus.Array( [ dbus.Dictionary( { "chassis-id-type": dbus.UInt32(6), "chassis-id": dbus.String("00:11:22:33:44:00"), "port-id-type": dbus.UInt32(7), "port-id": dbus.String("Uplink port"), "port-description": dbus.String("GigabitEthernet #1"), "system-name": dbus.String("test1.example.com"), "system-description": dbus.String("Test system #1"), "system-capabilities": dbus.UInt32(20), "destination": dbus.String("nearest-bridge"), } ), dbus.Dictionary( { "chassis-id-type": dbus.UInt32(2), "chassis-id": dbus.String("chassis1"), "port-id-type": dbus.UInt32(3), "port-id": dbus.String("44:44:44:44:44:44"), "port-description": dbus.String("GigabitEthernet #2"), "system-name": dbus.String("test2.example.com"), "system-description": dbus.String("Test system #2"), "system-capabilities": dbus.UInt32(2047), "destination": dbus.String("nearest-non-tpmr-bridge"), "ieee-802-1-vlans": dbus.Array( [ dbus.Dictionary( { "vid": dbus.UInt32(80), "name": dbus.String("vlan80"), }, signature="sv", ), dbus.Dictionary( { "vid": dbus.UInt32(4000), "name": dbus.String("My VLAN"), }, signature="sv", ), ] ), "ieee-802-1-ppvids": dbus.Array( [ dbus.Dictionary( { "ppvid": dbus.UInt32(4), "flags": dbus.UInt32(0x12), }, signature="sv", ), dbus.Dictionary( { "ppvid": dbus.UInt32(10), "flags": dbus.UInt32(0x31), }, signature="sv", ), ] ), } ), dbus.Dictionary( { "chassis-id-type": dbus.UInt32(6), "chassis-id": dbus.String("00:11:22:33:44:22"), "port-id-type": dbus.UInt32(1), "port-id": dbus.String("port1"), "port-description": dbus.String("GigabitEthernet #3"), "system-name": dbus.String("test3.example.com"), "system-description": dbus.String("Test system #3"), "system-capabilities": dbus.UInt32(40), "destination": dbus.String("nearest-customer-bridge"), "management-addresses": dbus.Array( [ dbus.Dictionary( { "address-subtype": dbus.UInt32(1), "address": dbus.ByteArray( b"\xc0\xa8\x01\x01" ), "interface-number": dbus.UInt32(4), "interface-number-subtype": dbus.UInt32(3), "object-id": dbus.ByteArray( b"\x01\x02\x03\x04" ), }, signature="sv", ), dbus.Dictionary( { "address-subtype": dbus.UInt32(2), "address": dbus.ByteArray( b"\xfd\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x12\x34\x56\x78" ), "interface-number": dbus.UInt32(1), "interface-number-subtype": dbus.UInt32(2), }, signature="sv", ), ] ), "ieee-802-3-mac-phy-conf": dbus.Dictionary( { "autoneg": dbus.UInt32(3), "pmd-autoneg-cap": dbus.UInt32(0xFE), "operational-mau-type": dbus.UInt32(5), }, signature="sv", ), "ieee-802-3-power-via-mdi": dbus.Dictionary( { "mdi-power-support": dbus.UInt32(7), "pse-power-pair": dbus.UInt32(6), "power-class": dbus.UInt32(1), }, signature="sv", ), } ), ], "a{sv}", ), } self.dbus_interface_add(IFACE_DEVICE, props) def start(self): self.ip4_config = IP4Config() self._dbus_property_set( IFACE_DEVICE, PRP_DEVICE_IP4_CONFIG, ExportedObj.to_path(self.ip4_config) ) self.ip6_config = IP6Config() self._dbus_property_set( IFACE_DEVICE, PRP_DEVICE_IP6_CONFIG, ExportedObj.to_path(self.ip6_config) ) self.dhcp4_config = Dhcp4Config() self._dbus_property_set( IFACE_DEVICE, PRP_DEVICE_DHCP4_CONFIG, ExportedObj.to_path(self.dhcp4_config), ) self.dhcp6_config = Dhcp6Config() self._dbus_property_set( IFACE_DEVICE, PRP_DEVICE_DHCP6_CONFIG, ExportedObj.to_path(self.dhcp6_config), ) def stop(self): self._dbus_property_set( IFACE_DEVICE, PRP_DEVICE_IP4_CONFIG, ExportedObj.to_path(None) ) if self.ip4_config is not None: self.ip4_config.unexport() self.ip4_config = None self._dbus_property_set( IFACE_DEVICE, PRP_DEVICE_IP6_CONFIG, ExportedObj.to_path(None) ) if self.ip6_config is not None: self.ip6_config.unexport() self.ip6_config = None self._dbus_property_set( IFACE_DEVICE, PRP_DEVICE_DHCP4_CONFIG, ExportedObj.to_path(None) ) if self.dhcp4_config is not None: self.dhcp4_config.unexport() self.dhcp4_config = None self._dbus_property_set( IFACE_DEVICE, PRP_DEVICE_DHCP6_CONFIG, ExportedObj.to_path(None) ) if self.dhcp6_config is not None: self.dhcp6_config.unexport() self.dhcp6_config = None @dbus.service.method(dbus_interface=IFACE_DEVICE, in_signature="", out_signature="") def Disconnect(self): pass @dbus.service.method(dbus_interface=IFACE_DEVICE, in_signature="", out_signature="") def Delete(self): # We don't currently support any software device types, so... raise BusErr.NotSoftwareException() pass @dbus.service.signal(IFACE_DEVICE, signature="uuu") def StateChanged(self, new_state, old_state, reason): pass def set_state(self, state, reason): # libnm is plugged on notify::state-reason and not on state-changed dbus signal # so we must simulate the change of property to emit a state-changed signal on libnm self._dbus_property_set(IFACE_DEVICE, PRP_NM_STATE, dbus.UInt32(state)) self._dbus_property_set( IFACE_DEVICE, PRP_DEVICE_STATE_REASON, (dbus.UInt32(state), dbus.UInt32(reason)), ) old_state = self.prp_state self.prp_state = state self.StateChanged( dbus.UInt32(self.prp_state), dbus.UInt32(old_state), dbus.UInt32(reason) ) def set_carrier_status(self, carrier_status): self._dbus_property_set(IFACE_WIRED, PRP_WIRED_CARRIER, carrier_status) def set_active_connection(self, ac): self._dbus_property_set(IFACE_DEVICE, PRP_DEVICE_ACTIVE_CONNECTION, ac) def connection_is_available(self, con_inst): if con_inst.is_vpn(): return False if isinstance(self, WiredDevice): if con_inst.get_type() == NM.SETTING_WIRED_SETTING_NAME: return True elif isinstance(self, WifiDevice): if con_inst.get_type() == NM.SETTING_WIRELESS_SETTING_NAME: return True elif isinstance(self, VlanDevice): if con_inst.get_type() == NM.SETTING_VLAN_SETTING_NAME: return True return False def available_connections_get(self): return [ c for c in gl.settings.get_connections() if self.connection_is_available(c) ] def available_connections_update(self): self._dbus_property_set( IFACE_DEVICE, PRP_DEVICE_AVAILABLE_CONNECTIONS, ExportedObj.to_path_array(self.available_connections_get()), ) @dbus.service.method(IFACE_TEST, in_signature="", out_signature="") def Start(self): self.start() @dbus.service.method(IFACE_TEST, in_signature="", out_signature="") def Stop(self): self.stop() ############################################################################### PRP_WIRED_HW_ADDRESS = "HwAddress" PRP_WIRED_PERM_HW_ADDRESS = "PermHwAddress" PRP_WIRED_SPEED = "Speed" PRP_WIRED_CARRIER = "Carrier" PRP_WIRED_S390_SUBCHANNELS = "S390Subchannels" class WiredDevice(Device): def __init__(self, iface, mac=None, subchannels=None, ident=None): Device.__init__(self, iface, NM.DeviceType.ETHERNET, ident) if mac is None: mac = Util.random_mac(self.ident) if subchannels is None: subchannels = dbus.Array(signature="s") props = { PRP_WIRED_HW_ADDRESS: mac, PRP_WIRED_PERM_HW_ADDRESS: mac, PRP_WIRED_SPEED: dbus.UInt32(100), PRP_WIRED_CARRIER: True, PRP_WIRED_S390_SUBCHANNELS: subchannels, } self.dbus_interface_add(IFACE_WIRED, props) ############################################################################### PM_CURRENT_CAPABILITIES = "CurrentCapabilities" PM_MODEM_CAPABILITIES = "ModemCapabilities" # capability to make device seen compatible with GSM connection NM_DEVICE_MODEM_CAPABILITY_GSM_UMTS = 0x00000004 class ModemDevice(Device): def __init__(self, iface): Device.__init__(self, iface, NM.DeviceType.MODEM) props = { PM_CURRENT_CAPABILITIES: dbus.UInt32(NM_DEVICE_MODEM_CAPABILITY_GSM_UMTS), PM_MODEM_CAPABILITIES: dbus.UInt32(0), } self.dbus_interface_add(IFACE_MODEM, props) ############################################################################### PRP_VLAN_HW_ADDRESS = "HwAddress" PRP_VLAN_CARRIER = "Carrier" PRP_VLAN_VLAN_ID = "VlanId" class VlanDevice(Device): def __init__(self, iface, ident=None): Device.__init__(self, iface, NM.DeviceType.VLAN, ident) props = { PRP_VLAN_HW_ADDRESS: Util.random_mac(self.ident), PRP_VLAN_CARRIER: False, PRP_VLAN_VLAN_ID: dbus.UInt32(1), } self.dbus_interface_add(IFACE_VLAN, props) ############################################################################### PRP_WIFI_AP_FLAGS = "Flags" PRP_WIFI_AP_WPA_FLAGS = "WpaFlags" PRP_WIFI_AP_RSN_FLAGS = "RsnFlags" PRP_WIFI_AP_SSID = "Ssid" PRP_WIFI_AP_FREQUENCY = "Frequency" PRP_WIFI_AP_HW_ADDRESS = "HwAddress" PRP_WIFI_AP_MODE = "Mode" PRP_WIFI_AP_MAX_BITRATE = "MaxBitrate" PRP_WIFI_AP_STRENGTH = "Strength" PRP_WIFI_AP_LAST_SEEN = "LastSeen" class WifiAp(ExportedObj): path_counter_next = 1 path_prefix = "/org/freedesktop/NetworkManager/AccessPoint/" def __init__( self, ssid, bssid=None, flags=None, wpaf=None, rsnf=None, freq=None, strength=None, ident=None, ): ExportedObj.__init__(self, ExportedObj.create_path(WifiAp), ident) NM_AP_FLAGS = getattr(NM, "80211ApSecurityFlags") if flags is None: flags = 0x1 if wpaf is None: wpaf = 0x0 wpaf = wpaf | NM_AP_FLAGS.PAIR_TKIP wpaf = wpaf | NM_AP_FLAGS.PAIR_CCMP wpaf = wpaf | NM_AP_FLAGS.GROUP_TKIP wpaf = wpaf | NM_AP_FLAGS.GROUP_CCMP wpaf = wpaf | NM_AP_FLAGS.KEY_MGMT_PSK if rsnf is None: rsnf = 0x0 rsnf = rsnf | NM_AP_FLAGS.PAIR_TKIP rsnf = rsnf | NM_AP_FLAGS.PAIR_CCMP rsnf = rsnf | NM_AP_FLAGS.GROUP_TKIP rsnf = rsnf | NM_AP_FLAGS.GROUP_CCMP rsnf = rsnf | NM_AP_FLAGS.KEY_MGMT_PSK if freq is None: freq = 2412 if bssid is None: bssid = Util.random_mac(self.path) if strength is None: strength = Util.random_int(self.path, 100) self.ssid = ssid props = { PRP_WIFI_AP_FLAGS: dbus.UInt32(flags), PRP_WIFI_AP_WPA_FLAGS: dbus.UInt32(wpaf), PRP_WIFI_AP_RSN_FLAGS: dbus.UInt32(rsnf), PRP_WIFI_AP_SSID: dbus.ByteArray(self.ssid.encode("utf-8")), PRP_WIFI_AP_FREQUENCY: dbus.UInt32(freq), PRP_WIFI_AP_HW_ADDRESS: bssid, PRP_WIFI_AP_MODE: dbus.UInt32(getattr(NM, "80211Mode").INFRA), PRP_WIFI_AP_MAX_BITRATE: dbus.UInt32(54000), PRP_WIFI_AP_STRENGTH: dbus.Byte(strength), PRP_WIFI_AP_LAST_SEEN: dbus.Int32(NM.utils_get_timestamp_msec() / 1000), } self.dbus_interface_add(IFACE_WIFI_AP, props) ############################################################################### PRP_WIFI_HW_ADDRESS = "HwAddress" PRP_WIFI_PERM_HW_ADDRESS = "PermHwAddress" PRP_WIFI_MODE = "Mode" PRP_WIFI_BITRATE = "Bitrate" PRP_WIFI_ACCESS_POINTS = "AccessPoints" PRP_WIFI_ACTIVE_ACCESS_POINT = "ActiveAccessPoint" PRP_WIFI_WIRELESS_CAPABILITIES = "WirelessCapabilities" PRP_WIFI_LAST_SCAN = "LastScan" class WifiDevice(Device): def __init__(self, iface, mac=None, ident=None): Device.__init__(self, iface, NM.DeviceType.WIFI, ident) if mac is None: mac = Util.random_mac(self.ident) self.aps = [] self.scan_cb_id = None # Use a randomly older timestamp to trigger RequestScan() from the client ts = max( 0, NM.utils_get_timestamp_msec() - Util.random_int(self.path, 20000, 40000) ) props = { PRP_WIFI_HW_ADDRESS: mac, PRP_WIFI_PERM_HW_ADDRESS: mac, PRP_WIFI_MODE: dbus.UInt32(getattr(NM, "80211Mode").INFRA), PRP_WIFI_BITRATE: dbus.UInt32(21000), PRP_WIFI_WIRELESS_CAPABILITIES: dbus.UInt32(0xFF), PRP_WIFI_ACCESS_POINTS: ExportedObj.to_path_array(self.aps), PRP_WIFI_ACTIVE_ACCESS_POINT: ExportedObj.to_path(None), PRP_WIFI_LAST_SCAN: dbus.Int64(ts), } self.dbus_interface_add(IFACE_WIFI, props) @dbus.service.method(dbus_interface=IFACE_WIFI, in_signature="", out_signature="ao") def GetAccessPoints(self): # only include non-hidden APs return ExportedObj.to_path_array([a for a in self.aps if a.ssid]) @dbus.service.method(dbus_interface=IFACE_WIFI, in_signature="", out_signature="ao") def GetAllAccessPoints(self): # include all APs including hidden ones return ExportedObj.to_path_array(self.aps) @dbus.service.method( dbus_interface=IFACE_WIFI, in_signature="a{sv}", out_signature="" ) def RequestScan(self, props): self.scan_cb_id = Util.g_source_remove(self.scan_cb_id) def cb(): ts = NM.utils_get_timestamp_msec() for ap in self.aps: ap._dbus_property_set( IFACE_WIFI_AP, PRP_WIFI_AP_LAST_SEEN, dbus.Int32(ts / 1000) ) self._dbus_property_set(IFACE_WIFI, PRP_WIFI_LAST_SCAN, dbus.Int64(ts)) self.scan_cb_id = None return False self.scan_cb_id = GLib.idle_add(cb) pass @dbus.service.signal(IFACE_WIFI, signature="o") def AccessPointAdded(self, ap_path): pass def add_ap(self, ap): ap.export() self.aps.append(ap) self._dbus_property_set( IFACE_WIFI, PRP_WIFI_ACCESS_POINTS, ExportedObj.to_path_array(self.aps) ) self.AccessPointAdded(ExportedObj.to_path(ap)) return ap def remove_ap(self, ap): self.aps.remove(ap) self._dbus_property_set( IFACE_WIFI, PRP_WIFI_ACCESS_POINTS, ExportedObj.to_path_array(self.aps) ) self.AccessPointRemoved(ExportedObj.to_path(ap)) ap.unexport() def stop(self): self.scan_cb_id = Util.g_source_remove(self.scan_cb_id) super(WifiDevice, self).stop() @dbus.service.signal(IFACE_WIFI, signature="o") def AccessPointRemoved(self, ap_path): pass def remove_ap_by_path(self, path): for ap in self.aps: if ap.path == path: self.remove_ap(ap) return raise BusErr.ApNotFoundException("AP %s not found" % path) ############################################################################### PRP_ACTIVE_CONNECTION_CONNECTION = "Connection" PRP_ACTIVE_CONNECTION_SPECIFIC_OBJECT = "SpecificObject" PRP_ACTIVE_CONNECTION_ID = "Id" PRP_ACTIVE_CONNECTION_UUID = "Uuid" PRP_ACTIVE_CONNECTION_TYPE = "Type" PRP_ACTIVE_CONNECTION_DEVICES = "Devices" PRP_ACTIVE_CONNECTION_STATE = "State" PRP_ACTIVE_CONNECTION_DEFAULT = "Default" PRP_ACTIVE_CONNECTION_IP4CONFIG = "Ip4Config" PRP_ACTIVE_CONNECTION_DHCP4CONFIG = "Dhcp4Config" PRP_ACTIVE_CONNECTION_DEFAULT6 = "Default6" PRP_ACTIVE_CONNECTION_IP6CONFIG = "Ip6Config" PRP_ACTIVE_CONNECTION_DHCP6CONFIG = "Dhcp6Config" PRP_ACTIVE_CONNECTION_VPN = "Vpn" PRP_ACTIVE_CONNECTION_MASTER = "Master" PRP_VPN_CONNECTION_VPN_STATE = "VpnState" PRP_VPN_CONNECTION_BANNER = "Banner" class ActiveConnection(ExportedObj): path_counter_next = 1 path_prefix = "/org/freedesktop/NetworkManager/ActiveConnection/" def __init__(self, device, con_inst, specific_object): ExportedObj.__init__(self, ExportedObj.create_path(ActiveConnection)) self.device = device self.con_inst = con_inst self.is_vpn = con_inst.is_vpn() self._activation_id = None self._deactivation_id = None self.activation_state_change_delay_ms = 50 s_con = con_inst.con_hash[NM.SETTING_CONNECTION_SETTING_NAME] props = { PRP_ACTIVE_CONNECTION_CONNECTION: ExportedObj.to_path(con_inst), PRP_ACTIVE_CONNECTION_SPECIFIC_OBJECT: ExportedObj.to_path(specific_object), PRP_ACTIVE_CONNECTION_ID: s_con[NM.SETTING_CONNECTION_ID], PRP_ACTIVE_CONNECTION_UUID: s_con[NM.SETTING_CONNECTION_UUID], PRP_ACTIVE_CONNECTION_TYPE: s_con[NM.SETTING_CONNECTION_TYPE], PRP_ACTIVE_CONNECTION_DEVICES: ExportedObj.to_path_array([self.device]), PRP_ACTIVE_CONNECTION_STATE: dbus.UInt32(NM.ActiveConnectionState.UNKNOWN), PRP_ACTIVE_CONNECTION_DEFAULT: False, PRP_ACTIVE_CONNECTION_IP4CONFIG: ExportedObj.to_path(None), PRP_ACTIVE_CONNECTION_DHCP4CONFIG: ExportedObj.to_path(None), PRP_ACTIVE_CONNECTION_DEFAULT6: False, PRP_ACTIVE_CONNECTION_IP6CONFIG: ExportedObj.to_path(None), PRP_ACTIVE_CONNECTION_DHCP6CONFIG: ExportedObj.to_path(None), PRP_ACTIVE_CONNECTION_VPN: self.is_vpn, PRP_ACTIVE_CONNECTION_MASTER: ExportedObj.to_path(None), } self.dbus_interface_add(IFACE_ACTIVE_CONNECTION, props) if self.is_vpn: props = { PRP_VPN_CONNECTION_VPN_STATE: dbus.UInt32( NM.VpnConnectionState.UNKNOWN ), PRP_VPN_CONNECTION_BANNER: "*** VPN connection %s ***" % (con_inst.get_id()), } self.dbus_interface_add(IFACE_VPN_CONNECTION, props) def _set_state(self, state, reason): state = dbus.UInt32(state) self._dbus_property_set( IFACE_ACTIVE_CONNECTION, PRP_ACTIVE_CONNECTION_STATE, state ) self.StateChanged(state, dbus.UInt32(reason)) def activation_cancel(self): self._activation_id = Util.g_source_remove(self._activation_id) def _activation_step2(self): assert self._activation_id is not None self._activation_id = None s_con = self.con_inst.con_hash[NM.SETTING_CONNECTION_SETTING_NAME] conn_id = s_con[NM.SETTING_CONNECTION_ID] if gl.force_activation_failure.get(conn_id, False): self._set_state( NM.ActiveConnectionState.DEACTIVATED, NM.ActiveConnectionStateReason.UNKNOWN, ) self.device.set_state(NM.DeviceState.FAILED, NM.DeviceStateReason.UNKNOWN) else: self._set_state( NM.ActiveConnectionState.ACTIVATED, NM.ActiveConnectionStateReason.UNKNOWN, ) self.device.set_state(NM.DeviceState.ACTIVATED, NM.DeviceStateReason.NONE) return False def _activation_step1(self): assert self._activation_id is not None self._activation_id = GLib.timeout_add( self.activation_state_change_delay_ms, self._activation_step2 ) self.device.set_active_connection(self) self.device.set_state(NM.DeviceState.PREPARE, NM.DeviceStateReason.NONE) self._set_state( NM.ActiveConnectionState.ACTIVATING, NM.ActiveConnectionStateReason.UNKNOWN ) return False def _deactivation_step1(self): assert self._deactivation_id is not None self._deactivation_id = None self.device.set_state( NM.DeviceState.DISCONNECTED, NM.DeviceStateReason.USER_REQUESTED ) self._set_state( NM.ActiveConnectionState.DEACTIVATED, NM.ActiveConnectionStateReason.USER_DISCONNECTED, ) return False def set_state(self, state, reason): self._set_state(state, reason) def start_activation(self): assert self._activation_id is None self._activation_id = GLib.timeout_add( self.activation_state_change_delay_ms, self._activation_step1 ) def start_deactivation(self): assert self._deactivation_id is None self.device.set_state( NM.DeviceState.DEACTIVATING, NM.DeviceStateReason.USER_REQUESTED ) self._set_state( NM.ActiveConnectionState.DEACTIVATING, NM.ActiveConnectionStateReason.USER_DISCONNECTED, ) self._deactivation_id = GLib.timeout_add(50, self._deactivation_step1) @dbus.service.signal(IFACE_ACTIVE_CONNECTION, signature="uu") def StateChanged(self, state, reason): pass @dbus.service.signal(IFACE_VPN_CONNECTION, signature="uu") def VpnStateChanged(self, state, reason): pass ############################################################################### PRP_NM_DEVICES = "Devices" PRP_NM_ALL_DEVICES = "AllDevices" PRP_NM_NETWORKING_ENABLED = "NetworkingEnabled" PRP_NM_WWAN_ENABLED = "WwanEnabled" PRP_NM_WWAN_HARDWARE_ENABLED = "WwanHardwareEnabled" PRP_NM_WIRELESS_ENABLED = "WirelessEnabled" PRP_NM_WIRELESS_HARDWARE_ENABLED = "WirelessHardwareEnabled" PRP_NM_WIMAX_ENABLED = "WimaxEnabled" PRP_NM_WIMAX_HARDWARE_ENABLED = "WimaxHardwareEnabled" PRP_NM_ACTIVE_CONNECTIONS = "ActiveConnections" PRP_NM_PRIMARY_CONNECTION = "PrimaryConnection" PRP_NM_ACTIVATING_CONNECTION = "ActivatingConnection" PRP_NM_STARTUP = "Startup" PRP_NM_STATE = "State" PRP_NM_VERSION = "Version" PRP_NM_CONNECTIVITY = "Connectivity" class NetworkManager(ExportedObj): def __init__(self): ExportedObj.__init__(self, "/org/freedesktop/NetworkManager") self.devices = [] self.active_connections = [] props = { PRP_NM_DEVICES: ExportedObj.to_path_array(self.devices), PRP_NM_ALL_DEVICES: ExportedObj.to_path_array(self.devices), PRP_NM_NETWORKING_ENABLED: True, PRP_NM_WWAN_ENABLED: True, PRP_NM_WWAN_HARDWARE_ENABLED: True, PRP_NM_WIRELESS_ENABLED: True, PRP_NM_WIRELESS_HARDWARE_ENABLED: True, PRP_NM_WIMAX_ENABLED: True, PRP_NM_WIMAX_HARDWARE_ENABLED: True, PRP_NM_ACTIVE_CONNECTIONS: ExportedObj.to_path_array( self.active_connections ), PRP_NM_PRIMARY_CONNECTION: ExportedObj.to_path(None), PRP_NM_ACTIVATING_CONNECTION: ExportedObj.to_path(None), PRP_NM_STARTUP: False, PRP_NM_STATE: dbus.UInt32(NM.State.DISCONNECTED), PRP_NM_VERSION: "0.9.9.0", PRP_NM_CONNECTIVITY: dbus.UInt32(NM.ConnectivityState.NONE), } self.dbus_interface_add(IFACE_NM, props) self.export() @dbus.service.signal(IFACE_NM, signature="u") def StateChanged(self, new_state): pass def set_state(self, new_state): self._dbus_property_set(IFACE_NM, PRP_NM_STATE, new_state) self.StateChanged(dbus.UInt32(new_state)) @dbus.service.method(dbus_interface=IFACE_NM, in_signature="", out_signature="ao") def GetDevices(self): return ExportedObj.to_path_array(self.devices) @dbus.service.method(dbus_interface=IFACE_NM, in_signature="", out_signature="ao") def GetAllDevices(self): return ExportedObj.to_path_array(self.devices) @dbus.service.method(dbus_interface=IFACE_NM, in_signature="s", out_signature="o") def GetDeviceByIpIface(self, ip_iface): d = self.find_device_first( ip_iface=ip_iface, require=BusErr.UnknownDeviceException ) return ExportedObj.to_path(d) @dbus.service.method(dbus_interface=IFACE_NM, in_signature="ooo", out_signature="o") def ActivateConnection(self, conpath, devpath, specific_object): try: con_inst = gl.settings.get_connection(conpath) except Exception as e: raise BusErr.UnknownConnectionException("Connection not found") con_hash = con_inst.con_hash con_type = NmUtil.con_hash_get_type(con_hash) device = self.find_device_first(path=devpath) if not device: if con_type == NM.SETTING_WIRED_SETTING_NAME: device = self.find_device_first(dev_type=WiredDevice) elif con_type == NM.SETTING_WIRELESS_SETTING_NAME: device = self.find_device_first(dev_type=WifiDevice) elif con_type == NM.SETTING_VLAN_SETTING_NAME: ifname = con_hash[NM.SETTING_CONNECTION_SETTING_NAME]["interface-name"] device = VlanDevice(ifname) self.add_device(device) elif con_type == NM.SETTING_VPN_SETTING_NAME: for ac in self.active_connections: if ac.is_vpn: continue if ac.device: device = ac.device break if not device: raise BusErr.UnknownDeviceException( "No device found for the requested iface." ) # See if we need secrets. For the moment, we only support WPA if "802-11-wireless-security" in con_hash: s_wsec = con_hash["802-11-wireless-security"] if s_wsec["key-mgmt"] == "wpa-psk" and "psk" not in s_wsec: secrets = gl.agent_manager.get_secrets( con_hash, conpath, "802-11-wireless-security" ) if secrets is None: raise BusErr.NoSecretsException("No secret agent available") if "802-11-wireless-security" not in secrets: raise BusErr.NoSecretsException("No secrets provided") s_wsec = secrets["802-11-wireless-security"] if "psk" not in s_wsec: raise BusErr.NoSecretsException("No secrets provided") ac = ActiveConnection(device, con_inst, None) self.active_connection_add(ac) gl.manager.devices_available_connections_update() return ExportedObj.to_path(ac) def active_connection_add(self, ac): ac.export() self.active_connections.append(ac) self._dbus_property_set( IFACE_NM, PRP_NM_ACTIVE_CONNECTIONS, ExportedObj.to_path_array(self.active_connections), ) ac.start_activation() def active_connection_remove(self, ac): ac.activation_cancel() self.active_connections.remove(ac) self._dbus_property_set( IFACE_NM, PRP_NM_ACTIVE_CONNECTIONS, ExportedObj.to_path_array(self.active_connections), ) ac.unexport() @dbus.service.method( dbus_interface=IFACE_NM, in_signature="a{sa{sv}}oo", out_signature="oo" ) def AddAndActivateConnection(self, con_hash, devpath, specific_object): conpath, acpath, result = self.AddAndActivateConnection2( con_hash, devpath, specific_object, dict() ) return (conpath, acpath) @dbus.service.method( dbus_interface=IFACE_NM, in_signature="a{sa{sv}}ooa{sv}", out_signature="ooa{sv}", ) def AddAndActivateConnection2(self, con_hash, devpath, specific_object, options): device = self.find_device_first( path=devpath, require=BusErr.UnknownDeviceException ) conpath = gl.settings.AddConnection(con_hash) return (conpath, self.ActivateConnection(conpath, devpath, specific_object), []) @dbus.service.method(dbus_interface=IFACE_NM, in_signature="o", out_signature="") def DeactivateConnection(self, active_connection): # Look for an active connection with the same object path for ac in self.active_connections: if ac.path == str(active_connection): ac.activation_cancel() ac.start_deactivation() return raise BusErr.UnknownConnectionException( "Connection not found: %s" % str(active_connection) ) @dbus.service.method(dbus_interface=IFACE_NM, in_signature="b", out_signature="") def Sleep(self, do_sleep): if do_sleep: state = NM.State.ASLEEP else: state = NM.State.DISCONNECTED self.set_state(state) @dbus.service.method(dbus_interface=IFACE_NM, in_signature="b", out_signature="") def Enable(self, do_enable): pass @dbus.service.method( dbus_interface=IFACE_NM, in_signature="", out_signature="a{ss}" ) def GetPermissions(self): return { "org.freedesktop.NetworkManager.enable-disable-network": "yes", "org.freedesktop.NetworkManager.sleep-wake": "no", "org.freedesktop.NetworkManager.enable-disable-wifi": "yes", "org.freedesktop.NetworkManager.enable-disable-wwan": "yes", "org.freedesktop.NetworkManager.enable-disable-wimax": "yes", "org.freedesktop.NetworkManager.network-control": "yes", "org.freedesktop.NetworkManager.wifi.share.protected": "yes", "org.freedesktop.NetworkManager.wifi.share.open": "yes", "org.freedesktop.NetworkManager.settings.modify.own": "yes", "org.freedesktop.NetworkManager.settings.modify.system": "yes", "org.freedesktop.NetworkManager.settings.modify.hostname": "yes", "org.freedesktop.NetworkManager.settings.modify.global-dns": "no", "org.freedesktop.NetworkManager.reload": "no", } @dbus.service.method(dbus_interface=IFACE_NM, in_signature="ss", out_signature="") def SetLogging(self, level, domains): pass @dbus.service.method(dbus_interface=IFACE_NM, in_signature="", out_signature="ss") def GetLogging(self): return ("info", "HW,RFKILL,CORE,DEVICE,WIFI,ETHER") @dbus.service.method(dbus_interface=IFACE_NM, in_signature="", out_signature="u") def CheckConnectivity(self): raise BusErr.PermissionDeniedException("You fail") @dbus.service.signal(IFACE_NM, signature="o") def DeviceAdded(self, devpath): pass def find_devices( self, ident=_DEFAULT_ARG, path=_DEFAULT_ARG, iface=_DEFAULT_ARG, ip_iface=_DEFAULT_ARG, dev_type=_DEFAULT_ARG, ): r = None for d in self.devices: if ident is not _DEFAULT_ARG: if d.ident != ident: continue if path is not _DEFAULT_ARG: if d.path != path: continue if iface is not _DEFAULT_ARG: if d.iface != iface: continue if ip_iface is not _DEFAULT_ARG: # ignore iface/ip_iface distinction for now if d.iface != ip_iface: continue if dev_type is not _DEFAULT_ARG: if not isinstance(d, dev_type): continue yield d def find_device_first( self, ident=_DEFAULT_ARG, path=_DEFAULT_ARG, iface=_DEFAULT_ARG, ip_iface=_DEFAULT_ARG, dev_type=_DEFAULT_ARG, require=None, ): r = None for d in self.find_devices( ident=ident, path=path, iface=iface, ip_iface=ip_iface, dev_type=dev_type ): r = d break if r is None and require: if require is TestError: raise TestError("Device not found") raise BusErr.UnknownDeviceException("Device not found") return r def add_device(self, device): if self.find_device_first(ident=device.ident, path=device.path) is not None: raise TestError( "Duplicate device ident=%s / path=%s" % (device.ident, device.path) ) device.export() self.devices.append(device) self._dbus_property_set( IFACE_NM, PRP_NM_DEVICES, ExportedObj.to_path_array(self.devices) ) self._dbus_property_set( IFACE_NM, PRP_NM_ALL_DEVICES, ExportedObj.to_path_array(self.devices) ) self.DeviceAdded(ExportedObj.to_path(device)) device.start() return device def remove_device(self, device): device.stop() self.devices.remove(device) self._dbus_property_set( IFACE_NM, PRP_NM_DEVICES, ExportedObj.to_path_array(self.devices) ) self._dbus_property_set( IFACE_NM, PRP_NM_ALL_DEVICES, ExportedObj.to_path_array(self.devices) ) self.DeviceRemoved(ExportedObj.to_path(device)) device.unexport() def devices_available_connections_update(self): for d in self.devices: d.available_connections_update() @dbus.service.signal(IFACE_NM, signature="o") def DeviceRemoved(self, devpath): pass @dbus.service.method(IFACE_TEST, in_signature="", out_signature="") def Quit(self): gl.mainloop.quit() @dbus.service.method(IFACE_TEST, in_signature="a{ss}", out_signature="a(sss)") def FindConnections(self, selector_args): return [ (c.path, c.get_uuid(), c.get_id()) for c in gl.settings.find_connections(**selector_args) ] @dbus.service.method(IFACE_TEST, in_signature="a(oa(sa(sv)))", out_signature="") def SetProperties(self, all_args): for i in [0, 1]: for path, iface_args in all_args: o = gl.object_manager.find_object(path) if o is None: raise TestError("Object %s does not exist" % (path)) for iface_name, args in iface_args: for propname, value in args: o._dbus_property_set( iface_name, propname, value, allow_detect_dbus_iface=True, dry_run=(i == 0), ) @dbus.service.method(IFACE_TEST, in_signature="sa{sv}", out_signature="o") def AddObj(self, class_name, args): if class_name in ["WiredDevice", "WifiDevice"]: py_class = globals()[class_name] d = py_class(**args) return ExportedObj.to_path(self.add_device(d)) elif class_name in ["WifiAp"]: if "device" not in args: raise TestError('missing "device" paramter') d = self.find_device_first(ident=args["device"], require=TestError) del args["device"] if "ssid" not in args: args["ssid"] = d.ident + "-ap-" + str(WifiAp.path_counter_next) ap = WifiAp(**args) return ExportedObj.to_path(d.add_ap(ap)) raise TestError('Invalid python type "%s"' % (class_name)) @dbus.service.method(IFACE_TEST, in_signature="ssas", out_signature="o") def AddWiredDevice(self, ifname, mac, subchannels): dev = WiredDevice(ifname, mac, subchannels) return ExportedObj.to_path(self.add_device(dev)) @dbus.service.method(IFACE_TEST, in_signature="s", out_signature="o") def AddModemDevice(self, ifname): dev = ModemDevice(ifname) return ExportedObj.to_path(self.add_device(dev)) @dbus.service.method(IFACE_TEST, in_signature="s", out_signature="o") def AddWifiDevice(self, ifname): dev = WifiDevice(ifname) return ExportedObj.to_path(self.add_device(dev)) @dbus.service.method(IFACE_TEST, in_signature="o", out_signature="") def RemoveDevice(self, path): d = self.find_device_first(path=path, require=TestError) self.remove_device(d) @dbus.service.method(IFACE_TEST, in_signature="sss", out_signature="o") def AddWifiAp(self, ident, ssid, bssid): d = self.find_device_first(ident=ident, require=TestError) ap = WifiAp(ssid, bssid) return ExportedObj.to_path(d.add_ap(ap)) @dbus.service.method(IFACE_TEST, in_signature="so", out_signature="") def RemoveWifiAp(self, ident, ap_path): d = self.find_device_first(ident=ident, require=TestError) d.remove_ap_by_path(ap_path) @dbus.service.method(IFACE_TEST, in_signature="", out_signature="") def AutoRemoveNextConnection(self): gl.settings.auto_remove_next_connection() @dbus.service.method( dbus_interface=IFACE_TEST, in_signature="a{sa{sv}}b", out_signature="o" ) def AddConnection(self, con_hash, do_verify_strict): return gl.settings.add_connection(con_hash, do_verify_strict) @dbus.service.method(dbus_interface=IFACE_TEST, in_signature="sb", out_signature="") def SetActiveConnectionFailure(self, connection_id, failure): gl.force_activation_failure[connection_id] = failure @dbus.service.method(dbus_interface=IFACE_TEST, in_signature="ou", out_signature="") def SetActiveConnectionStateChangedDelay(self, devpath, delay_ms): for ac in reversed(self.active_connections): if ac.device.path == devpath: ac.activation_state_change_delay_ms = delay_ms return raise BusErr.UnknownDeviceException( "Device with iface '%s' not found" % devpath ) @dbus.service.method( dbus_interface=IFACE_TEST, in_signature="ouu", out_signature="" ) def SetActiveConnectionState(self, devpath, state, reason): for ac in reversed(self.active_connections): if ac.device.path == devpath: ac.set_state(state, reason) return raise BusErr.UnknownDeviceException( "Device with iface '%s' not found" % devpath ) @dbus.service.method( dbus_interface=IFACE_TEST, in_signature="ouu", out_signature="" ) def SetDeviceState(self, devpath, state, reason): try: nmstate = NM.DeviceState(state) except ValueError as e: raise BusErr.InvalidPropertyException("Invalid device state: " % e) try: nmreason = NM.DeviceStateReason(reason) except ValueError as e: raise BusErr.InvalidPropertyException("Invalid device state reason: " % e) for d in self.devices: if d.path == devpath: d.set_state(nmstate, nmreason) return raise BusErr.UnknownDeviceException( "Device with iface '%s' not found" % devpath ) @dbus.service.method(dbus_interface=IFACE_TEST, in_signature="ob", out_signature="") def SetCarrierStatus(self, devpath, status): for d in self.devices: if d.path == devpath: d.set_carrier_status(status) return raise BusErr.UnknownDeviceException( "Device with iface '%s' not found" % devpath ) @dbus.service.method( dbus_interface=IFACE_TEST, in_signature="sa{sa{sv}}b", out_signature="" ) def UpdateConnection(self, path, con_hash, do_verify_strict): return gl.settings.update_connection(con_hash, path, do_verify_strict) @dbus.service.method( dbus_interface=IFACE_TEST, in_signature="ba{ss}", out_signature="" ) def ConnectionSetVisible(self, vis, selector_args): cons = list(gl.settings.find_connections(**selector_args)) assert len(cons) == 1 cons[0].SetVisible(vis) @dbus.service.method(dbus_interface=IFACE_TEST, in_signature="", out_signature="") def Restart(self): gl.bus.release_name("org.freedesktop.NetworkManager") gl.bus.request_name("org.freedesktop.NetworkManager") ############################################################################### PRP_CONNECTION_UNSAVED = "Unsaved" PRP_CONNECTION_FILENAME = "Filename" class Connection(ExportedObj): def __init__(self, path_counter, con_hash, do_verify_strict=True): path = "/org/freedesktop/NetworkManager/Settings/Connection/%s" % (path_counter) ExportedObj.__init__(self, path) s_con = con_hash.get(NM.SETTING_CONNECTION_SETTING_NAME) if s_con is None: s_con = {} con_hash[NM.SETTING_CONNECTION_SETTING_NAME] = s_con if NmUtil.con_hash_get_id(con_hash) is None: s_con[NM.SETTING_CONNECTION_ID] = "connection-%s" % (path_counter) if NmUtil.con_hash_get_uuid(con_hash) is None: s_con[NM.SETTING_CONNECTION_UUID] = str( uuid.uuid3(uuid.NAMESPACE_URL, path) ) NmUtil.con_hash_verify(con_hash, do_verify_strict=do_verify_strict) self.path = path self.con_hash = con_hash self.visible = True props = { PRP_CONNECTION_UNSAVED: False, PRP_CONNECTION_FILENAME: "/etc/NetworkManager/system-connections/" + self.get_id(), } self.dbus_interface_add(IFACE_CONNECTION, props) def get_id(self): return NmUtil.con_hash_get_id(self.con_hash) def get_uuid(self): return NmUtil.con_hash_get_uuid(self.con_hash) def get_type(self): return NmUtil.con_hash_get_type(self.con_hash) def is_vpn(self): return self.get_type() == NM.SETTING_VPN_SETTING_NAME def update_connection(self, con_hash, do_verify_strict): NmUtil.con_hash_verify(con_hash, do_verify_strict=do_verify_strict) old_uuid = self.get_uuid() new_uuid = NmUtil.con_hash_get_uuid(con_hash) if old_uuid != new_uuid: raise BusErr.InvalidPropertyException( "connection.uuid: cannot change the uuid from %s to %s" % (old_uuid, new_uuid) ) self.con_hash = con_hash self.Updated() @dbus.service.method( dbus_interface=IFACE_CONNECTION, in_signature="", out_signature="a{sa{sv}}" ) def GetSettings(self): if hasattr(self, "_remove_next_connection_cb"): self._remove_next_connection_cb() raise BusErr.UnknownConnectionException("Connection not found") if not self.visible: raise BusErr.PermissionDeniedException() return self.con_hash @dbus.service.method( dbus_interface=IFACE_CONNECTION, in_signature="b", out_signature="" ) def SetVisible(self, vis): self.visible = vis self.Updated() @dbus.service.method( dbus_interface=IFACE_CONNECTION, in_signature="", out_signature="" ) def Delete(self): gl.settings.delete_connection(self) @dbus.service.method( dbus_interface=IFACE_CONNECTION, in_signature="a{sa{sv}}", out_signature="" ) def Update(self, con_hash): self.update_connection(con_hash, True) @dbus.service.method( dbus_interface=IFACE_CONNECTION, in_signature="a{sa{sv}}ua{sv}", out_signature="a{sv}", ) def Update2(self, con_hash, flags, args): self.update_connection(con_hash, True) return [] @dbus.service.signal(IFACE_CONNECTION, signature="") def Removed(self): pass @dbus.service.signal(IFACE_CONNECTION, signature="") def Updated(self): pass ############################################################################### PRP_SETTINGS_HOSTNAME = "Hostname" PRP_SETTINGS_CAN_MODIFY = "CanModify" PRP_SETTINGS_CONNECTIONS = "Connections" class Settings(ExportedObj): def __init__(self): ExportedObj.__init__(self, "/org/freedesktop/NetworkManager/Settings") self.connections = {} self.c_counter = 0 self.remove_next_connection = False props = { PRP_SETTINGS_HOSTNAME: "foobar.baz", PRP_SETTINGS_CAN_MODIFY: True, PRP_SETTINGS_CONNECTIONS: dbus.Array([], "o"), } self.dbus_interface_add(IFACE_SETTINGS, props) self.export() def auto_remove_next_connection(self): self.remove_next_connection = True def get_connection(self, path): return self.connections[path] def get_connections(self, stable_order=True): cons = list(self.connections.values()) if stable_order: cons.sort( key=lambda c: (Util.random_int(c.get_id()), Util.random_int(c.path)) ) return cons def get_connection_paths(self, stable_order=True): return [c.path for c in self.get_connections(stable_order=stable_order)] def find_connections(self, path=None, con_id=None, con_uuid=None): for c in self.get_connections(): if path is not None: if c.path != path: continue if con_id is not None: if c.get_id() != con_id: continue if con_uuid is not None: if c.get_uuid() != con_uuid: continue yield c @dbus.service.method( dbus_interface=IFACE_SETTINGS, in_signature="", out_signature="ao" ) def ListConnections(self): return self.get_connection_paths() @dbus.service.method( dbus_interface=IFACE_SETTINGS, in_signature="a{sa{sv}}", out_signature="o" ) def AddConnection(self, con_hash): return self.add_connection(con_hash) @dbus.service.method( dbus_interface=IFACE_SETTINGS, in_signature="", out_signature="b" ) def ReloadConnections(self): return True def add_connection(self, con_hash, do_verify_strict=True): self.c_counter += 1 con_inst = Connection(self.c_counter, con_hash, do_verify_strict) uuid = con_inst.get_uuid() if uuid in [c.get_uuid() for c in self.get_connections(stable_order=False)]: raise BusErr.InvalidSettingException( "cannot add duplicate connection with uuid %s" % (uuid) ) con_inst.export() self.connections[con_inst.path] = con_inst self.NewConnection(con_inst.path) self._dbus_property_set( IFACE_SETTINGS, PRP_SETTINGS_CONNECTIONS, dbus.Array(self.get_connection_paths(), "o"), ) gl.manager.devices_available_connections_update() if self.remove_next_connection: self.remove_next_connection = False def cb(): if hasattr(con_inst, "_remove_next_connection_cb"): del con_inst._remove_next_connection_cb self.delete_connection(con_inst) return False # We will delete the connection right away on an idle handler. However, # the test races with initializing the connection (calling GetSettings()). # To avoid the race, we will check in GetSettings() whether the profile # is about to be deleted, and delete it first. con_inst._remove_next_connection_cb = cb GLib.idle_add(cb) return con_inst.path def update_connection(self, con_hash, path=None, do_verify_strict=True): if path not in self.connections: raise BusErr.UnknownConnectionException("Connection not found") self.connections[path].update_connection(con_hash, do_verify_strict) def delete_connection(self, con_inst): del self.connections[con_inst.path] self._dbus_property_set( IFACE_SETTINGS, PRP_SETTINGS_CONNECTIONS, dbus.Array(self.get_connection_paths(), "o"), ) con_inst.Removed() con_inst.unexport() gl.manager.devices_available_connections_update() @dbus.service.method( dbus_interface=IFACE_SETTINGS, in_signature="s", out_signature="" ) def SaveHostname(self, hostname): # Arbitrary requirement to test error handling if hostname.find(".") == -1: raise BusErr.InvalidHostnameException() self._dbus_property_set(IFACE_SETTINGS, PRP_SETTINGS_HOSTNAME, hostname) @dbus.service.signal(IFACE_SETTINGS, signature="o") def NewConnection(self, path): pass @dbus.service.method(IFACE_SETTINGS, in_signature="", out_signature="") def Quit(self): gl.mainloop.quit() ############################################################################### PRP_IP4_CONFIG_ADDRESSES = "Addresses" PRP_IP4_CONFIG_ADDRESSDATA = "AddressData" PRP_IP4_CONFIG_GATEWAY = "Gateway" PRP_IP4_CONFIG_ROUTES = "Routes" PRP_IP4_CONFIG_ROUTEDATA = "RouteData" PRP_IP4_CONFIG_NAMESERVERS = "Nameservers" PRP_IP4_CONFIG_DOMAINS = "Domains" PRP_IP4_CONFIG_SEARCHES = "Searches" PRP_IP4_CONFIG_DNSOPTIONS = "DnsOptions" PRP_IP4_CONFIG_DNSPRIORITY = "DnsPriority" PRP_IP4_CONFIG_WINSSERVERS = "WinsServers" class IP4Config(ExportedObj): path_counter_next = 1 path_prefix = "/org/freedesktop/NetworkManager/IP4Config/" def __init__(self, generate_seed=_DEFAULT_ARG): ExportedObj.__init__(self, ExportedObj.create_path(IP4Config)) if generate_seed is _DEFAULT_ARG: generate_seed = self.path props = self._props_generate(generate_seed) self.dbus_interface_add(IFACE_IP4_CONFIG, props) self.export() def _props_generate(self, generate_seed): seed = Util.RandomSeed.wrap(generate_seed) gateway = None if seed: if Util.random_bool(seed): gateway = Util.random_ip(seed, net="192.168.0.0/16")[0] addrs = [] if seed: for n in range(0, Util.random_int(seed, 4)): a = { "addr": Util.random_ip(seed, net="192.168.0.0/16")[0], "prefix": Util.random_int(seed, 17, 32), "gateway": gateway if n == 0 else None, } addrs.append(a) routes = [] if seed: for n in range(0, Util.random_int(seed, 4)): a = { "dest": Util.random_ip(seed, net="192.168.0.0/16")[0], "prefix": Util.random_int(seed, 17, 32), "next-hop": None if (Util.random_int(seed) % 3 == 0) else Util.random_ip(seed, net="192.168.0.0/16")[0], "metric": -1 if (Util.random_int(seed) % 3 == 0) else Util.random_int(seed, 0, 0xFFFFFFFF), } routes.append(a) nameservers = [] if seed: nameservers = list( [ Util.random_ip(seed, net="192.168.0.0/16")[0] for x in range(Util.random_int(seed, 4)) ] ) names_selection = [ "foo1.bar", "foo2.bar", "foo3.bar", "foo4.bar", "fo.o.bar", "fo.x.y", ] domains = [] if seed: domains = Util.random_subset(seed, ["dom4." + s for s in names_selection]) searches = [] if seed: domains = Util.random_subset(seed, ["sear4." + s for s in names_selection]) dnsoptions = [] if seed: dnsoptions = Util.random_subset( seed, ["dns4-opt1", "dns4-opt2", "dns4-opt3", "dns4-opt4"] ) dnspriority = 0 if seed: dnspriority = Util.random_int(seed, -10000, 10000) winsservers = [] if seed: winsservers = list( [ Util.random_ip(seed, net="192.168.0.0/16")[0] for x in range(Util.random_int(seed, 4)) ] ) return { PRP_IP4_CONFIG_ADDRESSES: dbus.Array( [ [ Util.ip4_addr_be32(a["addr"]), a["prefix"], Util.ip4_addr_be32(a["gateway"]) if a["gateway"] else 0, ] for a in addrs ], "au", ), PRP_IP4_CONFIG_ADDRESSDATA: dbus.Array( [ dbus.Dictionary( collections.OrderedDict( [ ("address", dbus.String(a["addr"])), ("prefix", dbus.UInt32(a["prefix"])), ] + ( [("gateway", dbus.String(a["gateway"]))] if a["gateway"] else [] ) ), "sv", ) for a in addrs ], "a{sv}", ), PRP_IP4_CONFIG_GATEWAY: dbus.String(gateway) if gateway else "", PRP_IP4_CONFIG_ROUTES: dbus.Array( [ [ Util.ip4_addr_be32(a["dest"]), a["prefix"], Util.ip4_addr_be32(a["next-hop"] or "0.0.0.0"), max(a["metric"], 0), ] for a in routes ], "au", ), PRP_IP4_CONFIG_ROUTEDATA: dbus.Array( [ dbus.Dictionary( collections.OrderedDict( [ ("dest", dbus.String(a["dest"])), ("prefix", dbus.UInt32(a["prefix"])), ] + ( [("next-hop", dbus.String(a["next-hop"]))] if a["next-hop"] else [] ) + ( [("metric", dbus.UInt32(a["metric"]))] if a["metric"] != -1 else [] ) ), "sv", ) for a in routes ], "a{sv}", ), PRP_IP4_CONFIG_NAMESERVERS: dbus.Array( [dbus.UInt32(Util.ip4_addr_be32(n)) for n in nameservers], "u" ), PRP_IP4_CONFIG_DOMAINS: dbus.Array(domains, "s"), PRP_IP4_CONFIG_SEARCHES: dbus.Array(searches, "s"), PRP_IP4_CONFIG_DNSOPTIONS: dbus.Array(dnsoptions, "s"), PRP_IP4_CONFIG_DNSPRIORITY: dbus.Int32(dnspriority), PRP_IP4_CONFIG_WINSSERVERS: dbus.Array( [dbus.UInt32(Util.ip4_addr_be32(n)) for n in winsservers], "u" ), } def props_regenerate(self, generate_seed): props = self.generate_props(generate_seed) for k, v in props.items(): self._dbus_property_set(IFACE_IP4_CONFIG, k, v) @dbus.service.method(IFACE_TEST, in_signature="s", out_signature="") def SetGateway(self, gateway): self._dbus_property_set(IFACE_IP4_CONFIG, PRP_IP4_CONFIG_GATEWAY, gateway) ############################################################################### PRP_IP6_CONFIG_ADDRESSES = "Addresses" PRP_IP6_CONFIG_ADDRESSDATA = "AddressData" PRP_IP6_CONFIG_GATEWAY = "Gateway" PRP_IP6_CONFIG_ROUTES = "Routes" PRP_IP6_CONFIG_ROUTEDATA = "RouteData" PRP_IP6_CONFIG_NAMESERVERS = "Nameservers" PRP_IP6_CONFIG_DOMAINS = "Domains" PRP_IP6_CONFIG_SEARCHES = "Searches" PRP_IP6_CONFIG_DNSOPTIONS = "DnsOptions" PRP_IP6_CONFIG_DNSPRIORITY = "DnsPriority" class IP6Config(ExportedObj): path_counter_next = 1 path_prefix = "/org/freedesktop/NetworkManager/IP6Config/" def __init__(self, generate_seed=_DEFAULT_ARG): ExportedObj.__init__(self, ExportedObj.create_path(IP6Config)) if generate_seed is _DEFAULT_ARG: generate_seed = self.path props = self._props_generate(generate_seed) self.dbus_interface_add(IFACE_IP6_CONFIG, props) self.export() def _props_generate(self, generate_seed): seed = Util.RandomSeed.wrap(generate_seed) gateway = None if seed: if Util.random_bool(seed): gateway = Util.random_ip(seed, net="2001:a::/64")[0] addrs = [] if seed: for n in range(0, Util.random_int(seed, 4)): a = { "addr": Util.random_ip(seed, net="2001:a::/64")[0], "prefix": Util.random_int(seed, 65, 128), "gateway": gateway if n == 0 else None, } addrs.append(a) routes = [] if seed: for n in range(0, Util.random_int(seed, 4)): a = { "dest": Util.random_ip(seed, net="2001:a::/64")[0], "prefix": Util.random_int(seed, 65, 128), "next-hop": None if (Util.random_int(seed) % 3 == 0) else Util.random_ip(seed, net="2001:a::/64")[0], "metric": -1 if (Util.random_int(seed) % 3 == 0) else Util.random_int(seed, 0, 0xFFFFFFFF), } routes.append(a) nameservers = [] if seed: nameservers = list( [ Util.random_ip(seed, net="2001:a::/64")[0] for x in range(Util.random_int(seed, 4)) ] ) names_selection = [ "foo1.bar", "foo2.bar", "foo3.bar", "foo4.bar", "fo.o.bar", "fo.x.y", ] domains = [] if seed: domains = Util.random_subset(seed, ["dom6." + s for s in names_selection]) searches = [] if seed: domains = Util.random_subset(seed, ["sear6." + s for s in names_selection]) dnsoptions = [] if seed: dnsoptions = Util.random_subset( seed, ["dns6-opt1", "dns6-opt2", "dns6-opt3", "dns6-opt4"] ) dnspriority = 0 if seed: dnspriority = Util.random_int(seed, -10000, 10000) return { PRP_IP6_CONFIG_ADDRESSES: dbus.Array( [ [ Util.ip6_addr_ay(a["addr"]), a["prefix"], Util.ip6_addr_ay(a["gateway"] or "::"), ] for a in addrs ], "(ayuay)", ), PRP_IP6_CONFIG_ADDRESSDATA: dbus.Array( [ dbus.Dictionary( collections.OrderedDict( [ ("address", dbus.String(a["addr"])), ("prefix", dbus.UInt32(a["prefix"])), ] + ( [("gateway", dbus.String(a["gateway"]))] if a["gateway"] else [] ) ), "sv", ) for a in addrs ], "a{sv}", ), PRP_IP6_CONFIG_GATEWAY: dbus.String(gateway) if gateway else "", PRP_IP6_CONFIG_ROUTES: dbus.Array( [ [ Util.ip6_addr_ay(a["dest"]), a["prefix"], Util.ip6_addr_ay(a["next-hop"] or "::"), max(a["metric"], 0), ] for a in routes ], "(ayuayu)", ), PRP_IP6_CONFIG_ROUTEDATA: dbus.Array( [ dbus.Dictionary( collections.OrderedDict( [ ("dest", dbus.String(a["dest"])), ("prefix", dbus.UInt32(a["prefix"])), ] + ( [("next-hop", dbus.String(a["next-hop"]))] if a["next-hop"] else [] ) + ( [("metric", dbus.UInt32(a["metric"]))] if a["metric"] != -1 else [] ) ), "sv", ) for a in routes ], "a{sv}", ), PRP_IP6_CONFIG_NAMESERVERS: dbus.Array( [Util.ip6_addr_ay(n) for n in nameservers], "ay" ), PRP_IP6_CONFIG_DOMAINS: dbus.Array(domains, "s"), PRP_IP6_CONFIG_SEARCHES: dbus.Array(searches, "s"), PRP_IP6_CONFIG_DNSOPTIONS: dbus.Array(dnsoptions, "s"), PRP_IP6_CONFIG_DNSPRIORITY: dbus.Int32(dnspriority), } def props_regenerate(self, generate_seed): props = self.generate_props(generate_seed) for k, v in props.items(): self._dbus_property_set(IFACE_IP6_CONFIG, k, v) ############################################################################### PRP_DHCP4_CONFIG_OPTIONS = "Options" class Dhcp4Config(ExportedObj): path_counter_next = 1 path_prefix = "/org/freedesktop/NetworkManager/DHCP4Config/" def __init__(self, generate_seed=_DEFAULT_ARG): ExportedObj.__init__(self, ExportedObj.create_path(Dhcp4Config)) if generate_seed is _DEFAULT_ARG: generate_seed = self.path props = self._props_generate(generate_seed) self.dbus_interface_add(IFACE_DHCP4_CONFIG, props) self.export() def _props_generate(self, generate_seed): seed = Util.RandomSeed.wrap(generate_seed) options = [] if seed: options = Util.random_subset( seed, [("dhcp-4-opt-" + str(i), "val-" + str(i)) for i in range(10)] ) return { PRP_DHCP4_CONFIG_OPTIONS: dbus.Dictionary( collections.OrderedDict(options), "sv" ) } def props_regenerate(self, generate_seed): props = self.generate_props(generate_seed) for k, v in props.items(): self._dbus_property_set(IFACE_DHCP4_CONFIG, k, v) ############################################################################### PRP_DHCP6_CONFIG_OPTIONS = "Options" class Dhcp6Config(ExportedObj): path_counter_next = 1 path_prefix = "/org/freedesktop/NetworkManager/DHCP6Config/" def __init__(self, generate_seed=_DEFAULT_ARG): ExportedObj.__init__(self, ExportedObj.create_path(Dhcp6Config)) if generate_seed is _DEFAULT_ARG: generate_seed = self.path props = self._props_generate(generate_seed) self.dbus_interface_add(IFACE_DHCP6_CONFIG, props) self.export() def _props_generate(self, generate_seed): seed = Util.RandomSeed.wrap(generate_seed) options = [] if seed: options = Util.random_subset( seed, [("dhcp-6-opt-" + str(i), "val-" + str(i)) for i in range(10)] ) return { PRP_DHCP4_CONFIG_OPTIONS: dbus.Dictionary( collections.OrderedDict(options), "sv" ) } def props_regenerate(self, generate_seed): props = self.generate_props(generate_seed) for k, v in props.items(): self._dbus_property_set(IFACE_DHCP6_CONFIG, k, v) ############################################################################### PRP_DNS_MANAGER_MODE = "Mode" PRP_DNS_MANAGER_RC_MANAGER = "RcManager" PRP_DNS_MANAGER_CONFIGURATION = "Configuration" class DnsManager(ExportedObj): def __init__(self): ExportedObj.__init__(self, "/org/freedesktop/NetworkManager/DnsManager") props = { PRP_DNS_MANAGER_MODE: "dnsmasq", PRP_DNS_MANAGER_RC_MANAGER: "symlink", PRP_DNS_MANAGER_CONFIGURATION: dbus.Array( [ dbus.Dictionary( { "nameservers": dbus.Array(["1.2.3.4", "5.6.7.8"], "s"), "priority": dbus.Int32(100), }, "sv", ) ], "a{sv}", ), } self.dbus_interface_add(IFACE_DNS_MANAGER, props) self.export() ############################################################################### PATH_SECRET_AGENT = "/org/freedesktop/NetworkManager/SecretAgent" FLAG_ALLOW_INTERACTION = 0x1 FLAG_REQUEST_NEW = 0x2 FLAG_USER_REQUESTED = 0x4 class AgentManager(dbus.service.Object): def __init__(self): dbus.service.Object.__init__( self, gl.bus, "/org/freedesktop/NetworkManager/AgentManager" ) self.agents = {} @dbus.service.method( dbus_interface=IFACE_AGENT_MANAGER, in_signature="s", out_signature="", sender_keyword="sender", ) def Register(self, name, sender=None): self.RegisterWithCapabilities(name, 0, sender) @dbus.service.method( dbus_interface=IFACE_AGENT_MANAGER, in_signature="su", out_signature="", sender_keyword="sender", ) def RegisterWithCapabilities(self, name, caps, sender=None): self.agents[sender] = gl.bus.get_object(sender, PATH_SECRET_AGENT) @dbus.service.method( dbus_interface=IFACE_AGENT_MANAGER, in_signature="", out_signature="", sender_keyword="sender", ) def Unregister(self, sender=None): del self.agents[sender] def get_secrets(self, con_hash, path, setting_name): if len(self.agents) == 0: return None secrets = {} for sender in self.agents: agent = self.agents[sender] try: secrets = agent.GetSecrets( con_hash, path, setting_name, dbus.Array([], "s"), FLAG_ALLOW_INTERACTION | FLAG_USER_REQUESTED, dbus_interface=IFACE_AGENT, ) break except dbus.DBusException as e: if e.get_dbus_name() == IFACE_AGENT + ".UserCanceled": raise BusErr.UserCanceledException("User canceled") continue return secrets ############################################################################### class ObjectManager(dbus.service.Object): def __init__(self, object_path): dbus.service.Object.__init__(self, gl.bus, object_path) self.objs = [] def find_object(self, path): for o in self.objs: if path == o.path: return o return None def add_object(self, obj): self.objs.append(obj) self.InterfacesAdded(obj.path, obj.get_managed_ifaces()) def remove_object(self, obj): self.objs.remove(obj) self.InterfacesRemoved(obj.path, obj.get_managed_ifaces().keys()) @dbus.service.signal(IFACE_OBJECT_MANAGER, signature="oa{sa{sv}}") def InterfacesAdded(self, name, ifaces): pass @dbus.service.signal(IFACE_OBJECT_MANAGER, signature="oas") def InterfacesRemoved(self, name, ifaces): pass @dbus.service.method( dbus_interface=IFACE_OBJECT_MANAGER, in_signature="", out_signature="a{oa{sa{sv}}}", sender_keyword="sender", ) def GetManagedObjects(self, sender=None): managed_objects = {} for obj in self.objs: managed_objects[obj.path] = obj.get_managed_ifaces() return managed_objects ############################################################################### def main(): dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) random.seed() global gl gl = Global() gl.mainloop = GLib.MainLoop() gl.bus = dbus.SessionBus() gl.force_activation_failure = {} gl.object_manager = ObjectManager("/org/freedesktop") gl.manager = NetworkManager() gl.settings = Settings() gl.dns_manager = DnsManager() gl.agent_manager = AgentManager() if not gl.bus.request_name("org.freedesktop.NetworkManager"): raise AssertionError( "Failure to request D-Bus name org.freedesktop.NetworkManager" ) # Watch stdin; if it closes, assume our parent has crashed, and exit id1 = GLib.io_add_watch( GLib.IOChannel.unix_new(0), GLib.PRIORITY_DEFAULT, GLib.IO_HUP, lambda io, condition: gl.mainloop.quit() or True, ) gl.mainloop.run() GLib.source_remove(id1) gl.agent_manager.remove_from_connection() gl.dns_manager.unexport() gl.settings.unexport() gl.manager.unexport() gl.object_manager.remove_from_connection() sys.exit(0) if __name__ == "__main__": main()