From 1de342b8a4cf13b295805855bfaa341bcd86277e Mon Sep 17 00:00:00 2001 From: Richard Ipsum Date: Mon, 24 Feb 2014 18:21:33 +0000 Subject: Add the distbuild libs --- distbuild/__init__.py | 61 +++++ distbuild/build_controller.py | 531 ++++++++++++++++++++++++++++++++++++ distbuild/connection_machine.py | 145 ++++++++++ distbuild/crashpoint.py | 126 +++++++++ distbuild/crashpoint_tests.py | 109 ++++++++ distbuild/eventsrc.py | 48 ++++ distbuild/helper_router.py | 197 +++++++++++++ distbuild/idgen.py | 33 +++ distbuild/initiator.py | 195 +++++++++++++ distbuild/initiator_connection.py | 216 +++++++++++++++ distbuild/jm.py | 98 +++++++ distbuild/json_router.py | 164 +++++++++++ distbuild/mainloop.py | 117 ++++++++ distbuild/protocol.py | 93 +++++++ distbuild/proxy_event_source.py | 47 ++++ distbuild/route_map.py | 60 ++++ distbuild/route_map_tests.py | 56 ++++ distbuild/serialise.py | 166 +++++++++++ distbuild/serialise_tests.py | 148 ++++++++++ distbuild/sm.py | 139 ++++++++++ distbuild/sm_tests.py | 86 ++++++ distbuild/sockbuf.py | 159 +++++++++++ distbuild/socketsrc.py | 166 +++++++++++ distbuild/sockserv.py | 45 +++ distbuild/stringbuffer.py | 90 ++++++ distbuild/stringbuffer_tests.py | 140 ++++++++++ distbuild/timer_event_source.py | 59 ++++ distbuild/worker_build_scheduler.py | 392 ++++++++++++++++++++++++++ 28 files changed, 3886 insertions(+) create mode 100644 distbuild/__init__.py create mode 100644 distbuild/build_controller.py create mode 100644 distbuild/connection_machine.py create mode 100644 distbuild/crashpoint.py create mode 100644 distbuild/crashpoint_tests.py create mode 100644 distbuild/eventsrc.py create mode 100644 distbuild/helper_router.py create mode 100644 distbuild/idgen.py create mode 100644 distbuild/initiator.py create mode 100644 distbuild/initiator_connection.py create mode 100644 distbuild/jm.py create mode 100644 distbuild/json_router.py create mode 100644 distbuild/mainloop.py create mode 100644 distbuild/protocol.py create mode 
100644 distbuild/proxy_event_source.py create mode 100644 distbuild/route_map.py create mode 100644 distbuild/route_map_tests.py create mode 100644 distbuild/serialise.py create mode 100644 distbuild/serialise_tests.py create mode 100644 distbuild/sm.py create mode 100644 distbuild/sm_tests.py create mode 100644 distbuild/sockbuf.py create mode 100644 distbuild/socketsrc.py create mode 100644 distbuild/sockserv.py create mode 100644 distbuild/stringbuffer.py create mode 100644 distbuild/stringbuffer_tests.py create mode 100644 distbuild/timer_event_source.py create mode 100644 distbuild/worker_build_scheduler.py diff --git a/distbuild/__init__.py b/distbuild/__init__.py new file mode 100644 index 00000000..3e60d5ee --- /dev/null +++ b/distbuild/__init__.py @@ -0,0 +1,61 @@ +# distbuild/__init__.py -- library for Morph's distributed build plugin +# +# Copyright (C) 2014 Codethink Limited +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 2 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.. 
from stringbuffer import StringBuffer
from sm import StateMachine
from eventsrc import EventSource
from socketsrc import (SocketError, NewConnection, ListeningSocketEventSource,
                       SocketReadable, SocketWriteable, SocketEventSource,
                       set_nonblocking)
from sockbuf import (SocketBufferNewData, SocketBufferEof,
                     SocketBufferClosed, SocketBuffer)
from mainloop import MainLoop
from sockserv import ListenServer
from jm import JsonMachine, JsonNewMessage, JsonEof

from serialise import serialise_artifact, deserialise_artifact
from idgen import IdentifierGenerator
from route_map import RouteMap
from timer_event_source import TimerEventSource, Timer
from proxy_event_source import ProxyEventSource
from json_router import JsonRouter
from helper_router import (HelperRouter, HelperRequest, HelperOutput,
                           HelperResult)
from initiator_connection import (InitiatorConnection, InitiatorDisconnect)
from connection_machine import ConnectionMachine, Reconnect, StopConnecting
from worker_build_scheduler import (WorkerBuildQueuer,
                                    WorkerConnection,
                                    WorkerBuildRequest,
                                    WorkerCancelPending,
                                    WorkerBuildOutput,
                                    WorkerBuildCaching,
                                    WorkerBuildFinished,
                                    WorkerBuildFailed,
                                    WorkerBuildStepStarted)
from build_controller import (BuildController, BuildFailed, BuildProgress,
                              BuildSteps, BuildStepStarted, BuildOutput,
                              BuildStepFinished, BuildStepFailed,
                              BuildFinished, BuildCancel,
                              build_step_name, map_build_graph)
from initiator import Initiator
from protocol import message

from crashpoint import (crash_point, add_crash_condition, add_crash_conditions,
                        clear_crash_conditions)

# BUG FIX: ``__all__ = locals()`` bound a dict rather than a list of
# names, and its keys included dunders such as __name__ and __doc__;
# ``from distbuild import *`` would then rebind the importing module's
# __name__.  Export only the public names imported above.
__all__ = [_name for _name in list(locals()) if not _name.startswith('_')]
redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 2 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.. + + +import logging +import httplib +import traceback +import urllib +import urlparse + +import distbuild + + +# Artifact build states +UNKNOWN = 'unknown' +UNBUILT = 'not-built' +BUILDING = 'building' +BUILT = 'built' + + +class _Start(object): pass +class _Annotated(object): pass +class _Built(object): pass + + +class _GotGraph(object): + + def __init__(self, artifact): + self.artifact = artifact + + +class _GraphFailed(object): + + pass + + +class BuildCancel(object): + + def __init__(self, id): + self.id = id + + +class BuildFinished(object): + + def __init__(self, request_id, urls): + self.id = request_id + self.urls = urls + + +class BuildFailed(object): + + def __init__(self, request_id, reason): + self.id = request_id + self.reason = reason + + +class BuildProgress(object): + + def __init__(self, request_id, message_text): + self.id = request_id + self.message_text = message_text + + +class BuildSteps(object): + + def __init__(self, request_id, artifact): + self.id = request_id + self.artifact = artifact + + +class BuildStepStarted(object): + + def __init__(self, request_id, step_name, worker_name): + self.id = request_id + self.step_name = step_name + self.worker_name = worker_name + + +class BuildOutput(object): + + def __init__(self, request_id, step_name, stdout, stderr): + self.id = request_id + self.step_name = step_name + self.stdout = 
stdout + self.stderr = stderr + + +class BuildStepFinished(object): + + def __init__(self, request_id, step_name): + self.id = request_id + self.step_name = step_name + + +class BuildStepFailed(object): + + def __init__(self, request_id, step_name): + self.id = request_id + self.step_name = step_name + + +class _Abort(object): + + pass + + +def build_step_name(artifact): + '''Return user-comprehensible name for a given artifact.''' + return artifact.name + + +def map_build_graph(artifact, callback): + result = [] + done = set() + queue = [artifact] + while queue: + a = queue.pop() + if a not in done: + result.append(callback(a)) + queue.extend(a.dependencies) + done.add(a) + return result + + +class BuildController(distbuild.StateMachine): + + '''Control one build-request fulfillment. + + The initiator sends a build-request message, which causes the + InitiatorConnection to instantiate this class to control the steps + needed to fulfill the request. This state machine builds the + build graph to determine all the artifacts that need building, then + builds anything that is not cached. 
+ + ''' + + _idgen = distbuild.IdentifierGenerator('BuildController') + + def __init__(self, build_request_message, artifact_cache_server, + morph_instance): + distbuild.crash_point() + distbuild.StateMachine.__init__(self, 'init') + self._request = build_request_message + self._artifact_cache_server = artifact_cache_server + self._morph_instance = morph_instance + self._helper_id = None + self.debug_transitions = True + + def setup(self): + distbuild.crash_point() + + spec = [ + ('init', self, _Start, 'graphing', self._start_graphing), + ('init', distbuild.InitiatorConnection, + distbuild.InitiatorDisconnect, 'init', self._maybe_abort), + ('init', self, _Abort, None, None), + + ('graphing', distbuild.HelperRouter, distbuild.HelperOutput, + 'graphing', self._collect_graph), + ('graphing', distbuild.HelperRouter, distbuild.HelperResult, + 'graphing', self._finish_graph), + ('graphing', self, _GotGraph, + 'annotating', self._start_annotating), + ('graphing', self, _GraphFailed, None, None), + ('graphing', distbuild.InitiatorConnection, + distbuild.InitiatorDisconnect, None, + self._maybe_abort), + + ('annotating', distbuild.HelperRouter, distbuild.HelperResult, + 'annotating', self._handle_cache_response), + ('annotating', self, _Annotated, 'building', + self._queue_worker_builds), + ('annotating', distbuild.InitiatorConnection, + distbuild.InitiatorDisconnect, None, + self._maybe_abort), + + ('building', distbuild.WorkerConnection, + distbuild.WorkerBuildStepStarted, 'building', + self._relay_build_step_started), + ('building', distbuild.WorkerConnection, + distbuild.WorkerBuildOutput, 'building', + self._relay_build_output), + ('building', distbuild.WorkerConnection, + distbuild.WorkerBuildCaching, 'building', + self._relay_build_caching), + ('building', distbuild.WorkerConnection, + distbuild.WorkerBuildFinished, 'building', + self._check_result_and_queue_more_builds), + ('building', distbuild.WorkerConnection, + distbuild.WorkerBuildFailed, None, + 
self._notify_build_failed), + ('building', self, _Built, None, self._notify_build_done), + ('building', distbuild.InitiatorConnection, + distbuild.InitiatorDisconnect, 'building', + self._notify_initiator_disconnected), + ] + self.add_transitions(spec) + + self.mainloop.queue_event(self, _Start()) + + def _maybe_abort(self, event_source, event): + if disconnect.id == self._request['id']: + self.mainloop.queue_event(self, _Abort()) + + def _start_graphing(self, event_source, event): + distbuild.crash_point() + + logging.info('Start constructing build graph') + self._artifact_data = distbuild.StringBuffer() + self._artifact_error = distbuild.StringBuffer() + argv = [ + self._morph_instance, + 'serialise-artifact', + '--quiet', + self._request['repo'], + self._request['ref'], + self._request['morphology'], + ] + msg = distbuild.message('exec-request', + id=self._idgen.next(), + argv=argv, + stdin_contents='') + self._helper_id = msg['id'] + req = distbuild.HelperRequest(msg) + self.mainloop.queue_event(distbuild.HelperRouter, req) + + progress = BuildProgress(self._request['id'], 'Computing build graph') + self.mainloop.queue_event(BuildController, progress) + + def _collect_graph(self, event_source, event): + distbuild.crash_point() + + if event.msg['id'] == self._helper_id: + self._artifact_data.add(event.msg['stdout']) + self._artifact_error.add(event.msg['stderr']) + + def _finish_graph(self, event_source, event): + distbuild.crash_point() + + def notify_failure(msg_text): + logging.error('Graph creation failed: %s' % msg_text) + + failed = BuildFailed( + self._request['id'], + 'Failed go compute build graph: %s' % msg_text) + self.mainloop.queue_event(BuildController, failed) + + self.mainloop.queue_event(self, _GraphFailed()) + + def notify_success(artifact): + logging.debug('Graph is finished') + + progress = BuildProgress( + self._request['id'], 'Finished computing build graph') + self.mainloop.queue_event(BuildController, progress) + + build_steps = 
BuildSteps(self._request['id'], artifact) + self.mainloop.queue_event(BuildController, build_steps) + + self.mainloop.queue_event(self, _GotGraph(artifact)) + + if event.msg['id'] == self._helper_id: + self._helper_id = None + + error_text = self._artifact_error.peek() + if event.msg['exit'] != 0 or error_text: + notify_failure( + 'Problem with serialise-artifact: %s' % error_text) + return + + text = self._artifact_data.peek() + try: + artifact = distbuild.deserialise_artifact(text) + except ValueError, e: + logging.error(traceback.format_exc()) + notify_failure(str(e)) + return + + notify_success(artifact) + + def _start_annotating(self, event_source, event): + distbuild.crash_point() + + self._artifact = event.artifact + + # Queue http requests for checking from the shared artifact + # cache for the artifacts. + for artifact in map_build_graph(self._artifact, lambda a: a): + artifact.state = UNKNOWN + artifact.helper_id = self._idgen.next() + filename = ('%s.%s.%s' % + (artifact.cache_key, + artifact.source.morphology['kind'], + artifact.name)) + url = urlparse.urljoin( + self._artifact_cache_server, + '/1.0/artifacts?filename=%s' % urllib.quote(filename)) + msg = distbuild.message('http-request', + id=artifact.helper_id, + url=url, + method='HEAD') + request = distbuild.HelperRequest(msg) + self.mainloop.queue_event(distbuild.HelperRouter, request) + logging.debug( + 'Queued as %s query whether %s is in cache' % + (msg['id'], filename)) + + def _handle_cache_response(self, event_source, event): + distbuild.crash_point() + + logging.debug('Got cache query response: %s' % repr(event.msg)) + + def set_status(artifact): + if artifact.helper_id == event.msg['id']: + old = artifact.state + if event.msg['status'] == httplib.OK: + artifact.state = BUILT + else: + artifact.state = UNBUILT + logging.debug( + 'Changed artifact %s state from %s to %s' % + (artifact.name, old, artifact.state)) + artifact.helper_id = None + + map_build_graph(self._artifact, set_status) + + 
queued = map_build_graph(self._artifact, lambda a: a.state == UNKNOWN) + if any(queued): + logging.debug('Waiting for further responses') + else: + logging.debug('All cache query responses received') + self.mainloop.queue_event(self, _Annotated()) + + count = sum(1 if a.state == UNBUILT else 0 + for a in map_build_graph(self._artifact, lambda b: b)) + progress = BuildProgress( + self._request['id'], + 'Need to build %d artifacts' % count) + self.mainloop.queue_event(BuildController, progress) + + if count == 0: + logging.info('There seems to be nothing to build') + self.mainloop.queue_event(self, _Built()) + + def _find_artifacts_that_are_ready_to_build(self): + + def is_ready_to_build(artifact): + return (artifact.state == UNBUILT and + all(a.state == BUILT for a in artifact.dependencies)) + + return [a + for a in map_build_graph(self._artifact, lambda a: a) + if is_ready_to_build(a)] + + def _queue_worker_builds(self, event_source, event): + distbuild.crash_point() + + if self._artifact.state == BUILT: + logging.info('Requested artifact is built') + self.mainloop.queue_event(self, _Built()) + return + + logging.debug('Queuing more worker-builds to run') + logging.debug('Current state of build graph nodes:') + for a in map_build_graph(self._artifact, lambda a: a): + logging.debug(' %s state is %s' % (a.name, a.state)) + if a.state != BUILT: + for dep in a.dependencies: + logging.debug( + ' depends on %s which is %s' % + (dep.name, dep.state)) + + ready = self._find_artifacts_that_are_ready_to_build() + if len(ready) == 0: + logging.debug('No new artifacts queued for building') + + for artifact in ready: + logging.debug( + 'Requesting worker-build of %s (%s)' % + (artifact.name, artifact.cache_key)) + request = distbuild.WorkerBuildRequest(artifact, + self._request['id']) + self.mainloop.queue_event(distbuild.WorkerBuildQueuer, request) + artifact.state = BUILDING + + def _notify_initiator_disconnected(self, event_source, disconnect): + if disconnect.id == 
self._request['id']: + cancel = BuildCancel(disconnect.id) + self.mainloop.queue_event(BuildController, cancel) + + def _relay_build_step_started(self, event_source, event): + distbuild.crash_point() + if event.initiator_id != self._request['id']: + return # not for us + + logging.debug( + 'BC: _relay_build_step_started: %s' % event.artifact_cache_key) + artifact = self._find_artifact(event.artifact_cache_key) + if artifact is None: + # This is not the event you are looking for. + return + + logging.debug('BC: got build step started: %s' % artifact.name) + started = BuildStepStarted( + self._request['id'], build_step_name(artifact), event.worker_name) + self.mainloop.queue_event(BuildController, started) + logging.debug('BC: emitted %s' % repr(started)) + + def _relay_build_output(self, event_source, event): + distbuild.crash_point() + if event.msg['id'] != self._request['id']: + return # not for us + + logging.debug('BC: got output: %s' % repr(event.msg)) + artifact = self._find_artifact(event.artifact_cache_key) + logging.debug('BC: got artifact: %s' % repr(artifact)) + if artifact is None: + # This is not the event you are looking for. + return + + output = BuildOutput( + self._request['id'], build_step_name(artifact), + event.msg['stdout'], event.msg['stderr']) + self.mainloop.queue_event(BuildController, output) + logging.debug('BC: queued %s' % repr(output)) + + def _relay_build_caching(self, event_source, event): + distbuild.crash_point() + if event.initiator_id != self._request['id']: + return # not for us + + artifact = self._find_artifact(event.artifact_cache_key) + if artifact is None: + # This is not the event you are looking for. 
+ return + + progress = BuildProgress( + self._request['id'], + 'Transferring %s to shared artifact cache' % artifact.name) + self.mainloop.queue_event(BuildController, progress) + + def _find_artifact(self, cache_key): + artifacts = map_build_graph(self._artifact, lambda a: a) + wanted = [a for a in artifacts if a.cache_key == cache_key] + if wanted: + return wanted[0] + else: + return None + + def _check_result_and_queue_more_builds(self, event_source, event): + distbuild.crash_point() + if event.msg['id'] != self._request['id']: + return # not for us + + artifact = self._find_artifact(event.artifact_cache_key) + if artifact is None: + # This is not the event you are looking for. + return + + logging.debug( + 'Got build result for %s: %s', artifact.name, repr(event.msg)) + + finished = BuildStepFinished( + self._request['id'], build_step_name(artifact)) + self.mainloop.queue_event(BuildController, finished) + + artifact.state = BUILT + self._queue_worker_builds(None, event) + + def _notify_build_failed(self, event_source, event): + distbuild.crash_point() + if event.msg['id'] != self._request['id']: + return # not for us + + artifact = self._find_artifact(event.artifact_cache_key) + if artifact is None: + # This is not the event you are looking for. 
+ return + + logging.error( + 'Build step failed for %s: %s', artifact.name, repr(event.msg)) + + step_failed = BuildStepFailed( + self._request['id'], build_step_name(artifact)) + self.mainloop.queue_event(BuildController, step_failed) + + build_failed = BuildFailed( + self._request['id'], + 'Building failed for %s' % artifact.name) + self.mainloop.queue_event(BuildController, build_failed) + + # Cancel any jobs waiting to be executed, since there is no point + # running them if this build has failed, it would just waste + # resources + cancel_pending = distbuild.WorkerCancelPending( + self._request['id']) + self.mainloop.queue_event(distbuild.WorkerBuildQueuer, cancel_pending) + + # Cancel any currently executing jobs for the above reasons, since + # this build will fail and we can't decide whether these jobs will + # be of use to any other build + cancel = BuildCancel(self._request['id']) + self.mainloop.queue_event(BuildController, cancel) + + def _notify_build_done(self, event_source, event): + distbuild.crash_point() + + logging.debug('Notifying initiator of successful build') + baseurl = urlparse.urljoin( + self._artifact_cache_server, '/1.0/artifacts') + filename = ('%s.%s.%s' % + (self._artifact.cache_key, + self._artifact.source.morphology['kind'], + self._artifact.name)) + url = '%s?filename=%s' % (baseurl, urllib.quote(filename)) + finished = BuildFinished(self._request['id'], [url]) + self.mainloop.queue_event(BuildController, finished) diff --git a/distbuild/connection_machine.py b/distbuild/connection_machine.py new file mode 100644 index 00000000..3d4e8d04 --- /dev/null +++ b/distbuild/connection_machine.py @@ -0,0 +1,145 @@ +# distbuild/connection_machine.py -- state machine for connecting to server +# +# Copyright (C) 2014 Codethink Limited +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 2 of the License. 
+# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.. + + +import errno +import logging +import socket + +import distbuild + + +class Reconnect(object): + + pass + + +class StopConnecting(object): + + pass + + +class ConnectError(object): + + pass + + +class ProxyEventSource(object): + + '''Proxy event sources that may come and go.''' + + def __init__(self): + self.event_source = None + + def get_select_params(self): + if self.event_source: + return self.event_source.get_select_params() + else: + return [], [], [], None + + def get_events(self, r, w, x): + if self.event_source: + return self.event_source.get_events(r, w, x) + else: + return [] + + def is_finished(self): + return False + + +class ConnectionMachine(distbuild.StateMachine): + + def __init__(self, addr, port, machine, extra_args): + distbuild.StateMachine.__init__(self, 'connecting') + self._addr = addr + self._port = port + self._machine = machine + self._extra_args = extra_args + self._socket = None + self.reconnect_interval = 1 + + def setup(self): + self._sock_proxy = ProxyEventSource() + self.mainloop.add_event_source(self._sock_proxy) + self._start_connect() + + self._timer = distbuild.TimerEventSource(self.reconnect_interval) + self.mainloop.add_event_source(self._timer) + + spec = [ + ('connecting', self._sock_proxy, distbuild.SocketWriteable, + 'connected', self._connect), + ('connecting', self, StopConnecting, None, self._stop), + ('connected', self, Reconnect, 'connecting', self._reconnect), + ('connected', self, ConnectError, 'timeout', self._start_timer), + ('connected', self, 
StopConnecting, None, self._stop), + ('timeout', self._timer, distbuild.Timer, 'connecting', + self._reconnect), + ('timeout', self, StopConnecting, None, self._stop), + ] + self.add_transitions(spec) + + def _start_connect(self): + logging.debug( + 'ConnectionMachine: connecting to %s:%s' % + (self._addr, self._port)) + self._socket = socket.socket() + distbuild.set_nonblocking(self._socket) + try: + self._socket.connect((self._addr, self._port)) + except socket.error, e: + if e.errno != errno.EINPROGRESS: + raise socket.error( + "%s (attempting connection to distbuild controller " + "at %s:%s)" % (e, self._addr, self._port)) + + src = distbuild.SocketEventSource(self._socket) + self._sock_proxy.event_source = src + + def _connect(self, event_source, event): + try: + self._socket.connect((self._addr, self._port)) + except socket.error, e: + logging.error( + 'Failed to connect to %s:%s: %s' % + (self._addr, self._port, str(e))) + self.mainloop.queue_event(self, ConnectError()) + return + self._sock_proxy.event_source = None + logging.info('Connected to %s:%s' % (self._addr, self._port)) + m = self._machine(self, self._socket, *self._extra_args) + self.mainloop.add_state_machine(m) + self._socket = None + + def _reconnect(self, event_source, event): + logging.info('Reconnecting to %s:%s' % (self._addr, self._port)) + if self._socket is not None: + self._socket.close() + self._timer.stop() + self._start_connect() + + def _stop(self, event_source, event): + logging.info( + 'Stopping connection attempts to %s:%s' % (self._addr, self._port)) + self.mainloop.remove_event_source(self._timer) + if self._socket is not None: + self._socket.close() + self._socket = None + + def _start_timer(self, event_source, event): + self._timer.start() + diff --git a/distbuild/crashpoint.py b/distbuild/crashpoint.py new file mode 100644 index 00000000..01bfbd6b --- /dev/null +++ b/distbuild/crashpoint.py @@ -0,0 +1,126 @@ +# distbuild/crashpoint.py -- user-controlled crashing +# +# 
# distbuild/crashpoint.py -- user-controlled crashing
#
# Copyright (C) 2014 Codethink Limited
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; version 2 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.


