summaryrefslogtreecommitdiff
path: root/extensions/pyfdisk.py
diff options
context:
space:
mode:
Diffstat (limited to 'extensions/pyfdisk.py')
-rw-r--r--extensions/pyfdisk.py769
1 files changed, 0 insertions, 769 deletions
diff --git a/extensions/pyfdisk.py b/extensions/pyfdisk.py
deleted file mode 100644
index a7796729..00000000
--- a/extensions/pyfdisk.py
+++ /dev/null
@@ -1,769 +0,0 @@
-#!/usr/bin/env python2
-# Copyright (C) 2015 Codethink Limited
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; version 2 of the License.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License along
-# with this program. If not, see <http://www.gnu.org/licenses/>.
-
-"""
-A simple Python wrapper for fdisk
-
- * Intends to have as few dependencies as possible, beyond command line fdisk
- * Intends to work on Linux, though may work on other operating systems with
- fdisk from util-linux.
- * Provides for the creation of MBR and GPT partitioned images or devices
- * Includes some utility functions for reading information from existing
- partition tables
-
-Caveats:
- * Designed to cater for disks using 4096 byte sectors, although this hasn't
- been tested yet.
-"""
-
-import contextlib
-from copy import deepcopy
-import re
-import subprocess
-import time
-import yaml
-
-
-class Extent(object):
- """
- A class to hold start and end points for other objects
-
- Start and end points are measured in sectors. This class transparently
- handles the inclusive nature of the start and end sectors of blocks of
- storage. It also allows extents to be aligned within other extents.
- """
-
- def __init__(self, start=0, length=0, end=0):
- if length and not start:
- raise PartitioningError('Extent requires a non-zero start '
- 'point and length')
- if start and length:
- self.start = int(start)
- self.end = int(start) + int(length) - 1
- else:
- self.start = int(start)
- self.end = int(end)
-
- self.filled_sectors = 0
-
- def __max__(self):
- return self.end
-
- def __min__(self):
- return self.start
-
- def __len__(self):
- return self.end - self.start + 1
-
- def __add__(self, other):
- """Return the sum of two extents"""
- return Extent(start=self.start,
- length=(len(self) + len(other)))
-
- def __iadd__(self, other):
- """+="""
- self.end += len(other)
- return self
-
- def __gt__(self, other):
- return len(self) > len(other)
-
- def __lt__(self, other):
- return not self > other
-
- def __str__(self):
- return ('<Extent: Start=%d, End=%d, Length=%d>' %
- (self.start, self.end, len(self)))
-
- def pack(self, other):
- """
- Return a new Extent aligned to self's first unused sector
-
- This is done by length, to quantify fitting an area of disk space
- inside the other. The filled space in self is calculated and updated.
-
- Returns:
- A new Extent, starting at the first available sector in `self`,
- with the same length as `other`.
- """
- length_other = len(other)
- first_free_sector = self.start + self.filled_sectors
- if length_other + self.filled_sectors > len(self):
- raise PartitioningError('Not enough free space to pack Extent')
- self.filled_sectors += length_other
- return Extent(start=first_free_sector, length=length_other)
-
- def free_sectors(self):
- return len(self) - self.filled_sectors
-
-
-class PartitionList(object):
- """
- An iterable object to contain and process a list of Partition objects
-
- This class eases the calculation of partition sizes and numbering, since
- the properties of a given partition depend on each of the other partitions
- in the list.
-
- Attributes:
- device: A Device class containing the partition list
- """
-
- def __init__(self, device):
- """
- Initialisation function
-
- Args:
- device: A Device object
- """
- self.device = device
- self.extent = device.extent
-
- self.__cached_list_hash = 0
-
- self.__partition_list = []
- self.__iter_index = 0
-
- def append(self, partition):
- """Append a new Partition object to the list"""
- partition.check()
- if isinstance(partition, Partition):
- for part in self.__partition_list:
- dup_attrib = part.compare(partition)
- if dup_attrib:
- raise PartitioningError('Duplicated partition attribute '
- '\'%s\'' % dup_attrib)
- self.__partition_list.append(partition)
- else:
- raise PartitioningError('PartitionList can only '
- 'contain Partition objects')
-
- def __iter__(self):
- """Return a copy of self as an iterable object"""
- self.__iter_index = 0
- copy = deepcopy(self)
- return copy
-
- def __next__(self):
- """Return the next item in an iteration"""
- if self.__iter_index == len(self.__partition_list):
- raise StopIteration
- else:
- partition = self[self.__iter_index]
- self.__iter_index += 1
- return partition
-
- def next(self):
- """Provide a next() method for Python 2 compatibility"""
- return self.__next__()
-
- def __getitem__(self, i):
- """Return an partition from the list, sorted by partition number"""
- part_list = sorted(self.__update_partition_list(),
- key=lambda part: part.number)
- return part_list[i]
-
- def free_sectors(self):
- """Calculate the amount of unused space in the list"""
- part_list = self.__update_partition_list()
- self.extent.filled_sectors = 0
- for part in part_list:
- self.extent.pack(part.extent)
- return self.extent.free_sectors()
-
- def __update_partition_list(self):
- """
- Allocate extent and numbering for each Partition object in the list
-
- A copy of the partition list is made so that any Partition object
- returned from this list is a copy of a stored Partition object, thus
- any partitions stored in the partition list remain intact even if a
- copy is modified after is is returned. Hashing is used to avoid
- updating the list when the partition list has not changed.
- """
- current_list_hash = hash(str(self.__partition_list))
- if current_list_hash == self.__cached_list_hash:
- return self.__cached_list
-
- part_list = deepcopy(self.__partition_list)
- used_numbers = set()
- fill_partitions = set(partition for partition in part_list
- if partition.size == 'fill')
- requested_numbers = set(partition.number for partition in part_list
- if hasattr(partition, 'number'))
-
- # Get free space and the size of 'fill' partitions
- self.extent.filled_sectors = 0
- for part in part_list:
- if part.size != 'fill':
- extent = Extent(start=1,
- length=self.get_length_sectors(part.size))
- part.extent = extent
- self.extent.pack(extent)
-
- # Allocate aligned Extents and process partition numbers
- if len(fill_partitions):
- fill_size = self.extent.free_sectors() / len(fill_partitions)
- # Set size of fill partitions
- for part in fill_partitions:
- part.size = fill_size * self.device.sector_size
- part.extent = Extent(start=1, length=fill_size)
-
- self.extent.filled_sectors = 0
- for part in part_list:
- part.extent = self.extent.pack(part.extent)
-
- # Find the next unused partition number if not assigned
- if hasattr(part, 'number'):
- num = part.number
- else:
- for n in range(1, self.device.max_allowed_partitions + 1):
- if n not in used_numbers and n not in requested_numbers:
- num = n
- break
-
- part.number = num
- used_numbers.add(num)
-
- self.__cached_list_hash = current_list_hash
- self.__cached_list = part_list
- return part_list
-
- def get_length_sectors(self, size_bytes):
- """Get a length in sectors, aligned to 4096 byte boundaries"""
- return (int(size_bytes) / self.device.sector_size +
- ((int(size_bytes) % 4096) != 0) *
- (4096 / self.device.sector_size))
-
- def __str__(self):
- string = ''
- for part in self:
- string = '%s\n%s\n' % (part, string)
- return string.rstrip()
-
- def __len__(self):
- return len(self.__partition_list)
-
- def __setitem__(self, i, value):
- """Update the ith item in the list"""
- self.append(partition)
-
-
-class Partition(object):
- """
- A class to describe a partition in a disk or image
-
- The required attributes are loaded via kwargs.
-
- Required attributes:
- size: String describing the size of the partition in bytes
- This may also be 'fill' to indicate that this partition should
- be expanded to fill all unused space. Where there is more than
- one fill partition, unused space is divided equally between the
- fill partitions.
- fdisk_type: An integer representing the hexadecimal code used by fdisk
- to describe the partition type. Any partitions with
- fdisk_type='none' create an area of unused space.
-
- Optional attributes:
- **kwargs: A mapping of any keyword arguments
- filesystem: A string describing the filesystem format for the
- partition, or 'none' to skip filesystem creation.
- description: A string describing the partition, for documentation
- boot: Boolean string describing whether to set the bootable flag
- mountpoint: String describing the mountpoint for the partition
- number: Number used to override partition numbering for the
- partition (Possible only when using an MBR partition table)
- """
- def __init__(self, size=0, fdisk_type=0x81, filesystem='none', **kwargs):
- if not size and 'size' not in kwargs:
- raise PartitioningError('Partition must have a non-zero size')
-
- self.filesystem = filesystem
- self.fdisk_type = fdisk_type
-
- self.size = human_size(size)
- self.__dict__.update(**kwargs)
-
- def check(self):
- """Check for correctness"""
- if self.fdisk_type == 'none':
- if self.filesystem != 'none':
- raise PartitioningError('Partition: Free space '
- 'cannot have a filesystem')
- if hasattr(self, 'mountpoint') and self.mountpoint != 'none':
- raise PartitioningError('Partition: Free space '
- 'cannot have a mountpoint')
-
- def compare(self, other):
- """Check for mutually exclusive attributes"""
- non_duplicable = ('number', 'mountpoint')
- for attrib in non_duplicable:
- if hasattr(self, attrib) and hasattr(other, attrib):
- if getattr(self, attrib) == getattr(other, attrib):
- return attrib
- return False
-
- def __str__(self):
- string = ('Partition\n'
- ' size: %s\n'
- ' fdisk type: %s\n'
- ' filesystem: %s'
- % (self.size,
- hex(self.fdisk_type) if self.fdisk_type != 'none'
- else 'none',
- self.filesystem))
- if hasattr(self, 'extent'):
- string += (
- '\n start: %s'
- '\n end: %s'
- % (self.extent.start, self.extent.end))
- if hasattr(self, 'number'):
- string += '\n number: %s' % self.number
- if hasattr(self, 'mountpoint'):
- string += '\n mountpoint: %s' % self.mountpoint
- if hasattr(self, 'boot'):
- string += '\n bootable: %s' % self.boot
-
- return string
-
-
-class Device(object):
- """
- A class to describe a disk or image, and its partition layout
-
- Attributes are loaded from **kwargs, containing key-value pairs describing
- the required attributes. This can be loaded from a YAML file, using the
- module function load_yaml().
-
- Required attributes:
- location: The location of the device or disk image
- size: A size in bytes describing the total amount of space the
- partition table on the device will occupy, or 'fill' to
- automatically fill the available space.
-
- Optional attributes:
- **kwargs: A mapping of any keyword arguments
- start_offset: The first 512 byte sector of the first partition
- (default: 2048)
- partition_table_format: A string describing the type of partition
- table used on the device (default: 'gpt')
- partitions: A list of mappings for the attributes for each Partition
- object. update_partitions() populates the partition list
- based on the contents of this attribute.
- """
- min_start_bytes = 1024**2
-
- def __init__(self, location, size, **kwargs):
-
- if 'partition_table_format' not in kwargs:
- self.partition_table_format = 'gpt'
- if 'start_offset' not in kwargs:
- self.start_offset = 2048
-
- target_size = get_disk_size(location)
- if str(size).lower() == 'fill':
- self.size = target_size
- else:
- self.size = human_size(size)
-
- if self.size > target_size:
- raise PartitioningError('Not enough space available on target')
-
- if self.size <= self.min_start_bytes:
- raise PartitioningError('Device size must be greater than %d '
- 'bytes' % self.min_start_bytes)
-
- # Get sector size
- self.sector_size = get_sector_size(location)
- self.location = location
-
- # Populate Device attributes from keyword args
- self.__dict__.update(**kwargs)
-
- if self.partition_table_format.lower() == 'gpt':
- self.max_allowed_partitions = 128
- else:
- self.max_allowed_partitions = 4
-
- # Process Device size
- start = (self.start_offset * 512) / self.sector_size
- # Sector quantities in the specification are assumed to be 512 bytes
- # This converts to the real sector size
- if (start * self.sector_size) < self.min_start_bytes:
- raise PartitioningError('Start offset should be greater than '
- '%d, for %d byte sectors' %
- (min_start_bytes / self.sector_size,
- self.sector_size))
- # Check the disk's first partition starts on a 4096 byte boundary
- # this ensures alignment, and avoiding a reduction in performance
- # on disks which use a 4096 byte physical sector size
- if (start * self.sector_size) % 4096 != 0:
- print('WARNING: Start sector is not aligned '
- 'to 4096 byte sector boundaries')
-
- # End sector is one sector less than the disk length
- disk_end_sector = (self.size / self.sector_size) - 1
- if self.partition_table_format == 'gpt':
- # GPT partition table is duplicated at the end of the device.
- # GPT header takes one sector, whatever the sector size,
- # with a 16384 byte 'minimum' area for partition entries,
- # supporting up to 128 partitions (128 bytes per entry).
- # The duplicate GPT does not include the 'protective' MBR
- gpt_size = ((16 * 1024) / self.sector_size) + 1
- self.extent = Extent(start=start,
- end=(disk_end_sector - gpt_size))
- else:
- self.extent = Extent(start=start, end=disk_end_sector)
-
- self.update_partitions()
-
- def update_partitions(self, partitions=None):
- """
- Reset list, populate with partitions from a list of attributes
-
- Args:
- partitions: A list of partition keyword attributes
- """
-
- self.partitionlist = PartitionList(self)
- if partitions:
- self.partitions = partitions
- if hasattr(self, 'partitions'):
- for partition_args in self.partitions:
- self.add_partition(Partition(**partition_args))
-
- def add_partition(self, partition):
- """
- Add a Partition object to the device's list of partitions
-
- Args:
- partition: a Partition class
- """
-
- if len(self.partitionlist) < self.max_allowed_partitions:
- self.partitionlist.append(partition)
- else:
- raise PartitioningError('Exceeded maximum number of partitions '
- 'for %s partition table (%d)' %
- (self.partition_table_format.upper(),
- self.max_allowed_partitions))
-
- def get_partition_by_mountpoint(self, mountpoint):
- """Return a Partition with a specified mountpoint"""
-
- try:
- return next(r for r in self.partitionlist
- if hasattr(r, 'mountpoint')
- and r.mountpoint == mountpoint)
- except StopIteration:
- return False
-
- def commit(self):
- """Write the partition table to the disk or image"""
-
- pt_format = self.partition_table_format.lower()
- print("Creating %s partition table on %s" %
- (pt_format.upper(), self.location))
-
- # Create a new partition table
- if pt_format in ('mbr', 'dos'):
- cmd = "o\n"
- elif pt_format == 'gpt':
- cmd = "g\n"
- else:
- raise PartitioningError('Unrecognised partition '
- 'table type \'%s\'' % pt_format)
-
- for partition in self.partitionlist:
- # Create partitions
- if str(partition.fdisk_type).lower() != 'none':
- cmd += "n\n"
- if pt_format in ('mbr', 'dos'):
- cmd += "p\n"
- cmd += (str(partition.number) + "\n"
- "" + str(partition.extent.start) + "\n"
- "" + str(partition.extent.end) + "\n")
-
- # Set partition types
- cmd += "t\n"
- if partition.number > 1:
- # fdisk does not ask for a partition
- # number when setting the type of the
- # first created partition
- cmd += str(partition.number) + "\n"
- cmd += str(hex(partition.fdisk_type)) + "\n"
-
- # Set bootable flag
- if hasattr(partition, 'boot') and pt_format == 'mbr':
- if str(partition.boot).lower() in ('yes', 'true'):
- cmd += "a\n"
- if partition.number > 1:
- cmd += str(partition.number) + "\n"
-
- # Write changes
- cmd += ("w\n"
- "q\n")
- p = subprocess.Popen(["fdisk", self.location],
- stdin=subprocess.PIPE,
- stderr=subprocess.PIPE,
- stdout=subprocess.PIPE)
- output = p.communicate(cmd)
-
- errors = output[1].split('\n')[1:-1]
- if errors:
- # Exception handling is done in this way since fdisk will not
- # return a failure exit code if it finds problems with its input.
- # Note that the message 'disk does not contain a valid partition
- # table' is not an error, it's a status message printed to stderr
- # when fdisk starts with a blank device.
- raise FdiskError('"%s"' % ' '.join(str(x) for x in errors))
-
- def get_partition_uuid(self, partition):
- """Read a partition's UUID from disk (MBR or GPT)"""
-
- return get_partition_uuid(self.location, partition.number,
- self.partition_table_format)
-
- def create_filesystems(self, skip=[]):
- """Create filesystems on the disk or image
-
- Args:
- skip: An iterable of mountpoints identifying partitions to skip
- filesystem creation on, for example if custom settings are
- required
- """
-
- for part in self.partitionlist:
- if hasattr(part, 'mountpoint') and part.mountpoint in skip:
- continue
- if part.filesystem.lower() != 'none':
- with create_loopback(self.location,
- part.extent.start * self.sector_size,
- part.size) as loop:
- print ('Creating %s filesystem on partition %s' %
- (part.filesystem, part.number))
- subprocess.check_output(['mkfs.' + part.filesystem, loop])
-
- def __str__(self):
- return ('<Device: location=%s, size=%s, partitions: %s>' %
- (self.location, self.size, len(self.partitionlist)))
-
-
-class PartitioningError(Exception):
-
- def __init__(self, msg=None):
- self.msg = msg
-
- def __str__(self):
- return self.msg
-
-
-class FdiskError(Exception):
-
- def __init__(self, msg=None):
- self.msg = msg
-
- def __str__(self):
- return self.msg
-
-
-def load_yaml(location, size, yaml_file):
- """
- Load partition data from a yaml specification
-
- The YAML file describes the attributes documented in the Device
- and Partition classes.
-
- Args:
- yaml_file: String path to a YAML file to load
- location: Path to the device node or image to use for partitioning
- size: The desired device size in bytes (may be 'fill' to occupy the
- entire device
-
- Returns:
- A Device object
- """
-
- with open(yaml_file, 'r') as f:
- kwargs = yaml.safe_load(f)
- return Device(location, size, **kwargs)
-
-
-def get_sector_size(location):
- """Get the logical sector size of a block device or image, in bytes"""
-
- return int(__filter_fdisk_list_output('Sector size.*?(\d+) bytes',
- location)[0])
-
-def get_disk_size(location):
- """Get the total size of a block device or image, in bytes"""
-
- return int(__filter_fdisk_list_output('Disk.*?(\d+) bytes',
- location)[0])
-
-def get_partition_offsets(location):
- """Return an array of the partition start sectors in a device or image"""
-
- return __get_fdisk_list_numeric_column(location, 1)
-
-def get_partition_sector_sizes(location):
- """Return an array of sizes of partitions in a device or image in sectors"""
-
- return __get_fdisk_list_numeric_column(location, 3)
-
-def __get_fdisk_list_numeric_column(location, column):
- return map(int, __filter_fdisk_list_output('%s(?:\d+[\*\s]+){%d}(\d+)' %
- (location, column), location))
-
-def __filter_fdisk_list_output(regex, location):
- r = re.compile(regex, re.DOTALL)
- m = re.findall(r, subprocess.check_output(['fdisk', '-l', location]))
- if m:
- return m
- else:
- raise PartitioningError('Error reading information from fdisk')
-
-def human_size(size_string):
- """Parse strings for human readable size factors"""
-
- facts_of_1024 = ['', 'k', 'm', 'g', 't']
- m = re.match('^(\d+)([kmgtKMGT]?)$', str(size_string))
- if not m:
- return size_string
- return int(m.group(1)) * (1024 ** facts_of_1024.index(m.group(2).lower()))
-
-@contextlib.contextmanager
-def create_loopback(mount_path, offset=0, size=0):
- """
- Create a loopback device for accessing partitions in block devices
-
- Args:
- mount_path: String path to mount
- offset: Offset of the start of a partition in bytes (default 0)
- size: Limits the size of the partition, in bytes (default 0). This is
- important when creating filesystems, otherwise tools often
- corrupt areas beyond the desired limits of the partition.
- Returns:
- The path to a created loopback device node
- """
-
- try:
- base_args = ['losetup', '--show', '-f', '-P', '-o', str(offset)]
- if size and offset:
- cmd = base_args + ['--sizelimit', str(size), mount_path]
- else:
- cmd = base_args + [mount_path]
- loop_device = subprocess.check_output(cmd).rstrip()
- # Allow the system time to see the new device On some systems, mounts
- # created on the loopdev too soon after creating the loopback device
- # may be unreliable, even though the -P option (--partscan) is passed
- # to losetup
- time.sleep(1)
- except subprocess.CalledProcessError:
- PartitioningError('Error creating loopback')
- try:
- yield loop_device
- finally:
- subprocess.check_call(['losetup', '-d', loop_device])
-
-def get_pt_type(location):
- """Read the partition table type from location (device or image)"""
-
- pt_type = __get_blkid_output('PTTYPE', location).lower()
- return 'none' if pt_type == '' else pt_type
-
-def __get_blkid_output(field, location):
- return subprocess.check_output(['blkid', '-p', '-o', 'value',
- '-s', field, location]).rstrip()
-
-def get_partition_uuid(location, part_num, pt_type=None):
- """
- Read the partition UUID (MBR or GPT) for location (device or image)
-
- Args:
- location: Path to device or image
- part_num: Integer number of the partition
- pt_type: The partition table format (MBR or GPT)
- """
-
- if not pt_type:
- pt_type = get_pt_type(location)
- if pt_type == 'gpt':
- return get_partition_gpt_guid(location, part_num)
- elif pt_type == 'mbr':
- return get_partition_mbr_uuid(location, part_num)
-
-def get_partition_mbr_uuid(location, part_num):
- """
- Get a partition's UUID in a device using MBR partition table
-
- In Linux, MBR partition UUIDs are comprised of the NT disk signature,
- followed by '-' and a two digit, zero-padded partition number. This is
- necessary since the MBR does not provide per-partition GUIDs as GPT
- partition tables do. This can be passed to the kernel with
- "root=PARTUUID=$UUID" to identify a partition containing a root
- filesystem.
-
- Args:
- partition: A partition object
- location: Location of the storage device containing the partition -
- an image or device node
- Returns:
- A UUID referring to an MBR partition, e.g. '97478dab-02'
- """
-
- pt_uuid = __get_blkid_output('PTUUID', location).upper()
- return '%s-%02d' % (pt_uuid, part_num)
-
-def get_partition_gpt_guid(location, part_num):
- """
- Get a partition's GUID from a GPT partition table
-
- This is read directly from the partition table, since current fdisk does
- not support reading GPT partition GUIDs. It does not require special tools
- (gfdisk). This is the GUID which identifies the partition, created with
- the partition table, as opposed to the filesystem UUID, created with the
- filesystem. It is particularly useful for specifying the partition which
- the Linux kernel can use on boot to find the root filesystem, e.g. when
- using the kernel command line "root=PARTUUID=$UUID"
-
- Args:
- part_num: The partition number
- location: Location of the storage device containing the partition -
- an image path or device node
- Returns:
- A GUID string, e.g. 'B342D1AB-4B65-4601-97DC-D6DF3FE2E95E'
- """
-
- sector_size = get_sector_size(location)
- # The partition GUID is located two sectors (protective MBR + GPT header)
- # plus 128 bytes for each partition entry in the table, plus 16 bytes for
- # the location of the partition's GUID
- guid_offset = (2 * sector_size) + (128 * (part_num - 1)) + 16
-
- with open(location, 'rb') as f:
- f.seek(guid_offset)
- raw_uuid_bin = f.read(16)
-
- a = ''
- for c in raw_uuid_bin:
- a += '%02X' % ord(c)
-
- return ('%s%s%s%s-%s%s-%s%s-%s-%s' %
- (a[6:8], a[4:6], a[2:4], a[0:2],
- a[10:12], a[8:10],
- a[14:16], a[12:14],
- a[16:20], a[20:32]))