summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJames E. Blair <jeblair@redhat.com>2018-02-02 10:55:59 -0800
committerJames E. Blair <jeblair@redhat.com>2018-02-02 11:19:14 -0800
commiteee504755de2c5927651ca1e3075470e42d77cda (patch)
treec76d7b24ed1a1490e3ec572bfddfd6bae691596e
parentda802f2828e146b3ca466ee71a75fe83f7a25fb2 (diff)
downloadgear-eee504755de2c5927651ca1e3075470e42d77cda.tar.gz
Automatically send GRAB_JOB after CAN_DO
After registering a function, if a connection is sleeping (ie, waiting for a NOOP to wake it up), cause it to send another grab_job in case the newly registered job is something it can handle. Change-Id: Ibea13726f4a451ebc67850b17e168bd4accfbc0b
-rw-r--r--gear/__init__.py6
-rw-r--r--gear/tests/test_functional.py30
2 files changed, 36 insertions, 0 deletions
diff --git a/gear/__init__.py b/gear/__init__.py
index b085c29..4ed674e 100644
--- a/gear/__init__.py
+++ b/gear/__init__.py
@@ -1897,6 +1897,12 @@ class Worker(BaseClient):
else:
self._sendCanDo(name)
+ connections = self.active_connections[:]
+ for connection in connections:
+ if connection.state == "SLEEP":
+ connection.changeState("IDLE")
+ self._updateStateMachines()
+
def unRegisterFunction(self, name):
"""Remove a function from Gearman's registry.
diff --git a/gear/tests/test_functional.py b/gear/tests/test_functional.py
index 09c7d2f..3bca907 100644
--- a/gear/tests/test_functional.py
+++ b/gear/tests/test_functional.py
@@ -238,6 +238,36 @@ class TestFunctionalText(tests.BaseTestCase):
self.assertTrue(job.complete)
self.assertEqual(job.exception, 'work failed')
+ def test_grab_job_after_register(self):
+ jobunique = uuid.uuid4().hex
+ job = gear.TextJob('test', 'testdata', unique=jobunique)
+ self.client.submitJob(job)
+ self.assertNotEqual(job.handle, None)
+
+ def getJob():
+ workerjob = self.worker.getJob()
+ workerjob.sendWorkComplete()
+
+ jobthread = threading.Thread(target=getJob)
+ jobthread.daemon = True
+ jobthread.start()
+
+ for count in iterate_timeout(30, "worker sleeping"):
+ if self.worker.active_connections[0].state == 'SLEEP':
+ break
+ self.assertEqual(1, len(self.server.normal_queue))
+ self.assertFalse(job.complete)
+
+ # When we register the function, the worker should send a
+ # grab_job packet and pick up the job and it should complete.
+ self.worker.registerFunction('test')
+
+ for count in iterate_timeout(30, "job completion"):
+ if job.complete:
+ break
+
+ self.assertEqual(0, len(self.server.normal_queue))
+
def load_tests(loader, in_tests, pattern):
return testscenarios.load_tests_apply_scenarios(loader, in_tests, pattern)