'''Crash the application on demand.

For crash testing it is useful to be able to induce crashes easily, so
the rest of the system can be observed coping with them.  This module
implements user-controllable crashes: the code is sprinkled with calls
to the ``crash_point`` function, which terminates the process if the
call matches a set of user-defined criteria.

Each criterion consists of:

* a filename
* a function name
* a maximum call count

A criterion is fulfilled when ``crash_point`` has been called from the
named function in the named file more than the given number of times.
Filenames are matched by substring (a pattern of ``foo.py`` matches an
actual source path of
``/usr/lib/python2.7/site-packages/distbuild/foo.py``), whereas
function names must match exactly.  Matching on class names is not
possible, since that information is not available from a traceback.

'''


import logging
import os
import sys
import traceback


detailed_logging = False


def debug(msg):  # pragma: no cover
    # Crash-point tracing is very chatty, so it is only emitted when
    # explicitly enabled via the module-level ``detailed_logging`` flag.
    if detailed_logging:
        logging.debug(msg)


class CrashCondition(object):

    '''One user-specified crash criterion, plus its call counter.'''

    def __init__(self, filename, funcname, max_calls):
        self.filename = filename
        self.funcname = funcname
        self.max_calls = max_calls
        self.called = 0

    def matches(self, filename, funcname):
        '''Does the given call site match this condition?'''
        if self.filename not in filename:
            debug(
                'crashpoint: filename mismatch: %s not in %s' %
                (repr(self.filename), repr(filename)))
            return False

        if self.funcname != funcname:
            debug(
                'crashpoint: funcname mismatch: %s != %s' %
                (self.funcname, funcname))
            return False

        debug('crashpoint: matches: %s %s' % (filename, funcname))
        return True

    def triggered(self, filename, funcname):
        '''Record a matching call; report whether the crash is now due.'''
        if not self.matches(filename, funcname):
            return False
        self.called += 1
        return self.called >= self.max_calls


crash_conditions = []


def add_crash_condition(filename, funcname, max_calls):
    crash_conditions.append(CrashCondition(filename, funcname, max_calls))


def add_crash_conditions(strings):
    # Each string has the form ``filename:funcname:max_calls``.
    for spec in strings:
        parts = spec.split(':')
        if len(parts) == 3:
            add_crash_condition(parts[0], parts[1], int(parts[2]))
        else:  # pragma: no cover
            logging.error(
                'Ignoring malformed crash condition: %s' % repr(spec))


def clear_crash_conditions():
    del crash_conditions[:]


def crash_point(frame=None):
    '''Terminate the process if the call site matches a crash condition.

    ``frame`` may be supplied for testing; by default the caller's own
    stack frame is examined.

    '''

    if frame is None:
        # extract_stack(limit=2) yields the caller of crash_point
        # followed by crash_point itself; take the caller.
        frame = traceback.extract_stack(limit=2)[0]

    filename, lineno, funcname, text = frame

    for condition in crash_conditions:
        if condition.triggered(filename, funcname):
            logging.critical(
                'Crash triggered from %s:%s:%s' %
                (filename, lineno, funcname))
            sys.exit(255)
        else:
            debug(
                'Crash not triggered by %s:%s:%s' %
                (filename, lineno, funcname))
# distbuild/crashpoint_tests.py -- unit tests for crashpoint.py
#
# Copyright (C) 2014 Codethink Limited
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; version 2 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.


import unittest

import crashpoint


class CrashConditionTests(unittest.TestCase):

    def setUp(self):
        self.c = crashpoint.CrashCondition('bar', 'foofunc', 0)

    def test_matches_exact_filename(self):
        self.assertTrue(self.c.matches('bar', 'foofunc'))

    def test_matches_basename(self):
        self.assertTrue(self.c.matches('dir/bar', 'foofunc'))

    def test_matches_partial_basename(self):
        self.assertTrue(self.c.matches('dir/bar.py', 'foofunc'))

    def test_matches_dirname(self):
        self.assertTrue(self.c.matches('bar/something.py', 'foofunc'))

    def test_doesnt_match_wrong_function_name(self):
        self.assertFalse(self.c.matches('bar', 'foo'))

    # BUG FIX: this test was defined twice with an identical body; the
    # second definition silently shadowed the first, so only one copy
    # ever ran.  Keep a single definition.
    def test_triggered_first_time_with_zero_count(self):
        c = crashpoint.CrashCondition('bar', 'foofunc', 0)
        self.assertTrue(c.triggered('bar', 'foofunc'))

    def test_triggered_second_time_with_zero_count(self):
        c = crashpoint.CrashCondition('bar', 'foofunc', 0)
        self.assertTrue(c.triggered('bar', 'foofunc'))
        self.assertTrue(c.triggered('bar', 'foofunc'))

    def test_triggered_first_time_with_count_of_one(self):
        c = crashpoint.CrashCondition('bar', 'foofunc', 1)
        self.assertTrue(c.triggered('bar', 'foofunc'))

    def test_triggered_second_time_with_count_of_two(self):
        c = crashpoint.CrashCondition('bar', 'foofunc', 2)
        self.assertFalse(c.triggered('bar', 'foofunc'))
        self.assertTrue(c.triggered('bar', 'foofunc'))

    def test_not_triggered_if_not_matched(self):
        c = crashpoint.CrashCondition('bar', 'foofunc', 0)
        self.assertFalse(c.triggered('bar', 'otherfunc'))


class CrashConditionsListTests(unittest.TestCase):

    def setUp(self):
        crashpoint.clear_crash_conditions()

    def test_no_conditions_initially(self):
        self.assertEqual(crashpoint.crash_conditions, [])

    def test_adds_condition(self):
        crashpoint.add_crash_condition('foo.py', 'bar', 0)
        self.assertEqual(len(crashpoint.crash_conditions), 1)
        c = crashpoint.crash_conditions[0]
        self.assertEqual(c.filename, 'foo.py')
        self.assertEqual(c.funcname, 'bar')
        self.assertEqual(c.max_calls, 0)

    def test_adds_conditions_from_list_of_strings(self):
        crashpoint.add_crash_conditions(['foo.py:bar:0'])
        self.assertEqual(len(crashpoint.crash_conditions), 1)
        c = crashpoint.crash_conditions[0]
        self.assertEqual(c.filename, 'foo.py')
        self.assertEqual(c.funcname, 'bar')
        self.assertEqual(c.max_calls, 0)


class CrashPointTests(unittest.TestCase):

    def setUp(self):
        crashpoint.clear_crash_conditions()
        crashpoint.add_crash_condition('foo.py', 'bar', 0)

    def test_triggers_crash(self):
        self.assertRaises(
            SystemExit,
            crashpoint.crash_point, frame=('foo.py', 123, 'bar', 'text'))

    def test_does_not_trigger_crash(self):
        self.assertEqual(crashpoint.crash_point(), None)
# mainloop/eventsrc.py -- interface for event sources
#
# Copyright 2012 Codethink Limited.
# All rights reserved.


class EventSource(object):

    '''Base class for sources of state machine events.

    An event source wraps a single file descriptor and turns activity
    on that descriptor into events; what the events are depends on the
    descriptor.  The actual waiting is done with select.select, driven
    by the main loop.

    '''

    def get_select_params(self):
        '''Return (readables, writables, exceptables, timeout) for select.

        The three file descriptor lists and the timeout are suitable as
        arguments to select.select, though the main loop may merge them
        with the values returned by other event sources first.

        '''

        return [], [], [], None

    def get_events(self, r, w, x):
        '''Turn select.select return values into a list of events.'''

        return []

    def is_finished(self):
        '''Will this event source never produce another event?'''

        return False
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.


import logging

import distbuild


class HelperRequest(object):

    '''Event: a message that should be sent to a helper process.'''

    def __init__(self, msg):
        self.msg = msg


class HelperOutput(object):

    '''Event: partial output from a helper for a running request.'''

    def __init__(self, msg):
        self.msg = msg


class HelperResult(object):

    '''Event: the final response from a helper for a request.'''

    def __init__(self, msg):
        self.msg = msg


class HelperRouter(distbuild.StateMachine):

    '''Route JSON messages between helpers and other state machines.

    This state machine relays and schedules access to one distbuild-helper
    process.  The helper process connects to a socket, which causes an
    instance of HelperRouter to be created (one per connection).  The
    various instances co-ordinate requests automatically amongst
    themselves.

    Other state machines in the same mainloop as HelperRouter can
    request work from the helper process by emitting an event:

    * event source: the distbuild.HelperProcess class
    * event: distbuild.HelperRequest instance

    The HelperRequest event gets a message to be serialised as JSON.
    The message must be a Python dict that the distbuild-helper
    understands.

    HelperRouter will send the msg to the next available helper process.
    When the helper sends back the result, HelperRouter will emit a
    HelperResult event, using the same ``request_id`` as the request had.

    For its internal use, HelperRouter sets the ``id`` item in the
    request object.

    '''

    # NOTE: these are deliberately class attributes, not instance
    # attributes: one HelperRouter exists per helper connection, and all
    # instances share a single queue of requests and idle helpers.
    _pending_requests = []
    _running_requests = {}
    _pending_helpers = []
    _request_counter = distbuild.IdentifierGenerator('HelperRouter')
    _route_map = distbuild.RouteMap()

    def __init__(self, conn):
        distbuild.StateMachine.__init__(self, 'idle')
        self.conn = conn

    def setup(self):
        jm = distbuild.JsonMachine(self.conn)
        self.mainloop.add_state_machine(jm)

        spec = [
            ('idle', HelperRouter, HelperRequest, 'idle',
                self._handle_request),
            ('idle', jm, distbuild.JsonNewMessage, 'idle', self._helper_msg),
            ('idle', jm, distbuild.JsonEof, None, self._close),
        ]
        self.add_transitions(spec)

    def _handle_request(self, event_source, event):
        '''Send request received via mainloop, or put in queue.'''
        logging.debug('HelperRouter: received request: %s', repr(event.msg))
        self._enqueue_request(event.msg)
        if self._pending_helpers:
            self._send_request()

    def _helper_msg(self, event_source, event):
        '''Dispatch a message from the helper process to its handler.'''

        handlers = {
            'helper-ready': self._handle_helper_ready,
            'exec-output': self._handle_exec_output,
            'exec-response': self._handle_exec_response,
            'http-response': self._handle_http_response,
        }

        # ROBUSTNESS FIX: an unrecognised message type used to raise
        # KeyError and kill this state machine; log and drop instead.
        handler = handlers.get(event.msg['type'])
        if handler is None:
            logging.error(
                'HelperRouter: unknown message from helper: %s',
                repr(event.msg))
            return
        handler(event_source, event.msg)

    def _handle_helper_ready(self, event_source, msg):
        self._pending_helpers.append(event_source)
        if self._pending_requests:
            self._send_request()

    def _get_request(self, msg):
        '''Return the running request a helper message refers to, or None.'''

        # BUG FIX: use msg.get so a message without an "id" field is
        # reported instead of raising KeyError before the None check,
        # and log msg itself: the old code referenced the undefined
        # name "event" and raised NameError on both error paths.
        request_id = msg.get('id')
        if request_id in self._running_requests:
            request, helper = self._running_requests[request_id]
            return request
        elif request_id is None:
            logging.error(
                'Helper process sent message without "id" field: %s',
                repr(msg))
        else:
            logging.error(
                'Helper process sent message with unknown id: %s',
                repr(msg))

    def _new_message(self, msg):
        '''Copy msg, translating our internal id back to the caller's id.'''
        old_id = msg['id']
        new_msg = dict(msg)
        new_msg['id'] = self._route_map.get_incoming_id(old_id)
        return new_msg

    def _handle_exec_output(self, event_source, msg):
        request = self._get_request(msg)
        if request is not None:
            new_msg = self._new_message(msg)
            self.mainloop.queue_event(HelperRouter, HelperOutput(new_msg))

    def _handle_exec_response(self, event_source, msg):
        request = self._get_request(msg)
        if request is not None:
            new_msg = self._new_message(msg)
            self._route_map.remove(msg['id'])
            del self._running_requests[msg['id']]
            self.mainloop.queue_event(HelperRouter, HelperResult(new_msg))

    def _handle_http_response(self, event_source, msg):
        request = self._get_request(msg)
        if request is not None:
            new_msg = self._new_message(msg)
            self._route_map.remove(msg['id'])
            del self._running_requests[msg['id']]
            self.mainloop.queue_event(HelperRouter, HelperResult(new_msg))

    def _close(self, event_source, event):
        logging.debug('HelperRouter: closing: %s', repr(event_source))
        event_source.close()

        # Remove from pending helpers.
        if event_source in self._pending_helpers:
            self._pending_helpers.remove(event_source)

        # Re-queue any requests running on the helper that just quit.
        # BUG FIX: iterate over a snapshot of the keys; deleting from
        # the dict while iterating its live view raises RuntimeError
        # under Python 3.
        for request_id in list(self._running_requests.keys()):
            request, helper = self._running_requests[request_id]
            if event_source == helper:
                del self._running_requests[request_id]
                self._enqueue_request(request)

        # Finally, if there are any pending requests and helpers,
        # send requests.
        while self._pending_requests and self._pending_helpers:
            self._send_request()

    def _enqueue_request(self, request):
        '''Put request into queue, assigning it a fresh internal id.'''
        old_id = request['id']
        new_id = self._request_counter.next()
        request['id'] = new_id
        self._route_map.add(old_id, new_id)
        self._pending_requests.append(request)

    def _send_request(self):
        '''Pick the first queued request and send it to an available helper.'''
        request = self._pending_requests.pop(0)
        helper = self._pending_helpers.pop()
        self._running_requests[request['id']] = (request, helper)
        helper.send(request)
import logging


class IdentifierGenerator(object):

    '''Produce a series of identifiers that are unique within a process.

    Each generated identifier combines the series name given at
    construction time with a monotonically increasing counter.

    '''

    def __init__(self, series):
        self._prefix = series
        self._issued = 0

    def next(self):
        '''Return the next unused identifier in this series.'''
        self._issued += 1
        return '%s-%d' % (self._prefix, self._issued)
import cliapp
import logging
import random
import sys

import distbuild


class _Finished(object):

    '''Internal event: the controller reported a successful build.'''

    def __init__(self, msg):
        self.msg = msg


class _Failed(object):

    '''Internal event: the controller reported a failed build.'''

    def __init__(self, msg):
        self.msg = msg


