#!/usr/bin/env python2
# Copyright (C) 2015 Codethink Limited
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; version 2 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program. If not, see <http://www.gnu.org/licenses/>.
"""
A simple Python wrapper for fdisk
* Intends to have as few dependencies as possible, beyond command line fdisk
* Intends to work on Linux, though may work on other operating systems with
fdisk from util-linux.
* Provides for the creation of MBR and GPT partitioned images or devices
* Includes some utility functions for reading information from existing
partition tables
Caveats:
* Designed to cater for disks using 4096 byte sectors, although this hasn't
been tested yet.
"""
import contextlib
from copy import deepcopy
import re
import subprocess
import time
import yaml
class Extent(object):
    """
    A class to hold start and end points for other objects

    Start and end points are measured in sectors. This class transparently
    handles the inclusive nature of the start and end sectors of blocks of
    storage. It also allows extents to be aligned within other extents.

    Attributes:
        start: First sector of the extent (inclusive)
        end: Last sector of the extent (inclusive)
        filled_sectors: Number of sectors already handed out by pack()
    """

    def __init__(self, start=0, length=0, end=0):
        """
        Create an extent from a start and either a length or an end

        Args:
            start: First sector of the extent
            length: Length in sectors; when given together with a non-zero
                    start, the end sector is derived from it
            end: Last sector, used only when start and length are not
                 both given

        Raises:
            PartitioningError: if a length is given without a non-zero start
        """
        if length and not start:
            raise PartitioningError('Extent requires a non-zero start '
                                    'point and length')
        self.start = int(start)
        if start and length:
            # Both sectors are inclusive, hence the -1
            self.end = self.start + int(length) - 1
        else:
            self.end = int(end)
        self.filled_sectors = 0

    def __max__(self):
        return self.end

    def __min__(self):
        return self.start

    def __len__(self):
        """Return the length in sectors, counting both end points"""
        return self.end - self.start + 1

    def __add__(self, other):
        """Return the sum of two extents, starting at self's start"""
        return Extent(start=self.start,
                      length=(len(self) + len(other)))

    def __iadd__(self, other):
        """Grow this extent in place by the length of `other`"""
        self.end += len(other)
        return self

    def __gt__(self, other):
        """Compare extents by length"""
        return len(self) > len(other)

    def __lt__(self, other):
        """Compare extents by length (strictly shorter)"""
        return len(self) < len(other)

    def __str__(self):
        return ('<Extent: Start=%d, End=%d, Length=%d>' %
                (self.start, self.end, len(self)))

    def pack(self, other):
        """
        Return a new Extent aligned to self's first unused sector

        This is done by length, to quantify fitting an area of disk space
        inside the other. The filled space in self is calculated and updated.

        Args:
            other: The Extent whose length should be fitted into self

        Returns:
            A new Extent, starting at the first available sector in `self`,
            with the same length as `other`.

        Raises:
            PartitioningError: if `other` does not fit in the unused space
        """
        length_other = len(other)
        first_free_sector = self.start + self.filled_sectors
        if length_other + self.filled_sectors > len(self):
            raise PartitioningError('Not enough free space to pack Extent')
        self.filled_sectors += length_other
        return Extent(start=first_free_sector, length=length_other)

    def free_sectors(self):
        """Return the number of sectors not yet handed out by pack()"""
        return len(self) - self.filled_sectors
