summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorbst-marge-bot <marge-bot@buildstream.build>2019-06-06 17:17:52 +0000
committerbst-marge-bot <marge-bot@buildstream.build>2019-06-06 17:17:52 +0000
commit640f0ca5a17a144448a48de2a80e1f7a655eb9b2 (patch)
tree6ec117b1f0492fb8df68f9058d11dddc5c615f02
parent492b2ad16419c77a6ab33b7608897f5863ff1104 (diff)
parented51c482ff86478e0130086705b5618f70fab6da (diff)
downloadbuildstream-640f0ca5a17a144448a48de2a80e1f7a655eb9b2.tar.gz
Merge branch 'aevri/job_msg_enum' into 'master'
Use enums for Job-related constants See merge request BuildStream/buildstream!1380
-rw-r--r--src/buildstream/_frontend/app.py2
-rw-r--r--src/buildstream/_scheduler/jobs/cachesizejob.py2
-rw-r--r--src/buildstream/_scheduler/jobs/cleanupjob.py2
-rw-r--r--src/buildstream/_scheduler/jobs/job.py67
-rw-r--r--src/buildstream/_scheduler/queues/buildqueue.py2
-rw-r--r--src/buildstream/_scheduler/queues/fetchqueue.py2
-rw-r--r--src/buildstream/_scheduler/queues/pullqueue.py4
-rw-r--r--src/buildstream/_scheduler/queues/queue.py4
-rw-r--r--src/buildstream/_scheduler/queues/trackqueue.py2
-rw-r--r--src/buildstream/_scheduler/scheduler.py2
10 files changed, 52 insertions, 37 deletions
diff --git a/src/buildstream/_frontend/app.py b/src/buildstream/_frontend/app.py
index d4ea83871..7aff52af6 100644
--- a/src/buildstream/_frontend/app.py
+++ b/src/buildstream/_frontend/app.py
@@ -526,7 +526,7 @@ class App():
# Dont attempt to handle a failure if the user has already opted to
# terminate
- if status == JobStatus.FAIL and not self.stream.terminated:
+ if status is JobStatus.FAIL and not self.stream.terminated:
if isinstance(job, ElementJob):
element = job.element
diff --git a/src/buildstream/_scheduler/jobs/cachesizejob.py b/src/buildstream/_scheduler/jobs/cachesizejob.py
index ed1cc4131..f36c30190 100644
--- a/src/buildstream/_scheduler/jobs/cachesizejob.py
+++ b/src/buildstream/_scheduler/jobs/cachesizejob.py
@@ -28,7 +28,7 @@ class CacheSizeJob(Job):
self._casquota = context.get_casquota()
def parent_complete(self, status, result):
- if status == JobStatus.OK:
+ if status is JobStatus.OK:
self._casquota.set_cache_size(result)
if self._complete_cb:
diff --git a/src/buildstream/_scheduler/jobs/cleanupjob.py b/src/buildstream/_scheduler/jobs/cleanupjob.py
index 327d687d3..85722c83f 100644
--- a/src/buildstream/_scheduler/jobs/cleanupjob.py
+++ b/src/buildstream/_scheduler/jobs/cleanupjob.py
@@ -33,7 +33,7 @@ class CleanupJob(Job):
self._casquota.set_cache_size(message, write_to_disk=False)
def parent_complete(self, status, result):
- if status == JobStatus.OK:
+ if status is JobStatus.OK:
self._casquota.set_cache_size(result, write_to_disk=False)
if self._complete_cb:
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index 3c10eef62..ed90bb3a4 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -20,6 +20,7 @@
# Tristan Maat <tristan.maat@codethink.co.uk>
# System imports
+import enum
import os
import sys
import signal
@@ -33,12 +34,15 @@ from ..._exceptions import ImplError, BstError, set_last_task_error, SkipJob
from ..._message import Message, MessageType, unconditional_messages
from ... import _signals, utils
+
# Return code values shutdown of job handling child processes
#
-RC_OK = 0
-RC_FAIL = 1
-RC_PERM_FAIL = 2
-RC_SKIPPED = 3
+@enum.unique
+class _ReturnCode(enum.IntEnum):
+ OK = 0
+ FAIL = 1
+ PERM_FAIL = 2
+ SKIPPED = 3
# JobStatus:
@@ -46,7 +50,8 @@ RC_SKIPPED = 3
# The job completion status, passed back through the
# complete callbacks.
#
-class JobStatus():
+@enum.unique
+class JobStatus(enum.Enum):
# Job succeeded
OK = 0
@@ -73,6 +78,15 @@ class Process(multiprocessing.Process):
self._sentinel = self._popen.sentinel
+@enum.unique
+class _MessageType(enum.Enum):
+ LOG_MESSAGE = 1
+ ERROR = 2
+ RESULT = 3
+ CHILD_DATA = 4
+ SUBCLASS_CUSTOM_MESSAGE = 5
+
+
# Job()
#
# The Job object represents a task that will run in parallel to the main
@@ -415,7 +429,7 @@ class Job():
self._parent_shutdown()
# We don't want to retry if we got OK or a permanent fail.
- retry_flag = returncode == RC_FAIL
+ retry_flag = returncode == _ReturnCode.FAIL
if retry_flag and (self._tries <= self._max_retries) and not self._scheduler.terminated:
self.start()
@@ -423,11 +437,11 @@ class Job():
# Resolve the outward facing overall job completion status
#
- if returncode == RC_OK:
+ if returncode == _ReturnCode.OK:
status = JobStatus.OK
- elif returncode == RC_SKIPPED:
+ elif returncode == _ReturnCode.SKIPPED:
status = JobStatus.SKIPPED
- elif returncode in (RC_FAIL, RC_PERM_FAIL):
+ elif returncode in (_ReturnCode.FAIL, _ReturnCode.PERM_FAIL):
status = JobStatus.FAIL
else:
status = JobStatus.FAIL
@@ -453,23 +467,23 @@ class Job():
if not self._listening:
return
- if envelope.message_type == 'message':
+ if envelope.message_type is _MessageType.LOG_MESSAGE:
# Propagate received messages from children
# back through the context.
self._scheduler.context.message(envelope.message)
- elif envelope.message_type == 'error':
+ elif envelope.message_type is _MessageType.ERROR:
# For regression tests only, save the last error domain / reason
# reported from a child task in the main process, this global state
# is currently managed in _exceptions.py
set_last_task_error(envelope.message['domain'],
envelope.message['reason'])
- elif envelope.message_type == 'result':
+ elif envelope.message_type is _MessageType.RESULT:
assert self._result is None
self._result = envelope.message
- elif envelope.message_type == 'child_data':
+ elif envelope.message_type is _MessageType.CHILD_DATA:
# If we retry a job, we assign a new value to this
self.child_data = envelope.message
- elif envelope.message_type == 'subclass_custom_message':
+ elif envelope.message_type is _MessageType.SUBCLASS_CUSTOM_MESSAGE:
self.handle_message(envelope.message)
else:
assert False, "Unhandled message type '{}': {}".format(
@@ -598,7 +612,7 @@ class ChildJob():
# instances). This is sent to the parent Job.
#
def send_message(self, message_data):
- self._send_message('subclass_custom_message', message_data)
+ self._send_message(_MessageType.SUBCLASS_CUSTOM_MESSAGE, message_data)
#######################################################
# Abstract Methods #
@@ -689,7 +703,7 @@ class ChildJob():
elapsed=elapsed, logfile=filename)
# Alert parent of skip by return code
- self._child_shutdown(RC_SKIPPED)
+ self._child_shutdown(_ReturnCode.SKIPPED)
except BstError as e:
elapsed = datetime.datetime.now() - starttime
retry_flag = e.temporary
@@ -703,14 +717,14 @@ class ChildJob():
elapsed=elapsed, detail=e.detail,
logfile=filename, sandbox=e.sandbox)
- self._send_message('child_data', self.child_process_data())
+ self._send_message(_MessageType.CHILD_DATA, self.child_process_data())
# Report the exception to the parent (for internal testing purposes)
self._child_send_error(e)
# Set return code based on whether or not the error was temporary.
#
- self._child_shutdown(RC_FAIL if retry_flag else RC_PERM_FAIL)
+ self._child_shutdown(_ReturnCode.FAIL if retry_flag else _ReturnCode.PERM_FAIL)
except Exception: # pylint: disable=broad-except
@@ -725,11 +739,11 @@ class ChildJob():
elapsed=elapsed, detail=detail,
logfile=filename)
# Unhandled exceptions should permenantly fail
- self._child_shutdown(RC_PERM_FAIL)
+ self._child_shutdown(_ReturnCode.PERM_FAIL)
else:
# No exception occurred in the action
- self._send_message('child_data', self.child_process_data())
+ self._send_message(_MessageType.CHILD_DATA, self.child_process_data())
self._child_send_result(result)
elapsed = datetime.datetime.now() - starttime
@@ -739,7 +753,7 @@ class ChildJob():
# Shutdown needs to stay outside of the above context manager,
# make sure we dont try to handle SIGTERM while the process
# is already busy in sys.exit()
- self._child_shutdown(RC_OK)
+ self._child_shutdown(_ReturnCode.OK)
#######################################################
# Local Private Methods #
@@ -773,7 +787,7 @@ class ChildJob():
domain = e.domain
reason = e.reason
- self._send_message('error', {
+ self._send_message(_MessageType.ERROR, {
'domain': domain,
'reason': reason
})
@@ -792,18 +806,19 @@ class ChildJob():
#
def _child_send_result(self, result):
if result is not None:
- self._send_message('result', result)
+ self._send_message(_MessageType.RESULT, result)
# _child_shutdown()
#
# Shuts down the child process by cleaning up and exiting the process
#
# Args:
- # exit_code (int): The exit code to exit with
+ # exit_code (_ReturnCode): The exit code to exit with
#
def _child_shutdown(self, exit_code):
self._queue.close()
- sys.exit(exit_code)
+ assert isinstance(exit_code, _ReturnCode)
+ sys.exit(int(exit_code))
# _child_message_handler()
#
@@ -828,4 +843,4 @@ class ChildJob():
if message.message_type == MessageType.LOG:
return
- self._send_message('message', message)
+ self._send_message(_MessageType.LOG_MESSAGE, message)
diff --git a/src/buildstream/_scheduler/queues/buildqueue.py b/src/buildstream/_scheduler/queues/buildqueue.py
index aa489f381..dc82f54ec 100644
--- a/src/buildstream/_scheduler/queues/buildqueue.py
+++ b/src/buildstream/_scheduler/queues/buildqueue.py
@@ -113,5 +113,5 @@ class BuildQueue(Queue):
# artifact cache size for a successful build even though we know a
# failed build also grows the artifact cache size.
#
- if status == JobStatus.OK:
+ if status is JobStatus.OK:
self._check_cache_size(job, element, result)
diff --git a/src/buildstream/_scheduler/queues/fetchqueue.py b/src/buildstream/_scheduler/queues/fetchqueue.py
index 9edeebb1d..90db77f42 100644
--- a/src/buildstream/_scheduler/queues/fetchqueue.py
+++ b/src/buildstream/_scheduler/queues/fetchqueue.py
@@ -68,7 +68,7 @@ class FetchQueue(Queue):
def done(self, _, element, result, status):
- if status == JobStatus.FAIL:
+ if status is JobStatus.FAIL:
return
element._fetch_done()
diff --git a/src/buildstream/_scheduler/queues/pullqueue.py b/src/buildstream/_scheduler/queues/pullqueue.py
index 013ee6489..374181cda 100644
--- a/src/buildstream/_scheduler/queues/pullqueue.py
+++ b/src/buildstream/_scheduler/queues/pullqueue.py
@@ -54,7 +54,7 @@ class PullQueue(Queue):
def done(self, _, element, result, status):
- if status == JobStatus.FAIL:
+ if status is JobStatus.FAIL:
return
element._pull_done()
@@ -62,5 +62,5 @@ class PullQueue(Queue):
# Build jobs will check the "approximate" size first. Since we
# do not get an artifact size from pull jobs, we have to
# actually check the cache size.
- if status == JobStatus.OK:
+ if status is JobStatus.OK:
self._scheduler.check_cache_size()
diff --git a/src/buildstream/_scheduler/queues/queue.py b/src/buildstream/_scheduler/queues/queue.py
index 1efcffc16..7740896b5 100644
--- a/src/buildstream/_scheduler/queues/queue.py
+++ b/src/buildstream/_scheduler/queues/queue.py
@@ -305,9 +305,9 @@ class Queue():
self._done_queue.append(element)
# These lists are for bookkeeping purposes for the UI and logging.
- if status == JobStatus.SKIPPED or job.get_terminated():
+ if status is JobStatus.SKIPPED or job.get_terminated():
self.skipped_elements.append(element)
- elif status == JobStatus.OK:
+ elif status is JobStatus.OK:
self.processed_elements.append(element)
else:
self.failed_elements.append(element)
diff --git a/src/buildstream/_scheduler/queues/trackqueue.py b/src/buildstream/_scheduler/queues/trackqueue.py
index 72a79a532..194bb7e1d 100644
--- a/src/buildstream/_scheduler/queues/trackqueue.py
+++ b/src/buildstream/_scheduler/queues/trackqueue.py
@@ -50,7 +50,7 @@ class TrackQueue(Queue):
def done(self, _, element, result, status):
- if status == JobStatus.FAIL:
+ if status is JobStatus.FAIL:
return
# Set the new refs in the main process one by one as they complete,
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 80e14dcd0..da3016b59 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -329,7 +329,7 @@ class Scheduler():
)
# Schedule a cleanup job if we've hit the threshold
- if status != JobStatus.OK:
+ if status is not JobStatus.OK:
return
context = self.context