From 98900cd8c8765da904845aa59acdc265e4d6e926 Mon Sep 17 00:00:00 2001 From: Angelos Evripiotis Date: Thu, 6 Jun 2019 10:38:09 +0100 Subject: _scheduler/jobs/job: use enum for message types --- src/buildstream/_scheduler/jobs/job.py | 32 +++++++++++++++++++++----------- 1 file changed, 21 insertions(+), 11 deletions(-) diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py index 3c10eef62..0373d4df9 100644 --- a/src/buildstream/_scheduler/jobs/job.py +++ b/src/buildstream/_scheduler/jobs/job.py @@ -20,6 +20,7 @@ # Tristan Maat # System imports +import enum import os import sys import signal @@ -73,6 +74,15 @@ class Process(multiprocessing.Process): self._sentinel = self._popen.sentinel +@enum.unique +class _MessageType(enum.Enum): + LOG_MESSAGE = 1 + ERROR = 2 + RESULT = 3 + CHILD_DATA = 4 + SUBCLASS_CUSTOM_MESSAGE = 5 + + # Job() # # The Job object represents a task that will run in parallel to the main @@ -453,23 +463,23 @@ class Job(): if not self._listening: return - if envelope.message_type == 'message': + if envelope.message_type is _MessageType.LOG_MESSAGE: # Propagate received messages from children # back through the context. self._scheduler.context.message(envelope.message) - elif envelope.message_type == 'error': + elif envelope.message_type is _MessageType.ERROR: # For regression tests only, save the last error domain / reason # reported from a child task in the main process, this global state # is currently managed in _exceptions.py set_last_task_error(envelope.message['domain'], envelope.message['reason']) - elif envelope.message_type == 'result': + elif envelope.message_type is _MessageType.RESULT: assert self._result is None self._result = envelope.message - elif envelope.message_type == 'child_data': + elif envelope.message_type is _MessageType.CHILD_DATA: # If we retry a job, we assign a new value to this self.child_data = envelope.message - elif envelope.message_type == 'subclass_custom_message': + elif envelope.message_type is _MessageType.SUBCLASS_CUSTOM_MESSAGE: self.handle_message(envelope.message) else: assert False, "Unhandled message type '{}': {}".format( @@ -598,7 +608,7 @@ class ChildJob(): # instances). This is sent to the parent Job. # def send_message(self, message_data): - self._send_message('subclass_custom_message', message_data) + self._send_message(_MessageType.SUBCLASS_CUSTOM_MESSAGE, message_data) ####################################################### # Abstract Methods # @@ -703,7 +713,7 @@ class ChildJob(): elapsed=elapsed, detail=e.detail, logfile=filename, sandbox=e.sandbox) - self._send_message('child_data', self.child_process_data()) + self._send_message(_MessageType.CHILD_DATA, self.child_process_data()) # Report the exception to the parent (for internal testing purposes) self._child_send_error(e) @@ -729,7 +739,7 @@ class ChildJob(): else: # No exception occurred in the action - self._send_message('child_data', self.child_process_data()) + self._send_message(_MessageType.CHILD_DATA, self.child_process_data()) self._child_send_result(result) elapsed = datetime.datetime.now() - starttime @@ -773,7 +783,7 @@ class ChildJob(): domain = e.domain reason = e.reason - self._send_message('error', { + self._send_message(_MessageType.ERROR, { 'domain': domain, 'reason': reason }) @@ -792,7 +802,7 @@ class ChildJob(): # def _child_send_result(self, result): if result is not None: - self._send_message('result', result) + self._send_message(_MessageType.RESULT, result) # _child_shutdown() # @@ -828,4 +838,4 @@ class ChildJob(): if message.message_type == MessageType.LOG: return - self._send_message('message', message) + self._send_message(_MessageType.LOG_MESSAGE, message) -- cgit v1.2.1 From 63152f6c1dd4f7aea1e85cd5e334bd73952aa09d Mon Sep 17 00:00:00 2001 From: Angelos Evripiotis Date: Thu, 6 Jun 2019 10:59:20 +0100 Subject: _scheduler/jobs/job: use enum for return codes --- src/buildstream/_scheduler/jobs/job.py | 32 ++++++++++++++++++-------------- 1 file changed, 18 insertions(+), 14 deletions(-) diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py index 0373d4df9..0ef316028 100644 --- a/src/buildstream/_scheduler/jobs/job.py +++ b/src/buildstream/_scheduler/jobs/job.py @@ -34,12 +34,15 @@ from ..._exceptions import ImplError, BstError, set_last_task_error, SkipJob from ..._message import Message, MessageType, unconditional_messages from ... import _signals, utils + # Return code values shutdown of job handling child processes # -RC_OK = 0 -RC_FAIL = 1 -RC_PERM_FAIL = 2 -RC_SKIPPED = 3 +@enum.unique +class _ReturnCode(enum.IntEnum): + OK = 0 + FAIL = 1 + PERM_FAIL = 2 + SKIPPED = 3 # JobStatus: @@ -425,7 +428,7 @@ class Job(): self._parent_shutdown() # We don't want to retry if we got OK or a permanent fail. - retry_flag = returncode == RC_FAIL + retry_flag = returncode == _ReturnCode.FAIL if retry_flag and (self._tries <= self._max_retries) and not self._scheduler.terminated: self.start() @@ -433,11 +436,11 @@ class Job(): # Resolve the outward facing overall job completion status # - if returncode == RC_OK: + if returncode == _ReturnCode.OK: status = JobStatus.OK - elif returncode == RC_SKIPPED: + elif returncode == _ReturnCode.SKIPPED: status = JobStatus.SKIPPED - elif returncode in (RC_FAIL, RC_PERM_FAIL): + elif returncode in (_ReturnCode.FAIL, _ReturnCode.PERM_FAIL): status = JobStatus.FAIL else: status = JobStatus.FAIL @@ -699,7 +702,7 @@ class ChildJob(): elapsed=elapsed, logfile=filename) # Alert parent of skip by return code - self._child_shutdown(RC_SKIPPED) + self._child_shutdown(_ReturnCode.SKIPPED) except BstError as e: elapsed = datetime.datetime.now() - starttime retry_flag = e.temporary @@ -720,7 +723,7 @@ class ChildJob(): # Set return code based on whether or not the error was temporary. # - self._child_shutdown(RC_FAIL if retry_flag else RC_PERM_FAIL) + self._child_shutdown(_ReturnCode.FAIL if retry_flag else _ReturnCode.PERM_FAIL) except Exception: # pylint: disable=broad-except @@ -735,7 +738,7 @@ class ChildJob(): elapsed=elapsed, detail=detail, logfile=filename) # Unhandled exceptions should permenantly fail - self._child_shutdown(RC_PERM_FAIL) + self._child_shutdown(_ReturnCode.PERM_FAIL) else: # No exception occurred in the action @@ -749,7 +752,7 @@ class ChildJob(): # Shutdown needs to stay outside of the above context manager, # make sure we dont try to handle SIGTERM while the process # is already busy in sys.exit() - self._child_shutdown(RC_OK) + self._child_shutdown(_ReturnCode.OK) ####################################################### # Local Private Methods # @@ -809,11 +812,12 @@ class ChildJob(): # Shuts down the child process by cleaning up and exiting the process # # Args: - # exit_code (int): The exit code to exit with + # exit_code (_ReturnCode): The exit code to exit with # def _child_shutdown(self, exit_code): self._queue.close() - sys.exit(exit_code) + assert isinstance(exit_code, _ReturnCode) + sys.exit(int(exit_code)) # _child_message_handler() # -- cgit v1.2.1 From b216febd888f2b7ab763db0045ac5736d0a9a16a Mon Sep 17 00:00:00 2001 From: Angelos Evripiotis Date: Thu, 6 Jun 2019 11:20:44 +0100 Subject: _scheduler/jobs/job: make JobStatus an enum This provides some minor guards against mistakes, and we'll be able to do type-checking later. This does open the possibility of problems if folks mistakenly try to pass off an integer as a JobStatus. --- src/buildstream/_scheduler/jobs/job.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py index 0ef316028..ed90bb3a4 100644 --- a/src/buildstream/_scheduler/jobs/job.py +++ b/src/buildstream/_scheduler/jobs/job.py @@ -50,7 +50,8 @@ class _ReturnCode(enum.IntEnum): # The job completion status, passed back through the # complete callbacks. # -class JobStatus(): +@enum.unique +class JobStatus(enum.Enum): # Job succeeded OK = 0 -- cgit v1.2.1 From ed51c482ff86478e0130086705b5618f70fab6da Mon Sep 17 00:00:00 2001 From: Angelos Evripiotis Date: Thu, 6 Jun 2019 11:23:16 +0100 Subject: Use 'is' when comparing against JobStatus Since JobStatus is an enum, it's clearer to compare using 'is' - equality comparison will fail in the same cases, but might lull folks into thinking that comparison with integer would also work. --- src/buildstream/_frontend/app.py | 2 +- src/buildstream/_scheduler/jobs/cachesizejob.py | 2 +- src/buildstream/_scheduler/jobs/cleanupjob.py | 2 +- src/buildstream/_scheduler/queues/buildqueue.py | 2 +- src/buildstream/_scheduler/queues/fetchqueue.py | 2 +- src/buildstream/_scheduler/queues/pullqueue.py | 4 ++-- src/buildstream/_scheduler/queues/queue.py | 4 ++-- src/buildstream/_scheduler/queues/trackqueue.py | 2 +- src/buildstream/_scheduler/scheduler.py | 2 +- 9 files changed, 11 insertions(+), 11 deletions(-) diff --git a/src/buildstream/_frontend/app.py b/src/buildstream/_frontend/app.py index d4ea83871..7aff52af6 100644 --- a/src/buildstream/_frontend/app.py +++ b/src/buildstream/_frontend/app.py @@ -526,7 +526,7 @@ class App(): # Dont attempt to handle a failure if the user has already opted to # terminate - if status == JobStatus.FAIL and not self.stream.terminated: + if status is JobStatus.FAIL and not self.stream.terminated: if isinstance(job, ElementJob): element = job.element diff --git a/src/buildstream/_scheduler/jobs/cachesizejob.py b/src/buildstream/_scheduler/jobs/cachesizejob.py index ed1cc4131..f36c30190 100644 --- a/src/buildstream/_scheduler/jobs/cachesizejob.py +++ b/src/buildstream/_scheduler/jobs/cachesizejob.py @@ -28,7 +28,7 @@ class CacheSizeJob(Job): self._casquota = context.get_casquota() def parent_complete(self, status, result): - if status == JobStatus.OK: + if status is JobStatus.OK: self._casquota.set_cache_size(result) if self._complete_cb: diff --git a/src/buildstream/_scheduler/jobs/cleanupjob.py b/src/buildstream/_scheduler/jobs/cleanupjob.py index 327d687d3..85722c83f 100644 --- a/src/buildstream/_scheduler/jobs/cleanupjob.py +++ b/src/buildstream/_scheduler/jobs/cleanupjob.py @@ -33,7 +33,7 @@ class CleanupJob(Job): self._casquota.set_cache_size(message, write_to_disk=False) def parent_complete(self, status, result): - if status == JobStatus.OK: + if status is JobStatus.OK: self._casquota.set_cache_size(result, write_to_disk=False) if self._complete_cb: diff --git a/src/buildstream/_scheduler/queues/buildqueue.py b/src/buildstream/_scheduler/queues/buildqueue.py index aa489f381..dc82f54ec 100644 --- a/src/buildstream/_scheduler/queues/buildqueue.py +++ b/src/buildstream/_scheduler/queues/buildqueue.py @@ -113,5 +113,5 @@ class BuildQueue(Queue): # artifact cache size for a successful build even though we know a # failed build also grows the artifact cache size. # - if status == JobStatus.OK: + if status is JobStatus.OK: self._check_cache_size(job, element, result) diff --git a/src/buildstream/_scheduler/queues/fetchqueue.py b/src/buildstream/_scheduler/queues/fetchqueue.py index 9edeebb1d..90db77f42 100644 --- a/src/buildstream/_scheduler/queues/fetchqueue.py +++ b/src/buildstream/_scheduler/queues/fetchqueue.py @@ -68,7 +68,7 @@ class FetchQueue(Queue): def done(self, _, element, result, status): - if status == JobStatus.FAIL: + if status is JobStatus.FAIL: return element._fetch_done() diff --git a/src/buildstream/_scheduler/queues/pullqueue.py b/src/buildstream/_scheduler/queues/pullqueue.py index 013ee6489..374181cda 100644 --- a/src/buildstream/_scheduler/queues/pullqueue.py +++ b/src/buildstream/_scheduler/queues/pullqueue.py @@ -54,7 +54,7 @@ class PullQueue(Queue): def done(self, _, element, result, status): - if status == JobStatus.FAIL: + if status is JobStatus.FAIL: return element._pull_done() @@ -62,5 +62,5 @@ class PullQueue(Queue): # Build jobs will check the "approximate" size first. Since we # do not get an artifact size from pull jobs, we have to # actually check the cache size. - if status == JobStatus.OK: + if status is JobStatus.OK: self._scheduler.check_cache_size() diff --git a/src/buildstream/_scheduler/queues/queue.py b/src/buildstream/_scheduler/queues/queue.py index 1efcffc16..7740896b5 100644 --- a/src/buildstream/_scheduler/queues/queue.py +++ b/src/buildstream/_scheduler/queues/queue.py @@ -305,9 +305,9 @@ class Queue(): self._done_queue.append(element) # These lists are for bookkeeping purposes for the UI and logging. - if status == JobStatus.SKIPPED or job.get_terminated(): + if status is JobStatus.SKIPPED or job.get_terminated(): self.skipped_elements.append(element) - elif status == JobStatus.OK: + elif status is JobStatus.OK: self.processed_elements.append(element) else: self.failed_elements.append(element) diff --git a/src/buildstream/_scheduler/queues/trackqueue.py b/src/buildstream/_scheduler/queues/trackqueue.py index 72a79a532..194bb7e1d 100644 --- a/src/buildstream/_scheduler/queues/trackqueue.py +++ b/src/buildstream/_scheduler/queues/trackqueue.py @@ -50,7 +50,7 @@ class TrackQueue(Queue): def done(self, _, element, result, status): - if status == JobStatus.FAIL: + if status is JobStatus.FAIL: return # Set the new refs in the main process one by one as they complete, diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py index 80e14dcd0..da3016b59 100644 --- a/src/buildstream/_scheduler/scheduler.py +++ b/src/buildstream/_scheduler/scheduler.py @@ -329,7 +329,7 @@ class Scheduler(): ) # Schedule a cleanup job if we've hit the threshold - if status != JobStatus.OK: + if status is not JobStatus.OK: return context = self.context -- cgit v1.2.1