class PartitionList(object):
    """
    An iterable object to contain and process a list of Partition objects

    This class eases the calculation of partition sizes and numbering, since
    the properties of a given partition depend on each of the other partitions
    in the list.

    Attributes:
        device: A Device class containing the partition list
        extent: The device's Extent, into which partitions are packed
    """

    def __init__(self, device):
        """
        Initialisation function

        Args:
            device: A Device object
        """
        self.device = device
        self.extent = device.extent
        # None never equals a real hash() value, so the first lookup always
        # builds the cached list instead of returning an unset attribute
        self.__cached_list_hash = None
        self.__cached_list = []
        self.__partition_list = []
        self.__iter_index = 0

    def append(self, partition):
        """
        Append a new Partition object to the list

        Raises:
            PartitioningError: if `partition` is not a Partition, fails its
                               own check(), or duplicates the number or
                               mountpoint of a stored partition
        """
        # Type check first, so that foreign objects get the intended error
        # rather than an AttributeError from a missing check() method
        if not isinstance(partition, Partition):
            raise PartitioningError('PartitionList can only '
                                    'contain Partition objects')
        partition.check()
        for part in self.__partition_list:
            dup_attrib = part.compare(partition)
            if dup_attrib:
                raise PartitioningError('Duplicated partition attribute '
                                        '\'%s\'' % dup_attrib)
        self.__partition_list.append(partition)

    def __iter__(self):
        """Return a copy of self as an iterable object"""
        self.__iter_index = 0
        copy = deepcopy(self)
        return copy

    def __next__(self):
        """Return the next item in an iteration"""
        if self.__iter_index == len(self.__partition_list):
            raise StopIteration
        partition = self[self.__iter_index]
        self.__iter_index += 1
        return partition

    def next(self):
        """Provide a next() method for Python 2 compatibility"""
        return self.__next__()

    def __getitem__(self, i):
        """Return a partition from the list, sorted by partition number"""
        part_list = sorted(self.__update_partition_list(),
                           key=lambda part: part.number)
        return part_list[i]

    def free_sectors(self):
        """Calculate the amount of unused space in the list"""
        part_list = self.__update_partition_list()
        self.extent.filled_sectors = 0
        for part in part_list:
            self.extent.pack(part.extent)
        return self.extent.free_sectors()

    def __update_partition_list(self):
        """
        Allocate extent and numbering for each Partition object in the list

        A copy of the partition list is made so that the partitions stored
        in the partition list remain intact even if a returned Partition is
        modified. Hashing is used to avoid updating the list when the
        partition list has not changed.

        Raises:
            PartitioningError: if the partitions do not fit in the device
                               extent, or no partition number is left for a
                               partition that did not request one
        """
        current_list_hash = hash(str(self.__partition_list))
        if current_list_hash == self.__cached_list_hash:
            return self.__cached_list

        part_list = deepcopy(self.__partition_list)
        used_numbers = set()
        fill_partitions = set(partition for partition in part_list
                              if partition.size == 'fill')
        requested_numbers = set(partition.number for partition in part_list
                                if hasattr(partition, 'number'))

        # Get free space and the size of 'fill' partitions
        self.extent.filled_sectors = 0
        for part in part_list:
            if part.size != 'fill':
                extent = Extent(start=1,
                                length=self.get_length_sectors(part.size))
                part.extent = extent
                self.extent.pack(extent)

        # Share the remaining space equally between 'fill' partitions
        if fill_partitions:
            # Floor division: a sector count must be a whole number
            fill_size = self.extent.free_sectors() // len(fill_partitions)
            for part in fill_partitions:
                part.size = fill_size * self.device.sector_size
                part.extent = Extent(start=1, length=fill_size)

        # Allocate aligned Extents and process partition numbers
        self.extent.filled_sectors = 0
        for part in part_list:
            part.extent = self.extent.pack(part.extent)

            # Find the next unused partition number if not assigned
            if hasattr(part, 'number'):
                num = part.number
            else:
                for n in range(1, self.device.max_allowed_partitions + 1):
                    if n not in used_numbers and n not in requested_numbers:
                        num = n
                        break
                else:
                    raise PartitioningError('No free partition number '
                                            'available')

            part.number = num
            used_numbers.add(num)

        self.__cached_list_hash = current_list_hash
        self.__cached_list = part_list
        return part_list

    def get_length_sectors(self, size_bytes):
        """
        Get a length in sectors, aligned to 4096 byte boundaries

        The size is rounded up to the next multiple of 4096 bytes (or of
        the sector size, if that is larger) and converted to sectors.

        Args:
            size_bytes: Size in bytes, as an integer or numeric string
        """
        size_bytes = int(size_bytes)
        sector_size = self.device.sector_size
        alignment = max(4096, sector_size)
        aligned_blocks = (size_bytes + alignment - 1) // alignment
        return (aligned_blocks * alignment) // sector_size

    def __str__(self):
        string = ''
        for part in self:
            string = '%s\n%s\n' % (part, string)
        return string.rstrip()

    def __len__(self):
        """Return the number of stored partitions"""
        return len(self.__partition_list)

    def __setitem__(self, i, value):
        """
        Add `value` to the list

        The index is not used: the position of a partition is determined
        by its partition number, so the value is appended and validated in
        the same way as append().
        """
        self.append(value)