class Initiator(distbuild.StateMachine):

    '''Drive one build request against a distbuild controller.

    Sends a build-request over the JSON connection, then relays
    progress/step messages to the user until the controller reports
    the build finished or failed.

    '''

    def __init__(self, cm, conn, app, repo_name, ref, morphology):
        distbuild.StateMachine.__init__(self, 'waiting')
        self._cm = cm
        self._conn = conn
        self._app = app
        self._repo_name = repo_name
        self._ref = ref
        self._morphology = morphology
        self._steps = None
        self._step_outputs = {}  # step name -> open per-step log file
        self.debug_transitions = True

    def setup(self):
        distbuild.crash_point()

        self._jm = distbuild.JsonMachine(self._conn)
        self.mainloop.add_state_machine(self._jm)
        logging.debug('initiator: _jm=%s' % repr(self._jm))

        spec = [
            ('waiting', self._jm, distbuild.JsonEof, None, self._terminate),
            ('waiting', self._jm, distbuild.JsonNewMessage, 'waiting',
                self._handle_json_message),
            ('waiting', self, _Finished, None, self._succeed),
            ('waiting', self, _Failed, None, self._fail),
        ]
        self.add_transitions(spec)

        # Random id so concurrent initiators are unlikely to collide.
        random_id = random.randint(0, 2**32-1)

        self._app.status(
            msg='Requesting build of %(repo)s %(ref)s %(morph)s',
            repo=self._repo_name,
            ref=self._ref,
            morph=self._morphology)
        msg = distbuild.message('build-request',
            id=random_id,
            repo=self._repo_name,
            ref=self._ref,
            morphology=self._morphology
        )
        self._jm.send(msg)
        logging.debug('Initiator: sent to controller: %s', repr(msg))

    def _handle_json_message(self, event_source, event):
        '''Dispatch one message from the controller to its handler.'''
        distbuild.crash_point()

        logging.debug('Initiator: from controller: %s' % repr(event.msg))

        handlers = {
            'build-finished': self._handle_build_finished_message,
            'build-failed': self._handle_build_failed_message,
            'build-progress': self._handle_build_progress_message,
            'build-steps': self._handle_build_steps_message,
            'step-started': self._handle_step_started_message,
            'step-output': self._handle_step_output_message,
            'step-finished': self._handle_step_finished_message,
            'step-failed': self._handle_step_failed_message,
        }

        # ROBUSTNESS FIX: a message type this client does not know used
        # to raise KeyError and kill the build; a newer controller must
        # not crash an older initiator, so log and ignore instead.
        handler = handlers.get(event.msg['type'])
        if handler is None:
            logging.warning(
                'Initiator: unknown message from controller: %s',
                repr(event.msg))
            return
        handler(event.msg)

    def _handle_build_finished_message(self, msg):
        self.mainloop.queue_event(self, _Finished(msg))

    def _handle_build_failed_message(self, msg):
        self.mainloop.queue_event(self, _Failed(msg))

    def _handle_build_progress_message(self, msg):
        self._app.status(msg='Progress: %(msgtext)s', msgtext=msg['message'])

    def _handle_build_steps_message(self, msg):
        self._steps = msg['steps']
        self._app.status(
            msg='Build steps in total: %(steps)d',
            steps=len(self._steps))

    def _open_output(self, msg):
        '''Open the per-step log file for a newly started step.'''
        assert msg['step_name'] not in self._step_outputs
        filename = 'build-step-%s.log' % msg['step_name']
        f = open(filename, 'a')
        self._step_outputs[msg['step_name']] = f
        worker = msg['worker']
        f.write('Worker: %s\n%s\n\n' % (worker, '-' * len(worker)))

    def _close_output(self, msg):
        '''Close and forget the per-step log file.'''
        self._step_outputs[msg['step_name']].close()
        del self._step_outputs[msg['step_name']]

    def _handle_step_started_message(self, msg):
        self._app.status(
            msg='Started building %(step_name)s on %(worker_name)s',
            step_name=msg['step_name'],
            worker_name=msg['worker_name'])
        self._open_output(msg)

    def _handle_step_output_message(self, msg):
        step_name = msg['step_name']
        if step_name in self._step_outputs:
            f = self._step_outputs[step_name]
            f.write(msg['stdout'])
            f.write(msg['stderr'])
            f.flush()
        else:
            logging.warning(
                'Got step-output message for unknown step: %s' % repr(msg))

    def _handle_step_finished_message(self, msg):
        step_name = msg['step_name']
        if step_name in self._step_outputs:
            self._app.status(
                msg='Finished building %(step_name)s',
                step_name=step_name)
            self._close_output(msg)
        else:
            logging.warning(
                'Got step-finished message for unknown step: %s' % repr(msg))

    def _handle_step_failed_message(self, msg):
        step_name = msg['step_name']
        if step_name in self._step_outputs:
            self._app.status(
                msg='Build failed: %(step_name)s',
                step_name=step_name)
            self._close_output(msg)
        else:
            logging.warning(
                'Got step-failed message for unknown step: %s' % repr(msg))

    def _succeed(self, event_source, event):
        self.mainloop.queue_event(self._cm, distbuild.StopConnecting())
        self._jm.close()
        logging.info('Build finished OK')

        urls = event.msg['urls']
        if urls:
            for url in urls:
                self._app.status(msg='Artifact: %(url)s', url=url)
        else:
            self._app.status(
                msg='Controller did not give us any artifact URLs.')

    def _fail(self, event_source, event):
        self.mainloop.queue_event(self._cm, distbuild.StopConnecting())
        self._jm.close()
        raise cliapp.AppException(
            'Failed to build %s %s %s: %s' %
            (self._repo_name, self._ref, self._morphology,
             event.msg['reason']))

    def _terminate(self, event_source, event):
        self.mainloop.queue_event(self._cm, distbuild.StopConnecting())
        self._jm.close()
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.


import logging

import distbuild


class InitiatorDisconnect(object):

    '''Event: the initiator that owned the given build id went away.'''

    def __init__(self, id):
        self.id = id


class _Close(object):

    '''Internal event: close the given event source.'''

    def __init__(self, event_source):
        self.event_source = event_source


class InitiatorConnection(distbuild.StateMachine):

    '''Communicate with the initiator.

    This state machine communicates with the initiator, relaying and
    translating messages from the initiator to the rest of the controller's
    state machines, and vice versa.

    '''

    # Class attributes, shared by all connections, so that build ids
    # are unique across the whole controller process.
    _idgen = distbuild.IdentifierGenerator('InitiatorConnection')
    _route_map = distbuild.RouteMap()

    def __init__(self, conn, artifact_cache_server, morph_instance):
        distbuild.StateMachine.__init__(self, 'idle')
        self.conn = conn
        self.artifact_cache_server = artifact_cache_server
        self.morph_instance = morph_instance

    def setup(self):
        self.jm = distbuild.JsonMachine(self.conn)
        self.mainloop.add_state_machine(self.jm)

        # Internal build ids (from _idgen) belonging to this connection;
        # used to filter broadcast BuildController events down to ours.
        self.our_ids = set()

        spec = [
            ('idle', self.jm, distbuild.JsonNewMessage, 'idle',
                self._handle_msg),
            ('idle', self.jm, distbuild.JsonEof, 'closing', self._disconnect),
            ('idle', distbuild.BuildController, distbuild.BuildFinished,
                'idle', self._send_build_finished_message),
            ('idle', distbuild.BuildController, distbuild.BuildFailed,
                'idle', self._send_build_failed_message),
            ('idle', distbuild.BuildController, distbuild.BuildProgress,
                'idle', self._send_build_progress_message),
            ('idle', distbuild.BuildController, distbuild.BuildSteps,
                'idle', self._send_build_steps_message),
            ('idle', distbuild.BuildController, distbuild.BuildStepStarted,
                'idle', self._send_build_step_started_message),
            ('idle', distbuild.BuildController, distbuild.BuildOutput,
                'idle', self._send_build_output_message),
            ('idle', distbuild.BuildController, distbuild.BuildStepFinished,
                'idle', self._send_build_step_finished_message),
            ('idle', distbuild.BuildController, distbuild.BuildStepFailed,
                'idle', self._send_build_step_failed_message),
            ('closing', self, _Close, None, self._close),
        ]
        self.add_transitions(spec)

    def _handle_msg(self, event_source, event):
        '''Handle message from initiator.

        Only 'build-request' is acted upon here; the request id is
        rewritten to a globally unique internal id before a
        BuildController is started for it.

        '''

        logging.debug(
            'InitiatorConnection: from initiator: %s', repr(event.msg))

        if event.msg['type'] == 'build-request':
            new_id = self._idgen.next()
            self.our_ids.add(new_id)
            self._route_map.add(event.msg['id'], new_id)
            event.msg['id'] = new_id
            build_controller = distbuild.BuildController(
                event.msg, self.artifact_cache_server, self.morph_instance)
            self.mainloop.add_state_machine(build_controller)

    def _disconnect(self, event_source, event):
        # Tell the rest of the controller that every build owned by
        # this connection lost its initiator.
        for id in self.our_ids:
            logging.debug('InitiatorConnection: InitiatorDisconnect(%s)'
                % str(id))
            self.mainloop.queue_event(InitiatorConnection,
                InitiatorDisconnect(id))
        # TODO should this clear our_ids?
        self.mainloop.queue_event(self, _Close(event_source))

    def _close(self, event_source, event):
        logging.debug('InitiatorConnection: closing: %s',
            repr(event.event_source))

        event.event_source.close()

    def _handle_result(self, event_source, event):
        '''Handle result from helper.'''

        # Only forward results for requests this connection owns.
        if event.msg['id'] in self.our_ids:
            logging.debug(
                'InitiatorConnection: received result: %s', repr(event.msg))
            self.jm.send(event.msg)

    def _send_build_finished_message(self, event_source, event):
        if event.id in self.our_ids:
            # Terminal message: translate back to the initiator's own id
            # and forget the route.
            msg = distbuild.message('build-finished',
                id=self._route_map.get_incoming_id(event.id),
                urls=event.urls)
            self._route_map.remove(event.id)
            self.our_ids.remove(event.id)
            self.jm.send(msg)
            logging.debug(
                'InitiatorConnection: sent to initiator: %s', repr(msg))

    def _send_build_failed_message(self, event_source, event):
        if event.id in self.our_ids:
            # Terminal message: translate back and forget the route.
            msg = distbuild.message('build-failed',
                id=self._route_map.get_incoming_id(event.id),
                reason=event.reason)
            self._route_map.remove(event.id)
            self.our_ids.remove(event.id)
            self.jm.send(msg)
            logging.debug(
                'InitiatorConnection: sent to initiator: %s', repr(msg))

    def _send_build_progress_message(self, event_source, event):
        # Non-terminal: the route stays alive for later messages.
        if event.id in self.our_ids:
            msg = distbuild.message('build-progress',
                id=self._route_map.get_incoming_id(event.id),
                message=event.message_text)
            self.jm.send(msg)
            logging.debug(
                'InitiatorConnection: sent to initiator: %s', repr(msg))

    def _send_build_steps_message(self, event_source, event):

        # One dict per artifact in the build graph, naming the step and
        # the steps it depends on.
        def make_step_dict(artifact):
            return {
                'name': distbuild.build_step_name(artifact),
                'build-depends': [
                    distbuild.build_step_name(x)
                    for x in artifact.dependencies
                ]
            }

        if event.id in self.our_ids:
            step_names = distbuild.map_build_graph(
                event.artifact, make_step_dict)
            msg = distbuild.message('build-steps',
                id=self._route_map.get_incoming_id(event.id),
                steps=step_names)
            self.jm.send(msg)
            logging.debug(
                'InitiatorConnection: sent to initiator: %s', repr(msg))

    def _send_build_step_started_message(self, event_source, event):
        if event.id in self.our_ids:
            msg = distbuild.message('step-started',
                id=self._route_map.get_incoming_id(event.id),
                step_name=event.step_name,
                worker_name=event.worker_name)
            self.jm.send(msg)
            logging.debug(
                'InitiatorConnection: sent to initiator: %s', repr(msg))

    def _send_build_output_message(self, event_source, event):
        logging.debug('InitiatorConnection: build_output: '
                      'id=%s stdout=%s stderr=%s' %
            (repr(event.id), repr(event.stdout), repr(event.stderr)))
        if event.id in self.our_ids:
            msg = distbuild.message('step-output',
                id=self._route_map.get_incoming_id(event.id),
                step_name=event.step_name,
                stdout=event.stdout,
                stderr=event.stderr)
            self.jm.send(msg)
            logging.debug(
                'InitiatorConnection: sent to initiator: %s', repr(msg))

    def _send_build_step_finished_message(self, event_source, event):
        if event.id in self.our_ids:
            msg = distbuild.message('step-finished',
                id=self._route_map.get_incoming_id(event.id),
                step_name=event.step_name)
            self.jm.send(msg)
            logging.debug(
                'InitiatorConnection: sent to initiator: %s', repr(msg))

    def _send_build_step_failed_message(self, event_source, event):
        if event.id in self.our_ids:
            msg = distbuild.message('step-failed',
                id=self._route_map.get_incoming_id(event.id),
                step_name=event.step_name)
            self.jm.send(msg)
            logging.debug(
                'InitiatorConnection: sent to initiator: %s', repr(msg))
import fcntl
import json
import logging
import os
import socket
import sys

from sm import StateMachine
from stringbuffer import StringBuffer
from sockbuf import (SocketBuffer, SocketBufferNewData,
                     SocketBufferEof, SocketError)


class JsonNewMessage(object):

    '''Event: one complete JSON message has been received.'''

    def __init__(self, msg):
        self.msg = msg


class JsonEof(object):

    '''Event: the peer closed its end of the connection.'''

    pass


class _Close2(object):

    '''Internal event: shut this machine down.'''

    pass


class JsonMachine(StateMachine):

    '''A state machine for sending/receiving JSON messages across TCP.

    Messages are newline-delimited JSON documents.

    '''

    max_buffer = 16 * 1024

    def __init__(self, conn):
        StateMachine.__init__(self, 'rw')
        self.conn = conn
        self.debug_json = False

    def setup(self):
        sockbuf = self.sockbuf = SocketBuffer(self.conn, self.max_buffer)
        self.mainloop.add_state_machine(sockbuf)

        self._eof = False
        self.receive_buf = StringBuffer()

        spec = [
            ('rw', sockbuf, SocketBufferNewData, 'rw', self._parse),
            ('rw', sockbuf, SocketBufferEof, 'w', self._send_eof),
            ('rw', self, _Close2, None, self._really_close),

            ('w', self, _Close2, None, self._really_close),
        ]
        self.add_transitions(spec)

    def send(self, msg):
        '''Send a message to the other side.'''
        self.sockbuf.write('%s\n' % json.dumps(msg))

    def close(self):
        '''Tell state machine it should shut down.

        The state machine will vanish once it has flushed any pending
        writes.

        '''

        self.mainloop.queue_event(self, _Close2())

    def _parse(self, event_source, event):
        '''Buffer incoming data; emit JsonNewMessage per complete line.'''
        data = event.data
        self.receive_buf.add(data)
        if self.debug_json:
            logging.debug('JsonMachine: Received: %s' % repr(data))
        while True:
            line = self.receive_buf.readline()
            if line is None:
                break
            line = line.rstrip()
            # BUG FIX: a blank line in the stream (e.g. consecutive
            # newlines) used to crash json.loads with a ValueError and
            # take down the whole connection; skip blank lines instead.
            if not line:
                continue
            if self.debug_json:
                logging.debug('JsonMachine: line: %s' % repr(line))
            msg = json.loads(line)
            self.mainloop.queue_event(self, JsonNewMessage(msg))

    def _send_eof(self, event_source, event):
        self.mainloop.queue_event(self, JsonEof())

    def _really_close(self, event_source, event):
        self.sockbuf.close()
        self._send_eof(event_source, event)
import logging

import distbuild


class JsonRouter(distbuild.StateMachine):

    '''Route JSON messages between clients and helpers.

    This state machine receives JSON messages from clients and helpers,
    and routes messages between them.

    Each incoming request is labeled with a unique identifier, then
    sent to the next free helper.  The helper's response will retain
    the unique id, so that the response can be routed to the right
    client.

    '''

    # Class attributes, shared by all connections: one global queue of
    # requests and of idle helpers.
    pending_requests = []
    running_requests = {}
    pending_helpers = []
    request_counter = distbuild.IdentifierGenerator('JsonRouter')
    route_map = distbuild.RouteMap()

    def __init__(self, conn):
        distbuild.StateMachine.__init__(self, 'idle')
        self.conn = conn
        logging.debug('JsonMachine: connection from %s', conn.getpeername())

    def setup(self):
        jm = distbuild.JsonMachine(self.conn)
        jm.debug_json = True
        self.mainloop.add_state_machine(jm)

        spec = [
            ('idle', jm, distbuild.JsonNewMessage, 'idle', self.bloop),
            ('idle', jm, distbuild.JsonEof, None, self.close),
        ]
        self.add_transitions(spec)

    def _lookup_request(self, request_id):
        '''Return (client, msg, helper) for a running request, or None.'''
        if request_id in self.running_requests:
            return self.running_requests[request_id]
        else:
            return None

    def bloop(self, event_source, event):
        '''Dispatch one incoming JSON message to its handler.'''
        logging.debug('JsonRouter: got msg: %s', repr(event.msg))
        handlers = {
            'http-request': self.do_request,
            'http-response': self.do_response,
            'exec-request': self.do_request,
            'exec-cancel': self.do_cancel,
            'exec-output': self.do_exec_output,
            'exec-response': self.do_response,
            'helper-ready': self.do_helper_ready,
        }
        handler = handlers.get(event.msg['type'])
        # BUG FIX: handlers.get returns None for an unknown message
        # type, and the old code called it anyway, raising
        # "TypeError: 'NoneType' object is not callable"; log and
        # drop unknown messages instead.
        if handler is None:
            logging.error(
                'JsonRouter: unknown message type: %s', repr(event.msg))
            return
        handler(event_source, event)

    def do_request(self, client, event):
        self._enqueue_request(client, event.msg)
        if self.pending_helpers:
            self._send_request()

    def do_cancel(self, client, event):
        # A single client id may map to several outgoing requests.
        for id in self.route_map.get_outgoing_ids(event.msg['id']):
            logging.debug('JsonRouter: looking up request for id %s', id)
            t = self._lookup_request(id)
            if t:
                helper = t[2]
                new = dict(event.msg)
                new['id'] = id
                helper.send(new)
                logging.debug('JsonRouter: sent to helper: %s', repr(new))

    def do_response(self, helper, event):
        t = self._lookup_request(event.msg['id'])
        if t:
            client, msg, helper = t
            new = dict(event.msg)
            new['id'] = self.route_map.get_incoming_id(msg['id'])
            client.send(new)
            logging.debug('JsonRouter: sent to client: %s', repr(new))

    def do_helper_ready(self, helper, event):
        self.pending_helpers.append(helper)
        if self.pending_requests:
            self._send_request()

    def do_exec_output(self, helper, event):
        t = self._lookup_request(event.msg['id'])
        if t:
            client, msg, helper = t
            new = dict(event.msg)
            new['id'] = self.route_map.get_incoming_id(msg['id'])
            client.send(new)
            logging.debug('JsonRouter: sent to client: %s', repr(new))

    def close(self, event_source, event):
        logging.debug('closing: %s', repr(event_source))
        event_source.close()

        # Remove from pending helpers.
        if event_source in self.pending_helpers:
            self.pending_helpers.remove(event_source)

        # Remove from running requests, and put the request back in the
        # pending requests queue if the helper quit (but not if the
        # client quit).
        # BUG FIX: iterate over a snapshot of the keys; deleting from
        # the dict while iterating its live view raises RuntimeError
        # under Python 3.
        for request_id in list(self.running_requests.keys()):
            client, msg, helper = self.running_requests[request_id]
            if event_source == client:
                del self.running_requests[request_id]
            elif event_source == helper:
                del self.running_requests[request_id]
                self._enqueue_request(client, msg)

        # Remove from pending requests, if the client quit.
        i = 0
        while i < len(self.pending_requests):
            client, msg = self.pending_requests[i]
            if event_source == client:
                del self.pending_requests[i]
            else:
                i += 1

        # Finally, if there are any pending requests and helpers,
        # send requests.
        while self.pending_requests and self.pending_helpers:
            self._send_request()

    def _enqueue_request(self, client, msg):
        '''Queue a request, assigning it a fresh internal id.'''
        new = dict(msg)
        new['id'] = self.request_counter.next()
        self.route_map.add(msg['id'], new['id'])
        self.pending_requests.append((client, new))

    def _send_request(self):
        '''Send the oldest queued request to an available helper.'''
        client, msg = self.pending_requests.pop(0)
        helper = self.pending_helpers.pop()
        self.running_requests[msg['id']] = (client, msg, helper)
        helper.send(msg)
        logging.debug('JsonRouter: sent to helper: %s', repr(msg))
