#!/usr/bin/python
# Copyright (C) 2012-2015 Codethink Limited
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; version 2 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program. If not, see .
"""
A simple Python wrapper for command line fdisk.
It is intended to work on Linux, though may work on other operating
systems using fdisk from util-linux.
"""
import contextlib
from copy import deepcopy
import re
import subprocess
import time
import yaml
class Extent(object):
"""
A class to hold start and end points for other objects
Start and end points are measured in sectors. This class transparently
handles the inclusive nature of the start and end sectors of blocks of
storage. It also allows extents to be aligned within other extents.
"""
def __init__(self, start=0, length=0, end=0):
if length and not start:
raise PartitioningError('Extent requires a non-zero start '
'point and length')
if start and length:
self.start = int(start)
self.end = int(start) + int(length) - 1
else:
self.start = int(start)
self.end = int(end)
self.filled_sectors = 0
def __max__(self):
return self.end
def __min__(self):
return self.start
def __len__(self):
return self.end - self.start + 1
def __add__(self, other):
"""Return the sum of two extents"""
return Extent(start=self.start,
length=(len(self) + len(other)))
def __iadd__(self, other):
"""+="""
self.end += len(other)
return self
def __gt__(self, other):
return len(self) > len(other)
def __lt__(self, other):
return not self > other
def __str__(self):
return ('' %
(self.start, self.end, len(self)))
def pack(self, other):
"""
Return a new Extent aligned to self's first unused sector
This is done by length, to quantify fitting an area of disk space
inside the other. The filled space in self is calculated and updated.
Returns:
A new Extent, starting at the first available sector in `self`,
with the same length as `other`.
"""
length_other = len(other)
first_free_sector = self.start + self.filled_sectors
if length_other + self.filled_sectors > len(self):
raise PartitioningError('Not enough free space to pack Extent')
self.filled_sectors += length_other
return Extent(start=first_free_sector, length=length_other)
def free_sectors(self):
return len(self) - self.filled_sectors
class PartitionList(object):
"""
An iterable object to contain and process a list of Partition objects
This class eases the calculation of partition sizes and numbering, since
the properties of a given partition depend on each of the other partitions
in the list.
Attributes:
device: A Device class containing the partition list
"""
def __init__(self, device):
"""
Initialisation function
Args:
device: A Device object
"""
self.device = device
self.extent = device.extent
self.__cached_list_hash = 0
self.__partition_list = []
self.__iter_index = 0
def append(self, partition):
"""Append a new Partition object to the list"""
if isinstance(partition, Partition):
for part in self.__partition_list:
dup_attrib = part.compare(partition)
if dup_attrib:
raise PartitioningError('Duplicated partition attribute '
'\'%s\'' % dup_attrib)
self.__partition_list.append(partition)
else:
raise PartitioningError('PartitionList can only '
'contain Partition objects')
def __iter__(self):
"""Return self as an iterable object"""
self.__iter_index = 0
return self
def __next__(self):
"""Return the next item in an iteration"""
if self.__iter_index == len(self.__partition_list):
raise StopIteration
else:
partition = self[self.__iter_index]
self.__iter_index += 1
return partition
def next(self):
"""Provide a next() method for Python 2 compatibility"""
return self.__next__()
def __getitem__(self, i):
"""Return an partition from the list, sorted by partition number"""
part_list = sorted(self.__update_partition_list(),
key=lambda part: part.number)
return part_list[i]
def free_sectors(self):
"""Calculate the amount of unused space in the list"""
part_list = self.__update_partition_list()
self.extent.filled_sectors = 0
for part in part_list:
self.extent.pack(part.extent)
return self.extent.free_sectors()
def __update_partition_list(self):
"""
Allocate extent and numbering for each Partition object in the list
A copy of the partition list is made so that any Partition object
returned from this list is a copy of a stored Partition object, thus
any partitions stored in the partition list remain intact even if a
copy is modified after is is returned. Hashing is used to avoid
updating the list when the partition list has not changed.
"""
current_list_hash = hash(str(self.__partition_list))
if current_list_hash == self.__cached_list_hash:
return self.__cached_list
self.__cached_list_hash = current_list_hash
part_list = deepcopy(self.__partition_list)
used_numbers = set()
fill_partitions = set(partition for partition in part_list
if partition.size == 'fill')
requested_numbers = set(partition.number for partition in part_list
if hasattr(partition, 'number'))
# Get free space and the size of 'fill' partitions
self.extent.filled_sectors = 0
for part in part_list:
if part.size != 'fill':
extent = Extent(start=1,
length=self.get_length_sectors(part.size))
part.extent = extent
self.extent.pack(extent)
# Allocate aligned Extents and process partition numbers
if len(fill_partitions):
fill_size = self.extent.free_sectors() / len(fill_partitions)
# Set size of fill partitions
for part in fill_partitions:
part.size = fill_size * self.device.sector_size
part.extent = Extent(start=1, length=fill_size)
self.extent.filled_sectors = 0
for part in part_list:
part.extent = self.extent.pack(part.extent)
# Find the next unused partition number if not assigned
if hasattr(part, 'number'):
num = part.number
else:
for n in range(1, self.device.max_allowed_partitions + 1):
if n not in used_numbers and n not in requested_numbers:
num = n
break
part.number = num
used_numbers.add(num)
self.__cached_list = part_list
return part_list
def get_length_sectors(self, size_bytes):
"""Get a length in sectors, aligned to 4096 byte boundaries"""
return (int(size_bytes) / self.device.sector_size +
((int(size_bytes) % 4096) != 0) *
(4096 / self.device.sector_size))
def __str__(self):
string = ''
for part in self:
string = '%s\n%s\n' % (part, string)
return string.rstrip()
def __len__(self):
return len(self.__partition_list)
def __setitem__(self, i, value):
""" Update the ith item in the list """
self.append(partition)
class Partition(object):
"""
A class to describe a partition in a disk or image
The required attributes are loaded via kwargs.
Required attributes:
size: String describing the size of the partition in bytes
This may also be 'fill' to indicate that this partition should
be expanded to fill all unused space. Where there is more than
one fill partition, unused space is divided equally between the
fill partitions.
fdisk_type: A number describing the hexadecimal code used by fdisk
to describe the partition type. Any partitions with
fdisk_type='none' create an area of unused space.
Optional attributes:
**kwargs: A mapping of any keyword arguments
filesystem: A string describing the filesystem format for the
partition, or 'none' to skip filesystem creation.
description: A string describing the partition, for documentation
boot: Boolean string describing whether to set the bootable flag
mountpoint: String describing the mountpoint for the partition
number: Number used to override partition numbering for the
partition (Possible only when using an MBR partition table)
"""
def __init__(self, size=0, fdisk_type=0x81, filesystem='none', **kwargs):
if not size and 'size' not in kwargs:
raise PartitioningError('Partition must have a non-zero size')
self.filesystem = filesystem
self.fdisk_type = fdisk_type
self.size = decode_human_size(size)
self.__dict__.update(**kwargs)
def compare(self, other):
"""Check for mutually exclusive attributes"""
non_duplicable = ('number', 'mountpoint')
for attrib in non_duplicable:
if hasattr(self, attrib) and hasattr(other, attrib):
if getattr(self, attrib) == getattr(other, attrib):
return attrib
return False
def __str__(self):
string = ('Partition\n'
' size: %s\n'
' fdisk type: %s\n'
' filesystem: %s'
% (self.size, hex(self.fdisk_type), self.filesystem))
if hasattr(self, 'extent'):
string += (
'\n start: %s'
'\n end: %s'
% (self.extent.start, self.extent.end))
if hasattr(self, 'number'):
string += '\n number: %s' % self.number
if hasattr(self, 'mountpoint'):
string += '\n mountpoint: %s' % self.mountpoint
if hasattr(self, 'boot'):
string += '\n bootable: %s' % self.boot
return string
class Device(object):
"""
A class to describe a disk or image, and its partition layout
Attributes are loaded from **kwargs, containing key-value pairs describing
the required attributes. This can be loaded from a YAML file, using the
module function load_yaml().
Required attributes:
location: The location of the device or disk image
size: A size in bytes describing the total amount of space the
partition table on the device will occupy, or 'fill' to
automatically fill the available space.
Optional attributes:
**kwargs: A mapping of any keyword arguments
start_offset: The first 512 byte sector of the first partition
(default: 2048)
partition_table_format: A string describing the type of partition
table used on the device (default: 'gpt')
partitions: A list of mappings for the attributes for each Partition
object. update_partitions() populates the partition list
based on the contents of this attribute.
"""
min_start_bytes = 1024**2
def __init__(self, location, size, **kwargs):
if 'partition_table_format' not in kwargs:
self.partition_table_format = 'gpt'
if 'start_offset' not in kwargs:
self.start_offset = 2048
target_size = get_disk_size(location)
if str(size).lower() == 'fill':
self.size = target_size
else:
self.size = decode_human_size(size)
if self.size > target_size:
raise PartitioningError('Not enough space available on target')
if self.size <= self.min_start_bytes:
raise PartitioningError('Device size must be greater than %d '
'bytes' % self.min_start_bytes)
# Get sector size
self.sector_size = get_sector_size(location)
self.location = location
# Populate Device attributes from keyword args
self.__dict__.update(**kwargs)
if self.partition_table_format.lower() == 'gpt':
self.max_allowed_partitions = 128
else:
self.max_allowed_partitions = 4
# Process Device size
start = (self.start_offset * 512) / self.sector_size
# Sector quantities in the specification are assumed to be 512 bytes
# This converts to the real sector size
if (start * self.sector_size) < self.min_start_bytes:
raise PartitioningError('Start offset should be greater than '
'%d, for %d byte sectors' %
(min_start_bytes / self.sector_size,
self.sector_size))
# Check the disk's first partition starts on a 4096 byte boundary
# this ensures alignment, and avoiding a reduction in performance
# on disks which use a 4096 byte physical sector size
if (start * self.sector_size) % 4096 != 0:
print('WARNING: Start sector is not aligned '
'to 4096 byte sector boundaries')
# End sector is one sector less than the disk length
disk_end_sector = (self.size / self.sector_size) - 1
if self.partition_table_format == 'gpt':
# GPT partition table is duplicated at the end of the device.
# GPT header takes one sector, whatever the sector size,
# with a 16384 byte 'minimum' area for partition entries,
# supporting up to 128 partitions (128 bytes per entry).
# The duplicate GPT does not include the 'protective' MBR
gpt_size = ((16 * 1024) / self.sector_size) + 1
self.extent = Extent(start=start,
end=(disk_end_sector - gpt_size))
else:
self.extent = Extent(start=start, end=disk_end_sector)
self.update_partitions()
def update_partitions(self, partitions=None):
"""
Reset list, populate with partitions from a list of attributes
Args:
partitions: A list of partition keyword attributes
"""
self.partitionlist = PartitionList(self)
if partitions:
self.partitions = partitions
if hasattr(self, 'partitions'):
for partition_args in self.partitions:
self.add_partition(Partition(**partition_args))
def add_partition(self, partition):
"""
Add a Partition object to the device's list of partitions
Args:
partition: a Partition class
"""
if len(self.partitionlist) < self.max_allowed_partitions:
self.partitionlist.append(partition)
else:
raise PartitioningError('Exceeded maximum number of partitions '
'for %s partition table (%d)' %
(self.partition_table_format.upper(),
self.max_allowed_partitions))
def commit(self):
"""Write the partition table to the disk or image"""
pt_format = self.partition_table_format.lower()
print("Creating %s partition table on %s" %
(pt_format.upper(), self.location))
# Create a new partition table
if pt_format in ('mbr', 'dos'):
cmd = "o\n"
elif pt_format == 'gpt':
cmd = "g\n"
else:
raise PartitioningError('Unrecognised partition '
'table type \'%s\'' % pt_format)
for partition in self.partitionlist:
# Create partitions
if str(partition.fdisk_type).lower() != 'none':
cmd += "n\n"
if pt_format in ('mbr', 'dos'):
cmd += "p\n"
cmd += (str(partition.number) + "\n"
"" + str(partition.extent.start) + "\n"
"" + str(partition.extent.end) + "\n")
# Set partition types
cmd += "t\n"
if partition.number > 1:
# fdisk does not ask for a partition
# number when setting the type of the
# first created partition
cmd += str(partition.number) + "\n"
cmd += str(partition.fdisk_type) + "\n"
# Set bootable flag
if hasattr(partition, 'boot') and pt_format == 'mbr':
if str(partition.boot).lower() in ('yes', 'true'):
cmd += "a\n"
if partition.number > 1:
cmd += str(partition.number) + "\n"
# Write changes
cmd += ("w\n"
"q\n")
p = subprocess.Popen(["fdisk", self.location],
stdin=subprocess.PIPE,
stderr=subprocess.PIPE,
stdout=subprocess.PIPE)
output = p.communicate(cmd)
errors = output[1].split('\n')[1:-1]
if errors:
# Note that the message saying 'disk does not contain a valid
# partition table' is never an error, it's a status message
# printed to stderr when fdisk starts with a blank device.
# Exception handling is done like this since fdisk will not
# return a failure exit code if it finds problems with the input
raise FdiskError('"%s"' % ' '.join(str(x) for x in errors))
def create_filesystems(self, skip=None):
"""Create filesystems on the disk or image
Args:
skipping: A list of strings denoting partitions to skip filesystem
creation on, for example if custom settings are required
"""
for part in self.partitionlist:
if hasattr(part, 'mountpoint') and part.mountpoint in skip:
continue
if part.filesystem.lower() != 'none':
with create_loopback(self.location,
part.extent.start * self.sector_size,
part.size) as loop:
print ('Creating %s filesystem on partition %s' %
(part.filesystem, part.number))
subprocess.check_output(['mkfs.' + part.filesystem, loop])
def __str__(self):
return ('' %
(self.location, self.size, len(self.partitionlist)))
class PartitioningError(Exception):
def __init__(self, msg=None):
self.msg = msg
def __str__(self):
return self.msg
class FdiskError(Exception):
def __init__(self, msg=None):
self.msg = msg
def __str__(self):
return self.msg
def load_yaml(location, size, yaml_file):
"""
Load partition data from a yaml specification
The YAML file describes the attributes documented in the Device
and Partition classes.
Args:
yaml_file: String path to a YAML file to load
location: Path to the device node or image to use for partitioning
size: The desired device size in bytes (may be 'fill' to occupy the
entire device
Returns:
A Device object
"""
with open(yaml_file, 'r') as f:
kwargs = yaml.safe_load(f)
return Device(location, size, **kwargs)
def get_sector_size(location):
"""Get the logical sector size of a block device or image, in bytes"""
return int(__filter_fdisk_list_output('Sector size.*?(\d+) bytes',
location)[0])
def get_disk_size(location):
"""Get the total size of a block device or image, in bytes"""
return int(__filter_fdisk_list_output('Disk.*?(\d+) bytes',
location)[0])
def get_disk_offsets(location):
"""Return an array of the partition start sectors in a device or image"""
return map(int, __filter_fdisk_list_output('%s\d+[\s*]+(\d+)' % location,
location))
def __filter_fdisk_list_output(regex, location):
r = re.compile(regex, re.DOTALL)
m = re.findall(r, subprocess.check_output(['fdisk', '-l', location]))
if m:
return m
else:
raise PartitioningError('Error reading information from fdisk')
@contextlib.contextmanager
def create_loopback(mount_path, offset=0, size=0):
"""
Create a loopback device for accessing partitions in block devices
Args:
mount_path: String path to mount
offset: Offset of the start of a partition in bytes (default 0)
size: Limits the size of the partition, in bytes (default 0)
Returns:
The path to a created loopback device node
"""
try:
base_args = ['losetup', '--show', '-f', '-P', '-o', str(offset)]
if size and offset:
cmd = base_args + ['--sizelimit', str(size), mount_path]
else:
cmd = base_args + [mount_path]
loop_device = subprocess.check_output(cmd).rstrip()
# Allow the system time to see the new device
# On some systems, mounts created on the loopdev
# too soon after creating the loopback device
# may be unreliable, even though the -P option
# (--partscan) is passed to losetup
time.sleep(1)
except subprocess.CalledProcessError:
PartitioningError('Error creating loopback')
try:
yield loop_device
finally:
subprocess.check_call(['losetup', '-d', loop_device])
def decode_human_size(size_string):
"""Parse strings for human readable size factors"""
facts_of_1024 = ['', 'k', 'm', 'g', 't']
m = re.match('^(\d+)([kmgtKMGT]?)$', str(size_string))
if not m:
return size_string
return int(m.group(1)) * (1024 ** facts_of_1024.index(m.group(2).lower()))