author     Richard Ipsum <richard.ipsum@codethink.co.uk>  2014-02-24 18:21:33 +0000
committer  Richard Ipsum <richard.ipsum@codethink.co.uk>  2014-03-21 16:47:28 +0000
commit     1de342b8a4cf13b295805855bfaa341bcd86277e (patch)
tree       2b550a0d60532446dad50ee3ecc703a90bb6d780 /distbuild
parent     f4b503b036f76c23c4f2cb99ca6596823b323035 (diff)
download   morph-1de342b8a4cf13b295805855bfaa341bcd86277e.tar.gz
Add the distbuild libs
Diffstat (limited to 'distbuild')
-rw-r--r--  distbuild/__init__.py                  61
-rw-r--r--  distbuild/build_controller.py         531
-rw-r--r--  distbuild/connection_machine.py       145
-rw-r--r--  distbuild/crashpoint.py               126
-rw-r--r--  distbuild/crashpoint_tests.py         109
-rw-r--r--  distbuild/eventsrc.py                  48
-rw-r--r--  distbuild/helper_router.py            197
-rw-r--r--  distbuild/idgen.py                     33
-rw-r--r--  distbuild/initiator.py                195
-rw-r--r--  distbuild/initiator_connection.py     216
-rw-r--r--  distbuild/jm.py                        98
-rw-r--r--  distbuild/json_router.py              164
-rw-r--r--  distbuild/mainloop.py                 117
-rw-r--r--  distbuild/protocol.py                  93
-rw-r--r--  distbuild/proxy_event_source.py        47
-rw-r--r--  distbuild/route_map.py                 60
-rw-r--r--  distbuild/route_map_tests.py           56
-rw-r--r--  distbuild/serialise.py                166
-rw-r--r--  distbuild/serialise_tests.py          148
-rw-r--r--  distbuild/sm.py                       139
-rw-r--r--  distbuild/sm_tests.py                  86
-rw-r--r--  distbuild/sockbuf.py                  159
-rw-r--r--  distbuild/socketsrc.py                166
-rw-r--r--  distbuild/sockserv.py                  45
-rw-r--r--  distbuild/stringbuffer.py              90
-rw-r--r--  distbuild/stringbuffer_tests.py       140
-rw-r--r--  distbuild/timer_event_source.py        59
-rw-r--r--  distbuild/worker_build_scheduler.py   392
28 files changed, 3886 insertions, 0 deletions
diff --git a/distbuild/__init__.py b/distbuild/__init__.py
new file mode 100644
index 00000000..3e60d5ee
--- /dev/null
+++ b/distbuild/__init__.py
@@ -0,0 +1,61 @@
+# distbuild/__init__.py -- library for Morph's distributed build plugin
+#
+# Copyright (C) 2014 Codethink Limited
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; version 2 of the License.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA..
+
+
+from stringbuffer import StringBuffer
+from sm import StateMachine
+from eventsrc import EventSource
+from socketsrc import (SocketError, NewConnection, ListeningSocketEventSource,
+ SocketReadable, SocketWriteable, SocketEventSource,
+ set_nonblocking)
+from sockbuf import (SocketBufferNewData, SocketBufferEof,
+ SocketBufferClosed, SocketBuffer)
+from mainloop import MainLoop
+from sockserv import ListenServer
+from jm import JsonMachine, JsonNewMessage, JsonEof
+
+from serialise import serialise_artifact, deserialise_artifact
+from idgen import IdentifierGenerator
+from route_map import RouteMap
+from timer_event_source import TimerEventSource, Timer
+from proxy_event_source import ProxyEventSource
+from json_router import JsonRouter
+from helper_router import (HelperRouter, HelperRequest, HelperOutput,
+ HelperResult)
+from initiator_connection import (InitiatorConnection, InitiatorDisconnect)
+from connection_machine import ConnectionMachine, Reconnect, StopConnecting
+from worker_build_scheduler import (WorkerBuildQueuer,
+ WorkerConnection,
+ WorkerBuildRequest,
+ WorkerCancelPending,
+ WorkerBuildOutput,
+ WorkerBuildCaching,
+ WorkerBuildFinished,
+ WorkerBuildFailed,
+ WorkerBuildStepStarted)
+from build_controller import (BuildController, BuildFailed, BuildProgress,
+ BuildSteps, BuildStepStarted, BuildOutput,
+ BuildStepFinished, BuildStepFailed,
+ BuildFinished, BuildCancel,
+ build_step_name, map_build_graph)
+from initiator import Initiator
+from protocol import message
+
+from crashpoint import (crash_point, add_crash_condition, add_crash_conditions,
+ clear_crash_conditions)
+
+__all__ = locals()
diff --git a/distbuild/build_controller.py b/distbuild/build_controller.py
new file mode 100644
index 00000000..a300f0d4
--- /dev/null
+++ b/distbuild/build_controller.py
@@ -0,0 +1,531 @@
+# distbuild/build_controller.py -- control the steps for one build
+#
+# Copyright (C) 2014 Codethink Limited
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; version 2 of the License.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA..
+
+
+import logging
+import httplib
+import traceback
+import urllib
+import urlparse
+
+import distbuild
+
+
+# Artifact build states
+UNKNOWN = 'unknown'
+UNBUILT = 'not-built'
+BUILDING = 'building'
+BUILT = 'built'
+
+
+class _Start(object): pass
+class _Annotated(object): pass
+class _Built(object): pass
+
+
+class _GotGraph(object):
+
+ def __init__(self, artifact):
+ self.artifact = artifact
+
+
+class _GraphFailed(object):
+
+ pass
+
+
+class BuildCancel(object):
+
+ def __init__(self, id):
+ self.id = id
+
+
+class BuildFinished(object):
+
+ def __init__(self, request_id, urls):
+ self.id = request_id
+ self.urls = urls
+
+
+class BuildFailed(object):
+
+ def __init__(self, request_id, reason):
+ self.id = request_id
+ self.reason = reason
+
+
+class BuildProgress(object):
+
+ def __init__(self, request_id, message_text):
+ self.id = request_id
+ self.message_text = message_text
+
+
+class BuildSteps(object):
+
+ def __init__(self, request_id, artifact):
+ self.id = request_id
+ self.artifact = artifact
+
+
+class BuildStepStarted(object):
+
+ def __init__(self, request_id, step_name, worker_name):
+ self.id = request_id
+ self.step_name = step_name
+ self.worker_name = worker_name
+
+
+class BuildOutput(object):
+
+ def __init__(self, request_id, step_name, stdout, stderr):
+ self.id = request_id
+ self.step_name = step_name
+ self.stdout = stdout
+ self.stderr = stderr
+
+
+class BuildStepFinished(object):
+
+ def __init__(self, request_id, step_name):
+ self.id = request_id
+ self.step_name = step_name
+
+
+class BuildStepFailed(object):
+
+ def __init__(self, request_id, step_name):
+ self.id = request_id
+ self.step_name = step_name
+
+
+class _Abort(object):
+
+ pass
+
+
+def build_step_name(artifact):
+ '''Return user-comprehensible name for a given artifact.'''
+ return artifact.name
+
+
+def map_build_graph(artifact, callback):
+ result = []
+ done = set()
+ queue = [artifact]
+ while queue:
+ a = queue.pop()
+ if a not in done:
+ result.append(callback(a))
+ queue.extend(a.dependencies)
+ done.add(a)
+ return result
+
+
+class BuildController(distbuild.StateMachine):
+
+ '''Control one build-request fulfillment.
+
+ The initiator sends a build-request message, which causes the
+ InitiatorConnection to instantiate this class to control the steps
+ needed to fulfill the request. This state machine builds the
+ build graph to determine all the artifacts that need building, then
+ builds anything that is not cached.
+
+ '''
+
+ _idgen = distbuild.IdentifierGenerator('BuildController')
+
+ def __init__(self, build_request_message, artifact_cache_server,
+ morph_instance):
+ distbuild.crash_point()
+ distbuild.StateMachine.__init__(self, 'init')
+ self._request = build_request_message
+ self._artifact_cache_server = artifact_cache_server
+ self._morph_instance = morph_instance
+ self._helper_id = None
+ self.debug_transitions = True
+
+ def setup(self):
+ distbuild.crash_point()
+
+ spec = [
+ ('init', self, _Start, 'graphing', self._start_graphing),
+ ('init', distbuild.InitiatorConnection,
+ distbuild.InitiatorDisconnect, 'init', self._maybe_abort),
+ ('init', self, _Abort, None, None),
+
+ ('graphing', distbuild.HelperRouter, distbuild.HelperOutput,
+ 'graphing', self._collect_graph),
+ ('graphing', distbuild.HelperRouter, distbuild.HelperResult,
+ 'graphing', self._finish_graph),
+ ('graphing', self, _GotGraph,
+ 'annotating', self._start_annotating),
+ ('graphing', self, _GraphFailed, None, None),
+ ('graphing', distbuild.InitiatorConnection,
+ distbuild.InitiatorDisconnect, None,
+ self._maybe_abort),
+
+ ('annotating', distbuild.HelperRouter, distbuild.HelperResult,
+ 'annotating', self._handle_cache_response),
+ ('annotating', self, _Annotated, 'building',
+ self._queue_worker_builds),
+ ('annotating', distbuild.InitiatorConnection,
+ distbuild.InitiatorDisconnect, None,
+ self._maybe_abort),
+
+ ('building', distbuild.WorkerConnection,
+ distbuild.WorkerBuildStepStarted, 'building',
+ self._relay_build_step_started),
+ ('building', distbuild.WorkerConnection,
+ distbuild.WorkerBuildOutput, 'building',
+ self._relay_build_output),
+ ('building', distbuild.WorkerConnection,
+ distbuild.WorkerBuildCaching, 'building',
+ self._relay_build_caching),
+ ('building', distbuild.WorkerConnection,
+ distbuild.WorkerBuildFinished, 'building',
+ self._check_result_and_queue_more_builds),
+ ('building', distbuild.WorkerConnection,
+ distbuild.WorkerBuildFailed, None,
+ self._notify_build_failed),
+ ('building', self, _Built, None, self._notify_build_done),
+ ('building', distbuild.InitiatorConnection,
+ distbuild.InitiatorDisconnect, 'building',
+ self._notify_initiator_disconnected),
+ ]
+ self.add_transitions(spec)
+
+ self.mainloop.queue_event(self, _Start())
+
+ def _maybe_abort(self, event_source, event):
+        if event.id == self._request['id']:
+ self.mainloop.queue_event(self, _Abort())
+
+ def _start_graphing(self, event_source, event):
+ distbuild.crash_point()
+
+ logging.info('Start constructing build graph')
+ self._artifact_data = distbuild.StringBuffer()
+ self._artifact_error = distbuild.StringBuffer()
+ argv = [
+ self._morph_instance,
+ 'serialise-artifact',
+ '--quiet',
+ self._request['repo'],
+ self._request['ref'],
+ self._request['morphology'],
+ ]
+ msg = distbuild.message('exec-request',
+ id=self._idgen.next(),
+ argv=argv,
+ stdin_contents='')
+ self._helper_id = msg['id']
+ req = distbuild.HelperRequest(msg)
+ self.mainloop.queue_event(distbuild.HelperRouter, req)
+
+ progress = BuildProgress(self._request['id'], 'Computing build graph')
+ self.mainloop.queue_event(BuildController, progress)
+
+ def _collect_graph(self, event_source, event):
+ distbuild.crash_point()
+
+ if event.msg['id'] == self._helper_id:
+ self._artifact_data.add(event.msg['stdout'])
+ self._artifact_error.add(event.msg['stderr'])
+
+ def _finish_graph(self, event_source, event):
+ distbuild.crash_point()
+
+ def notify_failure(msg_text):
+ logging.error('Graph creation failed: %s' % msg_text)
+
+ failed = BuildFailed(
+ self._request['id'],
+                'Failed to compute build graph: %s' % msg_text)
+ self.mainloop.queue_event(BuildController, failed)
+
+ self.mainloop.queue_event(self, _GraphFailed())
+
+ def notify_success(artifact):
+ logging.debug('Graph is finished')
+
+ progress = BuildProgress(
+ self._request['id'], 'Finished computing build graph')
+ self.mainloop.queue_event(BuildController, progress)
+
+ build_steps = BuildSteps(self._request['id'], artifact)
+ self.mainloop.queue_event(BuildController, build_steps)
+
+ self.mainloop.queue_event(self, _GotGraph(artifact))
+
+ if event.msg['id'] == self._helper_id:
+ self._helper_id = None
+
+ error_text = self._artifact_error.peek()
+ if event.msg['exit'] != 0 or error_text:
+ notify_failure(
+ 'Problem with serialise-artifact: %s' % error_text)
+ return
+
+ text = self._artifact_data.peek()
+ try:
+ artifact = distbuild.deserialise_artifact(text)
+ except ValueError, e:
+ logging.error(traceback.format_exc())
+ notify_failure(str(e))
+ return
+
+ notify_success(artifact)
+
+ def _start_annotating(self, event_source, event):
+ distbuild.crash_point()
+
+ self._artifact = event.artifact
+
+ # Queue http requests for checking from the shared artifact
+ # cache for the artifacts.
+ for artifact in map_build_graph(self._artifact, lambda a: a):
+ artifact.state = UNKNOWN
+ artifact.helper_id = self._idgen.next()
+ filename = ('%s.%s.%s' %
+ (artifact.cache_key,
+ artifact.source.morphology['kind'],
+ artifact.name))
+ url = urlparse.urljoin(
+ self._artifact_cache_server,
+ '/1.0/artifacts?filename=%s' % urllib.quote(filename))
+ msg = distbuild.message('http-request',
+ id=artifact.helper_id,
+ url=url,
+ method='HEAD')
+ request = distbuild.HelperRequest(msg)
+ self.mainloop.queue_event(distbuild.HelperRouter, request)
+ logging.debug(
+ 'Queued as %s query whether %s is in cache' %
+ (msg['id'], filename))
+
+ def _handle_cache_response(self, event_source, event):
+ distbuild.crash_point()
+
+ logging.debug('Got cache query response: %s' % repr(event.msg))
+
+ def set_status(artifact):
+ if artifact.helper_id == event.msg['id']:
+ old = artifact.state
+ if event.msg['status'] == httplib.OK:
+ artifact.state = BUILT
+ else:
+ artifact.state = UNBUILT
+ logging.debug(
+ 'Changed artifact %s state from %s to %s' %
+ (artifact.name, old, artifact.state))
+ artifact.helper_id = None
+
+ map_build_graph(self._artifact, set_status)
+
+ queued = map_build_graph(self._artifact, lambda a: a.state == UNKNOWN)
+ if any(queued):
+ logging.debug('Waiting for further responses')
+ else:
+ logging.debug('All cache query responses received')
+ self.mainloop.queue_event(self, _Annotated())
+
+ count = sum(1 if a.state == UNBUILT else 0
+ for a in map_build_graph(self._artifact, lambda b: b))
+ progress = BuildProgress(
+ self._request['id'],
+ 'Need to build %d artifacts' % count)
+ self.mainloop.queue_event(BuildController, progress)
+
+ if count == 0:
+ logging.info('There seems to be nothing to build')
+ self.mainloop.queue_event(self, _Built())
+
+ def _find_artifacts_that_are_ready_to_build(self):
+
+ def is_ready_to_build(artifact):
+ return (artifact.state == UNBUILT and
+ all(a.state == BUILT for a in artifact.dependencies))
+
+ return [a
+ for a in map_build_graph(self._artifact, lambda a: a)
+ if is_ready_to_build(a)]
+
+ def _queue_worker_builds(self, event_source, event):
+ distbuild.crash_point()
+
+ if self._artifact.state == BUILT:
+ logging.info('Requested artifact is built')
+ self.mainloop.queue_event(self, _Built())
+ return
+
+ logging.debug('Queuing more worker-builds to run')
+ logging.debug('Current state of build graph nodes:')
+ for a in map_build_graph(self._artifact, lambda a: a):
+ logging.debug(' %s state is %s' % (a.name, a.state))
+ if a.state != BUILT:
+ for dep in a.dependencies:
+ logging.debug(
+ ' depends on %s which is %s' %
+ (dep.name, dep.state))
+
+ ready = self._find_artifacts_that_are_ready_to_build()
+ if len(ready) == 0:
+ logging.debug('No new artifacts queued for building')
+
+ for artifact in ready:
+ logging.debug(
+ 'Requesting worker-build of %s (%s)' %
+ (artifact.name, artifact.cache_key))
+ request = distbuild.WorkerBuildRequest(artifact,
+ self._request['id'])
+ self.mainloop.queue_event(distbuild.WorkerBuildQueuer, request)
+ artifact.state = BUILDING
+
+ def _notify_initiator_disconnected(self, event_source, disconnect):
+ if disconnect.id == self._request['id']:
+ cancel = BuildCancel(disconnect.id)
+ self.mainloop.queue_event(BuildController, cancel)
+
+ def _relay_build_step_started(self, event_source, event):
+ distbuild.crash_point()
+ if event.initiator_id != self._request['id']:
+ return # not for us
+
+ logging.debug(
+ 'BC: _relay_build_step_started: %s' % event.artifact_cache_key)
+ artifact = self._find_artifact(event.artifact_cache_key)
+ if artifact is None:
+ # This is not the event you are looking for.
+ return
+
+ logging.debug('BC: got build step started: %s' % artifact.name)
+ started = BuildStepStarted(
+ self._request['id'], build_step_name(artifact), event.worker_name)
+ self.mainloop.queue_event(BuildController, started)
+ logging.debug('BC: emitted %s' % repr(started))
+
+ def _relay_build_output(self, event_source, event):
+ distbuild.crash_point()
+ if event.msg['id'] != self._request['id']:
+ return # not for us
+
+ logging.debug('BC: got output: %s' % repr(event.msg))
+ artifact = self._find_artifact(event.artifact_cache_key)
+ logging.debug('BC: got artifact: %s' % repr(artifact))
+ if artifact is None:
+ # This is not the event you are looking for.
+ return
+
+ output = BuildOutput(
+ self._request['id'], build_step_name(artifact),
+ event.msg['stdout'], event.msg['stderr'])
+ self.mainloop.queue_event(BuildController, output)
+ logging.debug('BC: queued %s' % repr(output))
+
+ def _relay_build_caching(self, event_source, event):
+ distbuild.crash_point()
+ if event.initiator_id != self._request['id']:
+ return # not for us
+
+ artifact = self._find_artifact(event.artifact_cache_key)
+ if artifact is None:
+ # This is not the event you are looking for.
+ return
+
+ progress = BuildProgress(
+ self._request['id'],
+ 'Transferring %s to shared artifact cache' % artifact.name)
+ self.mainloop.queue_event(BuildController, progress)
+
+ def _find_artifact(self, cache_key):
+ artifacts = map_build_graph(self._artifact, lambda a: a)
+ wanted = [a for a in artifacts if a.cache_key == cache_key]
+ if wanted:
+ return wanted[0]
+ else:
+ return None
+
+ def _check_result_and_queue_more_builds(self, event_source, event):
+ distbuild.crash_point()
+ if event.msg['id'] != self._request['id']:
+ return # not for us
+
+ artifact = self._find_artifact(event.artifact_cache_key)
+ if artifact is None:
+ # This is not the event you are looking for.
+ return
+
+ logging.debug(
+ 'Got build result for %s: %s', artifact.name, repr(event.msg))
+
+ finished = BuildStepFinished(
+ self._request['id'], build_step_name(artifact))
+ self.mainloop.queue_event(BuildController, finished)
+
+ artifact.state = BUILT
+ self._queue_worker_builds(None, event)
+
+ def _notify_build_failed(self, event_source, event):
+ distbuild.crash_point()
+ if event.msg['id'] != self._request['id']:
+ return # not for us
+
+ artifact = self._find_artifact(event.artifact_cache_key)
+ if artifact is None:
+ # This is not the event you are looking for.
+ return
+
+ logging.error(
+ 'Build step failed for %s: %s', artifact.name, repr(event.msg))
+
+ step_failed = BuildStepFailed(
+ self._request['id'], build_step_name(artifact))
+ self.mainloop.queue_event(BuildController, step_failed)
+
+ build_failed = BuildFailed(
+ self._request['id'],
+ 'Building failed for %s' % artifact.name)
+ self.mainloop.queue_event(BuildController, build_failed)
+
+ # Cancel any jobs waiting to be executed, since there is no point
+ # running them if this build has failed, it would just waste
+ # resources
+ cancel_pending = distbuild.WorkerCancelPending(
+ self._request['id'])
+ self.mainloop.queue_event(distbuild.WorkerBuildQueuer, cancel_pending)
+
+ # Cancel any currently executing jobs for the above reasons, since
+ # this build will fail and we can't decide whether these jobs will
+ # be of use to any other build
+ cancel = BuildCancel(self._request['id'])
+ self.mainloop.queue_event(BuildController, cancel)
+
+ def _notify_build_done(self, event_source, event):
+ distbuild.crash_point()
+
+ logging.debug('Notifying initiator of successful build')
+ baseurl = urlparse.urljoin(
+ self._artifact_cache_server, '/1.0/artifacts')
+ filename = ('%s.%s.%s' %
+ (self._artifact.cache_key,
+ self._artifact.source.morphology['kind'],
+ self._artifact.name))
+ url = '%s?filename=%s' % (baseurl, urllib.quote(filename))
+ finished = BuildFinished(self._request['id'], [url])
+ self.mainloop.queue_event(BuildController, finished)
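
As a quick illustration of the map_build_graph() helper defined above: it walks the requested artifact and its transitive dependencies, applying the callback to each node exactly once. The FakeArtifact class below is hypothetical, standing in for the objects returned by deserialise_artifact; only the attributes map_build_graph touches (name, dependencies) are given.

    from distbuild import map_build_graph

    class FakeArtifact(object):
        '''Hypothetical stand-in with just the attributes used here.'''
        def __init__(self, name, dependencies=None):
            self.name = name
            self.dependencies = dependencies or []

    libc = FakeArtifact('libc')
    gcc = FakeArtifact('gcc', [libc])
    system = FakeArtifact('system', [gcc, libc])

    # Each node is visited once, even though libc is depended on twice.
    names = map_build_graph(system, lambda a: a.name)
    assert sorted(names) == ['gcc', 'libc', 'system']
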
diff --git a/distbuild/connection_machine.py b/distbuild/connection_machine.py
new file mode 100644
index 00000000..3d4e8d04
--- /dev/null
+++ b/distbuild/connection_machine.py
@@ -0,0 +1,145 @@
+# distbuild/connection_machine.py -- state machine for connecting to server
+#
+# Copyright (C) 2014 Codethink Limited
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; version 2 of the License.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA..
+
+
+import errno
+import logging
+import socket
+
+import distbuild
+
+
+class Reconnect(object):
+
+ pass
+
+
+class StopConnecting(object):
+
+ pass
+
+
+class ConnectError(object):
+
+ pass
+
+
+class ProxyEventSource(object):
+
+ '''Proxy event sources that may come and go.'''
+
+ def __init__(self):
+ self.event_source = None
+
+ def get_select_params(self):
+ if self.event_source:
+ return self.event_source.get_select_params()
+ else:
+ return [], [], [], None
+
+ def get_events(self, r, w, x):
+ if self.event_source:
+ return self.event_source.get_events(r, w, x)
+ else:
+ return []
+
+ def is_finished(self):
+ return False
+
+
+class ConnectionMachine(distbuild.StateMachine):
+
+ def __init__(self, addr, port, machine, extra_args):
+ distbuild.StateMachine.__init__(self, 'connecting')
+ self._addr = addr
+ self._port = port
+ self._machine = machine
+ self._extra_args = extra_args
+ self._socket = None
+ self.reconnect_interval = 1
+
+ def setup(self):
+ self._sock_proxy = ProxyEventSource()
+ self.mainloop.add_event_source(self._sock_proxy)
+ self._start_connect()
+
+ self._timer = distbuild.TimerEventSource(self.reconnect_interval)
+ self.mainloop.add_event_source(self._timer)
+
+ spec = [
+ ('connecting', self._sock_proxy, distbuild.SocketWriteable,
+ 'connected', self._connect),
+ ('connecting', self, StopConnecting, None, self._stop),
+ ('connected', self, Reconnect, 'connecting', self._reconnect),
+ ('connected', self, ConnectError, 'timeout', self._start_timer),
+ ('connected', self, StopConnecting, None, self._stop),
+ ('timeout', self._timer, distbuild.Timer, 'connecting',
+ self._reconnect),
+ ('timeout', self, StopConnecting, None, self._stop),
+ ]
+ self.add_transitions(spec)
+
+ def _start_connect(self):
+ logging.debug(
+ 'ConnectionMachine: connecting to %s:%s' %
+ (self._addr, self._port))
+ self._socket = socket.socket()
+ distbuild.set_nonblocking(self._socket)
+ try:
+ self._socket.connect((self._addr, self._port))
+ except socket.error, e:
+ if e.errno != errno.EINPROGRESS:
+ raise socket.error(
+ "%s (attempting connection to distbuild controller "
+ "at %s:%s)" % (e, self._addr, self._port))
+
+ src = distbuild.SocketEventSource(self._socket)
+ self._sock_proxy.event_source = src
+
+ def _connect(self, event_source, event):
+ try:
+ self._socket.connect((self._addr, self._port))
+ except socket.error, e:
+ logging.error(
+ 'Failed to connect to %s:%s: %s' %
+ (self._addr, self._port, str(e)))
+ self.mainloop.queue_event(self, ConnectError())
+ return
+ self._sock_proxy.event_source = None
+ logging.info('Connected to %s:%s' % (self._addr, self._port))
+ m = self._machine(self, self._socket, *self._extra_args)
+ self.mainloop.add_state_machine(m)
+ self._socket = None
+
+ def _reconnect(self, event_source, event):
+ logging.info('Reconnecting to %s:%s' % (self._addr, self._port))
+ if self._socket is not None:
+ self._socket.close()
+ self._timer.stop()
+ self._start_connect()
+
+ def _stop(self, event_source, event):
+ logging.info(
+ 'Stopping connection attempts to %s:%s' % (self._addr, self._port))
+ self.mainloop.remove_event_source(self._timer)
+ if self._socket is not None:
+ self._socket.close()
+ self._socket = None
+
+ def _start_timer(self, event_source, event):
+ self._timer.start()
+
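
A sketch of the machine/extra_args contract ConnectionMachine expects, using the Initiator defined later in this patch: once the connection succeeds, _connect() instantiates machine(self, socket, *extra_args), so extra_args must supply the remaining constructor arguments of that machine. The host, port and build parameters below are illustrative, not taken from this commit.

    import distbuild

    app = None  # stands in for morph's cliapp application object

    cm = distbuild.ConnectionMachine(
        'controller.example.com', 7878,
        distbuild.Initiator,
        (app, 'baserock:morphs', 'master', 'base-system'))
    # On a successful connect this results in roughly:
    #   Initiator(cm, sock, app, 'baserock:morphs', 'master', 'base-system')
    # and the new Initiator is added to the same mainloop.
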
diff --git a/distbuild/crashpoint.py b/distbuild/crashpoint.py
new file mode 100644
index 00000000..01bfbd6b
--- /dev/null
+++ b/distbuild/crashpoint.py
@@ -0,0 +1,126 @@
+# distbuild/crashpoint.py -- user-controlled crashing
+#
+# Copyright (C) 2014 Codethink Limited
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; version 2 of the License.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA..
+
+
+'''Crash the application.
+
+For crash testing, it's useful to easily induce crashes, to see how the
+rest of the system manages. This module implements user-controllable
+crashes. The code will be sprinkled with calls to the ``crash_point``
+function, which crashes the process if the call matches a set of user-defined
+criteria.
+
+The criteria consist of:
+
+* a filename
+* a function name
+* a maximum call count
+
+The criterion is fulfilled if ``crash_point`` is called from the named
+function defined in the named file at least the given number of times.
+Filename matching uses substrings (a filename pattern ``foo.py``
+matches an actual source file path of
+``/usr/lib/python2.7/site-packages/distbuild/foo.py``), but function
+names must match exactly. It is not possible to match on class names
+(since that information is not available from a traceback).
+
+'''
+
+
+import logging
+import os
+import sys
+import traceback
+
+
+detailed_logging = False
+
+
+def debug(msg): # pragma: no cover
+ if detailed_logging:
+ logging.debug(msg)
+
+
+class CrashCondition(object):
+
+ def __init__(self, filename, funcname, max_calls):
+ self.filename = filename
+ self.funcname = funcname
+ self.max_calls = max_calls
+ self.called = 0
+
+ def matches(self, filename, funcname):
+ if self.filename not in filename:
+ debug(
+ 'crashpoint: filename mismatch: %s not in %s' %
+ (repr(self.filename), repr(filename)))
+ return False
+
+ if self.funcname != funcname:
+ debug(
+ 'crashpoint: funcname mismatch: %s != %s' %
+ (self.funcname, funcname))
+ return False
+
+ debug('crashpoint: matches: %s %s' % (filename, funcname))
+ return True
+
+ def triggered(self, filename, funcname):
+ if self.matches(filename, funcname):
+ self.called += 1
+ return self.called >= self.max_calls
+ else:
+ return False
+
+
+crash_conditions = []
+
+
+def add_crash_condition(filename, funcname, max_calls):
+ crash_conditions.append(CrashCondition(filename, funcname, max_calls))
+
+
+def add_crash_conditions(strings):
+ for s in strings:
+ words = s.split(':')
+ if len(words) != 3: # pragma: no cover
+ logging.error('Ignoring malformed crash condition: %s' % repr(s))
+ else:
+ add_crash_condition(words[0], words[1], int(words[2]))
+
+
+def clear_crash_conditions():
+ del crash_conditions[:]
+
+
+def crash_point(frame=None):
+ if frame is None:
+ frames = traceback.extract_stack(limit=2)
+ frame = frames[0]
+
+ filename, lineno, funcname, text = frame
+
+ for condition in crash_conditions:
+ if condition.triggered(filename, funcname):
+ logging.critical(
+ 'Crash triggered from %s:%s:%s' % (filename, lineno, funcname))
+ sys.exit(255)
+ else:
+ debug(
+ 'Crash not triggered by %s:%s:%s' %
+ (filename, lineno, funcname))
+
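
A short usage sketch for the crash points described above. The condition string, filename and function name are made up for illustration; passing an explicit frame tuple (as the unit tests do) makes the behaviour visible without a real call stack.

    import distbuild

    # Crash the second time crash_point() is reached in a function named
    # _start_graphing in build_controller.py (filenames match by
    # substring, function names must match exactly).
    distbuild.add_crash_conditions(['build_controller.py:_start_graphing:2'])

    fake_frame = ('distbuild/build_controller.py', 42, '_start_graphing', '')
    distbuild.crash_point(frame=fake_frame)   # first hit: keeps running
    distbuild.crash_point(frame=fake_frame)   # second hit: sys.exit(255)
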
diff --git a/distbuild/crashpoint_tests.py b/distbuild/crashpoint_tests.py
new file mode 100644
index 00000000..2761ccac
--- /dev/null
+++ b/distbuild/crashpoint_tests.py
@@ -0,0 +1,109 @@
+# distbuild/crashpoint_tests.py -- unit tests for crashpoint.py
+#
+# Copyright (C) 2014 Codethink Limited
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; version 2 of the License.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA..
+
+
+import unittest
+
+import crashpoint
+
+
+class CrashConditionTests(unittest.TestCase):
+
+ def setUp(self):
+ self.c = crashpoint.CrashCondition('bar', 'foofunc', 0)
+
+ def test_matches_exact_filename(self):
+ self.assertTrue(self.c.matches('bar', 'foofunc'))
+
+ def test_matches_basename(self):
+ self.assertTrue(self.c.matches('dir/bar', 'foofunc'))
+
+ def test_matches_partial_basename(self):
+ self.assertTrue(self.c.matches('dir/bar.py', 'foofunc'))
+
+ def test_matches_dirname(self):
+ self.assertTrue(self.c.matches('bar/something.py', 'foofunc'))
+
+ def test_doesnt_match_wrong_function_name(self):
+ self.assertFalse(self.c.matches('bar', 'foo'))
+
+ def test_triggered_first_time_with_zero_count(self):
+ c = crashpoint.CrashCondition('bar', 'foofunc', 0)
+ self.assertTrue(c.triggered('bar', 'foofunc'))
+
+ def test_triggered_first_time_with_zero_count(self):
+ c = crashpoint.CrashCondition('bar', 'foofunc', 0)
+ self.assertTrue(c.triggered('bar', 'foofunc'))
+
+ def test_triggered_second_time_with_zero_count(self):
+ c = crashpoint.CrashCondition('bar', 'foofunc', 0)
+ self.assertTrue(c.triggered('bar', 'foofunc'))
+ self.assertTrue(c.triggered('bar', 'foofunc'))
+
+ def test_triggered_first_time_with_count_of_one(self):
+ c = crashpoint.CrashCondition('bar', 'foofunc', 1)
+ self.assertTrue(c.triggered('bar', 'foofunc'))
+
+ def test_triggered_second_time_with_count_of_two(self):
+ c = crashpoint.CrashCondition('bar', 'foofunc', 2)
+ self.assertFalse(c.triggered('bar', 'foofunc'))
+ self.assertTrue(c.triggered('bar', 'foofunc'))
+
+ def test_not_triggered_if_not_matched(self):
+ c = crashpoint.CrashCondition('bar', 'foofunc', 0)
+ self.assertFalse(c.triggered('bar', 'otherfunc'))
+
+
+class CrashConditionsListTests(unittest.TestCase):
+
+ def setUp(self):
+ crashpoint.clear_crash_conditions()
+
+ def test_no_conditions_initially(self):
+ self.assertEqual(crashpoint.crash_conditions, [])
+
+ def test_adds_condition(self):
+ crashpoint.add_crash_condition('foo.py', 'bar', 0)
+ self.assertEqual(len(crashpoint.crash_conditions), 1)
+ c = crashpoint.crash_conditions[0]
+ self.assertEqual(c.filename, 'foo.py')
+ self.assertEqual(c.funcname, 'bar')
+ self.assertEqual(c.max_calls, 0)
+
+ def test_adds_conditions_from_list_of_strings(self):
+ crashpoint.add_crash_conditions(['foo.py:bar:0'])
+ self.assertEqual(len(crashpoint.crash_conditions), 1)
+ c = crashpoint.crash_conditions[0]
+ self.assertEqual(c.filename, 'foo.py')
+ self.assertEqual(c.funcname, 'bar')
+ self.assertEqual(c.max_calls, 0)
+
+
+class CrashPointTests(unittest.TestCase):
+
+ def setUp(self):
+ crashpoint.clear_crash_conditions()
+ crashpoint.add_crash_condition('foo.py', 'bar', 0)
+
+ def test_triggers_crash(self):
+ self.assertRaises(
+ SystemExit,
+ crashpoint.crash_point, frame=('foo.py', 123, 'bar', 'text'))
+
+ def test_does_not_trigger_crash(self):
+ self.assertEqual(crashpoint.crash_point(), None)
+
diff --git a/distbuild/eventsrc.py b/distbuild/eventsrc.py
new file mode 100644
index 00000000..11bb16e8
--- /dev/null
+++ b/distbuild/eventsrc.py
@@ -0,0 +1,48 @@
+# mainloop/eventsrc.py -- interface for event sources
+#
+# Copyright 2012 Codethink Limited.
+# All rights reserved.
+
+
+class EventSource(object):
+
+ '''A source of events for state machines.
+
+ This is a base class.
+
+ An event source watches one file descriptor, and returns events
+ related to it. The events may vary depending on the file descriptor.
+ The actual watching is done using select.select.
+
+ '''
+
+ def get_select_params(self):
+ '''Return parameters to use for select for this event source.
+
+ Three lists of file descriptors, and a timeout are returned.
+ The three lists and the timeout are used as arguments to the
+ select.select function, though they may be manipulated and
+ combined with return values from other event sources.
+
+ '''
+
+ return [], [], [], None
+
+ def get_events(self, r, w, x):
+ '''Return events related to this file descriptor.
+
+ The arguments are the return values of select.select.
+
+ '''
+
+ return []
+
+ def is_finished(self):
+ '''Is this event source finished?
+
+ It's finished if it won't ever return any new events.
+
+ '''
+
+ return False
+
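
To make the EventSource interface above concrete, here is a minimal, hypothetical subclass that watches a single file descriptor for readability; the PipeReadable event class is invented for this sketch.

    from distbuild import EventSource

    class PipeReadable(object):
        '''Hypothetical event: the watched descriptor has data to read.'''
        def __init__(self, fd):
            self.fd = fd

    class PipeEventSource(EventSource):

        def __init__(self, fd):
            self.fd = fd

        def get_select_params(self):
            # Ask the mainloop's select() to watch our fd for reading.
            return [self.fd], [], [], None

        def get_events(self, r, w, x):
            # Turn "fd is readable" into an event for state machines.
            return [PipeReadable(self.fd)] if self.fd in r else []
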
diff --git a/distbuild/helper_router.py b/distbuild/helper_router.py
new file mode 100644
index 00000000..752a5fdb
--- /dev/null
+++ b/distbuild/helper_router.py
@@ -0,0 +1,197 @@
+# distbuild/helper_router.py -- state machine for controller's helper comms
+#
+# Copyright (C) 2014 Codethink Limited
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; version 2 of the License.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA..
+
+
+import logging
+
+import distbuild
+
+
+class HelperRequest(object):
+
+ def __init__(self, msg):
+ self.msg = msg
+
+
+class HelperOutput(object):
+
+ def __init__(self, msg):
+ self.msg = msg
+
+
+class HelperResult(object):
+
+ def __init__(self, msg):
+ self.msg = msg
+
+
+class HelperRouter(distbuild.StateMachine):
+
+ '''Route JSON messages between helpers and other state machines.
+
+ This state machine relays and schedules access to one distbuild-helper
+ process. The helper process connects to a socket, which causes an
+ instance of HelperRouter to be created (one per connection). The
+ various instances co-ordinate requests automatically amongst
+ themselves.
+
+ Other state machines in the same mainloop as HelperRouter can
+ request work from the helper process by emitting an event:
+
+    * event source: the distbuild.HelperRouter class
+ * event: distbuild.HelperRequest instance
+
+    The HelperRequest event carries a message to be serialised as JSON.
+    The message must be a Python dict that the distbuild-helper understands.
+
+ HelperRouter will send the msg to the next available helper process.
+ When the helper sends back the result, HelperRouter will emit a
+    HelperResult event, using the same ``id`` as the request had.
+
+ For its internal use, HelperRouter sets the ``id`` item in the
+ request object.
+
+ '''
+
+ _pending_requests = []
+ _running_requests = {}
+ _pending_helpers = []
+ _request_counter = distbuild.IdentifierGenerator('HelperRouter')
+ _route_map = distbuild.RouteMap()
+
+ def __init__(self, conn):
+ distbuild.StateMachine.__init__(self, 'idle')
+ self.conn = conn
+
+ def setup(self):
+ jm = distbuild.JsonMachine(self.conn)
+ self.mainloop.add_state_machine(jm)
+
+ spec = [
+ ('idle', HelperRouter, HelperRequest, 'idle',
+ self._handle_request),
+ ('idle', jm, distbuild.JsonNewMessage, 'idle', self._helper_msg),
+ ('idle', jm, distbuild.JsonEof, None, self._close),
+ ]
+ self.add_transitions(spec)
+
+ def _handle_request(self, event_source, event):
+ '''Send request received via mainloop, or put in queue.'''
+ logging.debug('HelperRouter: received request: %s', repr(event.msg))
+ self._enqueue_request(event.msg)
+ if self._pending_helpers:
+ self._send_request()
+
+ def _helper_msg(self, event_source, event):
+ '''Handle message from helper.'''
+
+# logging.debug('HelperRouter: from helper: %s', repr(event.msg))
+
+ handlers = {
+ 'helper-ready': self._handle_helper_ready,
+ 'exec-output': self._handle_exec_output,
+ 'exec-response': self._handle_exec_response,
+ 'http-response': self._handle_http_response,
+ }
+
+ handler = handlers[event.msg['type']]
+ handler(event_source, event.msg)
+
+ def _handle_helper_ready(self, event_source, msg):
+ self._pending_helpers.append(event_source)
+ if self._pending_requests:
+ self._send_request()
+
+ def _get_request(self, msg):
+ request_id = msg['id']
+ if request_id in self._running_requests:
+ request, helper = self._running_requests[request_id]
+ return request
+ elif request_id is None:
+ logging.error(
+ 'Helper process sent message without "id" field: %s',
+                repr(msg))
+ else:
+ logging.error(
+ 'Helper process sent message with unknown id: %s',
+                repr(msg))
+
+ def _new_message(self, msg):
+ old_id = msg['id']
+ new_msg = dict(msg)
+ new_msg['id'] = self._route_map.get_incoming_id(old_id)
+ return new_msg
+
+ def _handle_exec_output(self, event_source, msg):
+ request = self._get_request(msg)
+ if request is not None:
+ new_msg = self._new_message(msg)
+ self.mainloop.queue_event(HelperRouter, HelperOutput(new_msg))
+
+ def _handle_exec_response(self, event_source, msg):
+ request = self._get_request(msg)
+ if request is not None:
+ new_msg = self._new_message(msg)
+ self._route_map.remove(msg['id'])
+ del self._running_requests[msg['id']]
+ self.mainloop.queue_event(HelperRouter, HelperResult(new_msg))
+
+ def _handle_http_response(self, event_source, msg):
+ request = self._get_request(msg)
+ if request is not None:
+ new_msg = self._new_message(msg)
+ self._route_map.remove(msg['id'])
+ del self._running_requests[msg['id']]
+ self.mainloop.queue_event(HelperRouter, HelperResult(new_msg))
+
+ def _close(self, event_source, event):
+ logging.debug('HelperRouter: closing: %s', repr(event_source))
+ event_source.close()
+
+ # Remove from pending helpers.
+ if event_source in self._pending_helpers:
+ self._pending_helpers.remove(event_source)
+
+        # Re-queue any requests running on the helper that just quit.
+ for request_id in self._running_requests.keys():
+ request, helper = self._running_requests[request_id]
+ if event_source == helper:
+ del self._running_requests[request_id]
+ self._enqueue_request(request)
+
+ # Finally, if there are any pending requests and helpers,
+ # send requests.
+ while self._pending_requests and self._pending_helpers:
+ self._send_request()
+
+ def _enqueue_request(self, request):
+ '''Put request into queue.'''
+# logging.debug('HelperRouter: enqueuing request: %s' % repr(request))
+ old_id = request['id']
+ new_id = self._request_counter.next()
+ request['id'] = new_id
+ self._route_map.add(old_id, new_id)
+ self._pending_requests.append(request)
+
+ def _send_request(self):
+ '''Pick the first queued request and send it to an available helper.'''
+ request = self._pending_requests.pop(0)
+ helper = self._pending_helpers.pop()
+ self._running_requests[request['id']] = (request, helper)
+ helper.send(request)
+# logging.debug('HelperRouter: sent to helper: %s', repr(request))
+
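
For reference, this is roughly how another state machine in the same mainloop asks the helper process to run a command through HelperRouter, mirroring the exec-request sent by BuildController earlier in this patch; the class name and argv below are made up.

    import distbuild

    class ExampleMachine(distbuild.StateMachine):

        def _request_echo(self, event_source, event):
            msg = distbuild.message('exec-request',
                id='example-1',
                argv=['echo', 'hello'],
                stdin_contents='')
            self.mainloop.queue_event(
                distbuild.HelperRouter, distbuild.HelperRequest(msg))
            # HelperRouter emits HelperOutput events for stdout/stderr
            # and a HelperResult event (with the same 'id') when the
            # command finishes.
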
diff --git a/distbuild/idgen.py b/distbuild/idgen.py
new file mode 100644
index 00000000..b642bd98
--- /dev/null
+++ b/distbuild/idgen.py
@@ -0,0 +1,33 @@
+# distbuild/idgen.py -- generate unique identifiers
+#
+# Copyright (C) 2014 Codethink Limited
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; version 2 of the License.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA..
+
+
+import logging
+
+
+class IdentifierGenerator(object):
+
+ '''Generate unique identifiers.'''
+
+ def __init__(self, series):
+ self._series = series
+ self._counter = 0
+
+ def next(self):
+ self._counter += 1
+ return '%s-%d' % (self._series, self._counter)
+
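
In use, the generator simply yields series-prefixed counters; the series name below is arbitrary.

    from distbuild import IdentifierGenerator

    idgen = IdentifierGenerator('Example')
    assert idgen.next() == 'Example-1'
    assert idgen.next() == 'Example-2'
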
diff --git a/distbuild/initiator.py b/distbuild/initiator.py
new file mode 100644
index 00000000..9c7e2ddf
--- /dev/null
+++ b/distbuild/initiator.py
@@ -0,0 +1,195 @@
+# distbuild/initiator.py -- state machine for the initiator
+#
+# Copyright (C) 2014 Codethink Limited
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; version 2 of the License.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA..
+
+
+import cliapp
+import logging
+import random
+import sys
+
+import distbuild
+
+
+class _Finished(object):
+
+ def __init__(self, msg):
+ self.msg = msg
+
+
+class _Failed(object):
+
+ def __init__(self, msg):
+ self.msg = msg
+
+
+class Initiator(distbuild.StateMachine):
+
+ def __init__(self, cm, conn, app, repo_name, ref, morphology):
+ distbuild.StateMachine.__init__(self, 'waiting')
+ self._cm = cm
+ self._conn = conn
+ self._app = app
+ self._repo_name = repo_name
+ self._ref = ref
+ self._morphology = morphology
+ self._steps = None
+ self._step_outputs = {}
+ self.debug_transitions = True
+
+ def setup(self):
+ distbuild.crash_point()
+
+ self._jm = distbuild.JsonMachine(self._conn)
+ self.mainloop.add_state_machine(self._jm)
+ logging.debug('initiator: _jm=%s' % repr(self._jm))
+
+ spec = [
+ ('waiting', self._jm, distbuild.JsonEof, None, self._terminate),
+ ('waiting', self._jm, distbuild.JsonNewMessage, 'waiting',
+ self._handle_json_message),
+ ('waiting', self, _Finished, None, self._succeed),
+ ('waiting', self, _Failed, None, self._fail),
+ ]
+ self.add_transitions(spec)
+
+ random_id = random.randint(0, 2**32-1)
+
+ self._app.status(
+ msg='Requesting build of %(repo)s %(ref)s %(morph)s',
+ repo=self._repo_name,
+ ref=self._ref,
+ morph=self._morphology)
+ msg = distbuild.message('build-request',
+ id=random_id,
+ repo=self._repo_name,
+ ref=self._ref,
+ morphology=self._morphology
+ )
+ self._jm.send(msg)
+ logging.debug('Initiator: sent to controller: %s', repr(msg))
+
+ def _handle_json_message(self, event_source, event):
+ distbuild.crash_point()
+
+ logging.debug('Initiator: from controller: %s' % repr(event.msg))
+
+ handlers = {
+ 'build-finished': self._handle_build_finished_message,
+ 'build-failed': self._handle_build_failed_message,
+ 'build-progress': self._handle_build_progress_message,
+ 'build-steps': self._handle_build_steps_message,
+ 'step-started': self._handle_step_started_message,
+ 'step-output': self._handle_step_output_message,
+ 'step-finished': self._handle_step_finished_message,
+ 'step-failed': self._handle_step_failed_message,
+ }
+
+ handler = handlers[event.msg['type']]
+ handler(event.msg)
+
+ def _handle_build_finished_message(self, msg):
+ self.mainloop.queue_event(self, _Finished(msg))
+
+ def _handle_build_failed_message(self, msg):
+ self.mainloop.queue_event(self, _Failed(msg))
+
+ def _handle_build_progress_message(self, msg):
+ self._app.status(msg='Progress: %(msgtext)s', msgtext=msg['message'])
+
+ def _handle_build_steps_message(self, msg):
+ self._steps = msg['steps']
+ self._app.status(
+ msg='Build steps in total: %(steps)d',
+ steps=len(self._steps))
+
+ def _open_output(self, msg):
+ assert msg['step_name'] not in self._step_outputs
+ filename = 'build-step-%s.log' % msg['step_name']
+ f = open(filename, 'a')
+ self._step_outputs[msg['step_name']] = f
+ worker = msg['worker']
+ f.write('Worker: %s\n%s\n\n' % (worker, '-' * len(worker)))
+
+ def _close_output(self, msg):
+ self._step_outputs[msg['step_name']].close()
+ del self._step_outputs[msg['step_name']]
+
+ def _handle_step_started_message(self, msg):
+ self._app.status(
+ msg='Started building %(step_name)s on %(worker_name)s',
+ step_name=msg['step_name'],
+ worker_name=msg['worker_name'])
+ self._open_output(msg)
+
+ def _handle_step_output_message(self, msg):
+ step_name = msg['step_name']
+ if step_name in self._step_outputs:
+ f = self._step_outputs[step_name]
+ f.write(msg['stdout'])
+ f.write(msg['stderr'])
+ f.flush()
+ else:
+ logging.warning(
+ 'Got step-output message for unknown step: %s' % repr(msg))
+
+ def _handle_step_finished_message(self, msg):
+ step_name = msg['step_name']
+ if step_name in self._step_outputs:
+ self._app.status(
+ msg='Finished building %(step_name)s',
+ step_name=step_name)
+ self._close_output(msg)
+ else:
+ logging.warning(
+ 'Got step-finished message for unknown step: %s' % repr(msg))
+
+ def _handle_step_failed_message(self, msg):
+ step_name = msg['step_name']
+ if step_name in self._step_outputs:
+ self._app.status(
+ msg='Build failed: %(step_name)s',
+ step_name=step_name)
+ self._close_output(msg)
+ else:
+ logging.warning(
+ 'Got step-failed message for unknown step: %s' % repr(msg))
+
+ def _succeed(self, event_source, event):
+ self.mainloop.queue_event(self._cm, distbuild.StopConnecting())
+ self._jm.close()
+ logging.info('Build finished OK')
+
+ urls = event.msg['urls']
+ if urls:
+ for url in urls:
+ self._app.status(msg='Artifact: %(url)s', url=url)
+ else:
+ self._app.status(
+ msg='Controller did not give us any artifact URLs.')
+
+ def _fail(self, event_source, event):
+ self.mainloop.queue_event(self._cm, distbuild.StopConnecting())
+ self._jm.close()
+ raise cliapp.AppException(
+ 'Failed to build %s %s %s: %s' %
+ (self._repo_name, self._ref, self._morphology,
+ event.msg['reason']))
+
+ def _terminate(self, event_source, event):
+ self.mainloop.queue_event(self._cm, distbuild.StopConnecting())
+ self._jm.close()
+
diff --git a/distbuild/initiator_connection.py b/distbuild/initiator_connection.py
new file mode 100644
index 00000000..48d083e4
--- /dev/null
+++ b/distbuild/initiator_connection.py
@@ -0,0 +1,216 @@
+# distbuild/initiator_connection.py -- communicate with initiator
+#
+# Copyright (C) 2014 Codethink Limited
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; version 2 of the License.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA..
+
+
+import logging
+
+import distbuild
+
+
+class InitiatorDisconnect(object):
+
+ def __init__(self, id):
+ self.id = id
+
+
+class _Close(object):
+
+ def __init__(self, event_source):
+ self.event_source = event_source
+
+
+class InitiatorConnection(distbuild.StateMachine):
+
+ '''Communicate with the initiator.
+
+ This state machine communicates with the initiator, relaying and
+ translating messages from the initiator to the rest of the controller's
+ state machines, and vice versa.
+
+ '''
+
+ _idgen = distbuild.IdentifierGenerator('InitiatorConnection')
+ _route_map = distbuild.RouteMap()
+
+ def __init__(self, conn, artifact_cache_server, morph_instance):
+ distbuild.StateMachine.__init__(self, 'idle')
+ self.conn = conn
+ self.artifact_cache_server = artifact_cache_server
+ self.morph_instance = morph_instance
+
+ def setup(self):
+ self.jm = distbuild.JsonMachine(self.conn)
+ self.mainloop.add_state_machine(self.jm)
+
+ self.our_ids = set()
+
+ spec = [
+ ('idle', self.jm, distbuild.JsonNewMessage, 'idle',
+ self._handle_msg),
+ ('idle', self.jm, distbuild.JsonEof, 'closing', self._disconnect),
+ ('idle', distbuild.BuildController, distbuild.BuildFinished,
+ 'idle', self._send_build_finished_message),
+ ('idle', distbuild.BuildController, distbuild.BuildFailed,
+ 'idle', self._send_build_failed_message),
+ ('idle', distbuild.BuildController, distbuild.BuildProgress,
+ 'idle', self._send_build_progress_message),
+ ('idle', distbuild.BuildController, distbuild.BuildSteps,
+ 'idle', self._send_build_steps_message),
+ ('idle', distbuild.BuildController, distbuild.BuildStepStarted,
+ 'idle', self._send_build_step_started_message),
+ ('idle', distbuild.BuildController, distbuild.BuildOutput,
+ 'idle', self._send_build_output_message),
+ ('idle', distbuild.BuildController, distbuild.BuildStepFinished,
+ 'idle', self._send_build_step_finished_message),
+ ('idle', distbuild.BuildController, distbuild.BuildStepFailed,
+ 'idle', self._send_build_step_failed_message),
+ ('closing', self, _Close, None, self._close),
+ ]
+ self.add_transitions(spec)
+
+ def _handle_msg(self, event_source, event):
+ '''Handle message from initiator.'''
+
+ logging.debug(
+ 'InitiatorConnection: from initiator: %s', repr(event.msg))
+
+ if event.msg['type'] == 'build-request':
+ new_id = self._idgen.next()
+ self.our_ids.add(new_id)
+ self._route_map.add(event.msg['id'], new_id)
+ event.msg['id'] = new_id
+ build_controller = distbuild.BuildController(
+ event.msg, self.artifact_cache_server, self.morph_instance)
+ self.mainloop.add_state_machine(build_controller)
+
+ def _disconnect(self, event_source, event):
+ for id in self.our_ids:
+ logging.debug('InitiatorConnection: InitiatorDisconnect(%s)'
+ % str(id))
+ self.mainloop.queue_event(InitiatorConnection,
+ InitiatorDisconnect(id))
+ # TODO should this clear our_ids?
+ self.mainloop.queue_event(self, _Close(event_source))
+
+ def _close(self, event_source, event):
+ logging.debug('InitiatorConnection: closing: %s',
+ repr(event.event_source))
+
+ event.event_source.close()
+
+ def _handle_result(self, event_source, event):
+ '''Handle result from helper.'''
+
+ if event.msg['id'] in self.our_ids:
+ logging.debug(
+ 'InitiatorConnection: received result: %s', repr(event.msg))
+ self.jm.send(event.msg)
+
+ def _send_build_finished_message(self, event_source, event):
+ if event.id in self.our_ids:
+ msg = distbuild.message('build-finished',
+ id=self._route_map.get_incoming_id(event.id),
+ urls=event.urls)
+ self._route_map.remove(event.id)
+ self.our_ids.remove(event.id)
+ self.jm.send(msg)
+ logging.debug(
+ 'InitiatorConnection: sent to initiator: %s', repr(msg))
+
+ def _send_build_failed_message(self, event_source, event):
+ if event.id in self.our_ids:
+ msg = distbuild.message('build-failed',
+ id=self._route_map.get_incoming_id(event.id),
+ reason=event.reason)
+ self._route_map.remove(event.id)
+ self.our_ids.remove(event.id)
+ self.jm.send(msg)
+ logging.debug(
+ 'InitiatorConnection: sent to initiator: %s', repr(msg))
+
+ def _send_build_progress_message(self, event_source, event):
+ if event.id in self.our_ids:
+ msg = distbuild.message('build-progress',
+ id=self._route_map.get_incoming_id(event.id),
+ message=event.message_text)
+ self.jm.send(msg)
+ logging.debug(
+ 'InitiatorConnection: sent to initiator: %s', repr(msg))
+
+ def _send_build_steps_message(self, event_source, event):
+
+ def make_step_dict(artifact):
+ return {
+ 'name': distbuild.build_step_name(artifact),
+ 'build-depends': [
+ distbuild.build_step_name(x)
+ for x in artifact.dependencies
+ ]
+ }
+
+ if event.id in self.our_ids:
+ step_names = distbuild.map_build_graph(
+ event.artifact, make_step_dict)
+ msg = distbuild.message('build-steps',
+ id=self._route_map.get_incoming_id(event.id),
+ steps=step_names)
+ self.jm.send(msg)
+ logging.debug(
+ 'InitiatorConnection: sent to initiator: %s', repr(msg))
+
+ def _send_build_step_started_message(self, event_source, event):
+ if event.id in self.our_ids:
+ msg = distbuild.message('step-started',
+ id=self._route_map.get_incoming_id(event.id),
+ step_name=event.step_name,
+ worker_name=event.worker_name)
+ self.jm.send(msg)
+ logging.debug(
+ 'InitiatorConnection: sent to initiator: %s', repr(msg))
+
+ def _send_build_output_message(self, event_source, event):
+ logging.debug('InitiatorConnection: build_output: '
+ 'id=%s stdout=%s stderr=%s' %
+ (repr(event.id), repr(event.stdout), repr(event.stderr)))
+ if event.id in self.our_ids:
+ msg = distbuild.message('step-output',
+ id=self._route_map.get_incoming_id(event.id),
+ step_name=event.step_name,
+ stdout=event.stdout,
+ stderr=event.stderr)
+ self.jm.send(msg)
+ logging.debug(
+ 'InitiatorConnection: sent to initiator: %s', repr(msg))
+
+ def _send_build_step_finished_message(self, event_source, event):
+ if event.id in self.our_ids:
+ msg = distbuild.message('step-finished',
+ id=self._route_map.get_incoming_id(event.id),
+ step_name=event.step_name)
+ self.jm.send(msg)
+ logging.debug(
+ 'InitiatorConnection: sent to initiator: %s', repr(msg))
+
+ def _send_build_step_failed_message(self, event_source, event):
+ if event.id in self.our_ids:
+ msg = distbuild.message('step-failed',
+ id=self._route_map.get_incoming_id(event.id),
+ step_name=event.step_name)
+ self.jm.send(msg)
+ logging.debug(
+ 'InitiatorConnection: sent to initiator: %s', repr(msg))
+
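
The id translation performed above can be sketched in isolation, using RouteMap and IdentifierGenerator the same way InitiatorConnection does: the initiator's own request id is remembered, a controller-unique id replaces it internally, and the original id is restored on every message sent back. The request fields and URL below are made up.

    import distbuild

    route_map = distbuild.RouteMap()
    idgen = distbuild.IdentifierGenerator('InitiatorConnection')

    incoming = {'type': 'build-request', 'id': 7,
                'repo': 'baserock:morphs', 'ref': 'master',
                'morphology': 'base-system'}

    new_id = idgen.next()                  # 'InitiatorConnection-1'
    route_map.add(incoming['id'], new_id)  # remember 7 -> new_id
    incoming['id'] = new_id                # the controller sees new_id

    # ...later, when reporting back to the initiator:
    reply = distbuild.message('build-finished',
        id=route_map.get_incoming_id(new_id),   # translated back to 7
        urls=['http://cache.example.com/1.0/artifacts?filename=...'])
    route_map.remove(new_id)
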
diff --git a/distbuild/jm.py b/distbuild/jm.py
new file mode 100644
index 00000000..ae222c00
--- /dev/null
+++ b/distbuild/jm.py
@@ -0,0 +1,98 @@
+# mainloop/jm.py -- state machine for JSON communication between nodes
+#
+# Copyright 2012 Codethink Limited.
+# All rights reserved.
+
+
+import fcntl
+import json
+import logging
+import os
+import socket
+import sys
+
+from sm import StateMachine
+from stringbuffer import StringBuffer
+from sockbuf import (SocketBuffer, SocketBufferNewData,
+ SocketBufferEof, SocketError)
+
+
+class JsonNewMessage(object):
+
+ def __init__(self, msg):
+ self.msg = msg
+
+
+class JsonEof(object):
+
+ pass
+
+
+class _Close2(object):
+
+ pass
+
+
+class JsonMachine(StateMachine):
+
+ '''A state machine for sending/receiving JSON messages across TCP.'''
+
+ max_buffer = 16 * 1024
+
+ def __init__(self, conn):
+ StateMachine.__init__(self, 'rw')
+ self.conn = conn
+ self.debug_json = False
+
+ def setup(self):
+ sockbuf = self.sockbuf = SocketBuffer(self.conn, self.max_buffer)
+ self.mainloop.add_state_machine(sockbuf)
+
+ self._eof = False
+ self.receive_buf = StringBuffer()
+
+ spec = [
+ ('rw', sockbuf, SocketBufferNewData, 'rw', self._parse),
+ ('rw', sockbuf, SocketBufferEof, 'w', self._send_eof),
+ ('rw', self, _Close2, None, self._really_close),
+
+ ('w', self, _Close2, None, self._really_close),
+ ]
+ self.add_transitions(spec)
+
+ def send(self, msg):
+ '''Send a message to the other side.'''
+ self.sockbuf.write('%s\n' % json.dumps(msg))
+
+ def close(self):
+ '''Tell state machine it should shut down.
+
+ The state machine will vanish once it has flushed any pending
+ writes.
+
+ '''
+
+ self.mainloop.queue_event(self, _Close2())
+
+ def _parse(self, event_source, event):
+ data = event.data
+ self.receive_buf.add(data)
+ if self.debug_json:
+ logging.debug('JsonMachine: Received: %s' % repr(data))
+ while True:
+ line = self.receive_buf.readline()
+ if line is None:
+ break
+ line = line.rstrip()
+ if self.debug_json:
+ logging.debug('JsonMachine: line: %s' % repr(line))
+ msg = json.loads(line)
+ self.mainloop.queue_event(self, JsonNewMessage(msg))
+
+ def _send_eof(self, event_source, event):
+ self.mainloop.queue_event(self, JsonEof())
+
+ def _really_close(self, event_source, event):
+ self.sockbuf.close()
+ self._send_eof(event_source, event)
+
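
The wire format JsonMachine uses is simply one JSON document per line. A standalone sketch of the framing that send() and _parse() implement, with a made-up message:

    import json

    def frame(msg):
        # What JsonMachine.send() hands to the socket buffer.
        return '%s\n' % json.dumps(msg)

    def unframe(data):
        # What JsonMachine._parse() recovers: one message per line.
        return [json.loads(line) for line in data.splitlines() if line]

    wire = frame({'type': 'exec-request', 'id': 'x-1', 'argv': ['true']})
    assert unframe(wire) == [{'type': 'exec-request', 'id': 'x-1',
                              'argv': ['true']}]
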
diff --git a/distbuild/json_router.py b/distbuild/json_router.py
new file mode 100644
index 00000000..bf272174
--- /dev/null
+++ b/distbuild/json_router.py
@@ -0,0 +1,164 @@
+# distbuild/json_router.py -- state machine to route JSON messages
+#
+# Copyright (C) 2014 Codethink Limited
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; version 2 of the License.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA..
+
+
+import logging
+
+import distbuild
+
+
+class JsonRouter(distbuild.StateMachine):
+
+ '''Route JSON messages between clients and helpers.
+
+ This state machine receives JSON messages from clients and helpers,
+ and routes messages between them.
+
+ Each incoming request is labeled with a unique identifier, then
+ sent to the next free helper. The helper's response will retain
+ the unique id, so that the response can be routed to the right
+ client.
+
+ '''
+
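+    # These are class attributes, so every JsonRouter instance (one per
+    # connection) shares the same queues and route map; a request arriving
+    # on one connection can therefore be handed to a helper connected on
+    # another.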
+ pending_requests = []
+ running_requests = {}
+ pending_helpers = []
+ request_counter = distbuild.IdentifierGenerator('JsonRouter')
+ route_map = distbuild.RouteMap()
+
+ def __init__(self, conn):
+ distbuild.StateMachine.__init__(self, 'idle')
+ self.conn = conn
+        logging.debug('JsonRouter: connection from %s', conn.getpeername())
+
+ def setup(self):
+ jm = distbuild.JsonMachine(self.conn)
+ jm.debug_json = True
+ self.mainloop.add_state_machine(jm)
+
+ spec = [
+ ('idle', jm, distbuild.JsonNewMessage, 'idle', self.bloop),
+ ('idle', jm, distbuild.JsonEof, None, self.close),
+ ]
+ self.add_transitions(spec)
+
+ def _lookup_request(self, request_id):
+ if request_id in self.running_requests:
+ return self.running_requests[request_id]
+ else:
+ return None
+
+ def bloop(self, event_source, event):
+ logging.debug('JsonRouter: got msg: %s', repr(event.msg))
+ handlers = {
+ 'http-request': self.do_request,
+ 'http-response': self.do_response,
+ 'exec-request': self.do_request,
+ 'exec-cancel': self.do_cancel,
+ 'exec-output': self.do_exec_output,
+ 'exec-response': self.do_response,
+ 'helper-ready': self.do_helper_ready,
+ }
+ handler = handlers.get(event.msg['type'])
+ handler(event_source, event)
+
+ def do_request(self, client, event):
+ self._enqueue_request(client, event.msg)
+ if self.pending_helpers:
+ self._send_request()
+
+ def do_cancel(self, client, event):
+ for id in self.route_map.get_outgoing_ids(event.msg['id']):
+ logging.debug('JsonRouter: looking up request for id %s', id)
+ t = self._lookup_request(id)
+ if t:
+ helper = t[2]
+ new = dict(event.msg)
+ new['id'] = id
+ helper.send(new)
+ logging.debug('JsonRouter: sent to helper: %s', repr(new))
+
+ def do_response(self, helper, event):
+ t = self._lookup_request(event.msg['id'])
+ if t:
+ client, msg, helper = t
+ new = dict(event.msg)
+ new['id'] = self.route_map.get_incoming_id(msg['id'])
+ client.send(new)
+ logging.debug('JsonRouter: sent to client: %s', repr(new))
+
+ def do_helper_ready(self, helper, event):
+ self.pending_helpers.append(helper)
+ if self.pending_requests:
+ self._send_request()
+
+ def do_exec_output(self, helper, event):
+ t = self._lookup_request(event.msg['id'])
+ if t:
+ client, msg, helper = t
+ new = dict(event.msg)
+ new['id'] = self.route_map.get_incoming_id(msg['id'])
+ client.send(new)
+ logging.debug('JsonRouter: sent to client: %s', repr(new))
+
+ def close(self, event_source, event):
+ logging.debug('closing: %s', repr(event_source))
+ event_source.close()
+
+ # Remove from pending helpers.
+ if event_source in self.pending_helpers:
+ self.pending_helpers.remove(event_source)
+
+ # Remove from running requests, and put the request back in the
+ # pending requests queue if the helper quit (but not if the
+ # client quit).
+ for request_id in self.running_requests.keys():
+ client, msg, helper = self.running_requests[request_id]
+ if event_source == client:
+ del self.running_requests[request_id]
+ elif event_source == helper:
+ del self.running_requests[request_id]
+ self._enqueue_request(client, msg)
+
+ # Remove from pending requests, if the client quit.
+ i = 0
+ while i < len(self.pending_requests):
+ client, msg = self.pending_requests[i]
+ if event_source == client:
+ del self.pending_requests[i]
+ else:
+ i += 1
+
+ # Finally, if there are any pending requests and helpers,
+ # send requests.
+ while self.pending_requests and self.pending_helpers:
+ self._send_request()
+
+ def _enqueue_request(self, client, msg):
+ new = dict(msg)
+ new['id'] = self.request_counter.next()
+ self.route_map.add(msg['id'], new['id'])
+ self.pending_requests.append((client, new))
+
+ def _send_request(self):
+ client, msg = self.pending_requests.pop(0)
+ helper = self.pending_helpers.pop()
+ self.running_requests[msg['id']] = (client, msg, helper)
+ helper.send(msg)
+ logging.debug('JsonRouter: sent to helper: %s', repr(msg))
+
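+
+# A minimal wiring sketch, assuming the usual distbuild pieces; the address
+# and port below are placeholders, not values used by any real deployment.
+def _example_router_service():  # pragma: no cover
+    mainloop = distbuild.MainLoop()
+    # ListenServer creates one JsonRouter per accepted connection.
+    listener = distbuild.ListenServer('0.0.0.0', 5656, JsonRouter)
+    mainloop.add_state_machine(listener)
+    mainloop.run()
+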
diff --git a/distbuild/mainloop.py b/distbuild/mainloop.py
new file mode 100644
index 00000000..e3f9ae2d
--- /dev/null
+++ b/distbuild/mainloop.py
@@ -0,0 +1,117 @@
+# mainloop/mainloop.py -- select-based main loop
+#
+# Copyright 2012 Codethink Limited.
+# All rights reserved.
+
+
+import fcntl
+import logging
+import os
+import select
+
+
+class MainLoop(object):
+
+ '''A select-based main loop.
+
+ The main loop watches a set of file descriptors wrapped in
+ EventSource objects, and when something happens with them,
+ asks the EventSource objects to create events, which it then
+ feeds into user-supplied state machines. The state machines
+    can create further events, which are processed in turn.
+
+ When nothing is happening, the main loop sleeps in the
+ select.select call.
+
+ '''
+
+ def __init__(self):
+ self._machines = []
+ self._sources = []
+ self._events = []
+ self.dump_filename = None
+
+ def add_state_machine(self, machine):
+ logging.debug('MainLoop.add_state_machine: %s' % machine)
+ machine.mainloop = self
+ machine.setup()
+ self._machines.append(machine)
+ if self.dump_filename:
+ filename = '%s%s.dot' % (self.dump_filename,
+ machine.__class__.__name__)
+ machine.dump_dot(filename)
+
+ def remove_state_machine(self, machine):
+ logging.debug('MainLoop.remove_state_machine: %s' % machine)
+ self._machines.remove(machine)
+
+ def add_event_source(self, event_source):
+ logging.debug('MainLoop.add_event_source: %s' % event_source)
+ self._sources.append(event_source)
+
+ def remove_event_source(self, event_source):
+ logging.debug('MainLoop.remove_event_source: %s' % event_source)
+ self._sources.remove(event_source)
+
+ def _setup_select(self):
+ r = []
+ w = []
+ x = []
+ timeout = None
+
+ self._sources = [s for s in self._sources if not s.is_finished()]
+
+ for event_source in self._sources:
+ sr, sw, sx, st = event_source.get_select_params()
+ r.extend(sr)
+ w.extend(sw)
+ x.extend(sx)
+ if timeout is None:
+ timeout = st
+ elif st is not None:
+ timeout = min(timeout, st)
+
+ return r, w, x, timeout
+
+ def _run_once(self):
+ r, w, x, timeout = self._setup_select()
+ assert r or w or x or timeout is not None
+ r, w, x = select.select(r, w, x, timeout)
+
+ for event_source in self._sources:
+ if event_source.is_finished():
+ self.remove_event_source(event_source)
+ else:
+ for event in event_source.get_events(r, w, x):
+ self.queue_event(event_source, event)
+
+ for event_source, event in self._dequeue_events():
+ for machine in self._machines[:]:
+ for new_event in machine.handle_event(event_source, event):
+ self.queue_event(event_source, new_event)
+ if machine.state is None:
+ self.remove_state_machine(machine)
+
+ def run(self):
+ '''Run the main loop.
+
+ The main loop terminates when there are no state machines to
+ run anymore.
+
+ '''
+
+ logging.debug('MainLoop starts')
+ while self._machines:
+ self._run_once()
+ logging.debug('MainLoop ends')
+
+ def queue_event(self, event_source, event):
+ '''Add an event to queue of events to be processed.'''
+
+ self._events.append((event_source, event))
+
+ def _dequeue_events(self):
+ while self._events:
+            event_source, event = self._events.pop(0)
+            yield event_source, event
+
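+
+# A minimal usage sketch: `machine` is assumed to be an instance of any
+# StateMachine subclass; run() returns once every added machine has set
+# its state to None.
+def _example_run(machine):  # pragma: no cover
+    loop = MainLoop()
+    loop.add_state_machine(machine)
+    loop.run()
+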
diff --git a/distbuild/protocol.py b/distbuild/protocol.py
new file mode 100644
index 00000000..5d693761
--- /dev/null
+++ b/distbuild/protocol.py
@@ -0,0 +1,93 @@
+# distbuild/protocol.py -- abstractions for the JSON messages
+#
+# Copyright (C) 2014 Codethink Limited
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; version 2 of the License.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA..
+
+
+'''Construct protocol message objects (dicts).'''
+
+
+_types = {
+ 'build-request': [
+ 'id',
+ 'repo',
+ 'ref',
+ 'morphology',
+ ],
+ 'build-progress': [
+ 'id',
+ 'message',
+ ],
+ 'build-steps': [
+ 'id',
+ 'steps',
+ ],
+ 'step-started': [
+ 'id',
+ 'step_name',
+ 'worker_name',
+ ],
+ 'step-output': [
+ 'id',
+ 'step_name',
+ 'stdout',
+ 'stderr',
+ ],
+ 'step-finished': [
+ 'id',
+ 'step_name',
+ ],
+ 'step-failed': [
+ 'id',
+ 'step_name',
+ ],
+ 'build-finished': [
+ 'id',
+ 'urls',
+ ],
+ 'build-failed': [
+ 'id',
+ 'reason',
+ ],
+ 'exec-request': [
+ 'id',
+ 'argv',
+ 'stdin_contents',
+ ],
+ 'exec-cancel': [
+ 'id',
+ ],
+ 'http-request': [
+ 'id',
+ 'url',
+ 'method',
+ ],
+}
+
+
+def message(message_type, **kwargs):
+ assert message_type in _types
+ required_fields = _types[message_type]
+
+ for name in required_fields:
+ assert name in kwargs, 'field %s is required' % name
+
+ for name in kwargs:
+ assert name in required_fields, 'field %s is not allowed' % name
+
+ msg = dict(kwargs)
+ msg['type'] = message_type
+ return msg
+
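+
+# An illustrative call, with placeholder values: every field listed for the
+# message type must be supplied, and no others, or message() asserts.
+def _example_build_request():  # pragma: no cover
+    return message('build-request',
+                   id='request-1',
+                   repo='git://example.com/definitions',
+                   ref='master',
+                   morphology='example-system.morph')
+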
diff --git a/distbuild/proxy_event_source.py b/distbuild/proxy_event_source.py
new file mode 100644
index 00000000..5b35b741
--- /dev/null
+++ b/distbuild/proxy_event_source.py
@@ -0,0 +1,47 @@
+# distbuild/proxy_event_source.py -- proxy for temporary event sources
+#
+# Copyright (C) 2014 Codethink Limited
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; version 2 of the License.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA..
+
+
+import errno
+import logging
+import socket
+
+import distbuild
+
+
+class ProxyEventSource(object):
+
+ '''Proxy event sources that may come and go.'''
+
+ def __init__(self):
+ self.event_source = None
+
+ def get_select_params(self):
+ if self.event_source:
+ return self.event_source.get_select_params()
+ else:
+ return [], [], [], None
+
+ def get_events(self, r, w, x):
+ if self.event_source:
+ return self.event_source.get_events(r, w, x)
+ else:
+ return []
+
+ def is_finished(self):
+ return False
+
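+
+# A minimal usage sketch: the proxy is registered with the main loop once,
+# and the real event source is swapped in and out as connections come and
+# go. `mainloop` and `sock` are assumed to be a MainLoop and a connected
+# socket.
+def _example_proxy(mainloop, sock):  # pragma: no cover
+    proxy = ProxyEventSource()
+    mainloop.add_event_source(proxy)
+    proxy.event_source = distbuild.SocketEventSource(sock)  # connection up
+    proxy.event_source = None  # connection gone; the proxy itself stays
+    return proxy
+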
diff --git a/distbuild/route_map.py b/distbuild/route_map.py
new file mode 100644
index 00000000..f67bcb54
--- /dev/null
+++ b/distbuild/route_map.py
@@ -0,0 +1,60 @@
+# distbuild/route_map.py -- map message ids for routing purposes
+#
+# Copyright (C) 2014 Codethink Limited
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; version 2 of the License.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA..
+
+
+class RouteMap(object):
+
+ '''Map message identifiers for routing purposes.
+
+ Various state machines need to handle requests coming from multiple
+ sources, and they need to keep track of which responses should be
+ sent to which requestors. This class provides tools for keeping
+ track of that.
+
+ Each message is expected to have a unique identifier of some sort.
+    The incoming request message has one, and all responses to it need
+    to carry the same identifier. An incoming request might be converted
+    into one or more outgoing requests, each with its own unique id. The
+    responses to all of those need to be mapped back to the original
+    incoming request.
+
+ For this class, we care about "incoming id" and "outgoing id".
+ There can be multiple outgoing identifiers for one incoming one.
+
+ '''
+
+ def __init__(self):
+ self._routes = {}
+
+ def add(self, incoming_id, outgoing_id):
+ assert (outgoing_id not in self._routes or
+ self._routes[outgoing_id] == incoming_id)
+ self._routes[outgoing_id] = incoming_id
+
+ def get_incoming_id(self, outgoing_id):
+ '''Get the incoming id corresponding to an outgoing one.
+
+ Raise KeyError if not found.
+
+ '''
+
+ return self._routes[outgoing_id]
+
+ def get_outgoing_ids(self, incoming_id):
+ return [o for (o, i) in self._routes.iteritems() if i == incoming_id]
+
+ def remove(self, outgoing_id):
+ del self._routes[outgoing_id]
diff --git a/distbuild/route_map_tests.py b/distbuild/route_map_tests.py
new file mode 100644
index 00000000..c8ed69b3
--- /dev/null
+++ b/distbuild/route_map_tests.py
@@ -0,0 +1,56 @@
+# distbuild/route_map_tests.py -- unit tests for message routing
+#
+# Copyright (C) 2014 Codethink Limited
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; version 2 of the License.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA..
+
+
+import unittest
+
+import distbuild
+
+
+class RouteMapTests(unittest.TestCase):
+
+ def setUp(self):
+ self.rm = distbuild.RouteMap()
+
+ def test_raises_error_for_unknown_route(self):
+ self.assertRaises(KeyError, self.rm.get_incoming_id, 'outgoing')
+
+ def test_finds_added_route(self):
+ self.rm.add('incoming', 'outgoing')
+ self.assertEqual(self.rm.get_incoming_id('outgoing'), 'incoming')
+
+ def test_finds_outgoing_ids(self):
+ self.rm.add('incoming', 'outgoing')
+ self.assertEqual(self.rm.get_outgoing_ids('incoming'), ['outgoing'])
+
+ def test_removes_added_route(self):
+ self.rm.add('incoming', 'outgoing')
+ self.rm.remove('outgoing')
+ self.assertRaises(KeyError, self.rm.get_incoming_id, 'outgoing')
+
+ def test_raises_error_if_forgetting_unknown_route(self):
+ self.assertRaises(KeyError, self.rm.remove, 'outgoing')
+
+ def test_silently_ignores_adding_existing_route(self):
+ self.rm.add('incoming', 'outgoing')
+ self.rm.add('incoming', 'outgoing')
+ self.assertEqual(self.rm.get_incoming_id('outgoing'), 'incoming')
+
+ def test_raises_assert_if_adding_conflicting_route(self):
+ self.rm.add('incoming', 'outgoing')
+ self.assertRaises(AssertionError, self.rm.add, 'different', 'outgoing')
+
diff --git a/distbuild/serialise.py b/distbuild/serialise.py
new file mode 100644
index 00000000..060833b1
--- /dev/null
+++ b/distbuild/serialise.py
@@ -0,0 +1,166 @@
+# distbuild/serialise.py -- (de)serialise Artifact object graphs
+#
+# Copyright (C) 2014 Codethink Limited
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; version 2 of the License.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA..
+
+
+import json
+
+import morphlib
+
+
+morphology_attributes = [
+ 'needs_artifact_metadata_cached',
+]
+
+
+def serialise_artifact(artifact):
+ '''Serialise an Artifact object and its dependencies into string form.'''
+
+ def encode_morphology(morphology):
+ result = {}
+ for key in morphology.keys():
+ result[key] = morphology[key]
+ for x in morphology_attributes:
+ result['__%s' % x] = getattr(morphology, x)
+ return result
+
+ def encode_source(source):
+ source_dic = {
+ 'repo': None,
+ 'repo_name': source.repo_name,
+ 'original_ref': source.original_ref,
+ 'sha1': source.sha1,
+ 'tree': source.tree,
+ 'morphology': encode_morphology(source.morphology),
+ 'filename': source.filename,
+ }
+ if source.morphology['kind'] == 'chunk':
+ source_dic['build_mode'] = source.build_mode
+ source_dic['prefix'] = source.prefix
+ return source_dic
+
+ def encode_single_artifact(a, encoded):
+ if artifact.source.morphology['kind'] == 'system':
+ arch = artifact.source.morphology['arch']
+ else:
+ arch = artifact.arch
+ return {
+ 'source': encode_source(a.source),
+ 'name': a.name,
+ 'cache_id': a.cache_id,
+ 'cache_key': a.cache_key,
+ 'dependencies': [encoded[d.cache_key]['cache_key']
+ for d in a.dependencies],
+ 'arch': arch,
+ }
+
+ visited = set()
+ def traverse(a):
+ visited.add(a)
+ for dep in a.dependencies:
+ if dep in visited:
+ continue
+ for ret in traverse(dep):
+ yield ret
+ yield a
+
+ encoded = {}
+ for a in traverse(artifact):
+ if a.cache_key not in encoded:
+ encoded[a.cache_key] = encode_single_artifact(a, encoded)
+
+ encoded['_root'] = artifact.cache_key
+ return json.dumps(encoded)
+
+
+def deserialise_artifact(encoded):
+ '''Re-construct the Artifact object (and dependencies).
+
+ The argument should be a string returned by ``serialise_artifact``.
+ The reconstructed Artifact objects will be sufficiently like the
+ originals that they can be used as a build graph, and other such
+ purposes, by Morph.
+
+ '''
+
+ def unserialise_morphology(le_dict):
+ '''Convert a dict into something that kinda acts like a Morphology.
+
+ As it happens, we don't need the full Morphology so we cheat.
+ Cheating is good.
+
+ '''
+
+ class FakeMorphology(dict):
+
+ def get_commands(self, which):
+ '''Get commands to run from a morphology or build system'''
+ if self[which] is None:
+ attr = '_'.join(which.split('-'))
+ bs = morphlib.buildsystem.lookup_build_system(
+ self['build-system'])
+ return getattr(bs, attr)
+ else:
+ return self[which]
+
+ morphology = FakeMorphology(le_dict)
+ for x in morphology_attributes:
+ setattr(morphology, x, le_dict['__%s' % x])
+ del morphology['__%s' % x]
+ return morphology
+
+ def unserialise_source(le_dict):
+ '''Convert a dict into a Source object.'''
+
+ morphology = unserialise_morphology(le_dict['morphology'])
+ source = morphlib.source.Source(le_dict['repo_name'],
+ le_dict['original_ref'],
+ le_dict['sha1'],
+ le_dict['tree'],
+ morphology,
+ le_dict['filename'])
+ if morphology['kind'] == 'chunk':
+ source.build_mode = le_dict['build_mode']
+ source.prefix = le_dict['prefix']
+ return source
+
+ def unserialise_single_artifact(le_dict):
+ '''Convert dict into an Artifact object.
+
+ Do not set dependencies, that will be dealt with later.
+
+ '''
+
+ source = unserialise_source(le_dict['source'])
+ artifact = morphlib.artifact.Artifact(source, le_dict['name'])
+ artifact.cache_id = le_dict['cache_id']
+ artifact.cache_key = le_dict['cache_key']
+ artifact.arch = le_dict['arch']
+ return artifact
+
+ le_dicts = json.loads(encoded)
+ cache_keys = [k for k in le_dicts.keys() if k != '_root']
+ artifacts = {}
+ for cache_key in cache_keys:
+ le_dict = le_dicts[cache_key]
+ artifacts[cache_key] = unserialise_single_artifact(le_dict)
+ for cache_key in cache_keys:
+ le_dict = le_dicts[cache_key]
+ artifact = artifacts[cache_key]
+ artifact.dependencies = [artifacts[k] for k in le_dict['dependencies']]
+
+ return artifacts[le_dicts['_root']]
+
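+
+# For illustration, the encoded form is a JSON object keyed by cache key,
+# with '_root' naming the artifact that was passed in; the values below are
+# placeholders and most fields are elided:
+#
+#   {
+#       "cachekey-dep": {"name": "dep", "cache_key": "cachekey-dep",
+#                        "dependencies": [], ...},
+#       "cachekey-top": {"name": "top", "cache_key": "cachekey-top",
+#                        "dependencies": ["cachekey-dep"], ...},
+#       "_root": "cachekey-top"
+#   }
+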
diff --git a/distbuild/serialise_tests.py b/distbuild/serialise_tests.py
new file mode 100644
index 00000000..2b4b3af7
--- /dev/null
+++ b/distbuild/serialise_tests.py
@@ -0,0 +1,148 @@
+# distbuild/serialise_tests.py -- unit tests for Artifact serialisation
+#
+# Copyright (C) 2014 Codethink Limited
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; version 2 of the License.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA..
+
+
+import unittest
+
+import distbuild
+
+
+class MockMorphology(object):
+
+ def __init__(self, name):
+ self.dict = {
+ 'name': '%s.morphology.name' % name,
+ 'kind': '%s.morphology.kind' % name,
+ }
+ self.needs_staging_area = None
+ self.needs_artifact_metadata_cached = None
+
+ def keys(self):
+ return self.dict.keys()
+
+ def __getitem__(self, key):
+ return self.dict[key]
+
+
+class MockSource(object):
+
+ def __init__(self, name):
+ self.repo = None
+ self.repo_name = '%s.source.repo_name' % name
+ self.original_ref = '%s.source.original_ref' % name
+ self.sha1 = '%s.source.sha1' % name
+ self.tree = '%s.source.tree' % name
+ self.morphology = MockMorphology(name)
+ self.filename = '%s.source.filename' % name
+
+
+class MockArtifact(object):
+
+ def __init__(self, name):
+ self.source = MockSource(name)
+ self.name = name
+ self.cache_id = {
+ 'blip': '%s.blip' % name,
+ 'integer': 42,
+ }
+ self.cache_key = '%s.cache_key' % name
+ self.dependencies = []
+
+
+class SerialisationTests(unittest.TestCase):
+
+ def setUp(self):
+ self.art1 = MockArtifact('name1')
+ self.art2 = MockArtifact('name2')
+ self.art3 = MockArtifact('name3')
+ self.art4 = MockArtifact('name4')
+
+ def assertEqualMorphologies(self, a, b):
+ self.assertEqual(sorted(a.keys()), sorted(b.keys()))
+ keys = sorted(a.keys())
+ a_values = [a[k] for k in keys]
+ b_values = [b[k] for k in keys]
+ self.assertEqual(a_values, b_values)
+        self.assertEqual(a.needs_staging_area, b.needs_staging_area)
+        self.assertEqual(a.needs_artifact_metadata_cached,
+                         b.needs_artifact_metadata_cached)
+
+ def assertEqualSources(self, a, b):
+ self.assertEqual(a.repo, b.repo)
+ self.assertEqual(a.repo_name, b.repo_name)
+ self.assertEqual(a.original_ref, b.original_ref)
+ self.assertEqual(a.sha1, b.sha1)
+ self.assertEqual(a.tree, b.tree)
+ self.assertEqualMorphologies(a.morphology, b.morphology)
+ self.assertEqual(a.filename, b.filename)
+
+ def assertEqualArtifacts(self, a, b):
+ self.assertEqualSources(a.source, b.source)
+ self.assertEqual(a.name, b.name)
+ self.assertEqual(a.cache_id, b.cache_id)
+ self.assertEqual(a.cache_key, b.cache_key)
+ self.assertEqual(len(a.dependencies), len(b.dependencies))
+ for i in range(len(a.dependencies)):
+ self.assertEqualArtifacts(a.dependencies[i], b.dependencies[i])
+
+ def verify_round_trip(self, artifact):
+ encoded = distbuild.serialise_artifact(artifact)
+ decoded = distbuild.deserialise_artifact(encoded)
+ self.assertEqualArtifacts(artifact, decoded)
+
+ def key(a):
+ return a.cache_key
+
+ objs = {}
+ queue = [decoded]
+ while queue:
+ obj = queue.pop()
+ k = key(obj)
+ if k in objs:
+ self.assertTrue(obj is objs[k])
+ else:
+ objs[k] = obj
+ queue.extend(obj.dependencies)
+
+ def test_returns_string(self):
+ encoded = distbuild.serialise_artifact(self.art1)
+ self.assertEqual(type(encoded), str)
+
+ def test_works_without_dependencies(self):
+ self.verify_round_trip(self.art1)
+
+ def test_works_with_single_dependency(self):
+ self.art1.dependencies = [self.art2]
+ self.verify_round_trip(self.art1)
+
+ def test_works_with_two_dependencies(self):
+ self.art1.dependencies = [self.art2, self.art3]
+ self.verify_round_trip(self.art1)
+
+ def test_works_with_two_levels_of_dependencies(self):
+ self.art2.dependencies = [self.art4]
+ self.art1.dependencies = [self.art2, self.art3]
+ self.verify_round_trip(self.art1)
+
+ def test_works_with_dag(self):
+ self.art2.dependencies = [self.art4]
+ self.art3.dependencies = [self.art4]
+ self.art1.dependencies = [self.art2, self.art3]
+ self.verify_round_trip(self.art1)
+
diff --git a/distbuild/sm.py b/distbuild/sm.py
new file mode 100644
index 00000000..f50515c3
--- /dev/null
+++ b/distbuild/sm.py
@@ -0,0 +1,139 @@
+# mainloop/sm.py -- state machine abstraction
+#
+# Copyright 2012 Codethink Limited.
+# All rights reserved.
+
+
+import logging
+import re
+
+
+classnamepat = re.compile(r"<class '(?P<name>.*)'>")
+
+
+class StateMachine(object):
+
+ '''A state machine abstraction.
+
+    The caller may specify callbacks for events coming from specific
+ event sources. An event source might, for example, be a socket
+ file descriptor, and the event might be incoming data from the
+ socket. The callback would then process the data, perhaps by
+ collecting it into a buffer and parsing out messages from it.
+
+    A callback gets the event source and event as arguments; the new
+    state is taken from the transition specification. A callback may
+    return or yield new events, which will be handled eventually.
+    They may or may not be handled in order.
+
+ There can only be one callback for one state, source, and event
+ class combination.
+
+ States are represented by unique objects, e.g., strings containing
+ the names of the states. When a machine wants to stop, it sets its
+ state to None.
+
+ '''
+
+ def __init__(self, initial_state):
+ self._transitions = {}
+ self.state = self._initial_state = initial_state
+ self.debug_transitions = False
+
+ def setup(self):
+ '''Set up machine for execution.
+
+ This is called when the machine is added to the main loop.
+
+ '''
+
+ def _key(self, state, event_source, event_class):
+ return (state, event_source, event_class)
+
+ def add_transition(self, state, source, event_class, new_state, callback):
+ '''Add a transition to the state machine.
+
+ When the state machine is in the given state, and an event of
+ a given type comes from a given source, move the state machine
+ to the new state and call the callback function.
+
+ '''
+
+ key = self._key(state, source, event_class)
+ assert key not in self._transitions, \
+ 'Transition %s already registered' % str(key)
+ self._transitions[key] = (new_state, callback)
+
+ def add_transitions(self, specification):
+ '''Add many transitions.
+
+ The specification is a list of transitions.
+ Each transition is a tuple of the arguments given to
+ ``add_transition``.
+
+ '''
+
+ for t in specification:
+ self.add_transition(*t)
+
+ def handle_event(self, event_source, event):
+ '''Handle a given event.
+
+ Return list of new events to handle.
+
+ '''
+
+ key = self._key(self.state, event_source, event.__class__)
+ if key not in self._transitions:
+ if self.debug_transitions: # pragma: no cover
+ prefix = '%s: handle_event: ' % self.__class__.__name__
+ logging.debug(prefix + 'not relevant for us: %s' % repr(event))
+ logging.debug(prefix + 'key: %s', repr(key))
+ logging.debug(prefix + 'state: %s', repr(self.state))
+ return []
+
+ new_state, callback = self._transitions[key]
+ if self.debug_transitions: # pragma: no cover
+ logging.debug(
+ '%s: state change %s -> %s callback=%s' %
+ (self.__class__.__name__, self.state, new_state,
+ str(callback)))
+ self.state = new_state
+ if callback is not None:
+ ret = callback(event_source, event)
+ if ret is None:
+ return []
+ else:
+ return list(ret)
+ else:
+ return []
+
+ def dump_dot(self, filename): # pragma: no cover
+ '''Write a Graphviz DOT file for the state machine.'''
+
+ with open(filename, 'w') as f:
+ f.write('digraph %s {\n' % self._classname(self.__class__))
+ first = True
+ for key in self._transitions:
+ state, src, event_class = key
+ if first:
+ f.write('"START" -> "%s" [label=""];\n' %
+ self._initial_state)
+ first = False
+
+ new_state, callback = self._transitions[key]
+ if new_state is None:
+ new_state = 'END'
+ f.write('"%s" -> "%s" [label="%s"];\n' %
+ (state, new_state, self._classname(event_class)))
+ f.write('}\n')
+
+ def _classname(self, klass): # pragma: no cover
+ s = str(klass)
+ m = classnamepat.match(s)
+ if m:
+ return m.group('name').split('.')[-1]
+ else:
+ return s
+
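+
+# A minimal sketch of a concrete machine: it waits for one event of a given
+# class from a given source, logs it, and then stops (state None). Both
+# `source` and `event_class` are assumed to be supplied by the caller.
+class _ExampleOneShot(StateMachine):  # pragma: no cover
+
+    def __init__(self, source, event_class):
+        StateMachine.__init__(self, 'waiting')
+        self._source = source
+        self._event_class = event_class
+
+    def setup(self):
+        spec = [
+            # (state, source, event class, new state, callback)
+            ('waiting', self._source, self._event_class, None, self._done),
+        ]
+        self.add_transitions(spec)
+
+    def _done(self, event_source, event):
+        logging.debug('got %s, stopping', repr(event))
+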
diff --git a/distbuild/sm_tests.py b/distbuild/sm_tests.py
new file mode 100644
index 00000000..3c33e494
--- /dev/null
+++ b/distbuild/sm_tests.py
@@ -0,0 +1,86 @@
+# distbuild/sm_tests.py -- unit tests for state machine abstraction
+#
+# Copyright 2012 Codethink Limited.
+# All rights reserved.
+
+
+import unittest
+
+import distbuild
+
+
+class DummyEventSource(object):
+
+ pass
+
+
+class DummyEvent(object):
+
+ pass
+
+
+class StateMachineTests(unittest.TestCase):
+
+ def setUp(self):
+ self.sm = distbuild.StateMachine('init')
+ self.sm.distbuild = None
+ self.sm.setup()
+ self.event_source = DummyEventSource()
+ self.event = DummyEvent()
+ self.event_sources = []
+ self.events = []
+ self.callback_result = None
+
+ def callback(self, event_source, event):
+ self.event_sources.append(event_source)
+ self.events.append(event)
+ return self.callback_result
+
+ def test_ignores_event_when_there_are_no_transitions(self):
+ new_events = self.sm.handle_event(self.event_source, self.event)
+ self.assertEqual(new_events, [])
+ self.assertEqual(self.event_sources, [])
+ self.assertEqual(self.events, [])
+
+ def test_ignores_event_when_no_transition_matches(self):
+ spec = [
+ ('init', self.event_source, str, 'init', self.callback),
+ ]
+ self.sm.add_transitions(spec)
+ new_events = self.sm.handle_event(self.event_source, self.event)
+ self.assertEqual(new_events, [])
+ self.assertEqual(self.event_sources, [])
+ self.assertEqual(self.events, [])
+
+ def test_handles_lack_of_callback_ok(self):
+ spec = [
+ ('init', self.event_source, DummyEvent, 'init', None),
+ ]
+ self.sm.add_transitions(spec)
+ new_events = self.sm.handle_event(self.event_source, self.event)
+ self.assertEqual(new_events, [])
+ self.assertEqual(self.event_sources, [])
+ self.assertEqual(self.events, [])
+
+ def test_calls_registered_callback_for_right_event(self):
+ spec = [
+ ('init', self.event_source, DummyEvent, 'init', self.callback),
+ ]
+ self.sm.add_transitions(spec)
+ new_events = self.sm.handle_event(self.event_source, self.event)
+ self.assertEqual(new_events, [])
+ self.assertEqual(self.event_sources, [self.event_source])
+ self.assertEqual(self.events, [self.event])
+
+ def test_handle_converts_nonlist_to_list(self):
+ self.callback_result = ('foo', 'bar')
+
+ spec = [
+ ('init', self.event_source, DummyEvent, 'init', self.callback),
+ ]
+ self.sm.add_transitions(spec)
+ new_events = self.sm.handle_event(self.event_source, self.event)
+ self.assertEqual(new_events, ['foo', 'bar'])
+ self.assertEqual(self.event_sources, [self.event_source])
+ self.assertEqual(self.events, [self.event])
+
diff --git a/distbuild/sockbuf.py b/distbuild/sockbuf.py
new file mode 100644
index 00000000..a7fe339a
--- /dev/null
+++ b/distbuild/sockbuf.py
@@ -0,0 +1,159 @@
+# mainloop/sockbuf.py -- a buffering, non-blocking socket I/O state machine
+#
+# Copyright 2012 Codethink Limited
+# All rights reserved.
+
+
+import logging
+
+
+'''A buffering, non-blocking I/O state machine for sockets.
+
+The state machine is given an open socket. It reads from the socket,
+and writes to it, when it can do so without blocking. A maximum size
+for the read buffer can be set: the state machine will stop reading
+if the buffer becomes full. This avoids the problem of an excessively
+large buffer.
+
+The state machine generates events to indicate that the buffer contains
+data or that the end of the file for reading has been reached. An event
+is also generated if there is an error while doing I/O with the socket.
+
+* SocketError: an error has occurred
+* SocketBufferNewData: socket buffer has received new data; the data
+ is available as the ``data`` attribute
+* SocketBufferEof: socket buffer has reached EOF for reading; anything
+  already in the write buffer (or added to it later) will still be
+  written out
+* SocketBufferClosed: socket is now closed
+
+The state machine starts shutting down when ``close`` method is called,
+but continues to operate in write-only mode until the write buffer has
+been emptied.
+
+'''
+
+
+from socketsrc import (SocketError, SocketReadable, SocketWriteable,
+ SocketEventSource)
+from sm import StateMachine
+from stringbuffer import StringBuffer
+
+
+class SocketBufferNewData(object):
+
+ '''Socket buffer has received new data.'''
+
+ def __init__(self, data):
+ self.data = data
+
+
+class SocketBufferEof(object):
+
+ '''Socket buffer has reached end of file when reading.
+
+ Note that the socket buffer may still be available for writing.
+ However, no more new data will be read.
+
+ '''
+
+
+class SocketBufferClosed(object):
+
+ '''Socket buffer has closed its socket.'''
+
+
+class _Close(object): pass
+class _WriteBufferIsEmpty(object): pass
+class _WriteBufferNotEmpty(object): pass
+
+
+
+class SocketBuffer(StateMachine):
+
+ def __init__(self, sock, max_buffer):
+ StateMachine.__init__(self, 'reading')
+
+ self._sock = sock
+ self._max_buffer = max_buffer
+
+ def setup(self):
+ src = self._src = SocketEventSource(self._sock)
+ src.stop_writing() # We'll start writing when we need to.
+ self.mainloop.add_event_source(src)
+
+ self._wbuf = StringBuffer()
+
+ spec = [
+ ('reading', src, SocketReadable, 'reading', self._fill),
+ ('reading', self, _WriteBufferNotEmpty, 'rw',
+ self._start_writing),
+ ('reading', self, SocketBufferEof, 'idle', None),
+ ('reading', self, _Close, None, self._really_close),
+
+ ('rw', src, SocketReadable, 'rw', self._fill),
+ ('rw', src, SocketWriteable, 'rw', self._flush),
+ ('rw', self, _WriteBufferIsEmpty, 'reading', self._stop_writing),
+ ('rw', self, SocketBufferEof, 'w', None),
+ ('rw', self, _Close, 'wc', None),
+
+ ('idle', self, _WriteBufferNotEmpty, 'w', self._start_writing),
+ ('idle', self, _Close, None, self._really_close),
+
+ ('w', src, SocketWriteable, 'w', self._flush),
+ ('w', self, _WriteBufferIsEmpty, 'idle', self._stop_writing),
+
+ ('wc', src, SocketWriteable, 'wc', self._flush),
+ ('wc', self, _WriteBufferIsEmpty, None, self._really_close),
+ ]
+ self.add_transitions(spec)
+
+ def write(self, data):
+ '''Put data into write queue.'''
+
+ was_empty = len(self._wbuf) == 0
+ self._wbuf.add(data)
+ if was_empty and len(self._wbuf) > 0:
+ self._start_writing(None, None)
+ self.mainloop.queue_event(self, _WriteBufferNotEmpty())
+
+ def close(self):
+ '''Tell state machine to terminate.'''
+ self.mainloop.queue_event(self, _Close())
+
+ def _report_error(self, event_source, event):
+ logging.error(str(event))
+
+ def _fill(self, event_source, event):
+ try:
+ data = event.sock.read(self._max_buffer)
+ except (IOError, OSError), e:
+ return [SocketError(event.sock, e)]
+
+ if data:
+ self.mainloop.queue_event(self, SocketBufferNewData(data))
+ else:
+ event_source.stop_reading()
+ self.mainloop.queue_event(self, SocketBufferEof())
+
+ def _really_close(self, event_source, event):
+ self._src.close()
+ self.mainloop.queue_event(self, SocketBufferClosed())
+
+ def _flush(self, event_source, event):
+ max_write = 1024**2
+ data = self._wbuf.read(max_write)
+ try:
+ n = event.sock.write(data)
+ except (IOError, OSError), e:
+ return [SocketError(event.sock, e)]
+ self._wbuf.remove(n)
+ if len(self._wbuf) == 0:
+ self.mainloop.queue_event(self, _WriteBufferIsEmpty())
+
+ def _start_writing(self, event_source, event):
+ self._src.start_writing()
+
+ def _stop_writing(self, event_source, event):
+ self._src.stop_writing()
+
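+
+# A minimal sketch of a user of SocketBuffer: a machine that echoes whatever
+# it reads back to the peer. `sock` is assumed to be a connected socket, and
+# the machine must be added to a MainLoop to run.
+class _ExampleEcho(StateMachine):  # pragma: no cover
+
+    def __init__(self, sock):
+        StateMachine.__init__(self, 'running')
+        self._sock = sock
+
+    def setup(self):
+        self._buf = SocketBuffer(self._sock, max_buffer=16 * 1024)
+        self.mainloop.add_state_machine(self._buf)
+        spec = [
+            ('running', self._buf, SocketBufferNewData, 'running',
+             self._echo),
+            ('running', self._buf, SocketBufferEof, None, self._quit),
+        ]
+        self.add_transitions(spec)
+
+    def _echo(self, event_source, event):
+        self._buf.write(event.data)
+
+    def _quit(self, event_source, event):
+        self._buf.close()
+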
diff --git a/distbuild/socketsrc.py b/distbuild/socketsrc.py
new file mode 100644
index 00000000..78486f0e
--- /dev/null
+++ b/distbuild/socketsrc.py
@@ -0,0 +1,166 @@
+# mainloop/socketsrc.py -- events and event sources for sockets
+#
+# Copyright 2012 Codethink Limited.
+# All rights reserved.
+
+
+import fcntl
+import logging
+import os
+import socket
+
+from eventsrc import EventSource
+
+
+def set_nonblocking(handle):
+ '''Make a socket, file descriptor, or other such thing be non-blocking.'''
+
+ if type(handle) is int:
+ fd = handle
+ else:
+ fd = handle.fileno()
+
+ flags = fcntl.fcntl(fd, fcntl.F_GETFL, 0)
+ flags = flags | os.O_NONBLOCK
+ fcntl.fcntl(fd, fcntl.F_SETFL, flags)
+
+
+class SocketError(object):
+
+    '''An error has occurred with a socket.'''
+
+ def __init__(self, sock, exception):
+ self.sock = sock
+ self.exception = exception
+
+
+class NewConnection(object):
+
+ '''A new client connection.'''
+
+ def __init__(self, connection, addr):
+ self.connection = connection
+ self.addr = addr
+
+
+class ListeningSocketEventSource(EventSource):
+
+ '''An event source for a socket that listens for connections.'''
+
+ def __init__(self, addr, port):
+ self.sock = socket.socket()
+ self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ self.sock.bind((addr, port))
+ self.sock.listen(5)
+ self._accepting = True
+ logging.info('Listening at %s' % repr(self.sock.getsockname()))
+
+ def get_select_params(self):
+ r = [self.sock.fileno()] if self._accepting else []
+ return r, [], [], None
+
+ def get_events(self, r, w, x):
+ if self._accepting and self.sock.fileno() in r:
+ try:
+ conn, addr = self.sock.accept()
+ except socket.error, e:
+ return [SocketError(self.sock, e)]
+ else:
+ logging.info(
+ 'New connection to %s from %s' %
+ (conn.getsockname(), addr))
+ return [NewConnection(conn, addr)]
+
+ return []
+
+ def start_accepting(self):
+ self._accepting = True
+
+ def stop_accepting(self):
+ self._accepting = False
+
+
+class SocketReadable(object):
+
+ '''A socket is readable.'''
+
+ def __init__(self, sock):
+ self.sock = sock
+
+
+class SocketWriteable(object):
+
+ '''A socket is writeable.'''
+
+ def __init__(self, sock):
+ self.sock = sock
+
+
+class SocketEventSource(EventSource):
+
+ '''Event source for normal sockets (for I/O).
+
+    This generates events indicating that the socket is readable or
+    writeable. It does not do any I/O itself; that is left to the
+    handlers of the events. It does, however, provide methods for
+    reading, writing, and closing the socket.
+
+ The event source can be told to stop checking for readability
+ or writeability, so that the user may, for example, stop those
+ events from being triggered while a buffer is full.
+
+ '''
+
+ def __init__(self, sock):
+ self.sock = sock
+ self._reading = True
+ self._writing = True
+
+ set_nonblocking(sock)
+
+ def get_select_params(self):
+ r = [self.sock.fileno()] if self._reading else []
+ w = [self.sock.fileno()] if self._writing else []
+ return r, w, [], None
+
+ def get_events(self, r, w, x):
+ events = []
+ fd = self.sock.fileno()
+
+ if self._reading and fd in r:
+ events.append(SocketReadable(self))
+
+ if self._writing and fd in w:
+ events.append(SocketWriteable(self))
+
+ return events
+
+ def start_reading(self):
+ self._reading = True
+
+ def stop_reading(self):
+ self._reading = False
+
+ def start_writing(self):
+ self._writing = True
+
+ def stop_writing(self):
+ self._writing = False
+
+ def read(self, max_bytes):
+ fd = self.sock.fileno()
+ return os.read(fd, max_bytes)
+
+ def write(self, data):
+ fd = self.sock.fileno()
+ return os.write(fd, data)
+
+ def close(self):
+ self.stop_reading()
+ self.stop_writing()
+ self.sock.close()
+ self.sock = None
+
+ def is_finished(self):
+ return self.sock is None
+
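+
+# A minimal sketch of how a caller (normally the MainLoop) polls an event
+# source once. `sock` is assumed to be a connected socket.
+def _example_poll_once(sock):  # pragma: no cover
+    import select
+    src = SocketEventSource(sock)
+    r, w, x, timeout = src.get_select_params()
+    r, w, x = select.select(r, w, x, timeout)
+    for event in src.get_events(r, w, x):
+        if isinstance(event, SocketReadable):
+            logging.debug('readable, got %s', repr(src.read(1024)))
+        elif isinstance(event, SocketWriteable):
+            logging.debug('writeable')
+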
diff --git a/distbuild/sockserv.py b/distbuild/sockserv.py
new file mode 100644
index 00000000..6d8f216e
--- /dev/null
+++ b/distbuild/sockserv.py
@@ -0,0 +1,45 @@
+# mainloop/sockserv.py -- socket server state machines
+#
+# Copyright 2012 Codethink Limited
+# All rights reserved.
+
+
+import logging
+
+from sm import StateMachine
+from socketsrc import NewConnection, SocketError, ListeningSocketEventSource
+
+
+class ListenServer(StateMachine):
+
+ '''Listen for new connections on a port, send events for them.'''
+
+ def __init__(self, addr, port, machine, extra_args=None):
+ StateMachine.__init__(self, 'listening')
+ self._addr = addr
+ self._port = port
+ self._machine = machine
+ self._extra_args = extra_args or []
+
+ def setup(self):
+ src = ListeningSocketEventSource(self._addr, self._port)
+ self.mainloop.add_event_source(src)
+
+ spec = [
+ ('listening', src, NewConnection, 'listening', self.new_conn),
+ ('listening', src, SocketError, None, self.report_error),
+ ]
+ self.add_transitions(spec)
+
+ def new_conn(self, event_source, event):
+ logging.debug(
+ 'ListenServer: Creating new %s using %s and %s' %
+ (repr(self._machine),
+ repr(event.connection),
+ repr(self._extra_args)))
+ m = self._machine(event.connection, *self._extra_args)
+ self.mainloop.add_state_machine(m)
+
+ def report_error(self, event_source, event):
+ logging.error(str(event))
+
diff --git a/distbuild/stringbuffer.py b/distbuild/stringbuffer.py
new file mode 100644
index 00000000..3349bb87
--- /dev/null
+++ b/distbuild/stringbuffer.py
@@ -0,0 +1,90 @@
+# mainloop/stringbuffer.py -- efficient buffering of strings as a queue
+#
+# Copyright 2012 Codethink Limited
+# All rights reserved.
+
+
+class StringBuffer(object):
+
+ '''Buffer data for a file descriptor.
+
+ The data may arrive in small pieces, and it is buffered in a way that
+ avoids excessive string catenation or splitting.
+
+ '''
+
+ def __init__(self):
+ self.strings = []
+ self.len = 0
+
+ def add(self, data):
+ '''Add data to buffer.'''
+ self.strings.append(data)
+ self.len += len(data)
+
+ def remove(self, num_bytes):
+ '''Remove specified number of bytes from buffer.'''
+ while num_bytes > 0 and self.strings:
+ first = self.strings[0]
+ if len(first) <= num_bytes:
+ num_bytes -= len(first)
+ del self.strings[0]
+ self.len -= len(first)
+ else:
+ self.strings[0] = first[num_bytes:]
+ self.len -= num_bytes
+ num_bytes = 0
+
+ def peek(self):
+ '''Return contents of buffer as one string.'''
+
+ if len(self.strings) == 0:
+ return ''
+ elif len(self.strings) == 1:
+ return self.strings[0]
+ else:
+ self.strings = [''.join(self.strings)]
+ return self.strings[0]
+
+ def read(self, max_bytes):
+ '''Return up to max_bytes from the buffer.
+
+ Less is returned if the buffer does not contain at least max_bytes.
+ The returned data will remain in the buffer; use remove to remove
+ it.
+
+ '''
+
+ use = []
+ size = 0
+ for s in self.strings:
+ n = max_bytes - size
+ if len(s) <= n:
+ use.append(s)
+ size += len(s)
+ else:
+ use.append(s[:n])
+ size += n
+ break
+ return ''.join(use)
+
+ def readline(self):
+ '''Return a complete line (ends with '\n') or None.'''
+
+ for i, s in enumerate(self.strings):
+ newline = s.find('\n')
+ if newline != -1:
+ if newline+1 == len(s):
+ use = self.strings[:i+1]
+ del self.strings[:i+1]
+ else:
+ pre = s[:newline+1]
+ use = self.strings[:i] + [pre]
+ del self.strings[:i]
+ self.strings[0] = s[newline+1:]
+ return ''.join(use)
+ return None
+
+ def __len__(self):
+ return self.len
+
diff --git a/distbuild/stringbuffer_tests.py b/distbuild/stringbuffer_tests.py
new file mode 100644
index 00000000..29530560
--- /dev/null
+++ b/distbuild/stringbuffer_tests.py
@@ -0,0 +1,140 @@
+# distbuild/stringbuffer_tests.py -- unit tests
+#
+# Copyright 2012 Codethink Limited
+# All rights reserved.
+
+
+import unittest
+
+import distbuild
+
+
+class StringBufferTests(unittest.TestCase):
+
+ def setUp(self):
+ self.buf = distbuild.StringBuffer()
+
+ def test_is_empty_initially(self):
+ self.assertEqual(self.buf.peek(), '')
+ self.assertEqual(len(self.buf), 0)
+
+ def test_adds_a_string(self):
+ s = 'foo'
+ self.buf.add(s)
+ self.assertEqual(self.buf.peek(), s)
+ self.assertEqual(len(self.buf), len(s))
+
+ def test_adds_a_second_string(self):
+ s = 'foo'
+ t = 'bar'
+ self.buf.add(s)
+ self.buf.add(t)
+ self.assertEqual(self.buf.peek(), s + t)
+ self.assertEqual(len(self.buf), len(s + t))
+
+
+class StringBufferRemoveTests(unittest.TestCase):
+
+ def setUp(self):
+ self.buf = distbuild.StringBuffer()
+ self.first = 'foo'
+ self.second = 'bar'
+ self.all = self.first + self.second
+ self.buf.add(self.first)
+ self.buf.add(self.second)
+
+ def test_removes_part_of_first_string(self):
+ self.assertTrue(len(self.first) > 1)
+ self.buf.remove(1)
+ self.assertEqual(self.buf.peek(), self.all[1:])
+ self.assertEqual(len(self.buf), len(self.all) - 1)
+
+ def test_removes_all_of_first_string(self):
+ self.buf.remove(len(self.first))
+ self.assertEqual(self.buf.peek(), self.second)
+ self.assertEqual(len(self.buf), len(self.second))
+
+ def test_removes_more_than_first_string(self):
+ self.assertTrue(len(self.first) > 1)
+ self.assertTrue(len(self.second) > 1)
+ self.buf.remove(len(self.first) + 1)
+ self.assertEqual(self.buf.peek(), self.second[1:])
+ self.assertEqual(len(self.buf), len(self.second) - 1)
+
+ def test_removes_all_strings(self):
+ self.buf.remove(len(self.all))
+ self.assertEqual(self.buf.peek(), '')
+ self.assertEqual(len(self.buf), 0)
+
+ def test_removes_more_than_all_strings(self):
+ self.buf.remove(len(self.all) + 1)
+ self.assertEqual(self.buf.peek(), '')
+ self.assertEqual(len(self.buf), 0)
+
+
+class StringBufferReadTests(unittest.TestCase):
+
+ def setUp(self):
+ self.buf = distbuild.StringBuffer()
+
+ def test_returns_empty_string_for_empty_buffer(self):
+ self.assertEqual(self.buf.read(100), '')
+ self.assertEqual(self.buf.peek(), '')
+
+ def test_returns_partial_string_for_short_buffer(self):
+ self.buf.add('foo')
+ self.assertEqual(self.buf.read(100), 'foo')
+ self.assertEqual(self.buf.peek(), 'foo')
+
+ def test_returns_catenated_strings(self):
+ self.buf.add('foo')
+ self.buf.add('bar')
+ self.assertEqual(self.buf.read(100), 'foobar')
+ self.assertEqual(self.buf.peek(), 'foobar')
+
+ def test_returns_requested_amount_when_available(self):
+ self.buf.add('foo')
+ self.buf.add('bar')
+ self.assertEqual(self.buf.read(4), 'foob')
+ self.assertEqual(self.buf.peek(), 'foobar')
+
+
+class StringBufferReadlineTests(unittest.TestCase):
+
+ def setUp(self):
+ self.buf = distbuild.StringBuffer()
+
+ def test_returns_None_on_empty_buffer(self):
+ self.assertEqual(self.buf.readline(), None)
+
+ def test_returns_None_on_incomplete_line_in_buffer(self):
+ self.buf.add('foo')
+ self.assertEqual(self.buf.readline(), None)
+
+ def test_extracts_complete_line(self):
+ self.buf.add('foo\n')
+ self.assertEqual(self.buf.readline(), 'foo\n')
+ self.assertEqual(self.buf.peek(), '')
+
+ def test_extracts_only_the_initial_line_and_leaves_rest_of_buffer(self):
+ self.buf.add('foo\nbar\n')
+ self.assertEqual(self.buf.readline(), 'foo\n')
+ self.assertEqual(self.buf.peek(), 'bar\n')
+
+ def test_extracts_only_the_initial_line_and_leaves_partial_line(self):
+ self.buf.add('foo\nbar')
+ self.assertEqual(self.buf.readline(), 'foo\n')
+ self.assertEqual(self.buf.peek(), 'bar')
+
+ def test_extracts_only_the_initial_line_from_multiple_pieces(self):
+ self.buf.add('foo\n')
+ self.buf.add('bar\n')
+ self.assertEqual(self.buf.readline(), 'foo\n')
+ self.assertEqual(self.buf.peek(), 'bar\n')
+
+ def test_extracts_only_the_initial_line_from_multiple_pieces_incomp(self):
+ self.buf.add('foo\n')
+ self.buf.add('bar')
+ self.assertEqual(self.buf.readline(), 'foo\n')
+ self.assertEqual(self.buf.peek(), 'bar')
+
diff --git a/distbuild/timer_event_source.py b/distbuild/timer_event_source.py
new file mode 100644
index 00000000..f6141a73
--- /dev/null
+++ b/distbuild/timer_event_source.py
@@ -0,0 +1,59 @@
+# distbuild/timer_event_source.py -- event source for timer events
+#
+# Copyright (C) 2014 Codethink Limited
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; version 2 of the License.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA..
+
+
+import time
+
+
+class Timer(object):
+
+ pass
+
+
+class TimerEventSource(object):
+
+ def __init__(self, interval):
+ self.interval = interval
+ self.last_event = time.time()
+ self.enabled = False
+
+ def start(self):
+ self.enabled = True
+ self.last_event = time.time()
+
+ def stop(self):
+ self.enabled = False
+
+ def get_select_params(self):
+ if self.enabled:
+ next_event = self.last_event + self.interval
+ timeout = next_event - time.time()
+ return [], [], [], max(0, timeout)
+ else:
+ return [], [], [], None
+
+ def get_events(self, r, w, x):
+ if self.enabled:
+ now = time.time()
+ if now >= self.last_event + self.interval:
+ self.last_event = now
+ return [Timer()]
+ return []
+
+ def is_finished(self):
+ return False
+
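+
+# A minimal usage sketch: the timer is just another event source; a state
+# machine registers a transition against the returned object and the Timer
+# event class to react to the ticks. `mainloop` is assumed to be a MainLoop,
+# and the interval is a placeholder.
+def _example_timer(mainloop, interval=30.0):  # pragma: no cover
+    timer = TimerEventSource(interval)
+    mainloop.add_event_source(timer)
+    timer.start()
+    return timer
+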
diff --git a/distbuild/worker_build_scheduler.py b/distbuild/worker_build_scheduler.py
new file mode 100644
index 00000000..d0f158b6
--- /dev/null
+++ b/distbuild/worker_build_scheduler.py
@@ -0,0 +1,392 @@
+# distbuild/worker_build_scheduler.py -- schedule worker-builds on workers
+#
+# Copyright (C) 2014 Codethink Limited
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; version 2 of the License.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA..
+
+
+import collections
+import errno
+import httplib
+import logging
+import socket
+import urllib
+import urlparse
+
+import distbuild
+
+
+class WorkerBuildRequest(object):
+
+ def __init__(self, artifact, initiator_id):
+ self.artifact = artifact
+ self.initiator_id = initiator_id
+
+
+class WorkerCancelPending(object):
+
+ def __init__(self, initiator_id):
+ self.initiator_id = initiator_id
+
+
+class WorkerBuildStepStarted(object):
+
+ def __init__(self, initiator_id, cache_key, worker_name):
+ self.initiator_id = initiator_id
+ self.artifact_cache_key = cache_key
+ self.worker_name = worker_name
+
+
+class WorkerBuildOutput(object):
+
+ def __init__(self, msg, cache_key):
+ self.msg = msg
+ self.artifact_cache_key = cache_key
+
+
+class WorkerBuildCaching(object):
+
+ def __init__(self, initiator_id, cache_key):
+ self.initiator_id = initiator_id
+ self.artifact_cache_key = cache_key
+
+
+class WorkerBuildFinished(object):
+
+ def __init__(self, msg, cache_key):
+ self.msg = msg
+ self.artifact_cache_key = cache_key
+
+
+class WorkerBuildFailed(object):
+
+ def __init__(self, msg, cache_key):
+ self.msg = msg
+ self.artifact_cache_key = cache_key
+
+
+class _NeedJob(object):
+
+ def __init__(self, who):
+ self.who = who
+
+
+class _HaveAJob(object):
+
+ def __init__(self, artifact, initiator_id):
+ self.artifact = artifact
+ self.initiator_id = initiator_id
+
+
+class _JobIsFinished(object):
+
+ def __init__(self, msg):
+ self.msg = msg
+
+
+class _JobFailed(object):
+
+ pass
+
+
+class _Cached(object):
+
+ pass
+
+
+class WorkerBuildQueuer(distbuild.StateMachine):
+
+ '''Maintain queue of outstanding worker-build requests.
+
+    This state machine captures WorkerBuildRequest events and puts them
+    into a queue. It also catches _NeedJob events from a WorkerConnection
+    and responds to them with _HaveAJob events when it has an
+    outstanding request.
+
+ '''
+
+ def __init__(self):
+ distbuild.StateMachine.__init__(self, 'idle')
+
+ def setup(self):
+ distbuild.crash_point()
+
+ logging.debug('WBQ: Setting up %s' % self)
+ self._request_queue = []
+ self._available_workers = []
+
+ spec = [
+ ('idle', WorkerBuildQueuer, WorkerBuildRequest, 'idle',
+ self._handle_request),
+ ('idle', WorkerBuildQueuer, WorkerCancelPending, 'idle',
+ self._handle_cancel),
+ ('idle', WorkerConnection, _NeedJob, 'idle', self._handle_worker),
+ ]
+ self.add_transitions(spec)
+
+ def _handle_request(self, event_source, event):
+ distbuild.crash_point()
+
+ logging.debug('WBQ: Adding request to queue: %s' % event.artifact.name)
+ self._request_queue.append(event)
+ logging.debug(
+ 'WBQ: %d available workers and %d requests queued' %
+ (len(self._available_workers),
+ len(self._request_queue)))
+ if self._available_workers:
+ self._give_job()
+
+ def _handle_cancel(self, event_source, worker_cancel_pending):
+ for request in [r for r in self._request_queue if
+ r.initiator_id == worker_cancel_pending.initiator_id]:
+ logging.debug('WBQ: Removing request from queue: %s',
+ request.artifact.name)
+ self._request_queue.remove(request)
+
+ def _handle_worker(self, event_source, event):
+ distbuild.crash_point()
+
+ logging.debug('WBQ: Adding worker to queue: %s' % event.who)
+ self._available_workers.append(event)
+ logging.debug(
+ 'WBQ: %d available workers and %d requests queued' %
+ (len(self._available_workers),
+ len(self._request_queue)))
+ if self._request_queue:
+ self._give_job()
+
+ def _give_job(self):
+ request = self._request_queue.pop(0)
+ worker = self._available_workers.pop(0)
+ logging.debug(
+ 'WBQ: Giving %s to %s' %
+ (request.artifact.name, worker.who.name()))
+ self.mainloop.queue_event(worker.who, _HaveAJob(request.artifact,
+ request.initiator_id))
+
+
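+# The queuer above and the connections below cooperate through a simple
+# handshake, using the event classes defined earlier in this file; roughly:
+#
+#   WorkerConnection              WorkerBuildQueuer
+#        |------ _NeedJob ------------>|
+#        |<----- _HaveAJob ------------|  (once a WorkerBuildRequest is queued)
+#        |  exec-request to the worker, exec-output/exec-response back,
+#        |  WorkerBuild* events emitted for the rest of the system
+#        |------ _NeedJob ------------>|  (after finishing or failing)
+
+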
+class WorkerConnection(distbuild.StateMachine):
+
+ '''Communicate with a single worker.'''
+
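+    # These are class attributes, shared by every WorkerConnection, so that
+    # exec request ids are unique across all worker connections.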
+ _request_ids = distbuild.IdentifierGenerator('WorkerConnection')
+ _route_map = distbuild.RouteMap()
+ _initiator_request_map = collections.defaultdict(set)
+
+ def __init__(self, cm, conn, writeable_cache_server,
+ worker_cache_server_port, morph_instance):
+ distbuild.StateMachine.__init__(self, 'idle')
+ self._cm = cm
+ self._conn = conn
+ self._writeable_cache_server = writeable_cache_server
+ self._worker_cache_server_port = worker_cache_server_port
+ self._morph_instance = morph_instance
+ self._helper_id = None
+
+ def name(self):
+ addr, port = self._conn.getpeername()
+ name = socket.getfqdn(addr)
+ return '%s:%s' % (name, port)
+
+ def setup(self):
+ distbuild.crash_point()
+
+ logging.debug('WC: Setting up instance %s' % repr(self))
+
+ self._jm = distbuild.JsonMachine(self._conn)
+ self.mainloop.add_state_machine(self._jm)
+
+ spec = [
+ ('idle', self._jm, distbuild.JsonEof, None, self._reconnect),
+ ('idle', self, _HaveAJob, 'building', self._start_build),
+
+ ('building', distbuild.BuildController,
+ distbuild.BuildCancel, 'building',
+ self._maybe_cancel),
+ ('building', self._jm, distbuild.JsonEof, None, self._reconnect),
+ ('building', self._jm, distbuild.JsonNewMessage, 'building',
+ self._handle_json_message),
+ ('building', self, _JobFailed, 'idle', self._request_job),
+ ('building', self, _JobIsFinished, 'caching',
+ self._request_caching),
+
+ ('caching', distbuild.HelperRouter, distbuild.HelperResult,
+ 'caching', self._handle_helper_result),
+ ('caching', self, _Cached, 'idle', self._request_job),
+ ('caching', self, _JobFailed, 'idle', self._request_job),
+ ]
+ self.add_transitions(spec)
+
+ self._request_job(None, None)
+
+ def _maybe_cancel(self, event_source, build_cancel):
+ logging.debug('WC: BuildController requested a cancel')
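+ # Only act if the cancel is for the initiator whose build this
+ # connection is currently running.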
+ if build_cancel.id == self._initiator_id:
+ distbuild.crash_point()
+
+ for id in self._initiator_request_map[self._initiator_id]:
+ logging.debug('WC: Cancelling exec %s' % id)
+ msg = distbuild.message('exec-cancel', id=id)
+ self._jm.send(msg)
+
+ def _reconnect(self, event_source, event):
+ distbuild.crash_point()
+
+ logging.debug('WC: Triggering reconnect')
+ self.mainloop.queue_event(self._cm, distbuild.Reconnect())
+
+ def _start_build(self, event_source, event):
+ distbuild.crash_point()
+
+ self._artifact = event.artifact
+ self._initiator_id = event.initiator_id
+ logging.debug('WC: starting build: %s for %s' %
+ (self._artifact.name, self._initiator_id))
+
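+ # Ask the worker to run 'worker-build' with the serialised artifact
+ # on stdin, and remember which initiator the request id belongs to
+ # so output, responses and cancels can be routed back.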
+ argv = [
+ self._morph_instance,
+ 'worker-build',
+ self._artifact.name,
+ ]
+ msg = distbuild.message('exec-request',
+ id=self._request_ids.next(),
+ argv=argv,
+ stdin_contents=distbuild.serialise_artifact(self._artifact),
+ )
+ self._jm.send(msg)
+ logging.debug('WC: sent to worker: %s' % repr(msg))
+ self._route_map.add(self._initiator_id, msg['id'])
+ self._initiator_request_map[self._initiator_id].add(msg['id'])
+ logging.debug(
+ 'WC: route map from initiator %s to request %s',
+ self._initiator_id, msg['id'])
+
+ started = WorkerBuildStepStarted(
+ self._initiator_id, self._artifact.cache_key, self.name())
+ self.mainloop.queue_event(WorkerConnection, started)
+
+ def _handle_json_message(self, event_source, event):
+ '''Handle JSON messages from the worker.'''
+
+ distbuild.crash_point()
+
+ logging.debug('WC: from worker: %s' % repr(event.msg))
+
+ handlers = {
+ 'exec-output': self._handle_exec_output,
+ 'exec-response': self._handle_exec_response,
+ }
+
+ handler = handlers[event.msg['type']]
+ handler(event.msg)
+
+ def _handle_exec_output(self, msg):
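+ # Translate the worker-side request id back into the initiator's
+ # id before forwarding the output.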
+ new = dict(msg)
+ new['id'] = self._route_map.get_incoming_id(msg['id'])
+ logging.debug('WC: emitting: %s', repr(new))
+ self.mainloop.queue_event(
+ WorkerConnection,
+ WorkerBuildOutput(new, self._artifact.cache_key))
+
+ def _handle_exec_response(self, msg):
+ logging.debug('WC: finished building: %s' % self._artifact.name)
+
+ new = dict(msg)
+ new['id'] = self._route_map.get_incoming_id(msg['id'])
+ self._route_map.remove(msg['id'])
+ self._initiator_request_map[self._initiator_id].remove(msg['id'])
+
+ if new['exit'] != 0:
+ # Build failed.
+ new_event = WorkerBuildFailed(new, self._artifact.cache_key)
+ self.mainloop.queue_event(WorkerConnection, new_event)
+ self.mainloop.queue_event(self, _JobFailed())
+ self._artifact = None
+ self._initiator_id = None
+ else:
+ # Build succeeded. We have more work to do: caching the result.
+ self.mainloop.queue_event(self, _JobIsFinished(new))
+
+ def _request_job(self, event_source, event):
+ distbuild.crash_point()
+ self.mainloop.queue_event(WorkerConnection, _NeedJob(self))
+
+ def _request_caching(self, event_source, event):
+ distbuild.crash_point()
+
+ logging.debug('Requesting shared artifact cache to get artifacts')
+
+ filename = ('%s.%s' %
+ (self._artifact.source.morphology['kind'],
+ self._artifact.name))
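+ # Build the list of artifact filenames to fetch: strata also have
+ # a .meta file, and systems may have a separate -kernel artifact.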
+ suffixes = [filename]
+ kind = self._artifact.source.morphology['kind']
+ if kind == 'stratum':
+ suffixes.append(filename + '.meta')
+ elif kind == 'system':
+ # FIXME: This is a really ugly hack.
+ if filename.endswith('-rootfs'):
+ suffixes.append(filename[:-len('-rootfs')] + '-kernel')
+
+ suffixes = [urllib.quote(x) for x in suffixes]
+ suffixes = ','.join(suffixes)
+
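+ # Ask the shared (writeable) artifact cache to fetch the newly
+ # built artifacts from the worker's own cache server.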
+ worker_host = self._conn.getpeername()[0]
+
+ url = urlparse.urljoin(
+ self._writeable_cache_server,
+ '/1.0/fetch?host=%s:%d&cacheid=%s&artifacts=%s' %
+ (urllib.quote(worker_host),
+ self._worker_cache_server_port,
+ urllib.quote(self._artifact.cache_key),
+ suffixes))
+
+ msg = distbuild.message(
+ 'http-request', id=self._request_ids.next(), url=url, method='GET')
+ self._helper_id = msg['id']
+ req = distbuild.HelperRequest(msg)
+ self.mainloop.queue_event(distbuild.HelperRouter, req)
+
+ progress = WorkerBuildCaching(
+ self._initiator_id, self._artifact.cache_key)
+ self.mainloop.queue_event(WorkerConnection, progress)
+
+ self._initiator_id = None
+ self._finished_msg = event.msg
+
+ def _handle_helper_result(self, event_source, event):
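+ # Only handle the result of this connection's own fetch request.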
+ if event.msg['id'] == self._helper_id:
+ distbuild.crash_point()
+
+ logging.debug('caching: event.msg: %s' % repr(event.msg))
+ if event.msg['status'] == httplib.OK:
+ logging.debug('Shared artifact cache population done')
+ new_event = WorkerBuildFinished(
+ self._finished_msg, self._artifact.cache_key)
+ self.mainloop.queue_event(WorkerConnection, new_event)
+ self._finished_msg = None
+ self._helper_id = None
+ self.mainloop.queue_event(self, _Cached())
+ else:
+ logging.error(
+ 'Failed to populate artifact cache: %s %s' %
+ (event.msg['status'], event.msg['body']))
+ new_event = WorkerBuildFailed(
+ self._finished_msg, self._artifact.cache_key)
+ self.mainloop.queue_event(WorkerConnection, new_event)
+ self._finished_msg = None
+ self._helper_id = None
+ self.mainloop.queue_event(self, _JobFailed())
+
+ self._artifact = None