class MainLoop(object):

    '''A select-based main loop.

    The main loop watches a set of file descriptors wrapped in
    EventSource objects, and when something happens with them,
    asks the EventSource objects to create events, which it then
    feeds into user-supplied state machines. The state machines
    can create further events, which are processed further.

    When nothing is happening, the main loop sleeps in the
    select.select call.

    '''

    def __init__(self):
        self._machines = []        # state machines currently running
        self._sources = []         # event sources being watched
        self._events = []          # queued (event_source, event) pairs
        self.dump_filename = None  # optional prefix for .dot dumps

    def add_state_machine(self, machine):
        '''Register a state machine; gives it a mainloop and sets it up.'''
        logging.debug('MainLoop.add_state_machine: %s' % machine)
        machine.mainloop = self
        machine.setup()
        self._machines.append(machine)
        if self.dump_filename:
            filename = '%s%s.dot' % (self.dump_filename,
                                     machine.__class__.__name__)
            machine.dump_dot(filename)

    def remove_state_machine(self, machine):
        logging.debug('MainLoop.remove_state_machine: %s' % machine)
        self._machines.remove(machine)

    def add_event_source(self, event_source):
        logging.debug('MainLoop.add_event_source: %s' % event_source)
        self._sources.append(event_source)

    def remove_event_source(self, event_source):
        logging.debug('MainLoop.remove_event_source: %s' % event_source)
        self._sources.remove(event_source)

    def _setup_select(self):
        '''Merge the select parameters of all live event sources.'''
        r = []
        w = []
        x = []
        timeout = None

        # Drop finished sources before collecting their parameters.
        self._sources = [s for s in self._sources if not s.is_finished()]

        for event_source in self._sources:
            sr, sw, sx, st = event_source.get_select_params()
            r.extend(sr)
            w.extend(sw)
            x.extend(sx)
            # The overall timeout is the smallest one requested.
            if timeout is None:
                timeout = st
            elif st is not None:
                timeout = min(timeout, st)

        return r, w, x, timeout

    def _run_once(self):
        '''Wait for activity once, then process all resulting events.'''
        r, w, x, timeout = self._setup_select()
        # Guard against a select call that could never wake up.
        assert r or w or x or timeout is not None
        r, w, x = select.select(r, w, x, timeout)

        for event_source in self._sources:
            if event_source.is_finished():
                self.remove_event_source(event_source)
            else:
                for event in event_source.get_events(r, w, x):
                    self.queue_event(event_source, event)

        for event_source, event in self._dequeue_events():
            # Iterate over a copy: handlers may add or remove machines.
            for machine in self._machines[:]:
                for new_event in machine.handle_event(event_source, event):
                    self.queue_event(event_source, new_event)
                # A machine whose state has become None has terminated.
                if machine.state is None:
                    self.remove_state_machine(machine)

    def run(self):
        '''Run the main loop.

        The main loop terminates when there are no state machines to
        run anymore.

        '''

        logging.debug('MainLoop starts')
        while self._machines:
            self._run_once()
        logging.debug('MainLoop ends')

    def queue_event(self, event_source, event):
        '''Add an event to queue of events to be processed.'''

        self._events.append((event_source, event))

    def _dequeue_events(self):
        '''Yield and consume queued events, including ones queued mid-loop.'''
        while self._events:
            event_queue, event = self._events.pop(0)
            yield event_queue, event
'''Construct protocol message objects (dicts).'''


# Required fields for each known message type. Every message also
# carries a 'type' field, added by message() itself.
_types = {
    'build-request': [
        'id',
        'repo',
        'ref',
        'morphology',
    ],
    'build-progress': [
        'id',
        'message',
    ],
    'build-steps': [
        'id',
        'steps',
    ],
    'step-started': [
        'id',
        'step_name',
        'worker_name',
    ],
    'step-output': [
        'id',
        'step_name',
        'stdout',
        'stderr',
    ],
    'step-finished': [
        'id',
        'step_name',
    ],
    'step-failed': [
        'id',
        'step_name',
    ],
    'build-finished': [
        'id',
        'urls',
    ],
    'build-failed': [
        'id',
        'reason',
    ],
    'exec-request': [
        'id',
        'argv',
        'stdin_contents',
    ],
    'exec-cancel': [
        'id',
    ],
    'http-request': [
        'id',
        'url',
        'method',
    ],
}


def message(message_type, **kwargs):
    '''Create a message dict of the given type.

    Every field listed for the type in ``_types`` must be supplied as
    a keyword argument, and no fields outside that list are allowed.

    '''

    assert message_type in _types
    allowed = _types[message_type]

    for field in allowed:
        assert field in kwargs, 'field %s is required' % field

    for field in kwargs:
        assert field in allowed, 'field %s is not allowed' % field

    result = dict(kwargs)
    result['type'] = message_type
    return result
class ProxyEventSource(object):

    '''Stand in for an event source that may not exist yet.

    The proxy delegates to whatever object is currently assigned to
    its ``event_source`` attribute; while that is None, it simply has
    nothing to report.

    '''

    def __init__(self):
        # The real event source, or None while there is none.
        self.event_source = None

    def get_select_params(self):
        '''Delegate to the real source; nothing to watch otherwise.'''
        source = self.event_source
        if source is None:
            return [], [], [], None
        return source.get_select_params()

    def get_events(self, r, w, x):
        '''Delegate to the real source; no events while there is none.'''
        source = self.event_source
        return [] if source is None else source.get_events(r, w, x)

    def is_finished(self):
        # The proxy itself never finishes; sources it wraps come and go.
        return False
class RouteMap(object):

    '''Map message identifiers for routing purposes.

    State machines handling requests from several sources need to
    remember which responses belong to which requestor. Each incoming
    request id may fan out into several outgoing request ids; this
    class records the outgoing -> incoming mapping so responses can be
    routed back.

    '''

    def __init__(self):
        # Maps outgoing id -> incoming id.
        self._routes = {}

    def add(self, incoming_id, outgoing_id):
        '''Record a route.

        Re-adding an identical route is a no-op; mapping the same
        outgoing id to a different incoming id is a programming error.

        '''
        if outgoing_id in self._routes:
            assert self._routes[outgoing_id] == incoming_id
        self._routes[outgoing_id] = incoming_id

    def get_incoming_id(self, outgoing_id):
        '''Get the incoming id corresponding to an outgoing one.

        Raise KeyError if not found.

        '''

        return self._routes[outgoing_id]

    def get_outgoing_ids(self, incoming_id):
        '''Return every outgoing id recorded for an incoming one.'''
        return [out for (out, inc) in self._routes.items()
                if inc == incoming_id]

    def remove(self, outgoing_id):
        '''Forget a route. Raise KeyError if it was never added.'''
        del self._routes[outgoing_id]
import unittest

import distbuild


class RouteMapTests(unittest.TestCase):

    '''Unit tests for distbuild.RouteMap.'''

    def setUp(self):
        self.rm = distbuild.RouteMap()

    def test_raises_error_for_unknown_route(self):
        # Contract: looking up an unknown outgoing id raises KeyError.
        self.assertRaises(KeyError, self.rm.get_incoming_id, 'outgoing')

    def test_finds_added_route(self):
        self.rm.add('incoming', 'outgoing')
        self.assertEqual(self.rm.get_incoming_id('outgoing'), 'incoming')

    def test_finds_outgoing_ids(self):
        self.rm.add('incoming', 'outgoing')
        self.assertEqual(self.rm.get_outgoing_ids('incoming'), ['outgoing'])

    def test_removes_added_route(self):
        self.rm.add('incoming', 'outgoing')
        self.rm.remove('outgoing')
        self.assertRaises(KeyError, self.rm.get_incoming_id, 'outgoing')

    def test_raises_error_if_forgetting_unknown_route(self):
        # Contract: remove() of a never-added route raises KeyError.
        self.assertRaises(KeyError, self.rm.remove, 'outgoing')

    def test_silently_ignores_adding_existing_route(self):
        # Re-adding the identical mapping must not raise.
        self.rm.add('incoming', 'outgoing')
        self.rm.add('incoming', 'outgoing')
        self.assertEqual(self.rm.get_incoming_id('outgoing'), 'incoming')

    def test_raises_assert_if_adding_conflicting_route(self):
        # Mapping the same outgoing id to a different incoming id is
        # a programming error, reported via AssertionError.
        self.rm.add('incoming', 'outgoing')
        self.assertRaises(AssertionError, self.rm.add, 'different', 'outgoing')
import json

import morphlib


# Attributes of Morphology objects (beyond the dict contents) that must
# survive a serialise/deserialise round trip. They are stashed under
# '__'-prefixed keys in the encoded dict.
morphology_attributes = [
    'needs_artifact_metadata_cached',
]


def serialise_artifact(artifact):
    '''Serialise an Artifact object and its dependencies into string form.'''

    def encode_morphology(morphology):
        # Flatten a Morphology into a plain dict; the extra attributes
        # listed in morphology_attributes go under '__'-prefixed keys.
        result = {}
        for key in morphology.keys():
            result[key] = morphology[key]
        for x in morphology_attributes:
            result['__%s' % x] = getattr(morphology, x)
        return result

    def encode_source(source):
        source_dic = {
            # The repo object itself is not serialisable; only the name
            # and refs travel across the wire.
            'repo': None,
            'repo_name': source.repo_name,
            'original_ref': source.original_ref,
            'sha1': source.sha1,
            'tree': source.tree,
            'morphology': encode_morphology(source.morphology),
            'filename': source.filename,
        }
        # build_mode and prefix only exist on chunk sources.
        if source.morphology['kind'] == 'chunk':
            source_dic['build_mode'] = source.build_mode
            source_dic['prefix'] = source.prefix
        return source_dic

    def encode_single_artifact(a, encoded):
        # NOTE(review): this reads the closed-over root ``artifact``,
        # not the ``a`` being encoded. The apparent intent is that when
        # the root of the graph is a system, every artifact in the
        # graph is tagged with that system's arch -- confirm this is
        # deliberate and not a shadowing mistake.
        if artifact.source.morphology['kind'] == 'system':
            arch = artifact.source.morphology['arch']
        else:
            arch = artifact.arch
        return {
            'source': encode_source(a.source),
            'name': a.name,
            'cache_id': a.cache_id,
            'cache_key': a.cache_key,
            # Dependencies are recorded as cache keys only; the object
            # graph is re-linked in deserialise_artifact.
            'dependencies': [encoded[d.cache_key]['cache_key']
                             for d in a.dependencies],
            'arch': arch,
        }

    visited = set()
    def traverse(a):
        # Depth-first post-order walk: dependencies are yielded before
        # their dependents, so encode_single_artifact can look them up.
        visited.add(a)
        for dep in a.dependencies:
            if dep in visited:
                continue
            for ret in traverse(dep):
                yield ret
        yield a

    encoded = {}
    for a in traverse(artifact):
        if a.cache_key not in encoded:
            encoded[a.cache_key] = encode_single_artifact(a, encoded)

    # '_root' names the cache key of the artifact to return on decode.
    encoded['_root'] = artifact.cache_key
    return json.dumps(encoded)


def deserialise_artifact(encoded):
    '''Re-construct the Artifact object (and dependencies).

    The argument should be a string returned by ``serialise_artifact``.
    The reconstructed Artifact objects will be sufficiently like the
    originals that they can be used as a build graph, and other such
    purposes, by Morph.

    '''

    def unserialise_morphology(le_dict):
        '''Convert a dict into something that kinda acts like a Morphology.

        As it happens, we don't need the full Morphology so we cheat.
        Cheating is good.

        '''

        class FakeMorphology(dict):

            def get_commands(self, which):
                '''Get commands to run from a morphology or build system'''
                if self[which] is None:
                    # Fall back to the build system's default commands,
                    # e.g. 'build-commands' -> bs.build_commands.
                    attr = '_'.join(which.split('-'))
                    bs = morphlib.buildsystem.lookup_build_system(
                        self['build-system'])
                    return getattr(bs, attr)
                else:
                    return self[which]

        morphology = FakeMorphology(le_dict)
        # Restore the attributes stashed by serialise_artifact under
        # '__'-prefixed keys, then drop the stash keys themselves.
        for x in morphology_attributes:
            setattr(morphology, x, le_dict['__%s' % x])
            del morphology['__%s' % x]
        return morphology

    def unserialise_source(le_dict):
        '''Convert a dict into a Source object.'''

        morphology = unserialise_morphology(le_dict['morphology'])
        source = morphlib.source.Source(le_dict['repo_name'],
                                        le_dict['original_ref'],
                                        le_dict['sha1'],
                                        le_dict['tree'],
                                        morphology,
                                        le_dict['filename'])
        # Mirror of encode_source: chunk-only attributes.
        if morphology['kind'] == 'chunk':
            source.build_mode = le_dict['build_mode']
            source.prefix = le_dict['prefix']
        return source

    def unserialise_single_artifact(le_dict):
        '''Convert dict into an Artifact object.

        Do not set dependencies, that will be dealt with later.

        '''

        source = unserialise_source(le_dict['source'])
        artifact = morphlib.artifact.Artifact(source, le_dict['name'])
        artifact.cache_id = le_dict['cache_id']
        artifact.cache_key = le_dict['cache_key']
        artifact.arch = le_dict['arch']
        return artifact

    le_dicts = json.loads(encoded)
    cache_keys = [k for k in le_dicts.keys() if k != '_root']
    artifacts = {}
    # First pass: create every artifact without dependencies.
    for cache_key in cache_keys:
        le_dict = le_dicts[cache_key]
        artifacts[cache_key] = unserialise_single_artifact(le_dict)
    # Second pass: link dependencies, now that all objects exist.
    for cache_key in cache_keys:
        le_dict = le_dicts[cache_key]
        artifact = artifacts[cache_key]
        artifact.dependencies = [artifacts[k] for k in le_dict['dependencies']]

    return artifacts[le_dicts['_root']]
import unittest

import distbuild


class MockMorphology(object):

    '''Minimal stand-in for a Morphology: a dict plus two attributes.'''

    def __init__(self, name):
        self.dict = {
            'name': '%s.morphology.name' % name,
            'kind': '%s.morphology.kind' % name,
        }
        self.needs_staging_area = None
        self.needs_artifact_metadata_cached = None

    def keys(self):
        return self.dict.keys()

    def __getitem__(self, key):
        return self.dict[key]


class MockSource(object):

    '''Minimal stand-in for a Source, with all-string fields.'''

    def __init__(self, name):
        self.repo = None
        self.repo_name = '%s.source.repo_name' % name
        self.original_ref = '%s.source.original_ref' % name
        self.sha1 = '%s.source.sha1' % name
        self.tree = '%s.source.tree' % name
        self.morphology = MockMorphology(name)
        self.filename = '%s.source.filename' % name


class MockArtifact(object):

    '''Minimal stand-in for an Artifact.

    NOTE(review): no ``arch`` attribute is defined here, yet
    serialise_artifact reads ``.arch`` when the root morphology kind
    is not 'system' -- verify these tests actually run against the
    serialiser as written.

    '''

    def __init__(self, name):
        self.source = MockSource(name)
        self.name = name
        self.cache_id = {
            'blip': '%s.blip' % name,
            'integer': 42,
        }
        self.cache_key = '%s.cache_key' % name
        self.dependencies = []


class SerialisationTests(unittest.TestCase):

    '''Round-trip tests for serialise_artifact/deserialise_artifact.'''

    def setUp(self):
        self.art1 = MockArtifact('name1')
        self.art2 = MockArtifact('name2')
        self.art3 = MockArtifact('name3')
        self.art4 = MockArtifact('name4')

    def assertEqualMorphologies(self, a, b):
        # Compare dict contents key by key, then the extra attributes.
        self.assertEqual(sorted(a.keys()), sorted(b.keys()))
        keys = sorted(a.keys())
        a_values = [a[k] for k in keys]
        b_values = [b[k] for k in keys]
        self.assertEqual(a_values, b_values)
        self.assertEqual(a.needs_staging_area, b.needs_staging_area)
        self.assertEqual(a.needs_artifact_metadata_cached,
                         b.needs_artifact_metadata_cached)
        # NOTE(review): duplicate of the needs_staging_area assertion
        # above -- harmless, could be dropped.
        self.assertEqual(a.needs_staging_area,
                         b.needs_staging_area)

    def assertEqualSources(self, a, b):
        self.assertEqual(a.repo, b.repo)
        self.assertEqual(a.repo_name, b.repo_name)
        self.assertEqual(a.original_ref, b.original_ref)
        self.assertEqual(a.sha1, b.sha1)
        self.assertEqual(a.tree, b.tree)
        self.assertEqualMorphologies(a.morphology, b.morphology)
        self.assertEqual(a.filename, b.filename)

    def assertEqualArtifacts(self, a, b):
        # Recursive comparison: dependencies are compared pairwise, in
        # order, all the way down the graph.
        self.assertEqualSources(a.source, b.source)
        self.assertEqual(a.name, b.name)
        self.assertEqual(a.cache_id, b.cache_id)
        self.assertEqual(a.cache_key, b.cache_key)
        self.assertEqual(len(a.dependencies), len(b.dependencies))
        for i in range(len(a.dependencies)):
            self.assertEqualArtifacts(a.dependencies[i], b.dependencies[i])

    def verify_round_trip(self, artifact):
        # Serialise, deserialise, and check (a) deep equality and
        # (b) that shared dependencies decode to shared objects, not
        # duplicated copies.
        encoded = distbuild.serialise_artifact(artifact)
        decoded = distbuild.deserialise_artifact(encoded)
        self.assertEqualArtifacts(artifact, decoded)

        def key(a):
            return a.cache_key

        objs = {}
        queue = [decoded]
        while queue:
            obj = queue.pop()
            k = key(obj)
            if k in objs:
                self.assertTrue(obj is objs[k])
            else:
                objs[k] = obj
            queue.extend(obj.dependencies)

    def test_returns_string(self):
        encoded = distbuild.serialise_artifact(self.art1)
        self.assertEqual(type(encoded), str)

    def test_works_without_dependencies(self):
        self.verify_round_trip(self.art1)

    def test_works_with_single_dependency(self):
        self.art1.dependencies = [self.art2]
        self.verify_round_trip(self.art1)

    def test_works_with_two_dependencies(self):
        self.art1.dependencies = [self.art2, self.art3]
        self.verify_round_trip(self.art1)

    def test_works_with_two_levels_of_dependencies(self):
        self.art2.dependencies = [self.art4]
        self.art1.dependencies = [self.art2, self.art3]
        self.verify_round_trip(self.art1)

    def test_works_with_dag(self):
        # art4 is shared by art2 and art3: a diamond, not a tree.
        self.art2.dependencies = [self.art4]
        self.art3.dependencies = [self.art4]
        self.art1.dependencies = [self.art2, self.art3]
        self.verify_round_trip(self.art1)
import logging
import re


# Extracts the class name from str(SomeClass), which looks like
# "<class 'package.module.Name'>" (or "<type '...'>" on Python 2
# built-ins). BUG FIX: the pattern was garbled to r".*)'>", which has
# an unbalanced ')' (re.error at import time) and lacks the 'name'
# group that _classname() reads.
classnamepat = re.compile(r".*'(?P<name>.*)'>")