class Partition(object):
    """
    Description of a single partition in a disk or image

    Attributes are supplied as keyword arguments.

    Required attributes:
        size: String describing the size of the partition in bytes
              This may also be 'fill' to indicate that this partition should
              be expanded to fill all unused space. Where there is more than
              one fill partition, unused space is divided equally between the
              fill partitions.
        fdisk_type: An integer representing the hexadecimal code used by fdisk
                    to describe the partition type. Any partitions with
                    fdisk_type='none' create an area of unused space.

    Optional attributes:
        **kwargs: A mapping of any keyword arguments
        filesystem: A string describing the filesystem format for the
                    partition, or 'none' to skip filesystem creation.
        description: A string describing the partition, for documentation
        boot: Boolean string describing whether to set the bootable flag
        mountpoint: String describing the mountpoint for the partition
        number: Number used to override partition numbering for the
                partition (Possible only when using an MBR partition table)
    """

    def __init__(self, size=0, fdisk_type=0x81, filesystem='none', **kwargs):
        """Store the attributes; any extra keyword becomes an attribute"""
        size_missing = not size and 'size' not in kwargs
        if size_missing:
            raise PartitioningError('Partition must have a non-zero size')

        self.size = human_size(size)
        self.fdisk_type = fdisk_type
        self.filesystem = filesystem
        self.__dict__.update(**kwargs)

    def check(self):
        """Raise PartitioningError if free space is given a filesystem
        or a mountpoint"""
        if self.fdisk_type != 'none':
            return

        if self.filesystem != 'none':
            raise PartitioningError('Partition: Free space '
                                    'cannot have a filesystem')

        # A missing mountpoint is treated the same as 'none'
        if getattr(self, 'mountpoint', 'none') != 'none':
            raise PartitioningError('Partition: Free space '
                                    'cannot have a mountpoint')

    def compare(self, other):
        """
        Look for attributes which two partitions must not share

        Returns:
            The name of the first of 'number' and 'mountpoint' that both
            partitions define with equal values, otherwise False
        """
        unset = object()
        for attrib in ('number', 'mountpoint'):
            mine = getattr(self, attrib, unset)
            theirs = getattr(other, attrib, unset)
            if mine is unset or theirs is unset:
                continue
            if mine == theirs:
                return attrib
        return False

    def __str__(self):
        if self.fdisk_type == 'none':
            type_text = 'none'
        else:
            type_text = hex(self.fdisk_type)

        pieces = ['Partition\n'
                  ' size: %s\n'
                  ' fdisk type: %s\n'
                  ' filesystem: %s'
                  % (self.size, type_text, self.filesystem)]

        if hasattr(self, 'extent'):
            pieces.append('\n start: %s'
                          '\n end: %s'
                          % (self.extent.start, self.extent.end))

        # Optional attributes, reported only when they have been set
        optional = (('number', '\n number: %s'),
                    ('mountpoint', '\n mountpoint: %s'),
                    ('boot', '\n bootable: %s'))
        for attrib, template in optional:
            if hasattr(self, attrib):
                pieces.append(template % getattr(self, attrib))

        return ''.join(pieces)
