summaryrefslogtreecommitdiff
path: root/src/buildstream/_scheduler/resources.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/buildstream/_scheduler/resources.py')
-rw-r--r--src/buildstream/_scheduler/resources.py166
1 files changed, 166 insertions, 0 deletions
diff --git a/src/buildstream/_scheduler/resources.py b/src/buildstream/_scheduler/resources.py
new file mode 100644
index 000000000..73bf66b4a
--- /dev/null
+++ b/src/buildstream/_scheduler/resources.py
@@ -0,0 +1,166 @@
+class ResourceType():
+ CACHE = 0
+ DOWNLOAD = 1
+ PROCESS = 2
+ UPLOAD = 3
+
+
+class Resources():
+ def __init__(self, num_builders, num_fetchers, num_pushers):
+ self._max_resources = {
+ ResourceType.CACHE: 0,
+ ResourceType.DOWNLOAD: num_fetchers,
+ ResourceType.PROCESS: num_builders,
+ ResourceType.UPLOAD: num_pushers
+ }
+
+ # Resources jobs are currently using.
+ self._used_resources = {
+ ResourceType.CACHE: 0,
+ ResourceType.DOWNLOAD: 0,
+ ResourceType.PROCESS: 0,
+ ResourceType.UPLOAD: 0
+ }
+
+ # Resources jobs currently want exclusive access to. The set
+ # of jobs that have asked for exclusive access is the value -
+ # this is so that we can avoid scheduling any other jobs until
+ # *all* exclusive jobs that "register interest" have finished
+ # - which avoids starving them of scheduling time.
+ self._exclusive_resources = {
+ ResourceType.CACHE: set(),
+ ResourceType.DOWNLOAD: set(),
+ ResourceType.PROCESS: set(),
+ ResourceType.UPLOAD: set()
+ }
+
+ # reserve()
+ #
+ # Reserves a set of resources
+ #
+ # Args:
+ # resources (set): A set of ResourceTypes
+ # exclusive (set): Another set of ResourceTypes
+ # peek (bool): Whether to only peek at whether the resource is available
+ #
+ # Returns:
+ # (bool): True if the resources could be reserved
+ #
+ def reserve(self, resources, exclusive=None, *, peek=False):
+ if exclusive is None:
+ exclusive = set()
+
+ resources = set(resources)
+ exclusive = set(exclusive)
+
+ # First, we check if the job wants to access a resource that
+ # another job wants exclusive access to. If so, it cannot be
+ # scheduled.
+ #
+ # Note that if *both* jobs want this exclusively, we don't
+ # fail yet.
+ #
+ # FIXME: I *think* we can deadlock if two jobs want disjoint
+ # sets of exclusive and non-exclusive resources. This
+ # is currently not possible, but may be worth thinking
+ # about.
+ #
+ for resource in resources - exclusive:
+
+ # If our job wants this resource exclusively, we never
+ # check this, so we can get away with not (temporarily)
+ # removing it from the set.
+ if self._exclusive_resources[resource]:
+ return False
+
+ # Now we check if anything is currently using any resources
+ # this job wants exclusively. If so, the job cannot be
+ # scheduled.
+ #
+ # Since jobs that use a resource exclusively are also using
+ # it, this means only one exclusive job can ever be scheduled
+ # at a time, despite being allowed to be part of the exclusive
+ # set.
+ #
+ for resource in exclusive:
+ if self._used_resources[resource] != 0:
+ return False
+
+ # Finally, we check if we have enough of each resource
+ # available. If we don't have enough, the job cannot be
+ # scheduled.
+ for resource in resources:
+ if (self._max_resources[resource] > 0 and
+ self._used_resources[resource] >= self._max_resources[resource]):
+ return False
+
+ # Now we register the fact that our job is using the resources
+ # it asked for, and tell the scheduler that it is allowed to
+ # continue.
+ if not peek:
+ for resource in resources:
+ self._used_resources[resource] += 1
+
+ return True
+
+ # release()
+ #
+ # Release resources previously reserved with Resources.reserve()
+ #
+ # Args:
+ # resources (set): A set of resources to release
+ #
+ def release(self, resources):
+ for resource in resources:
+ assert self._used_resources[resource] > 0, "Scheduler resource imbalance"
+ self._used_resources[resource] -= 1
+
+ # register_exclusive_interest()
+ #
+ # Inform the resources pool that `source` has an interest in
+ # reserving this resource exclusively.
+ #
+ # The source parameter is used to identify the caller, it
+ # must be ensured to be unique for the time that the
+ # interest is registered.
+ #
+ # This function may be called multiple times, and subsequent
+ # calls will simply have no effect until clear_exclusive_interest()
+ # is used to clear the interest.
+ #
+ # This must be called in advance of reserve()
+ #
+ # Args:
+ # resources (set): Set of resources to reserve exclusively
+ # source (any): Source identifier, to be used again when unregistering
+ # the interest.
+ #
+ def register_exclusive_interest(self, resources, source):
+
+ # The very first thing we do is to register any exclusive
+ # resources this job may want. Even if the job is not yet
+ # allowed to run (because another job is holding the resource
+ # it wants), we can still set this - it just means that any
+ # job *currently* using these resources has to finish first,
+ # and no new jobs wanting these can be launched (except other
+ # exclusive-access jobs).
+ #
+ for resource in resources:
+ self._exclusive_resources[resource].add(source)
+
+ # unregister_exclusive_interest()
+ #
+ # Clear the exclusive interest in these resources.
+ #
+ # This should be called by the given source which registered
+ # an exclusive interest.
+ #
+ # Args:
+ # resources (set): Set of resources to reserve exclusively
+ # source (str): Source identifier, to be used again when unregistering
+ # the interest.
+ #
+ def unregister_exclusive_interest(self, resources, source):
+
+ for resource in resources:
+ self._exclusive_resources[resource].discard(source)