diff options
Diffstat (limited to 'cxmanage_api/tasks.py')
-rw-r--r-- | cxmanage_api/tasks.py | 175 |
1 files changed, 175 insertions, 0 deletions
diff --git a/cxmanage_api/tasks.py b/cxmanage_api/tasks.py new file mode 100644 index 0000000..6b5cfde --- /dev/null +++ b/cxmanage_api/tasks.py @@ -0,0 +1,175 @@ +# Copyright (c) 2012, Calxeda Inc. +# +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# * Neither the name of Calxeda Inc. nor the names of its contributors +# may be used to endorse or promote products derived from this software +# without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS +# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE +# COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, +# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, +# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS +# OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND +# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR +# TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +# THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH +# DAMAGE. + + +from collections import deque +from threading import Thread, Lock, Event +from time import sleep + + +class Task(object): + """A task object represents some unit of work to be done. + + :param method: The actual method (function) to execute. + :type method: function + :param args: Arguments to pass to the named method to run. + :type args: list + """ + + def __init__(self, method, *args): + """Default constructor for the Task class.""" + self.status = "Queued" + self.result = None + self.error = None + + self._method = method + self._args = args + self._finished = Event() + + def join(self): + """Wait for this task to finish.""" + self._finished.wait() + + def is_alive(self): + """Return true if this task hasn't been finished. + + :returns: Whether or not the task is still alive. + :rtype: boolean + + """ + return not self._finished.is_set() + + def _run(self): + """Execute this task. Should only be called by TaskWorker.""" + self.status = "In Progress" + try: + self.result = self._method(*self._args) + self.status = "Completed" + except Exception as e: + self.error = e + self.status = "Failed" + + self._finished.set() + + +class TaskQueue(object): + """A task queue, consisting of a queue and a number of workers. + + :param threads: Number of threads to create (if needed). + :type threads: integer + :param delay: Time to wait between + """ + + def __init__(self, threads=48, delay=0): + """Default constructor for the TaskQueue class.""" + self.threads = threads + self.delay = delay + + self._lock = Lock() + self._queue = deque() + self._workers = 0 + + def put(self, method, *args): + """Add a task to the task queue, and spawn a worker if we're not full. + + :param method: Named method to run. + :type method: string + :param args: Arguments to pass to the named method to run. + :type args: list + + :returns: A Task that will be executed by a worker at a later time. + :rtype: Task + + """ + self._lock.acquire() + + task = Task(method, *args) + self._queue.append(task) + + if self._workers < self.threads: + TaskWorker(task_queue=self, delay=self.delay) + self._workers += 1 + + self._lock.release() + return task + + def get(self): + """ + Get a task from the task queue. Mainly used by workers. + + :returns: A Task object that hasn't been executed yet. + :rtype: Task + + :raises IndexError: If there are no tasks in the queue. + + """ + self._lock.acquire() + try: + return self._queue.popleft() + finally: + self._lock.release() + + def _remove_worker(self): + """Decrement the worker count. Should only be used by TaskWorker.""" + self._lock.acquire() + self._workers -= 1 + self._lock.release() + + +class TaskWorker(Thread): + """A worker thread that runs tasks from a TaskQueue. + + :param task_queue: Task queue to get tasks from. + :type task_queue: TaskQueue + :param delay: Time to wait in-between execution. + + """ + def __init__(self, task_queue, delay=0): + super(TaskWorker, self).__init__() + self.daemon = True + + self._task_queue = task_queue + self._delay = delay + + self.start() + + def run(self): + """Repeatedly get tasks from the TaskQueue and execute them.""" + try: + while True: + sleep(self._delay) + task = self._task_queue.get() + task._run() + except: + self._task_queue._remove_worker() + +DEFAULT_TASK_QUEUE = TaskQueue() + +# End of file: ./tasks.py |