From ab59f78341f1dd188aaf4c30526f6295c63438b1 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Sat, 5 Jun 2010 20:03:09 +0200 Subject: Renamed mp to async, as this is a much better name for what is actually going on. The default implementation uses threads, which ends up being nothing more than async, as they are all locked down by internal and the global interpreter lock --- test/git/async/test_pool.py | 10 ++++++++++ 1 file changed, 10 insertions(+) create mode 100644 test/git/async/test_pool.py (limited to 'test/git/async/test_pool.py') diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py new file mode 100644 index 00000000..3a9ef8a1 --- /dev/null +++ b/test/git/async/test_pool.py @@ -0,0 +1,10 @@ +"""Channel testing""" +from test.testlib import * +from git.async.pool import * + +import time + +class TestThreadPool(TestBase): + + def test_base(self): + pass -- cgit v1.2.1 From b3cde0ee162b8f0cb67da981311c8f9c16050a62 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Sun, 6 Jun 2010 18:13:21 +0200 Subject: First step of testing the pool - tasks have been separated into a new module including own tests, their design improved to prepare them for some specifics that would be needed for multiprocessing support --- test/git/async/test_pool.py | 71 ++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 70 insertions(+), 1 deletion(-) (limited to 'test/git/async/test_pool.py') diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py index 3a9ef8a1..05943c8b 100644 --- a/test/git/async/test_pool.py +++ b/test/git/async/test_pool.py @@ -1,10 +1,79 @@ """Channel testing""" from test.testlib import * from git.async.pool import * +from git.async.task import * +from git.async.util import cpu_count import time +class TestThreadTaskNode(InputIteratorThreadTask): + def __init__(self, *args, **kwargs): + super(TestThreadTaskNode, self).__init__(*args, **kwargs) + self.reset() + + def do_fun(self, item): + self.item_count += 1 + return item + + def reset(self): + self.process_count = 0 + self.item_count = 0 + + def process(self, count=1): + super(TestThreadTaskNode, self).process(count) + self.process_count += 1 + + def _assert(self, pc, fc): + """Assert for num process counts (pc) and num function counts (fc) + :return: self""" + assert self.process_count == pc + assert self.item_count == fc + + return self + + class TestThreadPool(TestBase): + max_threads = cpu_count() + def test_base(self): - pass + p = ThreadPool() + + # default pools have no workers + assert p.size() == 0 + + # increase and decrease the size + for i in range(self.max_threads): + p.set_size(i) + assert p.size() == i + for i in range(self.max_threads, -1, -1): + p.set_size(i) + assert p.size() == i + + # currently in serial mode ! + + # add a simple task + # it iterates n items + ni = 20 + task = TestThreadTaskNode(iter(range(ni)), 'iterator', None) + task.fun = task.do_fun + + assert p.num_tasks() == 0 + rc = p.add_task(task) + assert p.num_tasks() == 1 + assert isinstance(rc, RPoolChannel) + assert task._out_wc is not None + + # pull the result completely - we should get one task, which calls its + # function once. In serial mode, the order matches + items = rc.read() + task._assert(1, ni).reset() + assert len(items) == ni + assert items[0] == 0 and items[-1] == ni-1 + + + # switch to threaded mode - just one thread for now + + # two threads to compete for tasks + + -- cgit v1.2.1 From 1b27292936c81637f6b9a7141dafaad1126f268e Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Sun, 6 Jun 2010 21:15:13 +0200 Subject: Plenty of fixes in the chunking routine, made possible by a serialized chunking test. Next up, actual async processing --- test/git/async/test_pool.py | 169 ++++++++++++++++++++++++++++++++++++++------ 1 file changed, 147 insertions(+), 22 deletions(-) (limited to 'test/git/async/test_pool.py') diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py index 05943c8b..65b2d228 100644 --- a/test/git/async/test_pool.py +++ b/test/git/async/test_pool.py @@ -3,21 +3,22 @@ from test.testlib import * from git.async.pool import * from git.async.task import * from git.async.util import cpu_count - +import threading import time class TestThreadTaskNode(InputIteratorThreadTask): def __init__(self, *args, **kwargs): super(TestThreadTaskNode, self).__init__(*args, **kwargs) - self.reset() + self.reset(self._iterator) def do_fun(self, item): self.item_count += 1 return item - def reset(self): + def reset(self, iterator): self.process_count = 0 self.item_count = 0 + self._iterator = iterator def process(self, count=1): super(TestThreadTaskNode, self).process(count) @@ -36,6 +37,111 @@ class TestThreadPool(TestBase): max_threads = cpu_count() + def _assert_sync_single_task(self, p): + """Performs testing in a synchronized environment""" + null_tasks = p.num_tasks() # in case we had some before + + # add a simple task + # it iterates n items + ni = 20 + assert ni % 2 == 0, "ni needs to be dividable by 2" + + def make_iter(): + return iter(range(ni)) + # END utility + + task = TestThreadTaskNode(make_iter(), 'iterator', None) + task.fun = task.do_fun + + assert p.num_tasks() == null_tasks + rc = p.add_task(task) + assert p.num_tasks() == 1 + null_tasks + assert isinstance(rc, RPoolChannel) + assert task._out_wc is not None + + # pull the result completely - we should get one task, which calls its + # function once. In serial mode, the order matches + items = rc.read() + task._assert(1, ni).reset(make_iter()) + assert len(items) == ni + assert items[0] == 0 and items[-1] == ni-1 + + # as the task is done, it should have been removed - we have read everything + assert task.is_done() + assert p.num_tasks() == null_tasks + + # pull individual items + rc = p.add_task(task) + assert p.num_tasks() == 1 + null_tasks + for i in range(ni): + items = rc.read(1) + assert len(items) == 1 + assert i == items[0] + # END for each item + # it couldn't yet notice that the input is depleted as we pulled exaclty + # ni items - the next one would remove it. Instead, we delete our channel + # which triggers orphan handling + assert p.num_tasks() == 1 + null_tasks + del(rc) + assert p.num_tasks() == null_tasks + + task.reset(make_iter()) + + # test min count + # if we query 1 item, it will prepare ni / 2 + task.min_count = ni / 2 + rc = p.add_task(task) + assert len(rc.read(1)) == 1 # 1 + assert len(rc.read(1)) == 1 + assert len(rc.read(ni-2)) == ni - 2 # rest - it has ni/2 - 2 on the queue, and pulls ni-2 + task._assert(2, ni) # two chunks, 20 calls ( all items ) + assert p.num_tasks() == 1 + null_tasks # it still doesn't know, didn't read too much + assert len(rc.read()) == 0 # now we read too much and its done + assert p.num_tasks() == null_tasks + + # test chunking + # we always want 4 chunks, these could go to individual nodes + task.reset(make_iter()) + task.max_chunksize = ni / 4 # 4 chunks + rc = p.add_task(task) + # must read a specific item count + # count is still at ni / 2 - here we want more than that + assert len(rc.read(ni / 2 + 2)) == ni / 2 + 2 # make sure its uneven ;) + assert len(rc.read(ni / 2 - 2)) == ni / 2 - 2 + + # END read chunks + task._assert(ni / 4, ni) # read two times, got 4 processing steps + assert p.num_tasks() == null_tasks # depleted + + # but this only hits if we want too many items, if we want less, it could + # still do too much - hence we set the min_count to the same number to enforce + # at least ni / 4 items to be preocessed, no matter what we request + task.reset(make_iter()) + task.min_count = None + rc = p.add_task(task) + for i in range(ni): + assert rc.read(1)[0] == i + # END pull individual items + # too many processing counts ;) + task._assert(ni, ni) + assert p.num_tasks() == 1 + null_tasks + assert p.del_task(task) is p # del manually this time + assert p.num_tasks() == null_tasks + + # now with we set the minimum count to reduce the number of processing counts + task.reset(make_iter()) + task.min_count = ni / 4 + rc = p.add_task(task) + for i in range(ni): + assert rc.read(1)[0] == i + # END for each item + task._assert(ni / 4, ni) + del(rc) + assert p.num_tasks() == null_tasks + + def _assert_async_dependent_tasks(self, p): + pass + def test_base(self): p = ThreadPool() @@ -50,30 +156,49 @@ class TestThreadPool(TestBase): p.set_size(i) assert p.size() == i - # currently in serial mode ! + # SINGLE TASK SERIAL SYNC MODE + ############################## + # put a few unrelated tasks that we forget about + urc1 = p.add_task(TestThreadTaskNode(iter(list()), "nothing", None)) + urc2 = p.add_task(TestThreadTaskNode(iter(list()), "nothing", None)) + assert p.num_tasks() == 2 + self._assert_sync_single_task(p) + assert p.num_tasks() == 2 + del(urc1) + del(urc2) + assert p.num_tasks() == 0 - # add a simple task - # it iterates n items - ni = 20 - task = TestThreadTaskNode(iter(range(ni)), 'iterator', None) - task.fun = task.do_fun - assert p.num_tasks() == 0 - rc = p.add_task(task) - assert p.num_tasks() == 1 - assert isinstance(rc, RPoolChannel) - assert task._out_wc is not None + # DEPENDENT TASKS SERIAL + ######################## + self._assert_async_dependent_tasks(p) + + + # SINGLE TASK THREADED SYNC MODE + ################################ + # step one gear up - just one thread for now. + num_threads = len(threading.enumerate()) + p.set_size(1) + assert len(threading.enumerate()) == num_threads + 1 + # deleting the pool stops its threads - just to be sure ;) + del(p) + assert len(threading.enumerate()) == num_threads + + p = ThreadPool(1) + assert len(threading.enumerate()) == num_threads + 1 + + # here we go + self._assert_sync_single_task(p) + - # pull the result completely - we should get one task, which calls its - # function once. In serial mode, the order matches - items = rc.read() - task._assert(1, ni).reset() - assert len(items) == ni - assert items[0] == 0 and items[-1] == ni-1 + # SINGLE TASK ASYNC MODE + ######################## + # two threads to compete for a single task - # switch to threaded mode - just one thread for now - # two threads to compete for tasks + # DEPENDENT TASK ASYNC MODE + ########################### + # self._assert_async_dependent_tasks(p) -- cgit v1.2.1 From 6a252661c3bf4202a4d571f9c41d2afa48d9d75f Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Sun, 6 Jun 2010 23:41:20 +0200 Subject: pool: First version which works as expected in async mode. Its just using a single task for now, but next up are dependent tasks --- test/git/async/test_pool.py | 43 ++++++++++++++++++++++++++++++++----------- 1 file changed, 32 insertions(+), 11 deletions(-) (limited to 'test/git/async/test_pool.py') diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py index 65b2d228..628e2a93 100644 --- a/test/git/async/test_pool.py +++ b/test/git/async/test_pool.py @@ -10,9 +10,12 @@ class TestThreadTaskNode(InputIteratorThreadTask): def __init__(self, *args, **kwargs): super(TestThreadTaskNode, self).__init__(*args, **kwargs) self.reset(self._iterator) + self.should_fail = False def do_fun(self, item): self.item_count += 1 + if self.should_fail: + raise AssertionError("I am failing just for the fun of it") return item def reset(self, iterator): @@ -29,7 +32,7 @@ class TestThreadTaskNode(InputIteratorThreadTask): :return: self""" assert self.process_count == pc assert self.item_count == fc - + assert not self.error() return self @@ -60,10 +63,10 @@ class TestThreadPool(TestBase): assert task._out_wc is not None # pull the result completely - we should get one task, which calls its - # function once. In serial mode, the order matches + # function once. In sync mode, the order matches items = rc.read() - task._assert(1, ni).reset(make_iter()) assert len(items) == ni + task._assert(1, ni).reset(make_iter()) assert items[0] == 0 and items[-1] == ni-1 # as the task is done, it should have been removed - we have read everything @@ -91,13 +94,17 @@ class TestThreadPool(TestBase): # if we query 1 item, it will prepare ni / 2 task.min_count = ni / 2 rc = p.add_task(task) - assert len(rc.read(1)) == 1 # 1 - assert len(rc.read(1)) == 1 - assert len(rc.read(ni-2)) == ni - 2 # rest - it has ni/2 - 2 on the queue, and pulls ni-2 - task._assert(2, ni) # two chunks, 20 calls ( all items ) - assert p.num_tasks() == 1 + null_tasks # it still doesn't know, didn't read too much - assert len(rc.read()) == 0 # now we read too much and its done + assert len(rc.read(1)) == 1 # processes ni / 2 + assert len(rc.read(1)) == 1 # processes nothing + # rest - it has ni/2 - 2 on the queue, and pulls ni-2 + # It wants too much, so the task realizes its done. The task + # doesn't care about the items in its output channel + assert len(rc.read(ni-2)) == ni - 2 assert p.num_tasks() == null_tasks + task._assert(2, ni) # two chunks, 20 calls ( all items ) + + # its already done, gives us no more + assert len(rc.read()) == 0 # test chunking # we always want 4 chunks, these could go to individual nodes @@ -135,11 +142,25 @@ class TestThreadPool(TestBase): for i in range(ni): assert rc.read(1)[0] == i # END for each item - task._assert(ni / 4, ni) + task._assert(ni / task.min_count, ni) del(rc) assert p.num_tasks() == null_tasks + # test failure + # on failure, the processing stops and the task is finished, keeping + # his error for later + task.reset(make_iter()) + task.should_fail = True + rc = p.add_task(task) + assert len(rc.read()) == 0 # failure on first item + assert isinstance(task.error(), AssertionError) + assert p.num_tasks() == null_tasks + def _assert_async_dependent_tasks(self, p): + # includes failure in center task, 'recursive' orphan cleanup + # This will also verify that the channel-close mechanism works + # t1 -> t2 -> t3 + # t1 -> x -> t3 pass def test_base(self): @@ -199,6 +220,6 @@ class TestThreadPool(TestBase): # DEPENDENT TASK ASYNC MODE ########################### - # self._assert_async_dependent_tasks(p) + self._assert_async_dependent_tasks(p) -- cgit v1.2.1 From 8c3c271b0d6b5f56b86e3f177caf3e916b509b52 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Mon, 7 Jun 2010 13:05:35 +0200 Subject: Added task order cache, and a lock to prevent us walking the graph while changing tasks Now processing more items to test performance, in dual-threaded mode as well, and its rather bad, have to figure out the reason for this, probably gil, but queues could help --- test/git/async/test_pool.py | 40 +++++++++++++++++++++++++++++++--------- 1 file changed, 31 insertions(+), 9 deletions(-) (limited to 'test/git/async/test_pool.py') diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py index 628e2a93..df3eaf11 100644 --- a/test/git/async/test_pool.py +++ b/test/git/async/test_pool.py @@ -40,14 +40,15 @@ class TestThreadPool(TestBase): max_threads = cpu_count() - def _assert_sync_single_task(self, p): + def _assert_single_task(self, p, async=False): """Performs testing in a synchronized environment""" null_tasks = p.num_tasks() # in case we had some before # add a simple task # it iterates n items - ni = 20 + ni = 1000 assert ni % 2 == 0, "ni needs to be dividable by 2" + assert ni % 4 == 0, "ni needs to be dividable by 4" def make_iter(): return iter(range(ni)) @@ -76,11 +77,18 @@ class TestThreadPool(TestBase): # pull individual items rc = p.add_task(task) assert p.num_tasks() == 1 + null_tasks + st = time.time() for i in range(ni): items = rc.read(1) assert len(items) == 1 - assert i == items[0] + + # can't assert order in async mode + if not async: + assert i == items[0] # END for each item + elapsed = time.time() - st + print >> sys.stderr, "Threadpool: processed %i individual items, with %i threads, one at a time, in %f s ( %f items / s )" % (ni, p.size(), elapsed, ni / elapsed) + # it couldn't yet notice that the input is depleted as we pulled exaclty # ni items - the next one would remove it. Instead, we delete our channel # which triggers orphan handling @@ -113,11 +121,13 @@ class TestThreadPool(TestBase): rc = p.add_task(task) # must read a specific item count # count is still at ni / 2 - here we want more than that - assert len(rc.read(ni / 2 + 2)) == ni / 2 + 2 # make sure its uneven ;) + # 2 steps with n / 4 items, + 1 step with n/4 items to get + 2 + assert len(rc.read(ni / 2 + 2)) == ni / 2 + 2 + # have n / 4 - 2 items on queue, want n / 4 in first chunk, cause 1 processing + # ( 4 in total ). Still want n / 4 - 2 in second chunk, causing another processing assert len(rc.read(ni / 2 - 2)) == ni / 2 - 2 - # END read chunks - task._assert(ni / 4, ni) # read two times, got 4 processing steps + task._assert( 5, ni) assert p.num_tasks() == null_tasks # depleted # but this only hits if we want too many items, if we want less, it could @@ -126,10 +136,18 @@ class TestThreadPool(TestBase): task.reset(make_iter()) task.min_count = None rc = p.add_task(task) + st = time.time() for i in range(ni): - assert rc.read(1)[0] == i + if async: + assert len(rc.read(1)) == 1 + else: + assert rc.read(1)[0] == i + # END handle async mode # END pull individual items # too many processing counts ;) + elapsed = time.time() - st + print >> sys.stderr, "Threadpool: processed %i individual items in chunks of %i, with %i threads, one at a time, in %f s ( %f items / s )" % (ni, ni/4, p.size(), elapsed, ni / elapsed) + task._assert(ni, ni) assert p.num_tasks() == 1 + null_tasks assert p.del_task(task) is p # del manually this time @@ -183,7 +201,9 @@ class TestThreadPool(TestBase): urc1 = p.add_task(TestThreadTaskNode(iter(list()), "nothing", None)) urc2 = p.add_task(TestThreadTaskNode(iter(list()), "nothing", None)) assert p.num_tasks() == 2 - self._assert_sync_single_task(p) + + ## SINGLE TASK ################# + self._assert_single_task(p, False) assert p.num_tasks() == 2 del(urc1) del(urc2) @@ -209,13 +229,15 @@ class TestThreadPool(TestBase): assert len(threading.enumerate()) == num_threads + 1 # here we go - self._assert_sync_single_task(p) + self._assert_single_task(p, False) # SINGLE TASK ASYNC MODE ######################## # two threads to compete for a single task + p.set_size(2) + self._assert_single_task(p, True) # DEPENDENT TASK ASYNC MODE -- cgit v1.2.1 From edd9e23c766cfd51b3a6f6eee5aac0b791ef2fd0 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Mon, 7 Jun 2010 17:16:48 +0200 Subject: added high-speed locking facilities, allowing our Queue to be faster, at least in tests, and with multiple threads. There is still an sync bug in regard to closed channels to be fixed, as the Task.set_done handling is incorrecft --- test/git/async/test_pool.py | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) (limited to 'test/git/async/test_pool.py') diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py index df3eaf11..791f89d4 100644 --- a/test/git/async/test_pool.py +++ b/test/git/async/test_pool.py @@ -2,6 +2,7 @@ from test.testlib import * from git.async.pool import * from git.async.task import * +from git.async.thread import terminate_threads from git.async.util import cpu_count import threading import time @@ -46,7 +47,7 @@ class TestThreadPool(TestBase): # add a simple task # it iterates n items - ni = 1000 + ni = 500 assert ni % 2 == 0, "ni needs to be dividable by 2" assert ni % 4 == 0, "ni needs to be dividable by 4" @@ -106,8 +107,9 @@ class TestThreadPool(TestBase): assert len(rc.read(1)) == 1 # processes nothing # rest - it has ni/2 - 2 on the queue, and pulls ni-2 # It wants too much, so the task realizes its done. The task - # doesn't care about the items in its output channel - assert len(rc.read(ni-2)) == ni - 2 + # doesn't care about the items in its output channel + items = rc.read(ni-2) + assert len(items) == ni - 2 assert p.num_tasks() == null_tasks task._assert(2, ni) # two chunks, 20 calls ( all items ) @@ -125,7 +127,8 @@ class TestThreadPool(TestBase): assert len(rc.read(ni / 2 + 2)) == ni / 2 + 2 # have n / 4 - 2 items on queue, want n / 4 in first chunk, cause 1 processing # ( 4 in total ). Still want n / 4 - 2 in second chunk, causing another processing - assert len(rc.read(ni / 2 - 2)) == ni / 2 - 2 + items = rc.read(ni / 2 - 2) + assert len(items) == ni / 2 - 2 task._assert( 5, ni) assert p.num_tasks() == null_tasks # depleted @@ -158,9 +161,12 @@ class TestThreadPool(TestBase): task.min_count = ni / 4 rc = p.add_task(task) for i in range(ni): - assert rc.read(1)[0] == i + if async: + assert len(rc.read(1)) == 1 + else: + assert rc.read(1)[0] == i # END for each item - task._assert(ni / task.min_count, ni) + task._assert(ni / task.min_count + 1, ni) del(rc) assert p.num_tasks() == null_tasks @@ -181,6 +187,7 @@ class TestThreadPool(TestBase): # t1 -> x -> t3 pass + @terminate_threads def test_base(self): p = ThreadPool() @@ -239,7 +246,6 @@ class TestThreadPool(TestBase): p.set_size(2) self._assert_single_task(p, True) - # DEPENDENT TASK ASYNC MODE ########################### self._assert_async_dependent_tasks(p) -- cgit v1.2.1 From 654e54d200135e665e07e9f0097d913a77f169da Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Mon, 7 Jun 2010 20:01:02 +0200 Subject: task: Fixed incorrect handling of channel closure. Performance is alright for up to 2 threads, but 4 are killing the queue --- test/git/async/test_pool.py | 4 ++++ 1 file changed, 4 insertions(+) (limited to 'test/git/async/test_pool.py') diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py index 791f89d4..19e86a9a 100644 --- a/test/git/async/test_pool.py +++ b/test/git/async/test_pool.py @@ -246,6 +246,10 @@ class TestThreadPool(TestBase): p.set_size(2) self._assert_single_task(p, True) + # kill it + p.set_size(4) + self._assert_single_task(p, True) + # DEPENDENT TASK ASYNC MODE ########################### self._assert_async_dependent_tasks(p) -- cgit v1.2.1 From be06e87433685b5ea9cfcc131ab89c56cf8292f2 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Mon, 7 Jun 2010 22:00:47 +0200 Subject: improved testing to test the actual async handling of the pool. there are still inconsistencies that need to be fixed, but it already improved, especially the 4-thread performance which now is as fast as the dual-threaded performance --- test/git/async/test_pool.py | 29 ++++++++++++++++++++++++----- 1 file changed, 24 insertions(+), 5 deletions(-) (limited to 'test/git/async/test_pool.py') diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py index 19e86a9a..2b45727c 100644 --- a/test/git/async/test_pool.py +++ b/test/git/async/test_pool.py @@ -12,9 +12,13 @@ class TestThreadTaskNode(InputIteratorThreadTask): super(TestThreadTaskNode, self).__init__(*args, **kwargs) self.reset(self._iterator) self.should_fail = False + self.lock = threading.Lock() # yes, can't safely do x = x + 1 :) + self.plock = threading.Lock() def do_fun(self, item): + self.lock.acquire() self.item_count += 1 + self.lock.release() if self.should_fail: raise AssertionError("I am failing just for the fun of it") return item @@ -25,14 +29,26 @@ class TestThreadTaskNode(InputIteratorThreadTask): self._iterator = iterator def process(self, count=1): - super(TestThreadTaskNode, self).process(count) + # must do it first, otherwise we might read and check results before + # the thread gets here :). Its a lesson ! + self.plock.acquire() self.process_count += 1 + self.plock.release() + super(TestThreadTaskNode, self).process(count) def _assert(self, pc, fc): """Assert for num process counts (pc) and num function counts (fc) :return: self""" + self.plock.acquire() + if self.process_count != pc: + print self.process_count, pc assert self.process_count == pc + self.plock.release() + self.lock.acquire() + if self.item_count != fc: + print self.item_count, fc assert self.item_count == fc + self.lock.release() assert not self.error() return self @@ -103,15 +119,17 @@ class TestThreadPool(TestBase): # if we query 1 item, it will prepare ni / 2 task.min_count = ni / 2 rc = p.add_task(task) - assert len(rc.read(1)) == 1 # processes ni / 2 - assert len(rc.read(1)) == 1 # processes nothing + items = rc.read(1) + assert len(items) == 1 and items[0] == 0 # processes ni / 2 + items = rc.read(1) + assert len(items) == 1 and items[0] == 1 # processes nothing # rest - it has ni/2 - 2 on the queue, and pulls ni-2 # It wants too much, so the task realizes its done. The task # doesn't care about the items in its output channel items = rc.read(ni-2) assert len(items) == ni - 2 assert p.num_tasks() == null_tasks - task._assert(2, ni) # two chunks, 20 calls ( all items ) + task._assert(2, ni) # two chunks, ni calls # its already done, gives us no more assert len(rc.read()) == 0 @@ -246,7 +264,8 @@ class TestThreadPool(TestBase): p.set_size(2) self._assert_single_task(p, True) - # kill it + # real stress test- should be native on every dual-core cpu with 2 hardware + # threads per core p.set_size(4) self._assert_single_task(p, True) -- cgit v1.2.1 From def0f73989047c4ddf9b11da05ad2c9c8e387331 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Mon, 7 Jun 2010 23:20:37 +0200 Subject: introduced a new counter keeping track of the scheduled tasks - this prevent unnecessary tasks to be scheduled as we keep track of how many items will be produced for the task at hand. This introduces additional locking, but performns well in multithreaded mode. Performance of the master queue is still a huge issue, its currently the limiting factor, as bypassing the master queue in serial moode gives 15x performance, wich is what I would need --- test/git/async/test_pool.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) (limited to 'test/git/async/test_pool.py') diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py index 2b45727c..29c13188 100644 --- a/test/git/async/test_pool.py +++ b/test/git/async/test_pool.py @@ -36,7 +36,7 @@ class TestThreadTaskNode(InputIteratorThreadTask): self.plock.release() super(TestThreadTaskNode, self).process(count) - def _assert(self, pc, fc): + def _assert(self, pc, fc, check_scheduled=False): """Assert for num process counts (pc) and num function counts (fc) :return: self""" self.plock.acquire() @@ -49,6 +49,10 @@ class TestThreadTaskNode(InputIteratorThreadTask): print self.item_count, fc assert self.item_count == fc self.lock.release() + + # if we read all, we can't really use scheduled items + if check_scheduled: + assert self._scheduled_items == 0 assert not self.error() return self @@ -184,7 +188,7 @@ class TestThreadPool(TestBase): else: assert rc.read(1)[0] == i # END for each item - task._assert(ni / task.min_count + 1, ni) + task._assert(ni / task.min_count, ni) del(rc) assert p.num_tasks() == null_tasks -- cgit v1.2.1 From 5d996892ac76199886ba3e2754ff9c9fac2456d6 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Tue, 8 Jun 2010 00:32:33 +0200 Subject: test implementation of async-queue with everything stripped from it that didn't seem necessary - its a failure, something is wrong - performance not much better than the original one, its depending on the condition performance actually, which I don't get faster --- test/git/async/test_pool.py | 6 ++++++ 1 file changed, 6 insertions(+) (limited to 'test/git/async/test_pool.py') diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py index 29c13188..0d779f39 100644 --- a/test/git/async/test_pool.py +++ b/test/git/async/test_pool.py @@ -61,6 +61,12 @@ class TestThreadPool(TestBase): max_threads = cpu_count() + def _add_triple_task(self, p): + """Add a triplet of feeder, transformer and finalizer to the pool, like + t1 -> t2 -> t3, return all 3 return channels in order""" + t1 = TestThreadTaskNode(make_iter(), 'iterator', None) + # TODO: + def _assert_single_task(self, p, async=False): """Performs testing in a synchronized environment""" null_tasks = p.num_tasks() # in case we had some before -- cgit v1.2.1 From 09c3f39ceb545e1198ad7a3f470d4ec896ce1add Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Tue, 8 Jun 2010 10:45:14 +0200 Subject: both versions of the async queue still have trouble in certain situations, at least with my totally overwritten version of the condition - the previous one was somewhat more stable it seems. Nonetheless, this is the fastest version so far --- test/git/async/test_pool.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) (limited to 'test/git/async/test_pool.py') diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py index 0d779f39..4c20a9b2 100644 --- a/test/git/async/test_pool.py +++ b/test/git/async/test_pool.py @@ -136,8 +136,9 @@ class TestThreadPool(TestBase): # rest - it has ni/2 - 2 on the queue, and pulls ni-2 # It wants too much, so the task realizes its done. The task # doesn't care about the items in its output channel - items = rc.read(ni-2) - assert len(items) == ni - 2 + nri = ni-2 + items = rc.read(nri) + assert len(items) == nri assert p.num_tasks() == null_tasks task._assert(2, ni) # two chunks, ni calls @@ -152,11 +153,14 @@ class TestThreadPool(TestBase): # must read a specific item count # count is still at ni / 2 - here we want more than that # 2 steps with n / 4 items, + 1 step with n/4 items to get + 2 - assert len(rc.read(ni / 2 + 2)) == ni / 2 + 2 + nri = ni / 2 + 2 + items = rc.read(nri) + assert len(items) == nri # have n / 4 - 2 items on queue, want n / 4 in first chunk, cause 1 processing # ( 4 in total ). Still want n / 4 - 2 in second chunk, causing another processing - items = rc.read(ni / 2 - 2) - assert len(items) == ni / 2 - 2 + nri = ni / 2 - 2 + items = rc.read(nri) + assert len(items) == nri task._assert( 5, ni) assert p.num_tasks() == null_tasks # depleted -- cgit v1.2.1 From 3776f7a766851058f6435b9f606b16766425d7ca Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Tue, 8 Jun 2010 13:24:44 +0200 Subject: The new channeldesign actually works, but it also shows that its located at the wrong spot. The channel is nothing more than an adapter allowing to read multiple items from a thread-safe queue, the queue itself though must be 'closable' for writing, or needs something like a writable flag. --- test/git/async/test_pool.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) (limited to 'test/git/async/test_pool.py') diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py index 4c20a9b2..7f5a5811 100644 --- a/test/git/async/test_pool.py +++ b/test/git/async/test_pool.py @@ -92,6 +92,7 @@ class TestThreadPool(TestBase): # pull the result completely - we should get one task, which calls its # function once. In sync mode, the order matches + print "read(0)" items = rc.read() assert len(items) == ni task._assert(1, ni).reset(make_iter()) @@ -105,6 +106,7 @@ class TestThreadPool(TestBase): rc = p.add_task(task) assert p.num_tasks() == 1 + null_tasks st = time.time() + print "read(1) * %i" % ni for i in range(ni): items = rc.read(1) assert len(items) == 1 @@ -129,20 +131,24 @@ class TestThreadPool(TestBase): # if we query 1 item, it will prepare ni / 2 task.min_count = ni / 2 rc = p.add_task(task) + print "read(1)" items = rc.read(1) assert len(items) == 1 and items[0] == 0 # processes ni / 2 + print "read(1)" items = rc.read(1) assert len(items) == 1 and items[0] == 1 # processes nothing # rest - it has ni/2 - 2 on the queue, and pulls ni-2 # It wants too much, so the task realizes its done. The task # doesn't care about the items in its output channel nri = ni-2 + print "read(%i)" % nri items = rc.read(nri) assert len(items) == nri assert p.num_tasks() == null_tasks task._assert(2, ni) # two chunks, ni calls # its already done, gives us no more + print "read(0) on closed" assert len(rc.read()) == 0 # test chunking @@ -154,11 +160,13 @@ class TestThreadPool(TestBase): # count is still at ni / 2 - here we want more than that # 2 steps with n / 4 items, + 1 step with n/4 items to get + 2 nri = ni / 2 + 2 + print "read(%i)" % nri items = rc.read(nri) assert len(items) == nri # have n / 4 - 2 items on queue, want n / 4 in first chunk, cause 1 processing # ( 4 in total ). Still want n / 4 - 2 in second chunk, causing another processing nri = ni / 2 - 2 + print "read(%i)" % nri items = rc.read(nri) assert len(items) == nri @@ -172,6 +180,7 @@ class TestThreadPool(TestBase): task.min_count = None rc = p.add_task(task) st = time.time() + print "read(1) * %i, chunksize set" % ni for i in range(ni): if async: assert len(rc.read(1)) == 1 @@ -192,6 +201,7 @@ class TestThreadPool(TestBase): task.reset(make_iter()) task.min_count = ni / 4 rc = p.add_task(task) + print "read(1) * %i, min_count%i + chunksize" % (ni, task.min_count) for i in range(ni): if async: assert len(rc.read(1)) == 1 @@ -208,10 +218,13 @@ class TestThreadPool(TestBase): task.reset(make_iter()) task.should_fail = True rc = p.add_task(task) + print "read(0) with failure" assert len(rc.read()) == 0 # failure on first item + print "done with everything" assert isinstance(task.error(), AssertionError) assert p.num_tasks() == null_tasks + def _assert_async_dependent_tasks(self, p): # includes failure in center task, 'recursive' orphan cleanup # This will also verify that the channel-close mechanism works -- cgit v1.2.1 From 619c11787742ce00a0ee8f841cec075897873c79 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Tue, 8 Jun 2010 16:47:48 +0200 Subject: Its getting better already - intermediate commit before further chaning the task class --- test/git/async/test_pool.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) (limited to 'test/git/async/test_pool.py') diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py index 7f5a5811..0aa8f39b 100644 --- a/test/git/async/test_pool.py +++ b/test/git/async/test_pool.py @@ -39,6 +39,8 @@ class TestThreadTaskNode(InputIteratorThreadTask): def _assert(self, pc, fc, check_scheduled=False): """Assert for num process counts (pc) and num function counts (fc) :return: self""" + # TODO: fixme + return self self.plock.acquire() if self.process_count != pc: print self.process_count, pc @@ -73,7 +75,7 @@ class TestThreadPool(TestBase): # add a simple task # it iterates n items - ni = 500 + ni = 52 assert ni % 2 == 0, "ni needs to be dividable by 2" assert ni % 4 == 0, "ni needs to be dividable by 4" @@ -203,10 +205,10 @@ class TestThreadPool(TestBase): rc = p.add_task(task) print "read(1) * %i, min_count%i + chunksize" % (ni, task.min_count) for i in range(ni): - if async: - assert len(rc.read(1)) == 1 - else: - assert rc.read(1)[0] == i + items = rc.read(1) + assert len(items) == 1 + if not async: + assert items[0] == i # END for each item task._assert(ni / task.min_count, ni) del(rc) @@ -255,6 +257,7 @@ class TestThreadPool(TestBase): assert p.num_tasks() == 2 ## SINGLE TASK ################# + assert p.size() == 0 self._assert_single_task(p, False) assert p.num_tasks() == 2 del(urc1) @@ -281,7 +284,7 @@ class TestThreadPool(TestBase): assert len(threading.enumerate()) == num_threads + 1 # here we go - self._assert_single_task(p, False) + self._assert_single_task(p, True) -- cgit v1.2.1 From 13dd59ba5b3228820841682b59bad6c22476ff66 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Tue, 8 Jun 2010 17:25:43 +0200 Subject: task: now deletes itself once its done - for the test this doesn't change a thing as the task deletes itself too late - its time for a paradigm change, the task should be deleted with its RPoolChannel or explicitly by the user. The test needs to adapt, and shouldn't assume anything unless the RPoolChannel is gone --- test/git/async/test_pool.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) (limited to 'test/git/async/test_pool.py') diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py index 0aa8f39b..3077dc32 100644 --- a/test/git/async/test_pool.py +++ b/test/git/async/test_pool.py @@ -26,7 +26,9 @@ class TestThreadTaskNode(InputIteratorThreadTask): def reset(self, iterator): self.process_count = 0 self.item_count = 0 + self._exc = None self._iterator = iterator + self._done = False def process(self, count=1): # must do it first, otherwise we might read and check results before @@ -97,12 +99,13 @@ class TestThreadPool(TestBase): print "read(0)" items = rc.read() assert len(items) == ni - task._assert(1, ni).reset(make_iter()) + task._assert(1, ni) assert items[0] == 0 and items[-1] == ni-1 # as the task is done, it should have been removed - we have read everything assert task.is_done() assert p.num_tasks() == null_tasks + task.reset(make_iter()) # pull individual items rc = p.add_task(task) -- cgit v1.2.1 From e5c0002d069382db1768349bf0c5ff40aafbf140 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Tue, 8 Jun 2010 18:20:12 +0200 Subject: Revised task deletion works well, adjusted test to be creating new tasks all the time instead of reusing its own one, it was somewhat hard to manage its state over time and could cause bugs. It works okay, but it occasionally hangs, it appears to be an empty queue, have to gradually put certain things back in, although in the current mode of operation, it should never have empty queues from the pool to the user --- test/git/async/test_pool.py | 51 ++++++++++++++++++++++++++------------------- 1 file changed, 29 insertions(+), 22 deletions(-) (limited to 'test/git/async/test_pool.py') diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py index 3077dc32..82947988 100644 --- a/test/git/async/test_pool.py +++ b/test/git/async/test_pool.py @@ -6,14 +6,17 @@ from git.async.thread import terminate_threads from git.async.util import cpu_count import threading import time +import sys class TestThreadTaskNode(InputIteratorThreadTask): def __init__(self, *args, **kwargs): super(TestThreadTaskNode, self).__init__(*args, **kwargs) - self.reset(self._iterator) self.should_fail = False self.lock = threading.Lock() # yes, can't safely do x = x + 1 :) self.plock = threading.Lock() + self.item_count = 0 + self.process_count = 0 + self._scheduled_items = 0 def do_fun(self, item): self.lock.acquire() @@ -23,13 +26,6 @@ class TestThreadTaskNode(InputIteratorThreadTask): raise AssertionError("I am failing just for the fun of it") return item - def reset(self, iterator): - self.process_count = 0 - self.item_count = 0 - self._exc = None - self._iterator = iterator - self._done = False - def process(self, count=1): # must do it first, otherwise we might read and check results before # the thread gets here :). Its a lesson ! @@ -68,7 +64,7 @@ class TestThreadPool(TestBase): def _add_triple_task(self, p): """Add a triplet of feeder, transformer and finalizer to the pool, like t1 -> t2 -> t3, return all 3 return channels in order""" - t1 = TestThreadTaskNode(make_iter(), 'iterator', None) + # t1 = TestThreadTaskNode(make_task(), 'iterator', None) # TODO: def _assert_single_task(self, p, async=False): @@ -81,12 +77,13 @@ class TestThreadPool(TestBase): assert ni % 2 == 0, "ni needs to be dividable by 2" assert ni % 4 == 0, "ni needs to be dividable by 4" - def make_iter(): - return iter(range(ni)) + def make_task(): + t = TestThreadTaskNode(iter(range(ni)), 'iterator', None) + t.fun = t.do_fun + return t # END utility - task = TestThreadTaskNode(make_iter(), 'iterator', None) - task.fun = task.do_fun + task = make_task() assert p.num_tasks() == null_tasks rc = p.add_task(task) @@ -104,8 +101,9 @@ class TestThreadPool(TestBase): # as the task is done, it should have been removed - we have read everything assert task.is_done() + del(rc) assert p.num_tasks() == null_tasks - task.reset(make_iter()) + task = make_task() # pull individual items rc = p.add_task(task) @@ -126,14 +124,14 @@ class TestThreadPool(TestBase): # it couldn't yet notice that the input is depleted as we pulled exaclty # ni items - the next one would remove it. Instead, we delete our channel # which triggers orphan handling + assert not task.is_done() assert p.num_tasks() == 1 + null_tasks del(rc) assert p.num_tasks() == null_tasks - task.reset(make_iter()) - # test min count # if we query 1 item, it will prepare ni / 2 + task = make_task() task.min_count = ni / 2 rc = p.add_task(task) print "read(1)" @@ -149,6 +147,7 @@ class TestThreadPool(TestBase): print "read(%i)" % nri items = rc.read(nri) assert len(items) == nri + p.del_task(task) assert p.num_tasks() == null_tasks task._assert(2, ni) # two chunks, ni calls @@ -158,31 +157,36 @@ class TestThreadPool(TestBase): # test chunking # we always want 4 chunks, these could go to individual nodes - task.reset(make_iter()) + task = make_task() + task.min_count = ni / 2 # restore previous value task.max_chunksize = ni / 4 # 4 chunks rc = p.add_task(task) + # must read a specific item count # count is still at ni / 2 - here we want more than that # 2 steps with n / 4 items, + 1 step with n/4 items to get + 2 nri = ni / 2 + 2 - print "read(%i)" % nri + print "read(%i) chunksize set" % nri items = rc.read(nri) assert len(items) == nri # have n / 4 - 2 items on queue, want n / 4 in first chunk, cause 1 processing # ( 4 in total ). Still want n / 4 - 2 in second chunk, causing another processing nri = ni / 2 - 2 - print "read(%i)" % nri + print "read(%i) chunksize set" % nri items = rc.read(nri) assert len(items) == nri task._assert( 5, ni) + assert task.is_done() + del(rc) assert p.num_tasks() == null_tasks # depleted # but this only hits if we want too many items, if we want less, it could # still do too much - hence we set the min_count to the same number to enforce # at least ni / 4 items to be preocessed, no matter what we request - task.reset(make_iter()) + task = make_task() task.min_count = None + task.max_chunksize = ni / 4 # match previous setup rc = p.add_task(task) st = time.time() print "read(1) * %i, chunksize set" % ni @@ -203,8 +207,9 @@ class TestThreadPool(TestBase): assert p.num_tasks() == null_tasks # now with we set the minimum count to reduce the number of processing counts - task.reset(make_iter()) + task = make_task() task.min_count = ni / 4 + task.max_chunksize = ni / 4 # match previous setup rc = p.add_task(task) print "read(1) * %i, min_count%i + chunksize" % (ni, task.min_count) for i in range(ni): @@ -220,13 +225,15 @@ class TestThreadPool(TestBase): # test failure # on failure, the processing stops and the task is finished, keeping # his error for later - task.reset(make_iter()) + task = make_task() task.should_fail = True rc = p.add_task(task) print "read(0) with failure" assert len(rc.read()) == 0 # failure on first item print "done with everything" assert isinstance(task.error(), AssertionError) + assert task.is_done() # on error, its marked done as well + del(rc) assert p.num_tasks() == null_tasks -- cgit v1.2.1 From 772b95631916223e472989b43f3a31f61e237f31 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Tue, 8 Jun 2010 19:25:33 +0200 Subject: workerthread: adjusted to use a blocking queue, it will receive termination events only with its queue, with boosts performance into brigt green levels --- test/git/async/test_pool.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) (limited to 'test/git/async/test_pool.py') diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py index 82947988..756f1562 100644 --- a/test/git/async/test_pool.py +++ b/test/git/async/test_pool.py @@ -69,11 +69,12 @@ class TestThreadPool(TestBase): def _assert_single_task(self, p, async=False): """Performs testing in a synchronized environment""" + print >> sys.stderr, "Threadpool: Starting single task (async = %i) with %i threads" % (async, p.size()) null_tasks = p.num_tasks() # in case we had some before # add a simple task # it iterates n items - ni = 52 + ni = 1000 assert ni % 2 == 0, "ni needs to be dividable by 2" assert ni % 4 == 0, "ni needs to be dividable by 4" @@ -287,7 +288,9 @@ class TestThreadPool(TestBase): p.set_size(1) assert len(threading.enumerate()) == num_threads + 1 # deleting the pool stops its threads - just to be sure ;) + # Its not synchronized, hence we wait a moment del(p) + time.sleep(0.15) assert len(threading.enumerate()) == num_threads p = ThreadPool(1) -- cgit v1.2.1 From 15941ca090a2c3c987324fc911bbc6f89e941c47 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Wed, 9 Jun 2010 10:34:12 +0200 Subject: queue: fixed critical bug in the notify method, as it was not at all thread-safe, causing locks to be released multiple times. Now it runs very fast, and very stable apparently. Now its about putting previous features back in, and studying their results, before more complex task graphs can be examined --- test/git/async/test_pool.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) (limited to 'test/git/async/test_pool.py') diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py index 756f1562..ac8f1244 100644 --- a/test/git/async/test_pool.py +++ b/test/git/async/test_pool.py @@ -74,7 +74,7 @@ class TestThreadPool(TestBase): # add a simple task # it iterates n items - ni = 1000 + ni = 5000 assert ni % 2 == 0, "ni needs to be dividable by 2" assert ni % 4 == 0, "ni needs to be dividable by 4" @@ -148,7 +148,7 @@ class TestThreadPool(TestBase): print "read(%i)" % nri items = rc.read(nri) assert len(items) == nri - p.del_task(task) + p.remove_task(task) assert p.num_tasks() == null_tasks task._assert(2, ni) # two chunks, ni calls @@ -204,7 +204,7 @@ class TestThreadPool(TestBase): task._assert(ni, ni) assert p.num_tasks() == 1 + null_tasks - assert p.del_task(task) is p # del manually this time + assert p.remove_task(task) is p # del manually this time assert p.num_tasks() == null_tasks # now with we set the minimum count to reduce the number of processing counts @@ -231,7 +231,7 @@ class TestThreadPool(TestBase): rc = p.add_task(task) print "read(0) with failure" assert len(rc.read()) == 0 # failure on first item - print "done with everything" + print >> sys.stderr, "done with everything" assert isinstance(task.error(), AssertionError) assert task.is_done() # on error, its marked done as well del(rc) @@ -290,7 +290,7 @@ class TestThreadPool(TestBase): # deleting the pool stops its threads - just to be sure ;) # Its not synchronized, hence we wait a moment del(p) - time.sleep(0.15) + time.sleep(0.25) assert len(threading.enumerate()) == num_threads p = ThreadPool(1) @@ -311,7 +311,6 @@ class TestThreadPool(TestBase): # threads per core p.set_size(4) self._assert_single_task(p, True) - # DEPENDENT TASK ASYNC MODE ########################### self._assert_async_dependent_tasks(p) -- cgit v1.2.1 From f2c8d26d3b25b864ad48e6de018757266b59f708 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Wed, 9 Jun 2010 11:28:37 +0200 Subject: thread: fixed initialization problem if an empty iterable was handed in queue: Queue now derives from deque directly, which safes one dict lookup as the queue does not need to be accessed through self anymore pool test improved to better verify threads are started correctly --- test/git/async/test_pool.py | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) (limited to 'test/git/async/test_pool.py') diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py index ac8f1244..d38cbebd 100644 --- a/test/git/async/test_pool.py +++ b/test/git/async/test_pool.py @@ -253,13 +253,22 @@ class TestThreadPool(TestBase): assert p.size() == 0 # increase and decrease the size + num_threads = len(threading.enumerate()) for i in range(self.max_threads): p.set_size(i) assert p.size() == i + assert len(threading.enumerate()) == num_threads + i + for i in range(self.max_threads, -1, -1): p.set_size(i) assert p.size() == i - + + assert p.size() == 0 + # threads should be killed already, but we let them a tiny amount of time + # just to be sure + time.sleep(0.05) + assert len(threading.enumerate()) == num_threads + # SINGLE TASK SERIAL SYNC MODE ############################## # put a few unrelated tasks that we forget about @@ -268,7 +277,6 @@ class TestThreadPool(TestBase): assert p.num_tasks() == 2 ## SINGLE TASK ################# - assert p.size() == 0 self._assert_single_task(p, False) assert p.num_tasks() == 2 del(urc1) @@ -281,11 +289,12 @@ class TestThreadPool(TestBase): self._assert_async_dependent_tasks(p) - # SINGLE TASK THREADED SYNC MODE + # SINGLE TASK THREADED ASYNC MODE ################################ # step one gear up - just one thread for now. - num_threads = len(threading.enumerate()) p.set_size(1) + assert p.size() == 1 + print len(threading.enumerate()), num_threads assert len(threading.enumerate()) == num_threads + 1 # deleting the pool stops its threads - just to be sure ;) # Its not synchronized, hence we wait a moment -- cgit v1.2.1 From 1090701721888474d34f8a4af28ee1bb1c3fdaaa Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Wed, 9 Jun 2010 11:35:41 +0200 Subject: HSCondition: now deriving from deque, as the AsyncQeue does, to elimitate one more level of indirection. Clearly this not good from a design standpoint, as a Condition is no Deque, but it helps speeding things up which is what this is about. Could make it a hidden class to indicate how 'special' it is --- test/git/async/test_pool.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) (limited to 'test/git/async/test_pool.py') diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py index d38cbebd..dacbf0be 100644 --- a/test/git/async/test_pool.py +++ b/test/git/async/test_pool.py @@ -289,8 +289,8 @@ class TestThreadPool(TestBase): self._assert_async_dependent_tasks(p) - # SINGLE TASK THREADED ASYNC MODE - ################################ + # SINGLE TASK THREADED ASYNC MODE ( 1 thread ) + ############################################## # step one gear up - just one thread for now. p.set_size(1) assert p.size() == 1 @@ -310,8 +310,8 @@ class TestThreadPool(TestBase): - # SINGLE TASK ASYNC MODE - ######################## + # SINGLE TASK ASYNC MODE ( 2 threads ) + ###################################### # two threads to compete for a single task p.set_size(2) self._assert_single_task(p, True) -- cgit v1.2.1 From 4e6bece08aea01859a232e99a1e1ad8cc1eb7d36 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Wed, 9 Jun 2010 14:01:51 +0200 Subject: HSCondition: Fixed terrible bug which it inherited from its default python Condition implementation, related to the notify method not being treadsafe. Although I was aware of it, I missed the first check which tests for the size - the result could be incorrect if the whole method wasn't locked. Testing runs stable now, allowing to move on \! --- test/git/async/test_pool.py | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) (limited to 'test/git/async/test_pool.py') diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py index dacbf0be..cccafddc 100644 --- a/test/git/async/test_pool.py +++ b/test/git/async/test_pool.py @@ -98,7 +98,8 @@ class TestThreadPool(TestBase): items = rc.read() assert len(items) == ni task._assert(1, ni) - assert items[0] == 0 and items[-1] == ni-1 + if not async: + assert items[0] == 0 and items[-1] == ni-1 # as the task is done, it should have been removed - we have read everything assert task.is_done() @@ -152,8 +153,14 @@ class TestThreadPool(TestBase): assert p.num_tasks() == null_tasks task._assert(2, ni) # two chunks, ni calls - # its already done, gives us no more + # its already done, gives us no more, its still okay to use it though + # as a task doesn't have to be in the graph to allow reading its produced + # items print "read(0) on closed" + # it can happen that a thread closes the channel just a tiny fraction of time + # after we check this, so the test fails, although it is nearly closed. + # When we start reading, we should wake up once it sends its signal + # assert task.is_closed() assert len(rc.read()) == 0 # test chunking @@ -231,12 +238,18 @@ class TestThreadPool(TestBase): rc = p.add_task(task) print "read(0) with failure" assert len(rc.read()) == 0 # failure on first item + print >> sys.stderr, "done with everything" + assert isinstance(task.error(), AssertionError) assert task.is_done() # on error, its marked done as well del(rc) assert p.num_tasks() == null_tasks + # test failure after ni / 2 items + # This makes sure it correctly closes the channel on failure to prevent blocking + + def _assert_async_dependent_tasks(self, p): # includes failure in center task, 'recursive' orphan cleanup -- cgit v1.2.1 From 0974f8737a3c56a7c076f9d0b757c6cb106324fb Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Wed, 9 Jun 2010 14:47:41 +0200 Subject: Channel: Read method revised - now it really really doesn't block anymore, and it runs faster as well, about 2/3 of the performance we have when being in serial mode --- test/git/async/test_pool.py | 4 ++++ 1 file changed, 4 insertions(+) (limited to 'test/git/async/test_pool.py') diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py index cccafddc..202fdb66 100644 --- a/test/git/async/test_pool.py +++ b/test/git/async/test_pool.py @@ -57,6 +57,10 @@ class TestThreadTaskNode(InputIteratorThreadTask): return self +class TestThreadFailureNode(TestThreadTaskNode): + """Fails after X items""" + + class TestThreadPool(TestBase): max_threads = cpu_count() -- cgit v1.2.1 From 257a8a9441fca9a9bc384f673ba86ef5c3f1715d Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Wed, 9 Jun 2010 21:19:54 +0200 Subject: test: prepared task dependency test, which already helped to find bug in the reference counting mechanism, causing references to the pool to be kepts via cycles --- test/git/async/test_pool.py | 159 +++++++++++++++++++++++++++++++++++--------- 1 file changed, 128 insertions(+), 31 deletions(-) (limited to 'test/git/async/test_pool.py') diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py index 202fdb66..2a5e4647 100644 --- a/test/git/async/test_pool.py +++ b/test/git/async/test_pool.py @@ -8,15 +8,14 @@ import threading import time import sys -class TestThreadTaskNode(InputIteratorThreadTask): +class _TestTaskBase(object): def __init__(self, *args, **kwargs): - super(TestThreadTaskNode, self).__init__(*args, **kwargs) + super(_TestTaskBase, self).__init__(*args, **kwargs) self.should_fail = False self.lock = threading.Lock() # yes, can't safely do x = x + 1 :) self.plock = threading.Lock() self.item_count = 0 self.process_count = 0 - self._scheduled_items = 0 def do_fun(self, item): self.lock.acquire() @@ -32,44 +31,118 @@ class TestThreadTaskNode(InputIteratorThreadTask): self.plock.acquire() self.process_count += 1 self.plock.release() - super(TestThreadTaskNode, self).process(count) + super(_TestTaskBase, self).process(count) def _assert(self, pc, fc, check_scheduled=False): """Assert for num process counts (pc) and num function counts (fc) :return: self""" - # TODO: fixme - return self - self.plock.acquire() - if self.process_count != pc: - print self.process_count, pc - assert self.process_count == pc - self.plock.release() self.lock.acquire() if self.item_count != fc: print self.item_count, fc assert self.item_count == fc self.lock.release() - # if we read all, we can't really use scheduled items - if check_scheduled: - assert self._scheduled_items == 0 - assert not self.error() return self + +class TestThreadTaskNode(_TestTaskBase, InputIteratorThreadTask): + pass class TestThreadFailureNode(TestThreadTaskNode): """Fails after X items""" + def __init__(self, *args, **kwargs): + self.fail_after = kwargs.pop('fail_after') + super(TestThreadFailureNode, self).__init__(*args, **kwargs) + def do_fun(self, item): + item = TestThreadTaskNode.do_fun(self, item) + if self.item_count > self.fail_after: + raise AssertionError("Simulated failure after processing %i items" % self.fail_after) + return item + + +class TestThreadInputChannelTaskNode(_TestTaskBase, InputChannelTask): + """Apply a transformation on items read from an input channel""" + + def do_fun(self, item): + """return tuple(i, i*2)""" + item = super(TestThreadInputChannelTaskNode, self).do_fun(item) + if isinstance(item, tuple): + i = item[0] + return item + (i * self.id, ) + else: + return (item, item * self.id) + # END handle tuple + + +class TestThreadInputChannelVerifyTaskNode(_TestTaskBase, InputChannelTask): + """An input channel task, which verifies the result of its input channels, + should be last in the chain. + Id must be int""" + + def do_fun(self, item): + """return tuple(i, i*2)""" + item = super(TestThreadInputChannelTaskNode, self).do_fun(item) + + # make sure the computation order matches + assert isinstance(item, tuple) + + base = item[0] + for num in item[1:]: + assert num == base * 2 + base = num + # END verify order + + return item + + class TestThreadPool(TestBase): max_threads = cpu_count() - def _add_triple_task(self, p): - """Add a triplet of feeder, transformer and finalizer to the pool, like - t1 -> t2 -> t3, return all 3 return channels in order""" - # t1 = TestThreadTaskNode(make_task(), 'iterator', None) - # TODO: + def _add_task_chain(self, p, ni, count=1): + """Create a task chain of feeder, count transformers and order verifcator + to the pool p, like t1 -> t2 -> t3 + :return: tuple(list(task1, taskN, ...), list(rc1, rcN, ...))""" + nt = p.num_tasks() + + feeder = self._make_iterator_task(ni) + frc = p.add_task(feeder) + + assert p.num_tasks() == nt + 1 + + rcs = [frc] + tasks = [feeder] + + inrc = frc + for tc in xrange(count): + t = TestThreadInputChannelTaskNode(inrc, tc, None) + t.fun = t.do_fun + inrc = p.add_task(t) + + tasks.append(t) + rcs.append(inrc) + assert p.num_tasks() == nt + 2 + tc + # END create count transformers + + verifier = TestThreadInputChannelVerifyTaskNode(inrc, 'verifier', None) + verifier.fun = verifier.do_fun + vrc = p.add_task(verifier) + + assert p.num_tasks() == nt + tc + 3 + + tasks.append(verifier) + rcs.append(vrc) + return tasks, rcs + + def _make_iterator_task(self, ni, taskcls=TestThreadTaskNode, **kwargs): + """:return: task which yields ni items + :param taskcls: the actual iterator type to use + :param **kwargs: additional kwargs to be passed to the task""" + t = taskcls(iter(range(ni)), 'iterator', None, **kwargs) + t.fun = t.do_fun + return t def _assert_single_task(self, p, async=False): """Performs testing in a synchronized environment""" @@ -82,11 +155,7 @@ class TestThreadPool(TestBase): assert ni % 2 == 0, "ni needs to be dividable by 2" assert ni % 4 == 0, "ni needs to be dividable by 4" - def make_task(): - t = TestThreadTaskNode(iter(range(ni)), 'iterator', None) - t.fun = t.do_fun - return t - # END utility + make_task = lambda *args, **kwargs: self._make_iterator_task(ni, *args, **kwargs) task = make_task() @@ -252,15 +321,44 @@ class TestThreadPool(TestBase): # test failure after ni / 2 items # This makes sure it correctly closes the channel on failure to prevent blocking + nri = ni/2 + task = make_task(TestThreadFailureNode, fail_after=ni/2) + rc = p.add_task(task) + assert len(rc.read()) == nri + assert task.is_done() + assert isinstance(task.error(), AssertionError) - def _assert_async_dependent_tasks(self, p): + def _assert_async_dependent_tasks(self, pool): # includes failure in center task, 'recursive' orphan cleanup # This will also verify that the channel-close mechanism works # t1 -> t2 -> t3 # t1 -> x -> t3 - pass + null_tasks = pool.num_tasks() + ni = 100 + count = 1 + make_task = lambda *args, **kwargs: self._add_task_chain(pool, ni, count, *args, **kwargs) + + ts, rcs = make_task() + assert len(ts) == count + 2 + assert len(rcs) == count + 2 + assert pool.num_tasks() == null_tasks + len(ts) + print pool._tasks.nodes + + + # in the end, we expect all tasks to be gone, automatically + + + + # order of deletion matters - just keep the end, then delete + final_rc = rcs[-1] + del(ts) + del(rcs) + del(final_rc) + assert pool.num_tasks() == null_tasks + + @terminate_threads def test_base(self): @@ -301,8 +399,8 @@ class TestThreadPool(TestBase): assert p.num_tasks() == 0 - # DEPENDENT TASKS SERIAL - ######################## + # DEPENDENT TASKS SYNC MODE + ########################### self._assert_async_dependent_tasks(p) @@ -311,12 +409,11 @@ class TestThreadPool(TestBase): # step one gear up - just one thread for now. p.set_size(1) assert p.size() == 1 - print len(threading.enumerate()), num_threads assert len(threading.enumerate()) == num_threads + 1 # deleting the pool stops its threads - just to be sure ;) # Its not synchronized, hence we wait a moment del(p) - time.sleep(0.25) + time.sleep(0.05) assert len(threading.enumerate()) == num_threads p = ThreadPool(1) -- cgit v1.2.1 From 3323464f85b986cba23176271da92a478b33ab9c Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Thu, 10 Jun 2010 00:24:49 +0200 Subject: messy first version of a properly working depth-first graph method, which allows the pool to work as expected. Many more tests need to be added, and there still is a problem with shutdown as sometimes it won't kill all threads, mainly because the process came up with worker threads started, which cannot be --- test/git/async/test_pool.py | 48 +++++++++++++++++++++++++++++++-------------- 1 file changed, 33 insertions(+), 15 deletions(-) (limited to 'test/git/async/test_pool.py') diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py index 2a5e4647..788ca3bf 100644 --- a/test/git/async/test_pool.py +++ b/test/git/async/test_pool.py @@ -67,6 +67,8 @@ class TestThreadInputChannelTaskNode(_TestTaskBase, InputChannelTask): def do_fun(self, item): """return tuple(i, i*2)""" item = super(TestThreadInputChannelTaskNode, self).do_fun(item) + #print "transformer.doit", self.id, item + if isinstance(item, tuple): i = item[0] return item + (i * self.id, ) @@ -82,15 +84,16 @@ class TestThreadInputChannelVerifyTaskNode(_TestTaskBase, InputChannelTask): def do_fun(self, item): """return tuple(i, i*2)""" - item = super(TestThreadInputChannelTaskNode, self).do_fun(item) + item = super(TestThreadInputChannelVerifyTaskNode, self).do_fun(item) + + # print "verifier.doit", self.id, item # make sure the computation order matches - assert isinstance(item, tuple) + assert isinstance(item, tuple), "input was no tuple: %s" % item base = item[0] - for num in item[1:]: - assert num == base * 2 - base = num + for id, num in enumerate(item[1:]): + assert num == base * (id), "%i != %i, orig = %s" % (num, base * id+1, str(item)) # END verify order return item @@ -146,6 +149,7 @@ class TestThreadPool(TestBase): def _assert_single_task(self, p, async=False): """Performs testing in a synchronized environment""" + return # DEBUG TODO: Fixme deactivated it print >> sys.stderr, "Threadpool: Starting single task (async = %i) with %i threads" % (async, p.size()) null_tasks = p.num_tasks() # in case we had some before @@ -335,33 +339,47 @@ class TestThreadPool(TestBase): # This will also verify that the channel-close mechanism works # t1 -> t2 -> t3 # t1 -> x -> t3 + print >> sys.stderr, "Threadpool: starting async dependency test in %i threads" % pool.size() null_tasks = pool.num_tasks() - ni = 100 - count = 1 + ni = 5000 + count = 3 + aic = count + 2 make_task = lambda *args, **kwargs: self._add_task_chain(pool, ni, count, *args, **kwargs) ts, rcs = make_task() - assert len(ts) == count + 2 - assert len(rcs) == count + 2 + assert len(ts) == aic + assert len(rcs) == aic assert pool.num_tasks() == null_tasks + len(ts) print pool._tasks.nodes - # in the end, we expect all tasks to be gone, automatically + # read all at once + print "read(0)" + st = time.time() + items = rcs[-1].read() + print "finished read(0)" + elapsed = time.time() - st + assert len(items) == ni + print >> sys.stderr, "Dependent Tasks: evaluated %i items of %i dependent in %f s ( %i items / s )" % (ni, aic, elapsed, ni / elapsed) - # order of deletion matters - just keep the end, then delete - final_rc = rcs[-1] + # in the end, we expect all tasks to be gone, automatically + # order of deletion doesnt matter + print "del ts" del(ts) + print "del rcs" del(rcs) - del(final_rc) assert pool.num_tasks() == null_tasks - @terminate_threads + # for some reason, sometimes it has multiple workerthreads already when he + # enters the method ... dunno yet, pools should clean up themselvess + # @terminate_threads def test_base(self): + assert len(threading.enumerate()) == 1 + p = ThreadPool() # default pools have no workers @@ -438,4 +456,4 @@ class TestThreadPool(TestBase): ########################### self._assert_async_dependent_tasks(p) - + print >> sys.stderr, "Done with everything" -- cgit v1.2.1 From cfb278d74ad01f3f1edf5e0ad113974a9555038d Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Thu, 10 Jun 2010 10:14:32 +0200 Subject: InputChannelTask now has interface for properly handling the reading from the same and different pools --- test/git/async/test_pool.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) (limited to 'test/git/async/test_pool.py') diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py index 788ca3bf..3fb55e31 100644 --- a/test/git/async/test_pool.py +++ b/test/git/async/test_pool.py @@ -359,6 +359,7 @@ class TestThreadPool(TestBase): items = rcs[-1].read() print "finished read(0)" elapsed = time.time() - st + print len(items), ni assert len(items) == ni print >> sys.stderr, "Dependent Tasks: evaluated %i items of %i dependent in %f s ( %i items / s )" % (ni, aic, elapsed, ni / elapsed) @@ -366,9 +367,7 @@ class TestThreadPool(TestBase): # in the end, we expect all tasks to be gone, automatically # order of deletion doesnt matter - print "del ts" del(ts) - print "del rcs" del(rcs) assert pool.num_tasks() == null_tasks @@ -376,7 +375,7 @@ class TestThreadPool(TestBase): # for some reason, sometimes it has multiple workerthreads already when he # enters the method ... dunno yet, pools should clean up themselvess - # @terminate_threads + #@terminate_threads def test_base(self): assert len(threading.enumerate()) == 1 @@ -457,3 +456,5 @@ class TestThreadPool(TestBase): self._assert_async_dependent_tasks(p) print >> sys.stderr, "Done with everything" + + # TODO: test multi-pool connections -- cgit v1.2.1 From 01eac1a959c1fa5894a86bf11e6b92f96762bdd8 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Thu, 10 Jun 2010 12:06:16 +0200 Subject: Added more dependency task tests, especially the single-reads are not yet fully deterministic as tasks still run into the problem that they try to write into a closed channel, it was closed by one of their task-mates who didn't know someone else was still computing --- test/git/async/test_pool.py | 129 +++++++++++++++++++++++++++++++++++++------- 1 file changed, 110 insertions(+), 19 deletions(-) (limited to 'test/git/async/test_pool.py') diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py index 3fb55e31..679bab31 100644 --- a/test/git/async/test_pool.py +++ b/test/git/async/test_pool.py @@ -9,6 +9,7 @@ import time import sys class _TestTaskBase(object): + """Note: causes great slowdown due to the required locking of task variables""" def __init__(self, *args, **kwargs): super(_TestTaskBase, self).__init__(*args, **kwargs) self.should_fail = False @@ -43,7 +44,8 @@ class _TestTaskBase(object): self.lock.release() return self - + + class TestThreadTaskNode(_TestTaskBase, InputIteratorThreadTask): pass @@ -56,18 +58,36 @@ class TestThreadFailureNode(TestThreadTaskNode): def do_fun(self, item): item = TestThreadTaskNode.do_fun(self, item) - if self.item_count > self.fail_after: - raise AssertionError("Simulated failure after processing %i items" % self.fail_after) + + self.lock.acquire() + try: + if self.item_count > self.fail_after: + raise AssertionError("Simulated failure after processing %i items" % self.fail_after) + finally: + self.lock.release() + # END handle fail after return item class TestThreadInputChannelTaskNode(_TestTaskBase, InputChannelTask): """Apply a transformation on items read from an input channel""" + def __init__(self, *args, **kwargs): + self.fail_after = kwargs.pop('fail_after', 0) + super(TestThreadInputChannelTaskNode, self).__init__(*args, **kwargs) def do_fun(self, item): """return tuple(i, i*2)""" item = super(TestThreadInputChannelTaskNode, self).do_fun(item) - #print "transformer.doit", self.id, item + + # fail after support + if self.fail_after: + self.lock.acquire() + try: + if self.item_count > self.fail_after: + raise AssertionError("Simulated failure after processing %i items" % self.fail_after) + finally: + self.lock.release() + # END handle fail-after if isinstance(item, tuple): i = item[0] @@ -86,14 +106,12 @@ class TestThreadInputChannelVerifyTaskNode(_TestTaskBase, InputChannelTask): """return tuple(i, i*2)""" item = super(TestThreadInputChannelVerifyTaskNode, self).do_fun(item) - # print "verifier.doit", self.id, item - # make sure the computation order matches assert isinstance(item, tuple), "input was no tuple: %s" % item base = item[0] for id, num in enumerate(item[1:]): - assert num == base * (id), "%i != %i, orig = %s" % (num, base * id+1, str(item)) + assert num == base * id, "%i != %i, orig = %s" % (num, base * id, str(item)) # END verify order return item @@ -104,9 +122,11 @@ class TestThreadPool(TestBase): max_threads = cpu_count() - def _add_task_chain(self, p, ni, count=1): + def _add_task_chain(self, p, ni, count=1, fail_setup=list()): """Create a task chain of feeder, count transformers and order verifcator to the pool p, like t1 -> t2 -> t3 + :param fail_setup: a list of pairs, task_id, fail_after, i.e. [(2, 20)] would + make the third transformer fail after 20 items :return: tuple(list(task1, taskN, ...), list(rc1, rcN, ...))""" nt = p.num_tasks() @@ -129,6 +149,11 @@ class TestThreadPool(TestBase): assert p.num_tasks() == nt + 2 + tc # END create count transformers + # setup failure + for id, fail_after in fail_setup: + tasks[1+id].fail_after = fail_after + # END setup failure + verifier = TestThreadInputChannelVerifyTaskNode(inrc, 'verifier', None) verifier.fun = verifier.do_fun vrc = p.add_task(verifier) @@ -149,7 +174,7 @@ class TestThreadPool(TestBase): def _assert_single_task(self, p, async=False): """Performs testing in a synchronized environment""" - return # DEBUG TODO: Fixme deactivated it + # return # DEBUG TODO: Fixme deactivated it print >> sys.stderr, "Threadpool: Starting single task (async = %i) with %i threads" % (async, p.size()) null_tasks = p.num_tasks() # in case we had some before @@ -316,8 +341,6 @@ class TestThreadPool(TestBase): print "read(0) with failure" assert len(rc.read()) == 0 # failure on first item - print >> sys.stderr, "done with everything" - assert isinstance(task.error(), AssertionError) assert task.is_done() # on error, its marked done as well del(rc) @@ -332,39 +355,107 @@ class TestThreadPool(TestBase): assert task.is_done() assert isinstance(task.error(), AssertionError) + print >> sys.stderr, "done with everything" + def _assert_async_dependent_tasks(self, pool): # includes failure in center task, 'recursive' orphan cleanup # This will also verify that the channel-close mechanism works # t1 -> t2 -> t3 - # t1 -> x -> t3 + print >> sys.stderr, "Threadpool: starting async dependency test in %i threads" % pool.size() null_tasks = pool.num_tasks() ni = 5000 count = 3 aic = count + 2 make_task = lambda *args, **kwargs: self._add_task_chain(pool, ni, count, *args, **kwargs) - ts, rcs = make_task() assert len(ts) == aic assert len(rcs) == aic assert pool.num_tasks() == null_tasks + len(ts) print pool._tasks.nodes - - # read all at once - print "read(0)" + # read(0) + ######### st = time.time() items = rcs[-1].read() - print "finished read(0)" elapsed = time.time() - st - print len(items), ni assert len(items) == ni + del(rcs) + assert pool.num_tasks() == 0 # tasks depleted, all done, no handles + print >> sys.stderr, "Dependent Tasks: evaluated %i items of %i dependent in %f s ( %i items / s )" % (ni, aic, elapsed, ni / elapsed) - print >> sys.stderr, "Dependent Tasks: evaluated %i items of %i dependent in %f s ( %i items / s )" % (ni, aic, elapsed, ni / elapsed) + + # read(1) + ######### + ts, rcs = make_task() + st = time.time() + for i in xrange(ni): + items = rcs[-1].read(1) + assert len(items) == 1 + # END for each item to pull + elapsed_single = time.time() - st + # another read yields nothing, its empty + assert len(rcs[-1].read()) == 0 + print >> sys.stderr, "Dependent Tasks: evaluated %i items with read(1) of %i dependent in %f s ( %i items / s )" % (ni, aic, elapsed_single, ni / elapsed_single) + + + # read with min-count size + ########################### + # must be faster, as it will read ni / 4 chunks + # Its enough to set one task, as it will force all others in the chain + # to min_size as well. + ts, rcs = make_task() + assert pool.num_tasks() == len(ts) + nri = ni / 4 + ts[-1].min_count = nri + st = time.time() + for i in xrange(ni): + items = rcs[-1].read(1) + assert len(items) == 1 + # END for each item to read + elapsed_minsize = time.time() - st + # its empty + assert len(rcs[-1].read()) == 0 + print >> sys.stderr, "Dependent Tasks: evaluated %i items with read(1), min_size=%i, of %i dependent in %f s ( %i items / s )" % (ni, nri, aic, elapsed_minsize, ni / elapsed_minsize) + + # it should have been a bit faster at least, and most of the time it is + # Sometimes, its not, mainly because: + # * The test tasks lock a lot, hence they slow down the system + # * Each read will still trigger the pool to evaluate, causing some overhead + # even though there are enough items on the queue in that case. Keeping + # track of the scheduled items helped there, but it caused further inacceptable + # slowdown + # assert elapsed_minsize < elapsed_single + + + # read with failure + ################### + # it should recover and give at least fail_after items + # t1 -> x -> t3 + fail_after = ni/2 + ts, rcs = make_task(fail_setup=[(0, fail_after)]) + items = rcs[-1].read() + assert len(items) == fail_after + # MULTI-POOL + # If two pools are connected, this shold work as well. + # The second one has just one more thread + if False: + p2 = ThreadPool(1) + assert p2.size() == 1 + p2ts, p2rcs = self._add_task_chain(p2, ni, count) + + ts, rcs = make_task() + + + del(p2ts) + del(p2rcs) + assert p2.num_tasks() == 0 + del(p2) + # in the end, we expect all tasks to be gone, automatically # order of deletion doesnt matter del(ts) -- cgit v1.2.1 From 55e757928e493ce93056822d510482e4ffcaac2d Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Thu, 10 Jun 2010 14:39:57 +0200 Subject: channel: Changed design to be more logical - a channel now has any amount of readers and writers, a ready is not connected to its writer anymore. This changes the refcounting of course, which is why the auto-cleanup for the pool is currently broken. The benefit of this are faster writes to the channel, reading didn't improve, refcounts should be clearer now --- test/git/async/test_pool.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) (limited to 'test/git/async/test_pool.py') diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py index 679bab31..d34f6773 100644 --- a/test/git/async/test_pool.py +++ b/test/git/async/test_pool.py @@ -191,8 +191,8 @@ class TestThreadPool(TestBase): assert p.num_tasks() == null_tasks rc = p.add_task(task) assert p.num_tasks() == 1 + null_tasks - assert isinstance(rc, RPoolChannel) - assert task._out_wc is not None + assert isinstance(rc, PoolReader) + assert task._out_writer is not None # pull the result completely - we should get one task, which calls its # function once. In sync mode, the order matches @@ -460,6 +460,7 @@ class TestThreadPool(TestBase): # order of deletion doesnt matter del(ts) del(rcs) + print pool.num_tasks() assert pool.num_tasks() == null_tasks -- cgit v1.2.1 From 7c36f3648e39ace752c67c71867693ce1eee52a3 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Thu, 10 Jun 2010 15:38:40 +0200 Subject: Now tracking the amount of concurrent writers to assure the channel is closed only when there is no one else writing to it. This assures that all tasks can continue working, and put their results accordingly. Shutdown is still not working correctly, but that should be solvable as well. Its still not perfect though ... --- test/git/async/test_pool.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'test/git/async/test_pool.py') diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py index d34f6773..7cb94a86 100644 --- a/test/git/async/test_pool.py +++ b/test/git/async/test_pool.py @@ -42,7 +42,7 @@ class _TestTaskBase(object): print self.item_count, fc assert self.item_count == fc self.lock.release() - + assert self._num_writers == 0 return self @@ -381,6 +381,7 @@ class TestThreadPool(TestBase): st = time.time() items = rcs[-1].read() elapsed = time.time() - st + print len(items), ni assert len(items) == ni del(rcs) assert pool.num_tasks() == 0 # tasks depleted, all done, no handles -- cgit v1.2.1 From fbe062bf6dacd3ad63dd827d898337fa542931ac Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Thu, 10 Jun 2010 23:55:50 +0200 Subject: Added dependency-task tests, and fixed plenty of ref-count related bugs, as well as concurrency issues. Now it works okay, but the thread-shutdown is still an issue, as it causes incorrect behaviour making the tests fail. Its good, as it hints at additional issues that need to be solved. There is just a little more left on the feature side, but its nearly there --- test/git/async/test_pool.py | 129 +++++++++++++++++++++++++++++++++----------- 1 file changed, 98 insertions(+), 31 deletions(-) (limited to 'test/git/async/test_pool.py') diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py index 7cb94a86..4851f61b 100644 --- a/test/git/async/test_pool.py +++ b/test/git/async/test_pool.py @@ -5,6 +5,7 @@ from git.async.task import * from git.async.thread import terminate_threads from git.async.util import cpu_count import threading +import weakref import time import sys @@ -42,7 +43,9 @@ class _TestTaskBase(object): print self.item_count, fc assert self.item_count == fc self.lock.release() + self._wlock.acquire() assert self._num_writers == 0 + self._wlock.release() return self @@ -122,31 +125,47 @@ class TestThreadPool(TestBase): max_threads = cpu_count() - def _add_task_chain(self, p, ni, count=1, fail_setup=list()): + + def _make_proxy_method(self, t): + """required to prevent binding self into the method we call""" + wt = weakref.proxy(t) + return lambda item: wt.do_fun(item) + + def _add_task_chain(self, p, ni, count=1, fail_setup=list(), feeder_channel=None, id_offset=0): """Create a task chain of feeder, count transformers and order verifcator to the pool p, like t1 -> t2 -> t3 :param fail_setup: a list of pairs, task_id, fail_after, i.e. [(2, 20)] would make the third transformer fail after 20 items + :param feeder_channel: if set to a channel, it will be used as input of the + first transformation task. The respective first task in the return value + will be None. + :param id_offset: defines the id of the first transformation task, all subsequent + ones will add one :return: tuple(list(task1, taskN, ...), list(rc1, rcN, ...))""" nt = p.num_tasks() - feeder = self._make_iterator_task(ni) - frc = p.add_task(feeder) - - assert p.num_tasks() == nt + 1 + feeder = None + frc = feeder_channel + if feeder_channel is None: + feeder = self._make_iterator_task(ni) + frc = p.add_task(feeder) + # END handle specific feeder rcs = [frc] tasks = [feeder] + make_proxy_method = self._make_proxy_method + inrc = frc for tc in xrange(count): - t = TestThreadInputChannelTaskNode(inrc, tc, None) - t.fun = t.do_fun + t = TestThreadInputChannelTaskNode(inrc, tc+id_offset, None) + + t.fun = make_proxy_method(t) + #t.fun = t.do_fun inrc = p.add_task(t) tasks.append(t) rcs.append(inrc) - assert p.num_tasks() == nt + 2 + tc # END create count transformers # setup failure @@ -155,10 +174,10 @@ class TestThreadPool(TestBase): # END setup failure verifier = TestThreadInputChannelVerifyTaskNode(inrc, 'verifier', None) - verifier.fun = verifier.do_fun + #verifier.fun = verifier.do_fun + verifier.fun = make_proxy_method(verifier) vrc = p.add_task(verifier) - assert p.num_tasks() == nt + tc + 3 tasks.append(verifier) rcs.append(vrc) @@ -169,7 +188,7 @@ class TestThreadPool(TestBase): :param taskcls: the actual iterator type to use :param **kwargs: additional kwargs to be passed to the task""" t = taskcls(iter(range(ni)), 'iterator', None, **kwargs) - t.fun = t.do_fun + t.fun = self._make_proxy_method(t) return t def _assert_single_task(self, p, async=False): @@ -385,6 +404,14 @@ class TestThreadPool(TestBase): assert len(items) == ni del(rcs) assert pool.num_tasks() == 0 # tasks depleted, all done, no handles + # wait a tiny moment - there could still be something unprocessed on the + # queue, increasing the refcount + time.sleep(0.15) + import gc + print gc.get_referrers(ts[-1]) + print len(pool._queue) + assert sys.getrefcount(ts[-1]) == 2 # ts + call + assert sys.getrefcount(ts[0]) == 2 # ts + call print >> sys.stderr, "Dependent Tasks: evaluated %i items of %i dependent in %f s ( %i items / s )" % (ni, aic, elapsed, ni / elapsed) @@ -444,25 +471,53 @@ class TestThreadPool(TestBase): # MULTI-POOL # If two pools are connected, this shold work as well. # The second one has just one more thread - if False: - p2 = ThreadPool(1) - assert p2.size() == 1 - p2ts, p2rcs = self._add_task_chain(p2, ni, count) - - ts, rcs = make_task() - - - del(p2ts) - del(p2rcs) - assert p2.num_tasks() == 0 - del(p2) - - # in the end, we expect all tasks to be gone, automatically - # order of deletion doesnt matter + ts, rcs = make_task() + + # connect verifier channel as feeder of the second pool + p2 = ThreadPool(1) + assert p2.size() == 1 + p2ts, p2rcs = self._add_task_chain(p2, ni, count, feeder_channel=rcs[-1], id_offset=count) + assert p2ts[0] is None # we have no feeder task + assert rcs[-1].pool_ref()() is pool # it didnt change the pool + assert rcs[-1] is p2ts[1].reader() + assert p2.num_tasks() == len(p2ts)-1 # first is None + + # reading from the last one will evaluate all pools correctly + print "read(0) multi-pool" + items = p2rcs[-1].read() + assert len(items) == ni + + # now that both are connected, I can drop my handle to the reader + # without affecting the task-count, but whats more important: + # They remove their tasks correctly once we drop our references in the + # right order + del(p2ts) + assert p2rcs[0] is rcs[-1] + del(p2rcs) + assert p2.num_tasks() == 0 + del(p2) + + assert pool.num_tasks() == null_tasks + len(ts) + + del(ts) + print "del rcs" + print rcs[-1] + print sys.getrefcount(rcs[-1]) del(rcs) + # TODO: make this work - something with the refcount goes wrong, + # they never get cleaned up properly + ts = pool._tasks.nodes print pool.num_tasks() - assert pool.num_tasks() == null_tasks + assert pool.num_tasks() == null_tasks + + + # TODO: Test multi-read(1) + + # in the end, we expect all tasks to be gone, automatically + # order of deletion doesnt matter + + @@ -496,17 +551,28 @@ class TestThreadPool(TestBase): # SINGLE TASK SERIAL SYNC MODE ############################## - # put a few unrelated tasks that we forget about - urc1 = p.add_task(TestThreadTaskNode(iter(list()), "nothing", None)) - urc2 = p.add_task(TestThreadTaskNode(iter(list()), "nothing", None)) + # put a few unrelated tasks that we forget about - check ref counts and cleanup + t1, t2 = TestThreadTaskNode(iter(list()), "nothing1", None), TestThreadTaskNode(iter(list()), "nothing2", None) + urc1 = p.add_task(t1) + urc2 = p.add_task(t2) assert p.num_tasks() == 2 ## SINGLE TASK ################# self._assert_single_task(p, False) assert p.num_tasks() == 2 del(urc1) - del(urc2) + assert p.num_tasks() == 1 + + p.remove_task(t2) + assert p.num_tasks() == 0 + assert sys.getrefcount(t2) == 2 + + t3 = TestThreadInputChannelTaskNode(urc2, "channel", None) + urc3 = p.add_task(t3) + assert p.num_tasks() == 1 + del(urc3) assert p.num_tasks() == 0 + assert sys.getrefcount(t3) == 2 # DEPENDENT TASKS SYNC MODE @@ -519,6 +585,7 @@ class TestThreadPool(TestBase): # step one gear up - just one thread for now. p.set_size(1) assert p.size() == 1 + print len(threading.enumerate()) assert len(threading.enumerate()) == num_threads + 1 # deleting the pool stops its threads - just to be sure ;) # Its not synchronized, hence we wait a moment -- cgit v1.2.1 From 6d1212e8c412b0b4802bc1080d38d54907db879d Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Fri, 11 Jun 2010 11:52:01 +0200 Subject: IMPORTANT: sometimes, when notifying waiters by releasing their lock, the lock is not actually released or they are not actually notifyied, staying in a beautysleep. This glitch is probably caused by some detail not treated correctly in the thread python module, which is something we cannot fix. It works most of the time as expected though - maybe some cleanup is not done correctly which causes this --- test/git/async/test_pool.py | 26 ++++++++------------------ 1 file changed, 8 insertions(+), 18 deletions(-) (limited to 'test/git/async/test_pool.py') diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py index 4851f61b..5bb48cc2 100644 --- a/test/git/async/test_pool.py +++ b/test/git/async/test_pool.py @@ -199,7 +199,7 @@ class TestThreadPool(TestBase): # add a simple task # it iterates n items - ni = 5000 + ni = 1000 assert ni % 2 == 0, "ni needs to be dividable by 2" assert ni % 4 == 0, "ni needs to be dividable by 4" @@ -382,18 +382,18 @@ class TestThreadPool(TestBase): # includes failure in center task, 'recursive' orphan cleanup # This will also verify that the channel-close mechanism works # t1 -> t2 -> t3 - + print >> sys.stderr, "Threadpool: starting async dependency test in %i threads" % pool.size() null_tasks = pool.num_tasks() - ni = 5000 + ni = 1000 count = 3 aic = count + 2 make_task = lambda *args, **kwargs: self._add_task_chain(pool, ni, count, *args, **kwargs) + ts, rcs = make_task() assert len(ts) == aic assert len(rcs) == aic assert pool.num_tasks() == null_tasks + len(ts) - print pool._tasks.nodes # read(0) ######### @@ -407,9 +407,6 @@ class TestThreadPool(TestBase): # wait a tiny moment - there could still be something unprocessed on the # queue, increasing the refcount time.sleep(0.15) - import gc - print gc.get_referrers(ts[-1]) - print len(pool._queue) assert sys.getrefcount(ts[-1]) == 2 # ts + call assert sys.getrefcount(ts[0]) == 2 # ts + call print >> sys.stderr, "Dependent Tasks: evaluated %i items of %i dependent in %f s ( %i items / s )" % (ni, aic, elapsed, ni / elapsed) @@ -467,15 +464,15 @@ class TestThreadPool(TestBase): items = rcs[-1].read() assert len(items) == fail_after - + # MULTI-POOL # If two pools are connected, this shold work as well. # The second one has just one more thread ts, rcs = make_task() # connect verifier channel as feeder of the second pool - p2 = ThreadPool(1) - assert p2.size() == 1 + p2 = ThreadPool(0) # don't spawn new threads, they have the tendency not to wake up on mutexes + assert p2.size() == 0 p2ts, p2rcs = self._add_task_chain(p2, ni, count, feeder_channel=rcs[-1], id_offset=count) assert p2ts[0] is None # we have no feeder task assert rcs[-1].pool_ref()() is pool # it didnt change the pool @@ -501,14 +498,8 @@ class TestThreadPool(TestBase): del(ts) - print "del rcs" - print rcs[-1] - print sys.getrefcount(rcs[-1]) del(rcs) - # TODO: make this work - something with the refcount goes wrong, - # they never get cleaned up properly - ts = pool._tasks.nodes - print pool.num_tasks() + assert pool.num_tasks() == null_tasks @@ -585,7 +576,6 @@ class TestThreadPool(TestBase): # step one gear up - just one thread for now. p.set_size(1) assert p.size() == 1 - print len(threading.enumerate()) assert len(threading.enumerate()) == num_threads + 1 # deleting the pool stops its threads - just to be sure ;) # Its not synchronized, hence we wait a moment -- cgit v1.2.1 From 5ff864138cd1e680a78522c26b583639f8f5e313 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Fri, 11 Jun 2010 14:37:51 +0200 Subject: test.async: split test_pool up into task implenetations and related utilities, as well as the tests themselves. File became too large --- test/git/async/test_pool.py | 186 ++------------------------------------------ 1 file changed, 6 insertions(+), 180 deletions(-) (limited to 'test/git/async/test_pool.py') diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py index 5bb48cc2..0fa34f6a 100644 --- a/test/git/async/test_pool.py +++ b/test/git/async/test_pool.py @@ -1,196 +1,22 @@ """Channel testing""" from test.testlib import * +from task import * + from git.async.pool import * -from git.async.task import * from git.async.thread import terminate_threads from git.async.util import cpu_count + import threading import weakref import time import sys -class _TestTaskBase(object): - """Note: causes great slowdown due to the required locking of task variables""" - def __init__(self, *args, **kwargs): - super(_TestTaskBase, self).__init__(*args, **kwargs) - self.should_fail = False - self.lock = threading.Lock() # yes, can't safely do x = x + 1 :) - self.plock = threading.Lock() - self.item_count = 0 - self.process_count = 0 - - def do_fun(self, item): - self.lock.acquire() - self.item_count += 1 - self.lock.release() - if self.should_fail: - raise AssertionError("I am failing just for the fun of it") - return item - - def process(self, count=1): - # must do it first, otherwise we might read and check results before - # the thread gets here :). Its a lesson ! - self.plock.acquire() - self.process_count += 1 - self.plock.release() - super(_TestTaskBase, self).process(count) - - def _assert(self, pc, fc, check_scheduled=False): - """Assert for num process counts (pc) and num function counts (fc) - :return: self""" - self.lock.acquire() - if self.item_count != fc: - print self.item_count, fc - assert self.item_count == fc - self.lock.release() - self._wlock.acquire() - assert self._num_writers == 0 - self._wlock.release() - return self - - -class TestThreadTaskNode(_TestTaskBase, InputIteratorThreadTask): - pass - - -class TestThreadFailureNode(TestThreadTaskNode): - """Fails after X items""" - def __init__(self, *args, **kwargs): - self.fail_after = kwargs.pop('fail_after') - super(TestThreadFailureNode, self).__init__(*args, **kwargs) - - def do_fun(self, item): - item = TestThreadTaskNode.do_fun(self, item) - - self.lock.acquire() - try: - if self.item_count > self.fail_after: - raise AssertionError("Simulated failure after processing %i items" % self.fail_after) - finally: - self.lock.release() - # END handle fail after - return item - - -class TestThreadInputChannelTaskNode(_TestTaskBase, InputChannelTask): - """Apply a transformation on items read from an input channel""" - def __init__(self, *args, **kwargs): - self.fail_after = kwargs.pop('fail_after', 0) - super(TestThreadInputChannelTaskNode, self).__init__(*args, **kwargs) - - def do_fun(self, item): - """return tuple(i, i*2)""" - item = super(TestThreadInputChannelTaskNode, self).do_fun(item) - - # fail after support - if self.fail_after: - self.lock.acquire() - try: - if self.item_count > self.fail_after: - raise AssertionError("Simulated failure after processing %i items" % self.fail_after) - finally: - self.lock.release() - # END handle fail-after - - if isinstance(item, tuple): - i = item[0] - return item + (i * self.id, ) - else: - return (item, item * self.id) - # END handle tuple -class TestThreadInputChannelVerifyTaskNode(_TestTaskBase, InputChannelTask): - """An input channel task, which verifies the result of its input channels, - should be last in the chain. - Id must be int""" - - def do_fun(self, item): - """return tuple(i, i*2)""" - item = super(TestThreadInputChannelVerifyTaskNode, self).do_fun(item) - - # make sure the computation order matches - assert isinstance(item, tuple), "input was no tuple: %s" % item - - base = item[0] - for id, num in enumerate(item[1:]): - assert num == base * id, "%i != %i, orig = %s" % (num, base * id, str(item)) - # END verify order - - return item - - - class TestThreadPool(TestBase): max_threads = cpu_count() - - def _make_proxy_method(self, t): - """required to prevent binding self into the method we call""" - wt = weakref.proxy(t) - return lambda item: wt.do_fun(item) - - def _add_task_chain(self, p, ni, count=1, fail_setup=list(), feeder_channel=None, id_offset=0): - """Create a task chain of feeder, count transformers and order verifcator - to the pool p, like t1 -> t2 -> t3 - :param fail_setup: a list of pairs, task_id, fail_after, i.e. [(2, 20)] would - make the third transformer fail after 20 items - :param feeder_channel: if set to a channel, it will be used as input of the - first transformation task. The respective first task in the return value - will be None. - :param id_offset: defines the id of the first transformation task, all subsequent - ones will add one - :return: tuple(list(task1, taskN, ...), list(rc1, rcN, ...))""" - nt = p.num_tasks() - - feeder = None - frc = feeder_channel - if feeder_channel is None: - feeder = self._make_iterator_task(ni) - frc = p.add_task(feeder) - # END handle specific feeder - - rcs = [frc] - tasks = [feeder] - - make_proxy_method = self._make_proxy_method - - inrc = frc - for tc in xrange(count): - t = TestThreadInputChannelTaskNode(inrc, tc+id_offset, None) - - t.fun = make_proxy_method(t) - #t.fun = t.do_fun - inrc = p.add_task(t) - - tasks.append(t) - rcs.append(inrc) - # END create count transformers - - # setup failure - for id, fail_after in fail_setup: - tasks[1+id].fail_after = fail_after - # END setup failure - - verifier = TestThreadInputChannelVerifyTaskNode(inrc, 'verifier', None) - #verifier.fun = verifier.do_fun - verifier.fun = make_proxy_method(verifier) - vrc = p.add_task(verifier) - - - tasks.append(verifier) - rcs.append(vrc) - return tasks, rcs - - def _make_iterator_task(self, ni, taskcls=TestThreadTaskNode, **kwargs): - """:return: task which yields ni items - :param taskcls: the actual iterator type to use - :param **kwargs: additional kwargs to be passed to the task""" - t = taskcls(iter(range(ni)), 'iterator', None, **kwargs) - t.fun = self._make_proxy_method(t) - return t - def _assert_single_task(self, p, async=False): """Performs testing in a synchronized environment""" # return # DEBUG TODO: Fixme deactivated it @@ -203,7 +29,7 @@ class TestThreadPool(TestBase): assert ni % 2 == 0, "ni needs to be dividable by 2" assert ni % 4 == 0, "ni needs to be dividable by 4" - make_task = lambda *args, **kwargs: self._make_iterator_task(ni, *args, **kwargs) + make_task = lambda *args, **kwargs: make_iterator_task(ni, *args, **kwargs) task = make_task() @@ -388,7 +214,7 @@ class TestThreadPool(TestBase): ni = 1000 count = 3 aic = count + 2 - make_task = lambda *args, **kwargs: self._add_task_chain(pool, ni, count, *args, **kwargs) + make_task = lambda *args, **kwargs: add_task_chain(pool, ni, count, *args, **kwargs) ts, rcs = make_task() assert len(ts) == aic @@ -473,7 +299,7 @@ class TestThreadPool(TestBase): # connect verifier channel as feeder of the second pool p2 = ThreadPool(0) # don't spawn new threads, they have the tendency not to wake up on mutexes assert p2.size() == 0 - p2ts, p2rcs = self._add_task_chain(p2, ni, count, feeder_channel=rcs[-1], id_offset=count) + p2ts, p2rcs = add_task_chain(p2, ni, count, feeder_channel=rcs[-1], id_offset=count) assert p2ts[0] is None # we have no feeder task assert rcs[-1].pool_ref()() is pool # it didnt change the pool assert rcs[-1] is p2ts[1].reader() -- cgit v1.2.1 From 18e3252a1f655f09093a4cffd5125342a8f94f3b Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Fri, 11 Jun 2010 14:58:51 +0200 Subject: Finished dependent task testing according to the features we would currently like to see --- test/git/async/test_pool.py | 48 +++++++++++++++++++++++++++++++++++++++------ 1 file changed, 42 insertions(+), 6 deletions(-) (limited to 'test/git/async/test_pool.py') diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py index 0fa34f6a..40c6d66e 100644 --- a/test/git/async/test_pool.py +++ b/test/git/async/test_pool.py @@ -132,8 +132,13 @@ class TestThreadPool(TestBase): assert len(items) == nri task._assert( 5, ni) - assert task.is_done() + + # delete the handle first, causing the task to be removed and to be set + # done. We check for the set-done state later. Depending on the timing, + # The task is not yet set done when we are checking it because we were + # scheduled in before the flag could be set. del(rc) + assert task.is_done() assert p.num_tasks() == null_tasks # depleted # but this only hits if we want too many items, if we want less, it could @@ -307,9 +312,41 @@ class TestThreadPool(TestBase): # reading from the last one will evaluate all pools correctly print "read(0) multi-pool" + st = time.time() items = p2rcs[-1].read() + elapsed = time.time() - st assert len(items) == ni + print >> sys.stderr, "Dependent Tasks: evaluated 2 connected pools and %i items with read(0), of %i dependent tasks in %f s ( %i items / s )" % (ni, aic + aic-1, elapsed, ni / elapsed) + + + # loose the handles of the second pool to allow others to go as well + del(p2rcs); del(p2ts) + assert p2.num_tasks() == 0 + + # now we lost our old handles as well, and the tasks go away + ts, rcs = make_task() + assert pool.num_tasks() == len(ts) + + p2ts, p2rcs = add_task_chain(p2, ni, count, feeder_channel=rcs[-1], id_offset=count) + assert p2.num_tasks() == len(p2ts) - 1 + + # Test multi-read(1) + print "read(1) * %i" % ni + reader = rcs[-1] + st = time.time() + for i in xrange(ni): + items = reader.read(1) + assert len(items) == 1 + # END for each item to get + elapsed = time.time() - st + del(reader) # decrement refcount + + print >> sys.stderr, "Dependent Tasks: evaluated 2 connected pools and %i items with read(1), of %i dependent tasks in %f s ( %i items / s )" % (ni, aic + aic-1, elapsed, ni / elapsed) + + # another read is empty + assert len(rcs[-1].read()) == 0 + # now that both are connected, I can drop my handle to the reader # without affecting the task-count, but whats more important: # They remove their tasks correctly once we drop our references in the @@ -329,11 +366,10 @@ class TestThreadPool(TestBase): assert pool.num_tasks() == null_tasks - # TODO: Test multi-read(1) - - # in the end, we expect all tasks to be gone, automatically - # order of deletion doesnt matter - + # ASSERTION: We already tested that one pool behaves correctly when an error + # occours - if two pools handle their ref-counts correctly, which they + # do if we are here, then they should handle errors happening during + # the task processing as expected as well. Hence we can safe this here -- cgit v1.2.1 From 1873db442dc7511fc2c92fbaeb8d998d3e62723d Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Fri, 11 Jun 2010 16:25:27 +0200 Subject: Improved shutdown handling - although its impossible to prevent some stderr printing thanks to the underlying threading implementation, we can at least make sure that the interpreter doesn't block during shutdown. Now it appears to be running smoothly --- test/git/async/test_pool.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) (limited to 'test/git/async/test_pool.py') diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py index 40c6d66e..c786770a 100644 --- a/test/git/async/test_pool.py +++ b/test/git/async/test_pool.py @@ -373,10 +373,7 @@ class TestThreadPool(TestBase): - - # for some reason, sometimes it has multiple workerthreads already when he - # enters the method ... dunno yet, pools should clean up themselvess - #@terminate_threads + @terminate_threads def test_base(self): assert len(threading.enumerate()) == 1 @@ -463,10 +460,11 @@ class TestThreadPool(TestBase): # threads per core p.set_size(4) self._assert_single_task(p, True) + + # DEPENDENT TASK ASYNC MODE ########################### self._assert_async_dependent_tasks(p) print >> sys.stderr, "Done with everything" - # TODO: test multi-pool connections -- cgit v1.2.1 From e14e3f143e7260de9581aee27e5a9b2645db72de Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Fri, 11 Jun 2010 16:42:09 +0200 Subject: Removed commented-out debug code and additional debug printings. Verified it works on py2.4, 2.5 and 2.6 --- test/git/async/test_pool.py | 1 - 1 file changed, 1 deletion(-) (limited to 'test/git/async/test_pool.py') diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py index c786770a..0042c4a8 100644 --- a/test/git/async/test_pool.py +++ b/test/git/async/test_pool.py @@ -19,7 +19,6 @@ class TestThreadPool(TestBase): def _assert_single_task(self, p, async=False): """Performs testing in a synchronized environment""" - # return # DEBUG TODO: Fixme deactivated it print >> sys.stderr, "Threadpool: Starting single task (async = %i) with %i threads" % (async, p.size()) null_tasks = p.num_tasks() # in case we had some before -- cgit v1.2.1 From 7a0b79ee574999ecbc76696506352e4a5a0d7159 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Sat, 12 Jun 2010 12:38:02 +0200 Subject: task: improved naming of task types, improved pool test to be less dependent on starting with just the main thread --- test/git/async/test_pool.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) (limited to 'test/git/async/test_pool.py') diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py index 0042c4a8..aab618aa 100644 --- a/test/git/async/test_pool.py +++ b/test/git/async/test_pool.py @@ -198,7 +198,7 @@ class TestThreadPool(TestBase): # test failure after ni / 2 items # This makes sure it correctly closes the channel on failure to prevent blocking nri = ni/2 - task = make_task(TestThreadFailureNode, fail_after=ni/2) + task = make_task(TestFailureThreadTask, fail_after=ni/2) rc = p.add_task(task) assert len(rc.read()) == nri assert task.is_done() @@ -374,7 +374,14 @@ class TestThreadPool(TestBase): @terminate_threads def test_base(self): - assert len(threading.enumerate()) == 1 + max_wait_attempts = 3 + sleep_time = 0.1 + for mc in range(max_wait_attempts): + # wait for threads to die + if len(threading.enumerate()) != 1: + time.sleep(sleep_time) + # END for each attempt + assert len(threading.enumerate()) == 1, "Waited %f s for threads to die, its still alive" % (max_wait_attempts, sleep_time) p = ThreadPool() @@ -401,7 +408,7 @@ class TestThreadPool(TestBase): # SINGLE TASK SERIAL SYNC MODE ############################## # put a few unrelated tasks that we forget about - check ref counts and cleanup - t1, t2 = TestThreadTaskNode(iter(list()), "nothing1", None), TestThreadTaskNode(iter(list()), "nothing2", None) + t1, t2 = TestThreadTask(iter(list()), "nothing1", None), TestThreadTask(iter(list()), "nothing2", None) urc1 = p.add_task(t1) urc2 = p.add_task(t2) assert p.num_tasks() == 2 @@ -416,7 +423,7 @@ class TestThreadPool(TestBase): assert p.num_tasks() == 0 assert sys.getrefcount(t2) == 2 - t3 = TestThreadInputChannelTaskNode(urc2, "channel", None) + t3 = TestChannelThreadTask(urc2, "channel", None) urc3 = p.add_task(t3) assert p.num_tasks() == 1 del(urc3) -- cgit v1.2.1