summaryrefslogtreecommitdiff
path: root/lorry-controller-minion
diff options
context:
space:
mode:
author Lars Wirzenius <lars.wirzenius@codethink.co.uk> 2014-01-20 14:24:27 +0000
committer Lars Wirzenius <lars.wirzenius@codethink.co.uk> 2014-04-15 13:29:27 +0000
commit 4fc162b07b2e9d8489e16ed647e5d96f5c66e10a (patch)
tree ac2a2a5b86a5d789bd28b383851b28d7f293b928 /lorry-controller-minion
parent 716ad28c18ac00c52797dc42c843569b1834fb88 (diff)
download lorry-controller-4fc162b07b2e9d8489e16ed647e5d96f5c66e10a.tar.gz
Add new Lorry Controller
Diffstat (limited to 'lorry-controller-minion')
-rwxr-xr-x lorry-controller-minion 302
1 files changed, 302 insertions, 0 deletions
diff --git a/lorry-controller-minion b/lorry-controller-minion
new file mode 100755
index 0000000..fe2089f
--- /dev/null
+++ b/lorry-controller-minion
@@ -0,0 +1,302 @@
+#!/usr/bin/env python
+#
+# Copyright (C) 2014 Codethink Limited
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; version 2 of the License.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+
+import fcntl
+import httplib
+import json
+import logging
+import os
+import platform
+import random
+import select
+import subprocess
+import tempfile
+import time
+import urllib
+
+import cliapp
+
+import lorrycontroller
+
+
+class WEBAPPError(Exception):
+
+    """Error raised when WEBAPP answers with a non-OK HTTP status.
+
+    The HTTP status code, the reason phrase and the response body are
+    included in the exception message and are also kept as the
+    ``status``, ``reason`` and ``body`` attributes.
+
+    """
+
+    def __init__(self, status, reason, body):
+        # The message used to end in a stray literal 'body' that got
+        # glued onto the end of the actual response body.
+        Exception.__init__(
+            self, 'WEBAPP returned %s %s:\n%s' % (status, reason, body))
+        self.status = status
+        self.reason = reason
+        self.body = body
+
+
+class MINION(cliapp.Application):
+
+    """Worker that fetches jobs from WEBAPP and runs them.
+
+    The main loop (process_args) repeatedly asks WEBAPP for a job over
+    HTTP. Each job is run as a subprocess (the ``lorry-cmd`` setting,
+    given a temporary file holding the job's text), and its output,
+    exit code and disk usage are reported back to WEBAPP.
+
+    While a job is running the instance keeps this state:
+
+    * ``self.process``: the subprocess.Popen object of the job
+    * ``self.stdout_fd``: non-blocking read end of the pipe that
+      receives the job's combined stdout and stderr
+    * ``self.temp_lorry_filename``: temporary file holding the job text
+
+    """
+
+    def add_settings(self):
+        """Declare the settings understood by MINION."""
+
+        self.settings.string(
+            ['webapp-host'],
+            'address of WEBAPP',
+            default='localhost')
+
+        self.settings.integer(
+            ['webapp-port'],
+            'port of WEBAPP',
+            default=80)
+
+        self.settings.integer(
+            ['webapp-timeout'],
+            'how long to wait for an HTTP response from WEBAPP (in seconds)',
+            default=10)
+
+        self.settings.integer(
+            ['sleep'],
+            'do nothing for this long if there is no new job available '
+            '(0 for random 30..60 s)',
+            default=0)
+
+        self.settings.string(
+            ['lorry-cmd'],
+            'run CMD as argv0 instead of lorry '
+            '(args will be added as for lorry)',
+            metavar='CMD',
+            default='lorry')
+
+        self.settings.string(
+            ['lorry-working-area'],
+            'where will Lorry put its files?',
+            metavar='DIR',
+            default='/home/lorry/working-area')
+
+        self.settings.string(
+            ['proxy-config'],
+            'read HTTP proxy config from FILENAME',
+            metavar='FILENAME')
+
+    def process_args(self, args):
+        """Run the main loop: get a job, run it, repeat forever.
+
+        ``args`` is not used. This method only stops when an exception
+        propagates out of it.
+
+        """
+
+        logging.info('Starting MINION')
+
+        # A sleep setting of 0 means "pick a random delay". The delay
+        # is picked once, here, and then used for the whole run.
+        if self.settings['sleep'] == 0:
+            self.settings['sleep'] = random.randint(30, 60)
+
+        if self.settings['proxy-config']:
+            lorrycontroller.setup_proxy(self.settings['proxy-config'])
+
+        while True:
+            job_spec = self.get_job_spec()
+            if job_spec:
+                self.run_job(job_spec)
+            else:
+                logging.info(
+                    'Got no job from WEBAPP, sleeping for %s s',
+                    self.settings['sleep'])
+                time.sleep(self.settings['sleep'])
+
+    def get_job_spec(self):
+        """Ask WEBAPP for a job to run.
+
+        Return the decoded JSON object sent by WEBAPP if it has a true
+        ``job_id`` value, otherwise None. A WEBAPPError is logged and
+        also results in None.
+
+        """
+
+        host = self.settings['webapp-host']
+        port = int(self.settings['webapp-port'])
+
+        logging.debug('Requesting job from WEBAPP (%s:%s)', host, port)
+
+        params = urllib.urlencode({
+            'host': platform.node(),
+            'pid': os.getpid(),
+        })
+
+        try:
+            body = self.webapp_request('POST', '/1.0/give-me-job', params)
+        except WEBAPPError as e:
+            logging.error(str(e))
+            return None
+
+        obj = json.loads(body)
+        if obj.get('job_id'):
+            return obj
+        return None
+
+    def run_job(self, job_spec):
+        """Run one job to completion, keeping WEBAPP updated.
+
+        After every poll of the subprocess, WEBAPP is told about any
+        new output. If WEBAPP answers that the job should be killed,
+        the subprocess is killed and a final update with its exit code
+        is sent.
+
+        """
+
+        self.start_job(job_spec)
+        while True:
+            stdout, stderr, exit_code = self.poll_job()
+            kill_job = self.update_webapp_about_job(
+                job_spec, stdout, stderr, exit_code)
+            if exit_code is not None:
+                break
+            if kill_job:
+                exit_code = self.kill_job()
+                # poll_job only cleans up when it sees the subprocess
+                # exit, which will not happen on this path, so the
+                # temporary file and the pipe must be released here.
+                self._cleanup_job()
+                self.update_webapp_about_job(
+                    job_spec, '', '', exit_code)
+                break
+
+    def start_job(self, job_spec):
+        """Start the subprocess for a job.
+
+        The job's ``text`` is written to a temporary file, whose name
+        is the only argument given to the ``lorry-cmd`` command. The
+        subprocess reads stdin from /dev/null and writes both stdout
+        and stderr to a pipe, whose read end is stored in
+        ``self.stdout_fd``.
+
+        """
+
+        logging.info(
+            'Running job %s: %s on %s',
+            job_spec['job_id'],
+            self.settings['lorry-cmd'],
+            job_spec['path'])
+
+        # json.loads gives us unicode text; encode it explicitly so
+        # that non-ASCII characters do not make the write fail.
+        text = job_spec['text']
+        if not isinstance(text, bytes):
+            text = text.encode('utf-8')
+
+        fd, self.temp_lorry_filename = tempfile.mkstemp()
+        with os.fdopen(fd, 'wb') as f:
+            f.write(text)
+
+        argv = [
+            self.settings['lorry-cmd'],
+            self.temp_lorry_filename,
+        ]
+
+        read_fd, write_fd = os.pipe()
+        self.stdout_fd = read_fd
+        try:
+            self.set_nonblocking(read_fd)
+            with open('/dev/null') as devnull:
+                self.process = subprocess.Popen(
+                    argv,
+                    stdin=devnull,
+                    stdout=write_fd,
+                    stderr=subprocess.STDOUT)
+        except Exception:
+            # The job never started: do not leave the temporary file
+            # and the read end of the pipe behind.
+            self._cleanup_job()
+            raise
+        finally:
+            # Only the subprocess should hold the write end, so that
+            # the pipe reaches end-of-file once the job is done with it.
+            os.close(write_fd)
+
+    def set_nonblocking(self, fd):
+        """Set the O_NONBLOCK flag on file descriptor ``fd``."""
+
+        flags = fcntl.fcntl(fd, fcntl.F_GETFL, 0)
+        fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
+
+    def poll_job(self):
+        """Check on the running job and collect output from it.
+
+        Return a tuple ``(stdout, stderr, exit)``. ``exit`` is None
+        while the subprocess is still running; in that case this
+        waits up to ten seconds for output and returns at most one
+        chunk of it. Once the subprocess has exited, the remaining
+        output is returned and the job's temporary file and pipe are
+        released. ``stderr`` is always an empty string, since the
+        subprocess writes stderr to the same pipe as stdout.
+
+        """
+
+        read_size = 1024
+
+        exit_code = self.process.poll()
+        if exit_code is None:
+            # Process is still running.
+            wait_for_output = 10.0
+            readable, _, _ = select.select(
+                [self.stdout_fd], [], [], wait_for_output)
+            stdout = ''
+            if readable:
+                stdout = os.read(self.stdout_fd, read_size)
+        else:
+            # Finished.
+            if exit_code != 0:
+                logging.error('Subprocess failed')
+            stdout = self._drain_stdout(read_size)
+            self._cleanup_job()
+
+        return stdout, '', exit_code
+
+    def _drain_stdout(self, read_size):
+        """Return everything currently readable from the output pipe."""
+
+        import errno
+
+        parts = []
+        while True:
+            try:
+                data = os.read(self.stdout_fd, read_size)
+            except OSError as e:
+                if e.errno not in (errno.EAGAIN, errno.EWOULDBLOCK):
+                    raise
+                # The pipe is empty but something (e.g. a process
+                # started by the job) still has it open, so there
+                # will be no end-of-file to wait for.
+                break
+            if not data:
+                break
+            parts.append(data)
+        return ''.join(parts)
+
+    def _cleanup_job(self):
+        """Remove the job's temporary file and close its output pipe.
+
+        Does nothing for resources that are already released, so it
+        is safe to call more than once.
+
+        """
+
+        filename = getattr(self, 'temp_lorry_filename', None)
+        if filename is not None:
+            if os.path.exists(filename):
+                os.remove(filename)
+            self.temp_lorry_filename = None
+
+        fd = getattr(self, 'stdout_fd', None)
+        if fd is not None:
+            os.close(fd)
+            self.stdout_fd = None
+
+    def kill_job(self):
+        """Kill the job's subprocess and return its exit code."""
+
+        self.process.kill()
+        return self.process.wait()
+
+    def update_webapp_about_job(self, job_spec, stdout, stderr, exit):
+        """Report the state of a job to WEBAPP.
+
+        ``exit`` is None while the job is running (sent as 'no'),
+        otherwise its exit code. Disk usage is measured only when
+        the job has finished.
+
+        Return the ``kill_job`` value of WEBAPP's response, or None
+        if WEBAPP returned an error (which is logged).
+
+        """
+
+        logging.debug(
+            'Updating WEBAPP about running job %s', job_spec['job_id'])
+
+        if exit is None:
+            disk_usage = None
+        else:
+            disk_usage = self.get_lorry_disk_usage(job_spec)
+
+        # NOTE(review): a disk_usage of None is sent as the string
+        # 'None' by urlencode; confirm that WEBAPP expects that.
+        params = urllib.urlencode({
+            'job_id': job_spec['job_id'],
+            'exit': 'no' if exit is None else exit,
+            'stdout': stdout,
+            'stderr': stderr,
+            'disk_usage': disk_usage,
+        })
+
+        try:
+            body = self.webapp_request('POST', '/1.0/job-update', params)
+        except WEBAPPError as e:
+            logging.error(str(e))
+            return None
+
+        obj = json.loads(body)
+        return obj['kill_job']
+
+    def webapp_request(self, method, path, body):
+        """Make an HTTP request to WEBAPP and return the response body.
+
+        Raise WEBAPPError if the response status is not 200 OK.
+
+        NOTE(review): connection-level failures are not caught here,
+        nor by the callers in this class, so they propagate.
+
+        """
+
+        logging.debug(
+            'Making HTTP request to WEBAPP: method=%r path=%r body=%r',
+            method, path, body)
+
+        host = self.settings['webapp-host']
+        port = int(self.settings['webapp-port'])
+        timeout = self.settings['webapp-timeout']
+
+        headers = {}
+        if body:
+            headers['Content-type'] = 'application/x-www-form-urlencoded'
+
+        conn = httplib.HTTPConnection(host, port=port, timeout=timeout)
+        try:
+            conn.request(method, path, body=body, headers=headers)
+            response = conn.getresponse()
+            response_body = response.read()
+        finally:
+            # Close the connection even if the request or read fails.
+            conn.close()
+
+        if response.status != httplib.OK:
+            raise WEBAPPError(response.status, response.reason, response_body)
+
+        return response_body
+
+    def get_lorry_disk_usage(self, job_spec):
+        """Return the disk usage of the job's directory in the working area."""
+
+        dirname = os.path.join(
+            self.settings['lorry-working-area'],
+            self.escape_lorry_area_basename(job_spec['path']))
+        return self.disk_usage_by_dir(dirname)
+
+    def escape_lorry_area_basename(self, basename):
+        """Return ``basename`` with every slash replaced by an underscore."""
+
+        # FIXME: This code should be kept in sync with the respective
+        # code in lorry, or, better, we would import the code from
+        # Lorry directly.
+
+        assert '\0' not in basename
+        # We escape slashes as underscores.
+        return basename.replace('/', '_')
+
+    def disk_usage_by_dir(self, dirname):
+        """Return the disk usage of ``dirname`` in bytes, as told by du.
+
+        Return 0, after logging the problem, if du fails or its
+        output cannot be parsed.
+
+        """
+
+        exit, out, err = cliapp.runcmd_unchecked(['du', '-sk', dirname])
+        if exit:
+            logging.error('du -sk %s failed: %r', dirname, err)
+            return 0
+
+        lines = out.splitlines()
+        if not lines:
+            logging.warning('no output from du')
+            return 0
+
+        words = lines[-1].split()
+        if not words:
+            logging.warning('last line of du output is empty')
+            return 0
+
+        # du -k reports sizes in units of 1024 bytes.
+        kibibyte = 1024
+        try:
+            return int(words[0]) * kibibyte
+        except ValueError:
+            logging.warning('error converting %r to integer', words[0])
+            return 0
+
+
+# Script entry point: build the application object and start it.
+minion = MINION()
+minion.run()