class StateMachine(object):

    '''A state machine abstraction.

    The caller may specify call backs for events coming from specific
    event sources. An event source might, for example, be a socket
    file descriptor, and the event might be incoming data from the
    socket. The callback would then process the data, perhaps by
    collecting it into a buffer and parsing out messages from it.

    A callback gets the event source and event as arguments. It returns
    the new state, and a list of new events to handle.

    A callback may return or yield new events, which will be handled
    eventually. They may or may not be handled in order.

    There can only be one callback for one state, source, and event
    class combination.

    States are represented by unique objects, e.g., strings containing
    the names of the states. When a machine wants to stop, it sets its
    state to None.

    '''

    def __init__(self, initial_state):
        # Maps (state, source, event class) -> (new state, callback).
        self._transitions = {}
        self.state = self._initial_state = initial_state
        # When True, every transition (or non-match) is logged.
        self.debug_transitions = False

    def setup(self):
        '''Set up machine for execution.

        This is called when the machine is added to the main loop.

        '''

    def _key(self, state, event_source, event_class):
        return (state, event_source, event_class)

    def add_transition(self, state, source, event_class, new_state, callback):
        '''Add a transition to the state machine.

        When the state machine is in the given state, and an event of
        a given type comes from a given source, move the state machine
        to the new state and call the callback function.

        '''

        key = self._key(state, source, event_class)
        assert key not in self._transitions, \
            'Transition %s already registered' % str(key)
        self._transitions[key] = (new_state, callback)

    def add_transitions(self, specification):
        '''Add many transitions.

        The specification is a list of transitions.
        Each transition is a tuple of the arguments given to
        ``add_transition``.

        '''

        for t in specification:
            self.add_transition(*t)

    def handle_event(self, event_source, event):
        '''Handle a given event.

        Return list of new events to handle.

        '''

        key = self._key(self.state, event_source, event.__class__)
        if key not in self._transitions:
            # No matching transition: the event is silently ignored.
            if self.debug_transitions:  # pragma: no cover
                prefix = '%s: handle_event: ' % self.__class__.__name__
                logging.debug(prefix + 'not relevant for us: %s' % repr(event))
                logging.debug(prefix + 'key: %s', repr(key))
                logging.debug(prefix + 'state: %s', repr(self.state))
            return []

        new_state, callback = self._transitions[key]
        if self.debug_transitions:  # pragma: no cover
            logging.debug(
                '%s: state change %s -> %s callback=%s' %
                (self.__class__.__name__, self.state, new_state,
                 str(callback)))
        # Change state before calling the callback, so the callback
        # observes (and may further change) the new state.
        self.state = new_state
        if callback is not None:
            ret = callback(event_source, event)
            # Callbacks may return None, a list, or any iterable;
            # normalise to a list for the main loop.
            if ret is None:
                return []
            else:
                return list(ret)
        else:
            return []

    def dump_dot(self, filename):  # pragma: no cover
        '''Write a Graphviz DOT file for the state machine.'''

        with open(filename, 'w') as f:
            f.write('digraph %s {\n' % self._classname(self.__class__))
            first = True
            for key in self._transitions:
                state, src, event_class = key
                if first:
                    f.write('"START" -> "%s" [label=""];\n' %
                            self._initial_state)
                    first = False

                new_state, callback = self._transitions[key]
                if new_state is None:
                    new_state = 'END'
                f.write('"%s" -> "%s" [label="%s"];\n' %
                        (state, new_state, self._classname(event_class)))
            f.write('}\n')

    def _classname(self, klass):  # pragma: no cover
        '''Return the bare class name parsed out of str(klass).'''
        s = str(klass)
        m = classnamepat.match(s)
        if m:
            return m.group('name').split('.')[-1]
        else:
            return s
import unittest

import distbuild


class DummyEventSource(object):

    # Bare stand-in: transitions key on the source's identity only.
    pass


class DummyEvent(object):

    # Bare stand-in: transitions key on the event's class only.
    pass


class StateMachineTests(unittest.TestCase):

    '''Unit tests for distbuild.StateMachine.'''

    def setUp(self):
        self.sm = distbuild.StateMachine('init')
        # NOTE(review): 'distbuild' here looks like it was meant to be
        # 'mainloop' (the attribute MainLoop sets) -- harmless either
        # way for these tests, but confirm.
        self.sm.distbuild = None
        self.sm.setup()
        self.event_source = DummyEventSource()
        self.event = DummyEvent()
        # What the callback saw, and what it should return.
        self.event_sources = []
        self.events = []
        self.callback_result = None

    def callback(self, event_source, event):
        # Test callback: record arguments, return the canned result.
        self.event_sources.append(event_source)
        self.events.append(event)
        return self.callback_result

    def test_ignores_event_when_there_are_no_transitions(self):
        new_events = self.sm.handle_event(self.event_source, self.event)
        self.assertEqual(new_events, [])
        self.assertEqual(self.event_sources, [])
        self.assertEqual(self.events, [])

    def test_ignores_event_when_no_transition_matches(self):
        # Registered for str events; a DummyEvent must not match.
        spec = [
            ('init', self.event_source, str, 'init', self.callback),
        ]
        self.sm.add_transitions(spec)
        new_events = self.sm.handle_event(self.event_source, self.event)
        self.assertEqual(new_events, [])
        self.assertEqual(self.event_sources, [])
        self.assertEqual(self.events, [])

    def test_handles_lack_of_callback_ok(self):
        # A None callback is allowed: the transition still happens.
        spec = [
            ('init', self.event_source, DummyEvent, 'init', None),
        ]
        self.sm.add_transitions(spec)
        new_events = self.sm.handle_event(self.event_source, self.event)
        self.assertEqual(new_events, [])
        self.assertEqual(self.event_sources, [])
        self.assertEqual(self.events, [])

    def test_calls_registered_callback_for_right_event(self):
        spec = [
            ('init', self.event_source, DummyEvent, 'init', self.callback),
        ]
        self.sm.add_transitions(spec)
        new_events = self.sm.handle_event(self.event_source, self.event)
        self.assertEqual(new_events, [])
        self.assertEqual(self.event_sources, [self.event_source])
        self.assertEqual(self.events, [self.event])

    def test_handle_converts_nonlist_to_list(self):
        # A callback may return any iterable (here a tuple);
        # handle_event must hand back a real list.
        self.callback_result = ('foo', 'bar')

        spec = [
            ('init', self.event_source, DummyEvent, 'init', self.callback),
        ]
        self.sm.add_transitions(spec)
        new_events = self.sm.handle_event(self.event_source, self.event)
        self.assertEqual(new_events, ['foo', 'bar'])
        self.assertEqual(self.event_sources, [self.event_source])
        self.assertEqual(self.events, [self.event])
import logging

from socketsrc import (SocketError, SocketReadable, SocketWriteable,
                       SocketEventSource)
from sm import StateMachine
from stringbuffer import StringBuffer


class SocketBufferNewData(object):

    '''Socket buffer has received new data.'''

    def __init__(self, data):
        # The newly read bytes.
        self.data = data


class SocketBufferEof(object):

    '''Socket buffer has reached end of file when reading.

    Note that the socket buffer may still be available for writing.
    However, no more new data will be read.

    '''


class SocketBufferClosed(object):

    '''Socket buffer has closed its socket.'''


# Internal events, only ever sent by SocketBuffer to itself.
class _Close(object): pass
class _WriteBufferIsEmpty(object): pass
class _WriteBufferNotEmpty(object): pass



class SocketBuffer(StateMachine):

    '''Buffered, non-blocking socket I/O as a state machine.

    States (from the transition table in setup()):
      'reading' -- reading only; the write buffer is empty
      'rw'      -- reading, and flushing the write buffer
      'idle'    -- read side hit EOF, nothing left to write
      'w'       -- read side hit EOF, still flushing the write buffer
      'wc'      -- close() requested, flushing before really closing
      None      -- closed; the machine terminates

    '''

    def __init__(self, sock, max_buffer):
        StateMachine.__init__(self, 'reading')

        self._sock = sock
        # Upper bound on bytes read per _fill call, so the read buffer
        # cannot grow without limit.
        self._max_buffer = max_buffer

    def setup(self):
        src = self._src = SocketEventSource(self._sock)
        src.stop_writing()  # We'll start writing when we need to.
        self.mainloop.add_event_source(src)

        self._wbuf = StringBuffer()

        # (state, source, event class, new state, callback)
        spec = [
            ('reading', src, SocketReadable, 'reading', self._fill),
            ('reading', self, _WriteBufferNotEmpty, 'rw',
                self._start_writing),
            ('reading', self, SocketBufferEof, 'idle', None),
            ('reading', self, _Close, None, self._really_close),

            ('rw', src, SocketReadable, 'rw', self._fill),
            ('rw', src, SocketWriteable, 'rw', self._flush),
            ('rw', self, _WriteBufferIsEmpty, 'reading', self._stop_writing),
            ('rw', self, SocketBufferEof, 'w', None),
            ('rw', self, _Close, 'wc', None),

            ('idle', self, _WriteBufferNotEmpty, 'w', self._start_writing),
            ('idle', self, _Close, None, self._really_close),

            ('w', src, SocketWriteable, 'w', self._flush),
            ('w', self, _WriteBufferIsEmpty, 'idle', self._stop_writing),

            ('wc', src, SocketWriteable, 'wc', self._flush),
            ('wc', self, _WriteBufferIsEmpty, None, self._really_close),
        ]
        self.add_transitions(spec)

    def write(self, data):
        '''Put data into write queue.'''

        was_empty = len(self._wbuf) == 0
        self._wbuf.add(data)
        if was_empty and len(self._wbuf) > 0:
            # Start select()ing for writeability immediately, and also
            # queue the event that moves the state machine into a
            # writing state.
            self._start_writing(None, None)
            self.mainloop.queue_event(self, _WriteBufferNotEmpty())

    def close(self):
        '''Tell state machine to terminate.'''
        self.mainloop.queue_event(self, _Close())

    def _report_error(self, event_source, event):
        # NOTE(review): not referenced by the transition table above;
        # presumably kept for subclasses or future use -- confirm.
        logging.error(str(event))

    def _fill(self, event_source, event):
        # Read up to _max_buffer bytes; I/O errors become SocketError
        # events rather than exceptions.
        try:
            data = event.sock.read(self._max_buffer)
        except (IOError, OSError), e:
            return [SocketError(event.sock, e)]

        if data:
            self.mainloop.queue_event(self, SocketBufferNewData(data))
        else:
            # An empty read means EOF: stop reading, keep writing.
            event_source.stop_reading()
            self.mainloop.queue_event(self, SocketBufferEof())

    def _really_close(self, event_source, event):
        self._src.close()
        self.mainloop.queue_event(self, SocketBufferClosed())

    def _flush(self, event_source, event):
        # Write at most 1 MiB per call; only the bytes the socket
        # actually accepted are removed from the buffer.
        max_write = 1024**2
        data = self._wbuf.read(max_write)
        try:
            n = event.sock.write(data)
        except (IOError, OSError), e:
            return [SocketError(event.sock, e)]
        self._wbuf.remove(n)
        if len(self._wbuf) == 0:
            self.mainloop.queue_event(self, _WriteBufferIsEmpty())

    def _start_writing(self, event_source, event):
        self._src.start_writing()

    def _stop_writing(self, event_source, event):
        self._src.stop_writing()
class SocketReadable(object):

    '''Event: a socket has data available for reading.'''

    def __init__(self, sock):
        self.sock = sock


class SocketWriteable(object):

    '''Event: a socket is ready to accept more written data.'''

    def __init__(self, sock):
        self.sock = sock


class SocketEventSource(EventSource):

    '''Event source for normal sockets (for I/O).

    This generates events for indicating the socket is readable or
    writeable. It does not actually do any I/O itself, that's for the
    handler of the events. There are, however, methods for doing the
    reading/writing, and for closing the socket.

    The event source can be told to stop checking for readability
    or writeability, so that the user may, for example, stop those
    events from being triggered while a buffer is full.

    '''

    def __init__(self, sock):
        self.sock = sock
        self._reading = True
        self._writing = True

        set_nonblocking(sock)

    def get_select_params(self):
        fd = self.sock.fileno()
        readers = []
        writers = []
        if self._reading:
            readers.append(fd)
        if self._writing:
            writers.append(fd)
        return readers, writers, [], None

    def get_events(self, r, w, x):
        fd = self.sock.fileno()
        result = []

        if self._reading and fd in r:
            result.append(SocketReadable(self))

        if self._writing and fd in w:
            result.append(SocketWriteable(self))

        return result

    def start_reading(self):
        self._reading = True

    def stop_reading(self):
        self._reading = False

    def start_writing(self):
        self._writing = True

    def stop_writing(self):
        self._writing = False

    def read(self, max_bytes):
        # Raw os.read on the descriptor, honouring non-blocking mode.
        return os.read(self.sock.fileno(), max_bytes)

    def write(self, data):
        # Raw os.write; returns the number of bytes actually written.
        return os.write(self.sock.fileno(), data)

    def close(self):
        # Stop generating events first, so nothing references the
        # closed descriptor afterwards.
        self.stop_reading()
        self.stop_writing()
        self.sock.close()
        self.sock = None

    def is_finished(self):
        # Finished exactly when close() has run.
        return self.sock is None
class ListenServer(StateMachine):

    '''Listen for new connections on a port, send events for them.

    For each accepted connection, an instance of the ``machine`` class
    is constructed with the connection and ``extra_args``, and added to
    the main loop.

    '''

    def __init__(self, addr, port, machine, extra_args=None):
        StateMachine.__init__(self, 'listening')
        self._addr = addr
        self._port = port
        # machine is a class (state machine) instantiated per connection.
        self._machine = machine
        # 'or []' avoids sharing a mutable default between instances.
        self._extra_args = extra_args or []

    def setup(self):
        # Create the listening socket and register it with the main loop.
        src = ListeningSocketEventSource(self._addr, self._port)
        self.mainloop.add_event_source(src)

        # (from-state, event source, event class, to-state, callback)
        spec = [
            ('listening', src, NewConnection, 'listening', self.new_conn),
            ('listening', src, SocketError, None, self.report_error),
        ]
        self.add_transitions(spec)

    def new_conn(self, event_source, event):
        # Spawn a per-connection state machine and hand it to the main loop.
        logging.debug(
            'ListenServer: Creating new %s using %s and %s' %
                (repr(self._machine),
                 repr(event.connection),
                 repr(self._extra_args)))
        m = self._machine(event.connection, *self._extra_args)
        self.mainloop.add_state_machine(m)

    def report_error(self, event_source, event):
        # Log errors on the listening socket.
        logging.error(str(event))
class StringBuffer(object):

    '''Buffer data for a file descriptor.

    The data may arrive in small pieces, and it is buffered in a way that
    avoids excessive string catenation or splitting.

    '''

    def __init__(self):
        self.strings = []  # queue of not-yet-consumed string fragments
        self.len = 0       # total number of buffered bytes

    def add(self, data):
        '''Add data to buffer.'''
        self.strings.append(data)
        self.len += len(data)

    def remove(self, num_bytes):
        '''Remove specified number of bytes from buffer.

        Removing more bytes than the buffer holds empties the buffer.

        '''

        while num_bytes > 0 and self.strings:
            first = self.strings[0]
            if len(first) <= num_bytes:
                num_bytes -= len(first)
                del self.strings[0]
                self.len -= len(first)
            else:
                self.strings[0] = first[num_bytes:]
                self.len -= num_bytes
                num_bytes = 0

    def peek(self):
        '''Return contents of buffer as one string.'''

        if len(self.strings) == 0:
            return ''
        elif len(self.strings) == 1:
            return self.strings[0]
        else:
            # Coalesce fragments so that repeated peeks stay cheap.
            self.strings = [''.join(self.strings)]
            return self.strings[0]

    def read(self, max_bytes):
        '''Return up to max_bytes from the buffer.

        Less is returned if the buffer does not contain at least max_bytes.
        The returned data will remain in the buffer; use remove to remove
        it.

        '''

        use = []
        size = 0
        for s in self.strings:
            n = max_bytes - size
            if len(s) <= n:
                use.append(s)
                size += len(s)
            else:
                use.append(s[:n])
                size += n
                break
        return ''.join(use)

    def readline(self):
        '''Return a complete line (ends with a newline) or None.

        Unlike read, this removes the returned line from the buffer.

        '''

        for i, s in enumerate(self.strings):
            newline = s.find('\n')
            if newline != -1:
                if newline+1 == len(s):
                    # Newline ends this fragment: take whole fragments.
                    use = self.strings[:i+1]
                    del self.strings[:i+1]
                else:
                    # Newline is mid-fragment: split the fragment.
                    pre = s[:newline+1]
                    use = self.strings[:i] + [pre]
                    del self.strings[:i]
                    self.strings[0] = s[newline+1:]
                line = ''.join(use)
                # Bug fix: keep the byte count in sync. Previously
                # readline removed data from self.strings but left
                # self.len unchanged, so __len__ over-reported after
                # every successful readline.
                self.len -= len(line)
                return line
        return None

    def __len__(self):
        return self.len
class StringBufferTests(unittest.TestCase):

    '''Tests for adding data to a StringBuffer and inspecting it.'''

    def setUp(self):
        self.buf = distbuild.StringBuffer()

    def test_is_empty_initially(self):
        self.assertEqual(self.buf.peek(), '')
        self.assertEqual(len(self.buf), 0)

    def test_adds_a_string(self):
        s = 'foo'
        self.buf.add(s)
        self.assertEqual(self.buf.peek(), s)
        self.assertEqual(len(self.buf), len(s))

    def test_adds_a_second_string(self):
        s = 'foo'
        t = 'bar'
        self.buf.add(s)
        self.buf.add(t)
        self.assertEqual(self.buf.peek(), s + t)
        self.assertEqual(len(self.buf), len(s + t))


class StringBufferRemoveTests(unittest.TestCase):

    '''Tests for StringBuffer.remove, including across fragment edges.'''

    def setUp(self):
        # Two separate strings so removal can cross a fragment boundary.
        self.buf = distbuild.StringBuffer()
        self.first = 'foo'
        self.second = 'bar'
        self.all = self.first + self.second
        self.buf.add(self.first)
        self.buf.add(self.second)

    def test_removes_part_of_first_string(self):
        self.assertTrue(len(self.first) > 1)
        self.buf.remove(1)
        self.assertEqual(self.buf.peek(), self.all[1:])
        self.assertEqual(len(self.buf), len(self.all) - 1)

    def test_removes_all_of_first_string(self):
        self.buf.remove(len(self.first))
        self.assertEqual(self.buf.peek(), self.second)
        self.assertEqual(len(self.buf), len(self.second))

    def test_removes_more_than_first_string(self):
        self.assertTrue(len(self.first) > 1)
        self.assertTrue(len(self.second) > 1)
        self.buf.remove(len(self.first) + 1)
        self.assertEqual(self.buf.peek(), self.second[1:])
        self.assertEqual(len(self.buf), len(self.second) - 1)

    def test_removes_all_strings(self):
        self.buf.remove(len(self.all))
        self.assertEqual(self.buf.peek(), '')
        self.assertEqual(len(self.buf), 0)

    def test_removes_more_than_all_strings(self):
        # Over-removal must empty the buffer, not raise.
        self.buf.remove(len(self.all) + 1)
        self.assertEqual(self.buf.peek(), '')
        self.assertEqual(len(self.buf), 0)


class StringBufferReadTests(unittest.TestCase):

    '''Tests for StringBuffer.read; read must not consume the data.'''

    def setUp(self):
        self.buf = distbuild.StringBuffer()

    def test_returns_empty_string_for_empty_buffer(self):
        self.assertEqual(self.buf.read(100), '')
        self.assertEqual(self.buf.peek(), '')

    def test_returns_partial_string_for_short_buffer(self):
        self.buf.add('foo')
        self.assertEqual(self.buf.read(100), 'foo')
        self.assertEqual(self.buf.peek(), 'foo')

    def test_returns_catenated_strings(self):
        self.buf.add('foo')
        self.buf.add('bar')
        self.assertEqual(self.buf.read(100), 'foobar')
        self.assertEqual(self.buf.peek(), 'foobar')

    def test_returns_requested_amount_when_available(self):
        self.buf.add('foo')
        self.buf.add('bar')
        self.assertEqual(self.buf.read(4), 'foob')
        self.assertEqual(self.buf.peek(), 'foobar')


