summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSebastian Thiel <byronimo@gmail.com>2010-06-11 14:37:51 +0200
committerSebastian Thiel <byronimo@gmail.com>2010-06-11 14:37:51 +0200
commit5ff864138cd1e680a78522c26b583639f8f5e313 (patch)
treef78ba0335c9a620406c04837ffe60d0fff8d9531
parent583e6a25b0d891a2f531a81029f2bac0c237cbf9 (diff)
downloadgitpython-5ff864138cd1e680a78522c26b583639f8f5e313.tar.gz
test.async: split test_pool up into task implenetations and related utilities, as well as the tests themselves. File became too large
-rw-r--r--test/git/async/task.py190
-rw-r--r--test/git/async/test_pool.py186
2 files changed, 196 insertions, 180 deletions
diff --git a/test/git/async/task.py b/test/git/async/task.py
new file mode 100644
index 00000000..9cc3cb9d
--- /dev/null
+++ b/test/git/async/task.py
@@ -0,0 +1,190 @@
+"""Module containing task implementations useful for testing them"""
+from git.async.task import *
+
+import threading
+import weakref
+
+class _TestTaskBase(object):
+ """Note: causes great slowdown due to the required locking of task variables"""
+ def __init__(self, *args, **kwargs):
+ super(_TestTaskBase, self).__init__(*args, **kwargs)
+ self.should_fail = False
+ self.lock = threading.Lock() # yes, can't safely do x = x + 1 :)
+ self.plock = threading.Lock()
+ self.item_count = 0
+ self.process_count = 0
+
+ def do_fun(self, item):
+ self.lock.acquire()
+ self.item_count += 1
+ self.lock.release()
+ if self.should_fail:
+ raise AssertionError("I am failing just for the fun of it")
+ return item
+
+ def process(self, count=1):
+ # must do it first, otherwise we might read and check results before
+ # the thread gets here :). Its a lesson !
+ self.plock.acquire()
+ self.process_count += 1
+ self.plock.release()
+ super(_TestTaskBase, self).process(count)
+
+ def _assert(self, pc, fc, check_scheduled=False):
+ """Assert for num process counts (pc) and num function counts (fc)
+ :return: self"""
+ self.lock.acquire()
+ if self.item_count != fc:
+ print self.item_count, fc
+ assert self.item_count == fc
+ self.lock.release()
+
+ # NOTE: asserting num-writers fails every now and then, implying a thread is
+ # still processing (an empty chunk) when we are checking it. This can
+ # only be prevented by checking the scheduled items, which requires locking
+ # and causes slowdows, so we don't do that. If the num_writers
+ # counter wouldn't be maintained properly, more tests would fail, so
+ # we can safely refrain from checking this here
+ # self._wlock.acquire()
+ # assert self._num_writers == 0
+ # self._wlock.release()
+ return self
+
+
+class TestThreadTaskNode(_TestTaskBase, InputIteratorThreadTask):
+ pass
+
+
+class TestThreadFailureNode(TestThreadTaskNode):
+ """Fails after X items"""
+ def __init__(self, *args, **kwargs):
+ self.fail_after = kwargs.pop('fail_after')
+ super(TestThreadFailureNode, self).__init__(*args, **kwargs)
+
+ def do_fun(self, item):
+ item = TestThreadTaskNode.do_fun(self, item)
+
+ self.lock.acquire()
+ try:
+ if self.item_count > self.fail_after:
+ raise AssertionError("Simulated failure after processing %i items" % self.fail_after)
+ finally:
+ self.lock.release()
+ # END handle fail after
+ return item
+
+
+class TestThreadInputChannelTaskNode(_TestTaskBase, InputChannelTask):
+ """Apply a transformation on items read from an input channel"""
+ def __init__(self, *args, **kwargs):
+ self.fail_after = kwargs.pop('fail_after', 0)
+ super(TestThreadInputChannelTaskNode, self).__init__(*args, **kwargs)
+
+ def do_fun(self, item):
+ """return tuple(i, i*2)"""
+ item = super(TestThreadInputChannelTaskNode, self).do_fun(item)
+
+ # fail after support
+ if self.fail_after:
+ self.lock.acquire()
+ try:
+ if self.item_count > self.fail_after:
+ raise AssertionError("Simulated failure after processing %i items" % self.fail_after)
+ finally:
+ self.lock.release()
+ # END handle fail-after
+
+ if isinstance(item, tuple):
+ i = item[0]
+ return item + (i * self.id, )
+ else:
+ return (item, item * self.id)
+ # END handle tuple
+
+
+class TestThreadInputChannelVerifyTaskNode(_TestTaskBase, InputChannelTask):
+ """An input channel task, which verifies the result of its input channels,
+ should be last in the chain.
+ Id must be int"""
+
+ def do_fun(self, item):
+ """return tuple(i, i*2)"""
+ item = super(TestThreadInputChannelVerifyTaskNode, self).do_fun(item)
+
+ # make sure the computation order matches
+ assert isinstance(item, tuple), "input was no tuple: %s" % item
+
+ base = item[0]
+ for id, num in enumerate(item[1:]):
+ assert num == base * id, "%i != %i, orig = %s" % (num, base * id, str(item))
+ # END verify order
+
+ return item
+
+
+#{ Utilities
+
+def make_proxy_method(t):
+ """required to prevent binding self into the method we call"""
+ wt = weakref.proxy(t)
+ return lambda item: wt.do_fun(item)
+
+def add_task_chain(p, ni, count=1, fail_setup=list(), feeder_channel=None, id_offset=0):
+ """Create a task chain of feeder, count transformers and order verifcator
+ to the pool p, like t1 -> t2 -> t3
+ :param fail_setup: a list of pairs, task_id, fail_after, i.e. [(2, 20)] would
+ make the third transformer fail after 20 items
+ :param feeder_channel: if set to a channel, it will be used as input of the
+ first transformation task. The respective first task in the return value
+ will be None.
+ :param id_offset: defines the id of the first transformation task, all subsequent
+ ones will add one
+ :return: tuple(list(task1, taskN, ...), list(rc1, rcN, ...))"""
+ nt = p.num_tasks()
+
+ feeder = None
+ frc = feeder_channel
+ if feeder_channel is None:
+ feeder = make_iterator_task(ni)
+ frc = p.add_task(feeder)
+ # END handle specific feeder
+
+ rcs = [frc]
+ tasks = [feeder]
+
+ inrc = frc
+ for tc in xrange(count):
+ t = TestThreadInputChannelTaskNode(inrc, tc+id_offset, None)
+
+ t.fun = make_proxy_method(t)
+ #t.fun = t.do_fun
+ inrc = p.add_task(t)
+
+ tasks.append(t)
+ rcs.append(inrc)
+ # END create count transformers
+
+ # setup failure
+ for id, fail_after in fail_setup:
+ tasks[1+id].fail_after = fail_after
+ # END setup failure
+
+ verifier = TestThreadInputChannelVerifyTaskNode(inrc, 'verifier', None)
+ #verifier.fun = verifier.do_fun
+ verifier.fun = make_proxy_method(verifier)
+ vrc = p.add_task(verifier)
+
+
+ tasks.append(verifier)
+ rcs.append(vrc)
+ return tasks, rcs
+
+def make_iterator_task(ni, taskcls=TestThreadTaskNode, **kwargs):
+ """:return: task which yields ni items
+ :param taskcls: the actual iterator type to use
+ :param **kwargs: additional kwargs to be passed to the task"""
+ t = taskcls(iter(range(ni)), 'iterator', None, **kwargs)
+ t.fun = make_proxy_method(t)
+ return t
+
+#} END utilities
diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py
index 5bb48cc2..0fa34f6a 100644
--- a/test/git/async/test_pool.py
+++ b/test/git/async/test_pool.py
@@ -1,196 +1,22 @@
"""Channel testing"""
from test.testlib import *
+from task import *
+
from git.async.pool import *
-from git.async.task import *
from git.async.thread import terminate_threads
from git.async.util import cpu_count
+
import threading
import weakref
import time
import sys
-class _TestTaskBase(object):
- """Note: causes great slowdown due to the required locking of task variables"""
- def __init__(self, *args, **kwargs):
- super(_TestTaskBase, self).__init__(*args, **kwargs)
- self.should_fail = False
- self.lock = threading.Lock() # yes, can't safely do x = x + 1 :)
- self.plock = threading.Lock()
- self.item_count = 0
- self.process_count = 0
-
- def do_fun(self, item):
- self.lock.acquire()
- self.item_count += 1
- self.lock.release()
- if self.should_fail:
- raise AssertionError("I am failing just for the fun of it")
- return item
-
- def process(self, count=1):
- # must do it first, otherwise we might read and check results before
- # the thread gets here :). Its a lesson !
- self.plock.acquire()
- self.process_count += 1
- self.plock.release()
- super(_TestTaskBase, self).process(count)
-
- def _assert(self, pc, fc, check_scheduled=False):
- """Assert for num process counts (pc) and num function counts (fc)
- :return: self"""
- self.lock.acquire()
- if self.item_count != fc:
- print self.item_count, fc
- assert self.item_count == fc
- self.lock.release()
- self._wlock.acquire()
- assert self._num_writers == 0
- self._wlock.release()
- return self
-
-
-class TestThreadTaskNode(_TestTaskBase, InputIteratorThreadTask):
- pass
-
-
-class TestThreadFailureNode(TestThreadTaskNode):
- """Fails after X items"""
- def __init__(self, *args, **kwargs):
- self.fail_after = kwargs.pop('fail_after')
- super(TestThreadFailureNode, self).__init__(*args, **kwargs)
-
- def do_fun(self, item):
- item = TestThreadTaskNode.do_fun(self, item)
-
- self.lock.acquire()
- try:
- if self.item_count > self.fail_after:
- raise AssertionError("Simulated failure after processing %i items" % self.fail_after)
- finally:
- self.lock.release()
- # END handle fail after
- return item
-
-
-class TestThreadInputChannelTaskNode(_TestTaskBase, InputChannelTask):
- """Apply a transformation on items read from an input channel"""
- def __init__(self, *args, **kwargs):
- self.fail_after = kwargs.pop('fail_after', 0)
- super(TestThreadInputChannelTaskNode, self).__init__(*args, **kwargs)
-
- def do_fun(self, item):
- """return tuple(i, i*2)"""
- item = super(TestThreadInputChannelTaskNode, self).do_fun(item)
-
- # fail after support
- if self.fail_after:
- self.lock.acquire()
- try:
- if self.item_count > self.fail_after:
- raise AssertionError("Simulated failure after processing %i items" % self.fail_after)
- finally:
- self.lock.release()
- # END handle fail-after
-
- if isinstance(item, tuple):
- i = item[0]
- return item + (i * self.id, )
- else:
- return (item, item * self.id)
- # END handle tuple
-class TestThreadInputChannelVerifyTaskNode(_TestTaskBase, InputChannelTask):
- """An input channel task, which verifies the result of its input channels,
- should be last in the chain.
- Id must be int"""
-
- def do_fun(self, item):
- """return tuple(i, i*2)"""
- item = super(TestThreadInputChannelVerifyTaskNode, self).do_fun(item)
-
- # make sure the computation order matches
- assert isinstance(item, tuple), "input was no tuple: %s" % item
-
- base = item[0]
- for id, num in enumerate(item[1:]):
- assert num == base * id, "%i != %i, orig = %s" % (num, base * id, str(item))
- # END verify order
-
- return item
-
-
-
class TestThreadPool(TestBase):
max_threads = cpu_count()
-
- def _make_proxy_method(self, t):
- """required to prevent binding self into the method we call"""
- wt = weakref.proxy(t)
- return lambda item: wt.do_fun(item)
-
- def _add_task_chain(self, p, ni, count=1, fail_setup=list(), feeder_channel=None, id_offset=0):
- """Create a task chain of feeder, count transformers and order verifcator
- to the pool p, like t1 -> t2 -> t3
- :param fail_setup: a list of pairs, task_id, fail_after, i.e. [(2, 20)] would
- make the third transformer fail after 20 items
- :param feeder_channel: if set to a channel, it will be used as input of the
- first transformation task. The respective first task in the return value
- will be None.
- :param id_offset: defines the id of the first transformation task, all subsequent
- ones will add one
- :return: tuple(list(task1, taskN, ...), list(rc1, rcN, ...))"""
- nt = p.num_tasks()
-
- feeder = None
- frc = feeder_channel
- if feeder_channel is None:
- feeder = self._make_iterator_task(ni)
- frc = p.add_task(feeder)
- # END handle specific feeder
-
- rcs = [frc]
- tasks = [feeder]
-
- make_proxy_method = self._make_proxy_method
-
- inrc = frc
- for tc in xrange(count):
- t = TestThreadInputChannelTaskNode(inrc, tc+id_offset, None)
-
- t.fun = make_proxy_method(t)
- #t.fun = t.do_fun
- inrc = p.add_task(t)
-
- tasks.append(t)
- rcs.append(inrc)
- # END create count transformers
-
- # setup failure
- for id, fail_after in fail_setup:
- tasks[1+id].fail_after = fail_after
- # END setup failure
-
- verifier = TestThreadInputChannelVerifyTaskNode(inrc, 'verifier', None)
- #verifier.fun = verifier.do_fun
- verifier.fun = make_proxy_method(verifier)
- vrc = p.add_task(verifier)
-
-
- tasks.append(verifier)
- rcs.append(vrc)
- return tasks, rcs
-
- def _make_iterator_task(self, ni, taskcls=TestThreadTaskNode, **kwargs):
- """:return: task which yields ni items
- :param taskcls: the actual iterator type to use
- :param **kwargs: additional kwargs to be passed to the task"""
- t = taskcls(iter(range(ni)), 'iterator', None, **kwargs)
- t.fun = self._make_proxy_method(t)
- return t
-
def _assert_single_task(self, p, async=False):
"""Performs testing in a synchronized environment"""
# return # DEBUG TODO: Fixme deactivated it
@@ -203,7 +29,7 @@ class TestThreadPool(TestBase):
assert ni % 2 == 0, "ni needs to be dividable by 2"
assert ni % 4 == 0, "ni needs to be dividable by 4"
- make_task = lambda *args, **kwargs: self._make_iterator_task(ni, *args, **kwargs)
+ make_task = lambda *args, **kwargs: make_iterator_task(ni, *args, **kwargs)
task = make_task()
@@ -388,7 +214,7 @@ class TestThreadPool(TestBase):
ni = 1000
count = 3
aic = count + 2
- make_task = lambda *args, **kwargs: self._add_task_chain(pool, ni, count, *args, **kwargs)
+ make_task = lambda *args, **kwargs: add_task_chain(pool, ni, count, *args, **kwargs)
ts, rcs = make_task()
assert len(ts) == aic
@@ -473,7 +299,7 @@ class TestThreadPool(TestBase):
# connect verifier channel as feeder of the second pool
p2 = ThreadPool(0) # don't spawn new threads, they have the tendency not to wake up on mutexes
assert p2.size() == 0
- p2ts, p2rcs = self._add_task_chain(p2, ni, count, feeder_channel=rcs[-1], id_offset=count)
+ p2ts, p2rcs = add_task_chain(p2, ni, count, feeder_channel=rcs[-1], id_offset=count)
assert p2ts[0] is None # we have no feeder task
assert rcs[-1].pool_ref()() is pool # it didnt change the pool
assert rcs[-1] is p2ts[1].reader()