summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAngelos Evripiotis <jevripiotis@bloomberg.net>2019-06-05 14:37:33 +0100
committerbst-marge-bot <marge-bot@buildstream.build>2019-06-06 09:53:44 +0000
commit0e6454ba2652eaaeea8b1e8a9f93aa2803247395 (patch)
treeb686a86391d720caf777aa876a196062e95d5e42
parent63054a03ac9a07a156c11b65d358240e1954cb1f (diff)
downloadbuildstream-aevri/defensive_send_message.tar.gz
_scheduler/jobs: refactor, defensive send_messageaevri/defensive_send_message
Simplify the custom 'handle_message' / 'send_message' protocol by not requiring a message_type. These message types share a namespace with the base Job implementation, which could cause trouble. Introduce a new private '_send_message' to implement the old functionality. Subclasses are free to pack a message type into their own messages, this isn't necessary at present and simplifies existing subclass code.
-rw-r--r--src/buildstream/_scheduler/jobs/cleanupjob.py11
-rw-r--r--src/buildstream/_scheduler/jobs/job.py55
2 files changed, 36 insertions, 30 deletions
diff --git a/src/buildstream/_scheduler/jobs/cleanupjob.py b/src/buildstream/_scheduler/jobs/cleanupjob.py
index 672e784bc..327d687d3 100644
--- a/src/buildstream/_scheduler/jobs/cleanupjob.py
+++ b/src/buildstream/_scheduler/jobs/cleanupjob.py
@@ -27,14 +27,10 @@ class CleanupJob(Job):
context = self._scheduler.context
self._casquota = context.get_casquota()
- def handle_message(self, message_type, message):
+ def handle_message(self, message):
# Update the cache size in the main process as we go,
# this provides better feedback in the UI.
- if message_type == 'update-cache-size':
- self._casquota.set_cache_size(message, write_to_disk=False)
- return True
-
- return False
+ self._casquota.set_cache_size(message, write_to_disk=False)
def parent_complete(self, status, result):
if status == JobStatus.OK:
@@ -54,6 +50,5 @@ class ChildCleanupJob(ChildJob):
def child_process(self):
def progress():
- self.send_message('update-cache-size',
- self._casquota.get_cache_size())
+ self.send_message(self._casquota.get_cache_size())
return self._casquota.clean(progress)
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index 2fb0788dd..0dccadf54 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -344,19 +344,19 @@ class Job():
# handle_message()
#
# Handle a custom message. This will be called in the main process in
- # response to any messages sent to the main proces using the
- # Job.send_message() API from inside a Job.child_process() implementation
+ # response to any messages sent to the main process using the
+ # Job.send_message() API from inside a Job.child_process() implementation.
+ #
+ # There is no need to implement this function if no custom messages are
+ # expected.
#
# Args:
- # message_type (str): A string to identify the message type
# message (any): A simple object (must be pickle-able, i.e. strings,
# lists, dicts, numbers, but not Element instances).
#
- # Returns:
- # (bool): Should return a truthy value if message_type is handled.
- #
- def handle_message(self, message_type, message):
- return False
+ def handle_message(self, message):
+ raise ImplError("Job '{kind}' does not implement handle_message()"
+ .format(kind=type(self).__name__))
# parent_complete()
#
@@ -470,12 +470,11 @@ class Job():
elif envelope.message_type == 'child_data':
# If we retry a job, we assign a new value to this
self.child_data = envelope.message
-
- # Try Job subclass specific messages now
- elif not self.handle_message(envelope.message_type,
- envelope.message):
- assert 0, "Unhandled message type '{}': {}" \
- .format(envelope.message_type, envelope.message)
+ elif envelope.message_type == 'subclass_custom_message':
+ self.handle_message(envelope.message)
+ else:
+ assert False, "Unhandled message type '{}': {}".format(
+ envelope.message_type, envelope.message)
# _parent_process_queue()
#
@@ -595,13 +594,12 @@ class ChildJob():
# 'message_type's.
#
# Args:
- # message_type (str): The type of message to send.
# message_data (any): A simple object (must be pickle-able, i.e.
# strings, lists, dicts, numbers, but not Element
# instances). This is sent to the parent Job.
#
- def send_message(self, message_type, message_data):
- self._queue.put(_Envelope(message_type, message_data))
+ def send_message(self, message_data):
+ self._send_message('subclass_custom_message', message_data)
#######################################################
# Abstract Methods #
@@ -706,7 +704,7 @@ class ChildJob():
elapsed=elapsed, detail=e.detail,
logfile=filename, sandbox=e.sandbox)
- self.send_message('child_data', self.child_process_data())
+ self._send_message('child_data', self.child_process_data())
# Report the exception to the parent (for internal testing purposes)
self._child_send_error(e)
@@ -732,7 +730,7 @@ class ChildJob():
else:
# No exception occurred in the action
- self.send_message('child_data', self.child_process_data())
+ self._send_message('child_data', self.child_process_data())
self._child_send_result(result)
elapsed = datetime.datetime.now() - starttime
@@ -748,6 +746,19 @@ class ChildJob():
# Local Private Methods #
#######################################################
+ # _send_message()
+ #
+ # Send data in a message to the parent Job, running in the main process.
+ #
+ # Args:
+ # message_type (str): The type of message to send.
+ # message_data (any): A simple object (must be pickle-able, i.e.
+ # strings, lists, dicts, numbers, but not Element
+ # instances). This is sent to the parent Job.
+ #
+ def _send_message(self, message_type, message_data):
+ self._queue.put(_Envelope(message_type, message_data))
+
# _child_send_error()
#
# Sends an error to the main process through the message queue
@@ -763,7 +774,7 @@ class ChildJob():
domain = e.domain
reason = e.reason
- self.send_message('error', {
+ self._send_message('error', {
'domain': domain,
'reason': reason
})
@@ -782,7 +793,7 @@ class ChildJob():
#
def _child_send_result(self, result):
if result is not None:
- self.send_message('result', result)
+ self._send_message('result', result)
# _child_shutdown()
#
@@ -818,4 +829,4 @@ class ChildJob():
if message.message_type == MessageType.LOG:
return
- self.send_message('message', message)
+ self._send_message('message', message)