diff options
author | Ben Firshman <ben@firshman.co.uk> | 2016-09-05 16:00:14 +0200 |
---|---|---|
committer | Ben Firshman <ben@firshman.co.uk> | 2016-11-22 17:03:03 +0000 |
commit | 9daa320454ec0c19035a09b436a41e13c3fb71ad (patch) | |
tree | 2e93cbf590547f6de6faeeca08167da48b7c6cf4 /tests/integration/api_container_test.py | |
parent | c7903f084eb3a7525e21974390b48e2a0dc03732 (diff) | |
download | docker-py-9daa320454ec0c19035a09b436a41e13c3fb71ad.tar.gz |
Rename Client to APIClient
Signed-off-by: Ben Firshman <ben@firshman.co.uk>
Diffstat (limited to 'tests/integration/api_container_test.py')
-rw-r--r-- | tests/integration/api_container_test.py | 1186 |
1 files changed, 1186 insertions, 0 deletions
diff --git a/tests/integration/api_container_test.py b/tests/integration/api_container_test.py new file mode 100644 index 0000000..71e192f --- /dev/null +++ b/tests/integration/api_container_test.py @@ -0,0 +1,1186 @@ +import os +import signal +import tempfile + +import docker +from docker.constants import IS_WINDOWS_PLATFORM +from docker.utils.socket import next_frame_size +from docker.utils.socket import read_exactly +import pytest +import six + +from ..helpers import requires_api_version +from .. import helpers +from .base import BaseIntegrationTest, BUSYBOX + + +class ListContainersTest(BaseIntegrationTest): + def test_list_containers(self): + res0 = self.client.containers(all=True) + size = len(res0) + res1 = self.client.create_container(BUSYBOX, 'true') + self.assertIn('Id', res1) + self.client.start(res1['Id']) + self.tmp_containers.append(res1['Id']) + res2 = self.client.containers(all=True) + self.assertEqual(size + 1, len(res2)) + retrieved = [x for x in res2 if x['Id'].startswith(res1['Id'])] + self.assertEqual(len(retrieved), 1) + retrieved = retrieved[0] + self.assertIn('Command', retrieved) + self.assertEqual(retrieved['Command'], six.text_type('true')) + self.assertIn('Image', retrieved) + self.assertRegex(retrieved['Image'], r'busybox:.*') + self.assertIn('Status', retrieved) + + +class CreateContainerTest(BaseIntegrationTest): + + def test_create(self): + res = self.client.create_container(BUSYBOX, 'true') + self.assertIn('Id', res) + self.tmp_containers.append(res['Id']) + + def test_create_with_host_pid_mode(self): + ctnr = self.client.create_container( + BUSYBOX, 'true', host_config=self.client.create_host_config( + pid_mode='host', network_mode='none' + ) + ) + self.assertIn('Id', ctnr) + self.tmp_containers.append(ctnr['Id']) + self.client.start(ctnr) + inspect = self.client.inspect_container(ctnr) + self.assertIn('HostConfig', inspect) + host_config = inspect['HostConfig'] + self.assertIn('PidMode', host_config) + self.assertEqual(host_config['PidMode'], 'host') + + def test_create_with_links(self): + res0 = self.client.create_container( + BUSYBOX, 'cat', + detach=True, stdin_open=True, + environment={'FOO': '1'}) + + container1_id = res0['Id'] + self.tmp_containers.append(container1_id) + + self.client.start(container1_id) + + res1 = self.client.create_container( + BUSYBOX, 'cat', + detach=True, stdin_open=True, + environment={'FOO': '1'}) + + container2_id = res1['Id'] + self.tmp_containers.append(container2_id) + + self.client.start(container2_id) + + # we don't want the first / + link_path1 = self.client.inspect_container(container1_id)['Name'][1:] + link_alias1 = 'mylink1' + link_env_prefix1 = link_alias1.upper() + + link_path2 = self.client.inspect_container(container2_id)['Name'][1:] + link_alias2 = 'mylink2' + link_env_prefix2 = link_alias2.upper() + + res2 = self.client.create_container( + BUSYBOX, 'env', host_config=self.client.create_host_config( + links={link_path1: link_alias1, link_path2: link_alias2}, + network_mode='bridge' + ) + ) + container3_id = res2['Id'] + self.tmp_containers.append(container3_id) + self.client.start(container3_id) + self.assertEqual(self.client.wait(container3_id), 0) + + logs = self.client.logs(container3_id) + if six.PY3: + logs = logs.decode('utf-8') + self.assertIn('{0}_NAME='.format(link_env_prefix1), logs) + self.assertIn('{0}_ENV_FOO=1'.format(link_env_prefix1), logs) + self.assertIn('{0}_NAME='.format(link_env_prefix2), logs) + self.assertIn('{0}_ENV_FOO=1'.format(link_env_prefix2), logs) + + def test_create_with_restart_policy(self): + container = self.client.create_container( + BUSYBOX, ['sleep', '2'], + host_config=self.client.create_host_config( + restart_policy={"Name": "always", "MaximumRetryCount": 0}, + network_mode='none' + ) + ) + id = container['Id'] + self.client.start(id) + self.client.wait(id) + with self.assertRaises(docker.errors.APIError) as exc: + self.client.remove_container(id) + err = exc.exception.explanation + self.assertIn( + 'You cannot remove a running container', err + ) + self.client.remove_container(id, force=True) + + def test_create_container_with_volumes_from(self): + vol_names = ['foobar_vol0', 'foobar_vol1'] + + res0 = self.client.create_container( + BUSYBOX, 'true', name=vol_names[0] + ) + container1_id = res0['Id'] + self.tmp_containers.append(container1_id) + self.client.start(container1_id) + + res1 = self.client.create_container( + BUSYBOX, 'true', name=vol_names[1] + ) + container2_id = res1['Id'] + self.tmp_containers.append(container2_id) + self.client.start(container2_id) + with self.assertRaises(docker.errors.DockerException): + self.client.create_container( + BUSYBOX, 'cat', detach=True, stdin_open=True, + volumes_from=vol_names + ) + res2 = self.client.create_container( + BUSYBOX, 'cat', detach=True, stdin_open=True, + host_config=self.client.create_host_config( + volumes_from=vol_names, network_mode='none' + ) + ) + container3_id = res2['Id'] + self.tmp_containers.append(container3_id) + self.client.start(container3_id) + + info = self.client.inspect_container(res2['Id']) + self.assertCountEqual(info['HostConfig']['VolumesFrom'], vol_names) + + def create_container_readonly_fs(self): + ctnr = self.client.create_container( + BUSYBOX, ['mkdir', '/shrine'], + host_config=self.client.create_host_config( + read_only=True, network_mode='none' + ) + ) + self.assertIn('Id', ctnr) + self.tmp_containers.append(ctnr['Id']) + self.client.start(ctnr) + res = self.client.wait(ctnr) + self.assertNotEqual(res, 0) + + def create_container_with_name(self): + res = self.client.create_container(BUSYBOX, 'true', name='foobar') + self.assertIn('Id', res) + self.tmp_containers.append(res['Id']) + inspect = self.client.inspect_container(res['Id']) + self.assertIn('Name', inspect) + self.assertEqual('/foobar', inspect['Name']) + + def create_container_privileged(self): + res = self.client.create_container( + BUSYBOX, 'true', host_config=self.client.create_host_config( + privileged=True, network_mode='none' + ) + ) + self.assertIn('Id', res) + self.tmp_containers.append(res['Id']) + self.client.start(res['Id']) + inspect = self.client.inspect_container(res['Id']) + self.assertIn('Config', inspect) + self.assertIn('Id', inspect) + self.assertTrue(inspect['Id'].startswith(res['Id'])) + self.assertIn('Image', inspect) + self.assertIn('State', inspect) + self.assertIn('Running', inspect['State']) + if not inspect['State']['Running']: + self.assertIn('ExitCode', inspect['State']) + self.assertEqual(inspect['State']['ExitCode'], 0) + # Since Nov 2013, the Privileged flag is no longer part of the + # container's config exposed via the API (safety concerns?). + # + if 'Privileged' in inspect['Config']: + self.assertEqual(inspect['Config']['Privileged'], True) + + def test_create_with_mac_address(self): + mac_address_expected = "02:42:ac:11:00:0a" + container = self.client.create_container( + BUSYBOX, ['sleep', '60'], mac_address=mac_address_expected) + + id = container['Id'] + + self.client.start(container) + res = self.client.inspect_container(container['Id']) + self.assertEqual(mac_address_expected, + res['NetworkSettings']['MacAddress']) + + self.client.kill(id) + + @requires_api_version('1.20') + def test_group_id_ints(self): + container = self.client.create_container( + BUSYBOX, 'id -G', + host_config=self.client.create_host_config(group_add=[1000, 1001]) + ) + self.tmp_containers.append(container) + self.client.start(container) + self.client.wait(container) + + logs = self.client.logs(container) + if six.PY3: + logs = logs.decode('utf-8') + groups = logs.strip().split(' ') + self.assertIn('1000', groups) + self.assertIn('1001', groups) + + @requires_api_version('1.20') + def test_group_id_strings(self): + container = self.client.create_container( + BUSYBOX, 'id -G', host_config=self.client.create_host_config( + group_add=['1000', '1001'] + ) + ) + self.tmp_containers.append(container) + self.client.start(container) + self.client.wait(container) + + logs = self.client.logs(container) + if six.PY3: + logs = logs.decode('utf-8') + + groups = logs.strip().split(' ') + self.assertIn('1000', groups) + self.assertIn('1001', groups) + + def test_valid_log_driver_and_log_opt(self): + log_config = docker.utils.LogConfig( + type='json-file', + config={'max-file': '100'} + ) + + container = self.client.create_container( + BUSYBOX, ['true'], + host_config=self.client.create_host_config(log_config=log_config) + ) + self.tmp_containers.append(container['Id']) + self.client.start(container) + + info = self.client.inspect_container(container) + container_log_config = info['HostConfig']['LogConfig'] + + self.assertEqual(container_log_config['Type'], log_config.type) + self.assertEqual(container_log_config['Config'], log_config.config) + + def test_invalid_log_driver_raises_exception(self): + log_config = docker.utils.LogConfig( + type='asdf-nope', + config={} + ) + + expected_msg = "logger: no log driver named 'asdf-nope' is registered" + with pytest.raises(docker.errors.APIError) as excinfo: + # raises an internal server error 500 + container = self.client.create_container( + BUSYBOX, ['true'], host_config=self.client.create_host_config( + log_config=log_config + ) + ) + self.client.start(container) + + assert excinfo.value.explanation == expected_msg + + def test_valid_no_log_driver_specified(self): + log_config = docker.utils.LogConfig( + type="", + config={'max-file': '100'} + ) + + container = self.client.create_container( + BUSYBOX, ['true'], + host_config=self.client.create_host_config(log_config=log_config) + ) + self.tmp_containers.append(container['Id']) + self.client.start(container) + + info = self.client.inspect_container(container) + container_log_config = info['HostConfig']['LogConfig'] + + self.assertEqual(container_log_config['Type'], "json-file") + self.assertEqual(container_log_config['Config'], log_config.config) + + def test_valid_no_config_specified(self): + log_config = docker.utils.LogConfig( + type="json-file", + config=None + ) + + container = self.client.create_container( + BUSYBOX, ['true'], + host_config=self.client.create_host_config(log_config=log_config) + ) + self.tmp_containers.append(container['Id']) + self.client.start(container) + + info = self.client.inspect_container(container) + container_log_config = info['HostConfig']['LogConfig'] + + self.assertEqual(container_log_config['Type'], "json-file") + self.assertEqual(container_log_config['Config'], {}) + + def test_create_with_memory_constraints_with_str(self): + ctnr = self.client.create_container( + BUSYBOX, 'true', + host_config=self.client.create_host_config( + memswap_limit='1G', + mem_limit='700M' + ) + ) + self.assertIn('Id', ctnr) + self.tmp_containers.append(ctnr['Id']) + self.client.start(ctnr) + inspect = self.client.inspect_container(ctnr) + + self.assertIn('HostConfig', inspect) + host_config = inspect['HostConfig'] + for limit in ['Memory', 'MemorySwap']: + self.assertIn(limit, host_config) + + def test_create_with_memory_constraints_with_int(self): + ctnr = self.client.create_container( + BUSYBOX, 'true', + host_config=self.client.create_host_config(mem_swappiness=40) + ) + self.assertIn('Id', ctnr) + self.tmp_containers.append(ctnr['Id']) + self.client.start(ctnr) + inspect = self.client.inspect_container(ctnr) + + self.assertIn('HostConfig', inspect) + host_config = inspect['HostConfig'] + self.assertIn('MemorySwappiness', host_config) + + def test_create_host_config_exception_raising(self): + self.assertRaises(TypeError, + self.client.create_host_config, mem_swappiness='40') + + self.assertRaises(ValueError, + self.client.create_host_config, pid_mode='40') + + def test_create_with_environment_variable_no_value(self): + container = self.client.create_container( + BUSYBOX, + ['echo'], + environment={'Foo': None, 'Other': 'one', 'Blank': ''}, + ) + self.tmp_containers.append(container['Id']) + config = self.client.inspect_container(container['Id']) + assert ( + sorted(config['Config']['Env']) == + sorted(['Foo', 'Other=one', 'Blank=']) + ) + + @requires_api_version('1.22') + def test_create_with_tmpfs(self): + tmpfs = { + '/tmp1': 'size=3M' + } + + container = self.client.create_container( + BUSYBOX, + ['echo'], + host_config=self.client.create_host_config( + tmpfs=tmpfs)) + + self.tmp_containers.append(container['Id']) + config = self.client.inspect_container(container) + assert config['HostConfig']['Tmpfs'] == tmpfs + + @requires_api_version('1.24') + def test_create_with_isolation(self): + container = self.client.create_container( + BUSYBOX, ['echo'], host_config=self.client.create_host_config( + isolation='default' + ) + ) + self.tmp_containers.append(container['Id']) + config = self.client.inspect_container(container) + assert config['HostConfig']['Isolation'] == 'default' + + +class VolumeBindTest(BaseIntegrationTest): + def setUp(self): + super(VolumeBindTest, self).setUp() + + self.mount_dest = '/mnt' + + # Get a random pathname - we don't need it to exist locally + self.mount_origin = tempfile.mkdtemp() + self.filename = 'shared.txt' + + self.run_with_volume( + False, + BUSYBOX, + ['touch', os.path.join(self.mount_dest, self.filename)], + ) + + @pytest.mark.xfail( + IS_WINDOWS_PLATFORM, reason='Test not designed for Windows platform' + ) + def test_create_with_binds_rw(self): + + container = self.run_with_volume( + False, + BUSYBOX, + ['ls', self.mount_dest], + ) + logs = self.client.logs(container) + + if six.PY3: + logs = logs.decode('utf-8') + self.assertIn(self.filename, logs) + inspect_data = self.client.inspect_container(container) + self.check_container_data(inspect_data, True) + + @pytest.mark.xfail( + IS_WINDOWS_PLATFORM, reason='Test not designed for Windows platform' + ) + def test_create_with_binds_ro(self): + self.run_with_volume( + False, + BUSYBOX, + ['touch', os.path.join(self.mount_dest, self.filename)], + ) + container = self.run_with_volume( + True, + BUSYBOX, + ['ls', self.mount_dest], + ) + logs = self.client.logs(container) + + if six.PY3: + logs = logs.decode('utf-8') + self.assertIn(self.filename, logs) + + inspect_data = self.client.inspect_container(container) + self.check_container_data(inspect_data, False) + + def check_container_data(self, inspect_data, rw): + if docker.utils.compare_version('1.20', self.client._version) < 0: + self.assertIn('Volumes', inspect_data) + self.assertIn(self.mount_dest, inspect_data['Volumes']) + self.assertEqual( + self.mount_origin, inspect_data['Volumes'][self.mount_dest] + ) + self.assertIn(self.mount_dest, inspect_data['VolumesRW']) + self.assertFalse(inspect_data['VolumesRW'][self.mount_dest]) + else: + self.assertIn('Mounts', inspect_data) + filtered = list(filter( + lambda x: x['Destination'] == self.mount_dest, + inspect_data['Mounts'] + )) + self.assertEqual(len(filtered), 1) + mount_data = filtered[0] + self.assertEqual(mount_data['Source'], self.mount_origin) + self.assertEqual(mount_data['RW'], rw) + + def run_with_volume(self, ro, *args, **kwargs): + return self.run_container( + *args, + volumes={self.mount_dest: {}}, + host_config=self.client.create_host_config( + binds={ + self.mount_origin: { + 'bind': self.mount_dest, + 'ro': ro, + }, + }, + network_mode='none' + ), + **kwargs + ) + + +@requires_api_version('1.20') +class ArchiveTest(BaseIntegrationTest): + def test_get_file_archive_from_container(self): + data = 'The Maid and the Pocket Watch of Blood' + ctnr = self.client.create_container( + BUSYBOX, 'sh -c "echo {0} > /vol1/data.txt"'.format(data), + volumes=['/vol1'] + ) + self.tmp_containers.append(ctnr) + self.client.start(ctnr) + self.client.wait(ctnr) + with tempfile.NamedTemporaryFile() as destination: + strm, stat = self.client.get_archive(ctnr, '/vol1/data.txt') + for d in strm: + destination.write(d) + destination.seek(0) + retrieved_data = helpers.untar_file(destination, 'data.txt') + if six.PY3: + retrieved_data = retrieved_data.decode('utf-8') + self.assertEqual(data, retrieved_data.strip()) + + def test_get_file_stat_from_container(self): + data = 'The Maid and the Pocket Watch of Blood' + ctnr = self.client.create_container( + BUSYBOX, 'sh -c "echo -n {0} > /vol1/data.txt"'.format(data), + volumes=['/vol1'] + ) + self.tmp_containers.append(ctnr) + self.client.start(ctnr) + self.client.wait(ctnr) + strm, stat = self.client.get_archive(ctnr, '/vol1/data.txt') + self.assertIn('name', stat) + self.assertEqual(stat['name'], 'data.txt') + self.assertIn('size', stat) + self.assertEqual(stat['size'], len(data)) + + def test_copy_file_to_container(self): + data = b'Deaf To All But The Song' + with tempfile.NamedTemporaryFile(delete=False) as test_file: + test_file.write(data) + test_file.seek(0) + ctnr = self.client.create_container( + BUSYBOX, + 'cat {0}'.format( + os.path.join('/vol1/', os.path.basename(test_file.name)) + ), + volumes=['/vol1'] + ) + self.tmp_containers.append(ctnr) + with helpers.simple_tar(test_file.name) as test_tar: + self.client.put_archive(ctnr, '/vol1', test_tar) + self.client.start(ctnr) + self.client.wait(ctnr) + logs = self.client.logs(ctnr) + if six.PY3: + logs = logs.decode('utf-8') + data = data.decode('utf-8') + self.assertEqual(logs.strip(), data) + + def test_copy_directory_to_container(self): + files = ['a.py', 'b.py', 'foo/b.py'] + dirs = ['foo', 'bar'] + base = helpers.make_tree(dirs, files) + ctnr = self.client.create_container( + BUSYBOX, 'ls -p /vol1', volumes=['/vol1'] + ) + self.tmp_containers.append(ctnr) + with docker.utils.tar(base) as test_tar: + self.client.put_archive(ctnr, '/vol1', test_tar) + self.client.start(ctnr) + self.client.wait(ctnr) + logs = self.client.logs(ctnr) + if six.PY3: + logs = logs.decode('utf-8') + results = logs.strip().split() + self.assertIn('a.py', results) + self.assertIn('b.py', results) + self.assertIn('foo/', results) + self.assertIn('bar/', results) + + +class RenameContainerTest(BaseIntegrationTest): + def test_rename_container(self): + version = self.client.version()['Version'] + name = 'hong_meiling' + res = self.client.create_container(BUSYBOX, 'true') + self.assertIn('Id', res) + self.tmp_containers.append(res['Id']) + self.client.rename(res, name) + inspect = self.client.inspect_container(res['Id']) + self.assertIn('Name', inspect) + if version == '1.5.0': + self.assertEqual(name, inspect['Name']) + else: + self.assertEqual('/{0}'.format(name), inspect['Name']) + + +class StartContainerTest(BaseIntegrationTest): + def test_start_container(self): + res = self.client.create_container(BUSYBOX, 'true') + self.assertIn('Id', res) + self.tmp_containers.append(res['Id']) + self.client.start(res['Id']) + inspect = self.client.inspect_container(res['Id']) + self.assertIn('Config', inspect) + self.assertIn('Id', inspect) + self.assertTrue(inspect['Id'].startswith(res['Id'])) + self.assertIn('Image', inspect) + self.assertIn('State', inspect) + self.assertIn('Running', inspect['State']) + if not inspect['State']['Running']: + self.assertIn('ExitCode', inspect['State']) + self.assertEqual(inspect['State']['ExitCode'], 0) + + def test_start_container_with_dict_instead_of_id(self): + res = self.client.create_container(BUSYBOX, 'true') + self.assertIn('Id', res) + self.tmp_containers.append(res['Id']) + self.client.start(res) + inspect = self.client.inspect_container(res['Id']) + self.assertIn('Config', inspect) + self.assertIn('Id', inspect) + self.assertTrue(inspect['Id'].startswith(res['Id'])) + self.assertIn('Image', inspect) + self.assertIn('State', inspect) + self.assertIn('Running', inspect['State']) + if not inspect['State']['Running']: + self.assertIn('ExitCode', inspect['State']) + self.assertEqual(inspect['State']['ExitCode'], 0) + + def test_run_shlex_commands(self): + commands = [ + 'true', + 'echo "The Young Descendant of Tepes & Septette for the ' + 'Dead Princess"', + 'echo -n "The Young Descendant of Tepes & Septette for the ' + 'Dead Princess"', + '/bin/sh -c "echo Hello World"', + '/bin/sh -c \'echo "Hello World"\'', + 'echo "\"Night of Nights\""', + 'true && echo "Night of Nights"' + ] + for cmd in commands: + container = self.client.create_container(BUSYBOX, cmd) + id = container['Id'] + self.client.start(id) + self.tmp_containers.append(id) + exitcode = self.client.wait(id) + self.assertEqual(exitcode, 0, msg=cmd) + + +class WaitTest(BaseIntegrationTest): + def test_wait(self): + res = self.client.create_container(BUSYBOX, ['sleep', '3']) + id = res['Id'] + self.tmp_containers.append(id) + self.client.start(id) + exitcode = self.client.wait(id) + self.assertEqual(exitcode, 0) + inspect = self.client.inspect_container(id) + self.assertIn('Running', inspect['State']) + self.assertEqual(inspect['State']['Running'], False) + self.assertIn('ExitCode', inspect['State']) + self.assertEqual(inspect['State']['ExitCode'], exitcode) + + def test_wait_with_dict_instead_of_id(self): + res = self.client.create_container(BUSYBOX, ['sleep', '3']) + id = res['Id'] + self.tmp_containers.append(id) + self.client.start(res) + exitcode = self.client.wait(res) + self.assertEqual(exitcode, 0) + inspect = self.client.inspect_container(res) + self.assertIn('Running', inspect['State']) + self.assertEqual(inspect['State']['Running'], False) + self.assertIn('ExitCode', inspect['State']) + self.assertEqual(inspect['State']['ExitCode'], exitcode) + + +class LogsTest(BaseIntegrationTest): + def test_logs(self): + snippet = 'Flowering Nights (Sakuya Iyazoi)' + container = self.client.create_container( + BUSYBOX, 'echo {0}'.format(snippet) + ) + id = container['Id'] + self.tmp_containers.append(id) + self.client.start(id) + exitcode = self.client.wait(id) + self.assertEqual(exitcode, 0) + logs = self.client.logs(id) + self.assertEqual(logs, (snippet + '\n').encode(encoding='ascii')) + + def test_logs_tail_option(self): + snippet = '''Line1 +Line2''' + container = self.client.create_container( + BUSYBOX, 'echo "{0}"'.format(snippet) + ) + id = container['Id'] + self.tmp_containers.append(id) + self.client.start(id) + exitcode = self.client.wait(id) + self.assertEqual(exitcode, 0) + logs = self.client.logs(id, tail=1) + self.assertEqual(logs, 'Line2\n'.encode(encoding='ascii')) + + def test_logs_streaming_and_follow(self): + snippet = 'Flowering Nights (Sakuya Iyazoi)' + container = self.client.create_container( + BUSYBOX, 'echo {0}'.format(snippet) + ) + id = container['Id'] + self.tmp_containers.append(id) + self.client.start(id) + logs = six.binary_type() + for chunk in self.client.logs(id, stream=True, follow=True): + logs += chunk + + exitcode = self.client.wait(id) + self.assertEqual(exitcode, 0) + + self.assertEqual(logs, (snippet + '\n').encode(encoding='ascii')) + + def test_logs_with_dict_instead_of_id(self): + snippet = 'Flowering Nights (Sakuya Iyazoi)' + container = self.client.create_container( + BUSYBOX, 'echo {0}'.format(snippet) + ) + id = container['Id'] + self.tmp_containers.append(id) + self.client.start(id) + exitcode = self.client.wait(id) + self.assertEqual(exitcode, 0) + logs = self.client.logs(container) + self.assertEqual(logs, (snippet + '\n').encode(encoding='ascii')) + + def test_logs_with_tail_0(self): + snippet = 'Flowering Nights (Sakuya Iyazoi)' + container = self.client.create_container( + BUSYBOX, 'echo "{0}"'.format(snippet) + ) + id = container['Id'] + self.tmp_containers.append(id) + self.client.start(id) + exitcode = self.client.wait(id) + self.assertEqual(exitcode, 0) + logs = self.client.logs(id, tail=0) + self.assertEqual(logs, ''.encode(encoding='ascii')) + + +class DiffTest(BaseIntegrationTest): + def test_diff(self): + container = self.client.create_container(BUSYBOX, ['touch', '/test']) + id = container['Id'] + self.client.start(id) + self.tmp_containers.append(id) + exitcode = self.client.wait(id) + self.assertEqual(exitcode, 0) + diff = self.client.diff(id) + test_diff = [x for x in diff if x.get('Path', None) == '/test'] + self.assertEqual(len(test_diff), 1) + self.assertIn('Kind', test_diff[0]) + self.assertEqual(test_diff[0]['Kind'], 1) + + def test_diff_with_dict_instead_of_id(self): + container = self.client.create_container(BUSYBOX, ['touch', '/test']) + id = container['Id'] + self.client.start(id) + self.tmp_containers.append(id) + exitcode = self.client.wait(id) + self.assertEqual(exitcode, 0) + diff = self.client.diff(container) + test_diff = [x for x in diff if x.get('Path', None) == '/test'] + self.assertEqual(len(test_diff), 1) + self.assertIn('Kind', test_diff[0]) + self.assertEqual(test_diff[0]['Kind'], 1) + + +class StopTest(BaseIntegrationTest): + def test_stop(self): + container = self.client.create_container(BUSYBOX, ['sleep', '9999']) + id = container['Id'] + self.client.start(id) + self.tmp_containers.append(id) + self.client.stop(id, timeout=2) + container_info = self.client.inspect_container(id) + self.assertIn('State', container_info) + state = container_info['State'] + self.assertIn('Running', state) + self.assertEqual(state['Running'], False) + + def test_stop_with_dict_instead_of_id(self): + container = self.client.create_container(BUSYBOX, ['sleep', '9999']) + self.assertIn('Id', container) + id = container['Id'] + self.client.start(container) + self.tmp_containers.append(id) + self.client.stop(container, timeout=2) + container_info = self.client.inspect_container(id) + self.assertIn('State', container_info) + state = container_info['State'] + self.assertIn('Running', state) + self.assertEqual(state['Running'], False) + + +class KillTest(BaseIntegrationTest): + def test_kill(self): + container = self.client.create_container(BUSYBOX, ['sleep', '9999']) + id = container['Id'] + self.client.start(id) + self.tmp_containers.append(id) + self.client.kill(id) + container_info = self.client.inspect_container(id) + self.assertIn('State', container_info) + state = container_info['State'] + self.assertIn('ExitCode', state) + self.assertNotEqual(state['ExitCode'], 0) + self.assertIn('Running', state) + self.assertEqual(state['Running'], False) + + def test_kill_with_dict_instead_of_id(self): + container = self.client.create_container(BUSYBOX, ['sleep', '9999']) + id = container['Id'] + self.client.start(id) + self.tmp_containers.append(id) + self.client.kill(container) + container_info = self.client.inspect_container(id) + self.assertIn('State', container_info) + state = container_info['State'] + self.assertIn('ExitCode', state) + self.assertNotEqual(state['ExitCode'], 0) + self.assertIn('Running', state) + self.assertEqual(state['Running'], False) + + def test_kill_with_signal(self): + id = self.client.create_container(BUSYBOX, ['sleep', '60']) + self.tmp_containers.append(id) + self.client.start(id) + self.client.kill( + id, signal=signal.SIGKILL if not IS_WINDOWS_PLATFORM else 9 + ) + exitcode = self.client.wait(id) + self.assertNotEqual(exitcode, 0) + container_info = self.client.inspect_container(id) + self.assertIn('State', container_info) + state = container_info['State'] + self.assertIn('ExitCode', state) + self.assertNotEqual(state['ExitCode'], 0) + self.assertIn('Running', state) + self.assertEqual(state['Running'], False, state) + + def test_kill_with_signal_name(self): + id = self.client.create_container(BUSYBOX, ['sleep', '60']) + self.client.start(id) + self.tmp_containers.append(id) + self.client.kill(id, signal='SIGKILL') + exitcode = self.client.wait(id) + self.assertNotEqual(exitcode, 0) + container_info = self.client.inspect_container(id) + self.assertIn('State', container_info) + state = container_info['State'] + self.assertIn('ExitCode', state) + self.assertNotEqual(state['ExitCode'], 0) + self.assertIn('Running', state) + self.assertEqual(state['Running'], False, state) + + def test_kill_with_signal_integer(self): + id = self.client.create_container(BUSYBOX, ['sleep', '60']) + self.client.start(id) + self.tmp_containers.append(id) + self.client.kill(id, signal=9) + exitcode = self.client.wait(id) + self.assertNotEqual(exitcode, 0) + container_info = self.client.inspect_container(id) + self.assertIn('State', container_info) + state = container_info['State'] + self.assertIn('ExitCode', state) + self.assertNotEqual(state['ExitCode'], 0) + self.assertIn('Running', state) + self.assertEqual(state['Running'], False, state) + + +class PortTest(BaseIntegrationTest): + def test_port(self): + + port_bindings = { + '1111': ('127.0.0.1', '4567'), + '2222': ('127.0.0.1', '4568') + } + + container = self.client.create_container( + BUSYBOX, ['sleep', '60'], ports=list(port_bindings.keys()), + host_config=self.client.create_host_config( + port_bindings=port_bindings, network_mode='bridge' + ) + ) + id = container['Id'] + + self.client.start(container) + + # Call the port function on each biding and compare expected vs actual + for port in port_bindings: + actual_bindings = self.client.port(container, port) + port_binding = actual_bindings.pop() + + ip, host_port = port_binding['HostIp'], port_binding['HostPort'] + + self.assertEqual(ip, port_bindings[port][0]) + self.assertEqual(host_port, port_bindings[port][1]) + + self.client.kill(id) + + +class ContainerTopTest(BaseIntegrationTest): + def test_top(self): + container = self.client.create_container( + BUSYBOX, ['sleep', '60'] + ) + + self.tmp_containers.append(container) + + self.client.start(container) + res = self.client.top(container) + if IS_WINDOWS_PLATFORM: + assert res['Titles'] == ['PID', 'USER', 'TIME', 'COMMAND'] + else: + assert res['Titles'] == [ + 'UID', 'PID', 'PPID', 'C', 'STIME', 'TTY', 'TIME', 'CMD' + ] + assert len(res['Processes']) == 1 + assert res['Processes'][0][-1] == 'sleep 60' + self.client.kill(container) + + @pytest.mark.skipif( + IS_WINDOWS_PLATFORM, reason='No psargs support on windows' + ) + def test_top_with_psargs(self): + container = self.client.create_container( + BUSYBOX, ['sleep', '60']) + + self.tmp_containers.append(container) + + self.client.start(container) + res = self.client.top(container, 'waux') + self.assertEqual( + res['Titles'], + ['USER', 'PID', '%CPU', '%MEM', 'VSZ', 'RSS', + 'TTY', 'STAT', 'START', 'TIME', 'COMMAND'], + ) + self.assertEqual(len(res['Processes']), 1) + self.assertEqual(res['Processes'][0][10], 'sleep 60') + + +class RestartContainerTest(BaseIntegrationTest): + def test_restart(self): + container = self.client.create_container(BUSYBOX, ['sleep', '9999']) + id = container['Id'] + self.client.start(id) + self.tmp_containers.append(id) + info = self.client.inspect_container(id) + self.assertIn('State', info) + self.assertIn('StartedAt', info['State']) + start_time1 = info['State']['StartedAt'] + self.client.restart(id, timeout=2) + info2 = self.client.inspect_container(id) + self.assertIn('State', info2) + self.assertIn('StartedAt', info2['State']) + start_time2 = info2['State']['StartedAt'] + self.assertNotEqual(start_time1, start_time2) + self.assertIn('Running', info2['State']) + self.assertEqual(info2['State']['Running'], True) + self.client.kill(id) + + def test_restart_with_dict_instead_of_id(self): + container = self.client.create_container(BUSYBOX, ['sleep', '9999']) + self.assertIn('Id', container) + id = container['Id'] + self.client.start(container) + self.tmp_containers.append(id) + info = self.client.inspect_container(id) + self.assertIn('State', info) + self.assertIn('StartedAt', info['State']) + start_time1 = info['State']['StartedAt'] + self.client.restart(container, timeout=2) + info2 = self.client.inspect_container(id) + self.assertIn('State', info2) + self.assertIn('StartedAt', info2['State']) + start_time2 = info2['State']['StartedAt'] + self.assertNotEqual(start_time1, start_time2) + self.assertIn('Running', info2['State']) + self.assertEqual(info2['State']['Running'], True) + self.client.kill(id) + + +class RemoveContainerTest(BaseIntegrationTest): + def test_remove(self): + container = self.client.create_container(BUSYBOX, ['true']) + id = container['Id'] + self.client.start(id) + self.client.wait(id) + self.client.remove_container(id) + containers = self.client.containers(all=True) + res = [x for x in containers if 'Id' in x and x['Id'].startswith(id)] + self.assertEqual(len(res), 0) + + def test_remove_with_dict_instead_of_id(self): + container = self.client.create_container(BUSYBOX, ['true']) + id = container['Id'] + self.client.start(id) + self.client.wait(id) + self.client.remove_container(container) + containers = self.client.containers(all=True) + res = [x for x in containers if 'Id' in x and x['Id'].startswith(id)] + self.assertEqual(len(res), 0) + + +class AttachContainerTest(BaseIntegrationTest): + def test_run_container_streaming(self): + container = self.client.create_container(BUSYBOX, '/bin/sh', + detach=True, stdin_open=True) + id = container['Id'] + self.tmp_containers.append(id) + self.client.start(id) + sock = self.client.attach_socket(container, ws=False) + self.assertTrue(sock.fileno() > -1) + + def test_run_container_reading_socket(self): + line = 'hi there and stuff and things, words!' + # `echo` appends CRLF, `printf` doesn't + command = "printf '{0}'".format(line) + container = self.client.create_container(BUSYBOX, command, + detach=True, tty=False) + ident = container['Id'] + self.tmp_containers.append(ident) + + opts = {"stdout": 1, "stream": 1, "logs": 1} + pty_stdout = self.client.attach_socket(ident, opts) + self.addCleanup(pty_stdout.close) + + self.client.start(ident) + + next_size = next_frame_size(pty_stdout) + self.assertEqual(next_size, len(line)) + data = read_exactly(pty_stdout, next_size) + self.assertEqual(data.decode('utf-8'), line) + + +class PauseTest(BaseIntegrationTest): + def test_pause_unpause(self): + container = self.client.create_container(BUSYBOX, ['sleep', '9999']) + id = container['Id'] + self.tmp_containers.append(id) + self.client.start(container) + self.client.pause(id) + container_info = self.client.inspect_container(id) + self.assertIn('State', container_info) + state = container_info['State'] + self.assertIn('ExitCode', state) + self.assertEqual(state['ExitCode'], 0) + self.assertIn('Running', state) + self.assertEqual(state['Running'], True) + self.assertIn('Paused', state) + self.assertEqual(state['Paused'], True) + + self.client.unpause(id) + container_info = self.client.inspect_container(id) + self.assertIn('State', container_info) + state = container_info['State'] + self.assertIn('ExitCode', state) + self.assertEqual(state['ExitCode'], 0) + self.assertIn('Running', state) + self.assertEqual(state['Running'], True) + self.assertIn('Paused', state) + self.assertEqual(state['Paused'], False) + + +class GetContainerStatsTest(BaseIntegrationTest): + @requires_api_version('1.19') + def test_get_container_stats_no_stream(self): + container = self.client.create_container( + BUSYBOX, ['sleep', '60'], + ) + self.tmp_containers.append(container) + self.client.start(container) + response = self.client.stats(container, stream=0) + self.client.kill(container) + + self.assertEqual(type(response), dict) + for key in ['read', 'networks', 'precpu_stats', 'cpu_stats', + 'memory_stats', 'blkio_stats']: + self.assertIn(key, response) + + @requires_api_version('1.17') + def test_get_container_stats_stream(self): + container = self.client.create_container( + BUSYBOX, ['sleep', '60'], + ) + self.tmp_containers.append(container) + self.client.start(container) + stream = self.client.stats(container) + for chunk in stream: + self.assertEqual(type(chunk), dict) + for key in ['read', 'network', 'precpu_stats', 'cpu_stats', + 'memory_stats', 'blkio_stats']: + self.assertIn(key, chunk) + + +class ContainerUpdateTest(BaseIntegrationTest): + @requires_api_version('1.22') + def test_update_container(self): + old_mem_limit = 400 * 1024 * 1024 + new_mem_limit = 300 * 1024 * 1024 + container = self.client.create_container( + BUSYBOX, 'top', host_config=self.client.create_host_config( + mem_limit=old_mem_limit + ) + ) + self.tmp_containers.append(container) + self.client.start(container) + self.client.update_container(container, mem_limit=new_mem_limit) + inspect_data = self.client.inspect_container(container) + self.assertEqual(inspect_data['HostConfig']['Memory'], new_mem_limit) + + @requires_api_version('1.23') + def test_restart_policy_update(self): + old_restart_policy = { + 'MaximumRetryCount': 0, + 'Name': 'always' + } + new_restart_policy = { + 'MaximumRetryCount': 42, + 'Name': 'on-failure' + } + container = self.client.create_container( + BUSYBOX, ['sleep', '60'], + host_config=self.client.create_host_config( + restart_policy=old_restart_policy + ) + ) + self.tmp_containers.append(container) + self.client.start(container) + self.client.update_container(container, + restart_policy=new_restart_policy) + inspect_data = self.client.inspect_container(container) + self.assertEqual( + inspect_data['HostConfig']['RestartPolicy']['MaximumRetryCount'], + new_restart_policy['MaximumRetryCount'] + ) + self.assertEqual( + inspect_data['HostConfig']['RestartPolicy']['Name'], + new_restart_policy['Name'] + ) + + +class ContainerCPUTest(BaseIntegrationTest): + @requires_api_version('1.18') + def test_container_cpu_shares(self): + cpu_shares = 512 + container = self.client.create_container( + BUSYBOX, 'ls', host_config=self.client.create_host_config( + cpu_shares=cpu_shares + ) + ) + self.tmp_containers.append(container) + self.client.start(container) + inspect_data = self.client.inspect_container(container) + self.assertEqual(inspect_data['HostConfig']['CpuShares'], 512) + + @requires_api_version('1.18') + def test_container_cpuset(self): + cpuset_cpus = "0,1" + container = self.client.create_container( + BUSYBOX, 'ls', host_config=self.client.create_host_config( + cpuset_cpus=cpuset_cpus + ) + ) + self.tmp_containers.append(container) + self.client.start(container) + inspect_data = self.client.inspect_container(container) + self.assertEqual(inspect_data['HostConfig']['CpusetCpus'], cpuset_cpus) |