class Device(object):
    """
    A class to describe a disk or image, and its partition layout

    Attributes are loaded from **kwargs, containing key-value pairs describing
    the required attributes. This can be loaded from a YAML file, using the
    module function load_yaml().

    Required attributes:
        location: The location of the device or disk image
        size: A size in bytes describing the total amount of space the
              partition table on the device will occupy, or 'fill' to
              automatically fill the available space.

    Optional attributes:
        **kwargs: A mapping of any keyword arguments
        start_offset: The first 512 byte sector of the first partition
                      (default: 2048)
        partition_table_format: A string describing the type of partition
                                table used on the device (default: 'gpt')
        partitions: A list of mappings for the attributes for each Partition
                    object. update_partitions() populates the partition list
                    based on the contents of this attribute.
    """

    # The first partition may not start before this many bytes
    min_start_bytes = 1024**2

    def __init__(self, location, size, **kwargs):
        """
        Read the target's size and sector size, and set up the usable extent

        Args:
            location: Path to the device node or disk image
            size: Size in bytes (suffixes understood by human_size() are
                  accepted), or 'fill' to use the whole target
            **kwargs: Optional attributes, see the class documentation

        Raises:
            PartitioningError: if the requested size does not fit on the
                               target, is not greater than min_start_bytes,
                               or the start offset is below min_start_bytes
        """
        if 'partition_table_format' not in kwargs:
            self.partition_table_format = 'gpt'
        if 'start_offset' not in kwargs:
            self.start_offset = 2048

        target_size = get_disk_size(location)
        if str(size).lower() == 'fill':
            self.size = target_size
        else:
            self.size = human_size(size)

        if self.size > target_size:
            raise PartitioningError('Not enough space available on target')

        if self.size <= self.min_start_bytes:
            raise PartitioningError('Device size must be greater than %d '
                                    'bytes' % self.min_start_bytes)

        # Get sector size
        self.sector_size = get_sector_size(location)
        self.location = location

        # Populate Device attributes from keyword args
        self.__dict__.update(**kwargs)

        # Compare case-insensitively everywhere, so that 'GPT' and 'gpt'
        # give the same partition limit and the same reserved space
        pt_format = self.partition_table_format.lower()
        if pt_format == 'gpt':
            self.max_allowed_partitions = 128
        else:
            self.max_allowed_partitions = 4

        # Process Device size
        # Sector quantities in the specification are assumed to be 512 bytes
        # This converts to the real sector size
        start = (self.start_offset * 512) // self.sector_size

        if (start * self.sector_size) < self.min_start_bytes:
            raise PartitioningError('Start offset should be greater than '
                                    '%d, for %d byte sectors' %
                                    (self.min_start_bytes // self.sector_size,
                                     self.sector_size))

        # Check the disk's first partition starts on a 4096 byte boundary
        # this ensures alignment, and avoiding a reduction in performance
        # on disks which use a 4096 byte physical sector size
        if (start * self.sector_size) % 4096 != 0:
            print('WARNING: Start sector is not aligned '
                  'to 4096 byte sector boundaries')

        # End sector is one sector less than the disk length
        disk_end_sector = (self.size // self.sector_size) - 1

        if pt_format == 'gpt':
            # GPT partition table is duplicated at the end of the device.
            # GPT header takes one sector, whatever the sector size,
            # with a 16384 byte 'minimum' area for partition entries,
            # supporting up to 128 partitions (128 bytes per entry).
            # The duplicate GPT does not include the 'protective' MBR
            gpt_size = ((16 * 1024) // self.sector_size) + 1
            self.extent = Extent(start=start,
                                 end=(disk_end_sector - gpt_size))
        else:
            self.extent = Extent(start=start, end=disk_end_sector)

        self.update_partitions()

    def update_partitions(self, partitions=None):
        """
        Reset list, populate with partitions from a list of attributes

        Args:
            partitions: A list of partition keyword attributes
        """
        self.partitionlist = PartitionList(self)
        if partitions:
            self.partitions = partitions
        if hasattr(self, 'partitions'):
            for partition_args in self.partitions:
                self.add_partition(Partition(**partition_args))

    def add_partition(self, partition):
        """
        Add a Partition object to the device's list of partitions

        Args:
            partition: a Partition class

        Raises:
            PartitioningError: if the partition table is already full
        """
        if len(self.partitionlist) < self.max_allowed_partitions:
            self.partitionlist.append(partition)
        else:
            raise PartitioningError('Exceeded maximum number of partitions '
                                    'for %s partition table (%d)' %
                                    (self.partition_table_format.upper(),
                                     self.max_allowed_partitions))

    def get_partition_by_mountpoint(self, mountpoint):
        """
        Return a Partition with a specified mountpoint

        Raises:
            StopIteration: if no partition has that mountpoint
        """
        return next(r for r in self.partitionlist
                    if hasattr(r, 'mountpoint')
                    and r.mountpoint == mountpoint)

    def commit(self):
        """
        Write the partition table to the disk or image

        A script of fdisk commands is built up and passed to fdisk on its
        standard input.

        Raises:
            PartitioningError: if the partition table format is not one of
                               'mbr', 'dos' or 'gpt'
            FdiskError: if fdisk reported problems on stderr
        """
        pt_format = self.partition_table_format.lower()
        is_mbr = pt_format in ('mbr', 'dos')
        print("Creating %s partition table on %s" %
              (pt_format.upper(), self.location))

        # Create a new partition table
        if is_mbr:
            cmd = "o\n"
        elif pt_format == 'gpt':
            cmd = "g\n"
        else:
            raise PartitioningError('Unrecognised partition '
                                    'table type \'%s\'' % pt_format)

        for partition in self.partitionlist:
            # Create partitions
            if str(partition.fdisk_type).lower() != 'none':
                cmd += "n\n"
                if is_mbr:
                    cmd += "p\n"
                cmd += (str(partition.number) + "\n"
                        "" + str(partition.extent.start) + "\n"
                        "" + str(partition.extent.end) + "\n")

                # Set partition types
                cmd += "t\n"
                if partition.number > 1:
                    # fdisk does not ask for a partition
                    # number when setting the type of the
                    # first created partition
                    cmd += str(partition.number) + "\n"
                cmd += hex(partition.fdisk_type) + "\n"

                # Set bootable flag ('dos' is the same table format as 'mbr')
                if hasattr(partition, 'boot') and is_mbr:
                    if str(partition.boot).lower() in ('yes', 'true'):
                        cmd += "a\n"
                        if partition.number > 1:
                            cmd += str(partition.number) + "\n"

        # Write changes
        cmd += ("w\n"
                "q\n")
        p = subprocess.Popen(["fdisk", self.location],
                             stdin=subprocess.PIPE,
                             stderr=subprocess.PIPE,
                             stdout=subprocess.PIPE)
        output = p.communicate(cmd)

        # The first stderr line and the trailing empty string are skipped
        errors = output[1].split('\n')[1:-1]
        if errors:
            # Exception handling is done in this way since fdisk will not
            # return a failure exit code if it finds problems with its input.
            # Note that the message 'disk does not contain a valid partition
            # table' is not an error, it's a status message printed to stderr
            # when fdisk starts with a blank device.
            raise FdiskError('"%s"' % ' '.join(str(x) for x in errors))

    def get_partition_uuid(self, partition):
        """Read a partition's UUID from disk (MBR or GPT)"""
        return get_partition_uuid(self.location, partition.number,
                                  self.partition_table_format)

    def create_filesystems(self, skip=()):
        """Create filesystems on the disk or image

        Args:
            skip: An iterable of mountpoints identifying partitions to skip
                  filesystem creation on, for example if custom settings are
                  required
        """
        for part in self.partitionlist:
            if hasattr(part, 'mountpoint') and part.mountpoint in skip:
                continue
            if part.filesystem.lower() != 'none':
                with create_loopback(self.location,
                                     part.extent.start * self.sector_size,
                                     part.size) as loop:
                    print('Creating %s filesystem on partition %s' %
                          (part.filesystem, part.number))
                    subprocess.check_output(['mkfs.' + part.filesystem, loop])

    def __str__(self):
        return ('<Device: location=%s, size=%s, partitions: %s>' %
                (self.location, self.size, len(self.partitionlist)))
class PartitioningError(Exception):
    """
    Raised when a partition layout cannot be created as requested, or
    when information about a device cannot be read

    Attributes:
        msg: The error message, or None if none was given
    """

    def __init__(self, msg=None):
        # Pass the message on so that `args` is populated as well
        super(PartitioningError, self).__init__(msg)
        self.msg = msg

    def __str__(self):
        # __str__ must return a string, even when no message was given
        return '' if self.msg is None else str(self.msg)
class FdiskError(Exception):
    """
    Raised when fdisk reports problems while writing a partition table

    Attributes:
        msg: The error message, or None if none was given
    """

    def __init__(self, msg=None):
        # Pass the message on so that `args` is populated as well
        super(FdiskError, self).__init__(msg)
        self.msg = msg

    def __str__(self):
        # __str__ must return a string, even when no message was given
        return '' if self.msg is None else str(self.msg)
def load_yaml(location, size, yaml_file):
    """
    Build a Device from a YAML partitioning specification

    The YAML file describes the attributes documented in the Device
    and Partition classes.

    Args:
        location: Path to the device node or image to use for partitioning
        size: The desired device size in bytes (may be 'fill' to occupy the
              entire device)
        yaml_file: String path to a YAML file to load

    Returns:
        A Device object
    """
    with open(yaml_file, 'r') as spec:
        device_attributes = yaml.safe_load(spec)

    # Each top-level key of the specification becomes a keyword argument
    device = Device(location, size, **device_attributes)
    return device
def get_sector_size(location):
    """Return the logical sector size of a block device or image, in bytes"""
    matches = __filter_fdisk_list_output(r'Sector size.*?(\d+) bytes',
                                         location)
    # Only the first figure reported by fdisk is used
    first_match = matches[0]
    return int(first_match)
def get_disk_size(location):
    """Return the total size of a block device or image, in bytes"""
    matches = __filter_fdisk_list_output(r'Disk.*?(\d+) bytes', location)
    # Only the first figure reported by fdisk is used
    first_match = matches[0]
    return int(first_match)
def get_partition_offsets(location):
    """List the start sector of each partition in a device or image"""
    # Column 1 of the fdisk listing is read as the start sector
    start_column = 1
    return __get_fdisk_list_numeric_column(location, start_column)
def get_partition_sector_sizes(location):
    """List the size in sectors of each partition in a device or image"""
    # Column 3 of the fdisk listing is read as the size in sectors
    sectors_column = 3
    return __get_fdisk_list_numeric_column(location, sectors_column)
def __get_fdisk_list_numeric_column(location, column):
    """
    Read one numeric column from the partition rows of `fdisk -l`

    A partition row is recognised as `location` followed immediately by
    a number. `column` further numeric fields (optionally marked with '*')
    are skipped and the following number is captured.

    Args:
        location: Path to the device or image
        column: Number of numeric fields to skip after the path

    Returns:
        A list of integers, one per matching row

    Raises:
        PartitioningError: if no row matches
    """
    # The path is literal text: escape it so that characters such as '.'
    # or '+' in an image name are not interpreted as regex syntax
    pattern = (r'%s(?:\d+[\*\s]+){%d}(\d+)' %
               (re.escape(location), column))
    return [int(value)
            for value in __filter_fdisk_list_output(pattern, location)]
def __filter_fdisk_list_output(regex, location):
    """
    Run `fdisk -l` on location and return every match of regex

    The pattern is compiled with re.DOTALL, so '.' also matches newlines.

    Raises:
        PartitioningError: if the pattern matches nothing in the output
    """
    pattern = re.compile(regex, re.DOTALL)
    fdisk_listing = subprocess.check_output(['fdisk', '-l', location])
    matches = pattern.findall(fdisk_listing)
    if not matches:
        raise PartitioningError('Error reading information from fdisk')
    return matches
def human_size(size_string):
    """
    Convert a size with an optional k/m/g/t suffix to a number of bytes

    Suffixes are case-insensitive powers of 1024. Anything that is not a
    whole number with an optional suffix (for example 'fill') is returned
    unchanged.
    """
    multipliers = {'': 1,
                   'k': 1024,
                   'm': 1024 ** 2,
                   'g': 1024 ** 3,
                   't': 1024 ** 4}
    parsed = re.match(r'^(\d+)([kmgtKMGT]?)$', str(size_string))
    if parsed is None:
        return size_string
    digits, suffix = parsed.groups()
    return int(digits) * multipliers[suffix.lower()]
@contextlib.contextmanager
def create_loopback(mount_path, offset=0, size=0):
    """
    Create a loopback device for accessing partitions in block devices

    The loopback device is detached again when the context exits.

    Args:
        mount_path: String path to mount
        offset: Offset of the start of a partition in bytes (default 0)
        size: Limits the size of the partition, in bytes (default 0). This is
              important when creating filesystems, otherwise tools often
              corrupt areas beyond the desired limits of the partition.

    Returns:
        The path to a created loopback device node

    Raises:
        PartitioningError: if losetup fails to create the device
    """
    cmd = ['losetup', '--show', '-f', '-P', '-o', str(offset)]
    if size:
        # The limit applies whatever the offset, including offset 0
        cmd += ['--sizelimit', str(size)]
    cmd.append(mount_path)

    try:
        loop_device = subprocess.check_output(cmd).rstrip()
    except subprocess.CalledProcessError:
        raise PartitioningError('Error creating loopback')

    # Allow the system time to see the new device. On some systems, mounts
    # created on the loopdev too soon after creating the loopback device
    # may be unreliable, even though the -P option (--partscan) is passed
    # to losetup
    time.sleep(1)

    try:
        yield loop_device
    finally:
        subprocess.check_call(['losetup', '-d', loop_device])
def get_pt_type(location):
    """
    Return the lower-cased partition table type of a device or image

    'none' is returned when blkid reports an empty PTTYPE.
    """
    detected = __get_blkid_output('PTTYPE', location).lower()
    if detected == '':
        return 'none'
    return detected
def __get_blkid_output(field, location):
    """Return the value blkid prints for `field` on location, with
    trailing whitespace removed"""
    blkid_cmd = ['blkid', '-p', '-o', 'value', '-s', field, location]
    raw_value = subprocess.check_output(blkid_cmd)
    return raw_value.rstrip()
def get_partition_uuid(location, part_num, pt_type=None):
    """
    Read the partition UUID (MBR or GPT) for location (device or image)

    Args:
        location: Path to device or image
        part_num: Integer number of the partition
        pt_type: The partition table format ('gpt', 'mbr' or 'dos', in any
                 case). When omitted it is detected with get_pt_type().

    Returns:
        The partition UUID string, or None if the partition table format
        is not one of the supported ones
    """
    if not pt_type:
        pt_type = get_pt_type(location)

    # Callers may pass 'GPT' or 'MBR', and blkid reports MBR as 'dos'
    pt_type = str(pt_type).lower()
    if pt_type == 'gpt':
        return get_partition_gpt_guid(location, part_num)
    if pt_type in ('mbr', 'dos'):
        return get_partition_mbr_uuid(location, part_num)
    return None
def get_partition_mbr_uuid(location, part_num):
    """
    Build the UUID of a partition in an MBR partition table

    In Linux, MBR partition UUIDs are comprised of the NT disk signature,
    followed by '-' and a two digit, zero-padded partition number. This is
    necessary since the MBR does not provide per-partition GUIDs as GPT
    partition tables do. This can be passed to the kernel with
    "root=PARTUUID=$UUID" to identify a partition containing a root
    filesystem.

    Args:
        location: Location of the storage device containing the partition -
                  an image or device node
        part_num: Integer number of the partition

    Returns:
        A UUID referring to an MBR partition, e.g. '97478dab-02'
    """
    # blkid's PTUUID value for the disk, upper-cased
    disk_signature = __get_blkid_output('PTUUID', location)
    disk_signature = disk_signature.upper()
    return '%s-%02d' % (disk_signature, part_num)
def get_partition_gpt_guid(location, part_num):
    """
    Get a partition's GUID from a GPT partition table

    This is read directly from the partition table, since current fdisk does
    not support reading GPT partition GUIDs. It does not require special tools
    (gfdisk). This is the GUID which identifies the partition, created with
    the partition table, as opposed to the filesystem UUID, created with the
    filesystem. It is particularly useful for specifying the partition which
    the Linux kernel can use on boot to find the root filesystem, e.g. when
    using the kernel command line "root=PARTUUID=$UUID"

    Args:
        location: Location of the storage device containing the partition -
                  an image path or device node
        part_num: The partition number

    Returns:
        A GUID string, e.g. 'B342D1AB-4B65-4601-97DC-D6DF3FE2E95E'

    Raises:
        PartitioningError: if 16 bytes cannot be read at the GUID's offset
    """
    sector_size = get_sector_size(location)

    # The partition GUID is located two sectors (protective MBR + GPT header)
    # plus 128 bytes for each partition entry in the table, plus 16 bytes for
    # the location of the partition's GUID
    guid_offset = (2 * sector_size) + (128 * (part_num - 1)) + 16

    with open(location, 'rb') as f:
        f.seek(guid_offset)
        raw_uuid_bin = f.read(16)

    if len(raw_uuid_bin) != 16:
        raise PartitioningError('Could not read GUID of partition %s '
                                'from %s' % (part_num, location))

    # bytearray yields integers on both Python 2 and Python 3
    a = ''.join('%02X' % byte for byte in bytearray(raw_uuid_bin))

    # The first three fields are stored with their bytes reversed
    return ('%s%s%s%s-%s%s-%s%s-%s-%s' %
            (a[6:8], a[4:6], a[2:4], a[0:2],
             a[10:12], a[8:10],
             a[14:16], a[12:14],
             a[16:20], a[20:32]))