author     Thomas Haller <thaller@redhat.com>   2019-10-12 11:02:21 +0200
committer  Thomas Haller <thaller@redhat.com>   2019-10-12 13:16:53 +0200
commit     bb4b749595bc83cbe0567b716394c2da751787cf (patch)
tree       900110274004ffd839a425cfc9b39c7f926d9aa7 /clients/tests
parent     facfc94744e2f952f5058fb0693915bc674ff8c2 (diff)
clients/tests: don't wait for first job before scheduling parallel jobs
Previously, the test would kick off up to 15 processes in parallel, but
then block on the first job in the queue before starting any more
processes.
That is, async_start() would only start 15 processes, and since none of
them were reaped before async_wait() was called, no more than 15 jobs
were running during the start phase. That is not a real issue, because
the start phase is non-blocking and queues all jobs quickly; it is not
expected that many processes complete during that short time.
Still, it was a bit ugly.
The bigger problem is that async_wait() would always block on the
first job in the queue before starting more processes. That means that
if the first job takes unusually long, it blocks other processes from
being reaped and new processes from being started.
Instead, don't block on only one job: poll each running job in turn for
a short amount of time. Whichever process exits first gets completed,
and more jobs get started.
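
A condensed sketch of the new loop (simplified from async_start() in
the diff below; AsyncProcess instances provide start(), poll() and
wait_and_complete()):

    MAX_JOBS = 15

    def async_start(self, wait_all = False):
        while True:
            # start up to MAX_JOBS jobs and reap those that already exited
            jobs_running = []
            for job in list(self._async_jobs[0:MAX_JOBS]):
                job.start()
                if job.poll() is not None:
                    self._async_jobs.remove(job)
                    job.wait_and_complete()
                else:
                    jobs_running.append(job)
            if not jobs_running or not wait_all:
                return
            # poll each running job for a short time; whichever exits
            # first is reaped, which frees a slot for the next job
            for job in Util.random_job(jobs_running):
                if job.poll(timeout = 0.03) is not None:
                    self._async_jobs.remove(job)
                    job.wait_and_complete()
                    break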
In fact, in the current setup it is hard to notice any difference,
because all nmcli invocations take about the same time and are
relatively fast. The better parallelization of this approach shows
when the runtime of the jobs varies more strongly (and some
invocations take notably longer). As we later want to run nmcli under
valgrind, this will probably make a difference.
An alternative would be to not poll()/wait() for the child processes,
but to get notified instead. For example, we could use a GMainContext
and watch the child processes. But that is probably more complicated
to do, so keep the naive polling approach.
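
A minimal sketch of that alternative, assuming PyGObject; the helper
name and callback are hypothetical, and GLib reaping the child itself
can conflict with subprocess's own wait(), which is part of why it is
more complicated:

    import subprocess
    from gi.repository import GLib

    def start_with_child_watch(argv, on_exit):
        # spawn the child; instead of polling, let the GMainContext
        # invoke a callback when the child exits
        p = subprocess.Popen(argv, stdout = subprocess.PIPE, stderr = subprocess.PIPE)

        def child_exited(pid, status):
            # note: GLib has already reaped the child at this point
            on_exit(p, status, p.stdout.read(), p.stderr.read())

        GLib.child_watch_add(GLib.PRIORITY_DEFAULT, p.pid, child_exited)
        return p

The callback only fires while a GLib.MainLoop (or manual iteration of
the GMainContext) is running.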
Diffstat (limited to 'clients/tests')
-rwxr-xr-x  clients/tests/test-client.py | 172
1 file changed, 124 insertions(+), 48 deletions(-)
diff --git a/clients/tests/test-client.py b/clients/tests/test-client.py
index bafbc17bd6..03e19ac5fa 100755
--- a/clients/tests/test-client.py
+++ b/clients/tests/test-client.py
@@ -93,6 +93,7 @@
 import shlex
 import re
 import dbus
 import time
+import random
 import dbus.service
 import dbus.mainloop.glib
@@ -171,21 +172,51 @@ class Util:
         return "'" + s.replace("'", "'\"'\"'") + "'"
 
     @staticmethod
-    def popen_wait(p, timeout = None):
-        # wait() has a timeout argument only since 3.3
+    def popen_wait(p, timeout = 0):
         if Util.python_has_version(3, 3):
-            return p.wait(timeout)
-        if timeout is None:
-            return p.wait()
+            if timeout == 0:
+                return p.poll()
+            try:
+                return p.wait(timeout)
+            except subprocess.TimeoutExpired:
+                return None
         start = NM.utils_get_timestamp_msec()
         while True:
             if p.poll() is not None:
                 return p.returncode
-            if start + (timeout * 1000) < NM.utils_get_timestamp_msec():
-                raise Exception("timeout expired")
+            if timeout == 0 or start + (timeout * 1000) < NM.utils_get_timestamp_msec():
+                return None
             time.sleep(0.05)
 
     @staticmethod
+    def random_job(jobs):
+        jobs = list(jobs)
+        l = len(jobs)
+        t = l * (l + 1) / 2
+        while True:
+            # We return a random job from the list, but the indexes at the front of
+            # the list are more likely. The idea is that those jobs were started first
+            # and are expected to complete first. As we poll, we want to check more
+            # frequently on the elements at the beginning of the list...
+            #
+            # Let's assign probabilities with an arithmetic series.
+            # That is, if there are 16 jobs, then the first gets weighted
+            # with 16, the second with 15, then 14, and so on, until the
+            # last has weight 1. That means, the first element is 16 times
+            # more probable than the last.
+            # The element at idx (starting with 0) is picked with probability
+            #   1 / (l*(l+1)/2) * (l - idx)
+            r = random.random() * t
+            idx = 0
+            rx = 0
+            while True:
+                rx += (l - idx)
+                if rx >= r or idx == l - 1:
+                    yield jobs[idx]
+                    break
+                idx += 1
+
+    @staticmethod
     def iter_single(itr, min_num = 1, max_num = 1):
         itr = list(itr)
         n = 0
@@ -336,7 +367,7 @@ class NMStubServer:
             if (NM.utils_get_timestamp_msec() - start) >= 4000:
                 p.stdin.close()
                 p.kill()
-                Util.popen_wait(p, 1000)
+                Util.popen_wait(p, 1)
                 raise Exception("after starting stub service the D-Bus name was not claimed in time")
 
         self._nmobj = nmobj
@@ -344,14 +375,17 @@
         self._p = p
 
     def shutdown(self):
+        conn = self._conn
+        p = self._p
         self._nmobj = None
         self._nmiface = None
         self._conn = None
-        self._p.stdin.close()
-        self._p.kill()
-        Util.popen_wait(self._p, 1000)
         self._p = None
-        if self._conn_get_main_object(self._conn) is not None:
+        p.stdin.close()
+        p.kill()
+        if Util.popen_wait(p, 1) is None:
+            raise Exception("Stub service did not exit in time")
+        if self._conn_get_main_object(conn) is not None:
             raise Exception("Stub service is still here although it should have shut down")
 
     class _MethodProxy:
@@ -409,51 +443,64 @@ class AsyncProcess():
     def __init__(self,
                  args,
                  env,
-                 complete_cb):
-        self._args = args
+                 complete_cb,
+                 max_waittime_msec = 2000):
+        self._args = list(args)
         self._env = env
         self._complete_cb = complete_cb
+        self._max_waittime_msec = max_waittime_msec
 
     def start(self):
         if not hasattr(self, '_p'):
+            self._p_start_timestamp = NM.utils_get_timestamp_msec()
             self._p = subprocess.Popen(self._args,
                                        stdout = subprocess.PIPE,
                                        stderr = subprocess.PIPE,
                                        env = self._env)
 
-    def wait(self):
+    def _timeout_remaining_time(self):
+        # Note that we call this during poll() and wait_and_complete().
+        # We don't know the exact time when the process terminated,
+        # so this is only approximately correct if we call poll/wait
+        # frequently.
+        # Worst case, we will think that the process did not time out,
+        # when in fact it was running longer than max-waittime.
+        return self._max_waittime_msec - (NM.utils_get_timestamp_msec() - self._p_start_timestamp)
+
+    def poll(self, timeout = 0):
         self.start()
 
-        error = False
-        try:
-            Util.popen_wait(self._p, 2000)
-        except Exception as e:
-            error = True
-            raise e
-        finally:
-            (returncode, stdout, stderr) = (self._p.returncode,
-                                            self._p.stdout.read(),
-                                            self._p.stderr.read())
+        return_code = Util.popen_wait(self._p, timeout)
+        if     return_code is None \
+           and self._timeout_remaining_time() <= 0:
+            raise Exception("process is still running after timeout: %s" % (' '.join(self._args)))
+        return return_code
 
-        self._p.stdout.close()
-        self._p.stderr.close()
-        self._p = None
+    def wait_and_complete(self):
+        self.start()
 
-        if error:
-            print(stdout)
-            print(stderr)
+        p = self._p
+        self._p = None
 
-        try:
-            self._complete_cb(self, returncode, stdout, stderr)
-        except Exception as e:
-            raise e
+        return_code = Util.popen_wait(p, max(0, self._timeout_remaining_time()) / 1000)
+        (stdout, stderr) = (p.stdout.read(), p.stderr.read())
+        p.stdout.close()
+        p.stderr.close()
+
+        if return_code is None:
+            print(stdout)
+            print(stderr)
+            raise Exception("process did not complete in time: %s" % (' '.join(self._args)))
+
+        self._complete_cb(self, return_code, stdout, stderr)
 
 ###############################################################################
 
 class NmTestBase(unittest.TestCase):
     pass
 
+MAX_JOBS = 15
+
 class TestNmcli(NmTestBase):
 
     @staticmethod
@@ -638,6 +685,9 @@
         if expected_stderr is _DEFAULT_ARG:
             expected_stderr = None
 
+        results_idx = len(self._results)
+        self._results.append(None)
+
        def complete_cb(async_job,
                        returncode,
                        stdout,
@@ -699,11 +749,11 @@
            content = ('size: %s\n' % (len(content))).encode('utf8') + \
                      content
 
-            self._results.append({
+            self._results[results_idx] = {
                'test_name' : test_name,
                'ignore_l10n_diff' : ignore_l10n_diff,
                'content' : content,
-            })
+            }
 
        async_job = AsyncProcess(args = args,
                                 env = env,
@@ -711,20 +761,46 @@
        self._async_jobs.append(async_job)
 
-        if sync_barrier:
-            self.async_wait()
-        else:
-            self.async_start()
+        self.async_start(wait_all = sync_barrier)
+
+    def async_start(self, wait_all = False):
 
-    def async_start(self):
-        # limit number parallel running jobs
-        for async_job in self._async_jobs[0:15]:
-            async_job.start()
+        while True:
+
+            while True:
+                for async_job in list(self._async_jobs[0:MAX_JOBS]):
+                    async_job.start()
+                # Start up to MAX_JOBS jobs, but poll() and complete those
+                # that already exited. Retry, until there are no more
+                # jobs to start, or until MAX_JOBS are running.
+                jobs_running = []
+                for async_job in list(self._async_jobs[0:MAX_JOBS]):
+                    if async_job.poll() is not None:
+                        self._async_jobs.remove(async_job)
+                        async_job.wait_and_complete()
+                        continue
+                    jobs_running.append(async_job)
+                if len(jobs_running) >= len(self._async_jobs):
+                    break
+                if len(jobs_running) >= MAX_JOBS:
+                    break
+
+            if not jobs_running:
+                return
+            if not wait_all:
+                return
+
+            # In a loop, indefinitely poll the running jobs until we find one that
+            # completes. Note that poll() itself will raise an exception if a
+            # job times out.
+            for async_job in Util.random_job(jobs_running):
+                if async_job.poll(timeout = 0.03) is not None:
+                    self._async_jobs.remove(async_job)
+                    async_job.wait_and_complete()
+                    break
 
     def async_wait(self):
-        while self._async_jobs:
-            self.async_start()
-            self._async_jobs.pop(0).wait()
+        return self.async_start(wait_all = True)
 
     def _nm_test_pre(self):
         self._calling_num = {}
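
The arithmetic-series weighting of Util.random_job() can be
sanity-checked in isolation. This hypothetical snippet replicates the
generator's logic and counts how often each index is drawn; with 4
jobs the expected shares are roughly 40%, 30%, 20% and 10%:

    import random
    from collections import Counter

    def random_job(jobs):
        # element at idx is picked with probability (l - idx) / (l*(l+1)/2)
        jobs = list(jobs)
        l = len(jobs)
        t = l * (l + 1) / 2
        while True:
            r = random.random() * t
            idx = 0
            rx = 0
            while True:
                rx += (l - idx)
                if rx >= r or idx == l - 1:
                    yield jobs[idx]
                    break
                idx += 1

    counts = Counter()
    gen = random_job(range(4))
    for _ in range(100000):
        counts[next(gen)] += 1
    print(sorted(counts.items()))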