diff options
author | James E. Blair <jeblair@openstack.org> | 2014-05-01 10:06:18 -0700 |
---|---|---|
committer | James E. Blair <jeblair@openstack.org> | 2014-05-01 10:06:18 -0700 |
commit | 02e3be81a54d7dffaf26d05da3138bbed2948b91 (patch) | |
tree | ff2c29d03fabcfae710cab500b02b8b3c636d355 | |
parent | 21243f5dc9d8d29c26f06db358c782364af95a06 (diff) | |
download | gear-02e3be81a54d7dffaf26d05da3138bbed2948b91.tar.gz |
geard: be more careful removing jobs
When a client connection is lost and jobs are being removed, there
is a race condition where if the worker completes the job first,
it may be removed from the list of associated jobs before the same
thing is done due to the client disconnect. In all cases, wrap
job deletion in try/except calls.
Additionally ensure that not only running jobs, but also queued jobs
are removed in such cases.
Change-Id: I24c3af3fa7fd068bec29af90d7ff42a59b21ea94
-rw-r--r-- | gear/__init__.py | 42 |
1 files changed, 35 insertions, 7 deletions
diff --git a/gear/__init__.py b/gear/__init__.py index dcdb15b..cac73ad 100644 --- a/gear/__init__.py +++ b/gear/__init__.py @@ -2312,11 +2312,42 @@ class Server(BaseClientServer): except Exception: self.log.exception("Sending WORK_FAIL to client after " "worker disconnect failed:") + self._removeJob(job) + self._updateStats() + + def _removeJob(self, job, dequeue=True): + # dequeue is tri-state: True, False, or a specific queue + try: del job.client_connection.related_jobs[job.handle] - if job.worker_connection: + except KeyError: + pass + if job.worker_connection: + try: del job.worker_connection.related_jobs[job.handle] + except KeyError: + pass + try: del self.jobs[job.handle] - self._updateStats() + except KeyError: + pass + if dequeue is True: + # Search all queues for the job + try: + self.high_queue.remove(job) + except ValueError: + pass + try: + self.normal_queue.remove(job) + except ValueError: + pass + try: + self.low_queue.remove(job) + except ValueError: + pass + elif dequeue is not False: + # A specific queue was supplied + dequeue.remove(job) + # If dequeue is false, no need to remove from any queue def getQueue(self): """Returns a copy of all internal queues in a flattened form. @@ -2354,8 +2385,7 @@ class Server(BaseClientServer): (request.connection.ssl_subject, job.name)) request.connection.sendRaw(b'ERR PERMISSION_DENIED\n') return - queue.remove(job) - del self.jobs[job.handle] + self._removeJob(job, dequeue=queue) self._updateStats() request.connection.sendRaw(b'OK\n') return @@ -2680,9 +2710,7 @@ class Server(BaseClientServer): packet.code = constants.RES job.client_connection.sendPacket(packet) if finished: - del self.jobs[handle] - del job.client_connection.related_jobs[handle] - del job.worker_connection.related_jobs[handle] + self._removeJob(job, dequeue=False) self._updateStats() def handleSetClientID(self, packet): |