class StringBufferReadlineTests(unittest.TestCase):

    '''Tests for StringBuffer.readline; readline consumes the line.'''

    def setUp(self):
        self.buf = distbuild.StringBuffer()

    def test_returns_None_on_empty_buffer(self):
        self.assertEqual(self.buf.readline(), None)

    def test_returns_None_on_incomplete_line_in_buffer(self):
        self.buf.add('foo')
        self.assertEqual(self.buf.readline(), None)

    def test_extracts_complete_line(self):
        self.buf.add('foo\n')
        self.assertEqual(self.buf.readline(), 'foo\n')
        self.assertEqual(self.buf.peek(), '')

    def test_extracts_only_the_initial_line_and_leaves_rest_of_buffer(self):
        self.buf.add('foo\nbar\n')
        self.assertEqual(self.buf.readline(), 'foo\n')
        self.assertEqual(self.buf.peek(), 'bar\n')

    def test_extracts_only_the_initial_line_and_leaves_partial_line(self):
        self.buf.add('foo\nbar')
        self.assertEqual(self.buf.readline(), 'foo\n')
        self.assertEqual(self.buf.peek(), 'bar')

    def test_extracts_only_the_initial_line_from_multiple_pieces(self):
        self.buf.add('foo\n')
        self.buf.add('bar\n')
        self.assertEqual(self.buf.readline(), 'foo\n')
        self.assertEqual(self.buf.peek(), 'bar\n')

    def test_extracts_only_the_initial_line_from_multiple_pieces_incomp(self):
        self.buf.add('foo\n')
        self.buf.add('bar')
        self.assertEqual(self.buf.readline(), 'foo\n')
        self.assertEqual(self.buf.peek(), 'bar')
class Timer(object):

    '''Event delivered each time the timer interval has elapsed.'''

    pass


class TimerEventSource(object):

    '''Event source that produces a Timer event every interval seconds.

    The timer starts disabled; call start() to enable it.

    '''

    def __init__(self, interval):
        self.interval = interval
        self.last_event = time.time()
        self.enabled = False

    def start(self):
        '''Enable the timer; the next event is one interval from now.'''
        self.enabled = True
        self.last_event = time.time()

    def stop(self):
        '''Disable the timer; no more events until start() again.'''
        self.enabled = False

    def get_select_params(self):
        # Report how long select() may sleep before the next tick is due.
        if not self.enabled:
            return [], [], [], None
        remaining = (self.last_event + self.interval) - time.time()
        return [], [], [], max(0, remaining)

    def get_events(self, r, w, x):
        # Fire at most one Timer per call, once the interval has passed.
        if not self.enabled:
            return []
        now = time.time()
        if now < self.last_event + self.interval:
            return []
        self.last_event = now
        return [Timer()]

    def is_finished(self):
        # A timer never removes itself from the main loop.
        return False
class WorkerBuildRequest(object):

    '''Ask the queuer to build an artifact for a given initiator.'''

    def __init__(self, artifact, initiator_id):
        self.artifact = artifact
        self.initiator_id = initiator_id


class WorkerCancelPending(object):

    '''Ask the queuer to drop an initiator's not-yet-started requests.'''

    def __init__(self, initiator_id):
        self.initiator_id = initiator_id


class WorkerBuildStepStarted(object):

    '''Announce that a worker has started building an artifact.'''

    def __init__(self, initiator_id, cache_key, worker_name):
        self.initiator_id = initiator_id
        self.artifact_cache_key = cache_key
        self.worker_name = worker_name


class WorkerBuildOutput(object):

    '''Carry build output (an exec-output message) from a worker.'''

    def __init__(self, msg, cache_key):
        self.msg = msg
        self.artifact_cache_key = cache_key


class WorkerBuildCaching(object):

    '''Announce that built artifacts are being fetched to the cache.'''

    def __init__(self, initiator_id, cache_key):
        self.initiator_id = initiator_id
        self.artifact_cache_key = cache_key


class WorkerBuildFinished(object):

    '''Announce that a worker-build finished and was cached.'''

    def __init__(self, msg, cache_key):
        self.msg = msg
        self.artifact_cache_key = cache_key


class WorkerBuildFailed(object):

    '''Announce that a worker-build failed.'''

    def __init__(self, msg, cache_key):
        self.msg = msg
        self.artifact_cache_key = cache_key


class _NeedJob(object):

    '''Internal event: the worker ``who`` is idle and wants a job.'''

    def __init__(self, who):
        self.who = who
class _HaveAJob(object):

    '''Internal event: a job (artifact plus initiator id) for a worker.'''

    def __init__(self, artifact, initiator_id):
        self.artifact = artifact
        self.initiator_id = initiator_id


class _JobIsFinished(object):

    '''Internal event: the worker finished its job successfully.

    ``msg`` is the worker's final response message.

    '''

    def __init__(self, msg):
        self.msg = msg


class _JobFailed(object):

    '''Internal event: the worker's job failed.'''

    pass


class _Cached(object):

    '''Internal event: built artifacts were fetched to the shared cache.'''

    pass


class WorkerBuildQueuer(distbuild.StateMachine):

    '''Maintain queue of outstanding worker-build requests.

    This state machine captures WorkerBuildRequest events, and puts them
    into a queue. It also catches _NeedJob events, from a
    WorkerConnection, and responds to them with _HaveAJob events,
    when it has an outstanding request.

    '''

    def __init__(self):
        distbuild.StateMachine.__init__(self, 'idle')

    def setup(self):
        distbuild.crash_point()

        logging.debug('WBQ: Setting up %s' % self)
        # FIFO of WorkerBuildRequest events not yet handed to a worker.
        self._request_queue = []
        # _NeedJob events from workers that currently have nothing to do.
        self._available_workers = []

        # Single 'idle' state; transitions only run the handlers below.
        spec = [
            ('idle', WorkerBuildQueuer, WorkerBuildRequest, 'idle',
                self._handle_request),
            ('idle', WorkerBuildQueuer, WorkerCancelPending, 'idle',
                self._handle_cancel),
            ('idle', WorkerConnection, _NeedJob, 'idle', self._handle_worker),
        ]
        self.add_transitions(spec)

    def _handle_request(self, event_source, event):
        # Queue the request; hand it out at once if a worker is idle.
        distbuild.crash_point()

        logging.debug('WBQ: Adding request to queue: %s' % event.artifact.name)
        self._request_queue.append(event)
        logging.debug(
            'WBQ: %d available workers and %d requests queued' %
                (len(self._available_workers),
                 len(self._request_queue)))
        if self._available_workers:
            self._give_job()

    def _handle_cancel(self, event_source, worker_cancel_pending):
        # Drop every queued request from the cancelling initiator.
        # Jobs already given to a worker are not affected here.
        for request in [r for r in self._request_queue if
                r.initiator_id == worker_cancel_pending.initiator_id]:
            logging.debug('WBQ: Removing request from queue: %s',
                          request.artifact.name)
            self._request_queue.remove(request)

    def _handle_worker(self, event_source, event):
        # A worker reported it is idle; remember it, and give it work
        # right away if any request is queued.
        distbuild.crash_point()

        logging.debug('WBQ: Adding worker to queue: %s' % event.who)
        self._available_workers.append(event)
        logging.debug(
            'WBQ: %d available workers and %d requests queued' %
                (len(self._available_workers),
                 len(self._request_queue)))
        if self._request_queue:
            self._give_job()

    def _give_job(self):
        # Pair the oldest queued request with the longest-idle worker.
        request = self._request_queue.pop(0)
        worker = self._available_workers.pop(0)
        logging.debug(
            'WBQ: Giving %s to %s' %
                (request.artifact.name, worker.who.name()))
        self.mainloop.queue_event(worker.who, _HaveAJob(request.artifact,
                                                        request.initiator_id))
class WorkerConnection(distbuild.StateMachine):

    '''Communicate with a single worker.'''

    # These are class attributes, shared by all WorkerConnection
    # instances: one id generator, one request route map, and one map
    # from initiator id to its outstanding request ids.
    _request_ids = distbuild.IdentifierGenerator('WorkerConnection')
    _route_map = distbuild.RouteMap()
    _initiator_request_map = collections.defaultdict(set)

    def __init__(self, cm, conn, writeable_cache_server,
                 worker_cache_server_port, morph_instance):
        distbuild.StateMachine.__init__(self, 'idle')
        self._cm = cm
        self._conn = conn
        self._writeable_cache_server = writeable_cache_server
        self._worker_cache_server_port = worker_cache_server_port
        self._morph_instance = morph_instance
        self._helper_id = None

    def name(self):
        # Identify the worker by the FQDN and port of its end of the
        # connection.
        addr, port = self._conn.getpeername()
        name = socket.getfqdn(addr)
        return '%s:%s' % (name, port)

    def setup(self):
        distbuild.crash_point()

        logging.debug('WC: Setting up instance %s' % repr(self))

        # All traffic with the worker goes through a JSON message machine.
        self._jm = distbuild.JsonMachine(self._conn)
        self.mainloop.add_state_machine(self._jm)

        # States: idle -> building -> caching -> idle. EOF from the
        # worker at any point triggers a reconnect (to-state None).
        spec = [
            ('idle', self._jm, distbuild.JsonEof, None, self._reconnect),
            ('idle', self, _HaveAJob, 'building', self._start_build),

            ('building', distbuild.BuildController,
                distbuild.BuildCancel, 'building',
                self._maybe_cancel),
            ('building', self._jm, distbuild.JsonEof, None, self._reconnect),
            ('building', self._jm, distbuild.JsonNewMessage, 'building',
                self._handle_json_message),
            ('building', self, _JobFailed, 'idle', self._request_job),
            ('building', self, _JobIsFinished, 'caching',
                self._request_caching),

            ('caching', distbuild.HelperRouter, distbuild.HelperResult,
                'caching', self._handle_helper_result),
            ('caching', self, _Cached, 'idle', self._request_job),
            ('caching', self, _JobFailed, 'idle', self._request_job),
        ]
        self.add_transitions(spec)

        # Ask the queuer for work immediately.
        self._request_job(None, None)

    def _maybe_cancel(self, event_source, build_cancel):
        # Only act if the cancel is for the initiator whose job we run.
        logging.debug('WC: BuildController requested a cancel')
        if build_cancel.id == self._initiator_id:
            distbuild.crash_point()

            # Send an exec-cancel for every request this initiator has
            # outstanding on this worker.
            for id in self._initiator_request_map[self._initiator_id]:
                logging.debug('WC: Cancelling exec %s' % id)
                msg = distbuild.message('exec-cancel', id=id)
                self._jm.send(msg)

    def _reconnect(self, event_source, event):
        distbuild.crash_point()

        logging.debug('WC: Triggering reconnect')
        self.mainloop.queue_event(self._cm, distbuild.Reconnect())

    def _start_build(self, event_source, event):
        # Send an exec-request to run 'morph worker-build' on the worker,
        # with the serialised artifact on its stdin.
        distbuild.crash_point()

        self._artifact = event.artifact
        self._initiator_id = event.initiator_id
        logging.debug('WC: starting build: %s for %s' %
                      (self._artifact.name, self._initiator_id))

        argv = [
            self._morph_instance,
            'worker-build',
            self._artifact.name,
        ]
        msg = distbuild.message('exec-request',
            id=self._request_ids.next(),
            argv=argv,
            stdin_contents=distbuild.serialise_artifact(self._artifact),
        )
        self._jm.send(msg)
        logging.debug('WC: sent to worker: %s' % repr(msg))
        # Remember how to route worker replies back to the initiator.
        self._route_map.add(self._initiator_id, msg['id'])
        self._initiator_request_map[self._initiator_id].add(msg['id'])
        logging.debug(
            'WC: route map from %s to %s',
            self._artifact.cache_key, msg['id'])

        started = WorkerBuildStepStarted(
            self._initiator_id, self._artifact.cache_key, self.name())
        self.mainloop.queue_event(WorkerConnection, started)

    def _handle_json_message(self, event_source, event):
        '''Handle JSON messages from the worker.'''

        distbuild.crash_point()

        logging.debug('WC: from worker: %s' % repr(event.msg))

        handlers = {
            'exec-output': self._handle_exec_output,
            'exec-response': self._handle_exec_response,
        }

        # Unknown message types raise KeyError here.
        handler = handlers[event.msg['type']]
        handler(event.msg)

    def _handle_exec_output(self, msg):
        # Rewrite the worker-local request id to the initiator's id
        # before forwarding the output.
        new = dict(msg)
        new['id'] = self._route_map.get_incoming_id(msg['id'])
        logging.debug('WC: emitting: %s', repr(new))
        self.mainloop.queue_event(
            WorkerConnection,
            WorkerBuildOutput(new, self._artifact.cache_key))

    def _handle_exec_response(self, msg):
        # Final response for a build: clean up routing state, then
        # report failure, or move on to caching the built artifacts.
        logging.debug('WC: finished building: %s' % self._artifact.name)

        new = dict(msg)
        new['id'] = self._route_map.get_incoming_id(msg['id'])
        self._route_map.remove(msg['id'])
        self._initiator_request_map[self._initiator_id].remove(msg['id'])

        if new['exit'] != 0:
            # Build failed.
            new_event = WorkerBuildFailed(new, self._artifact.cache_key)
            self.mainloop.queue_event(WorkerConnection, new_event)
            self.mainloop.queue_event(self, _JobFailed())
            self._artifact = None
            self._initiator_id = None
        else:
            # Build succeeded. We have more work to do: caching the result.
            self.mainloop.queue_event(self, _JobIsFinished(new))

    def _request_job(self, event_source, event):
        # Tell the queuer this connection is idle and wants a job.
        distbuild.crash_point()
        self.mainloop.queue_event(WorkerConnection, _NeedJob(self))

    def _request_caching(self, event_source, event):
        # Ask the helper to make the shared artifact cache fetch the
        # built artifacts from this worker's local cache.
        distbuild.crash_point()

        logging.debug('Requesting shared artifact cache to get artifacts')

        filename = ('%s.%s' %
            (self._artifact.source.morphology['kind'],
             self._artifact.name))
        suffixes = [filename]
        kind = self._artifact.source.morphology['kind']
        if kind == 'stratum':
            suffixes.append(filename + '.meta')
        elif kind == 'system':
            # FIXME: This is a really ugly hack.
            if filename.endswith('-rootfs'):
                suffixes.append(filename[:-len('-rootfs')] + '-kernel')

        suffixes = [urllib.quote(x) for x in suffixes]
        suffixes = ','.join(suffixes)

        worker_host = self._conn.getpeername()[0]

        url = urlparse.urljoin(
            self._writeable_cache_server,
            '/1.0/fetch?host=%s:%d&cacheid=%s&artifacts=%s' %
                (urllib.quote(worker_host),
                 self._worker_cache_server_port,
                 urllib.quote(self._artifact.cache_key),
                 suffixes))

        msg = distbuild.message(
            'http-request', id=self._request_ids.next(), url=url, method='GET')
        # Remember the request id so _handle_helper_result can match the
        # helper's reply to this fetch.
        self._helper_id = msg['id']
        req = distbuild.HelperRequest(msg)
        self.mainloop.queue_event(distbuild.HelperRouter, req)

        progress = WorkerBuildCaching(
            self._initiator_id, self._artifact.cache_key)
        self.mainloop.queue_event(WorkerConnection, progress)

        self._initiator_id = None
        # Keep the worker's final message until caching has finished.
        self._finished_msg = event.msg

    def _handle_helper_result(self, event_source, event):
        # Ignore results that are not for our outstanding fetch request.
        if event.msg['id'] == self._helper_id:
            distbuild.crash_point()

            logging.debug('caching: event.msg: %s' % repr(event.msg))
            if event.msg['status'] == httplib.OK:
                logging.debug('Shared artifact cache population done')
                new_event = WorkerBuildFinished(
                    self._finished_msg, self._artifact.cache_key)
                self.mainloop.queue_event(WorkerConnection, new_event)
                self._finished_msg = None
                self._helper_id = None
                self.mainloop.queue_event(self, _Cached())
            else:
                logging.error(
                    'Failed to populate artifact cache: %s %s' %
                        (event.msg['status'], event.msg['body']))
                new_event = WorkerBuildFailed(
                    self._finished_msg, self._artifact.cache_key)
                self.mainloop.queue_event(WorkerConnection, new_event)
                self._finished_msg = None
                self._helper_id = None
                self.mainloop.queue_event(self, _JobFailed())

            self._artifact = None
# Name of the option group all distbuild settings are registered under.
group_distbuild = 'Distributed Build Options'

class DistbuildOptionsPlugin(cliapp.Plugin):

    '''Add command line options shared by all distbuild roles.'''

    def enable(self):
        self.app.settings.string_list(
            ['crash-condition'],
            'add FILENAME:FUNCNAME:MAXCALLS to list of crash conditions '
                '(this is for testing only)',
            metavar='FILENAME:FUNCNAME:MAXCALLS',
            group=group_distbuild)

    def disable(self):
        pass


class SerialiseArtifactPlugin(cliapp.Plugin):

    '''Add the internal serialise-artifact subcommand.'''

    def enable(self):
        self.app.add_subcommand('serialise-artifact', self.serialise_artifact,
                                arg_synopsis='REPO REF MORPHOLOGY')

    def disable(self):
        pass

    def serialise_artifact(self, args):
        '''Internal use only: Serialise Artifact build graph as JSON.'''

        distbuild.add_crash_conditions(self.app.settings['crash-condition'])

        if len(args) != 3:
            raise cliapp.AppException('Must get triplet')

        repo_name, ref, morph_name = args
        filename = '%s.morph' % morph_name
        # Resolve the full build graph for the triplet and write it to
        # the command's output as serialised JSON.
        build_command = morphlib.buildcommand.BuildCommand(self.app)
        srcpool = build_command.create_source_pool(repo_name, ref, filename)
        artifact = build_command.resolve_artifacts(srcpool)
        self.app.output.write(distbuild.serialise_artifact(artifact))
        self.app.output.write('\n')


class WorkerBuild(cliapp.Plugin):

    '''Add the internal worker-build subcommand.'''

    def enable(self):
        self.app.add_subcommand(
            'worker-build', self.worker_build, arg_synopsis='')

    def disable(self):
        pass

    def worker_build(self, args):
        '''Internal use only: Build an artifact in a worker.

        All build dependencies are assumed to have been built already
        and available in the local or remote artifact cache.

        '''

        distbuild.add_crash_conditions(self.app.settings['crash-condition'])

        # NOTE(review): this assumes the serialised artifact arrives as a
        # single line on stdin -- confirm against the controller's
        # exec-request stdin_contents.
        serialized = sys.stdin.readline()
        artifact = distbuild.deserialise_artifact(serialized)

        bc = morphlib.buildcommand.BuildCommand(self.app)

        # We always, unconditionally clear the local artifact cache
        # to avoid it growing boundlessly on a worker. Especially system
        # artifacts are big (up to gigabytes), and having a new one for
        # every build eats up a lot of disk space.
        bc.lac.clear()

        arch = artifact.arch
        bc.build_artifact(artifact, bc.new_build_env(arch))


class WorkerDaemon(cliapp.Plugin):

    '''Add settings and subcommand for running a worker daemon.'''

    def enable(self):
        self.app.settings.string(
            ['worker-daemon-address'],
            'listen for connections on ADDRESS (domain / IP address)',
            default='',
            group=group_distbuild)
        self.app.settings.integer(
            ['worker-daemon-port'],
            'listen for connections on PORT',
            default=3434,
            group=group_distbuild)
        self.app.add_subcommand(
            'worker-daemon',
            self.worker_daemon,
            arg_synopsis='')

    def disable(self):
        pass

    def worker_daemon(self, args):
        '''Daemon that controls builds on a single worker node.'''

        distbuild.add_crash_conditions(self.app.settings['crash-condition'])

        # Accept connections from the controller; each connection is
        # handled by a JsonRouter state machine.
        address = self.app.settings['worker-daemon-address']
        port = self.app.settings['worker-daemon-port']
        router = distbuild.ListenServer(address, port, distbuild.JsonRouter)
        loop = distbuild.MainLoop()
        loop.add_state_machine(router)
        loop.run()
