summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xdistbuild-helper2
-rw-r--r--distbuild/__init__.py2
-rw-r--r--distbuild/build_controller.py125
-rw-r--r--distbuild/connection_machine.py3
-rw-r--r--distbuild/distbuild_socket.py63
-rw-r--r--distbuild/helper_router.py1
-rw-r--r--distbuild/initiator.py3
-rw-r--r--distbuild/initiator_connection.py62
-rw-r--r--distbuild/jm.py7
-rw-r--r--distbuild/json_router.py1
-rw-r--r--distbuild/serialise.py7
-rw-r--r--distbuild/sockbuf.py5
-rw-r--r--distbuild/socketsrc.py13
-rw-r--r--distbuild/sockserv.py3
-rw-r--r--distbuild/worker_build_scheduler.py22
-rwxr-xr-xmorphlib/exts/kvm.check47
-rwxr-xr-xmorphlib/exts/kvm.write9
-rwxr-xr-xmorphlib/exts/nfsboot.check7
-rw-r--r--morphlib/gitdir.py14
-rw-r--r--morphlib/plugins/distbuild_plugin.py21
-rw-r--r--morphlib/writeexts.py15
21 files changed, 301 insertions, 131 deletions
diff --git a/distbuild-helper b/distbuild-helper
index 2aee8222..cdc1873e 100755
--- a/distbuild-helper
+++ b/distbuild-helper
@@ -315,7 +315,7 @@ class DistributedBuildHelper(cliapp.Application):
addr = self.settings['parent-address']
port = self.settings['parent-port']
- conn = socket.socket()
+ conn = distbuild.create_socket()
conn.connect((addr, port))
helper = HelperMachine(conn)
helper.debug_messages = self.settings['debug-messages']
diff --git a/distbuild/__init__.py b/distbuild/__init__.py
index 3e60d5ee..57aaccaf 100644
--- a/distbuild/__init__.py
+++ b/distbuild/__init__.py
@@ -58,4 +58,6 @@ from protocol import message
from crashpoint import (crash_point, add_crash_condition, add_crash_conditions,
clear_crash_conditions)
+from distbuild_socket import create_socket
+
__all__ = locals()
diff --git a/distbuild/build_controller.py b/distbuild/build_controller.py
index 8b9f34e7..ed6c424e 100644
--- a/distbuild/build_controller.py
+++ b/distbuild/build_controller.py
@@ -155,74 +155,81 @@ class BuildController(distbuild.StateMachine):
_idgen = distbuild.IdentifierGenerator('BuildController')
- def __init__(self, build_request_message, artifact_cache_server,
- morph_instance):
+ def __init__(self, initiator_connection, build_request_message,
+ artifact_cache_server, morph_instance):
distbuild.crash_point()
distbuild.StateMachine.__init__(self, 'init')
+ self._initiator_connection = initiator_connection
self._request = build_request_message
self._artifact_cache_server = artifact_cache_server
self._morph_instance = morph_instance
self._helper_id = None
- self.debug_transitions = True
+ self.debug_transitions = False
+
+ def __repr__(self):
+ return '<BuildController at 0x%x, request-id %s>' % (id(self),
+ self._request['id'])
def setup(self):
distbuild.crash_point()
spec = [
+ # state, source, event_class, new_state, callback
('init', self, _Start, 'graphing', self._start_graphing),
- ('init', distbuild.InitiatorConnection,
- distbuild.InitiatorDisconnect, 'init', self._maybe_abort),
- ('init', self, _Abort, None, None),
-
+ ('init', self._initiator_connection,
+ distbuild.InitiatorDisconnect, None, None),
+
('graphing', distbuild.HelperRouter, distbuild.HelperOutput,
- 'graphing', self._collect_graph),
+ 'graphing', self._maybe_collect_graph),
('graphing', distbuild.HelperRouter, distbuild.HelperResult,
- 'graphing', self._finish_graph),
+ 'graphing', self._maybe_finish_graph),
('graphing', self, _GotGraph,
'annotating', self._start_annotating),
('graphing', self, _GraphFailed, None, None),
- ('graphing', distbuild.InitiatorConnection,
- distbuild.InitiatorDisconnect, None,
- self._maybe_abort),
-
+ ('graphing', self._initiator_connection,
+ distbuild.InitiatorDisconnect, None, None),
+
('annotating', distbuild.HelperRouter, distbuild.HelperResult,
- 'annotating', self._handle_cache_response),
+ 'annotating', self._maybe_handle_cache_response),
('annotating', self, _AnnotationFailed, None,
self._notify_annotation_failed),
('annotating', self, _Annotated, 'building',
self._queue_worker_builds),
- ('annotating', distbuild.InitiatorConnection,
- distbuild.InitiatorDisconnect, None,
- self._maybe_abort),
-
- ('building', distbuild.WorkerConnection,
- distbuild.WorkerBuildStepStarted, 'building',
- self._relay_build_step_started),
- ('building', distbuild.WorkerConnection,
- distbuild.WorkerBuildOutput, 'building',
- self._relay_build_output),
- ('building', distbuild.WorkerConnection,
- distbuild.WorkerBuildCaching, 'building',
- self._relay_build_caching),
- ('building', distbuild.WorkerConnection,
- distbuild.WorkerBuildFinished, 'building',
- self._check_result_and_queue_more_builds),
- ('building', distbuild.WorkerConnection,
- distbuild.WorkerBuildFailed, None,
- self._notify_build_failed),
+ ('annotating', self._initiator_connection,
+ distbuild.InitiatorDisconnect, None, None),
+
+ # The exact WorkerConnection that is doing our building changes
+ # from build to build. We must listen to all messages from all
+ # workers, and choose whether to change state inside the callback.
+ # (An alternative would be to manage a set of temporary transitions
+ # specific to WorkerConnection instances that our currently
+ # building for us, but the state machines are not intended to
+ # behave that way).
+ ('building', distbuild.WorkerConnection,
+ distbuild.WorkerBuildStepStarted, 'building',
+ self._maybe_relay_build_step_started),
+ ('building', distbuild.WorkerConnection,
+ distbuild.WorkerBuildOutput, 'building',
+ self._maybe_relay_build_output),
+ ('building', distbuild.WorkerConnection,
+ distbuild.WorkerBuildCaching, 'building',
+ self._maybe_relay_build_caching),
+ ('building', distbuild.WorkerConnection,
+ distbuild.WorkerBuildFinished, 'building',
+ self._maybe_check_result_and_queue_more_builds),
+ ('building', distbuild.WorkerConnection,
+ distbuild.WorkerBuildFailed, 'building',
+ self._maybe_notify_build_failed),
+ ('building', self, _Abort, None, None),
('building', self, _Built, None, self._notify_build_done),
- ('building', distbuild.InitiatorConnection,
- distbuild.InitiatorDisconnect, 'building',
+ ('building', self._initiator_connection,
+ distbuild.InitiatorDisconnect, None,
self._notify_initiator_disconnected),
]
self.add_transitions(spec)
self.mainloop.queue_event(self, _Start())
- def _maybe_abort(self, event_source, event):
- if event.id == self._request['id']:
- self.mainloop.queue_event(self, _Abort())
-
def _start_graphing(self, event_source, event):
distbuild.crash_point()
@@ -248,14 +255,14 @@ class BuildController(distbuild.StateMachine):
progress = BuildProgress(self._request['id'], 'Computing build graph')
self.mainloop.queue_event(BuildController, progress)
- def _collect_graph(self, event_source, event):
+ def _maybe_collect_graph(self, event_source, event):
distbuild.crash_point()
if event.msg['id'] == self._helper_id:
self._artifact_data.add(event.msg['stdout'])
self._artifact_error.add(event.msg['stderr'])
- def _finish_graph(self, event_source, event):
+ def _maybe_finish_graph(self, event_source, event):
distbuild.crash_point()
def notify_failure(msg_text):
@@ -333,7 +340,7 @@ class BuildController(distbuild.StateMachine):
logging.debug('Made cache request for state of artifacts '
'(helper id: %s)' % self._helper_id)
- def _handle_cache_response(self, event_source, event):
+ def _maybe_handle_cache_response(self, event_source, event):
logging.debug('Got cache response: %s' % repr(event.msg))
def set_status(artifact):
@@ -423,11 +430,12 @@ class BuildController(distbuild.StateMachine):
def _notify_initiator_disconnected(self, event_source, disconnect):
- if disconnect.id == self._request['id']:
- cancel = BuildCancel(disconnect.id)
- self.mainloop.queue_event(BuildController, cancel)
+ logging.debug("BuildController %r: initiator id %s disconnected", self,
+ disconnect.id)
+ cancel = BuildCancel(disconnect.id)
+ self.mainloop.queue_event(BuildController, cancel)
- def _relay_build_step_started(self, event_source, event):
+ def _maybe_relay_build_step_started(self, event_source, event):
distbuild.crash_point()
if event.initiator_id != self._request['id']:
return # not for us
@@ -445,7 +453,7 @@ class BuildController(distbuild.StateMachine):
self.mainloop.queue_event(BuildController, started)
logging.debug('BC: emitted %s' % repr(started))
- def _relay_build_output(self, event_source, event):
+ def _maybe_relay_build_output(self, event_source, event):
distbuild.crash_point()
if event.msg['id'] != self._request['id']:
return # not for us
@@ -463,7 +471,7 @@ class BuildController(distbuild.StateMachine):
self.mainloop.queue_event(BuildController, output)
logging.debug('BC: queued %s' % repr(output))
- def _relay_build_caching(self, event_source, event):
+ def _maybe_relay_build_caching(self, event_source, event):
distbuild.crash_point()
if event.initiator_id != self._request['id']:
return # not for us
@@ -486,7 +494,7 @@ class BuildController(distbuild.StateMachine):
else:
return None
- def _check_result_and_queue_more_builds(self, event_source, event):
+ def _maybe_check_result_and_queue_more_builds(self, event_source, event):
distbuild.crash_point()
if event.msg['id'] != self._request['id']:
return # not for us
@@ -526,17 +534,24 @@ class BuildController(distbuild.StateMachine):
failed = BuildFailed(self._request['id'], errmsg)
self.mainloop.queue_event(BuildController, failed)
- def _notify_build_failed(self, event_source, event):
+ def _maybe_notify_build_failed(self, event_source, event):
distbuild.crash_point()
+
if event.msg['id'] != self._request['id']:
- return # not for us
+ return
artifact = self._find_artifact(event.artifact_cache_key)
+
if artifact is None:
- # This is not the event you are looking for.
- return
+ logging.error(
+ 'BuildController %r: artifact %s is not in our build graph!',
+ self, artifact)
+ # We abort the build in this case on the grounds that something is
+ # very wrong internally, and it's best for the initiator to receive
+ # an error than to be left hanging.
+ self.mainloop.queue_event(self, _Abort())
- logging.error(
+ logging.info(
'Build step failed for %s: %s', artifact.name, repr(event.msg))
step_failed = BuildStepFailed(
@@ -561,6 +576,8 @@ class BuildController(distbuild.StateMachine):
cancel = BuildCancel(self._request['id'])
self.mainloop.queue_event(BuildController, cancel)
+ self.mainloop.queue_event(self, _Abort())
+
def _notify_build_done(self, event_source, event):
distbuild.crash_point()
diff --git a/distbuild/connection_machine.py b/distbuild/connection_machine.py
index 3d4e8d04..648ce35a 100644
--- a/distbuild/connection_machine.py
+++ b/distbuild/connection_machine.py
@@ -81,6 +81,7 @@ class ConnectionMachine(distbuild.StateMachine):
self.mainloop.add_event_source(self._timer)
spec = [
+ # state, source, event_class, new_state, callback
('connecting', self._sock_proxy, distbuild.SocketWriteable,
'connected', self._connect),
('connecting', self, StopConnecting, None, self._stop),
@@ -97,7 +98,7 @@ class ConnectionMachine(distbuild.StateMachine):
logging.debug(
'ConnectionMachine: connecting to %s:%s' %
(self._addr, self._port))
- self._socket = socket.socket()
+ self._socket = distbuild.create_socket()
distbuild.set_nonblocking(self._socket)
try:
self._socket.connect((self._addr, self._port))
diff --git a/distbuild/distbuild_socket.py b/distbuild/distbuild_socket.py
new file mode 100644
index 00000000..0408a2c1
--- /dev/null
+++ b/distbuild/distbuild_socket.py
@@ -0,0 +1,63 @@
+# distbuild/distbuild_socket.py -- wrapper around Python 'socket' module.
+#
+# Copyright (C) 2014 Codethink Limited
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; version 2 of the License.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA..
+
+
+import socket
+
+
+class DistbuildSocket(object):
+ '''Wraps socket.SocketType with a few helper functions.'''
+
+ def __init__(self, real_socket):
+ self.real_socket = real_socket
+
+ def __getattr__(self, name):
+ return getattr(self.real_socket, name)
+
+ def __repr__(self):
+ return '<DistbuildSocket at 0x%x: %s>' % (id(self), str(self))
+
+ def __str__(self):
+ localname = self.localname() or '(closed)'
+ remotename = self.remotename()
+ if remotename is None:
+ return '%s' % self.localname()
+ else:
+ return '%s -> %s' % (self.localname(), remotename)
+
+ def accept(self, *args):
+ result = self.real_socket.accept(*args)
+ return DistbuildSocket(result[0]), result[1:]
+
+ def localname(self):
+ '''Get local end of socket connection as a string.'''
+ try:
+ return '%s:%s' % self.getsockname()
+ except socket.error:
+ # If the socket is in destruction we may get EBADF here.
+ return None
+
+ def remotename(self):
+ '''Get remote end of socket connection as a string.'''
+ try:
+ return '%s:%s' % self.getpeername()
+ except socket.error:
+ return None
+
+
+def create_socket(*args):
+ return DistbuildSocket(socket.socket(*args))
diff --git a/distbuild/helper_router.py b/distbuild/helper_router.py
index 752a5fdb..1f0ce45b 100644
--- a/distbuild/helper_router.py
+++ b/distbuild/helper_router.py
@@ -82,6 +82,7 @@ class HelperRouter(distbuild.StateMachine):
self.mainloop.add_state_machine(jm)
spec = [
+ # state, source, event_class, new_state, callback
('idle', HelperRouter, HelperRequest, 'idle',
self._handle_request),
('idle', jm, distbuild.JsonNewMessage, 'idle', self._helper_msg),
diff --git a/distbuild/initiator.py b/distbuild/initiator.py
index e4d4975f..6e4ca65a 100644
--- a/distbuild/initiator.py
+++ b/distbuild/initiator.py
@@ -48,7 +48,7 @@ class Initiator(distbuild.StateMachine):
self._morphology = morphology
self._steps = None
self._step_outputs = {}
- self.debug_transitions = True
+ self.debug_transitions = False
def setup(self):
distbuild.crash_point()
@@ -58,6 +58,7 @@ class Initiator(distbuild.StateMachine):
logging.debug('initiator: _jm=%s' % repr(self._jm))
spec = [
+ # state, source, event_class, new_state, callback
('waiting', self._jm, distbuild.JsonEof, None, self._terminate),
('waiting', self._jm, distbuild.JsonNewMessage, 'waiting',
self._handle_json_message),
diff --git a/distbuild/initiator_connection.py b/distbuild/initiator_connection.py
index 48d083e4..34f2bdaa 100644
--- a/distbuild/initiator_connection.py
+++ b/distbuild/initiator_connection.py
@@ -35,9 +35,12 @@ class _Close(object):
class InitiatorConnection(distbuild.StateMachine):
- '''Communicate with the initiator.
-
- This state machine communicates with the initiator, relaying and
+ '''Communicate with a single initiator.
+
+ When a developer runs 'morph distbuild' and connects to the controller,
+ the ListenServer object on the controller creates an InitiatorConnection.
+
+ This state machine communicates with the build initiator, relaying and
translating messages from the initiator to the rest of the controller's
state machines, and vice versa.
@@ -51,6 +54,11 @@ class InitiatorConnection(distbuild.StateMachine):
self.conn = conn
self.artifact_cache_server = artifact_cache_server
self.morph_instance = morph_instance
+ self.initiator_name = conn.remotename()
+
+ def __repr__(self):
+ return '<InitiatorConnection at 0x%x: remote %s>' % (id(self),
+ self.initiator_name)
def setup(self):
self.jm = distbuild.JsonMachine(self.conn)
@@ -59,6 +67,7 @@ class InitiatorConnection(distbuild.StateMachine):
self.our_ids = set()
spec = [
+ # state, source, event_class, new_state, callback
('idle', self.jm, distbuild.JsonNewMessage, 'idle',
self._handle_msg),
('idle', self.jm, distbuild.JsonEof, 'closing', self._disconnect),
@@ -84,31 +93,32 @@ class InitiatorConnection(distbuild.StateMachine):
def _handle_msg(self, event_source, event):
'''Handle message from initiator.'''
-
- logging.debug(
- 'InitiatorConnection: from initiator: %s', repr(event.msg))
-
+
+ logging.debug('InitiatorConnection: from %s: %r', self.initiator_name,
+ event.msg)
+
if event.msg['type'] == 'build-request':
new_id = self._idgen.next()
self.our_ids.add(new_id)
self._route_map.add(event.msg['id'], new_id)
event.msg['id'] = new_id
build_controller = distbuild.BuildController(
- event.msg, self.artifact_cache_server, self.morph_instance)
+ self, event.msg, self.artifact_cache_server,
+ self.morph_instance)
self.mainloop.add_state_machine(build_controller)
def _disconnect(self, event_source, event):
for id in self.our_ids:
- logging.debug('InitiatorConnection: InitiatorDisconnect(%s)'
- % str(id))
+ logging.debug('InitiatorConnection: %s: InitiatorDisconnect(%s)',
+ self.initiator_name, str(id))
self.mainloop.queue_event(InitiatorConnection,
InitiatorDisconnect(id))
# TODO should this clear our_ids?
self.mainloop.queue_event(self, _Close(event_source))
def _close(self, event_source, event):
- logging.debug('InitiatorConnection: closing: %s',
- repr(event.event_source))
+ logging.debug('InitiatorConnection: %s: closing: %s',
+ self.initiator_name, repr(event.event_source))
event.event_source.close()
@@ -120,6 +130,10 @@ class InitiatorConnection(distbuild.StateMachine):
'InitiatorConnection: received result: %s', repr(event.msg))
self.jm.send(event.msg)
+ def _log_send(self, msg):
+ logging.debug(
+ 'InitiatorConnection: sent to %s: %r', self.initiator_name, msg)
+
def _send_build_finished_message(self, event_source, event):
if event.id in self.our_ids:
msg = distbuild.message('build-finished',
@@ -128,8 +142,7 @@ class InitiatorConnection(distbuild.StateMachine):
self._route_map.remove(event.id)
self.our_ids.remove(event.id)
self.jm.send(msg)
- logging.debug(
- 'InitiatorConnection: sent to initiator: %s', repr(msg))
+ self._log_send(msg)
def _send_build_failed_message(self, event_source, event):
if event.id in self.our_ids:
@@ -139,8 +152,7 @@ class InitiatorConnection(distbuild.StateMachine):
self._route_map.remove(event.id)
self.our_ids.remove(event.id)
self.jm.send(msg)
- logging.debug(
- 'InitiatorConnection: sent to initiator: %s', repr(msg))
+ self._log_send(msg)
def _send_build_progress_message(self, event_source, event):
if event.id in self.our_ids:
@@ -148,8 +160,7 @@ class InitiatorConnection(distbuild.StateMachine):
id=self._route_map.get_incoming_id(event.id),
message=event.message_text)
self.jm.send(msg)
- logging.debug(
- 'InitiatorConnection: sent to initiator: %s', repr(msg))
+ self._log_send(msg)
def _send_build_steps_message(self, event_source, event):
@@ -169,8 +180,7 @@ class InitiatorConnection(distbuild.StateMachine):
id=self._route_map.get_incoming_id(event.id),
steps=step_names)
self.jm.send(msg)
- logging.debug(
- 'InitiatorConnection: sent to initiator: %s', repr(msg))
+ self._log_send(msg)
def _send_build_step_started_message(self, event_source, event):
if event.id in self.our_ids:
@@ -179,8 +189,7 @@ class InitiatorConnection(distbuild.StateMachine):
step_name=event.step_name,
worker_name=event.worker_name)
self.jm.send(msg)
- logging.debug(
- 'InitiatorConnection: sent to initiator: %s', repr(msg))
+ self._log_send(msg)
def _send_build_output_message(self, event_source, event):
logging.debug('InitiatorConnection: build_output: '
@@ -193,8 +202,7 @@ class InitiatorConnection(distbuild.StateMachine):
stdout=event.stdout,
stderr=event.stderr)
self.jm.send(msg)
- logging.debug(
- 'InitiatorConnection: sent to initiator: %s', repr(msg))
+ self._log_send(msg)
def _send_build_step_finished_message(self, event_source, event):
if event.id in self.our_ids:
@@ -202,8 +210,7 @@ class InitiatorConnection(distbuild.StateMachine):
id=self._route_map.get_incoming_id(event.id),
step_name=event.step_name)
self.jm.send(msg)
- logging.debug(
- 'InitiatorConnection: sent to initiator: %s', repr(msg))
+ self._log_send(msg)
def _send_build_step_failed_message(self, event_source, event):
if event.id in self.our_ids:
@@ -211,6 +218,5 @@ class InitiatorConnection(distbuild.StateMachine):
id=self._route_map.get_incoming_id(event.id),
step_name=event.step_name)
self.jm.send(msg)
- logging.debug(
- 'InitiatorConnection: sent to initiator: %s', repr(msg))
+ self._log_send(msg)
diff --git a/distbuild/jm.py b/distbuild/jm.py
index ae222c00..69fa5bd1 100644
--- a/distbuild/jm.py
+++ b/distbuild/jm.py
@@ -43,7 +43,11 @@ class JsonMachine(StateMachine):
StateMachine.__init__(self, 'rw')
self.conn = conn
self.debug_json = False
-
+
+ def __repr__(self):
+ return '<JsonMachine at 0x%x: socket %s, max_buffer %s>' % \
+ (id(self), self.conn, self.max_buffer)
+
def setup(self):
sockbuf = self.sockbuf = SocketBuffer(self.conn, self.max_buffer)
self.mainloop.add_state_machine(sockbuf)
@@ -52,6 +56,7 @@ class JsonMachine(StateMachine):
self.receive_buf = StringBuffer()
spec = [
+ # state, source, event_class, new_state, callback
('rw', sockbuf, SocketBufferNewData, 'rw', self._parse),
('rw', sockbuf, SocketBufferEof, 'w', self._send_eof),
('rw', self, _Close2, None, self._really_close),
diff --git a/distbuild/json_router.py b/distbuild/json_router.py
index bf272174..93533b2e 100644
--- a/distbuild/json_router.py
+++ b/distbuild/json_router.py
@@ -52,6 +52,7 @@ class JsonRouter(distbuild.StateMachine):
self.mainloop.add_state_machine(jm)
spec = [
+ # state, source, event_class, new_state, callback
('idle', jm, distbuild.JsonNewMessage, 'idle', self.bloop),
('idle', jm, distbuild.JsonEof, None, self.close),
]
diff --git a/distbuild/serialise.py b/distbuild/serialise.py
index cd871042..44d96eee 100644
--- a/distbuild/serialise.py
+++ b/distbuild/serialise.py
@@ -75,9 +75,6 @@ def serialise_artifact(artifact):
else:
arch = artifact.arch
- logging.debug('encode_single_artifact dependencies: %s'
- % str([('id: %s' % str(id(d)), d.name) for d in a.dependencies]))
-
return {
'source_id': source_id,
'name': a.name,
@@ -104,13 +101,10 @@ def serialise_artifact(artifact):
encoded_sources = {}
for a in traverse(artifact):
- logging.debug('traversing artifacts at %s' % a.name)
-
if id(a.source) not in encoded_sources:
if a.source.morphology['kind'] == 'chunk':
for (_, sa) in a.source.artifacts.iteritems():
if id(sa) not in artifacts:
- logging.debug('encoding source artifact %s' % sa.name)
artifacts[id(sa)] = sa
encoded_artifacts[id(sa)] = encode_single_artifact(sa,
artifacts, id(a.source))
@@ -130,7 +124,6 @@ def serialise_artifact(artifact):
if id(a) not in artifacts:
artifacts[id(a)] = a
- logging.debug('encoding artifact %s' % a.name)
encoded_artifacts[id(a)] = encode_single_artifact(a, artifacts,
id(a.source))
diff --git a/distbuild/sockbuf.py b/distbuild/sockbuf.py
index a7fe339a..346706db 100644
--- a/distbuild/sockbuf.py
+++ b/distbuild/sockbuf.py
@@ -77,6 +77,10 @@ class SocketBuffer(StateMachine):
self._sock = sock
self._max_buffer = max_buffer
+ def __repr__(self):
+ return '<SocketBuffer at 0x%x: socket %s max_buffer %i>' % (
+ id(self), self._sock, self._max_buffer)
+
def setup(self):
src = self._src = SocketEventSource(self._sock)
src.stop_writing() # We'll start writing when we need to.
@@ -85,6 +89,7 @@ class SocketBuffer(StateMachine):
self._wbuf = StringBuffer()
spec = [
+ # state, source, event_class, new_state, callback
('reading', src, SocketReadable, 'reading', self._fill),
('reading', self, _WriteBufferNotEmpty, 'rw',
self._start_writing),
diff --git a/distbuild/socketsrc.py b/distbuild/socketsrc.py
index 78486f0e..14adc74d 100644
--- a/distbuild/socketsrc.py
+++ b/distbuild/socketsrc.py
@@ -9,6 +9,8 @@ import logging
import os
import socket
+import distbuild
+
from eventsrc import EventSource
@@ -48,13 +50,13 @@ class ListeningSocketEventSource(EventSource):
'''An event source for a socket that listens for connections.'''
def __init__(self, addr, port):
- self.sock = socket.socket()
+ self.sock = distbuild.create_socket()
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.sock.bind((addr, port))
self.sock.listen(5)
self._accepting = True
- logging.info('Listening at %s' % repr(self.sock.getsockname()))
-
+ logging.info('Listening at %s' % self.sock.remotename())
+
def get_select_params(self):
r = [self.sock.fileno()] if self._accepting else []
return r, [], [], None
@@ -117,7 +119,10 @@ class SocketEventSource(EventSource):
self._writing = True
set_nonblocking(sock)
-
+
+ def __repr__(self):
+ return '<SocketEventSource at %x: socket %s>' % (id(self), self.sock)
+
def get_select_params(self):
r = [self.sock.fileno()] if self._reading else []
w = [self.sock.fileno()] if self._writing else []
diff --git a/distbuild/sockserv.py b/distbuild/sockserv.py
index 6d8f216e..124d29b9 100644
--- a/distbuild/sockserv.py
+++ b/distbuild/sockserv.py
@@ -26,6 +26,7 @@ class ListenServer(StateMachine):
self.mainloop.add_event_source(src)
spec = [
+ # state, source, event_class, new_state, callback
('listening', src, NewConnection, 'listening', self.new_conn),
('listening', src, SocketError, None, self.report_error),
]
@@ -34,7 +35,7 @@ class ListenServer(StateMachine):
def new_conn(self, event_source, event):
logging.debug(
'ListenServer: Creating new %s using %s and %s' %
- (repr(self._machine),
+ (self._machine,
repr(event.connection),
repr(self._extra_args)))
m = self._machine(event.connection, *self._extra_args)
diff --git a/distbuild/worker_build_scheduler.py b/distbuild/worker_build_scheduler.py
index 7953447f..fc5849b3 100644
--- a/distbuild/worker_build_scheduler.py
+++ b/distbuild/worker_build_scheduler.py
@@ -127,6 +127,7 @@ class WorkerBuildQueuer(distbuild.StateMachine):
self._available_workers = []
spec = [
+ # state, source, event_class, new_state, callback
('idle', WorkerBuildQueuer, WorkerBuildRequest, 'idle',
self._handle_request),
('idle', WorkerBuildQueuer, WorkerCancelPending, 'idle',
@@ -193,11 +194,13 @@ class WorkerConnection(distbuild.StateMachine):
self._worker_cache_server_port = worker_cache_server_port
self._morph_instance = morph_instance
self._helper_id = None
-
- def name(self):
+
addr, port = self._conn.getpeername()
name = socket.getfqdn(addr)
- return '%s:%s' % (name, port)
+ self._worker_name = '%s:%s' % (name, port)
+
+ def name(self):
+ return self._worker_name
def setup(self):
distbuild.crash_point()
@@ -208,6 +211,7 @@ class WorkerConnection(distbuild.StateMachine):
self.mainloop.add_state_machine(self._jm)
spec = [
+ # state, source, event_class, new_state, callback
('idle', self._jm, distbuild.JsonEof, None, self._reconnect),
('idle', self, _HaveAJob, 'building', self._start_build),
@@ -222,7 +226,7 @@ class WorkerConnection(distbuild.StateMachine):
self._request_caching),
('caching', distbuild.HelperRouter, distbuild.HelperResult,
- 'caching', self._handle_helper_result),
+ 'caching', self._maybe_handle_helper_result),
('caching', self, _Cached, 'idle', self._request_job),
('caching', self, _JobFailed, 'idle', self._request_job),
]
@@ -231,7 +235,8 @@ class WorkerConnection(distbuild.StateMachine):
self._request_job(None, None)
def _maybe_cancel(self, event_source, build_cancel):
- logging.debug('WC: BuildController requested a cancel')
+ logging.debug('WC: BuildController %r requested a cancel' %
+ event_source)
if build_cancel.id == self._initiator_id:
distbuild.crash_point()
@@ -265,7 +270,7 @@ class WorkerConnection(distbuild.StateMachine):
stdin_contents=distbuild.serialise_artifact(self._artifact),
)
self._jm.send(msg)
- logging.debug('WC: sent to worker: %s' % repr(msg))
+ logging.debug('WC: sent to worker %s: %r' % (self._worker_name, msg))
self._route_map.add(self._initiator_id, msg['id'])
self._initiator_request_map[self._initiator_id].add(msg['id'])
logging.debug(
@@ -281,7 +286,8 @@ class WorkerConnection(distbuild.StateMachine):
distbuild.crash_point()
- logging.debug('WC: from worker: %s' % repr(event.msg))
+ logging.debug(
+ 'WC: from worker %s: %r' % (self._worker_name, event.msg))
handlers = {
'exec-output': self._handle_exec_output,
@@ -371,7 +377,7 @@ class WorkerConnection(distbuild.StateMachine):
self._initiator_id = None
self._finished_msg = event.msg
- def _handle_helper_result(self, event_source, event):
+ def _maybe_handle_helper_result(self, event_source, event):
if event.msg['id'] == self._helper_id:
distbuild.crash_point()
diff --git a/morphlib/exts/kvm.check b/morphlib/exts/kvm.check
index be7c51c2..957d0893 100755
--- a/morphlib/exts/kvm.check
+++ b/morphlib/exts/kvm.check
@@ -17,11 +17,16 @@
'''Preparatory checks for Morph 'kvm' write extension'''
import cliapp
+import re
+import urlparse
import morphlib.writeexts
class KvmPlusSshCheckExtension(morphlib.writeexts.WriteExtension):
+
+ location_pattern = '^/(?P<guest>[^/]+)(?P<path>/.+)$'
+
def process_args(self, args):
if len(args) != 1:
raise cliapp.AppException('Wrong number of command line args')
@@ -32,4 +37,46 @@ class KvmPlusSshCheckExtension(morphlib.writeexts.WriteExtension):
'Use the `ssh-rsync` write extension to deploy upgrades to an '
'existing remote system.')
+ location = args[0]
+ ssh_host, vm_name, vm_path = self.check_and_parse_location(location)
+
+ self.check_ssh_connectivity(ssh_host)
+ self.check_no_existing_libvirt_vm(ssh_host, vm_name)
+ self.check_extra_disks_exist(ssh_host, self.parse_attach_disks())
+
+ def check_and_parse_location(self, location):
+ '''Check and parse the location argument to get relevant data.'''
+
+ x = urlparse.urlparse(location)
+
+ if x.scheme != 'kvm+ssh':
+ raise cliapp.AppException(
+ 'URL schema must be kvm+ssh in %s' % location)
+
+ m = re.match(self.location_pattern, x.path)
+ if not m:
+ raise cliapp.AppException('Cannot parse location %s' % location)
+
+ return x.netloc, m.group('guest'), m.group('path')
+
+ def check_no_existing_libvirt_vm(self, ssh_host, vm_name):
+ try:
+ cliapp.ssh_runcmd(ssh_host,
+ ['virsh', '--connect', 'qemu:///system', 'domstate', vm_name])
+ except cliapp.AppException as e:
+ pass
+ else:
+ raise cliapp.AppException(
+ 'Host %s already has a VM named %s. You can use the ssh-rsync '
+ 'write extension to deploy upgrades to existing machines.' %
+ (ssh_host, vm_name))
+
+ def check_extra_disks_exist(self, ssh_host, filename_list):
+ for filename in filename_list:
+ try:
+ cliapp.ssh_runcmd(ssh_host, ['ls', filename])
+ except cliapp.AppException as e:
+ raise cliapp.AppException('Did not find file %s on host %s' %
+ (filename, ssh_host))
+
KvmPlusSshCheckExtension().run()
diff --git a/morphlib/exts/kvm.write b/morphlib/exts/kvm.write
index 94560972..94a55daa 100755
--- a/morphlib/exts/kvm.write
+++ b/morphlib/exts/kvm.write
@@ -50,6 +50,8 @@ class KvmPlusSshWriteExtension(morphlib.writeexts.WriteExtension):
'''
+ location_pattern = '^/(?P<guest>[^/]+)(?P<path>/.+)$'
+
def process_args(self, args):
if len(args) != 2:
raise cliapp.AppException('Wrong number of command line args')
@@ -79,14 +81,9 @@ class KvmPlusSshWriteExtension(morphlib.writeexts.WriteExtension):
def parse_location(self, location):
'''Parse the location argument to get relevant data.'''
-
+
x = urlparse.urlparse(location)
- if x.scheme != 'kvm+ssh':
- raise cliapp.AppException(
- 'URL schema must be vbox+ssh in %s' % location)
m = re.match('^/(?P<guest>[^/]+)(?P<path>/.+)$', x.path)
- if not m:
- raise cliapp.AppException('Cannot parse location %s' % location)
return x.netloc, m.group('guest'), m.group('path')
def transfer(self, raw_disk, ssh_host, vm_path):
diff --git a/morphlib/exts/nfsboot.check b/morphlib/exts/nfsboot.check
index f84f187f..806e560a 100755
--- a/morphlib/exts/nfsboot.check
+++ b/morphlib/exts/nfsboot.check
@@ -56,12 +56,7 @@ class NFSBootCheckExtension(morphlib.writeexts.WriteExtension):
version_label, location))
def test_good_server(self, server):
- # Can be ssh'ed into
- try:
- cliapp.ssh_runcmd('root@%s' % server, ['true'])
- except cliapp.AppException:
- raise cliapp.AppException('You are unable to ssh into server %s'
- % server)
+ self.check_ssh_connectivity(server)
# Is an NFS server
try:
diff --git a/morphlib/gitdir.py b/morphlib/gitdir.py
index 3d0ab53e..8f6d69d7 100644
--- a/morphlib/gitdir.py
+++ b/morphlib/gitdir.py
@@ -108,23 +108,22 @@ class PushError(cliapp.AppException):
class NoRefspecsError(PushError):
def __init__(self, remote):
- self.remote = remote.name
- PushError.__init__(self,
- 'Push to remote %r was given no refspecs.' % remote)
+ self.remote = remote
+ PushError.__init__(
+ self, 'Push to remote "%s" was given no refspecs.' % remote)
class PushFailureError(PushError):
def __init__(self, remote, refspecs, exit, results, stderr):
- self.remote = remote.name
+ self.remote = remote
self.push_url = push_url = remote.get_push_url()
self.refspecs = refspecs
self.exit = exit
self.results = results
self.stderr = stderr
- PushError.__init__(self, 'Push to remote %(remote)r, '\
+ PushError.__init__(self, 'Push to remote "%(remote)s", '\
'push url %(push_url)s '\
- 'with refspecs %(refspecs)r '\
'failed with exit code %(exit)s' % locals())
@@ -235,6 +234,9 @@ class Remote(object):
self.push_url = None
self.fetch_url = None
+ def __str__(self):
+ return self.name or '(nascent remote)'
+
def set_fetch_url(self, url):
self.fetch_url = url
if self.name is not None:
diff --git a/morphlib/plugins/distbuild_plugin.py b/morphlib/plugins/distbuild_plugin.py
index 27d87e35..c60dee6e 100644
--- a/morphlib/plugins/distbuild_plugin.py
+++ b/morphlib/plugins/distbuild_plugin.py
@@ -18,6 +18,7 @@
import cliapp
import logging
+import re
import sys
import morphlib
@@ -89,16 +90,23 @@ class WorkerBuild(cliapp.Plugin):
artifact = distbuild.deserialise_artifact(serialized)
bc = morphlib.buildcommand.BuildCommand(self.app)
-
- # We always, unconditionally clear the local artifact cache
- # to avoid it growing boundlessly on a worker. Especially system
- # artifacts are big (up to gigabytes), and having a new one for
- # every build eats up a lot of disk space.
- bc.lac.clear()
+
+ # Now, before we start the build, we garbage collect the caches
+ # to ensure we have room. First we remove all system artifacts
+ # since we never need to recover those from workers post-hoc
+ for cachekey, artifacts, last_used in bc.lac.list_contents():
+ if any(self.is_system_artifact(f) for f in artifacts):
+ logging.debug("Removing all artifacts for system %s" %
+ cachekey)
+ bc.lac.remove(cachekey)
+
+ self.app.subcommands['gc']([])
arch = artifact.arch
bc.build_artifact(artifact, bc.new_build_env(arch))
+ def is_system_artifact(self, filename):
+ return re.match(r'^[0-9a-fA-F]{64}\.system\.', filename)
class WorkerDaemon(cliapp.Plugin):
@@ -208,6 +216,7 @@ class ControllerDaemon(cliapp.Plugin):
morph_instance = self.app.settings['morph-instance']
listener_specs = [
+ # address, port, class to initiate on connection, class init args
('controller-helper-address', 'controller-helper-port',
distbuild.HelperRouter, []),
('controller-initiator-address', 'controller-initiator-port',
diff --git a/morphlib/writeexts.py b/morphlib/writeexts.py
index 1849f406..3f9c33d5 100644
--- a/morphlib/writeexts.py
+++ b/morphlib/writeexts.py
@@ -15,6 +15,7 @@
import cliapp
+import logging
import os
import re
import shutil
@@ -333,11 +334,15 @@ class WriteExtension(cliapp.Application):
cliapp.runcmd(['cp', '-a', try_path, kernel_dest])
break
+ def get_extra_kernel_args(self):
+ return os.environ.get('KERNEL_ARGS', '')
+
def install_extlinux(self, real_root):
'''Install extlinux on the newly created disk image.'''
self.status(msg='Creating extlinux.conf')
config = os.path.join(real_root, 'extlinux.conf')
+ kernel_args = self.get_extra_kernel_args()
with open(config, 'w') as f:
f.write('default linux\n')
f.write('timeout 1\n')
@@ -345,7 +350,7 @@ class WriteExtension(cliapp.Application):
f.write('kernel /systems/default/kernel\n')
f.write('append root=/dev/sda '
'rootflags=subvol=systems/default/run '
- 'init=/sbin/init rw\n')
+ '%s init=/sbin/init rw\n' % (kernel_args))
self.status(msg='Installing extlinux')
cliapp.runcmd(['extlinux', '--install', real_root])
@@ -413,3 +418,11 @@ class WriteExtension(cliapp.Application):
else:
raise cliapp.AppException('Unexpected value for %s: %s' %
(variable, value))
+
+ def check_ssh_connectivity(self, ssh_host):
+ try:
+ cliapp.ssh_runcmd(ssh_host, ['true'])
+ except cliapp.AppException as e:
+ logging.error("Error checking SSH connectivity: %s", str(e))
+ raise cliapp.AppException(
+ 'Unable to SSH to %s: %s' % (ssh_host, e))