summaryrefslogtreecommitdiff
path: root/eventlet/greenpool.py
diff options
context:
space:
mode:
authorRyan Williams <rdw@lindenlab.com>2010-01-01 15:03:58 -0800
committerRyan Williams <rdw@lindenlab.com>2010-01-01 15:03:58 -0800
commited301bd5bd00180d05f6c34d6fd3f82cddd4e8f4 (patch)
tree8a8a6863f20f8481916f8f4c48399a1a1a53f217 /eventlet/greenpool.py
parent19acf22454b19a09372e17fcd051e39752dcb672 (diff)
downloadeventlet-ed301bd5bd00180d05f6c34d6fd3f82cddd4e8f4.tar.gz
Renamed parallel to greenpool.
Diffstat (limited to 'eventlet/greenpool.py')
-rw-r--r--eventlet/greenpool.py200
1 files changed, 200 insertions, 0 deletions
diff --git a/eventlet/greenpool.py b/eventlet/greenpool.py
new file mode 100644
index 0000000..c424cba
--- /dev/null
+++ b/eventlet/greenpool.py
@@ -0,0 +1,200 @@
+import itertools
+
+from eventlet import greenthread
+from eventlet import coros
+
+__all__ = ['GreenPool', 'GreenPile']
+
+try:
+ next
+except NameError:
+ def next(it):
+ try:
+ return it.next()
+ except AttributeError:
+ raise TypeError("%s object is not an iterator" % type(it))
+
+class GreenPool(object):
+ """ The GreenPool class is a pool of green threads.
+ """
+ def __init__(self, size):
+ self.size = size
+ self.coroutines_running = set()
+ self.sem = coros.Semaphore(size)
+ self.no_coros_running = greenthread.Event()
+
+ def resize(self, new_size):
+ """ Change the max number of coroutines doing work at any given time.
+
+ If resize is called when there are more than *new_size*
+ coroutines already working on tasks, they will be allowed to complete
+ but no new tasks will be allowed to get launched until enough coroutines
+ finish their tasks to drop the overall quantity below *new_size*. Until
+ then, the return value of free() will be negative.
+ """
+ size_delta = new_size - self.size
+ self.sem.counter += size_delta
+ self.size = new_size
+
+ def running(self):
+ """ Returns the number of coroutines that are currently executing
+ functions in the Parallel's pool."""
+ return len(self.coroutines_running)
+
+ def free(self):
+ """ Returns the number of coroutines available for use.
+
+ If zero or less, the next call to :meth:`spawn` will block the calling
+ coroutine until a slot becomes available."""
+ return self.sem.counter
+
+ def spawn(self, function, *args, **kwargs):
+ """Run the *function* with its arguments in its own green thread.
+ Returns the GreenThread object that is running the function, which can
+ be used to retrieve the results.
+ """
+ # if reentering an empty pool, don't try to wait on a coroutine freeing
+ # itself -- instead, just execute in the current coroutine
+ current = greenthread.getcurrent()
+ if self.sem.locked() and current in self.coroutines_running:
+ # a bit hacky to use the GT without switching to it
+ gt = greenthread.GreenThread(current)
+ gt.main(function, args, kwargs)
+ return gt
+ else:
+ self.sem.acquire()
+ gt = greenthread.spawn(function, *args, **kwargs)
+ if not self.coroutines_running:
+ self.no_coros_running = greenthread.Event()
+ self.coroutines_running.add(gt)
+ gt.link(self._spawn_done, coro=gt)
+ return gt
+
+ def _spawn_n_impl(self, func, args, kwargs, coro=None):
+ try:
+ try:
+ func(*args, **kwargs)
+ except (KeyboardInterrupt, SystemExit):
+ raise
+ except:
+ # TODO in debug mode print these
+ pass
+ finally:
+ if coro is None:
+ return
+ else:
+ coro = greenthread.getcurrent()
+ self._spawn_done(coro=coro)
+
+ def spawn_n(self, func, *args, **kwargs):
+ """ Create a coroutine to run the *function*. Returns None; the results
+ of the function are not retrievable.
+ """
+ # if reentering an empty pool, don't try to wait on a coroutine freeing
+ # itself -- instead, just execute in the current coroutine
+ current = greenthread.getcurrent()
+ if self.sem.locked() and current in self.coroutines_running:
+ self._spawn_n_impl(func, args, kwargs)
+ else:
+ self.sem.acquire()
+ g = greenthread.spawn_n(self._spawn_n_impl, func, args, kwargs, coro=True)
+ if not self.coroutines_running:
+ self.no_coros_running = greenthread.Event()
+ self.coroutines_running.add(g)
+
+ def waitall(self):
+ """Waits until all coroutines in the pool are finished working."""
+ self.no_coros_running.wait()
+
+ def _spawn_done(self, result=None, exc=None, coro=None):
+ self.sem.release()
+ if coro is not None:
+ self.coroutines_running.remove(coro)
+ # if done processing (no more work is waiting for processing),
+ # send StopIteration so that the queue knows it's done
+ if self.sem.balance == self.size:
+ self.no_coros_running.send(None)
+
+ def waiting(self):
+ """Return the number of coroutines waiting to spawn.
+ """
+ if self.sem.balance < 0:
+ return -self.sem.balance
+ else:
+ return 0
+
+ def _do_imap(self, func, it, gi):
+ for args in it:
+ gi.spawn(func, *args)
+ gi.spawn(raise_stop_iteration)
+
+ def imap(self, function, *iterables):
+ """This is the same as itertools.imap, except that *func* is
+ executed in separate green threads, with the concurrency controlled by
+ the pool. In operation, imap consumes a constant amount of memory,
+ proportional to the size of the pool, and is thus suited for iterating
+ over extremely long input lists.
+ """
+ if function is None:
+ function = lambda *a: a
+ it = itertools.izip(*iterables)
+ gi = GreenImap(self.size)
+ greenthread.spawn_n(self._do_imap, function, it, gi)
+ return gi
+
+
+def raise_stop_iteration():
+ raise StopIteration()
+
+
+class GreenPile(object):
+ """GreenPile is an abstraction representing a bunch of I/O-related tasks.
+
+ Construct a GreenPile with an existing GreenPool object. The GreenPile will
+ then use that pool's concurrency as it processes its jobs. There can be
+ many GreenPiles associated with a single GreenPool.
+
+ A GreenPile can also be constructed standalone, not associated with any
+ GreenPool. To do this, construct it with an integer size parameter instead
+ of a GreenPool
+ """
+ def __init__(self, size_or_pool):
+ if isinstance(size_or_pool, GreenPool):
+ self.pool = size_or_pool
+ else:
+ self.pool = GreenPool(size_or_pool)
+ self.waiters = coros.Queue()
+ self.used = False
+ self.counter = 0
+
+ def spawn(self, func, *args, **kw):
+ """Runs *func* in its own green thread, with the result available by
+ iterating over the GreenPile object."""
+ self.used = True
+ self.counter += 1
+ try:
+ gt = self.pool.spawn(func, *args, **kw)
+ self.waiters.send(gt)
+ except:
+ self.counter -= 1
+ raise
+
+ def __iter__(self):
+ return self
+
+ def next(self):
+ """Wait for the next result, suspending the current coroutine until it
+ is available. Raises StopIteration when there are no more results."""
+ if self.counter == 0 and self.used:
+ raise StopIteration()
+ try:
+ return self.waiters.wait().wait()
+ finally:
+ self.counter -= 1
+
+# this is identical to GreenPile but it blocks on spawn if the results
+# aren't consumed
+class GreenImap(GreenPile):
+ def __init__(self, size_or_pool):
+ super(GreenImap, self).__init__(size_or_pool)
+ self.waiters = coros.Channel(max_size=self.pool.size) \ No newline at end of file