From ca4666f005735078727c5158c518031c9f09c72b Mon Sep 17 00:00:00 2001 From: "James E. Blair" Date: Wed, 13 Sep 2017 17:24:21 -0600 Subject: Server: support background jobs Change-Id: Ic15ab05c16f143f1d557d935aecb6d0afe419d59 --- gear/__init__.py | 36 +++++++++++++++++++++++++++--------- gear/tests/test_functional.py | 15 +++++++++++++++ 2 files changed, 42 insertions(+), 9 deletions(-) diff --git a/gear/__init__.py b/gear/__init__.py index ae99eb8..435f8ed 100644 --- a/gear/__init__.py +++ b/gear/__init__.py @@ -2981,7 +2981,8 @@ class Server(BaseClientServer): # the worker disconnected, alert the client try: p = Packet(constants.REQ, constants.WORK_FAIL, job.handle) - job.client_connection.sendPacket(p) + if job.client_connection: + job.client_connection.sendPacket(p) except Exception: self.log.exception("Sending WORK_FAIL to client after " "worker disconnect failed:") @@ -3004,10 +3005,11 @@ class Server(BaseClientServer): def _removeJob(self, job, dequeue=True): # dequeue is tri-state: True, False, or a specific queue - try: - del job.client_connection.related_jobs[job.handle] - except KeyError: - pass + if job.client_connection: + try: + del job.client_connection.related_jobs[job.handle] + except KeyError: + pass if job.worker_connection: try: del job.worker_connection.related_jobs[job.handle] @@ -3301,7 +3303,7 @@ class Server(BaseClientServer): workers += 1 self.statsd.gauge('workers', workers) - def _handleSubmitJob(self, packet, precedence): + def _handleSubmitJob(self, packet, precedence, background=False): name = packet.getArgument(0) unique = packet.getArgument(1) if not unique: @@ -3318,11 +3320,16 @@ class Server(BaseClientServer): self.max_handle += 1 handle = ('H:%s:%s' % (packet.connection.host, self.max_handle)).encode('utf8') - job = ServerJob(handle, name, arguments, packet.connection, unique) + if not background: + conn = packet.connection + else: + conn = None + job = ServerJob(handle, name, arguments, conn, unique) p = Packet(constants.RES, constants.JOB_CREATED, handle) packet.connection.sendPacket(p) self.jobs[handle] = job - packet.connection.related_jobs[handle] = job + if not background: + packet.connection.related_jobs[handle] = job if precedence == PRECEDENCE_HIGH: self.high_queue.append(job) elif precedence == PRECEDENCE_NORMAL: @@ -3341,6 +3348,16 @@ class Server(BaseClientServer): def handleSubmitJobLow(self, packet): return self._handleSubmitJob(packet, PRECEDENCE_LOW) + def handleSubmitJobBg(self, packet): + return self._handleSubmitJob(packet, PRECEDENCE_NORMAL, + background=True) + + def handleSubmitJobHighBg(self, packet): + return self._handleSubmitJob(packet, PRECEDENCE_HIGH, background=True) + + def handleSubmitJobLowBg(self, packet): + return self._handleSubmitJob(packet, PRECEDENCE_LOW, background=True) + def getJobForConnection(self, connection, peek=False): for queue in [self.high_queue, self.normal_queue, self.low_queue]: for job in queue: @@ -3410,7 +3427,8 @@ class Server(BaseClientServer): self.log.info("Received packet %s for unknown job" % (packet,)) return packet.code = constants.RES - job.client_connection.sendPacket(packet) + if job.client_connection: + job.client_connection.sendPacket(packet) if finished: self._removeJob(job, dequeue=False) self._updateStats() diff --git a/gear/tests/test_functional.py b/gear/tests/test_functional.py index e512f2f..09c7d2f 100644 --- a/gear/tests/test_functional.py +++ b/gear/tests/test_functional.py @@ -128,6 +128,21 @@ class TestFunctional(tests.BaseTestCase): self.assertTrue(job.complete) self.assertEqual(job.data, [b'workdata']) + def test_bg_job(self): + self.worker.registerFunction('test') + + job = gear.Job(b'test', b'testdata') + self.client.submitJob(job, background=True) + self.assertNotEqual(job.handle, None) + self.client.shutdown() + del self.client + + workerjob = self.worker.getJob() + self.assertEqual(workerjob.handle, job.handle) + self.assertEqual(workerjob.arguments, b'testdata') + workerjob.sendWorkData(b'workdata') + workerjob.sendWorkComplete() + def test_worker_termination(self): def getJob(): with testtools.ExpectedException(gear.InterruptedError): -- cgit v1.2.1