diff options
Diffstat (limited to 'Lib/Queue.py')
| -rw-r--r-- | Lib/Queue.py | 46 | 
1 files changed, 46 insertions, 0 deletions
| diff --git a/Lib/Queue.py b/Lib/Queue.py index c6c608b744..51ad354260 100644 --- a/Lib/Queue.py +++ b/Lib/Queue.py @@ -35,6 +35,51 @@ class Queue:          # Notify not_full whenever an item is removed from the queue;          # a thread waiting to put is notified then.          self.not_full = threading.Condition(self.mutex) +        # Notify all_tasks_done whenever the number of unfinished tasks +        # drops to zero; thread waiting to join() is notified to resume +        self.all_tasks_done = threading.Condition(self.mutex) +        self.unfinished_tasks = 0 + +    def task_done(self): +        """Indicate that a formerly enqueued task is complete. + +        Used by Queue consumer threads.  For each get() used to fetch a task, +        a subsequent call to task_done() tells the queue that the processing +        on the task is complete. + +        If a join() is currently blocking, it will resume when all items +        have been processed (meaning that a task_done() call was received +        for every item that had been put() into the queue). + +        Raises a ValueError if called more times than there were items +        placed in the queue. +        """ +        self.all_tasks_done.acquire() +        try: +            unfinished = self.unfinished_tasks - 1 +            if unfinished <= 0: +                if unfinished < 0: +                    raise ValueError('task_done() called too many times') +                self.all_tasks_done.notifyAll() +            self.unfinished_tasks = unfinished +        finally: +            self.all_tasks_done.release() + +    def join(self): +        """Blocks until all items in the Queue have been gotten and processed. + +        The count of unfinished tasks goes up whenever an item is added to the +        queue. The count goes down whenever a consumer thread calls task_done() +        to indicate the item was retrieved and all work on it is complete. + +        When the count of unfinished tasks drops to zero, join() unblocks. +        """ +        self.all_tasks_done.acquire() +        try: +            while self.unfinished_tasks: +                self.all_tasks_done.wait() +        finally: +            self.all_tasks_done.release()      def qsize(self):          """Return the approximate size of the queue (not reliable!).""" @@ -86,6 +131,7 @@ class Queue:                          raise Full                      self.not_full.wait(remaining)              self._put(item) +            self.unfinished_tasks += 1              self.not_empty.notify()          finally:              self.not_full.release() | 
