summaryrefslogtreecommitdiff
path: root/tests/http/testenv/curl.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/http/testenv/curl.py')
-rw-r--r--tests/http/testenv/curl.py157
1 files changed, 109 insertions, 48 deletions
diff --git a/tests/http/testenv/curl.py b/tests/http/testenv/curl.py
index 98c1bd4ab..a272dbf27 100644
--- a/tests/http/testenv/curl.py
+++ b/tests/http/testenv/curl.py
@@ -24,6 +24,7 @@
#
###########################################################################
#
+import pytest
import json
import logging
import os
@@ -31,7 +32,7 @@ import re
import shutil
import subprocess
from datetime import timedelta, datetime
-from typing import List, Optional, Dict
+from typing import List, Optional, Dict, Union
from urllib.parse import urlparse
from .env import Env
@@ -111,6 +112,10 @@ class ExecResult:
return ''.join(self._stderr)
@property
+ def trace_lines(self) -> List[str]:
+ return self._trace if self._trace else self._stderr
+
+ @property
def duration(self) -> timedelta:
return self._duration
@@ -159,53 +164,97 @@ class ExecResult:
def add_assets(self, assets: List):
self._assets.extend(assets)
- def check_exit_code(self, code: int):
- assert self.exit_code == code, \
- f'expected exit code {code}, '\
- f'got {self.exit_code}\n{self._dump_logs()}'
-
- def check_exit_code_not(self, code: int):
- assert self.exit_code != code, \
- f'expected exit code other than {code}\n{self._dump_logs()}'
-
- def check_responses(self, count: int, exp_status: Optional[int] = None,
- exp_exitcode: Optional[int] = None):
- assert len(self.responses) == count, \
- f'response count: expected {count}, ' \
- f'got {len(self.responses)}\n{self._dump_logs()}'
- if exp_status is not None:
- for idx, x in enumerate(self.responses):
- assert x['status'] == exp_status, \
- f'response #{idx} status: expected {exp_status},'\
- f'got {x["status"]}\n{self._dump_logs()}'
- if exp_exitcode is not None:
- for idx, x in enumerate(self.responses):
- if 'exitcode' in x:
- assert x['exitcode'] == 0, \
- f'response #{idx} exitcode: expected {exp_exitcode}, '\
- f'got {x["exitcode"]}\n{self._dump_logs()}'
- if self.with_stats:
- self.check_stats(count)
+ def check_exit_code(self, code: Union[int, bool]):
+ if code is True:
+ assert self.exit_code == 0, f'expected exit code {code}, '\
+ f'got {self.exit_code}\n{self.dump_logs()}'
+ elif code is False:
+ assert self.exit_code != 0, f'expected exit code {code}, '\
+ f'got {self.exit_code}\n{self.dump_logs()}'
+ else:
+ assert self.exit_code == code, f'expected exit code {code}, '\
+ f'got {self.exit_code}\n{self.dump_logs()}'
+
+ def check_response(self, http_status: Optional[int] = 200,
+ count: Optional[int] = 1,
+ protocol: Optional[str] = None,
+ exitcode: Optional[int] = 0,
+ connect_count: Optional[int] = None):
+ if exitcode:
+ self.check_exit_code(exitcode)
+ if self.with_stats and isinstance(exitcode, int):
+ for idx, x in enumerate(self.stats):
+ if 'exitcode' in x:
+ assert int(x['exitcode']) == exitcode, \
+ f'response #{idx} exitcode: expected {exitcode}, '\
+ f'got {x["exitcode"]}\n{self.dump_logs()}'
- def check_stats(self, count: int, exp_status: Optional[int] = None,
- exp_exitcode: Optional[int] = None):
+ if self.with_stats:
+ assert len(self.stats) == count, \
+ f'response count: expected {count}, ' \
+ f'got {len(self.stats)}\n{self.dump_logs()}'
+ else:
+ assert len(self.responses) == count, \
+ f'response count: expected {count}, ' \
+ f'got {len(self.responses)}\n{self.dump_logs()}'
+ if http_status is not None:
+ if self.with_stats:
+ for idx, x in enumerate(self.stats):
+ assert 'http_code' in x, \
+ f'response #{idx} reports no http_code\n{self.dump_logs()}'
+ assert x['http_code'] == http_status, \
+ f'response #{idx} http_code: expected {http_status}, '\
+ f'got {x["http_code"]}\n{self.dump_logs()}'
+ else:
+ for idx, x in enumerate(self.responses):
+ assert x['status'] == http_status, \
+ f'response #{idx} status: expected {http_status},'\
+ f'got {x["status"]}\n{self.dump_logs()}'
+ if protocol is not None:
+ if self.with_stats:
+ http_version = None
+ if protocol == 'HTTP/1.1':
+ http_version = '1.1'
+ elif protocol == 'HTTP/2':
+ http_version = '2'
+ elif protocol == 'HTTP/3':
+ http_version = '3'
+ if http_version is not None:
+ for idx, x in enumerate(self.stats):
+ assert x['http_version'] == http_version, \
+ f'response #{idx} protocol: expected http/{http_version},' \
+ f'got version {x["http_version"]}\n{self.dump_logs()}'
+ else:
+ for idx, x in enumerate(self.responses):
+ assert x['protocol'] == protocol, \
+ f'response #{idx} protocol: expected {protocol},'\
+ f'got {x["protocol"]}\n{self.dump_logs()}'
+ if connect_count is not None:
+ assert self.total_connects == connect_count, \
+ f'expected {connect_count}, but {self.total_connects} '\
+ f'were made\n{self.dump_logs()}'
+
+ def check_stats(self, count: int, http_status: Optional[int] = None,
+ exitcode: Optional[int] = None):
+ if exitcode is None:
+ self.check_exit_code(0)
assert len(self.stats) == count, \
- f'stats count: expected {count}, got {len(self.stats)}\n{self._dump_logs()}'
- if exp_status is not None:
+ f'stats count: expected {count}, got {len(self.stats)}\n{self.dump_logs()}'
+ if http_status is not None:
for idx, x in enumerate(self.stats):
assert 'http_code' in x, \
- f'status #{idx} reports no http_code\n{self._dump_logs()}'
- assert x['http_code'] == exp_status, \
- f'status #{idx} http_code: expected {exp_status}, '\
- f'got {x["http_code"]}\n{self._dump_logs()}'
- if exp_exitcode is not None:
+ f'status #{idx} reports no http_code\n{self.dump_logs()}'
+ assert x['http_code'] == http_status, \
+ f'status #{idx} http_code: expected {http_status}, '\
+ f'got {x["http_code"]}\n{self.dump_logs()}'
+ if exitcode is not None:
for idx, x in enumerate(self.stats):
if 'exitcode' in x:
assert x['exitcode'] == 0, \
- f'status #{idx} exitcode: expected {exp_exitcode}, '\
- f'got {x["exitcode"]}\n{self._dump_logs()}'
+ f'status #{idx} exitcode: expected {exitcode}, '\
+ f'got {x["exitcode"]}\n{self.dump_logs()}'
- def _dump_logs(self):
+ def dump_logs(self):
lines = []
lines.append('>>--stdout ----------------------------------------------\n')
lines.extend(self._stdout)
@@ -252,6 +301,10 @@ class CurlClient:
def download_file(self, i: int) -> str:
return os.path.join(self.run_dir, f'download_{i}.data')
+ @property
+ def trace_file(self) -> str:
+ return self._tracefile
+
def _rmf(self, path):
if os.path.exists(path):
return os.remove(path)
@@ -272,6 +325,7 @@ class CurlClient:
with_stats: bool = True,
with_headers: bool = False,
no_save: bool = False,
+ with_trace: bool = False,
extra_args: List[str] = None):
if extra_args is None:
extra_args = []
@@ -292,12 +346,14 @@ class CurlClient:
])
return self._raw(urls, alpn_proto=alpn_proto, options=extra_args,
with_stats=with_stats,
- with_headers=with_headers)
+ with_headers=with_headers,
+ with_trace=with_trace)
def http_upload(self, urls: List[str], data: str,
alpn_proto: Optional[str] = None,
with_stats: bool = True,
with_headers: bool = False,
+ with_trace: bool = False,
extra_args: Optional[List[str]] = None):
if extra_args is None:
extra_args = []
@@ -310,12 +366,14 @@ class CurlClient:
])
return self._raw(urls, alpn_proto=alpn_proto, options=extra_args,
with_stats=with_stats,
- with_headers=with_headers)
+ with_headers=with_headers,
+ with_trace=with_trace)
def http_put(self, urls: List[str], data=None, fdata=None,
alpn_proto: Optional[str] = None,
with_stats: bool = True,
with_headers: bool = False,
+ with_trace: bool = False,
extra_args: Optional[List[str]] = None):
if extra_args is None:
extra_args = []
@@ -333,7 +391,8 @@ class CurlClient:
return self._raw(urls, intext=data,
alpn_proto=alpn_proto, options=extra_args,
with_stats=with_stats,
- with_headers=with_headers)
+ with_headers=with_headers,
+ with_trace=with_trace)
def response_file(self, idx: int):
return os.path.join(self._run_dir, f'download_{idx}.data')
@@ -379,15 +438,16 @@ class CurlClient:
duration=datetime.now() - start,
with_stats=with_stats)
- def _raw(self, urls, intext='', timeout=10, options=None, insecure=False,
+ def _raw(self, urls, intext='', timeout=None, options=None, insecure=False,
alpn_proto: Optional[str] = None,
force_resolve=True,
with_stats=False,
- with_headers=True):
+ with_headers=True,
+ with_trace=False):
args = self._complete_args(
urls=urls, timeout=timeout, options=options, insecure=insecure,
alpn_proto=alpn_proto, force_resolve=force_resolve,
- with_headers=with_headers)
+ with_headers=with_headers, with_trace=with_trace)
r = self._run(args, intext=intext, with_stats=with_stats)
if r.exit_code == 0 and with_headers:
self._parse_headerfile(self._headerfile, r=r)
@@ -398,14 +458,15 @@ class CurlClient:
def _complete_args(self, urls, timeout=None, options=None,
insecure=False, force_resolve=True,
alpn_proto: Optional[str] = None,
- with_headers: bool = True):
+ with_headers: bool = True,
+ with_trace: bool = False):
if not isinstance(urls, list):
urls = [urls]
args = [self._curl, "-s", "--path-as-is"]
if with_headers:
args.extend(["-D", self._headerfile])
- if self.env.verbose > 2:
+ if with_trace or self.env.verbose > 2:
args.extend(['--trace', self._tracefile, '--trace-time'])
elif self.env.verbose > 1:
args.extend(['--trace', self._tracefile])