class ControllerDaemon(cliapp.Plugin):

    '''Add settings and subcommand for running the controller daemon.'''

    def enable(self):
        self.app.settings.string(
            ['controller-initiator-address'],
            'listen for initiator connections on ADDRESS '
                '(domain / IP address)',
            default='',
            group=group_distbuild)
        self.app.settings.integer(
            ['controller-initiator-port'],
            'listen for initiator connections on PORT',
            default=7878,
            group=group_distbuild)

        self.app.settings.string(
            ['controller-helper-address'],
            'listen for helper connections on ADDRESS (domain / IP address)',
            default='localhost',
            group=group_distbuild)
        self.app.settings.integer(
            ['controller-helper-port'],
            'listen for helper connections on PORT',
            default=5656,
            group=group_distbuild)

        self.app.settings.string_list(
            ['worker'],
            'specify a build worker (WORKER is ADDRESS or ADDRESS:PORT, '
                'with PORT defaulting to 3434)',
            metavar='WORKER',
            default=[],
            group=group_distbuild)
        self.app.settings.integer(
            ['worker-cache-server-port'],
            'port number for the artifact cache server on each worker',
            metavar='PORT',
            default=8080,
            group=group_distbuild)
        self.app.settings.string(
            ['writeable-cache-server'],
            # Fix: the help text was missing the closing parenthesis
            # after "to 80".
            'specify the shared cache server writeable instance '
                '(SERVER is ADDRESS or ADDRESS:PORT, with PORT defaulting '
                'to 80)',
            metavar='SERVER',
            group=group_distbuild)

        self.app.settings.string(
            ['morph-instance'],
            'use FILENAME to invoke morph (default: %default)',
            metavar='FILENAME',
            default='morph',
            group=group_distbuild)

        self.app.add_subcommand(
            'controller-daemon', self.controller_daemon, arg_synopsis='')

    def disable(self):
        pass

    def controller_daemon(self, args):
        '''Daemon that gives jobs to worker daemons.'''

        distbuild.add_crash_conditions(self.app.settings['crash-condition'])

        artifact_cache_server = (
            self.app.settings['artifact-cache-server'] or
            self.app.settings['cache-server'])
        writeable_cache_server = self.app.settings['writeable-cache-server']
        worker_cache_server_port = \
            self.app.settings['worker-cache-server-port']
        morph_instance = self.app.settings['morph-instance']

        # Two listening sockets: one for the helper process, one for
        # build initiators. Each gets its own state machine class and
        # constructor arguments.
        listener_specs = [
            ('controller-helper-address', 'controller-helper-port',
             distbuild.HelperRouter, []),
            ('controller-initiator-address', 'controller-initiator-port',
             distbuild.InitiatorConnection,
             [artifact_cache_server, morph_instance]),
        ]

        loop = distbuild.MainLoop()

        queuer = distbuild.WorkerBuildQueuer()
        loop.add_state_machine(queuer)

        for addr, port, sm, extra_args in listener_specs:
            addr = self.app.settings[addr]
            port = self.app.settings[port]
            listener = distbuild.ListenServer(
                addr, port, sm, extra_args=extra_args)
            loop.add_state_machine(listener)

        # Open an outgoing connection to every configured worker;
        # WORKER is ADDRESS or ADDRESS:PORT, with PORT defaulting to 3434.
        for worker in self.app.settings['worker']:
            if ':' in worker:
                addr, port = worker.split(':', 1)
                port = int(port)
            else:
                addr = worker
                port = 3434
            cm = distbuild.ConnectionMachine(
                addr, port, distbuild.WorkerConnection,
                [writeable_cache_server, worker_cache_server_port,
                 morph_instance])
            loop.add_state_machine(cm)

        loop.run()


class InitiatorBuildCommand(morphlib.buildcommand.BuildCommand):

    '''A BuildCommand that sends the build to a remote controller.'''

    def __init__(self, app, addr, port):
        self.app = app
        self.addr = addr
        self.port = port
        # Distributed workers must be able to fetch the build branches.
        self.app.settings['push-build-branches'] = True

    def build(self, args):
        '''Initiate a distributed build on a controller'''

        distbuild.add_crash_conditions(self.app.settings['crash-condition'])

        if len(args) != 3:
            raise cliapp.AppException(
                'Need repo, ref, morphology triplet to build')

        self.app.status(msg='Starting distributed build')
        loop = distbuild.MainLoop()
        cm = distbuild.ConnectionMachine(
            self.addr, self.port, distbuild.Initiator, [self.app] + args)
        loop.add_state_machine(cm)
        loop.run()


class Initiator(cliapp.Plugin):

    '''Swap the normal build command for a distributed one when enabled.'''

    def enable(self):
        self.app.settings.boolean(
            ['disable-distbuild'], 'disable distributed building',
            group=group_distbuild)
        self.app.hookmgr.add_callback(
            'new-build-command', self.create_build_command)

    def disable(self):
        pass

    def create_build_command(self, old_build_command):
        # Use distributed building only if a controller address is
        # configured and distbuild has not been explicitly disabled.
        addr = self.app.settings['controller-initiator-address']
        port = self.app.settings['controller-initiator-port']

        if addr != '' and not self.app.settings['disable-distbuild']:
            return InitiatorBuildCommand(self.app, addr, port)
        else:
            return old_build_command


class GraphStateMachines(cliapp.Plugin):

    '''Debug aid: dump Graphviz graphs of the distbuild state machines.'''

    def enable(self):
        self.app.add_subcommand(
            'graph-state-machines',
            self.graph_state_machines,
            arg_synopsis='')

    def disable(self):
        pass

    def graph_state_machines(self, args):
        cm = distbuild.ConnectionMachine(None, None, None, None)
        # Prevent the machine from actually trying to connect.
        cm._start_connect = lambda *args: None
        self.graph_one(cm)

        self.graph_one(distbuild.BuildController(None, None, None))
        self.graph_one(distbuild.HelperRouter(None))
        self.graph_one(distbuild.InitiatorConnection(None, None, None))
        self.graph_one(distbuild.JsonMachine(None))
        self.graph_one(distbuild.WorkerBuildQueuer())

        # FIXME: These need more mocking to work.
        # self.graph_one(distbuild.Initiator(None, None,
        #     self, None, None, None))
        # self.graph_one(distbuild.JsonRouter(None))
        # self.graph_one(distbuild.SocketBuffer(None, None))
        # self.graph_one(distbuild.ListenServer(None, None, None))

    def graph_one(self, sm):
        # Set up the machine with this plugin standing in as its main
        # loop, then dump its transitions as a .gv file.
        class_name = sm.__class__.__name__.split('.')[-1]
        filename = '%s.gv' % class_name
        sm.mainloop = self
        sm.setup()
        sm.dump_dot(filename)

    # Some methods to mock this class as other classes, which the
    # state machine class need to access, just enough to allow the
    # transitions to be set up for graphing.

    def queue_event(self, *args, **kwargs):
        pass

    def add_event_source(self, *args, **kwargs):
        pass

    def add_state_machine(self, sm):
        pass

    def status(self, *args, **kwargs):
        pass
