summaryrefslogtreecommitdiff
path: root/cloudinit/sources/helpers/netlink.py
diff options
context:
space:
mode:
Diffstat (limited to 'cloudinit/sources/helpers/netlink.py')
-rw-r--r--cloudinit/sources/helpers/netlink.py187
1 files changed, 102 insertions, 85 deletions
diff --git a/cloudinit/sources/helpers/netlink.py b/cloudinit/sources/helpers/netlink.py
index e13d6834..2953e858 100644
--- a/cloudinit/sources/helpers/netlink.py
+++ b/cloudinit/sources/helpers/netlink.py
@@ -2,14 +2,14 @@
#
# This file is part of cloud-init. See LICENSE file for license information.
-from cloudinit import log as logging
-from cloudinit import util
-from collections import namedtuple
-
import os
import select
import socket
import struct
+from collections import namedtuple
+
+from cloudinit import log as logging
+from cloudinit import util
LOG = logging.getLogger(__name__)
@@ -47,29 +47,30 @@ OPER_TESTING = 4
OPER_DORMANT = 5
OPER_UP = 6
-RTAAttr = namedtuple('RTAAttr', ['length', 'rta_type', 'data'])
-InterfaceOperstate = namedtuple('InterfaceOperstate', ['ifname', 'operstate'])
-NetlinkHeader = namedtuple('NetlinkHeader', ['length', 'type', 'flags', 'seq',
- 'pid'])
+RTAAttr = namedtuple("RTAAttr", ["length", "rta_type", "data"])
+InterfaceOperstate = namedtuple("InterfaceOperstate", ["ifname", "operstate"])
+NetlinkHeader = namedtuple(
+ "NetlinkHeader", ["length", "type", "flags", "seq", "pid"]
+)
class NetlinkCreateSocketError(RuntimeError):
- '''Raised if netlink socket fails during create or bind.'''
+ """Raised if netlink socket fails during create or bind."""
def create_bound_netlink_socket():
- '''Creates netlink socket and bind on netlink group to catch interface
+ """Creates netlink socket and bind on netlink group to catch interface
down/up events. The socket will bound only on RTMGRP_LINK (which only
includes RTM_NEWLINK/RTM_DELLINK/RTM_GETLINK events). The socket is set to
non-blocking mode since we're only receiving messages.
:returns: netlink socket in non-blocking mode
:raises: NetlinkCreateSocketError
- '''
+ """
try:
- netlink_socket = socket.socket(socket.AF_NETLINK,
- socket.SOCK_RAW,
- socket.NETLINK_ROUTE)
+ netlink_socket = socket.socket(
+ socket.AF_NETLINK, socket.SOCK_RAW, socket.NETLINK_ROUTE
+ )
netlink_socket.bind((os.getpid(), RTMGRP_LINK))
netlink_socket.setblocking(0)
except socket.error as e:
@@ -80,7 +81,7 @@ def create_bound_netlink_socket():
def get_netlink_msg_header(data):
- '''Gets netlink message type and length
+ """Gets netlink message type and length
:param: data read from netlink socket
:returns: netlink message type
@@ -92,18 +93,20 @@ def get_netlink_msg_header(data):
__u32 nlmsg_seq; /* Sequence number */
__u32 nlmsg_pid; /* Sender port ID */
};
- '''
- assert (data is not None), ("data is none")
- assert (len(data) >= NLMSGHDR_SIZE), (
- "data is smaller than netlink message header")
- msg_len, msg_type, flags, seq, pid = struct.unpack(NLMSGHDR_FMT,
- data[:MSG_TYPE_OFFSET])
+ """
+ assert data is not None, "data is none"
+ assert (
+ len(data) >= NLMSGHDR_SIZE
+ ), "data is smaller than netlink message header"
+ msg_len, msg_type, flags, seq, pid = struct.unpack(
+ NLMSGHDR_FMT, data[:MSG_TYPE_OFFSET]
+ )
LOG.debug("Got netlink msg of type %d", msg_type)
return NetlinkHeader(msg_len, msg_type, flags, seq, pid)
def read_netlink_socket(netlink_socket, timeout=None):
- '''Select and read from the netlink socket if ready.
+ """Select and read from the netlink socket if ready.
:param: netlink_socket: specify which socket object to read from
:param: timeout: specify a timeout value (integer) to wait while reading,
@@ -111,8 +114,8 @@ def read_netlink_socket(netlink_socket, timeout=None):
:returns: string of data read (max length = <MAX_SIZE>) from socket,
if no data read, returns None
:raises: AssertionError if netlink_socket is None
- '''
- assert (netlink_socket is not None), ("netlink socket is none")
+ """
+ assert netlink_socket is not None, "netlink socket is none"
read_set, _, _ = select.select([netlink_socket], [], [], timeout)
# Incase of timeout,read_set doesn't contain netlink socket.
# just return from this function
@@ -126,32 +129,33 @@ def read_netlink_socket(netlink_socket, timeout=None):
def unpack_rta_attr(data, offset):
- '''Unpack a single rta attribute.
+ """Unpack a single rta attribute.
:param: data: string of data read from netlink socket
:param: offset: starting offset of RTA Attribute
:return: RTAAttr object with length, type and data. On error, return None.
:raises: AssertionError if data is None or offset is not integer.
- '''
- assert (data is not None), ("data is none")
- assert (type(offset) == int), ("offset is not integer")
- assert (offset >= RTATTR_START_OFFSET), (
- "rta offset is less than expected length")
+ """
+ assert data is not None, "data is none"
+ assert type(offset) == int, "offset is not integer"
+ assert (
+ offset >= RTATTR_START_OFFSET
+ ), "rta offset is less than expected length"
length = rta_type = 0
attr_data = None
try:
length = struct.unpack_from("H", data, offset=offset)[0]
- rta_type = struct.unpack_from("H", data, offset=offset+2)[0]
+ rta_type = struct.unpack_from("H", data, offset=offset + 2)[0]
except struct.error:
return None # Should mean our offset is >= remaining data
# Unpack just the attribute's data. Offset by 4 to skip length/type header
- attr_data = data[offset+RTA_DATA_START_OFFSET:offset+length]
+ attr_data = data[offset + RTA_DATA_START_OFFSET : offset + length]
return RTAAttr(length, rta_type, attr_data)
def read_rta_oper_state(data):
- '''Reads Interface name and operational state from RTA Data.
+ """Reads Interface name and operational state from RTA Data.
:param: data: string of data read from netlink socket
:returns: InterfaceOperstate object containing if_name and oper_state.
@@ -159,10 +163,11 @@ def read_rta_oper_state(data):
IFLA_IFNAME messages.
:raises: AssertionError if data is None or length of data is
smaller than RTATTR_START_OFFSET.
- '''
- assert (data is not None), ("data is none")
- assert (len(data) > RTATTR_START_OFFSET), (
- "length of data is smaller than RTATTR_START_OFFSET")
+ """
+ assert data is not None, "data is none"
+ assert (
+ len(data) > RTATTR_START_OFFSET
+ ), "length of data is smaller than RTATTR_START_OFFSET"
ifname = operstate = None
offset = RTATTR_START_OFFSET
while offset <= len(data):
@@ -170,15 +175,16 @@ def read_rta_oper_state(data):
if not attr or attr.length == 0:
break
# Each attribute is 4-byte aligned. Determine pad length.
- padlen = (PAD_ALIGNMENT -
- (attr.length % PAD_ALIGNMENT)) % PAD_ALIGNMENT
+ padlen = (
+ PAD_ALIGNMENT - (attr.length % PAD_ALIGNMENT)
+ ) % PAD_ALIGNMENT
offset += attr.length + padlen
if attr.rta_type == IFLA_OPERSTATE:
operstate = ord(attr.data)
elif attr.rta_type == IFLA_IFNAME:
- interface_name = util.decode_binary(attr.data, 'utf-8')
- ifname = interface_name.strip('\0')
+ interface_name = util.decode_binary(attr.data, "utf-8")
+ ifname = interface_name.strip("\0")
if not ifname or operstate is None:
return None
LOG.debug("rta attrs: ifname %s operstate %d", ifname, operstate)
@@ -186,12 +192,12 @@ def read_rta_oper_state(data):
def wait_for_nic_attach_event(netlink_socket, existing_nics):
- '''Block until a single nic is attached.
+ """Block until a single nic is attached.
:param: netlink_socket: netlink_socket to receive events
:param: existing_nics: List of existing nics so that we can skip them.
:raises: AssertionError if netlink_socket is none.
- '''
+ """
LOG.debug("Preparing to wait for nic attach.")
ifname = None
@@ -204,19 +210,21 @@ def wait_for_nic_attach_event(netlink_socket, existing_nics):
# We can return even if the operational state of the new nic is DOWN
# because we set it to UP before doing dhcp.
- read_netlink_messages(netlink_socket,
- None,
- [RTM_NEWLINK],
- [OPER_UP, OPER_DOWN],
- should_continue_cb)
+ read_netlink_messages(
+ netlink_socket,
+ None,
+ [RTM_NEWLINK],
+ [OPER_UP, OPER_DOWN],
+ should_continue_cb,
+ )
return ifname
def wait_for_nic_detach_event(netlink_socket):
- '''Block until a single nic is detached and its operational state is down.
+ """Block until a single nic is detached and its operational state is down.
:param: netlink_socket: netlink_socket to receive events.
- '''
+ """
LOG.debug("Preparing to wait for nic detach.")
ifname = None
@@ -225,16 +233,14 @@ def wait_for_nic_detach_event(netlink_socket):
ifname = iname
return False
- read_netlink_messages(netlink_socket,
- None,
- [RTM_DELLINK],
- [OPER_DOWN],
- should_continue_cb)
+ read_netlink_messages(
+ netlink_socket, None, [RTM_DELLINK], [OPER_DOWN], should_continue_cb
+ )
return ifname
def wait_for_media_disconnect_connect(netlink_socket, ifname):
- '''Block until media disconnect and connect has happened on an interface.
+ """Block until media disconnect and connect has happened on an interface.
Listens on netlink socket to receive netlink events and when the carrier
changes from 0 to 1, it considers event has happened and
return from this function
@@ -242,10 +248,10 @@ def wait_for_media_disconnect_connect(netlink_socket, ifname):
:param: netlink_socket: netlink_socket to receive events
:param: ifname: Interface name to lookout for netlink events
:raises: AssertionError if netlink_socket is None or ifname is None.
- '''
- assert (netlink_socket is not None), ("netlink socket is none")
- assert (ifname is not None), ("interface name is none")
- assert (len(ifname) > 0), ("interface name cannot be empty")
+ """
+ assert netlink_socket is not None, "netlink socket is none"
+ assert ifname is not None, "interface name is none"
+ assert len(ifname) > 0, "interface name cannot be empty"
def should_continue_cb(iname, carrier, prevCarrier):
# check for carrier down, up sequence
@@ -256,19 +262,23 @@ def wait_for_media_disconnect_connect(netlink_socket, ifname):
return True
LOG.debug("Wait for media disconnect and reconnect to happen")
- read_netlink_messages(netlink_socket,
- ifname,
- [RTM_NEWLINK, RTM_DELLINK],
- [OPER_UP, OPER_DOWN],
- should_continue_cb)
-
-
-def read_netlink_messages(netlink_socket,
- ifname_filter,
- rtm_types,
- operstates,
- should_continue_callback):
- ''' Reads from the netlink socket until the condition specified by
+ read_netlink_messages(
+ netlink_socket,
+ ifname,
+ [RTM_NEWLINK, RTM_DELLINK],
+ [OPER_UP, OPER_DOWN],
+ should_continue_cb,
+ )
+
+
+def read_netlink_messages(
+ netlink_socket,
+ ifname_filter,
+ rtm_types,
+ operstates,
+ should_continue_callback,
+):
+ """Reads from the netlink socket until the condition specified by
the continuation callback is met.
:param: netlink_socket: netlink_socket to receive events.
@@ -276,7 +286,7 @@ def read_netlink_messages(netlink_socket,
:param: rtm_types: Type of netlink events to listen for.
:param: operstates: Operational states to listen.
:param: should_continue_callback: Specifies when to stop listening.
- '''
+ """
if netlink_socket is None:
raise RuntimeError("Netlink socket is none")
data = bytes()
@@ -286,9 +296,9 @@ def read_netlink_messages(netlink_socket,
recv_data = read_netlink_socket(netlink_socket, SELECT_TIMEOUT)
if recv_data is None:
continue
- LOG.debug('read %d bytes from socket', len(recv_data))
+ LOG.debug("read %d bytes from socket", len(recv_data))
data += recv_data
- LOG.debug('Length of data after concat %d', len(data))
+ LOG.debug("Length of data after concat %d", len(data))
offset = 0
datalen = len(data)
while offset < datalen:
@@ -300,30 +310,37 @@ def read_netlink_messages(netlink_socket,
if len(nl_msg) < nlheader.length:
LOG.debug("Partial data. Smaller than netlink message")
break
- padlen = (nlheader.length+PAD_ALIGNMENT-1) & ~(PAD_ALIGNMENT-1)
+ padlen = (nlheader.length + PAD_ALIGNMENT - 1) & ~(
+ PAD_ALIGNMENT - 1
+ )
offset = offset + padlen
- LOG.debug('offset to next netlink message: %d', offset)
+ LOG.debug("offset to next netlink message: %d", offset)
# Continue if we are not interested in this message.
if nlheader.type not in rtm_types:
continue
interface_state = read_rta_oper_state(nl_msg)
if interface_state is None:
- LOG.debug('Failed to read rta attributes: %s', interface_state)
+ LOG.debug("Failed to read rta attributes: %s", interface_state)
continue
- if (ifname_filter is not None and
- interface_state.ifname != ifname_filter):
+ if (
+ ifname_filter is not None
+ and interface_state.ifname != ifname_filter
+ ):
LOG.debug(
"Ignored netlink event on interface %s. Waiting for %s.",
- interface_state.ifname, ifname_filter)
+ interface_state.ifname,
+ ifname_filter,
+ )
continue
if interface_state.operstate not in operstates:
continue
prevCarrier = carrier
carrier = interface_state.operstate
- if not should_continue_callback(interface_state.ifname,
- carrier,
- prevCarrier):
+ if not should_continue_callback(
+ interface_state.ifname, carrier, prevCarrier
+ ):
return
data = data[offset:]
+
# vi: ts=4 expandtab