summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJames E. Blair <jeblair@redhat.com>2017-09-13 17:24:21 -0600
committerJames E. Blair <jeblair@redhat.com>2017-09-13 18:21:39 -0600
commitca4666f005735078727c5158c518031c9f09c72b (patch)
tree0cc694db3c493d68fe816370f6fc29b87aa5a008
parentbf8d96cb7732bdd265d0360652c2df269d356ea9 (diff)
downloadgear-ca4666f005735078727c5158c518031c9f09c72b.tar.gz
Server: support background jobs0.10.0
Change-Id: Ic15ab05c16f143f1d557d935aecb6d0afe419d59
-rw-r--r--gear/__init__.py36
-rw-r--r--gear/tests/test_functional.py15
2 files changed, 42 insertions, 9 deletions
diff --git a/gear/__init__.py b/gear/__init__.py
index ae99eb8..435f8ed 100644
--- a/gear/__init__.py
+++ b/gear/__init__.py
@@ -2981,7 +2981,8 @@ class Server(BaseClientServer):
# the worker disconnected, alert the client
try:
p = Packet(constants.REQ, constants.WORK_FAIL, job.handle)
- job.client_connection.sendPacket(p)
+ if job.client_connection:
+ job.client_connection.sendPacket(p)
except Exception:
self.log.exception("Sending WORK_FAIL to client after "
"worker disconnect failed:")
@@ -3004,10 +3005,11 @@ class Server(BaseClientServer):
def _removeJob(self, job, dequeue=True):
# dequeue is tri-state: True, False, or a specific queue
- try:
- del job.client_connection.related_jobs[job.handle]
- except KeyError:
- pass
+ if job.client_connection:
+ try:
+ del job.client_connection.related_jobs[job.handle]
+ except KeyError:
+ pass
if job.worker_connection:
try:
del job.worker_connection.related_jobs[job.handle]
@@ -3301,7 +3303,7 @@ class Server(BaseClientServer):
workers += 1
self.statsd.gauge('workers', workers)
- def _handleSubmitJob(self, packet, precedence):
+ def _handleSubmitJob(self, packet, precedence, background=False):
name = packet.getArgument(0)
unique = packet.getArgument(1)
if not unique:
@@ -3318,11 +3320,16 @@ class Server(BaseClientServer):
self.max_handle += 1
handle = ('H:%s:%s' % (packet.connection.host,
self.max_handle)).encode('utf8')
- job = ServerJob(handle, name, arguments, packet.connection, unique)
+ if not background:
+ conn = packet.connection
+ else:
+ conn = None
+ job = ServerJob(handle, name, arguments, conn, unique)
p = Packet(constants.RES, constants.JOB_CREATED, handle)
packet.connection.sendPacket(p)
self.jobs[handle] = job
- packet.connection.related_jobs[handle] = job
+ if not background:
+ packet.connection.related_jobs[handle] = job
if precedence == PRECEDENCE_HIGH:
self.high_queue.append(job)
elif precedence == PRECEDENCE_NORMAL:
@@ -3341,6 +3348,16 @@ class Server(BaseClientServer):
def handleSubmitJobLow(self, packet):
return self._handleSubmitJob(packet, PRECEDENCE_LOW)
+ def handleSubmitJobBg(self, packet):
+ return self._handleSubmitJob(packet, PRECEDENCE_NORMAL,
+ background=True)
+
+ def handleSubmitJobHighBg(self, packet):
+ return self._handleSubmitJob(packet, PRECEDENCE_HIGH, background=True)
+
+ def handleSubmitJobLowBg(self, packet):
+ return self._handleSubmitJob(packet, PRECEDENCE_LOW, background=True)
+
def getJobForConnection(self, connection, peek=False):
for queue in [self.high_queue, self.normal_queue, self.low_queue]:
for job in queue:
@@ -3410,7 +3427,8 @@ class Server(BaseClientServer):
self.log.info("Received packet %s for unknown job" % (packet,))
return
packet.code = constants.RES
- job.client_connection.sendPacket(packet)
+ if job.client_connection:
+ job.client_connection.sendPacket(packet)
if finished:
self._removeJob(job, dequeue=False)
self._updateStats()
diff --git a/gear/tests/test_functional.py b/gear/tests/test_functional.py
index e512f2f..09c7d2f 100644
--- a/gear/tests/test_functional.py
+++ b/gear/tests/test_functional.py
@@ -128,6 +128,21 @@ class TestFunctional(tests.BaseTestCase):
self.assertTrue(job.complete)
self.assertEqual(job.data, [b'workdata'])
+ def test_bg_job(self):
+ self.worker.registerFunction('test')
+
+ job = gear.Job(b'test', b'testdata')
+ self.client.submitJob(job, background=True)
+ self.assertNotEqual(job.handle, None)
+ self.client.shutdown()
+ del self.client
+
+ workerjob = self.worker.getJob()
+ self.assertEqual(workerjob.handle, job.handle)
+ self.assertEqual(workerjob.arguments, b'testdata')
+ workerjob.sendWorkData(b'workdata')
+ workerjob.sendWorkComplete()
+
def test_worker_termination(self):
def getJob():
with testtools.ExpectedException(gear.InterruptedError):