diff options
Diffstat (limited to 'tests/http/testenv/curl.py')
-rw-r--r-- | tests/http/testenv/curl.py | 157 |
1 files changed, 109 insertions, 48 deletions
diff --git a/tests/http/testenv/curl.py b/tests/http/testenv/curl.py index 98c1bd4ab..a272dbf27 100644 --- a/tests/http/testenv/curl.py +++ b/tests/http/testenv/curl.py @@ -24,6 +24,7 @@ # ########################################################################### # +import pytest import json import logging import os @@ -31,7 +32,7 @@ import re import shutil import subprocess from datetime import timedelta, datetime -from typing import List, Optional, Dict +from typing import List, Optional, Dict, Union from urllib.parse import urlparse from .env import Env @@ -111,6 +112,10 @@ class ExecResult: return ''.join(self._stderr) @property + def trace_lines(self) -> List[str]: + return self._trace if self._trace else self._stderr + + @property def duration(self) -> timedelta: return self._duration @@ -159,53 +164,97 @@ class ExecResult: def add_assets(self, assets: List): self._assets.extend(assets) - def check_exit_code(self, code: int): - assert self.exit_code == code, \ - f'expected exit code {code}, '\ - f'got {self.exit_code}\n{self._dump_logs()}' - - def check_exit_code_not(self, code: int): - assert self.exit_code != code, \ - f'expected exit code other than {code}\n{self._dump_logs()}' - - def check_responses(self, count: int, exp_status: Optional[int] = None, - exp_exitcode: Optional[int] = None): - assert len(self.responses) == count, \ - f'response count: expected {count}, ' \ - f'got {len(self.responses)}\n{self._dump_logs()}' - if exp_status is not None: - for idx, x in enumerate(self.responses): - assert x['status'] == exp_status, \ - f'response #{idx} status: expected {exp_status},'\ - f'got {x["status"]}\n{self._dump_logs()}' - if exp_exitcode is not None: - for idx, x in enumerate(self.responses): - if 'exitcode' in x: - assert x['exitcode'] == 0, \ - f'response #{idx} exitcode: expected {exp_exitcode}, '\ - f'got {x["exitcode"]}\n{self._dump_logs()}' - if self.with_stats: - self.check_stats(count) + def check_exit_code(self, code: Union[int, bool]): + if code is True: + assert self.exit_code == 0, f'expected exit code {code}, '\ + f'got {self.exit_code}\n{self.dump_logs()}' + elif code is False: + assert self.exit_code != 0, f'expected exit code {code}, '\ + f'got {self.exit_code}\n{self.dump_logs()}' + else: + assert self.exit_code == code, f'expected exit code {code}, '\ + f'got {self.exit_code}\n{self.dump_logs()}' + + def check_response(self, http_status: Optional[int] = 200, + count: Optional[int] = 1, + protocol: Optional[str] = None, + exitcode: Optional[int] = 0, + connect_count: Optional[int] = None): + if exitcode: + self.check_exit_code(exitcode) + if self.with_stats and isinstance(exitcode, int): + for idx, x in enumerate(self.stats): + if 'exitcode' in x: + assert int(x['exitcode']) == exitcode, \ + f'response #{idx} exitcode: expected {exitcode}, '\ + f'got {x["exitcode"]}\n{self.dump_logs()}' - def check_stats(self, count: int, exp_status: Optional[int] = None, - exp_exitcode: Optional[int] = None): + if self.with_stats: + assert len(self.stats) == count, \ + f'response count: expected {count}, ' \ + f'got {len(self.stats)}\n{self.dump_logs()}' + else: + assert len(self.responses) == count, \ + f'response count: expected {count}, ' \ + f'got {len(self.responses)}\n{self.dump_logs()}' + if http_status is not None: + if self.with_stats: + for idx, x in enumerate(self.stats): + assert 'http_code' in x, \ + f'response #{idx} reports no http_code\n{self.dump_logs()}' + assert x['http_code'] == http_status, \ + f'response #{idx} http_code: expected {http_status}, '\ + f'got {x["http_code"]}\n{self.dump_logs()}' + else: + for idx, x in enumerate(self.responses): + assert x['status'] == http_status, \ + f'response #{idx} status: expected {http_status},'\ + f'got {x["status"]}\n{self.dump_logs()}' + if protocol is not None: + if self.with_stats: + http_version = None + if protocol == 'HTTP/1.1': + http_version = '1.1' + elif protocol == 'HTTP/2': + http_version = '2' + elif protocol == 'HTTP/3': + http_version = '3' + if http_version is not None: + for idx, x in enumerate(self.stats): + assert x['http_version'] == http_version, \ + f'response #{idx} protocol: expected http/{http_version},' \ + f'got version {x["http_version"]}\n{self.dump_logs()}' + else: + for idx, x in enumerate(self.responses): + assert x['protocol'] == protocol, \ + f'response #{idx} protocol: expected {protocol},'\ + f'got {x["protocol"]}\n{self.dump_logs()}' + if connect_count is not None: + assert self.total_connects == connect_count, \ + f'expected {connect_count}, but {self.total_connects} '\ + f'were made\n{self.dump_logs()}' + + def check_stats(self, count: int, http_status: Optional[int] = None, + exitcode: Optional[int] = None): + if exitcode is None: + self.check_exit_code(0) assert len(self.stats) == count, \ - f'stats count: expected {count}, got {len(self.stats)}\n{self._dump_logs()}' - if exp_status is not None: + f'stats count: expected {count}, got {len(self.stats)}\n{self.dump_logs()}' + if http_status is not None: for idx, x in enumerate(self.stats): assert 'http_code' in x, \ - f'status #{idx} reports no http_code\n{self._dump_logs()}' - assert x['http_code'] == exp_status, \ - f'status #{idx} http_code: expected {exp_status}, '\ - f'got {x["http_code"]}\n{self._dump_logs()}' - if exp_exitcode is not None: + f'status #{idx} reports no http_code\n{self.dump_logs()}' + assert x['http_code'] == http_status, \ + f'status #{idx} http_code: expected {http_status}, '\ + f'got {x["http_code"]}\n{self.dump_logs()}' + if exitcode is not None: for idx, x in enumerate(self.stats): if 'exitcode' in x: assert x['exitcode'] == 0, \ - f'status #{idx} exitcode: expected {exp_exitcode}, '\ - f'got {x["exitcode"]}\n{self._dump_logs()}' + f'status #{idx} exitcode: expected {exitcode}, '\ + f'got {x["exitcode"]}\n{self.dump_logs()}' - def _dump_logs(self): + def dump_logs(self): lines = [] lines.append('>>--stdout ----------------------------------------------\n') lines.extend(self._stdout) @@ -252,6 +301,10 @@ class CurlClient: def download_file(self, i: int) -> str: return os.path.join(self.run_dir, f'download_{i}.data') + @property + def trace_file(self) -> str: + return self._tracefile + def _rmf(self, path): if os.path.exists(path): return os.remove(path) @@ -272,6 +325,7 @@ class CurlClient: with_stats: bool = True, with_headers: bool = False, no_save: bool = False, + with_trace: bool = False, extra_args: List[str] = None): if extra_args is None: extra_args = [] @@ -292,12 +346,14 @@ class CurlClient: ]) return self._raw(urls, alpn_proto=alpn_proto, options=extra_args, with_stats=with_stats, - with_headers=with_headers) + with_headers=with_headers, + with_trace=with_trace) def http_upload(self, urls: List[str], data: str, alpn_proto: Optional[str] = None, with_stats: bool = True, with_headers: bool = False, + with_trace: bool = False, extra_args: Optional[List[str]] = None): if extra_args is None: extra_args = [] @@ -310,12 +366,14 @@ class CurlClient: ]) return self._raw(urls, alpn_proto=alpn_proto, options=extra_args, with_stats=with_stats, - with_headers=with_headers) + with_headers=with_headers, + with_trace=with_trace) def http_put(self, urls: List[str], data=None, fdata=None, alpn_proto: Optional[str] = None, with_stats: bool = True, with_headers: bool = False, + with_trace: bool = False, extra_args: Optional[List[str]] = None): if extra_args is None: extra_args = [] @@ -333,7 +391,8 @@ class CurlClient: return self._raw(urls, intext=data, alpn_proto=alpn_proto, options=extra_args, with_stats=with_stats, - with_headers=with_headers) + with_headers=with_headers, + with_trace=with_trace) def response_file(self, idx: int): return os.path.join(self._run_dir, f'download_{idx}.data') @@ -379,15 +438,16 @@ class CurlClient: duration=datetime.now() - start, with_stats=with_stats) - def _raw(self, urls, intext='', timeout=10, options=None, insecure=False, + def _raw(self, urls, intext='', timeout=None, options=None, insecure=False, alpn_proto: Optional[str] = None, force_resolve=True, with_stats=False, - with_headers=True): + with_headers=True, + with_trace=False): args = self._complete_args( urls=urls, timeout=timeout, options=options, insecure=insecure, alpn_proto=alpn_proto, force_resolve=force_resolve, - with_headers=with_headers) + with_headers=with_headers, with_trace=with_trace) r = self._run(args, intext=intext, with_stats=with_stats) if r.exit_code == 0 and with_headers: self._parse_headerfile(self._headerfile, r=r) @@ -398,14 +458,15 @@ class CurlClient: def _complete_args(self, urls, timeout=None, options=None, insecure=False, force_resolve=True, alpn_proto: Optional[str] = None, - with_headers: bool = True): + with_headers: bool = True, + with_trace: bool = False): if not isinstance(urls, list): urls = [urls] args = [self._curl, "-s", "--path-as-is"] if with_headers: args.extend(["-D", self._headerfile]) - if self.env.verbose > 2: + if with_trace or self.env.verbose > 2: args.extend(['--trace', self._tracefile, '--trace-time']) elif self.env.verbose > 1: args.extend(['--trace', self._tracefile]) |