From 1de342b8a4cf13b295805855bfaa341bcd86277e Mon Sep 17 00:00:00 2001 From: Richard Ipsum Date: Mon, 24 Feb 2014 18:21:33 +0000 Subject: Add the distbuild libs --- distbuild/__init__.py | 61 +++++ distbuild/build_controller.py | 531 ++++++++++++++++++++++++++++++++++++ distbuild/connection_machine.py | 145 ++++++++++ distbuild/crashpoint.py | 126 +++++++++ distbuild/crashpoint_tests.py | 109 ++++++++ distbuild/eventsrc.py | 48 ++++ distbuild/helper_router.py | 197 +++++++++++++ distbuild/idgen.py | 33 +++ distbuild/initiator.py | 195 +++++++++++++ distbuild/initiator_connection.py | 216 +++++++++++++++ distbuild/jm.py | 98 +++++++ distbuild/json_router.py | 164 +++++++++++ distbuild/mainloop.py | 117 ++++++++ distbuild/protocol.py | 93 +++++++ distbuild/proxy_event_source.py | 47 ++++ distbuild/route_map.py | 60 ++++ distbuild/route_map_tests.py | 56 ++++ distbuild/serialise.py | 166 +++++++++++ distbuild/serialise_tests.py | 148 ++++++++++ distbuild/sm.py | 139 ++++++++++ distbuild/sm_tests.py | 86 ++++++ distbuild/sockbuf.py | 159 +++++++++++ distbuild/socketsrc.py | 166 +++++++++++ distbuild/sockserv.py | 45 +++ distbuild/stringbuffer.py | 90 ++++++ distbuild/stringbuffer_tests.py | 140 ++++++++++ distbuild/timer_event_source.py | 59 ++++ distbuild/worker_build_scheduler.py | 392 ++++++++++++++++++++++++++ 28 files changed, 3886 insertions(+) create mode 100644 distbuild/__init__.py create mode 100644 distbuild/build_controller.py create mode 100644 distbuild/connection_machine.py create mode 100644 distbuild/crashpoint.py create mode 100644 distbuild/crashpoint_tests.py create mode 100644 distbuild/eventsrc.py create mode 100644 distbuild/helper_router.py create mode 100644 distbuild/idgen.py create mode 100644 distbuild/initiator.py create mode 100644 distbuild/initiator_connection.py create mode 100644 distbuild/jm.py create mode 100644 distbuild/json_router.py create mode 100644 distbuild/mainloop.py create mode 100644 distbuild/protocol.py create mode 100644 distbuild/proxy_event_source.py create mode 100644 distbuild/route_map.py create mode 100644 distbuild/route_map_tests.py create mode 100644 distbuild/serialise.py create mode 100644 distbuild/serialise_tests.py create mode 100644 distbuild/sm.py create mode 100644 distbuild/sm_tests.py create mode 100644 distbuild/sockbuf.py create mode 100644 distbuild/socketsrc.py create mode 100644 distbuild/sockserv.py create mode 100644 distbuild/stringbuffer.py create mode 100644 distbuild/stringbuffer_tests.py create mode 100644 distbuild/timer_event_source.py create mode 100644 distbuild/worker_build_scheduler.py (limited to 'distbuild') diff --git a/distbuild/__init__.py b/distbuild/__init__.py new file mode 100644 index 00000000..3e60d5ee --- /dev/null +++ b/distbuild/__init__.py @@ -0,0 +1,61 @@ +# distbuild/__init__.py -- library for Morph's distributed build plugin +# +# Copyright (C) 2014 Codethink Limited +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 2 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. 
+# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.. + + +from stringbuffer import StringBuffer +from sm import StateMachine +from eventsrc import EventSource +from socketsrc import (SocketError, NewConnection, ListeningSocketEventSource, + SocketReadable, SocketWriteable, SocketEventSource, + set_nonblocking) +from sockbuf import (SocketBufferNewData, SocketBufferEof, + SocketBufferClosed, SocketBuffer) +from mainloop import MainLoop +from sockserv import ListenServer +from jm import JsonMachine, JsonNewMessage, JsonEof + +from serialise import serialise_artifact, deserialise_artifact +from idgen import IdentifierGenerator +from route_map import RouteMap +from timer_event_source import TimerEventSource, Timer +from proxy_event_source import ProxyEventSource +from json_router import JsonRouter +from helper_router import (HelperRouter, HelperRequest, HelperOutput, + HelperResult) +from initiator_connection import (InitiatorConnection, InitiatorDisconnect) +from connection_machine import ConnectionMachine, Reconnect, StopConnecting +from worker_build_scheduler import (WorkerBuildQueuer, + WorkerConnection, + WorkerBuildRequest, + WorkerCancelPending, + WorkerBuildOutput, + WorkerBuildCaching, + WorkerBuildFinished, + WorkerBuildFailed, + WorkerBuildStepStarted) +from build_controller import (BuildController, BuildFailed, BuildProgress, + BuildSteps, BuildStepStarted, BuildOutput, + BuildStepFinished, BuildStepFailed, + BuildFinished, BuildCancel, + build_step_name, map_build_graph) +from initiator import Initiator +from protocol import message + +from crashpoint import (crash_point, add_crash_condition, add_crash_conditions, + clear_crash_conditions) + +__all__ = locals() diff --git a/distbuild/build_controller.py b/distbuild/build_controller.py new file mode 100644 index 00000000..a300f0d4 --- /dev/null +++ b/distbuild/build_controller.py @@ -0,0 +1,531 @@ +# distbuild/build_controller.py -- control the steps for one build +# +# Copyright (C) 2014 Codethink Limited +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 2 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.. 
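Everything in this patch is built from the pieces that __init__.py re-exports above: state machines that declare their behaviour as transition tables, event sources that feed them, and a MainLoop that drives both. Before reading BuildController below, here is a minimal, hypothetical sketch of that pattern; HeartbeatMachine and its interval are invented, only StateMachine, TimerEventSource, Timer and MainLoop come from this patch, and the termination behaviour of MainLoop.run() is assumed rather than shown in this hunk.

    import distbuild

    class HeartbeatMachine(distbuild.StateMachine):

        '''Hypothetical machine: wait for one timer tick, then finish.'''

        def __init__(self, interval):
            distbuild.StateMachine.__init__(self, 'waiting')
            self._interval = interval

        def setup(self):
            # setup() runs when MainLoop.add_state_machine() attaches us,
            # so self.mainloop is already available here.
            self._timer = distbuild.TimerEventSource(self._interval)
            self.mainloop.add_event_source(self._timer)
            self._timer.start()

            spec = [
                # (current state, event source, event class, new state, callback)
                ('waiting', self._timer, distbuild.Timer, None, self._tick),
            ]
            self.add_transitions(spec)

        def _tick(self, event_source, event):
            # Reaching the None state above makes the main loop drop us.
            self._timer.stop()

    loop = distbuild.MainLoop()
    loop.add_state_machine(HeartbeatMachine(5.0))
    loop.run()

The same shape recurs in every file below: a spec of (state, source, event class, new state, callback) tuples passed to add_transitions(), and new work injected with mainloop.queue_event().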
+ + +import logging +import httplib +import traceback +import urllib +import urlparse + +import distbuild + + +# Artifact build states +UNKNOWN = 'unknown' +UNBUILT = 'not-built' +BUILDING = 'building' +BUILT = 'built' + + +class _Start(object): pass +class _Annotated(object): pass +class _Built(object): pass + + +class _GotGraph(object): + + def __init__(self, artifact): + self.artifact = artifact + + +class _GraphFailed(object): + + pass + + +class BuildCancel(object): + + def __init__(self, id): + self.id = id + + +class BuildFinished(object): + + def __init__(self, request_id, urls): + self.id = request_id + self.urls = urls + + +class BuildFailed(object): + + def __init__(self, request_id, reason): + self.id = request_id + self.reason = reason + + +class BuildProgress(object): + + def __init__(self, request_id, message_text): + self.id = request_id + self.message_text = message_text + + +class BuildSteps(object): + + def __init__(self, request_id, artifact): + self.id = request_id + self.artifact = artifact + + +class BuildStepStarted(object): + + def __init__(self, request_id, step_name, worker_name): + self.id = request_id + self.step_name = step_name + self.worker_name = worker_name + + +class BuildOutput(object): + + def __init__(self, request_id, step_name, stdout, stderr): + self.id = request_id + self.step_name = step_name + self.stdout = stdout + self.stderr = stderr + + +class BuildStepFinished(object): + + def __init__(self, request_id, step_name): + self.id = request_id + self.step_name = step_name + + +class BuildStepFailed(object): + + def __init__(self, request_id, step_name): + self.id = request_id + self.step_name = step_name + + +class _Abort(object): + + pass + + +def build_step_name(artifact): + '''Return user-comprehensible name for a given artifact.''' + return artifact.name + + +def map_build_graph(artifact, callback): + result = [] + done = set() + queue = [artifact] + while queue: + a = queue.pop() + if a not in done: + result.append(callback(a)) + queue.extend(a.dependencies) + done.add(a) + return result + + +class BuildController(distbuild.StateMachine): + + '''Control one build-request fulfillment. + + The initiator sends a build-request message, which causes the + InitiatorConnection to instantiate this class to control the steps + needed to fulfill the request. This state machine builds the + build graph to determine all the artifacts that need building, then + builds anything that is not cached. 
+ + ''' + + _idgen = distbuild.IdentifierGenerator('BuildController') + + def __init__(self, build_request_message, artifact_cache_server, + morph_instance): + distbuild.crash_point() + distbuild.StateMachine.__init__(self, 'init') + self._request = build_request_message + self._artifact_cache_server = artifact_cache_server + self._morph_instance = morph_instance + self._helper_id = None + self.debug_transitions = True + + def setup(self): + distbuild.crash_point() + + spec = [ + ('init', self, _Start, 'graphing', self._start_graphing), + ('init', distbuild.InitiatorConnection, + distbuild.InitiatorDisconnect, 'init', self._maybe_abort), + ('init', self, _Abort, None, None), + + ('graphing', distbuild.HelperRouter, distbuild.HelperOutput, + 'graphing', self._collect_graph), + ('graphing', distbuild.HelperRouter, distbuild.HelperResult, + 'graphing', self._finish_graph), + ('graphing', self, _GotGraph, + 'annotating', self._start_annotating), + ('graphing', self, _GraphFailed, None, None), + ('graphing', distbuild.InitiatorConnection, + distbuild.InitiatorDisconnect, None, + self._maybe_abort), + + ('annotating', distbuild.HelperRouter, distbuild.HelperResult, + 'annotating', self._handle_cache_response), + ('annotating', self, _Annotated, 'building', + self._queue_worker_builds), + ('annotating', distbuild.InitiatorConnection, + distbuild.InitiatorDisconnect, None, + self._maybe_abort), + + ('building', distbuild.WorkerConnection, + distbuild.WorkerBuildStepStarted, 'building', + self._relay_build_step_started), + ('building', distbuild.WorkerConnection, + distbuild.WorkerBuildOutput, 'building', + self._relay_build_output), + ('building', distbuild.WorkerConnection, + distbuild.WorkerBuildCaching, 'building', + self._relay_build_caching), + ('building', distbuild.WorkerConnection, + distbuild.WorkerBuildFinished, 'building', + self._check_result_and_queue_more_builds), + ('building', distbuild.WorkerConnection, + distbuild.WorkerBuildFailed, None, + self._notify_build_failed), + ('building', self, _Built, None, self._notify_build_done), + ('building', distbuild.InitiatorConnection, + distbuild.InitiatorDisconnect, 'building', + self._notify_initiator_disconnected), + ] + self.add_transitions(spec) + + self.mainloop.queue_event(self, _Start()) + + def _maybe_abort(self, event_source, event): + if disconnect.id == self._request['id']: + self.mainloop.queue_event(self, _Abort()) + + def _start_graphing(self, event_source, event): + distbuild.crash_point() + + logging.info('Start constructing build graph') + self._artifact_data = distbuild.StringBuffer() + self._artifact_error = distbuild.StringBuffer() + argv = [ + self._morph_instance, + 'serialise-artifact', + '--quiet', + self._request['repo'], + self._request['ref'], + self._request['morphology'], + ] + msg = distbuild.message('exec-request', + id=self._idgen.next(), + argv=argv, + stdin_contents='') + self._helper_id = msg['id'] + req = distbuild.HelperRequest(msg) + self.mainloop.queue_event(distbuild.HelperRouter, req) + + progress = BuildProgress(self._request['id'], 'Computing build graph') + self.mainloop.queue_event(BuildController, progress) + + def _collect_graph(self, event_source, event): + distbuild.crash_point() + + if event.msg['id'] == self._helper_id: + self._artifact_data.add(event.msg['stdout']) + self._artifact_error.add(event.msg['stderr']) + + def _finish_graph(self, event_source, event): + distbuild.crash_point() + + def notify_failure(msg_text): + logging.error('Graph creation failed: %s' % msg_text) + + failed = 
BuildFailed( + self._request['id'], + 'Failed go compute build graph: %s' % msg_text) + self.mainloop.queue_event(BuildController, failed) + + self.mainloop.queue_event(self, _GraphFailed()) + + def notify_success(artifact): + logging.debug('Graph is finished') + + progress = BuildProgress( + self._request['id'], 'Finished computing build graph') + self.mainloop.queue_event(BuildController, progress) + + build_steps = BuildSteps(self._request['id'], artifact) + self.mainloop.queue_event(BuildController, build_steps) + + self.mainloop.queue_event(self, _GotGraph(artifact)) + + if event.msg['id'] == self._helper_id: + self._helper_id = None + + error_text = self._artifact_error.peek() + if event.msg['exit'] != 0 or error_text: + notify_failure( + 'Problem with serialise-artifact: %s' % error_text) + return + + text = self._artifact_data.peek() + try: + artifact = distbuild.deserialise_artifact(text) + except ValueError, e: + logging.error(traceback.format_exc()) + notify_failure(str(e)) + return + + notify_success(artifact) + + def _start_annotating(self, event_source, event): + distbuild.crash_point() + + self._artifact = event.artifact + + # Queue http requests for checking from the shared artifact + # cache for the artifacts. + for artifact in map_build_graph(self._artifact, lambda a: a): + artifact.state = UNKNOWN + artifact.helper_id = self._idgen.next() + filename = ('%s.%s.%s' % + (artifact.cache_key, + artifact.source.morphology['kind'], + artifact.name)) + url = urlparse.urljoin( + self._artifact_cache_server, + '/1.0/artifacts?filename=%s' % urllib.quote(filename)) + msg = distbuild.message('http-request', + id=artifact.helper_id, + url=url, + method='HEAD') + request = distbuild.HelperRequest(msg) + self.mainloop.queue_event(distbuild.HelperRouter, request) + logging.debug( + 'Queued as %s query whether %s is in cache' % + (msg['id'], filename)) + + def _handle_cache_response(self, event_source, event): + distbuild.crash_point() + + logging.debug('Got cache query response: %s' % repr(event.msg)) + + def set_status(artifact): + if artifact.helper_id == event.msg['id']: + old = artifact.state + if event.msg['status'] == httplib.OK: + artifact.state = BUILT + else: + artifact.state = UNBUILT + logging.debug( + 'Changed artifact %s state from %s to %s' % + (artifact.name, old, artifact.state)) + artifact.helper_id = None + + map_build_graph(self._artifact, set_status) + + queued = map_build_graph(self._artifact, lambda a: a.state == UNKNOWN) + if any(queued): + logging.debug('Waiting for further responses') + else: + logging.debug('All cache query responses received') + self.mainloop.queue_event(self, _Annotated()) + + count = sum(1 if a.state == UNBUILT else 0 + for a in map_build_graph(self._artifact, lambda b: b)) + progress = BuildProgress( + self._request['id'], + 'Need to build %d artifacts' % count) + self.mainloop.queue_event(BuildController, progress) + + if count == 0: + logging.info('There seems to be nothing to build') + self.mainloop.queue_event(self, _Built()) + + def _find_artifacts_that_are_ready_to_build(self): + + def is_ready_to_build(artifact): + return (artifact.state == UNBUILT and + all(a.state == BUILT for a in artifact.dependencies)) + + return [a + for a in map_build_graph(self._artifact, lambda a: a) + if is_ready_to_build(a)] + + def _queue_worker_builds(self, event_source, event): + distbuild.crash_point() + + if self._artifact.state == BUILT: + logging.info('Requested artifact is built') + self.mainloop.queue_event(self, _Built()) + return + + 
logging.debug('Queuing more worker-builds to run') + logging.debug('Current state of build graph nodes:') + for a in map_build_graph(self._artifact, lambda a: a): + logging.debug(' %s state is %s' % (a.name, a.state)) + if a.state != BUILT: + for dep in a.dependencies: + logging.debug( + ' depends on %s which is %s' % + (dep.name, dep.state)) + + ready = self._find_artifacts_that_are_ready_to_build() + if len(ready) == 0: + logging.debug('No new artifacts queued for building') + + for artifact in ready: + logging.debug( + 'Requesting worker-build of %s (%s)' % + (artifact.name, artifact.cache_key)) + request = distbuild.WorkerBuildRequest(artifact, + self._request['id']) + self.mainloop.queue_event(distbuild.WorkerBuildQueuer, request) + artifact.state = BUILDING + + def _notify_initiator_disconnected(self, event_source, disconnect): + if disconnect.id == self._request['id']: + cancel = BuildCancel(disconnect.id) + self.mainloop.queue_event(BuildController, cancel) + + def _relay_build_step_started(self, event_source, event): + distbuild.crash_point() + if event.initiator_id != self._request['id']: + return # not for us + + logging.debug( + 'BC: _relay_build_step_started: %s' % event.artifact_cache_key) + artifact = self._find_artifact(event.artifact_cache_key) + if artifact is None: + # This is not the event you are looking for. + return + + logging.debug('BC: got build step started: %s' % artifact.name) + started = BuildStepStarted( + self._request['id'], build_step_name(artifact), event.worker_name) + self.mainloop.queue_event(BuildController, started) + logging.debug('BC: emitted %s' % repr(started)) + + def _relay_build_output(self, event_source, event): + distbuild.crash_point() + if event.msg['id'] != self._request['id']: + return # not for us + + logging.debug('BC: got output: %s' % repr(event.msg)) + artifact = self._find_artifact(event.artifact_cache_key) + logging.debug('BC: got artifact: %s' % repr(artifact)) + if artifact is None: + # This is not the event you are looking for. + return + + output = BuildOutput( + self._request['id'], build_step_name(artifact), + event.msg['stdout'], event.msg['stderr']) + self.mainloop.queue_event(BuildController, output) + logging.debug('BC: queued %s' % repr(output)) + + def _relay_build_caching(self, event_source, event): + distbuild.crash_point() + if event.initiator_id != self._request['id']: + return # not for us + + artifact = self._find_artifact(event.artifact_cache_key) + if artifact is None: + # This is not the event you are looking for. + return + + progress = BuildProgress( + self._request['id'], + 'Transferring %s to shared artifact cache' % artifact.name) + self.mainloop.queue_event(BuildController, progress) + + def _find_artifact(self, cache_key): + artifacts = map_build_graph(self._artifact, lambda a: a) + wanted = [a for a in artifacts if a.cache_key == cache_key] + if wanted: + return wanted[0] + else: + return None + + def _check_result_and_queue_more_builds(self, event_source, event): + distbuild.crash_point() + if event.msg['id'] != self._request['id']: + return # not for us + + artifact = self._find_artifact(event.artifact_cache_key) + if artifact is None: + # This is not the event you are looking for. 
+ return + + logging.debug( + 'Got build result for %s: %s', artifact.name, repr(event.msg)) + + finished = BuildStepFinished( + self._request['id'], build_step_name(artifact)) + self.mainloop.queue_event(BuildController, finished) + + artifact.state = BUILT + self._queue_worker_builds(None, event) + + def _notify_build_failed(self, event_source, event): + distbuild.crash_point() + if event.msg['id'] != self._request['id']: + return # not for us + + artifact = self._find_artifact(event.artifact_cache_key) + if artifact is None: + # This is not the event you are looking for. + return + + logging.error( + 'Build step failed for %s: %s', artifact.name, repr(event.msg)) + + step_failed = BuildStepFailed( + self._request['id'], build_step_name(artifact)) + self.mainloop.queue_event(BuildController, step_failed) + + build_failed = BuildFailed( + self._request['id'], + 'Building failed for %s' % artifact.name) + self.mainloop.queue_event(BuildController, build_failed) + + # Cancel any jobs waiting to be executed, since there is no point + # running them if this build has failed, it would just waste + # resources + cancel_pending = distbuild.WorkerCancelPending( + self._request['id']) + self.mainloop.queue_event(distbuild.WorkerBuildQueuer, cancel_pending) + + # Cancel any currently executing jobs for the above reasons, since + # this build will fail and we can't decide whether these jobs will + # be of use to any other build + cancel = BuildCancel(self._request['id']) + self.mainloop.queue_event(BuildController, cancel) + + def _notify_build_done(self, event_source, event): + distbuild.crash_point() + + logging.debug('Notifying initiator of successful build') + baseurl = urlparse.urljoin( + self._artifact_cache_server, '/1.0/artifacts') + filename = ('%s.%s.%s' % + (self._artifact.cache_key, + self._artifact.source.morphology['kind'], + self._artifact.name)) + url = '%s?filename=%s' % (baseurl, urllib.quote(filename)) + finished = BuildFinished(self._request['id'], [url]) + self.mainloop.queue_event(BuildController, finished) diff --git a/distbuild/connection_machine.py b/distbuild/connection_machine.py new file mode 100644 index 00000000..3d4e8d04 --- /dev/null +++ b/distbuild/connection_machine.py @@ -0,0 +1,145 @@ +# distbuild/connection_machine.py -- state machine for connecting to server +# +# Copyright (C) 2014 Codethink Limited +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 2 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.. 
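BuildController above tags every node of the build graph with one of the four states defined at the top of build_controller.py and only queues worker builds for artifacts whose dependencies are all built. A small self-contained sketch of that bookkeeping follows, using a hypothetical FakeArtifact in place of the real Artifact objects; only the name, state and dependencies attributes that the controller actually touches are modelled.

    import distbuild

    class FakeArtifact(object):

        '''Hypothetical stand-in for a node in the build graph.'''

        def __init__(self, name, dependencies=()):
            self.name = name
            self.state = 'not-built'            # UNBUILT in build_controller.py
            self.dependencies = list(dependencies)

    libc = FakeArtifact('libc')
    gcc = FakeArtifact('gcc', [libc])
    system = FakeArtifact('system', [gcc, libc])

    # map_build_graph() visits every artifact reachable from the root exactly
    # once and collects the callback's results.
    names = distbuild.map_build_graph(system, lambda a: a.name)

    # An artifact is ready once all of its dependencies are built, as in
    # BuildController._find_artifacts_that_are_ready_to_build().
    libc.state = 'built'                        # BUILT in build_controller.py
    ready = [a for a in distbuild.map_build_graph(system, lambda a: a)
             if a.state == 'not-built'
             and all(d.state == 'built' for d in a.dependencies)]
    # ready == [gcc]; 'system' still has to wait for gcc.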
+ + +import errno +import logging +import socket + +import distbuild + + +class Reconnect(object): + + pass + + +class StopConnecting(object): + + pass + + +class ConnectError(object): + + pass + + +class ProxyEventSource(object): + + '''Proxy event sources that may come and go.''' + + def __init__(self): + self.event_source = None + + def get_select_params(self): + if self.event_source: + return self.event_source.get_select_params() + else: + return [], [], [], None + + def get_events(self, r, w, x): + if self.event_source: + return self.event_source.get_events(r, w, x) + else: + return [] + + def is_finished(self): + return False + + +class ConnectionMachine(distbuild.StateMachine): + + def __init__(self, addr, port, machine, extra_args): + distbuild.StateMachine.__init__(self, 'connecting') + self._addr = addr + self._port = port + self._machine = machine + self._extra_args = extra_args + self._socket = None + self.reconnect_interval = 1 + + def setup(self): + self._sock_proxy = ProxyEventSource() + self.mainloop.add_event_source(self._sock_proxy) + self._start_connect() + + self._timer = distbuild.TimerEventSource(self.reconnect_interval) + self.mainloop.add_event_source(self._timer) + + spec = [ + ('connecting', self._sock_proxy, distbuild.SocketWriteable, + 'connected', self._connect), + ('connecting', self, StopConnecting, None, self._stop), + ('connected', self, Reconnect, 'connecting', self._reconnect), + ('connected', self, ConnectError, 'timeout', self._start_timer), + ('connected', self, StopConnecting, None, self._stop), + ('timeout', self._timer, distbuild.Timer, 'connecting', + self._reconnect), + ('timeout', self, StopConnecting, None, self._stop), + ] + self.add_transitions(spec) + + def _start_connect(self): + logging.debug( + 'ConnectionMachine: connecting to %s:%s' % + (self._addr, self._port)) + self._socket = socket.socket() + distbuild.set_nonblocking(self._socket) + try: + self._socket.connect((self._addr, self._port)) + except socket.error, e: + if e.errno != errno.EINPROGRESS: + raise socket.error( + "%s (attempting connection to distbuild controller " + "at %s:%s)" % (e, self._addr, self._port)) + + src = distbuild.SocketEventSource(self._socket) + self._sock_proxy.event_source = src + + def _connect(self, event_source, event): + try: + self._socket.connect((self._addr, self._port)) + except socket.error, e: + logging.error( + 'Failed to connect to %s:%s: %s' % + (self._addr, self._port, str(e))) + self.mainloop.queue_event(self, ConnectError()) + return + self._sock_proxy.event_source = None + logging.info('Connected to %s:%s' % (self._addr, self._port)) + m = self._machine(self, self._socket, *self._extra_args) + self.mainloop.add_state_machine(m) + self._socket = None + + def _reconnect(self, event_source, event): + logging.info('Reconnecting to %s:%s' % (self._addr, self._port)) + if self._socket is not None: + self._socket.close() + self._timer.stop() + self._start_connect() + + def _stop(self, event_source, event): + logging.info( + 'Stopping connection attempts to %s:%s' % (self._addr, self._port)) + self.mainloop.remove_event_source(self._timer) + if self._socket is not None: + self._socket.close() + self._socket = None + + def _start_timer(self, event_source, event): + self._timer.start() + diff --git a/distbuild/crashpoint.py b/distbuild/crashpoint.py new file mode 100644 index 00000000..01bfbd6b --- /dev/null +++ b/distbuild/crashpoint.py @@ -0,0 +1,126 @@ +# distbuild/crashpoint.py -- user-controlled crashing +# +# Copyright (C) 2014 Codethink Limited 
+# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 2 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.. + + +'''Crash the application. + +For crash testing, it's useful to easily induce crashes, to see how the +rest of the system manages. This module implements user-controllable +crashes. The code will be sprinkled with calls to the ``crash_point`` +function, which crashes the process if call matches a set of user-defined +criteria. + +The criteria consist of: + +* a filename +* a function name +* a maximum call count + +The criterion is fullfilled if ``crash_point`` is called from the named +function defined in the named file more than the given number of times. +Filename matching is using substrings (a filename pattern ``foo.py`` +matches an actual source file path of +``/usr/lib/python2.7/site-packages/distbuild/foo.py``), but function +names must match exactly. It is not possible to match on class names +(since that information is not available from a traceback). + +''' + + +import logging +import os +import sys +import traceback + + +detailed_logging = False + + +def debug(msg): # pragma: no cover + if detailed_logging: + logging.debug(msg) + + +class CrashCondition(object): + + def __init__(self, filename, funcname, max_calls): + self.filename = filename + self.funcname = funcname + self.max_calls = max_calls + self.called = 0 + + def matches(self, filename, funcname): + if self.filename not in filename: + debug( + 'crashpoint: filename mismatch: %s not in %s' % + (repr(self.filename), repr(filename))) + return False + + if self.funcname != funcname: + debug( + 'crashpoint: funcname mismatch: %s != %s' % + (self.funcname, funcname)) + return False + + debug('crashpoint: matches: %s %s' % (filename, funcname)) + return True + + def triggered(self, filename, funcname): + if self.matches(filename, funcname): + self.called += 1 + return self.called >= self.max_calls + else: + return False + + +crash_conditions = [] + + +def add_crash_condition(filename, funcname, max_calls): + crash_conditions.append(CrashCondition(filename, funcname, max_calls)) + + +def add_crash_conditions(strings): + for s in strings: + words = s.split(':') + if len(words) != 3: # pragma: no cover + logging.error('Ignoring malformed crash condition: %s' % repr(s)) + else: + add_crash_condition(words[0], words[1], int(words[2])) + + +def clear_crash_conditions(): + del crash_conditions[:] + + +def crash_point(frame=None): + if frame is None: + frames = traceback.extract_stack(limit=2) + frame = frames[0] + + filename, lineno, funcname, text = frame + + for condition in crash_conditions: + if condition.triggered(filename, funcname): + logging.critical( + 'Crash triggered from %s:%s:%s' % (filename, lineno, funcname)) + sys.exit(255) + else: + debug( + 'Crash not triggered by %s:%s:%s' % + (filename, lineno, funcname)) + diff --git a/distbuild/crashpoint_tests.py b/distbuild/crashpoint_tests.py new file mode 100644 index 00000000..2761ccac --- /dev/null +++ 
b/distbuild/crashpoint_tests.py @@ -0,0 +1,109 @@ +# distbuild/crashpoint_tests.py -- unit tests for crashpoint.py +# +# Copyright (C) 2014 Codethink Limited +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 2 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.. + + +import unittest + +import crashpoint + + +class CrashConditionTests(unittest.TestCase): + + def setUp(self): + self.c = crashpoint.CrashCondition('bar', 'foofunc', 0) + + def test_matches_exact_filename(self): + self.assertTrue(self.c.matches('bar', 'foofunc')) + + def test_matches_basename(self): + self.assertTrue(self.c.matches('dir/bar', 'foofunc')) + + def test_matches_partial_basename(self): + self.assertTrue(self.c.matches('dir/bar.py', 'foofunc')) + + def test_matches_dirname(self): + self.assertTrue(self.c.matches('bar/something.py', 'foofunc')) + + def test_doesnt_match_wrong_function_name(self): + self.assertFalse(self.c.matches('bar', 'foo')) + + def test_triggered_first_time_with_zero_count(self): + c = crashpoint.CrashCondition('bar', 'foofunc', 0) + self.assertTrue(c.triggered('bar', 'foofunc')) + + def test_triggered_first_time_with_zero_count(self): + c = crashpoint.CrashCondition('bar', 'foofunc', 0) + self.assertTrue(c.triggered('bar', 'foofunc')) + + def test_triggered_second_time_with_zero_count(self): + c = crashpoint.CrashCondition('bar', 'foofunc', 0) + self.assertTrue(c.triggered('bar', 'foofunc')) + self.assertTrue(c.triggered('bar', 'foofunc')) + + def test_triggered_first_time_with_count_of_one(self): + c = crashpoint.CrashCondition('bar', 'foofunc', 1) + self.assertTrue(c.triggered('bar', 'foofunc')) + + def test_triggered_second_time_with_count_of_two(self): + c = crashpoint.CrashCondition('bar', 'foofunc', 2) + self.assertFalse(c.triggered('bar', 'foofunc')) + self.assertTrue(c.triggered('bar', 'foofunc')) + + def test_not_triggered_if_not_matched(self): + c = crashpoint.CrashCondition('bar', 'foofunc', 0) + self.assertFalse(c.triggered('bar', 'otherfunc')) + + +class CrashConditionsListTests(unittest.TestCase): + + def setUp(self): + crashpoint.clear_crash_conditions() + + def test_no_conditions_initially(self): + self.assertEqual(crashpoint.crash_conditions, []) + + def test_adds_condition(self): + crashpoint.add_crash_condition('foo.py', 'bar', 0) + self.assertEqual(len(crashpoint.crash_conditions), 1) + c = crashpoint.crash_conditions[0] + self.assertEqual(c.filename, 'foo.py') + self.assertEqual(c.funcname, 'bar') + self.assertEqual(c.max_calls, 0) + + def test_adds_conditions_from_list_of_strings(self): + crashpoint.add_crash_conditions(['foo.py:bar:0']) + self.assertEqual(len(crashpoint.crash_conditions), 1) + c = crashpoint.crash_conditions[0] + self.assertEqual(c.filename, 'foo.py') + self.assertEqual(c.funcname, 'bar') + self.assertEqual(c.max_calls, 0) + + +class CrashPointTests(unittest.TestCase): + + def setUp(self): + crashpoint.clear_crash_conditions() + crashpoint.add_crash_condition('foo.py', 'bar', 0) + + def 
test_triggers_crash(self): + self.assertRaises( + SystemExit, + crashpoint.crash_point, frame=('foo.py', 123, 'bar', 'text')) + + def test_does_not_trigger_crash(self): + self.assertEqual(crashpoint.crash_point(), None) + diff --git a/distbuild/eventsrc.py b/distbuild/eventsrc.py new file mode 100644 index 00000000..11bb16e8 --- /dev/null +++ b/distbuild/eventsrc.py @@ -0,0 +1,48 @@ +# mainloop/eventsrc.py -- interface for event sources +# +# Copyright 2012 Codethink Limited. +# All rights reserved. + + +class EventSource(object): + + '''A source of events for state machines. + + This is a base class. + + An event source watches one file descriptor, and returns events + related to it. The events may vary depending on the file descriptor. + The actual watching is done using select.select. + + ''' + + def get_select_params(self): + '''Return parameters to use for select for this event source. + + Three lists of file descriptors, and a timeout are returned. + The three lists and the timeout are used as arguments to the + select.select function, though they may be manipulated and + combined with return values from other event sources. + + ''' + + return [], [], [], None + + def get_events(self, r, w, x): + '''Return events related to this file descriptor. + + The arguments are the return values of select.select. + + ''' + + return [] + + def is_finished(self): + '''Is this event source finished? + + It's finished if it won't ever return any new events. + + ''' + + return False + diff --git a/distbuild/helper_router.py b/distbuild/helper_router.py new file mode 100644 index 00000000..752a5fdb --- /dev/null +++ b/distbuild/helper_router.py @@ -0,0 +1,197 @@ +# distbuild/helper_router.py -- state machine for controller's helper comms +# +# Copyright (C) 2014 Codethink Limited +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 2 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.. + + +import logging + +import distbuild + + +class HelperRequest(object): + + def __init__(self, msg): + self.msg = msg + + +class HelperOutput(object): + + def __init__(self, msg): + self.msg = msg + + +class HelperResult(object): + + def __init__(self, msg): + self.msg = msg + + +class HelperRouter(distbuild.StateMachine): + + '''Route JSON messages between helpers and other state machines. + + This state machine relays and schedules access to one distbuild-helper + process. The helper process connects to a socket, which causes an + instance of HelperRouter to be created (one per connection). The + various instances co-ordinate requests automatically amongst + themselves. + + Other state machines in the same mainloop as HelperRouter can + request work from the helper process by emitting an event: + + * event source: the distbuild.HelperProcess class + * event: distbuild.HelperRequest instance + + The HelperRequest event gets a message to be serialised as JSON. + The message must be a Pythondict that the distbuild-helper understands. 
+ + HelperRouter will send the msg to the next available helper process. + When the helper sends back the result, HelperRouter will emit a + HelperResult event, using the same ``request_id`` as the request had. + + For its internal use, HelperRouter sets the ``id`` item in the + request object. + + ''' + + _pending_requests = [] + _running_requests = {} + _pending_helpers = [] + _request_counter = distbuild.IdentifierGenerator('HelperRouter') + _route_map = distbuild.RouteMap() + + def __init__(self, conn): + distbuild.StateMachine.__init__(self, 'idle') + self.conn = conn + + def setup(self): + jm = distbuild.JsonMachine(self.conn) + self.mainloop.add_state_machine(jm) + + spec = [ + ('idle', HelperRouter, HelperRequest, 'idle', + self._handle_request), + ('idle', jm, distbuild.JsonNewMessage, 'idle', self._helper_msg), + ('idle', jm, distbuild.JsonEof, None, self._close), + ] + self.add_transitions(spec) + + def _handle_request(self, event_source, event): + '''Send request received via mainloop, or put in queue.''' + logging.debug('HelperRouter: received request: %s', repr(event.msg)) + self._enqueue_request(event.msg) + if self._pending_helpers: + self._send_request() + + def _helper_msg(self, event_source, event): + '''Handle message from helper.''' + +# logging.debug('HelperRouter: from helper: %s', repr(event.msg)) + + handlers = { + 'helper-ready': self._handle_helper_ready, + 'exec-output': self._handle_exec_output, + 'exec-response': self._handle_exec_response, + 'http-response': self._handle_http_response, + } + + handler = handlers[event.msg['type']] + handler(event_source, event.msg) + + def _handle_helper_ready(self, event_source, msg): + self._pending_helpers.append(event_source) + if self._pending_requests: + self._send_request() + + def _get_request(self, msg): + request_id = msg['id'] + if request_id in self._running_requests: + request, helper = self._running_requests[request_id] + return request + elif request_id is None: + logging.error( + 'Helper process sent message without "id" field: %s', + repr(event.msg)) + else: + logging.error( + 'Helper process sent message with unknown id: %s', + repr(event.msg)) + + def _new_message(self, msg): + old_id = msg['id'] + new_msg = dict(msg) + new_msg['id'] = self._route_map.get_incoming_id(old_id) + return new_msg + + def _handle_exec_output(self, event_source, msg): + request = self._get_request(msg) + if request is not None: + new_msg = self._new_message(msg) + self.mainloop.queue_event(HelperRouter, HelperOutput(new_msg)) + + def _handle_exec_response(self, event_source, msg): + request = self._get_request(msg) + if request is not None: + new_msg = self._new_message(msg) + self._route_map.remove(msg['id']) + del self._running_requests[msg['id']] + self.mainloop.queue_event(HelperRouter, HelperResult(new_msg)) + + def _handle_http_response(self, event_source, msg): + request = self._get_request(msg) + if request is not None: + new_msg = self._new_message(msg) + self._route_map.remove(msg['id']) + del self._running_requests[msg['id']] + self.mainloop.queue_event(HelperRouter, HelperResult(new_msg)) + + def _close(self, event_source, event): + logging.debug('HelperRouter: closing: %s', repr(event_source)) + event_source.close() + + # Remove from pending helpers. + if event_source in self._pending_helpers: + self._pending_helpers.remove(event_source) + + # Re-queue any requests running on the hlper that just quit. 
+ for request_id in self._running_requests.keys(): + request, helper = self._running_requests[request_id] + if event_source == helper: + del self._running_requests[request_id] + self._enqueue_request(request) + + # Finally, if there are any pending requests and helpers, + # send requests. + while self._pending_requests and self._pending_helpers: + self._send_request() + + def _enqueue_request(self, request): + '''Put request into queue.''' +# logging.debug('HelperRouter: enqueuing request: %s' % repr(request)) + old_id = request['id'] + new_id = self._request_counter.next() + request['id'] = new_id + self._route_map.add(old_id, new_id) + self._pending_requests.append(request) + + def _send_request(self): + '''Pick the first queued request and send it to an available helper.''' + request = self._pending_requests.pop(0) + helper = self._pending_helpers.pop() + self._running_requests[request['id']] = (request, helper) + helper.send(request) +# logging.debug('HelperRouter: sent to helper: %s', repr(request)) + diff --git a/distbuild/idgen.py b/distbuild/idgen.py new file mode 100644 index 00000000..b642bd98 --- /dev/null +++ b/distbuild/idgen.py @@ -0,0 +1,33 @@ +# distbuild/idgen.py -- generate unique identifiers +# +# Copyright (C) 2014 Codethink Limited +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 2 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.. + + +import logging + + +class IdentifierGenerator(object): + + '''Generate unique identifiers.''' + + def __init__(self, series): + self._series = series + self._counter = 0 + + def next(self): + self._counter += 1 + return '%s-%d' % (self._series, self._counter) + diff --git a/distbuild/initiator.py b/distbuild/initiator.py new file mode 100644 index 00000000..9c7e2ddf --- /dev/null +++ b/distbuild/initiator.py @@ -0,0 +1,195 @@ +# distbuild/initiator.py -- state machine for the initiator +# +# Copyright (C) 2014 Codethink Limited +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 2 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.. 
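The crashpoint module above exists purely for fault-injection testing: crash_point() calls are sprinkled through the other state machines and do nothing until a matching condition has been registered. A hedged usage sketch follows, assuming the snippet lives in a file whose path contains 'example.py'; the file name, function name and count are invented.

    import distbuild

    # Condition strings are 'filename:function:max_calls'. The filename is
    # matched as a substring, the function name must match exactly.
    distbuild.add_crash_conditions(['example.py:fragile:2'])

    def fragile():
        # First matching call: the counter reaches 1, which is below 2, so
        # nothing happens. A second matching call would sys.exit(255).
        distbuild.crash_point()

    fragile()                            # counted, but no crash yet
    distbuild.clear_crash_conditions()   # forget every registered condition
    fragile()                            # harmless again: nothing registered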
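HelperRouter above, and InitiatorConnection and JsonRouter below, all re-number message ids with an IdentifierGenerator and keep a RouteMap so that a reply can be translated back to the id the original sender chose. route_map.py itself is outside this hunk, so the sketch below only mirrors how those classes are used here; the message contents are invented.

    import distbuild

    idgen = distbuild.IdentifierGenerator('HelperRouter')
    routes = distbuild.RouteMap()

    incoming = {'type': 'exec-request', 'id': 'initiator-7'}  # id picked by sender

    # Give the request an id that is unique on this side, remembering the
    # original so the eventual reply can be routed back.
    outgoing = dict(incoming)
    outgoing['id'] = idgen.next()                   # 'HelperRouter-1'
    routes.add(incoming['id'], outgoing['id'])

    # ... outgoing goes to a helper; its response carries outgoing['id'] ...

    reply_id = routes.get_incoming_id(outgoing['id'])   # back to 'initiator-7'
    routes.remove(outgoing['id'])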
+ + +import cliapp +import logging +import random +import sys + +import distbuild + + +class _Finished(object): + + def __init__(self, msg): + self.msg = msg + + +class _Failed(object): + + def __init__(self, msg): + self.msg = msg + + +class Initiator(distbuild.StateMachine): + + def __init__(self, cm, conn, app, repo_name, ref, morphology): + distbuild.StateMachine.__init__(self, 'waiting') + self._cm = cm + self._conn = conn + self._app = app + self._repo_name = repo_name + self._ref = ref + self._morphology = morphology + self._steps = None + self._step_outputs = {} + self.debug_transitions = True + + def setup(self): + distbuild.crash_point() + + self._jm = distbuild.JsonMachine(self._conn) + self.mainloop.add_state_machine(self._jm) + logging.debug('initiator: _jm=%s' % repr(self._jm)) + + spec = [ + ('waiting', self._jm, distbuild.JsonEof, None, self._terminate), + ('waiting', self._jm, distbuild.JsonNewMessage, 'waiting', + self._handle_json_message), + ('waiting', self, _Finished, None, self._succeed), + ('waiting', self, _Failed, None, self._fail), + ] + self.add_transitions(spec) + + random_id = random.randint(0, 2**32-1) + + self._app.status( + msg='Requesting build of %(repo)s %(ref)s %(morph)s', + repo=self._repo_name, + ref=self._ref, + morph=self._morphology) + msg = distbuild.message('build-request', + id=random_id, + repo=self._repo_name, + ref=self._ref, + morphology=self._morphology + ) + self._jm.send(msg) + logging.debug('Initiator: sent to controller: %s', repr(msg)) + + def _handle_json_message(self, event_source, event): + distbuild.crash_point() + + logging.debug('Initiator: from controller: %s' % repr(event.msg)) + + handlers = { + 'build-finished': self._handle_build_finished_message, + 'build-failed': self._handle_build_failed_message, + 'build-progress': self._handle_build_progress_message, + 'build-steps': self._handle_build_steps_message, + 'step-started': self._handle_step_started_message, + 'step-output': self._handle_step_output_message, + 'step-finished': self._handle_step_finished_message, + 'step-failed': self._handle_step_failed_message, + } + + handler = handlers[event.msg['type']] + handler(event.msg) + + def _handle_build_finished_message(self, msg): + self.mainloop.queue_event(self, _Finished(msg)) + + def _handle_build_failed_message(self, msg): + self.mainloop.queue_event(self, _Failed(msg)) + + def _handle_build_progress_message(self, msg): + self._app.status(msg='Progress: %(msgtext)s', msgtext=msg['message']) + + def _handle_build_steps_message(self, msg): + self._steps = msg['steps'] + self._app.status( + msg='Build steps in total: %(steps)d', + steps=len(self._steps)) + + def _open_output(self, msg): + assert msg['step_name'] not in self._step_outputs + filename = 'build-step-%s.log' % msg['step_name'] + f = open(filename, 'a') + self._step_outputs[msg['step_name']] = f + worker = msg['worker'] + f.write('Worker: %s\n%s\n\n' % (worker, '-' * len(worker))) + + def _close_output(self, msg): + self._step_outputs[msg['step_name']].close() + del self._step_outputs[msg['step_name']] + + def _handle_step_started_message(self, msg): + self._app.status( + msg='Started building %(step_name)s on %(worker_name)s', + step_name=msg['step_name'], + worker_name=msg['worker_name']) + self._open_output(msg) + + def _handle_step_output_message(self, msg): + step_name = msg['step_name'] + if step_name in self._step_outputs: + f = self._step_outputs[step_name] + f.write(msg['stdout']) + f.write(msg['stderr']) + f.flush() + else: + logging.warning( + 'Got 
step-output message for unknown step: %s' % repr(msg)) + + def _handle_step_finished_message(self, msg): + step_name = msg['step_name'] + if step_name in self._step_outputs: + self._app.status( + msg='Finished building %(step_name)s', + step_name=step_name) + self._close_output(msg) + else: + logging.warning( + 'Got step-finished message for unknown step: %s' % repr(msg)) + + def _handle_step_failed_message(self, msg): + step_name = msg['step_name'] + if step_name in self._step_outputs: + self._app.status( + msg='Build failed: %(step_name)s', + step_name=step_name) + self._close_output(msg) + else: + logging.warning( + 'Got step-failed message for unknown step: %s' % repr(msg)) + + def _succeed(self, event_source, event): + self.mainloop.queue_event(self._cm, distbuild.StopConnecting()) + self._jm.close() + logging.info('Build finished OK') + + urls = event.msg['urls'] + if urls: + for url in urls: + self._app.status(msg='Artifact: %(url)s', url=url) + else: + self._app.status( + msg='Controller did not give us any artifact URLs.') + + def _fail(self, event_source, event): + self.mainloop.queue_event(self._cm, distbuild.StopConnecting()) + self._jm.close() + raise cliapp.AppException( + 'Failed to build %s %s %s: %s' % + (self._repo_name, self._ref, self._morphology, + event.msg['reason'])) + + def _terminate(self, event_source, event): + self.mainloop.queue_event(self._cm, distbuild.StopConnecting()) + self._jm.close() + diff --git a/distbuild/initiator_connection.py b/distbuild/initiator_connection.py new file mode 100644 index 00000000..48d083e4 --- /dev/null +++ b/distbuild/initiator_connection.py @@ -0,0 +1,216 @@ +# distbuild/initiator_connection.py -- communicate with initiator +# +# Copyright (C) 2014 Codethink Limited +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 2 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.. + + +import logging + +import distbuild + + +class InitiatorDisconnect(object): + + def __init__(self, id): + self.id = id + + +class _Close(object): + + def __init__(self, event_source): + self.event_source = event_source + + +class InitiatorConnection(distbuild.StateMachine): + + '''Communicate with the initiator. + + This state machine communicates with the initiator, relaying and + translating messages from the initiator to the rest of the controller's + state machines, and vice versa. 
+ + ''' + + _idgen = distbuild.IdentifierGenerator('InitiatorConnection') + _route_map = distbuild.RouteMap() + + def __init__(self, conn, artifact_cache_server, morph_instance): + distbuild.StateMachine.__init__(self, 'idle') + self.conn = conn + self.artifact_cache_server = artifact_cache_server + self.morph_instance = morph_instance + + def setup(self): + self.jm = distbuild.JsonMachine(self.conn) + self.mainloop.add_state_machine(self.jm) + + self.our_ids = set() + + spec = [ + ('idle', self.jm, distbuild.JsonNewMessage, 'idle', + self._handle_msg), + ('idle', self.jm, distbuild.JsonEof, 'closing', self._disconnect), + ('idle', distbuild.BuildController, distbuild.BuildFinished, + 'idle', self._send_build_finished_message), + ('idle', distbuild.BuildController, distbuild.BuildFailed, + 'idle', self._send_build_failed_message), + ('idle', distbuild.BuildController, distbuild.BuildProgress, + 'idle', self._send_build_progress_message), + ('idle', distbuild.BuildController, distbuild.BuildSteps, + 'idle', self._send_build_steps_message), + ('idle', distbuild.BuildController, distbuild.BuildStepStarted, + 'idle', self._send_build_step_started_message), + ('idle', distbuild.BuildController, distbuild.BuildOutput, + 'idle', self._send_build_output_message), + ('idle', distbuild.BuildController, distbuild.BuildStepFinished, + 'idle', self._send_build_step_finished_message), + ('idle', distbuild.BuildController, distbuild.BuildStepFailed, + 'idle', self._send_build_step_failed_message), + ('closing', self, _Close, None, self._close), + ] + self.add_transitions(spec) + + def _handle_msg(self, event_source, event): + '''Handle message from initiator.''' + + logging.debug( + 'InitiatorConnection: from initiator: %s', repr(event.msg)) + + if event.msg['type'] == 'build-request': + new_id = self._idgen.next() + self.our_ids.add(new_id) + self._route_map.add(event.msg['id'], new_id) + event.msg['id'] = new_id + build_controller = distbuild.BuildController( + event.msg, self.artifact_cache_server, self.morph_instance) + self.mainloop.add_state_machine(build_controller) + + def _disconnect(self, event_source, event): + for id in self.our_ids: + logging.debug('InitiatorConnection: InitiatorDisconnect(%s)' + % str(id)) + self.mainloop.queue_event(InitiatorConnection, + InitiatorDisconnect(id)) + # TODO should this clear our_ids? 
+ self.mainloop.queue_event(self, _Close(event_source)) + + def _close(self, event_source, event): + logging.debug('InitiatorConnection: closing: %s', + repr(event.event_source)) + + event.event_source.close() + + def _handle_result(self, event_source, event): + '''Handle result from helper.''' + + if event.msg['id'] in self.our_ids: + logging.debug( + 'InitiatorConnection: received result: %s', repr(event.msg)) + self.jm.send(event.msg) + + def _send_build_finished_message(self, event_source, event): + if event.id in self.our_ids: + msg = distbuild.message('build-finished', + id=self._route_map.get_incoming_id(event.id), + urls=event.urls) + self._route_map.remove(event.id) + self.our_ids.remove(event.id) + self.jm.send(msg) + logging.debug( + 'InitiatorConnection: sent to initiator: %s', repr(msg)) + + def _send_build_failed_message(self, event_source, event): + if event.id in self.our_ids: + msg = distbuild.message('build-failed', + id=self._route_map.get_incoming_id(event.id), + reason=event.reason) + self._route_map.remove(event.id) + self.our_ids.remove(event.id) + self.jm.send(msg) + logging.debug( + 'InitiatorConnection: sent to initiator: %s', repr(msg)) + + def _send_build_progress_message(self, event_source, event): + if event.id in self.our_ids: + msg = distbuild.message('build-progress', + id=self._route_map.get_incoming_id(event.id), + message=event.message_text) + self.jm.send(msg) + logging.debug( + 'InitiatorConnection: sent to initiator: %s', repr(msg)) + + def _send_build_steps_message(self, event_source, event): + + def make_step_dict(artifact): + return { + 'name': distbuild.build_step_name(artifact), + 'build-depends': [ + distbuild.build_step_name(x) + for x in artifact.dependencies + ] + } + + if event.id in self.our_ids: + step_names = distbuild.map_build_graph( + event.artifact, make_step_dict) + msg = distbuild.message('build-steps', + id=self._route_map.get_incoming_id(event.id), + steps=step_names) + self.jm.send(msg) + logging.debug( + 'InitiatorConnection: sent to initiator: %s', repr(msg)) + + def _send_build_step_started_message(self, event_source, event): + if event.id in self.our_ids: + msg = distbuild.message('step-started', + id=self._route_map.get_incoming_id(event.id), + step_name=event.step_name, + worker_name=event.worker_name) + self.jm.send(msg) + logging.debug( + 'InitiatorConnection: sent to initiator: %s', repr(msg)) + + def _send_build_output_message(self, event_source, event): + logging.debug('InitiatorConnection: build_output: ' + 'id=%s stdout=%s stderr=%s' % + (repr(event.id), repr(event.stdout), repr(event.stderr))) + if event.id in self.our_ids: + msg = distbuild.message('step-output', + id=self._route_map.get_incoming_id(event.id), + step_name=event.step_name, + stdout=event.stdout, + stderr=event.stderr) + self.jm.send(msg) + logging.debug( + 'InitiatorConnection: sent to initiator: %s', repr(msg)) + + def _send_build_step_finished_message(self, event_source, event): + if event.id in self.our_ids: + msg = distbuild.message('step-finished', + id=self._route_map.get_incoming_id(event.id), + step_name=event.step_name) + self.jm.send(msg) + logging.debug( + 'InitiatorConnection: sent to initiator: %s', repr(msg)) + + def _send_build_step_failed_message(self, event_source, event): + if event.id in self.our_ids: + msg = distbuild.message('step-failed', + id=self._route_map.get_incoming_id(event.id), + step_name=event.step_name) + self.jm.send(msg) + logging.debug( + 'InitiatorConnection: sent to initiator: %s', repr(msg)) + diff --git 
a/distbuild/jm.py b/distbuild/jm.py new file mode 100644 index 00000000..ae222c00 --- /dev/null +++ b/distbuild/jm.py @@ -0,0 +1,98 @@ +# mainloop/jm.py -- state machine for JSON communication between nodes +# +# Copyright 2012 Codethink Limited. +# All rights reserved. + + +import fcntl +import json +import logging +import os +import socket +import sys + +from sm import StateMachine +from stringbuffer import StringBuffer +from sockbuf import (SocketBuffer, SocketBufferNewData, + SocketBufferEof, SocketError) + + +class JsonNewMessage(object): + + def __init__(self, msg): + self.msg = msg + + +class JsonEof(object): + + pass + + +class _Close2(object): + + pass + + +class JsonMachine(StateMachine): + + '''A state machine for sending/receiving JSON messages across TCP.''' + + max_buffer = 16 * 1024 + + def __init__(self, conn): + StateMachine.__init__(self, 'rw') + self.conn = conn + self.debug_json = False + + def setup(self): + sockbuf = self.sockbuf = SocketBuffer(self.conn, self.max_buffer) + self.mainloop.add_state_machine(sockbuf) + + self._eof = False + self.receive_buf = StringBuffer() + + spec = [ + ('rw', sockbuf, SocketBufferNewData, 'rw', self._parse), + ('rw', sockbuf, SocketBufferEof, 'w', self._send_eof), + ('rw', self, _Close2, None, self._really_close), + + ('w', self, _Close2, None, self._really_close), + ] + self.add_transitions(spec) + + def send(self, msg): + '''Send a message to the other side.''' + self.sockbuf.write('%s\n' % json.dumps(msg)) + + def close(self): + '''Tell state machine it should shut down. + + The state machine will vanish once it has flushed any pending + writes. + + ''' + + self.mainloop.queue_event(self, _Close2()) + + def _parse(self, event_source, event): + data = event.data + self.receive_buf.add(data) + if self.debug_json: + logging.debug('JsonMachine: Received: %s' % repr(data)) + while True: + line = self.receive_buf.readline() + if line is None: + break + line = line.rstrip() + if self.debug_json: + logging.debug('JsonMachine: line: %s' % repr(line)) + msg = json.loads(line) + self.mainloop.queue_event(self, JsonNewMessage(msg)) + + def _send_eof(self, event_source, event): + self.mainloop.queue_event(self, JsonEof()) + + def _really_close(self, event_source, event): + self.sockbuf.close() + self._send_eof(event_source, event) + diff --git a/distbuild/json_router.py b/distbuild/json_router.py new file mode 100644 index 00000000..bf272174 --- /dev/null +++ b/distbuild/json_router.py @@ -0,0 +1,164 @@ +# distbuild/json_router.py -- state machine to route JSON messages +# +# Copyright (C) 2014 Codethink Limited +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 2 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.. + + +import logging + +import distbuild + + +class JsonRouter(distbuild.StateMachine): + + '''Route JSON messages between clients and helpers. + + This state machine receives JSON messages from clients and helpers, + and routes messages between them. 
+ + Each incoming request is labeled with a unique identifier, then + sent to the next free helper. The helper's response will retain + the unique id, so that the response can be routed to the right + client. + + ''' + + pending_requests = [] + running_requests = {} + pending_helpers = [] + request_counter = distbuild.IdentifierGenerator('JsonRouter') + route_map = distbuild.RouteMap() + + def __init__(self, conn): + distbuild.StateMachine.__init__(self, 'idle') + self.conn = conn + logging.debug('JsonMachine: connection from %s', conn.getpeername()) + + def setup(self): + jm = distbuild.JsonMachine(self.conn) + jm.debug_json = True + self.mainloop.add_state_machine(jm) + + spec = [ + ('idle', jm, distbuild.JsonNewMessage, 'idle', self.bloop), + ('idle', jm, distbuild.JsonEof, None, self.close), + ] + self.add_transitions(spec) + + def _lookup_request(self, request_id): + if request_id in self.running_requests: + return self.running_requests[request_id] + else: + return None + + def bloop(self, event_source, event): + logging.debug('JsonRouter: got msg: %s', repr(event.msg)) + handlers = { + 'http-request': self.do_request, + 'http-response': self.do_response, + 'exec-request': self.do_request, + 'exec-cancel': self.do_cancel, + 'exec-output': self.do_exec_output, + 'exec-response': self.do_response, + 'helper-ready': self.do_helper_ready, + } + handler = handlers.get(event.msg['type']) + handler(event_source, event) + + def do_request(self, client, event): + self._enqueue_request(client, event.msg) + if self.pending_helpers: + self._send_request() + + def do_cancel(self, client, event): + for id in self.route_map.get_outgoing_ids(event.msg['id']): + logging.debug('JsonRouter: looking up request for id %s', id) + t = self._lookup_request(id) + if t: + helper = t[2] + new = dict(event.msg) + new['id'] = id + helper.send(new) + logging.debug('JsonRouter: sent to helper: %s', repr(new)) + + def do_response(self, helper, event): + t = self._lookup_request(event.msg['id']) + if t: + client, msg, helper = t + new = dict(event.msg) + new['id'] = self.route_map.get_incoming_id(msg['id']) + client.send(new) + logging.debug('JsonRouter: sent to client: %s', repr(new)) + + def do_helper_ready(self, helper, event): + self.pending_helpers.append(helper) + if self.pending_requests: + self._send_request() + + def do_exec_output(self, helper, event): + t = self._lookup_request(event.msg['id']) + if t: + client, msg, helper = t + new = dict(event.msg) + new['id'] = self.route_map.get_incoming_id(msg['id']) + client.send(new) + logging.debug('JsonRouter: sent to client: %s', repr(new)) + + def close(self, event_source, event): + logging.debug('closing: %s', repr(event_source)) + event_source.close() + + # Remove from pending helpers. + if event_source in self.pending_helpers: + self.pending_helpers.remove(event_source) + + # Remove from running requests, and put the request back in the + # pending requests queue if the helper quit (but not if the + # client quit). + for request_id in self.running_requests.keys(): + client, msg, helper = self.running_requests[request_id] + if event_source == client: + del self.running_requests[request_id] + elif event_source == helper: + del self.running_requests[request_id] + self._enqueue_request(client, msg) + + # Remove from pending requests, if the client quit. 
+ i = 0 + while i < len(self.pending_requests): + client, msg = self.pending_requests[i] + if event_source == client: + del self.pending_requests[i] + else: + i += 1 + + # Finally, if there are any pending requests and helpers, + # send requests. + while self.pending_requests and self.pending_helpers: + self._send_request() + + def _enqueue_request(self, client, msg): + new = dict(msg) + new['id'] = self.request_counter.next() + self.route_map.add(msg['id'], new['id']) + self.pending_requests.append((client, new)) + + def _send_request(self): + client, msg = self.pending_requests.pop(0) + helper = self.pending_helpers.pop() + self.running_requests[msg['id']] = (client, msg, helper) + helper.send(msg) + logging.debug('JsonRouter: sent to helper: %s', repr(msg)) + diff --git a/distbuild/mainloop.py b/distbuild/mainloop.py new file mode 100644 index 00000000..e3f9ae2d --- /dev/null +++ b/distbuild/mainloop.py @@ -0,0 +1,117 @@ +# mainloop/mainloop.py -- select-based main loop +# +# Copyright 2012 Codethink Limited. +# All rights reserved. + + +import fcntl +import logging +import os +import select + + +class MainLoop(object): + + '''A select-based main loop. + + The main loop watches a set of file descriptors wrapped in + EventSource objects, and when something happens with them, + asks the EventSource objects to create events, which it then + feeds into user-supplied state machines. The state machines + can create further events, which are processed further. + + When nothing is happening, the main loop sleeps in the + select.select call. + + ''' + + def __init__(self): + self._machines = [] + self._sources = [] + self._events = [] + self.dump_filename = None + + def add_state_machine(self, machine): + logging.debug('MainLoop.add_state_machine: %s' % machine) + machine.mainloop = self + machine.setup() + self._machines.append(machine) + if self.dump_filename: + filename = '%s%s.dot' % (self.dump_filename, + machine.__class__.__name__) + machine.dump_dot(filename) + + def remove_state_machine(self, machine): + logging.debug('MainLoop.remove_state_machine: %s' % machine) + self._machines.remove(machine) + + def add_event_source(self, event_source): + logging.debug('MainLoop.add_event_source: %s' % event_source) + self._sources.append(event_source) + + def remove_event_source(self, event_source): + logging.debug('MainLoop.remove_event_source: %s' % event_source) + self._sources.remove(event_source) + + def _setup_select(self): + r = [] + w = [] + x = [] + timeout = None + + self._sources = [s for s in self._sources if not s.is_finished()] + + for event_source in self._sources: + sr, sw, sx, st = event_source.get_select_params() + r.extend(sr) + w.extend(sw) + x.extend(sx) + if timeout is None: + timeout = st + elif st is not None: + timeout = min(timeout, st) + + return r, w, x, timeout + + def _run_once(self): + r, w, x, timeout = self._setup_select() + assert r or w or x or timeout is not None + r, w, x = select.select(r, w, x, timeout) + + for event_source in self._sources: + if event_source.is_finished(): + self.remove_event_source(event_source) + else: + for event in event_source.get_events(r, w, x): + self.queue_event(event_source, event) + + for event_source, event in self._dequeue_events(): + for machine in self._machines[:]: + for new_event in machine.handle_event(event_source, event): + self.queue_event(event_source, new_event) + if machine.state is None: + self.remove_state_machine(machine) + + def run(self): + '''Run the main loop. 
+ + The main loop terminates when there are no state machines to + run anymore. + + ''' + + logging.debug('MainLoop starts') + while self._machines: + self._run_once() + logging.debug('MainLoop ends') + + def queue_event(self, event_source, event): + '''Add an event to queue of events to be processed.''' + + self._events.append((event_source, event)) + + def _dequeue_events(self): + while self._events: + event_queue, event = self._events.pop(0) + yield event_queue, event + diff --git a/distbuild/protocol.py b/distbuild/protocol.py new file mode 100644 index 00000000..5d693761 --- /dev/null +++ b/distbuild/protocol.py @@ -0,0 +1,93 @@ +# distbuild/protocol.py -- abstractions for the JSON messages +# +# Copyright (C) 2014 Codethink Limited +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 2 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.. + + +'''Construct protocol message objects (dicts).''' + + +_types = { + 'build-request': [ + 'id', + 'repo', + 'ref', + 'morphology', + ], + 'build-progress': [ + 'id', + 'message', + ], + 'build-steps': [ + 'id', + 'steps', + ], + 'step-started': [ + 'id', + 'step_name', + 'worker_name', + ], + 'step-output': [ + 'id', + 'step_name', + 'stdout', + 'stderr', + ], + 'step-finished': [ + 'id', + 'step_name', + ], + 'step-failed': [ + 'id', + 'step_name', + ], + 'build-finished': [ + 'id', + 'urls', + ], + 'build-failed': [ + 'id', + 'reason', + ], + 'exec-request': [ + 'id', + 'argv', + 'stdin_contents', + ], + 'exec-cancel': [ + 'id', + ], + 'http-request': [ + 'id', + 'url', + 'method', + ], +} + + +def message(message_type, **kwargs): + assert message_type in _types + required_fields = _types[message_type] + + for name in required_fields: + assert name in kwargs, 'field %s is required' % name + + for name in kwargs: + assert name in required_fields, 'field %s is not allowed' % name + + msg = dict(kwargs) + msg['type'] = message_type + return msg + diff --git a/distbuild/proxy_event_source.py b/distbuild/proxy_event_source.py new file mode 100644 index 00000000..5b35b741 --- /dev/null +++ b/distbuild/proxy_event_source.py @@ -0,0 +1,47 @@ +# distbuild/proxy_event_source.py -- proxy for temporary event sources +# +# Copyright (C) 2014 Codethink Limited +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 2 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.. 
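As a quick illustration of the protocol module above: message() builds a plain dict and asserts that exactly the required fields for the given message type are present. A minimal usage sketch follows; the field values are invented for the example, and it assumes the assembled distbuild package (which re-exports message) is importable.

    import distbuild

    # Build a request carrying exactly the fields 'build-request' requires.
    msg = distbuild.message('build-request',
                            id='req-1',
                            repo='git://example.com/example-repo',
                            ref='master',
                            morphology='example-system.morph')
    assert msg['type'] == 'build-request'

    # Omitting a required field, or passing an unknown one, trips the
    # assertions inside message() and raises AssertionError.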
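Similarly, the JsonMachine in jm.py above frames every message as a single line of JSON ended by a newline, and its _parse() method reassembles complete lines from whatever chunks the socket delivers. Here is a socket-free sketch of that framing logic; the helper names frame() and parse_chunks() are invented for the example.

    import json

    def frame(msg):
        # One JSON document per line, as JsonMachine.send() produces.
        return json.dumps(msg) + '\n'

    def parse_chunks(chunks):
        # Rebuild whole messages from arbitrarily split chunks, mirroring
        # what JsonMachine._parse() does with its StringBuffer.
        buf = ''
        messages = []
        for chunk in chunks:
            buf += chunk
            while '\n' in buf:
                line, buf = buf.split('\n', 1)
                messages.append(json.loads(line))
        return messages

    # A message split across two reads still decodes as a single message.
    data = frame({'type': 'exec-cancel', 'id': 'req-1'})
    assert parse_chunks([data[:7], data[7:]]) == [
        {'type': 'exec-cancel', 'id': 'req-1'}]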
+ + +import errno +import logging +import socket + +import distbuild + + +class ProxyEventSource(object): + + '''Proxy event sources that may come and go.''' + + def __init__(self): + self.event_source = None + + def get_select_params(self): + if self.event_source: + return self.event_source.get_select_params() + else: + return [], [], [], None + + def get_events(self, r, w, x): + if self.event_source: + return self.event_source.get_events(r, w, x) + else: + return [] + + def is_finished(self): + return False + diff --git a/distbuild/route_map.py b/distbuild/route_map.py new file mode 100644 index 00000000..f67bcb54 --- /dev/null +++ b/distbuild/route_map.py @@ -0,0 +1,60 @@ +# distbuild/route_map.py -- map message ids for routing purposes +# +# Copyright (C) 2014 Codethink Limited +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 2 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.. + + +class RouteMap(object): + + '''Map message identifiers for routing purposes. + + Various state machines need to handle requests coming from multiple + sources, and they need to keep track of which responses should be + sent to which requestors. This class provides tools for keeping + track of that. + + Each message is expected to have a unique identifier of some sort. + The incoming request message has one, and all responses to it need + to keep that. An incoming request might be converted into one or more + outgoing requests, each with its own unique id. The responses to all + of those need to be mapped back to the original incoming request. + + For this class, we care about "incoming id" and "outgoing id". + There can be multiple outgoing identifiers for one incoming one. + + ''' + + def __init__(self): + self._routes = {} + + def add(self, incoming_id, outgoing_id): + assert (outgoing_id not in self._routes or + self._routes[outgoing_id] == incoming_id) + self._routes[outgoing_id] = incoming_id + + def get_incoming_id(self, outgoing_id): + '''Get the incoming id corresponding to an outgoing one. + + Raise KeyError if not found. + + ''' + + return self._routes[outgoing_id] + + def get_outgoing_ids(self, incoming_id): + return [o for (o, i) in self._routes.iteritems() if i == incoming_id] + + def remove(self, outgoing_id): + del self._routes[outgoing_id] diff --git a/distbuild/route_map_tests.py b/distbuild/route_map_tests.py new file mode 100644 index 00000000..c8ed69b3 --- /dev/null +++ b/distbuild/route_map_tests.py @@ -0,0 +1,56 @@ +# distbuild/route_map_tests.py -- unit tests for message routing +# +# Copyright (C) 2014 Codethink Limited +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 2 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.. + + +import unittest + +import distbuild + + +class RouteMapTests(unittest.TestCase): + + def setUp(self): + self.rm = distbuild.RouteMap() + + def test_raises_error_for_unknown_route(self): + self.assertRaises(KeyError, self.rm.get_incoming_id, 'outgoing') + + def test_finds_added_route(self): + self.rm.add('incoming', 'outgoing') + self.assertEqual(self.rm.get_incoming_id('outgoing'), 'incoming') + + def test_finds_outgoing_ids(self): + self.rm.add('incoming', 'outgoing') + self.assertEqual(self.rm.get_outgoing_ids('incoming'), ['outgoing']) + + def test_removes_added_route(self): + self.rm.add('incoming', 'outgoing') + self.rm.remove('outgoing') + self.assertRaises(KeyError, self.rm.get_incoming_id, 'outgoing') + + def test_raises_error_if_forgetting_unknown_route(self): + self.assertRaises(KeyError, self.rm.remove, 'outgoing') + + def test_silently_ignores_adding_existing_route(self): + self.rm.add('incoming', 'outgoing') + self.rm.add('incoming', 'outgoing') + self.assertEqual(self.rm.get_incoming_id('outgoing'), 'incoming') + + def test_raises_assert_if_adding_conflicting_route(self): + self.rm.add('incoming', 'outgoing') + self.assertRaises(AssertionError, self.rm.add, 'different', 'outgoing') + diff --git a/distbuild/serialise.py b/distbuild/serialise.py new file mode 100644 index 00000000..060833b1 --- /dev/null +++ b/distbuild/serialise.py @@ -0,0 +1,166 @@ +# distbuild/serialise.py -- (de)serialise Artifact object graphs +# +# Copyright (C) 2014 Codethink Limited +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 2 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.. 
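The RouteMap above is what lets a state machine issue its own ids on outgoing requests while still routing each reply back to the original requestor. A short usage sketch; the ids are made up, and IdentifierGenerator is used the same way the other modules in this patch use it.

    import distbuild

    routes = distbuild.RouteMap()
    idgen = distbuild.IdentifierGenerator('Example')

    incoming_id = 'client-42'       # id chosen by the requestor
    outgoing_id = idgen.next()      # id used on the outgoing request
    routes.add(incoming_id, outgoing_id)

    # When a reply arrives carrying the outgoing id, map it back.
    assert routes.get_incoming_id(outgoing_id) == incoming_id
    assert routes.get_outgoing_ids(incoming_id) == [outgoing_id]

    routes.remove(outgoing_id)      # forget the route once it is answered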
+ + +import json + +import morphlib + + +morphology_attributes = [ + 'needs_artifact_metadata_cached', +] + + +def serialise_artifact(artifact): + '''Serialise an Artifact object and its dependencies into string form.''' + + def encode_morphology(morphology): + result = {} + for key in morphology.keys(): + result[key] = morphology[key] + for x in morphology_attributes: + result['__%s' % x] = getattr(morphology, x) + return result + + def encode_source(source): + source_dic = { + 'repo': None, + 'repo_name': source.repo_name, + 'original_ref': source.original_ref, + 'sha1': source.sha1, + 'tree': source.tree, + 'morphology': encode_morphology(source.morphology), + 'filename': source.filename, + } + if source.morphology['kind'] == 'chunk': + source_dic['build_mode'] = source.build_mode + source_dic['prefix'] = source.prefix + return source_dic + + def encode_single_artifact(a, encoded): + if artifact.source.morphology['kind'] == 'system': + arch = artifact.source.morphology['arch'] + else: + arch = artifact.arch + return { + 'source': encode_source(a.source), + 'name': a.name, + 'cache_id': a.cache_id, + 'cache_key': a.cache_key, + 'dependencies': [encoded[d.cache_key]['cache_key'] + for d in a.dependencies], + 'arch': arch, + } + + visited = set() + def traverse(a): + visited.add(a) + for dep in a.dependencies: + if dep in visited: + continue + for ret in traverse(dep): + yield ret + yield a + + encoded = {} + for a in traverse(artifact): + if a.cache_key not in encoded: + encoded[a.cache_key] = encode_single_artifact(a, encoded) + + encoded['_root'] = artifact.cache_key + return json.dumps(encoded) + + +def deserialise_artifact(encoded): + '''Re-construct the Artifact object (and dependencies). + + The argument should be a string returned by ``serialise_artifact``. + The reconstructed Artifact objects will be sufficiently like the + originals that they can be used as a build graph, and other such + purposes, by Morph. + + ''' + + def unserialise_morphology(le_dict): + '''Convert a dict into something that kinda acts like a Morphology. + + As it happens, we don't need the full Morphology so we cheat. + Cheating is good. + + ''' + + class FakeMorphology(dict): + + def get_commands(self, which): + '''Get commands to run from a morphology or build system''' + if self[which] is None: + attr = '_'.join(which.split('-')) + bs = morphlib.buildsystem.lookup_build_system( + self['build-system']) + return getattr(bs, attr) + else: + return self[which] + + morphology = FakeMorphology(le_dict) + for x in morphology_attributes: + setattr(morphology, x, le_dict['__%s' % x]) + del morphology['__%s' % x] + return morphology + + def unserialise_source(le_dict): + '''Convert a dict into a Source object.''' + + morphology = unserialise_morphology(le_dict['morphology']) + source = morphlib.source.Source(le_dict['repo_name'], + le_dict['original_ref'], + le_dict['sha1'], + le_dict['tree'], + morphology, + le_dict['filename']) + if morphology['kind'] == 'chunk': + source.build_mode = le_dict['build_mode'] + source.prefix = le_dict['prefix'] + return source + + def unserialise_single_artifact(le_dict): + '''Convert dict into an Artifact object. + + Do not set dependencies, that will be dealt with later. 
+ + ''' + + source = unserialise_source(le_dict['source']) + artifact = morphlib.artifact.Artifact(source, le_dict['name']) + artifact.cache_id = le_dict['cache_id'] + artifact.cache_key = le_dict['cache_key'] + artifact.arch = le_dict['arch'] + return artifact + + le_dicts = json.loads(encoded) + cache_keys = [k for k in le_dicts.keys() if k != '_root'] + artifacts = {} + for cache_key in cache_keys: + le_dict = le_dicts[cache_key] + artifacts[cache_key] = unserialise_single_artifact(le_dict) + for cache_key in cache_keys: + le_dict = le_dicts[cache_key] + artifact = artifacts[cache_key] + artifact.dependencies = [artifacts[k] for k in le_dict['dependencies']] + + return artifacts[le_dicts['_root']] + diff --git a/distbuild/serialise_tests.py b/distbuild/serialise_tests.py new file mode 100644 index 00000000..2b4b3af7 --- /dev/null +++ b/distbuild/serialise_tests.py @@ -0,0 +1,148 @@ +# distbuild/serialise_tests.py -- unit tests for Artifact serialisation +# +# Copyright (C) 2014 Codethink Limited +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 2 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.. + + +import unittest + +import distbuild + + +class MockMorphology(object): + + def __init__(self, name): + self.dict = { + 'name': '%s.morphology.name' % name, + 'kind': '%s.morphology.kind' % name, + } + self.needs_staging_area = None + self.needs_artifact_metadata_cached = None + + def keys(self): + return self.dict.keys() + + def __getitem__(self, key): + return self.dict[key] + + +class MockSource(object): + + def __init__(self, name): + self.repo = None + self.repo_name = '%s.source.repo_name' % name + self.original_ref = '%s.source.original_ref' % name + self.sha1 = '%s.source.sha1' % name + self.tree = '%s.source.tree' % name + self.morphology = MockMorphology(name) + self.filename = '%s.source.filename' % name + + +class MockArtifact(object): + + def __init__(self, name): + self.source = MockSource(name) + self.name = name + self.cache_id = { + 'blip': '%s.blip' % name, + 'integer': 42, + } + self.cache_key = '%s.cache_key' % name + self.dependencies = [] + + +class SerialisationTests(unittest.TestCase): + + def setUp(self): + self.art1 = MockArtifact('name1') + self.art2 = MockArtifact('name2') + self.art3 = MockArtifact('name3') + self.art4 = MockArtifact('name4') + + def assertEqualMorphologies(self, a, b): + self.assertEqual(sorted(a.keys()), sorted(b.keys())) + keys = sorted(a.keys()) + a_values = [a[k] for k in keys] + b_values = [b[k] for k in keys] + self.assertEqual(a_values, b_values) + self.assertEqual(a.needs_staging_area, b.needs_staging_area) + self.assertEqual(a.needs_artifact_metadata_cached, + b.needs_artifact_metadata_cached) + self.assertEqual(a.needs_staging_area, + b.needs_staging_area) + + def assertEqualSources(self, a, b): + self.assertEqual(a.repo, b.repo) + self.assertEqual(a.repo_name, b.repo_name) + self.assertEqual(a.original_ref, b.original_ref) + self.assertEqual(a.sha1, b.sha1) + 
self.assertEqual(a.tree, b.tree) + self.assertEqualMorphologies(a.morphology, b.morphology) + self.assertEqual(a.filename, b.filename) + + def assertEqualArtifacts(self, a, b): + self.assertEqualSources(a.source, b.source) + self.assertEqual(a.name, b.name) + self.assertEqual(a.cache_id, b.cache_id) + self.assertEqual(a.cache_key, b.cache_key) + self.assertEqual(len(a.dependencies), len(b.dependencies)) + for i in range(len(a.dependencies)): + self.assertEqualArtifacts(a.dependencies[i], b.dependencies[i]) + + def verify_round_trip(self, artifact): + encoded = distbuild.serialise_artifact(artifact) + decoded = distbuild.deserialise_artifact(encoded) + self.assertEqualArtifacts(artifact, decoded) + + def key(a): + return a.cache_key + + objs = {} + queue = [decoded] + while queue: + obj = queue.pop() + k = key(obj) + if k in objs: + self.assertTrue(obj is objs[k]) + else: + objs[k] = obj + queue.extend(obj.dependencies) + + def test_returns_string(self): + encoded = distbuild.serialise_artifact(self.art1) + self.assertEqual(type(encoded), str) + + def test_works_without_dependencies(self): + self.verify_round_trip(self.art1) + + def test_works_with_single_dependency(self): + self.art1.dependencies = [self.art2] + self.verify_round_trip(self.art1) + + def test_works_with_two_dependencies(self): + self.art1.dependencies = [self.art2, self.art3] + self.verify_round_trip(self.art1) + + def test_works_with_two_levels_of_dependencies(self): + self.art2.dependencies = [self.art4] + self.art1.dependencies = [self.art2, self.art3] + self.verify_round_trip(self.art1) + + def test_works_with_dag(self): + self.art2.dependencies = [self.art4] + self.art3.dependencies = [self.art4] + self.art1.dependencies = [self.art2, self.art3] + self.verify_round_trip(self.art1) + diff --git a/distbuild/sm.py b/distbuild/sm.py new file mode 100644 index 00000000..f50515c3 --- /dev/null +++ b/distbuild/sm.py @@ -0,0 +1,139 @@ +# mainloop/sm.py -- state machine abstraction +# +# Copyright 2012 Codethink Limited. +# All rights reserved. + + +import logging +import re + + +classnamepat = re.compile(r"<class '(?P<name>.*)'>") + + +class StateMachine(object): + + '''A state machine abstraction. + + The caller may specify callbacks for events coming from specific + event sources. An event source might, for example, be a socket + file descriptor, and the event might be incoming data from the + socket. The callback would then process the data, perhaps by + collecting it into a buffer and parsing out messages from it. + + A callback gets the event source and event as arguments. The new + state is taken from the transition; the callback may return or + yield new events, which will be handled eventually. They may or + may not be handled in order. + + There can only be one callback for one state, source, and event + class combination. + + States are represented by unique objects, e.g., strings containing + the names of the states. When a machine wants to stop, it sets its + state to None. + + ''' + + def __init__(self, initial_state): + self._transitions = {} + self.state = self._initial_state = initial_state + self.debug_transitions = False + + def setup(self): + '''Set up machine for execution. + + This is called when the machine is added to the main loop. + + ''' + + def _key(self, state, event_source, event_class): + return (state, event_source, event_class) + + def add_transition(self, state, source, event_class, new_state, callback): + '''Add a transition to the state machine. 
+ + When the state machine is in the given state, and an event of + a given type comes from a given source, move the state machine + to the new state and call the callback function. + + ''' + + key = self._key(state, source, event_class) + assert key not in self._transitions, \ + 'Transition %s already registered' % str(key) + self._transitions[key] = (new_state, callback) + + def add_transitions(self, specification): + '''Add many transitions. + + The specification is a list of transitions. + Each transition is a tuple of the arguments given to + ``add_transition``. + + ''' + + for t in specification: + self.add_transition(*t) + + def handle_event(self, event_source, event): + '''Handle a given event. + + Return list of new events to handle. + + ''' + + key = self._key(self.state, event_source, event.__class__) + if key not in self._transitions: + if self.debug_transitions: # pragma: no cover + prefix = '%s: handle_event: ' % self.__class__.__name__ + logging.debug(prefix + 'not relevant for us: %s' % repr(event)) + logging.debug(prefix + 'key: %s', repr(key)) + logging.debug(prefix + 'state: %s', repr(self.state)) + return [] + + new_state, callback = self._transitions[key] + if self.debug_transitions: # pragma: no cover + logging.debug( + '%s: state change %s -> %s callback=%s' % + (self.__class__.__name__, self.state, new_state, + str(callback))) + self.state = new_state + if callback is not None: + ret = callback(event_source, event) + if ret is None: + return [] + else: + return list(ret) + else: + return [] + + def dump_dot(self, filename): # pragma: no cover + '''Write a Graphviz DOT file for the state machine.''' + + with open(filename, 'w') as f: + f.write('digraph %s {\n' % self._classname(self.__class__)) + first = True + for key in self._transitions: + state, src, event_class = key + if first: + f.write('"START" -> "%s" [label=""];\n' % + self._initial_state) + first = False + + new_state, callback = self._transitions[key] + if new_state is None: + new_state = 'END' + f.write('"%s" -> "%s" [label="%s"];\n' % + (state, new_state, self._classname(event_class))) + f.write('}\n') + + def _classname(self, klass): # pragma: no cover + s = str(klass) + m = classnamepat.match(s) + if m: + return m.group('name').split('.')[-1] + else: + return s + diff --git a/distbuild/sm_tests.py b/distbuild/sm_tests.py new file mode 100644 index 00000000..3c33e494 --- /dev/null +++ b/distbuild/sm_tests.py @@ -0,0 +1,86 @@ +# distbuild/sm_tests.py -- unit tests for state machine abstraction +# +# Copyright 2012 Codethink Limited. +# All rights reserved. 
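To make the transition table in sm.py above concrete, here is a minimal self-driving machine using the same add_transitions() and handle_event() calls. The Ping event and Pinger class are invented for the example; a real machine would be registered with a MainLoop, which calls setup() and feeds it events.

    import distbuild

    class Ping(object):
        pass

    class Pinger(distbuild.StateMachine):

        def __init__(self):
            distbuild.StateMachine.__init__(self, 'waiting')
            self.pings = 0

        def setup(self):
            spec = [
                # (state, event source, event class, new state, callback)
                ('waiting', self, Ping, 'waiting', self._count),
            ]
            self.add_transitions(spec)

        def _count(self, event_source, event):
            self.pings += 1

    machine = Pinger()
    machine.setup()
    machine.handle_event(machine, Ping())
    assert machine.pings == 1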
+ + +import unittest + +import distbuild + + +class DummyEventSource(object): + + pass + + +class DummyEvent(object): + + pass + + +class StateMachineTests(unittest.TestCase): + + def setUp(self): + self.sm = distbuild.StateMachine('init') + self.sm.distbuild = None + self.sm.setup() + self.event_source = DummyEventSource() + self.event = DummyEvent() + self.event_sources = [] + self.events = [] + self.callback_result = None + + def callback(self, event_source, event): + self.event_sources.append(event_source) + self.events.append(event) + return self.callback_result + + def test_ignores_event_when_there_are_no_transitions(self): + new_events = self.sm.handle_event(self.event_source, self.event) + self.assertEqual(new_events, []) + self.assertEqual(self.event_sources, []) + self.assertEqual(self.events, []) + + def test_ignores_event_when_no_transition_matches(self): + spec = [ + ('init', self.event_source, str, 'init', self.callback), + ] + self.sm.add_transitions(spec) + new_events = self.sm.handle_event(self.event_source, self.event) + self.assertEqual(new_events, []) + self.assertEqual(self.event_sources, []) + self.assertEqual(self.events, []) + + def test_handles_lack_of_callback_ok(self): + spec = [ + ('init', self.event_source, DummyEvent, 'init', None), + ] + self.sm.add_transitions(spec) + new_events = self.sm.handle_event(self.event_source, self.event) + self.assertEqual(new_events, []) + self.assertEqual(self.event_sources, []) + self.assertEqual(self.events, []) + + def test_calls_registered_callback_for_right_event(self): + spec = [ + ('init', self.event_source, DummyEvent, 'init', self.callback), + ] + self.sm.add_transitions(spec) + new_events = self.sm.handle_event(self.event_source, self.event) + self.assertEqual(new_events, []) + self.assertEqual(self.event_sources, [self.event_source]) + self.assertEqual(self.events, [self.event]) + + def test_handle_converts_nonlist_to_list(self): + self.callback_result = ('foo', 'bar') + + spec = [ + ('init', self.event_source, DummyEvent, 'init', self.callback), + ] + self.sm.add_transitions(spec) + new_events = self.sm.handle_event(self.event_source, self.event) + self.assertEqual(new_events, ['foo', 'bar']) + self.assertEqual(self.event_sources, [self.event_source]) + self.assertEqual(self.events, [self.event]) + diff --git a/distbuild/sockbuf.py b/distbuild/sockbuf.py new file mode 100644 index 00000000..a7fe339a --- /dev/null +++ b/distbuild/sockbuf.py @@ -0,0 +1,159 @@ +# mainloop/sockbuf.py -- a buffering, non-blocking socket I/O state machine +# +# Copyright 2012 Codethink Limited +# All rights reserved. + + +import logging + + +'''A buffering, non-blocking I/O state machine for sockets. + +The state machine is given an open socket. It reads from the socket, +and writes to it, when it can do so without blocking. A maximum size +for the read buffer can be set: the state machine will stop reading +if the buffer becomes full. This avoids the problem of an excessively +large buffer. + +The state machine generates events to indicate that the buffer contains +data or that the end of the file for reading has been reached. An event +is also generated if there is an error while doing I/O with the socket. 
+ +* SocketError: an error has occurred +* SocketBufferNewData: socket buffer has received new data; the data + is available as the ``data`` attribute +* SocketBufferEof: socket buffer has reached EOF for reading, but + still writes anything in the write buffer (or anything that gets added + to the write buffer) +* SocketBufferClosed: socket is now closed + +The state machine starts shutting down when ``close`` method is called, +but continues to operate in write-only mode until the write buffer has +been emptied. + +''' + + +from socketsrc import (SocketError, SocketReadable, SocketWriteable, + SocketEventSource) +from sm import StateMachine +from stringbuffer import StringBuffer + + +class SocketBufferNewData(object): + + '''Socket buffer has received new data.''' + + def __init__(self, data): + self.data = data + + +class SocketBufferEof(object): + + '''Socket buffer has reached end of file when reading. + + Note that the socket buffer may still be available for writing. + However, no more new data will be read. + + ''' + + +class SocketBufferClosed(object): + + '''Socket buffer has closed its socket.''' + + +class _Close(object): pass +class _WriteBufferIsEmpty(object): pass +class _WriteBufferNotEmpty(object): pass + + + +class SocketBuffer(StateMachine): + + def __init__(self, sock, max_buffer): + StateMachine.__init__(self, 'reading') + + self._sock = sock + self._max_buffer = max_buffer + + def setup(self): + src = self._src = SocketEventSource(self._sock) + src.stop_writing() # We'll start writing when we need to. + self.mainloop.add_event_source(src) + + self._wbuf = StringBuffer() + + spec = [ + ('reading', src, SocketReadable, 'reading', self._fill), + ('reading', self, _WriteBufferNotEmpty, 'rw', + self._start_writing), + ('reading', self, SocketBufferEof, 'idle', None), + ('reading', self, _Close, None, self._really_close), + + ('rw', src, SocketReadable, 'rw', self._fill), + ('rw', src, SocketWriteable, 'rw', self._flush), + ('rw', self, _WriteBufferIsEmpty, 'reading', self._stop_writing), + ('rw', self, SocketBufferEof, 'w', None), + ('rw', self, _Close, 'wc', None), + + ('idle', self, _WriteBufferNotEmpty, 'w', self._start_writing), + ('idle', self, _Close, None, self._really_close), + + ('w', src, SocketWriteable, 'w', self._flush), + ('w', self, _WriteBufferIsEmpty, 'idle', self._stop_writing), + + ('wc', src, SocketWriteable, 'wc', self._flush), + ('wc', self, _WriteBufferIsEmpty, None, self._really_close), + ] + self.add_transitions(spec) + + def write(self, data): + '''Put data into write queue.''' + + was_empty = len(self._wbuf) == 0 + self._wbuf.add(data) + if was_empty and len(self._wbuf) > 0: + self._start_writing(None, None) + self.mainloop.queue_event(self, _WriteBufferNotEmpty()) + + def close(self): + '''Tell state machine to terminate.''' + self.mainloop.queue_event(self, _Close()) + + def _report_error(self, event_source, event): + logging.error(str(event)) + + def _fill(self, event_source, event): + try: + data = event.sock.read(self._max_buffer) + except (IOError, OSError), e: + return [SocketError(event.sock, e)] + + if data: + self.mainloop.queue_event(self, SocketBufferNewData(data)) + else: + event_source.stop_reading() + self.mainloop.queue_event(self, SocketBufferEof()) + + def _really_close(self, event_source, event): + self._src.close() + self.mainloop.queue_event(self, SocketBufferClosed()) + + def _flush(self, event_source, event): + max_write = 1024**2 + data = self._wbuf.read(max_write) + try: + n = event.sock.write(data) + except (IOError, 
OSError), e: + return [SocketError(event.sock, e)] + self._wbuf.remove(n) + if len(self._wbuf) == 0: + self.mainloop.queue_event(self, _WriteBufferIsEmpty()) + + def _start_writing(self, event_source, event): + self._src.start_writing() + + def _stop_writing(self, event_source, event): + self._src.stop_writing() + diff --git a/distbuild/socketsrc.py b/distbuild/socketsrc.py new file mode 100644 index 00000000..78486f0e --- /dev/null +++ b/distbuild/socketsrc.py @@ -0,0 +1,166 @@ +# mainloop/socketsrc.py -- events and event sources for sockets +# +# Copyright 2012 Codethink Limited. +# All rights reserved. + + +import fcntl +import logging +import os +import socket + +from eventsrc import EventSource + + +def set_nonblocking(handle): + '''Make a socket, file descriptor, or other such thing be non-blocking.''' + + if type(handle) is int: + fd = handle + else: + fd = handle.fileno() + + flags = fcntl.fcntl(fd, fcntl.F_GETFL, 0) + flags = flags | os.O_NONBLOCK + fcntl.fcntl(fd, fcntl.F_SETFL, flags) + + +class SocketError(object): + + '''An error has occured with a socket.''' + + def __init__(self, sock, exception): + self.sock = sock + self.exception = exception + + +class NewConnection(object): + + '''A new client connection.''' + + def __init__(self, connection, addr): + self.connection = connection + self.addr = addr + + +class ListeningSocketEventSource(EventSource): + + '''An event source for a socket that listens for connections.''' + + def __init__(self, addr, port): + self.sock = socket.socket() + self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self.sock.bind((addr, port)) + self.sock.listen(5) + self._accepting = True + logging.info('Listening at %s' % repr(self.sock.getsockname())) + + def get_select_params(self): + r = [self.sock.fileno()] if self._accepting else [] + return r, [], [], None + + def get_events(self, r, w, x): + if self._accepting and self.sock.fileno() in r: + try: + conn, addr = self.sock.accept() + except socket.error, e: + return [SocketError(self.sock, e)] + else: + logging.info( + 'New connection to %s from %s' % + (conn.getsockname(), addr)) + return [NewConnection(conn, addr)] + + return [] + + def start_accepting(self): + self._accepting = True + + def stop_accepting(self): + self._accepting = False + + +class SocketReadable(object): + + '''A socket is readable.''' + + def __init__(self, sock): + self.sock = sock + + +class SocketWriteable(object): + + '''A socket is writeable.''' + + def __init__(self, sock): + self.sock = sock + + +class SocketEventSource(EventSource): + + '''Event source for normal sockets (for I/O). + + This generates events for indicating the socket is readable or + writeable. It does not actually do any I/O itself, that's for the + handler of the events. There are, however, methods for doing the + reading/writing, and for closing the socket. + + The event source can be told to stop checking for readability + or writeability, so that the user may, for example, stop those + events from being triggered while a buffer is full. 
+ + ''' + + def __init__(self, sock): + self.sock = sock + self._reading = True + self._writing = True + + set_nonblocking(sock) + + def get_select_params(self): + r = [self.sock.fileno()] if self._reading else [] + w = [self.sock.fileno()] if self._writing else [] + return r, w, [], None + + def get_events(self, r, w, x): + events = [] + fd = self.sock.fileno() + + if self._reading and fd in r: + events.append(SocketReadable(self)) + + if self._writing and fd in w: + events.append(SocketWriteable(self)) + + return events + + def start_reading(self): + self._reading = True + + def stop_reading(self): + self._reading = False + + def start_writing(self): + self._writing = True + + def stop_writing(self): + self._writing = False + + def read(self, max_bytes): + fd = self.sock.fileno() + return os.read(fd, max_bytes) + + def write(self, data): + fd = self.sock.fileno() + return os.write(fd, data) + + def close(self): + self.stop_reading() + self.stop_writing() + self.sock.close() + self.sock = None + + def is_finished(self): + return self.sock is None + diff --git a/distbuild/sockserv.py b/distbuild/sockserv.py new file mode 100644 index 00000000..6d8f216e --- /dev/null +++ b/distbuild/sockserv.py @@ -0,0 +1,45 @@ +# mainloop/sockserv.py -- socket server state machines +# +# Copyright 2012 Codethink Limited +# All rights reserved. + + +import logging + +from sm import StateMachine +from socketsrc import NewConnection, SocketError, ListeningSocketEventSource + + +class ListenServer(StateMachine): + + '''Listen for new connections on a port, send events for them.''' + + def __init__(self, addr, port, machine, extra_args=None): + StateMachine.__init__(self, 'listening') + self._addr = addr + self._port = port + self._machine = machine + self._extra_args = extra_args or [] + + def setup(self): + src = ListeningSocketEventSource(self._addr, self._port) + self.mainloop.add_event_source(src) + + spec = [ + ('listening', src, NewConnection, 'listening', self.new_conn), + ('listening', src, SocketError, None, self.report_error), + ] + self.add_transitions(spec) + + def new_conn(self, event_source, event): + logging.debug( + 'ListenServer: Creating new %s using %s and %s' % + (repr(self._machine), + repr(event.connection), + repr(self._extra_args))) + m = self._machine(event.connection, *self._extra_args) + self.mainloop.add_state_machine(m) + + def report_error(self, event_source, event): + logging.error(str(event)) + diff --git a/distbuild/stringbuffer.py b/distbuild/stringbuffer.py new file mode 100644 index 00000000..3349bb87 --- /dev/null +++ b/distbuild/stringbuffer.py @@ -0,0 +1,90 @@ +# mainloop/stringbuffer.py -- efficient buffering of strings as a queue +# +# Copyright 2012 Codethink Limited +# All rights reserved. + + +class StringBuffer(object): + + '''Buffer data for a file descriptor. + + The data may arrive in small pieces, and it is buffered in a way that + avoids excessive string catenation or splitting. 
+ + ''' + + def __init__(self): + self.strings = [] + self.len = 0 + + def add(self, data): + '''Add data to buffer.''' + self.strings.append(data) + self.len += len(data) + + def remove(self, num_bytes): + '''Remove specified number of bytes from buffer.''' + while num_bytes > 0 and self.strings: + first = self.strings[0] + if len(first) <= num_bytes: + num_bytes -= len(first) + del self.strings[0] + self.len -= len(first) + else: + self.strings[0] = first[num_bytes:] + self.len -= num_bytes + num_bytes = 0 + + def peek(self): + '''Return contents of buffer as one string.''' + + if len(self.strings) == 0: + return '' + elif len(self.strings) == 1: + return self.strings[0] + else: + self.strings = [''.join(self.strings)] + return self.strings[0] + + def read(self, max_bytes): + '''Return up to max_bytes from the buffer. + + Less is returned if the buffer does not contain at least max_bytes. + The returned data will remain in the buffer; use remove to remove + it. + + ''' + + use = [] + size = 0 + for s in self.strings: + n = max_bytes - size + if len(s) <= n: + use.append(s) + size += len(s) + else: + use.append(s[:n]) + size += n + break + return ''.join(use) + + def readline(self): + '''Return a complete line (ends with '\n') or None.''' + + for i, s in enumerate(self.strings): + newline = s.find('\n') + if newline != -1: + if newline+1 == len(s): + use = self.strings[:i+1] + del self.strings[:i+1] + else: + pre = s[:newline+1] + use = self.strings[:i] + [pre] + del self.strings[:i] + self.strings[0] = s[newline+1:] + return ''.join(use) + return None + + def __len__(self): + return self.len + diff --git a/distbuild/stringbuffer_tests.py b/distbuild/stringbuffer_tests.py new file mode 100644 index 00000000..29530560 --- /dev/null +++ b/distbuild/stringbuffer_tests.py @@ -0,0 +1,140 @@ +# distbuild/stringbuffer_tests.py -- unit tests +# +# Copyright 2012 Codethink Limited +# All rights reserved. 
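The readline() method above only returns data once a complete newline-terminated line has accumulated, which is what lets JsonMachine treat one buffered line as one JSON message. A small illustration; the message text is made up.

    import distbuild

    buf = distbuild.StringBuffer()
    buf.add('{"type": "exec-')
    assert buf.readline() is None     # no complete line yet

    buf.add('cancel", "id": "1"}\n{"type"')
    assert buf.readline() == '{"type": "exec-cancel", "id": "1"}\n'
    assert buf.peek() == '{"type"'    # the partial next line stays buffered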
+ + +import unittest + +import distbuild + + +class StringBufferTests(unittest.TestCase): + + def setUp(self): + self.buf = distbuild.StringBuffer() + + def test_is_empty_initially(self): + self.assertEqual(self.buf.peek(), '') + self.assertEqual(len(self.buf), 0) + + def test_adds_a_string(self): + s = 'foo' + self.buf.add(s) + self.assertEqual(self.buf.peek(), s) + self.assertEqual(len(self.buf), len(s)) + + def test_adds_a_second_string(self): + s = 'foo' + t = 'bar' + self.buf.add(s) + self.buf.add(t) + self.assertEqual(self.buf.peek(), s + t) + self.assertEqual(len(self.buf), len(s + t)) + + +class StringBufferRemoveTests(unittest.TestCase): + + def setUp(self): + self.buf = distbuild.StringBuffer() + self.first = 'foo' + self.second = 'bar' + self.all = self.first + self.second + self.buf.add(self.first) + self.buf.add(self.second) + + def test_removes_part_of_first_string(self): + self.assertTrue(len(self.first) > 1) + self.buf.remove(1) + self.assertEqual(self.buf.peek(), self.all[1:]) + self.assertEqual(len(self.buf), len(self.all) - 1) + + def test_removes_all_of_first_string(self): + self.buf.remove(len(self.first)) + self.assertEqual(self.buf.peek(), self.second) + self.assertEqual(len(self.buf), len(self.second)) + + def test_removes_more_than_first_string(self): + self.assertTrue(len(self.first) > 1) + self.assertTrue(len(self.second) > 1) + self.buf.remove(len(self.first) + 1) + self.assertEqual(self.buf.peek(), self.second[1:]) + self.assertEqual(len(self.buf), len(self.second) - 1) + + def test_removes_all_strings(self): + self.buf.remove(len(self.all)) + self.assertEqual(self.buf.peek(), '') + self.assertEqual(len(self.buf), 0) + + def test_removes_more_than_all_strings(self): + self.buf.remove(len(self.all) + 1) + self.assertEqual(self.buf.peek(), '') + self.assertEqual(len(self.buf), 0) + + +class StringBufferReadTests(unittest.TestCase): + + def setUp(self): + self.buf = distbuild.StringBuffer() + + def test_returns_empty_string_for_empty_buffer(self): + self.assertEqual(self.buf.read(100), '') + self.assertEqual(self.buf.peek(), '') + + def test_returns_partial_string_for_short_buffer(self): + self.buf.add('foo') + self.assertEqual(self.buf.read(100), 'foo') + self.assertEqual(self.buf.peek(), 'foo') + + def test_returns_catenated_strings(self): + self.buf.add('foo') + self.buf.add('bar') + self.assertEqual(self.buf.read(100), 'foobar') + self.assertEqual(self.buf.peek(), 'foobar') + + def test_returns_requested_amount_when_available(self): + self.buf.add('foo') + self.buf.add('bar') + self.assertEqual(self.buf.read(4), 'foob') + self.assertEqual(self.buf.peek(), 'foobar') + + +class StringBufferReadlineTests(unittest.TestCase): + + def setUp(self): + self.buf = distbuild.StringBuffer() + + def test_returns_None_on_empty_buffer(self): + self.assertEqual(self.buf.readline(), None) + + def test_returns_None_on_incomplete_line_in_buffer(self): + self.buf.add('foo') + self.assertEqual(self.buf.readline(), None) + + def test_extracts_complete_line(self): + self.buf.add('foo\n') + self.assertEqual(self.buf.readline(), 'foo\n') + self.assertEqual(self.buf.peek(), '') + + def test_extracts_only_the_initial_line_and_leaves_rest_of_buffer(self): + self.buf.add('foo\nbar\n') + self.assertEqual(self.buf.readline(), 'foo\n') + self.assertEqual(self.buf.peek(), 'bar\n') + + def test_extracts_only_the_initial_line_and_leaves_partial_line(self): + self.buf.add('foo\nbar') + self.assertEqual(self.buf.readline(), 'foo\n') + self.assertEqual(self.buf.peek(), 'bar') + + def 
test_extracts_only_the_initial_line_from_multiple_pieces(self): + self.buf.add('foo\n') + self.buf.add('bar\n') + self.assertEqual(self.buf.readline(), 'foo\n') + self.assertEqual(self.buf.peek(), 'bar\n') + + def test_extracts_only_the_initial_line_from_multiple_pieces_incomp(self): + self.buf.add('foo\n') + self.buf.add('bar') + self.assertEqual(self.buf.readline(), 'foo\n') + self.assertEqual(self.buf.peek(), 'bar') + diff --git a/distbuild/timer_event_source.py b/distbuild/timer_event_source.py new file mode 100644 index 00000000..f6141a73 --- /dev/null +++ b/distbuild/timer_event_source.py @@ -0,0 +1,59 @@ +# distbuild/timer_event_source.py -- event source for timer events +# +# Copyright (C) 2014 Codethink Limited +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 2 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.. + + +import time + + +class Timer(object): + + pass + + +class TimerEventSource(object): + + def __init__(self, interval): + self.interval = interval + self.last_event = time.time() + self.enabled = False + + def start(self): + self.enabled = True + self.last_event = time.time() + + def stop(self): + self.enabled = False + + def get_select_params(self): + if self.enabled: + next_event = self.last_event + self.interval + timeout = next_event - time.time() + return [], [], [], max(0, timeout) + else: + return [], [], [], None + + def get_events(self, r, w, x): + if self.enabled: + now = time.time() + if now >= self.last_event + self.interval: + self.last_event = now + return [Timer()] + return [] + + def is_finished(self): + return False + diff --git a/distbuild/worker_build_scheduler.py b/distbuild/worker_build_scheduler.py new file mode 100644 index 00000000..d0f158b6 --- /dev/null +++ b/distbuild/worker_build_scheduler.py @@ -0,0 +1,392 @@ +# distbuild/worker_build_scheduler.py -- schedule worker-builds on workers +# +# Copyright (C) 2014 Codethink Limited +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 2 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.. 
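The TimerEventSource above produces periodic Timer events purely through the select() timeout: get_select_params() returns no file descriptors, only a timeout, and get_events() emits a Timer once the interval has elapsed. A stand-alone sketch of that behaviour; the interval is arbitrary and the sleep stands in for the main loop blocking in select().

    import time
    import distbuild

    timer = distbuild.TimerEventSource(0.1)   # fire roughly every 0.1s
    timer.start()

    r, w, x, timeout = timer.get_select_params()
    assert (r, w, x) == ([], [], [])          # nothing to select() on
    time.sleep(timeout + 0.01)                # pretend select() timed out

    events = timer.get_events([], [], [])
    assert len(events) == 1
    assert isinstance(events[0], distbuild.Timer)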
+ + +import collections +import errno +import httplib +import logging +import socket +import urllib +import urlparse + +import distbuild + + +class WorkerBuildRequest(object): + + def __init__(self, artifact, initiator_id): + self.artifact = artifact + self.initiator_id = initiator_id + + +class WorkerCancelPending(object): + + def __init__(self, initiator_id): + self.initiator_id = initiator_id + + +class WorkerBuildStepStarted(object): + + def __init__(self, initiator_id, cache_key, worker_name): + self.initiator_id = initiator_id + self.artifact_cache_key = cache_key + self.worker_name = worker_name + + +class WorkerBuildOutput(object): + + def __init__(self, msg, cache_key): + self.msg = msg + self.artifact_cache_key = cache_key + + +class WorkerBuildCaching(object): + + def __init__(self, initiator_id, cache_key): + self.initiator_id = initiator_id + self.artifact_cache_key = cache_key + + +class WorkerBuildFinished(object): + + def __init__(self, msg, cache_key): + self.msg = msg + self.artifact_cache_key = cache_key + + +class WorkerBuildFailed(object): + + def __init__(self, msg, cache_key): + self.msg = msg + self.artifact_cache_key = cache_key + + +class _NeedJob(object): + + def __init__(self, who): + self.who = who + + +class _HaveAJob(object): + + def __init__(self, artifact, initiator_id): + self.artifact = artifact + self.initiator_id = initiator_id + + +class _JobIsFinished(object): + + def __init__(self, msg): + self.msg = msg + + +class _JobFailed(object): + + pass + + +class _Cached(object): + + pass + + +class WorkerBuildQueuer(distbuild.StateMachine): + + '''Maintain queue of outstanding worker-build requests. + + This state machine captures WorkerBuildRequest events, and puts them + into a queue. It also catches _NeedJob events, from a + WorkerConnection, and responds to them with _HaveAJob events, + when it has an outstanding request. 
+ + ''' + + def __init__(self): + distbuild.StateMachine.__init__(self, 'idle') + + def setup(self): + distbuild.crash_point() + + logging.debug('WBQ: Setting up %s' % self) + self._request_queue = [] + self._available_workers = [] + + spec = [ + ('idle', WorkerBuildQueuer, WorkerBuildRequest, 'idle', + self._handle_request), + ('idle', WorkerBuildQueuer, WorkerCancelPending, 'idle', + self._handle_cancel), + ('idle', WorkerConnection, _NeedJob, 'idle', self._handle_worker), + ] + self.add_transitions(spec) + + def _handle_request(self, event_source, event): + distbuild.crash_point() + + logging.debug('WBQ: Adding request to queue: %s' % event.artifact.name) + self._request_queue.append(event) + logging.debug( + 'WBQ: %d available workers and %d requests queued' % + (len(self._available_workers), + len(self._request_queue))) + if self._available_workers: + self._give_job() + + def _handle_cancel(self, event_source, worker_cancel_pending): + for request in [r for r in self._request_queue if + r.initiator_id == worker_cancel_pending.initiator_id]: + logging.debug('WBQ: Removing request from queue: %s', + request.artifact.name) + self._request_queue.remove(request) + + def _handle_worker(self, event_source, event): + distbuild.crash_point() + + logging.debug('WBQ: Adding worker to queue: %s' % event.who) + self._available_workers.append(event) + logging.debug( + 'WBQ: %d available workers and %d requests queued' % + (len(self._available_workers), + len(self._request_queue))) + if self._request_queue: + self._give_job() + + def _give_job(self): + request = self._request_queue.pop(0) + worker = self._available_workers.pop(0) + logging.debug( + 'WBQ: Giving %s to %s' % + (request.artifact.name, worker.who.name())) + self.mainloop.queue_event(worker.who, _HaveAJob(request.artifact, + request.initiator_id)) + + +class WorkerConnection(distbuild.StateMachine): + + '''Communicate with a single worker.''' + + _request_ids = distbuild.IdentifierGenerator('WorkerConnection') + _route_map = distbuild.RouteMap() + _initiator_request_map = collections.defaultdict(set) + + def __init__(self, cm, conn, writeable_cache_server, + worker_cache_server_port, morph_instance): + distbuild.StateMachine.__init__(self, 'idle') + self._cm = cm + self._conn = conn + self._writeable_cache_server = writeable_cache_server + self._worker_cache_server_port = worker_cache_server_port + self._morph_instance = morph_instance + self._helper_id = None + + def name(self): + addr, port = self._conn.getpeername() + name = socket.getfqdn(addr) + return '%s:%s' % (name, port) + + def setup(self): + distbuild.crash_point() + + logging.debug('WC: Setting up instance %s' % repr(self)) + + self._jm = distbuild.JsonMachine(self._conn) + self.mainloop.add_state_machine(self._jm) + + spec = [ + ('idle', self._jm, distbuild.JsonEof, None, self._reconnect), + ('idle', self, _HaveAJob, 'building', self._start_build), + + ('building', distbuild.BuildController, + distbuild.BuildCancel, 'building', + self._maybe_cancel), + ('building', self._jm, distbuild.JsonEof, None, self._reconnect), + ('building', self._jm, distbuild.JsonNewMessage, 'building', + self._handle_json_message), + ('building', self, _JobFailed, 'idle', self._request_job), + ('building', self, _JobIsFinished, 'caching', + self._request_caching), + + ('caching', distbuild.HelperRouter, distbuild.HelperResult, + 'caching', self._handle_helper_result), + ('caching', self, _Cached, 'idle', self._request_job), + ('caching', self, _JobFailed, 'idle', self._request_job), + ] + 
self.add_transitions(spec) + + self._request_job(None, None) + + def _maybe_cancel(self, event_source, build_cancel): + logging.debug('WC: BuildController requested a cancel') + if build_cancel.id == self._initiator_id: + distbuild.crash_point() + + for id in self._initiator_request_map[self._initiator_id]: + logging.debug('WC: Cancelling exec %s' % id) + msg = distbuild.message('exec-cancel', id=id) + self._jm.send(msg) + + def _reconnect(self, event_source, event): + distbuild.crash_point() + + logging.debug('WC: Triggering reconnect') + self.mainloop.queue_event(self._cm, distbuild.Reconnect()) + + def _start_build(self, event_source, event): + distbuild.crash_point() + + self._artifact = event.artifact + self._initiator_id = event.initiator_id + logging.debug('WC: starting build: %s for %s' % + (self._artifact.name, self._initiator_id)) + + argv = [ + self._morph_instance, + 'worker-build', + self._artifact.name, + ] + msg = distbuild.message('exec-request', + id=self._request_ids.next(), + argv=argv, + stdin_contents=distbuild.serialise_artifact(self._artifact), + ) + self._jm.send(msg) + logging.debug('WC: sent to worker: %s' % repr(msg)) + self._route_map.add(self._initiator_id, msg['id']) + self._initiator_request_map[self._initiator_id].add(msg['id']) + logging.debug( + 'WC: route map from %s to %s', + self._artifact.cache_key, msg['id']) + + started = WorkerBuildStepStarted( + self._initiator_id, self._artifact.cache_key, self.name()) + self.mainloop.queue_event(WorkerConnection, started) + + def _handle_json_message(self, event_source, event): + '''Handle JSON messages from the worker.''' + + distbuild.crash_point() + + logging.debug('WC: from worker: %s' % repr(event.msg)) + + handlers = { + 'exec-output': self._handle_exec_output, + 'exec-response': self._handle_exec_response, + } + + handler = handlers[event.msg['type']] + handler(event.msg) + + def _handle_exec_output(self, msg): + new = dict(msg) + new['id'] = self._route_map.get_incoming_id(msg['id']) + logging.debug('WC: emitting: %s', repr(new)) + self.mainloop.queue_event( + WorkerConnection, + WorkerBuildOutput(new, self._artifact.cache_key)) + + def _handle_exec_response(self, msg): + logging.debug('WC: finished building: %s' % self._artifact.name) + + new = dict(msg) + new['id'] = self._route_map.get_incoming_id(msg['id']) + self._route_map.remove(msg['id']) + self._initiator_request_map[self._initiator_id].remove(msg['id']) + + if new['exit'] != 0: + # Build failed. + new_event = WorkerBuildFailed(new, self._artifact.cache_key) + self.mainloop.queue_event(WorkerConnection, new_event) + self.mainloop.queue_event(self, _JobFailed()) + self._artifact = None + self._initiator_id = None + else: + # Build succeeded. We have more work to do: caching the result. + self.mainloop.queue_event(self, _JobIsFinished(new)) + + def _request_job(self, event_source, event): + distbuild.crash_point() + self.mainloop.queue_event(WorkerConnection, _NeedJob(self)) + + def _request_caching(self, event_source, event): + distbuild.crash_point() + + logging.debug('Requesting shared artifact cache to get artifacts') + + filename = ('%s.%s' % + (self._artifact.source.morphology['kind'], + self._artifact.name)) + suffixes = [filename] + kind = self._artifact.source.morphology['kind'] + if kind == 'stratum': + suffixes.append(filename + '.meta') + elif kind == 'system': + # FIXME: This is a really ugly hack. 
+ if filename.endswith('-rootfs'): + suffixes.append(filename[:-len('-rootfs')] + '-kernel') + + suffixes = [urllib.quote(x) for x in suffixes] + suffixes = ','.join(suffixes) + + worker_host = self._conn.getpeername()[0] + + url = urlparse.urljoin( + self._writeable_cache_server, + '/1.0/fetch?host=%s:%d&cacheid=%s&artifacts=%s' % + (urllib.quote(worker_host), + self._worker_cache_server_port, + urllib.quote(self._artifact.cache_key), + suffixes)) + + msg = distbuild.message( + 'http-request', id=self._request_ids.next(), url=url, method='GET') + self._helper_id = msg['id'] + req = distbuild.HelperRequest(msg) + self.mainloop.queue_event(distbuild.HelperRouter, req) + + progress = WorkerBuildCaching( + self._initiator_id, self._artifact.cache_key) + self.mainloop.queue_event(WorkerConnection, progress) + + self._initiator_id = None + self._finished_msg = event.msg + + def _handle_helper_result(self, event_source, event): + if event.msg['id'] == self._helper_id: + distbuild.crash_point() + + logging.debug('caching: event.msg: %s' % repr(event.msg)) + if event.msg['status'] == httplib.OK: + logging.debug('Shared artifact cache population done') + new_event = WorkerBuildFinished( + self._finished_msg, self._artifact.cache_key) + self.mainloop.queue_event(WorkerConnection, new_event) + self._finished_msg = None + self._helper_id = None + self.mainloop.queue_event(self, _Cached()) + else: + logging.error( + 'Failed to populate artifact cache: %s %s' % + (event.msg['status'], event.msg['body'])) + new_event = WorkerBuildFailed( + self._finished_msg, self._artifact.cache_key) + self.mainloop.queue_event(WorkerConnection, new_event) + self._finished_msg = None + self._helper_id = None + self.mainloop.queue_event(self, _JobFailed()) + + self._artifact = None -- cgit v1.2.1
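Taken together, the scheduling above works like a simple matchmaking queue: WorkerBuildRequest events accumulate in WorkerBuildQueuer until a WorkerConnection announces _NeedJob, at which point the oldest request is handed over as a _HaveAJob and sent to the worker as an 'exec-request'. A much simplified, event-free model of that pairing logic; the names and data here are illustrative only, not the real classes.

    import collections

    requests = collections.deque()       # queued build requests
    idle_workers = collections.deque()   # workers that asked for a job
    dispatched = []                      # (worker, request) pairs handed out

    def handle_request(request):
        requests.append(request)
        maybe_dispatch()

    def handle_need_job(worker):
        idle_workers.append(worker)
        maybe_dispatch()

    def maybe_dispatch():
        # Mirrors WorkerBuildQueuer._give_job(): pair the oldest queued
        # request with an available worker.
        while requests and idle_workers:
            dispatched.append((idle_workers.popleft(), requests.popleft()))

    handle_need_job('worker-1')     # worker is idle, nothing to build yet
    handle_request('artifact-a')    # request arrives and is dispatched
    assert dispatched == [('worker-1', 'artifact-a')]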