diff options
Diffstat (limited to 'src/buildstream/_scheduler/resources.py')
-rw-r--r-- | src/buildstream/_scheduler/resources.py | 166 |
1 files changed, 166 insertions, 0 deletions
diff --git a/src/buildstream/_scheduler/resources.py b/src/buildstream/_scheduler/resources.py new file mode 100644 index 000000000..73bf66b4a --- /dev/null +++ b/src/buildstream/_scheduler/resources.py @@ -0,0 +1,166 @@ +class ResourceType(): + CACHE = 0 + DOWNLOAD = 1 + PROCESS = 2 + UPLOAD = 3 + + +class Resources(): + def __init__(self, num_builders, num_fetchers, num_pushers): + self._max_resources = { + ResourceType.CACHE: 0, + ResourceType.DOWNLOAD: num_fetchers, + ResourceType.PROCESS: num_builders, + ResourceType.UPLOAD: num_pushers + } + + # Resources jobs are currently using. + self._used_resources = { + ResourceType.CACHE: 0, + ResourceType.DOWNLOAD: 0, + ResourceType.PROCESS: 0, + ResourceType.UPLOAD: 0 + } + + # Resources jobs currently want exclusive access to. The set + # of jobs that have asked for exclusive access is the value - + # this is so that we can avoid scheduling any other jobs until + # *all* exclusive jobs that "register interest" have finished + # - which avoids starving them of scheduling time. + self._exclusive_resources = { + ResourceType.CACHE: set(), + ResourceType.DOWNLOAD: set(), + ResourceType.PROCESS: set(), + ResourceType.UPLOAD: set() + } + + # reserve() + # + # Reserves a set of resources + # + # Args: + # resources (set): A set of ResourceTypes + # exclusive (set): Another set of ResourceTypes + # peek (bool): Whether to only peek at whether the resource is available + # + # Returns: + # (bool): True if the resources could be reserved + # + def reserve(self, resources, exclusive=None, *, peek=False): + if exclusive is None: + exclusive = set() + + resources = set(resources) + exclusive = set(exclusive) + + # First, we check if the job wants to access a resource that + # another job wants exclusive access to. If so, it cannot be + # scheduled. + # + # Note that if *both* jobs want this exclusively, we don't + # fail yet. + # + # FIXME: I *think* we can deadlock if two jobs want disjoint + # sets of exclusive and non-exclusive resources. This + # is currently not possible, but may be worth thinking + # about. + # + for resource in resources - exclusive: + + # If our job wants this resource exclusively, we never + # check this, so we can get away with not (temporarily) + # removing it from the set. + if self._exclusive_resources[resource]: + return False + + # Now we check if anything is currently using any resources + # this job wants exclusively. If so, the job cannot be + # scheduled. + # + # Since jobs that use a resource exclusively are also using + # it, this means only one exclusive job can ever be scheduled + # at a time, despite being allowed to be part of the exclusive + # set. + # + for resource in exclusive: + if self._used_resources[resource] != 0: + return False + + # Finally, we check if we have enough of each resource + # available. If we don't have enough, the job cannot be + # scheduled. + for resource in resources: + if (self._max_resources[resource] > 0 and + self._used_resources[resource] >= self._max_resources[resource]): + return False + + # Now we register the fact that our job is using the resources + # it asked for, and tell the scheduler that it is allowed to + # continue. + if not peek: + for resource in resources: + self._used_resources[resource] += 1 + + return True + + # release() + # + # Release resources previously reserved with Resources.reserve() + # + # Args: + # resources (set): A set of resources to release + # + def release(self, resources): + for resource in resources: + assert self._used_resources[resource] > 0, "Scheduler resource imbalance" + self._used_resources[resource] -= 1 + + # register_exclusive_interest() + # + # Inform the resources pool that `source` has an interest in + # reserving this resource exclusively. + # + # The source parameter is used to identify the caller, it + # must be ensured to be unique for the time that the + # interest is registered. + # + # This function may be called multiple times, and subsequent + # calls will simply have no effect until clear_exclusive_interest() + # is used to clear the interest. + # + # This must be called in advance of reserve() + # + # Args: + # resources (set): Set of resources to reserve exclusively + # source (any): Source identifier, to be used again when unregistering + # the interest. + # + def register_exclusive_interest(self, resources, source): + + # The very first thing we do is to register any exclusive + # resources this job may want. Even if the job is not yet + # allowed to run (because another job is holding the resource + # it wants), we can still set this - it just means that any + # job *currently* using these resources has to finish first, + # and no new jobs wanting these can be launched (except other + # exclusive-access jobs). + # + for resource in resources: + self._exclusive_resources[resource].add(source) + + # unregister_exclusive_interest() + # + # Clear the exclusive interest in these resources. + # + # This should be called by the given source which registered + # an exclusive interest. + # + # Args: + # resources (set): Set of resources to reserve exclusively + # source (str): Source identifier, to be used again when unregistering + # the interest. + # + def unregister_exclusive_interest(self, resources, source): + + for resource in resources: + self._exclusive_resources[resource].discard(source) |