diff options
author | bst-marge-bot <marge-bot@buildstream.build> | 2019-06-06 17:17:52 +0000 |
---|---|---|
committer | bst-marge-bot <marge-bot@buildstream.build> | 2019-06-06 17:17:52 +0000 |
commit | 640f0ca5a17a144448a48de2a80e1f7a655eb9b2 (patch) | |
tree | 6ec117b1f0492fb8df68f9058d11dddc5c615f02 | |
parent | 492b2ad16419c77a6ab33b7608897f5863ff1104 (diff) | |
parent | ed51c482ff86478e0130086705b5618f70fab6da (diff) | |
download | buildstream-640f0ca5a17a144448a48de2a80e1f7a655eb9b2.tar.gz |
Merge branch 'aevri/job_msg_enum' into 'master'
Use enums for Job-related constants
See merge request BuildStream/buildstream!1380
-rw-r--r-- | src/buildstream/_frontend/app.py | 2 | ||||
-rw-r--r-- | src/buildstream/_scheduler/jobs/cachesizejob.py | 2 | ||||
-rw-r--r-- | src/buildstream/_scheduler/jobs/cleanupjob.py | 2 | ||||
-rw-r--r-- | src/buildstream/_scheduler/jobs/job.py | 67 | ||||
-rw-r--r-- | src/buildstream/_scheduler/queues/buildqueue.py | 2 | ||||
-rw-r--r-- | src/buildstream/_scheduler/queues/fetchqueue.py | 2 | ||||
-rw-r--r-- | src/buildstream/_scheduler/queues/pullqueue.py | 4 | ||||
-rw-r--r-- | src/buildstream/_scheduler/queues/queue.py | 4 | ||||
-rw-r--r-- | src/buildstream/_scheduler/queues/trackqueue.py | 2 | ||||
-rw-r--r-- | src/buildstream/_scheduler/scheduler.py | 2 |
10 files changed, 52 insertions, 37 deletions
diff --git a/src/buildstream/_frontend/app.py b/src/buildstream/_frontend/app.py index d4ea83871..7aff52af6 100644 --- a/src/buildstream/_frontend/app.py +++ b/src/buildstream/_frontend/app.py @@ -526,7 +526,7 @@ class App(): # Dont attempt to handle a failure if the user has already opted to # terminate - if status == JobStatus.FAIL and not self.stream.terminated: + if status is JobStatus.FAIL and not self.stream.terminated: if isinstance(job, ElementJob): element = job.element diff --git a/src/buildstream/_scheduler/jobs/cachesizejob.py b/src/buildstream/_scheduler/jobs/cachesizejob.py index ed1cc4131..f36c30190 100644 --- a/src/buildstream/_scheduler/jobs/cachesizejob.py +++ b/src/buildstream/_scheduler/jobs/cachesizejob.py @@ -28,7 +28,7 @@ class CacheSizeJob(Job): self._casquota = context.get_casquota() def parent_complete(self, status, result): - if status == JobStatus.OK: + if status is JobStatus.OK: self._casquota.set_cache_size(result) if self._complete_cb: diff --git a/src/buildstream/_scheduler/jobs/cleanupjob.py b/src/buildstream/_scheduler/jobs/cleanupjob.py index 327d687d3..85722c83f 100644 --- a/src/buildstream/_scheduler/jobs/cleanupjob.py +++ b/src/buildstream/_scheduler/jobs/cleanupjob.py @@ -33,7 +33,7 @@ class CleanupJob(Job): self._casquota.set_cache_size(message, write_to_disk=False) def parent_complete(self, status, result): - if status == JobStatus.OK: + if status is JobStatus.OK: self._casquota.set_cache_size(result, write_to_disk=False) if self._complete_cb: diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py index 3c10eef62..ed90bb3a4 100644 --- a/src/buildstream/_scheduler/jobs/job.py +++ b/src/buildstream/_scheduler/jobs/job.py @@ -20,6 +20,7 @@ # Tristan Maat <tristan.maat@codethink.co.uk> # System imports +import enum import os import sys import signal @@ -33,12 +34,15 @@ from ..._exceptions import ImplError, BstError, set_last_task_error, SkipJob from ..._message import Message, MessageType, unconditional_messages from ... import _signals, utils + # Return code values shutdown of job handling child processes # -RC_OK = 0 -RC_FAIL = 1 -RC_PERM_FAIL = 2 -RC_SKIPPED = 3 +@enum.unique +class _ReturnCode(enum.IntEnum): + OK = 0 + FAIL = 1 + PERM_FAIL = 2 + SKIPPED = 3 # JobStatus: @@ -46,7 +50,8 @@ RC_SKIPPED = 3 # The job completion status, passed back through the # complete callbacks. # -class JobStatus(): +@enum.unique +class JobStatus(enum.Enum): # Job succeeded OK = 0 @@ -73,6 +78,15 @@ class Process(multiprocessing.Process): self._sentinel = self._popen.sentinel +@enum.unique +class _MessageType(enum.Enum): + LOG_MESSAGE = 1 + ERROR = 2 + RESULT = 3 + CHILD_DATA = 4 + SUBCLASS_CUSTOM_MESSAGE = 5 + + # Job() # # The Job object represents a task that will run in parallel to the main @@ -415,7 +429,7 @@ class Job(): self._parent_shutdown() # We don't want to retry if we got OK or a permanent fail. - retry_flag = returncode == RC_FAIL + retry_flag = returncode == _ReturnCode.FAIL if retry_flag and (self._tries <= self._max_retries) and not self._scheduler.terminated: self.start() @@ -423,11 +437,11 @@ class Job(): # Resolve the outward facing overall job completion status # - if returncode == RC_OK: + if returncode == _ReturnCode.OK: status = JobStatus.OK - elif returncode == RC_SKIPPED: + elif returncode == _ReturnCode.SKIPPED: status = JobStatus.SKIPPED - elif returncode in (RC_FAIL, RC_PERM_FAIL): + elif returncode in (_ReturnCode.FAIL, _ReturnCode.PERM_FAIL): status = JobStatus.FAIL else: status = JobStatus.FAIL @@ -453,23 +467,23 @@ class Job(): if not self._listening: return - if envelope.message_type == 'message': + if envelope.message_type is _MessageType.LOG_MESSAGE: # Propagate received messages from children # back through the context. self._scheduler.context.message(envelope.message) - elif envelope.message_type == 'error': + elif envelope.message_type is _MessageType.ERROR: # For regression tests only, save the last error domain / reason # reported from a child task in the main process, this global state # is currently managed in _exceptions.py set_last_task_error(envelope.message['domain'], envelope.message['reason']) - elif envelope.message_type == 'result': + elif envelope.message_type is _MessageType.RESULT: assert self._result is None self._result = envelope.message - elif envelope.message_type == 'child_data': + elif envelope.message_type is _MessageType.CHILD_DATA: # If we retry a job, we assign a new value to this self.child_data = envelope.message - elif envelope.message_type == 'subclass_custom_message': + elif envelope.message_type is _MessageType.SUBCLASS_CUSTOM_MESSAGE: self.handle_message(envelope.message) else: assert False, "Unhandled message type '{}': {}".format( @@ -598,7 +612,7 @@ class ChildJob(): # instances). This is sent to the parent Job. # def send_message(self, message_data): - self._send_message('subclass_custom_message', message_data) + self._send_message(_MessageType.SUBCLASS_CUSTOM_MESSAGE, message_data) ####################################################### # Abstract Methods # @@ -689,7 +703,7 @@ class ChildJob(): elapsed=elapsed, logfile=filename) # Alert parent of skip by return code - self._child_shutdown(RC_SKIPPED) + self._child_shutdown(_ReturnCode.SKIPPED) except BstError as e: elapsed = datetime.datetime.now() - starttime retry_flag = e.temporary @@ -703,14 +717,14 @@ class ChildJob(): elapsed=elapsed, detail=e.detail, logfile=filename, sandbox=e.sandbox) - self._send_message('child_data', self.child_process_data()) + self._send_message(_MessageType.CHILD_DATA, self.child_process_data()) # Report the exception to the parent (for internal testing purposes) self._child_send_error(e) # Set return code based on whether or not the error was temporary. # - self._child_shutdown(RC_FAIL if retry_flag else RC_PERM_FAIL) + self._child_shutdown(_ReturnCode.FAIL if retry_flag else _ReturnCode.PERM_FAIL) except Exception: # pylint: disable=broad-except @@ -725,11 +739,11 @@ class ChildJob(): elapsed=elapsed, detail=detail, logfile=filename) # Unhandled exceptions should permenantly fail - self._child_shutdown(RC_PERM_FAIL) + self._child_shutdown(_ReturnCode.PERM_FAIL) else: # No exception occurred in the action - self._send_message('child_data', self.child_process_data()) + self._send_message(_MessageType.CHILD_DATA, self.child_process_data()) self._child_send_result(result) elapsed = datetime.datetime.now() - starttime @@ -739,7 +753,7 @@ class ChildJob(): # Shutdown needs to stay outside of the above context manager, # make sure we dont try to handle SIGTERM while the process # is already busy in sys.exit() - self._child_shutdown(RC_OK) + self._child_shutdown(_ReturnCode.OK) ####################################################### # Local Private Methods # @@ -773,7 +787,7 @@ class ChildJob(): domain = e.domain reason = e.reason - self._send_message('error', { + self._send_message(_MessageType.ERROR, { 'domain': domain, 'reason': reason }) @@ -792,18 +806,19 @@ class ChildJob(): # def _child_send_result(self, result): if result is not None: - self._send_message('result', result) + self._send_message(_MessageType.RESULT, result) # _child_shutdown() # # Shuts down the child process by cleaning up and exiting the process # # Args: - # exit_code (int): The exit code to exit with + # exit_code (_ReturnCode): The exit code to exit with # def _child_shutdown(self, exit_code): self._queue.close() - sys.exit(exit_code) + assert isinstance(exit_code, _ReturnCode) + sys.exit(int(exit_code)) # _child_message_handler() # @@ -828,4 +843,4 @@ class ChildJob(): if message.message_type == MessageType.LOG: return - self._send_message('message', message) + self._send_message(_MessageType.LOG_MESSAGE, message) diff --git a/src/buildstream/_scheduler/queues/buildqueue.py b/src/buildstream/_scheduler/queues/buildqueue.py index aa489f381..dc82f54ec 100644 --- a/src/buildstream/_scheduler/queues/buildqueue.py +++ b/src/buildstream/_scheduler/queues/buildqueue.py @@ -113,5 +113,5 @@ class BuildQueue(Queue): # artifact cache size for a successful build even though we know a # failed build also grows the artifact cache size. # - if status == JobStatus.OK: + if status is JobStatus.OK: self._check_cache_size(job, element, result) diff --git a/src/buildstream/_scheduler/queues/fetchqueue.py b/src/buildstream/_scheduler/queues/fetchqueue.py index 9edeebb1d..90db77f42 100644 --- a/src/buildstream/_scheduler/queues/fetchqueue.py +++ b/src/buildstream/_scheduler/queues/fetchqueue.py @@ -68,7 +68,7 @@ class FetchQueue(Queue): def done(self, _, element, result, status): - if status == JobStatus.FAIL: + if status is JobStatus.FAIL: return element._fetch_done() diff --git a/src/buildstream/_scheduler/queues/pullqueue.py b/src/buildstream/_scheduler/queues/pullqueue.py index 013ee6489..374181cda 100644 --- a/src/buildstream/_scheduler/queues/pullqueue.py +++ b/src/buildstream/_scheduler/queues/pullqueue.py @@ -54,7 +54,7 @@ class PullQueue(Queue): def done(self, _, element, result, status): - if status == JobStatus.FAIL: + if status is JobStatus.FAIL: return element._pull_done() @@ -62,5 +62,5 @@ class PullQueue(Queue): # Build jobs will check the "approximate" size first. Since we # do not get an artifact size from pull jobs, we have to # actually check the cache size. - if status == JobStatus.OK: + if status is JobStatus.OK: self._scheduler.check_cache_size() diff --git a/src/buildstream/_scheduler/queues/queue.py b/src/buildstream/_scheduler/queues/queue.py index 1efcffc16..7740896b5 100644 --- a/src/buildstream/_scheduler/queues/queue.py +++ b/src/buildstream/_scheduler/queues/queue.py @@ -305,9 +305,9 @@ class Queue(): self._done_queue.append(element) # These lists are for bookkeeping purposes for the UI and logging. - if status == JobStatus.SKIPPED or job.get_terminated(): + if status is JobStatus.SKIPPED or job.get_terminated(): self.skipped_elements.append(element) - elif status == JobStatus.OK: + elif status is JobStatus.OK: self.processed_elements.append(element) else: self.failed_elements.append(element) diff --git a/src/buildstream/_scheduler/queues/trackqueue.py b/src/buildstream/_scheduler/queues/trackqueue.py index 72a79a532..194bb7e1d 100644 --- a/src/buildstream/_scheduler/queues/trackqueue.py +++ b/src/buildstream/_scheduler/queues/trackqueue.py @@ -50,7 +50,7 @@ class TrackQueue(Queue): def done(self, _, element, result, status): - if status == JobStatus.FAIL: + if status is JobStatus.FAIL: return # Set the new refs in the main process one by one as they complete, diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py index 80e14dcd0..da3016b59 100644 --- a/src/buildstream/_scheduler/scheduler.py +++ b/src/buildstream/_scheduler/scheduler.py @@ -329,7 +329,7 @@ class Scheduler(): ) # Schedule a cleanup job if we've hit the threshold - if status != JobStatus.OK: + if status is not JobStatus.OK: return context = self.context |