summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJames E. Blair <jeblair@openstack.org>2014-05-01 10:06:18 -0700
committerJames E. Blair <jeblair@openstack.org>2014-05-01 10:06:18 -0700
commit02e3be81a54d7dffaf26d05da3138bbed2948b91 (patch)
treeff2c29d03fabcfae710cab500b02b8b3c636d355
parent21243f5dc9d8d29c26f06db358c782364af95a06 (diff)
downloadgear-02e3be81a54d7dffaf26d05da3138bbed2948b91.tar.gz
geard: be more careful removing jobs
When a client connection is lost and jobs are being removed, there is a race condition where if the worker completes the job first, it may be removed from the list of associated jobs before the same thing is done due to the client disconnect. In all cases, wrap job deletion in try/except calls. Additionally ensure that not only running jobs, but also queued jobs are removed in such cases. Change-Id: I24c3af3fa7fd068bec29af90d7ff42a59b21ea94
-rw-r--r--gear/__init__.py42
1 files changed, 35 insertions, 7 deletions
diff --git a/gear/__init__.py b/gear/__init__.py
index dcdb15b..cac73ad 100644
--- a/gear/__init__.py
+++ b/gear/__init__.py
@@ -2312,11 +2312,42 @@ class Server(BaseClientServer):
except Exception:
self.log.exception("Sending WORK_FAIL to client after "
"worker disconnect failed:")
+ self._removeJob(job)
+ self._updateStats()
+
+ def _removeJob(self, job, dequeue=True):
+ # dequeue is tri-state: True, False, or a specific queue
+ try:
del job.client_connection.related_jobs[job.handle]
- if job.worker_connection:
+ except KeyError:
+ pass
+ if job.worker_connection:
+ try:
del job.worker_connection.related_jobs[job.handle]
+ except KeyError:
+ pass
+ try:
del self.jobs[job.handle]
- self._updateStats()
+ except KeyError:
+ pass
+ if dequeue is True:
+ # Search all queues for the job
+ try:
+ self.high_queue.remove(job)
+ except ValueError:
+ pass
+ try:
+ self.normal_queue.remove(job)
+ except ValueError:
+ pass
+ try:
+ self.low_queue.remove(job)
+ except ValueError:
+ pass
+ elif dequeue is not False:
+ # A specific queue was supplied
+ dequeue.remove(job)
+ # If dequeue is false, no need to remove from any queue
def getQueue(self):
"""Returns a copy of all internal queues in a flattened form.
@@ -2354,8 +2385,7 @@ class Server(BaseClientServer):
(request.connection.ssl_subject, job.name))
request.connection.sendRaw(b'ERR PERMISSION_DENIED\n')
return
- queue.remove(job)
- del self.jobs[job.handle]
+ self._removeJob(job, dequeue=queue)
self._updateStats()
request.connection.sendRaw(b'OK\n')
return
@@ -2680,9 +2710,7 @@ class Server(BaseClientServer):
packet.code = constants.RES
job.client_connection.sendPacket(packet)
if finished:
- del self.jobs[handle]
- del job.client_connection.related_jobs[handle]
- del job.worker_connection.related_jobs[handle]
+ self._removeJob(job, dequeue=False)
self._updateStats()
def handleSetClientID(self, packet):