summary | refs | log | tree | commit | diff
path: root/gear/worker.py
diff options
context:
space:
mode:
Diffstat (limited to 'gear/worker.py')
-rw-r--r--  gear/worker.py  196
1 files changed, 133 insertions, 63 deletions
diff --git a/gear/worker.py b/gear/worker.py
index 824a7bba..3b0f4591 100644
--- a/gear/worker.py
+++ b/gear/worker.py
@@ -1,15 +1,13 @@
import gear
-worker = gear.Worker('reverser')
-worker.addServer('localhost')
-worker.registerFunction("reverse")
-worker.registerFunction("build-graph")
-worker.registerFunction("build-artifact")
-worker.registerFunction("cache-request")
import time
import json
import os
import requests
import urlparse
+import signal
+import threading
+import time
+import cliapp
from subprocess import Popen, PIPE, STDOUT
@@ -23,6 +21,20 @@ logging.basicConfig()
#TODO: values from settings
cache_server = 'http://git.baserock.org:8080'
+class BuildFailedError(cliapp.AppException):
+ def __init__(self):
+ cliapp.AppException.__init__(
+ self, 'Build Failed')
+
+class GraphFailedError(cliapp.AppException):
+ def __init__(self):
+ cliapp.AppException.__init__(
+ self, 'Graph Failed')
+
+class CacheFailedError(cliapp.AppException):
+ def __init__(self):
+ cliapp.AppException.__init__(
+ self, 'Cache Failed')
@contextmanager
def ssh_manager(host, port, username, key):
@@ -41,6 +53,7 @@ def ssh_manager(host, port, username, key):
finally:
client.close()
+
def upload_files(cache_key, suffixes):
print "DEBUG: start upload_files"
cache_dir = '/src/cache/artifacts'
@@ -68,60 +81,117 @@ def upload_files(cache_key, suffixes):
sftp.put(single_file, remote_dest_tmp)
sftp.rename(remote_dest_tmp, remote_dest)
-while True:
- print "DEBUG: Waiting for job"
- job = worker.getJob()
- print "DEBUG: Received job '%s'" % job.name
- if job.name == "reverse":
- print "DEBUG: Starting job reverse with '%s'" % job.arguments
- for x in range(0, 100):
- job.sendWorkData("This is: %s" % x)
- job.sendWorkComplete("answer")
- elif job.name == "build-graph":
- bg_request=json.loads(job.arguments)
- print ("DEBUG: Starting build-graph calculation for Repo: '%s' "
- "Ref: '%s' System: '%s'") % (bg_request['repo'],
- bg_request['ref'],
- bg_request['system'])
- # TODO: There should be another way of doing this.
- cmd = ['morph', 'calculate-build-graph', '--quiet',
- bg_request['repo'], bg_request['ref'], bg_request['system']]
- p = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
- output = p.stdout.read()
- # TODO: catch errors calculating build-graph here instead of sending
- # the error as build-graph :)
- print "DEBUG: finished computing build graph"
- job.sendWorkComplete(output)
- elif job.name == "build-artifact":
- artifact = distbuild.decode_artifact_reference(job.arguments)
- print "building %s" % artifact.name
- cmd = ['morph', 'worker-build', '--build-log-on-stdout', artifact.name]
- p = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
- p.stdin.write(job.arguments)
- p.stdin.close()
- while True:
- line = p.stdout.readline()
- if not line: break
- job.sendWorkData(line)
-
- kind = artifact.kind
-
- if kind == 'chunk':
- artifact_names = artifact.source_artifact_names
-
- suffixes = ['%s.%s' % (kind, name) for name in artifact_names]
- suffixes.append('build-log')
- else:
- filename = '%s.%s' % (kind, artifact.name)
- suffixes = [filename]
-
- if kind == 'stratum':
- suffixes.append(filename + '.meta')
- upload_files(artifact.cache_key, suffixes)
- job.sendWorkComplete(artifact.cache_key)
- elif job.name == 'cache-request':
- artifact_names = json.loads(job.arguments)
-
- url = urlparse.urljoin(cache_server, '/1.0/artifacts')
- r = requests.post(url, json=artifact_names)
- job.sendWorkComplete(json.dumps(r.json()))
+class GearWorkerManager(threading.Thread):
+
+ def __init__(self):
+ super(GearWorkerManager, self).__init__()
+ self.stopped = False
+ self.worker = gear.Worker('reverser')
+ self.worker.addServer('localhost')
+ self.worker.registerFunction("reverse")
+ self.worker.registerFunction("build-graph")
+ self.worker.registerFunction("build-artifact")
+ self.worker.registerFunction("cache-request")
+
+ def shutdown(self):
+ self.stopped = True
+ self.worker.stopWaitingForJobs()
+
+ def run(self):
+ while not self.stopped:
+ try:
+ print "DEBUG: Waiting for job"
+ job = self.worker.getJob()
+ self._handle_job(job)
+ except gear.InterruptedError:
+ print 'We were asked to stop waiting for jobs'
+ except BuildFailedError:
+ print "DEBUG: Build failed"
+ pass
+ except GraphFailedError:
+ print "DEBUG: Graph failed"
+ pass
+ except CacheFailedError:
+ print "DEBUG: Cache failed"
+ pass
+
+ def _handle_job(self, job):
+ print "DEBUG: Received job '%s'" % job.name
+ if job.name == "reverse":
+ print "DEBUG: Starting job reverse with '%s'" % job.arguments
+ for x in range(0, 100):
+ job.sendWorkData("This is: %s" % x)
+ job.sendWorkComplete("answer")
+ elif job.name == "build-graph":
+ bg_request=json.loads(job.arguments)
+ print ("DEBUG: Starting build-graph calculation for Repo: '%s' "
+ "Ref: '%s' System: '%s'") % (bg_request['repo'],
+ bg_request['ref'],
+ bg_request['system'])
+ # TODO: There should be another way of doing this.
+ cmd = ['morph', 'calculate-build-graph', '--quiet',
+ bg_request['repo'], bg_request['ref'], bg_request['system']]
+ p = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
+ output = p.stdout.read()
+ p.wait()
+ if p.returncode != 0:
+ raise GraphFailedError()
+ # TODO: catch errors calculating build-graph here instead of sending
+ # the error as build-graph :)
+ print "DEBUG: finished computing build graph"
+ job.sendWorkComplete(output)
+ elif job.name == "build-artifact":
+ artifact = distbuild.decode_artifact_reference(job.arguments)
+ print "building %s" % artifact.name
+ cmd = ['morph', 'worker-build', '--build-log-on-stdout', artifact.name]
+ p = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
+ p.stdin.write(job.arguments)
+ p.stdin.close()
+ while True:
+ line = p.stdout.readline()
+ if not line: break
+ job.sendWorkData(line)
+ p.wait()
+ if p.returncode != 0:
+ raise BuildFailedError()
+
+ kind = artifact.kind
+
+ if kind == 'chunk':
+ artifact_names = artifact.source_artifact_names
+
+ suffixes = ['%s.%s' % (kind, name) for name in artifact_names]
+ suffixes.append('build-log')
+ else:
+ filename = '%s.%s' % (kind, artifact.name)
+ suffixes = [filename]
+
+ if kind == 'stratum':
+ suffixes.append(filename + '.meta')
+ upload_files(artifact.cache_key, suffixes)
+ job.sendWorkComplete(artifact.cache_key)
+ elif job.name == 'cache-request':
+ artifact_names = json.loads(job.arguments)
+
+ url = urlparse.urljoin(cache_server, '/1.0/artifacts')
+ try:
+ r = requests.post(url, json=artifact_names)
+ except requests.exceptions.RequestException:
+ raise CacheFailedError()
+ job.sendWorkComplete(json.dumps(r.json()))
+
+## Main entry point: adapted from the command-line gear-worker flow
+
+def term_handler(signum, frame):
+ worker_manager.shutdown()
+signal.signal(signal.SIGTERM, term_handler)
+
+worker_manager = GearWorkerManager()
+worker_manager.start()
+
+while not worker_manager.stopped:
+ try:
+ time.sleep(3)
+ except KeyboardInterrupt:
+ print "Ctrl + C: asking tasks to exit nicely...\n"
+ worker_manager.shutdown()