diff --git a/distbuild-helper b/distbuild-helper new file mode 100755 index 00000000..9d70642e --- /dev/null +++ b/distbuild-helper @@ -0,0 +1,322 @@ +#!/usr/bin/python +# +# distbuild-helper -- helper process for Morph distributed building +# +# Copyright (C) 2014 Codethink Limited +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 2 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.. + +import cliapp +import errno +import fcntl +import httplib +import logging +import os +import signal +import socket +import subprocess +import sys +import time +import urlparse + +import distbuild + + +class FileReadable(object): + + def __init__(self, request_id, p, f): + self.request_id = request_id + self.process = p + self.file = f + + +class FileWriteable(object): + + def __init__(self, request_id, p, f): + self.request_id = request_id + self.process = p + self.file = f + + +class SubprocessEventSource(distbuild.EventSource): + + def __init__(self): + self.procs = [] + self.closed = False + + def get_select_params(self): + r = [] + w = [] + for requst_id, p in self.procs: + if p.stdin_contents is not None: + w.append(p.stdin) + if p.stdout is not None: + r.append(p.stdout) + if p.stderr is not None: + r.append(p.stderr) + return r, w, [], None + + def get_events(self, r, w, x): + events = [] + + for request_id, p in self.procs: + if p.stdin in w: + events.append(FileWriteable(request_id, p, p.stdin)) + if p.stdout in r: + 
events.append(FileReadable(request_id, p, p.stdout)) + if p.stderr in r: + events.append(FileReadable(request_id, p, p.stderr)) + + return events + + def add(self, request_id, process): + + self.procs.append((request_id, process)) + distbuild.set_nonblocking(process.stdin) + distbuild.set_nonblocking(process.stdout) + distbuild.set_nonblocking(process.stderr) + + def remove(self, process): + self.procs = [t for t in self.procs if t[1] != process] + + def kill_by_id(self, request_id): + logging.debug('SES: Killing all processes for %s', request_id) + for id, process in self.procs: + if id == request_id: + logging.debug('SES: killing %s', repr(process)) + process.kill() + + def close(self): + self.procs = [] + self.closed = True + + def is_finished(self): + return self.closed + + +class HelperMachine(distbuild.StateMachine): + + def __init__(self, conn): + distbuild.StateMachine.__init__(self, 'waiting') + self.conn = conn + self.debug_messages = False + + def setup(self): + distbuild.crash_point() + + jm = self.jm = distbuild.JsonMachine(self.conn) + self.mainloop.add_state_machine(jm) + + p = self.procsrc = SubprocessEventSource() + self.mainloop.add_event_source(p) + + self.send_helper_ready(jm) + + spec = [ + ('waiting', jm, distbuild.JsonNewMessage, 'waiting', self.do), + ('waiting', jm, distbuild.JsonEof, None, self._eofed), + ('waiting', p, FileReadable, 'waiting', self._relay_exec_output), + ('waiting', p, FileWriteable, 'waiting', self._feed_stdin), + ] + self.add_transitions(spec) + + def send_helper_ready(self, jm): + msg = { + 'type': 'helper-ready', + } + jm.send(msg) + logging.debug('HelperMachine: sent: %s', repr(msg)) + + def do(self, parent, event): + distbuild.crash_point() + + logging.debug('JsonMachine: got: %s', repr(event.msg)) + handlers = { + 'http-request': self.do_http_request, + 'exec-request': self.do_exec_request, + 'exec-cancel': self.do_exec_cancel, + } + handler = handlers.get(event.msg['type']) + handler(parent, event.msg) + + def 
do_http_request(self, parent, msg): + distbuild.crash_point() + + url = msg['url'] + method = msg['method'] + assert method in ('HEAD', 'GET') + + logging.debug('JsonMachine: http request: %s %s' % (method, url)) + + schema, netloc, path, query, fragment = urlparse.urlsplit(url) + assert schema == 'http' + if query: + path += '?' + query + + try: + conn = httplib.HTTPConnection(netloc) + conn.request(method, path) + except (socket.error, httplib.HTTPException), e: + status = 418 # teapot + data = str(e) + else: + res = conn.getresponse() + status = res.status + data = res.read() + conn.close() + + response = { + 'type': 'http-response', + 'id': msg['id'], + 'status': status, + 'body': data, + } + parent.send(response) + logging.debug('JsonMachine: sent to parent: %s', repr(response)) + self.send_helper_ready(parent) + + def do_exec_request(self, parent, msg): + distbuild.crash_point() + + argv = msg['argv'] + stdin_contents = msg.get('stdin_contents', '') + logging.debug('JsonMachine: exec request: argv=%s', repr(argv)) + logging.debug( + 'JsonMachine: exec request: stdin=%s', repr(stdin_contents)) + + p = subprocess.Popen(argv, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + + p.stdin_contents = stdin_contents + + self.procsrc.add(msg['id'], p) + + def do_exec_cancel(self, parent, msg): + distbuild.crash_point() + + self.procsrc.kill_by_id(msg['id']) + + def _relay_exec_output(self, event_source, event): + distbuild.crash_point() + + buf_size = 64 + fd = event.file.fileno() + data = os.read(fd, buf_size) + if data: + if event.file == event.process.stdout: + stream = 'stdout' + other = 'stderr' + else: + stream = 'stderr' + other = 'stdout' + msg = { + 'type': 'exec-output', + 'id': event.request_id, + stream: data, + other: '', + } + logging.debug('JsonMachine: sent to parent: %s', repr(msg)) + self.jm.send(msg) + else: + if event.file == event.process.stdout: + event.process.stdout.close() + event.process.stdout = None + else: + 
event.process.stderr.close() + event.process.stderr = None + + if event.process.stdout == event.process.stderr == None: + event.process.wait() + self.procsrc.remove(event.process) + msg = { + 'type': 'exec-response', + 'id': event.request_id, + 'exit': event.process.returncode, + } + logging.debug('JsonMachine: sent to parent: %s', repr(msg)) + self.jm.send(msg) + self.send_helper_ready(self.jm) + + def _feed_stdin(self, event_source, event): + distbuild.crash_point() + + fd = event.file.fileno() + try: + n = os.write(fd, event.process.stdin_contents) + except os.error, e: + # If other end closed the read end, stop writing. + if e.errno == errno.EPIPE: + logging.debug('JsonMachine: reader closed pipe') + event.process.stdin_contents = '' + else: + raise + else: + logging.debug('JsonMachine: fed %d bytes to stdin', n) + event.process.stdin_contents = event.process.stdin_contents[n:] + if event.process.stdin_contents == '': + logging.debug('JsonMachine: stdin contents finished, closing') + event.file.close() + event.process.stdin_contents = None + + def _eofed(self, event_source, event): + distbuild.crash_point() + logging.info('eof from parent, closing') + event_source.close() + self.procsrc.close() + + +class DistributedBuildHelper(cliapp.Application): + + def add_settings(self): + self.settings.string( + ['parent-address'], + 'address (hostname/ip address) for parent', + metavar='HOSTNAME', + default='localhost') + self.settings.integer( + ['parent-port'], + 'port number for parent', + metavar='PORT', + default=3434) + self.settings.boolean( + ['debug-messages'], + 'log messages that are received?') + self.settings.string_list( + ['crash-condition'], + 'add FILENAME:FUNCNAME:MAXCALLS to list of crash conditions ' + '(this is for testing only)', + metavar='FILENAME:FUNCNAME:MAXCALLS') + + def process_args(self, args): + distbuild.add_crash_conditions(self.settings['crash-condition']) + + # We don't want SIGPIPE, ever. It just kills us. We handle EPIPE + # instead. 
+ signal.signal(signal.SIGPIPE, signal.SIG_IGN) + + addr = self.settings['parent-address'] + port = self.settings['parent-port'] + conn = socket.socket() + conn.connect((addr, port)) + helper = HelperMachine(conn) + helper.debug_messages = self.settings['debug-messages'] + loop = distbuild.MainLoop() + loop.add_state_machine(helper) + loop.run() + + +DistributedBuildHelper().run() + -- cgit v1.2.1 From 8f72c7397f9b62046291d1d699f04ae0233e1afe Mon Sep 17 00:00:00 2001 From: Richard Ipsum Date: Tue, 25 Feb 2014 16:53:35 +0000 Subject: Fix push_build_branches A semantic error in the BuildBranch class meant that it was not possible to push temporary branches. This escaped testing since BuildBranch interacts too tightly with other components to be easily unit-tested, so testing was deferred to a yarn test. However, coverage isn't measured in yarn tests, so this code path was forgotten. --- morphlib/buildbranch.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/morphlib/buildbranch.py b/morphlib/buildbranch.py index 546a29d5..d415e7e1 100644 --- a/morphlib/buildbranch.py +++ b/morphlib/buildbranch.py @@ -217,7 +217,7 @@ class BuildBranch(object): # current HEAD if self._push_temporary: with morphlib.branchmanager.RemoteRefManager(False) as rrm: - for gd, build_ref in self._to_push.iterkeys(): + for gd, (build_ref, index) in self._to_push.iteritems(): remote = gd.get_remote('origin') yield gd, build_ref, remote refspec = morphlib.gitdir.RefSpec(build_ref) -- cgit v1.2.1 From 4e1153649e5d531b7017ac2a1b7791f9ad3c774b Mon Sep 17 00:00:00 2001 From: Richard Ipsum Date: Tue, 25 Feb 2014 18:29:59 +0000 Subject: Don't print worker in initiator Worker name is not sent in message --- distbuild/initiator.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/distbuild/initiator.py b/distbuild/initiator.py index 9c7e2ddf..e4d4975f 100644 --- a/distbuild/initiator.py +++ b/distbuild/initiator.py @@ -121,8 +121,6 @@ class Initiator(distbuild.StateMachine): filename = 
'build-step-%s.log' % msg['step_name'] f = open(filename, 'a') self._step_outputs[msg['step_name']] = f - worker = msg['worker'] - f.write('Worker: %s\n%s\n\n' % (worker, '-' * len(worker))) def _close_output(self, msg): self._step_outputs[msg['step_name']].close() -- cgit v1.2.1 From 2632a9ac870177f6ec743fdd96e40dc1d71314a8 Mon Sep 17 00:00:00 2001 From: Richard Ipsum Date: Fri, 14 Mar 2014 16:33:21 +0000 Subject: Make serialise work with artifact splitting Serialisation was simple when we only had 1 artifact per source. However, to allow smaller systems, we need artifact splitting to produce multiple artifacts per chunk source. So now the new serialisation format has a separate list of artifacts and sources, rather than the Source being generated from the artifact's serialisation. Python's id() function is used to encode the references between the various Sources and Artifacts, these are replaced with a reference to the new object after deserialisation. Previously the cache-key was used, but this is no longer sufficient to uniquely identify an Artifact. The resultant build graph after deserialisation is a little different to what went in: Strata end up with a different Source per Artifact, so it _is_ a 1 to 1 mapping, as opposed to Chunks, where it's many to 1. We serialise strata and chunks differently because stratum artifacts from the same source can have different dependencies, for example core-devel can have different dependencies to core-runtime. Without intervention we would serialise core-devel and core-devel's dependencies without including core-runtime's dependencies. To solve this we've decided to encode stratum artifacts completely independently: each stratum artifact has its own source. This is safe because stratum artifacts can be constructed independently, as opposed to Chunks where all the Artifacts for a Source are produced together.
This is a little hacky in its current form, but it simplifies matters later in distbuild with regards to how it handles expressing that every Artifact that shares a Source is built together. Arguably, this should be the output of producing the build graph anyway, since it more helpfully represents which Artifacts are built together than checking the morphology kind all the time, but more assumptions need checking in morph before it's safe to make this change across the whole of the morph codebase. --- distbuild/serialise.py | 144 ++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 118 insertions(+), 26 deletions(-) diff --git a/distbuild/serialise.py b/distbuild/serialise.py index 060833b1..cd871042 100644 --- a/distbuild/serialise.py +++ b/distbuild/serialise.py @@ -19,6 +19,7 @@ import json import morphlib +import logging morphology_attributes = [ @@ -36,6 +37,16 @@ def serialise_artifact(artifact): for x in morphology_attributes: result['__%s' % x] = getattr(morphology, x) return result + + def encode_artifact(artifact): + return { + 'name': artifact.name, + 'cache_id': artifact.cache_id, + 'cache_key': artifact.cache_key, + 'dependencies': artifact.dependencies, + 'dependents': artifact.dependents, + 'metadata_version': artifact.metadata_version, + } def encode_source(source): source_dic = { @@ -46,25 +57,35 @@ def serialise_artifact(artifact): 'tree': source.tree, 'morphology': encode_morphology(source.morphology), 'filename': source.filename, + + # dict keys are converted to strings by json + # so we encode the artifact ids as strings + 'artifact_ids': [str(id(artifact)) for (_, artifact) + in source.artifacts.iteritems()], } + if source.morphology['kind'] == 'chunk': source_dic['build_mode'] = source.build_mode source_dic['prefix'] = source.prefix return source_dic - def encode_single_artifact(a, encoded): + def encode_single_artifact(a, artifacts, source_id): if artifact.source.morphology['kind'] == 'system': arch = 
artifact.source.morphology['arch'] else: arch = artifact.arch + + logging.debug('encode_single_artifact dependencies: %s' + % str([('id: %s' % str(id(d)), d.name) for d in a.dependencies])) + return { - 'source': encode_source(a.source), + 'source_id': source_id, 'name': a.name, 'cache_id': a.cache_id, 'cache_key': a.cache_key, - 'dependencies': [encoded[d.cache_key]['cache_key'] - for d in a.dependencies], - 'arch': arch, + 'dependencies': [str(id(artifacts[id(d)])) + for d in a.dependencies], + 'arch': arch } visited = set() @@ -77,13 +98,46 @@ def serialise_artifact(artifact): yield ret yield a - encoded = {} + + artifacts = {} + encoded_artifacts = {} + encoded_sources = {} + for a in traverse(artifact): - if a.cache_key not in encoded: - encoded[a.cache_key] = encode_single_artifact(a, encoded) + logging.debug('traversing artifacts at %s' % a.name) + + if id(a.source) not in encoded_sources: + if a.source.morphology['kind'] == 'chunk': + for (_, sa) in a.source.artifacts.iteritems(): + if id(sa) not in artifacts: + logging.debug('encoding source artifact %s' % sa.name) + artifacts[id(sa)] = sa + encoded_artifacts[id(sa)] = encode_single_artifact(sa, + artifacts, id(a.source)) + else: + # We create separate sources for strata and systems, + # this is a bit of a hack, but needed to allow + # us to build strata and systems independently + + s = a.source + t = morphlib.source.Source(s.repo_name, s.original_ref, + s.sha1, s.tree, s.morphology, s.filename) + + t.artifacts = {a.name: a} + a.source = t + + encoded_sources[id(a.source)] = encode_source(a.source) + + if id(a) not in artifacts: + artifacts[id(a)] = a + logging.debug('encoding artifact %s' % a.name) + encoded_artifacts[id(a)] = encode_single_artifact(a, artifacts, + id(a.source)) - encoded['_root'] = artifact.cache_key - return json.dumps(encoded) + encoded_artifacts['_root'] = str(id(artifact)) + + return json.dumps({'sources': encoded_sources, + 'artifacts': encoded_artifacts}) def 
deserialise_artifact(encoded): @@ -121,7 +175,17 @@ def deserialise_artifact(encoded): setattr(morphology, x, le_dict['__%s' % x]) del morphology['__%s' % x] return morphology - + + def unserialise_source_artifacts(source, artifacts_dict): + '''Convert this dict into a list of artifacts''' + return {a['name']: Artifact(source, + a['name'], + a['cache_id'], + a['cache_key'], + a['dependencies'], + a['dependents'], + a['metadata_version']) for a in artifacts_dict} + def unserialise_source(le_dict): '''Convert a dict into a Source object.''' @@ -132,35 +196,63 @@ def deserialise_artifact(encoded): le_dict['tree'], morphology, le_dict['filename']) + if morphology['kind'] == 'chunk': source.build_mode = le_dict['build_mode'] source.prefix = le_dict['prefix'] return source - def unserialise_single_artifact(le_dict): + def unserialise_single_artifact(artifact_dict, source): '''Convert dict into an Artifact object. Do not set dependencies, that will be dealt with later. ''' - source = unserialise_source(le_dict['source']) - artifact = morphlib.artifact.Artifact(source, le_dict['name']) - artifact.cache_id = le_dict['cache_id'] - artifact.cache_key = le_dict['cache_key'] - artifact.arch = le_dict['arch'] + artifact = morphlib.artifact.Artifact(source, artifact_dict['name']) + artifact.cache_id = artifact_dict['cache_id'] + artifact.cache_key = artifact_dict['cache_key'] + artifact.arch = artifact_dict['arch'] + artifact.source = source + return artifact le_dicts = json.loads(encoded) - cache_keys = [k for k in le_dicts.keys() if k != '_root'] + artifacts_dict = le_dicts['artifacts'] + sources_dict = le_dicts['sources'] + + artifact_ids = ([artifacts_dict['_root']] + + filter(lambda k: k != '_root', artifacts_dict.keys())) + + source_ids = [sid for sid in sources_dict.keys()] + artifacts = {} - for cache_key in cache_keys: - le_dict = le_dicts[cache_key] - artifacts[cache_key] = unserialise_single_artifact(le_dict) - for cache_key in cache_keys: - le_dict = 
le_dicts[cache_key] - artifact = artifacts[cache_key] - artifact.dependencies = [artifacts[k] for k in le_dict['dependencies']] + sources = {} + + for source_id in source_ids: + source_dict = sources_dict[source_id] + sources[source_id] = unserialise_source(source_dict) + + # clear the source artifacts that get automatically generated + # we want to add the ones that were sent to us + sources[source_id].artifacts = {} + source_artifacts = source_dict['artifact_ids'] + + for artifact_id in source_artifacts: + if artifact_id not in artifacts: + artifact_dict = artifacts_dict[artifact_id] + artifact = unserialise_single_artifact(artifact_dict, + sources[source_id]) + + artifacts[artifact_id] = artifact + + key = artifacts[artifact_id].name + sources[source_id].artifacts[key] = artifacts[artifact_id] - return artifacts[le_dicts['_root']] + # now add the dependencies + for artifact_id in artifact_ids: + artifact = artifacts[artifact_id] + artifact.dependencies = [artifacts[aid] for aid in + artifacts_dict[artifact_id]['dependencies']] + return artifacts[artifacts_dict['_root']] -- cgit v1.2.1 From 66d2165dc7a5e0183f83128170625ee19cf829e3 Mon Sep 17 00:00:00 2001 From: Richard Ipsum Date: Fri, 14 Mar 2014 16:34:49 +0000 Subject: Fix build controller Whenever the controller finds a source artifact it wants to build, it changes its state to BUILDING. We build all a chunk's source artifacts in one go. 
So for any chunk artifact, we change the state of all chunk artifacts that are built from the same source to BUILDING --- distbuild/build_controller.py | 40 ++++++++++++++++++++++++++++++++++------ 1 file changed, 34 insertions(+), 6 deletions(-) diff --git a/distbuild/build_controller.py b/distbuild/build_controller.py index a300f0d4..7849db17 100644 --- a/distbuild/build_controller.py +++ b/distbuild/build_controller.py @@ -278,8 +278,10 @@ class BuildController(distbuild.StateMachine): error_text = self._artifact_error.peek() if event.msg['exit'] != 0 or error_text: - notify_failure( - 'Problem with serialise-artifact: %s' % error_text) + notify_failure('Problem with serialise-artifact: %s' + % error_text) + + if event.msg['exit'] != 0: return text = self._artifact_data.peek() @@ -384,18 +386,32 @@ class BuildController(distbuild.StateMachine): ' depends on %s which is %s' % (dep.name, dep.state)) - ready = self._find_artifacts_that_are_ready_to_build() - if len(ready) == 0: - logging.debug('No new artifacts queued for building') + while True: + ready = self._find_artifacts_that_are_ready_to_build() + + if len(ready) == 0: + logging.debug('No new artifacts queued for building') + break + + artifact = ready[0] - for artifact in ready: logging.debug( 'Requesting worker-build of %s (%s)' % (artifact.name, artifact.cache_key)) request = distbuild.WorkerBuildRequest(artifact, self._request['id']) self.mainloop.queue_event(distbuild.WorkerBuildQueuer, request) + artifact.state = BUILDING + if artifact.source.morphology['kind'] == 'chunk': + # Chunk artifacts are not built independently + # so when we're building any chunk artifact + # we're also building all the chunk artifacts + # in this source + for a in ready: + if a.source == artifact.source: + a.state = BUILDING + def _notify_initiator_disconnected(self, event_source, disconnect): if disconnect.id == self._request['id']: @@ -479,6 +495,18 @@ class BuildController(distbuild.StateMachine): 
self.mainloop.queue_event(BuildController, finished) artifact.state = BUILT + + def set_state(a): + if a.source == artifact.source: + a.state = BUILT + + if artifact.source.morphology['kind'] == 'chunk': + # Building a single chunk artifact + # yields all chunk artifacts for the given source + # so we set the state of this source's artifacts + # to BUILT + map_build_graph(self._artifact, set_state) + self._queue_worker_builds(None, event) def _notify_build_failed(self, event_source, event): -- cgit v1.2.1 From a4d2de0411030eab8832a625734e7d65d5efbec6 Mon Sep 17 00:00:00 2001 From: Richard Ipsum Date: Fri, 14 Mar 2014 16:36:54 +0000 Subject: Transfer all chunk artifacts after they are built We want to be able to transfer all source artifacts in a single transaction --- distbuild/worker_build_scheduler.py | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/distbuild/worker_build_scheduler.py b/distbuild/worker_build_scheduler.py index d0f158b6..315d3094 100644 --- a/distbuild/worker_build_scheduler.py +++ b/distbuild/worker_build_scheduler.py @@ -327,17 +327,22 @@ class WorkerConnection(distbuild.StateMachine): logging.debug('Requesting shared artifact cache to get artifacts') - filename = ('%s.%s' % - (self._artifact.source.morphology['kind'], - self._artifact.name)) - suffixes = [filename] kind = self._artifact.source.morphology['kind'] - if kind == 'stratum': - suffixes.append(filename + '.meta') - elif kind == 'system': - # FIXME: This is a really ugly hack. - if filename.endswith('-rootfs'): - suffixes.append(filename[:-len('-rootfs')] + '-kernel') + + if kind == 'chunk': + source_artifacts = self._artifact.source.artifacts + + suffixes = ['%s.%s' % (kind, name) for name in source_artifacts] + else: + filename = '%s.%s' % (kind, self._artifact.name) + suffixes = [filename] + + if kind == 'stratum': + suffixes.append(filename + '.meta') + elif kind == 'system': + # FIXME: This is a really ugly hack. 
+ if filename.endswith('-rootfs'): + suffixes.append(filename[:-len('-rootfs')] + '-kernel') suffixes = [urllib.quote(x) for x in suffixes] suffixes = ','.join(suffixes) -- cgit v1.2.1 From f921ad47648e60d3f2bc468169fa717688ecfa55 Mon Sep 17 00:00:00 2001 From: Richard Ipsum Date: Fri, 14 Mar 2014 12:43:04 +0000 Subject: Make InitiatorBuildCommand construct BuildCommand The constructor for BuildCommand sets up the caches, we want the caches to be set up for distbuild too so that we can deploy a system (system artifact will be fetched from the artifact cache) --- morphlib/plugins/distbuild_plugin.py | 1 + 1 file changed, 1 insertion(+) diff --git a/morphlib/plugins/distbuild_plugin.py b/morphlib/plugins/distbuild_plugin.py index 364e3eb5..7f6b3435 100644 --- a/morphlib/plugins/distbuild_plugin.py +++ b/morphlib/plugins/distbuild_plugin.py @@ -250,6 +250,7 @@ class InitiatorBuildCommand(morphlib.buildcommand.BuildCommand): self.addr = addr self.port = port self.app.settings['push-build-branches'] = True + super(InitiatorBuildCommand, self).__init__(app) def build(self, args): '''Initiate a distributed build on a controller''' -- cgit v1.2.1 From 7a9f0d114c01a8c18d1ea74a7b2db85c967d8c41 Mon Sep 17 00:00:00 2001 From: Richard Ipsum Date: Mon, 17 Mar 2014 13:49:40 +0000 Subject: Make morph install distbuild --- setup.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/setup.py b/setup.py index b996c1ae..59b95b64 100644 --- a/setup.py +++ b/setup.py @@ -1,4 +1,4 @@ -# Copyright (C) 2011, 2012, 2013 Codethink Limited +# Copyright (C) 2011 - 2014 Codethink Limited # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -151,8 +151,8 @@ FIXME author='Codethink Limited', author_email='baserock-dev@baserock.org', url='http://www.baserock.org/', - scripts=['morph'], - packages=['morphlib', 'morphlib.plugins'], + scripts=['morph', 'distbuild-helper'], + 
packages=['morphlib', 'morphlib.plugins', 'distbuild'], package_data={ 'morphlib': [ 'exts/*', -- cgit v1.2.1 From 19004f89ec34503d937897dac44e84be9ea6c47b Mon Sep 17 00:00:00 2001 From: Richard Ipsum Date: Fri, 14 Mar 2014 15:28:13 +0000 Subject: Fix deploy tarfile's open needs the file-like object to have a tell() method, objects returned from sockets don't have this method. So instead we fetch the artifact from the remote and cache it locally --- morphlib/plugins/deploy_plugin.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/morphlib/plugins/deploy_plugin.py b/morphlib/plugins/deploy_plugin.py index 47cd00c1..90c658a0 100644 --- a/morphlib/plugins/deploy_plugin.py +++ b/morphlib/plugins/deploy_plugin.py @@ -1,4 +1,4 @@ -# Copyright (C) 2013 Codethink Limited +# Copyright (C) 2013, 2014 Codethink Limited # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -383,7 +383,8 @@ class DeployPlugin(cliapp.Plugin): if build_command.lac.has(artifact): f = build_command.lac.get(artifact) elif build_command.rac.has(artifact): - f = build_command.rac.get(artifact) + build_command.cache_artifacts_locally([artifact]) + f = build_command.lac.get(artifact) else: raise cliapp.AppException('Deployment failed as system is' ' not yet built.\nPlease ensure' -- cgit v1.2.1 From a589866b356453474bb845bd78e8b38c77ee1059 Mon Sep 17 00:00:00 2001 From: Richard Ipsum Date: Wed, 26 Mar 2014 16:40:31 +0000 Subject: Bring InitiatorBuildCommand into buildcommand --- morphlib/buildcommand.py | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/morphlib/buildcommand.py b/morphlib/buildcommand.py index 137c63b4..3c190275 100644 --- a/morphlib/buildcommand.py +++ b/morphlib/buildcommand.py @@ -20,6 +20,7 @@ import logging import tempfile import morphlib +import distbuild class MultipleRootArtifactsError(morphlib.Error): @@ -474,3 +475,32 @@ class 
BuildCommand(object): self.app, staging_area, self.lac, self.rac, self.lrc, self.app.settings['max-jobs'], setup_mounts) return builder.build_and_cache(artifact) + +class InitiatorBuildCommand(BuildCommand): + + def __init__(self, app, addr, port): + self.app = app + self.addr = addr + self.port = port + self.app.settings['push-build-branches'] = True + super(InitiatorBuildCommand, self).__init__(app) + + def build(self, args): + '''Initiate a distributed build on a controller''' + + distbuild.add_crash_conditions(self.app.settings['crash-condition']) + + if len(args) != 3: + raise morphlib.Error( + 'Need repo, ref, morphology triplet to build') + + if self.addr == '': + raise morphlib.Error( + 'Need address of controller to run a distbuild') + + self.app.status(msg='Starting distributed build') + loop = distbuild.MainLoop() + cm = distbuild.ConnectionMachine( + self.addr, self.port, distbuild.Initiator, [self.app] + args) + loop.add_state_machine(cm) + loop.run() -- cgit v1.2.1 From 5933335b7a20626fc1d1a36e98fad2e1ade60ad2 Mon Sep 17 00:00:00 2001 From: Richard Ipsum Date: Wed, 26 Mar 2014 12:24:28 +0000 Subject: Remove Initiator and InitiatorBuildCommand --- morphlib/plugins/distbuild_plugin.py | 49 ------------------------------------ 1 file changed, 49 deletions(-) diff --git a/morphlib/plugins/distbuild_plugin.py b/morphlib/plugins/distbuild_plugin.py index 7f6b3435..27d87e35 100644 --- a/morphlib/plugins/distbuild_plugin.py +++ b/morphlib/plugins/distbuild_plugin.py @@ -242,55 +242,6 @@ class ControllerDaemon(cliapp.Plugin): loop.run() - -class InitiatorBuildCommand(morphlib.buildcommand.BuildCommand): - - def __init__(self, app, addr, port): - self.app = app - self.addr = addr - self.port = port - self.app.settings['push-build-branches'] = True - super(InitiatorBuildCommand, self).__init__(app) - - def build(self, args): - '''Initiate a distributed build on a controller''' - - distbuild.add_crash_conditions(self.app.settings['crash-condition']) - - if 
len(args) != 3: - raise cliapp.AppException( - 'Need repo, ref, morphology triplet to build') - - self.app.status(msg='Starting distributed build') - loop = distbuild.MainLoop() - cm = distbuild.ConnectionMachine( - self.addr, self.port, distbuild.Initiator, [self.app] + args) - loop.add_state_machine(cm) - loop.run() - - -class Initiator(cliapp.Plugin): - - def enable(self): - self.app.settings.boolean( - ['disable-distbuild'], 'disable distributed building', - group=group_distbuild) - self.app.hookmgr.add_callback( - 'new-build-command', self.create_build_command) - - def disable(self): - pass - - def create_build_command(self, old_build_command): - addr = self.app.settings['controller-initiator-address'] - port = self.app.settings['controller-initiator-port'] - - if addr != '' and not self.app.settings['disable-distbuild']: - return InitiatorBuildCommand(self.app, addr, port) - else: - return old_build_command - - class GraphStateMachines(cliapp.Plugin): def enable(self): -- cgit v1.2.1 From 9cbf777d614b8f240ccaa0203e7e3ba40b487f11 Mon Sep 17 00:00:00 2001 From: Richard Ipsum Date: Wed, 26 Mar 2014 11:30:05 +0000 Subject: UI change: distbuild has its own subcommand --- morphlib/plugins/build_plugin.py | 46 ++++++++++++++++++++++++++++++++++------ 1 file changed, 39 insertions(+), 7 deletions(-) diff --git a/morphlib/plugins/build_plugin.py b/morphlib/plugins/build_plugin.py index a58bce20..62009bcd 100644 --- a/morphlib/plugins/build_plugin.py +++ b/morphlib/plugins/build_plugin.py @@ -1,4 +1,4 @@ -# Copyright (C) 2012,2013 Codethink Limited +# Copyright (C) 2012,2013,2014 Codethink Limited # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -28,9 +28,37 @@ class BuildPlugin(cliapp.Plugin): arg_synopsis='(REPO REF FILENAME)...') self.app.add_subcommand('build', self.build, arg_synopsis='SYSTEM') + self.app.add_subcommand('distbuild', self.distbuild, + 
arg_synopsis='SYSTEM') + self.use_distbuild = False def disable(self): - pass + self.use_distbuild = False + + def distbuild(self, args): + '''Distbuild a system image in the current system branch + + Command line arguments: + + * `SYSTEM` is the name of the system to build. + + This command launches a distributed build, to use this command + you must first set up a distbuild cluster. + + Artifacts produced during the build will be stored on your trove. + + Once the build completes you can use morph deploy to deploy + your system, the system artifact will be copied from your trove + and cached locally. + + Example: + + morph distbuild devel-system-x86_64-generic + + ''' + + self.use_distbuild = True + self.build(args) def build_morphology(self, args): '''Build a system, outside of a system branch. @@ -65,8 +93,6 @@ class BuildPlugin(cliapp.Plugin): self.app.settings['cachedir-min-space']) build_command = morphlib.buildcommand.BuildCommand(self.app) - build_command = self.app.hookmgr.call('new-build-command', - build_command) build_command.build(args) def build(self, args): @@ -114,9 +140,15 @@ class BuildPlugin(cliapp.Plugin): build_uuid = uuid.uuid4().hex - build_command = morphlib.buildcommand.BuildCommand(self.app) - build_command = self.app.hookmgr.call('new-build-command', - build_command) + if self.use_distbuild: + addr = self.app.settings['controller-initiator-address'] + port = self.app.settings['controller-initiator-port'] + + build_command = morphlib.buildcommand.InitiatorBuildCommand( + self.app, addr, port) + else: + build_command = morphlib.buildcommand.BuildCommand(self.app) + loader = morphlib.morphloader.MorphologyLoader() push = self.app.settings['push-build-branches'] name = morphlib.git.get_user_name(self.app.runcmd) -- cgit v1.2.1