summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Makefile14
-rw-r--r--tests/base.py25
-rw-r--r--tests/integration/__init__.py2
-rw-r--r--tests/integration/api_test.py1719
-rw-r--r--tests/integration_test.py1724
-rw-r--r--tests/unit/__init__.py (renamed from tests/testdata/certs/ca.pem)0
-rw-r--r--tests/unit/api_test.py (renamed from tests/test.py)33
-rw-r--r--tests/unit/fake_api.py (renamed from tests/fake_api.py)0
-rw-r--r--tests/unit/fake_stat.py (renamed from tests/fake_stat.py)0
-rw-r--r--tests/unit/testdata/certs/ca.pem (renamed from tests/testdata/certs/cert.pem)0
-rw-r--r--tests/unit/testdata/certs/cert.pem (renamed from tests/testdata/certs/key.pem)0
-rw-r--r--tests/unit/testdata/certs/key.pem0
-rw-r--r--tests/unit/testdata/context/Dockerfile (renamed from tests/testdata/context/Dockerfile)0
-rw-r--r--tests/unit/testdata/context/ctx.tar.gz (renamed from tests/testdata/context/ctx.tar.gz)bin171 -> 171 bytes
-rw-r--r--tests/unit/testdata/context/custom_dockerfile (renamed from tests/testdata/context/custom_dockerfile)0
-rw-r--r--tests/unit/utils_test.py (renamed from tests/utils_test.py)4
-rw-r--r--tox.ini2
17 files changed, 1764 insertions, 1759 deletions
diff --git a/Makefile b/Makefile
index f98abe7..772c2e2 100644
--- a/Makefile
+++ b/Makefile
@@ -14,22 +14,22 @@ build-py3:
test: flake8 unit-test unit-test-py3 integration-dind
unit-test: build
- docker run docker-py py.test tests/test.py tests/utils_test.py
+ docker run docker-py py.test tests/unit
unit-test-py3: build-py3
- docker run docker-py3 py.test tests/test.py tests/utils_test.py
+ docker run docker-py3 py.test tests/unit
integration-test: build
- docker run -v /var/run/docker.sock:/var/run/docker.sock docker-py py.test -rxs tests/integration_test.py
+ docker run -v /var/run/docker.sock:/var/run/docker.sock docker-py py.test -rxs tests/integration
integration-test-py3: build-py3
- docker run -v /var/run/docker.sock:/var/run/docker.sock docker-py3 py.test -rxs tests/integration_test.py
+ docker run -v /var/run/docker.sock:/var/run/docker.sock docker-py3 py.test -rxs tests/integration
integration-dind: build build-py3
docker run -d --name dpy-dind --privileged dockerswarm/dind:1.8.1 docker -d -H tcp://0.0.0.0:2375
- docker run --env="DOCKER_HOST=tcp://docker:2375" --link=dpy-dind:docker docker-py py.test -rxs tests/integration_test.py
- docker run --env="DOCKER_HOST=tcp://docker:2375" --link=dpy-dind:docker docker-py3 py.test -rxs tests/integration_test.py
+ docker run --env="DOCKER_HOST=tcp://docker:2375" --link=dpy-dind:docker docker-py py.test -rxs tests/integration
+ docker run --env="DOCKER_HOST=tcp://docker:2375" --link=dpy-dind:docker docker-py3 py.test -rxs tests/integration
docker rm -vf dpy-dind
flake8: build
- docker run docker-py flake8 docker tests \ No newline at end of file
+ docker run docker-py flake8 docker tests
diff --git a/tests/base.py b/tests/base.py
index 51b2300..a2c01fc 100644
--- a/tests/base.py
+++ b/tests/base.py
@@ -21,3 +21,28 @@ def requires_api_version(version):
),
reason="API version is too low (< {0})".format(version)
)
+
+
+class Cleanup(object):
+ if sys.version_info < (2, 7):
+ # Provide a basic implementation of addCleanup for Python < 2.7
+ def __init__(self, *args, **kwargs):
+ super(Cleanup, self).__init__(*args, **kwargs)
+ self._cleanups = []
+
+ def tearDown(self):
+ super(Cleanup, self).tearDown()
+ ok = True
+ while self._cleanups:
+ fn, args, kwargs = self._cleanups.pop(-1)
+ try:
+ fn(*args, **kwargs)
+ except KeyboardInterrupt:
+ raise
+ except:
+ ok = False
+ if not ok:
+ raise
+
+ def addCleanup(self, function, *args, **kwargs):
+ self._cleanups.append((function, args, kwargs))
diff --git a/tests/integration/__init__.py b/tests/integration/__init__.py
new file mode 100644
index 0000000..07feaa1
--- /dev/null
+++ b/tests/integration/__init__.py
@@ -0,0 +1,2 @@
+# flake8: noqa
+from .api_test import *
diff --git a/tests/integration/api_test.py b/tests/integration/api_test.py
new file mode 100644
index 0000000..9080891
--- /dev/null
+++ b/tests/integration/api_test.py
@@ -0,0 +1,1719 @@
+# Copyright 2013 dotCloud inc.
+
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import base64
+import contextlib
+import json
+import io
+import os
+import shutil
+import signal
+import socket
+import tarfile
+import tempfile
+import threading
+import time
+import unittest
+import warnings
+
+import pytest
+import six
+from six.moves import BaseHTTPServer
+from six.moves import socketserver
+
+import docker
+from docker.errors import APIError, NotFound
+from docker.utils import kwargs_from_env
+
+from ..base import requires_api_version, Cleanup
+
+
+# FIXME: missing tests for
+# export; history; insert; port; push; tag; get; load; stats
+
+warnings.simplefilter('error')
+compare_version = docker.utils.compare_version
+
+EXEC_DRIVER = []
+BUSYBOX = 'busybox:buildroot-2014.02'
+
+
+def exec_driver_is_native():
+ global EXEC_DRIVER
+ if not EXEC_DRIVER:
+ c = docker_client()
+ EXEC_DRIVER = c.info()['ExecutionDriver']
+ c.close()
+ return EXEC_DRIVER.startswith('native')
+
+
+def docker_client(**kwargs):
+ return docker.Client(**docker_client_kwargs(**kwargs))
+
+
+def docker_client_kwargs(**kwargs):
+ client_kwargs = kwargs_from_env(assert_hostname=False)
+ client_kwargs.update(kwargs)
+ return client_kwargs
+
+
+def setup_module():
+ c = docker_client()
+ try:
+ c.inspect_image(BUSYBOX)
+ except NotFound:
+ c.pull(BUSYBOX)
+ c.inspect_image(BUSYBOX)
+ c.close()
+
+
+class BaseTestCase(unittest.TestCase):
+ tmp_imgs = []
+ tmp_containers = []
+ tmp_folders = []
+ tmp_volumes = []
+
+ def setUp(self):
+ if six.PY2:
+ self.assertRegex = self.assertRegexpMatches
+ self.assertCountEqual = self.assertItemsEqual
+ self.client = docker_client(timeout=60)
+ self.tmp_imgs = []
+ self.tmp_containers = []
+ self.tmp_folders = []
+ self.tmp_volumes = []
+
+ def tearDown(self):
+ for img in self.tmp_imgs:
+ try:
+ self.client.remove_image(img)
+ except docker.errors.APIError:
+ pass
+ for container in self.tmp_containers:
+ try:
+ self.client.stop(container, timeout=1)
+ self.client.remove_container(container)
+ except docker.errors.APIError:
+ pass
+ for folder in self.tmp_folders:
+ shutil.rmtree(folder)
+
+ for volume in self.tmp_volumes:
+ try:
+ self.client.remove_volume(volume)
+ except docker.errors.APIError:
+ pass
+
+ self.client.close()
+
+ def run_container(self, *args, **kwargs):
+ container = self.client.create_container(*args, **kwargs)
+ self.tmp_containers.append(container)
+ self.client.start(container)
+ exitcode = self.client.wait(container)
+
+ if exitcode != 0:
+ output = self.client.logs(container)
+ raise Exception(
+ "Container exited with code {}:\n{}"
+ .format(exitcode, output))
+
+ return container
+
+
+#########################
+# INFORMATION TESTS #
+#########################
+
+
+class TestVersion(BaseTestCase):
+ def runTest(self):
+ res = self.client.version()
+ self.assertIn('GoVersion', res)
+ self.assertIn('Version', res)
+ self.assertEqual(len(res['Version'].split('.')), 3)
+
+
+class TestInfo(BaseTestCase):
+ def runTest(self):
+ res = self.client.info()
+ self.assertIn('Containers', res)
+ self.assertIn('Images', res)
+ self.assertIn('Debug', res)
+
+
+class TestSearch(BaseTestCase):
+ def runTest(self):
+ self.client = docker_client(timeout=10)
+ res = self.client.search('busybox')
+ self.assertTrue(len(res) >= 1)
+ base_img = [x for x in res if x['name'] == 'busybox']
+ self.assertEqual(len(base_img), 1)
+ self.assertIn('description', base_img[0])
+
+###################
+# LISTING TESTS #
+###################
+
+
+class TestImages(BaseTestCase):
+ def runTest(self):
+ res1 = self.client.images(all=True)
+ self.assertIn('Id', res1[0])
+ res10 = res1[0]
+ self.assertIn('Created', res10)
+ self.assertIn('RepoTags', res10)
+ distinct = []
+ for img in res1:
+ if img['Id'] not in distinct:
+ distinct.append(img['Id'])
+ self.assertEqual(len(distinct), self.client.info()['Images'])
+
+
+class TestImageIds(BaseTestCase):
+ def runTest(self):
+ res1 = self.client.images(quiet=True)
+ self.assertEqual(type(res1[0]), six.text_type)
+
+
+class TestListContainers(BaseTestCase):
+ def runTest(self):
+ res0 = self.client.containers(all=True)
+ size = len(res0)
+ res1 = self.client.create_container(BUSYBOX, 'true')
+ self.assertIn('Id', res1)
+ self.client.start(res1['Id'])
+ self.tmp_containers.append(res1['Id'])
+ res2 = self.client.containers(all=True)
+ self.assertEqual(size + 1, len(res2))
+ retrieved = [x for x in res2 if x['Id'].startswith(res1['Id'])]
+ self.assertEqual(len(retrieved), 1)
+ retrieved = retrieved[0]
+ self.assertIn('Command', retrieved)
+ self.assertEqual(retrieved['Command'], six.text_type('true'))
+ self.assertIn('Image', retrieved)
+ self.assertRegex(retrieved['Image'], r'busybox:.*')
+ self.assertIn('Status', retrieved)
+
+#####################
+# CONTAINER TESTS #
+#####################
+
+
+class TestCreateContainer(BaseTestCase):
+ def runTest(self):
+ res = self.client.create_container(BUSYBOX, 'true')
+ self.assertIn('Id', res)
+ self.tmp_containers.append(res['Id'])
+
+
+class TestCreateContainerWithBinds(BaseTestCase):
+ def setUp(self):
+ super(TestCreateContainerWithBinds, self).setUp()
+
+ self.mount_dest = '/mnt'
+
+ # Get a random pathname - we don't need it to exist locally
+ self.mount_origin = tempfile.mkdtemp()
+ shutil.rmtree(self.mount_origin)
+
+ self.filename = 'shared.txt'
+
+ self.run_with_volume(
+ False,
+ BUSYBOX,
+ ['touch', os.path.join(self.mount_dest, self.filename)],
+ )
+
+ def run_with_volume(self, ro, *args, **kwargs):
+ return self.run_container(
+ *args,
+ volumes={self.mount_dest: {}},
+ host_config=self.client.create_host_config(
+ binds={
+ self.mount_origin: {
+ 'bind': self.mount_dest,
+ 'ro': ro,
+ },
+ },
+ network_mode='none'
+ ),
+ **kwargs
+ )
+
+ def test_rw(self):
+ container = self.run_with_volume(
+ False,
+ BUSYBOX,
+ ['ls', self.mount_dest],
+ )
+ logs = self.client.logs(container)
+
+ if six.PY3:
+ logs = logs.decode('utf-8')
+ self.assertIn(self.filename, logs)
+ inspect_data = self.client.inspect_container(container)
+ self.check_container_data(inspect_data, True)
+
+ def test_ro(self):
+ container = self.run_with_volume(
+ True,
+ BUSYBOX,
+ ['ls', self.mount_dest],
+ )
+ logs = self.client.logs(container)
+
+ if six.PY3:
+ logs = logs.decode('utf-8')
+ self.assertIn(self.filename, logs)
+
+ inspect_data = self.client.inspect_container(container)
+ self.check_container_data(inspect_data, False)
+
+ def check_container_data(self, inspect_data, rw):
+ if docker.utils.compare_version('1.20', self.client._version) < 0:
+ self.assertIn('Volumes', inspect_data)
+ self.assertIn(self.mount_dest, inspect_data['Volumes'])
+ self.assertEqual(
+ self.mount_origin, inspect_data['Volumes'][self.mount_dest]
+ )
+ self.assertIn(self.mount_dest, inspect_data['VolumesRW'])
+ self.assertFalse(inspect_data['VolumesRW'][self.mount_dest])
+ else:
+ self.assertIn('Mounts', inspect_data)
+ filtered = list(filter(
+ lambda x: x['Destination'] == self.mount_dest,
+ inspect_data['Mounts']
+ ))
+ self.assertEqual(len(filtered), 1)
+ mount_data = filtered[0]
+ self.assertEqual(mount_data['Source'], self.mount_origin)
+ self.assertEqual(mount_data['RW'], rw)
+
+
+@requires_api_version('1.20')
+class CreateContainerWithGroupAddTest(BaseTestCase):
+ def test_group_id_ints(self):
+ container = self.client.create_container(
+ BUSYBOX, 'id -G',
+ host_config=self.client.create_host_config(group_add=[1000, 1001])
+ )
+ self.tmp_containers.append(container)
+ self.client.start(container)
+ self.client.wait(container)
+
+ logs = self.client.logs(container)
+ if six.PY3:
+ logs = logs.decode('utf-8')
+ groups = logs.strip().split(' ')
+ self.assertIn('1000', groups)
+ self.assertIn('1001', groups)
+
+ def test_group_id_strings(self):
+ container = self.client.create_container(
+ BUSYBOX, 'id -G', host_config=self.client.create_host_config(
+ group_add=['1000', '1001']
+ )
+ )
+ self.tmp_containers.append(container)
+ self.client.start(container)
+ self.client.wait(container)
+
+ logs = self.client.logs(container)
+ if six.PY3:
+ logs = logs.decode('utf-8')
+
+ groups = logs.strip().split(' ')
+ self.assertIn('1000', groups)
+ self.assertIn('1001', groups)
+
+
+class CreateContainerWithLogConfigTest(BaseTestCase):
+ def test_valid_log_driver_and_log_opt(self):
+ log_config = docker.utils.LogConfig(
+ type='json-file',
+ config={'max-file': '100'}
+ )
+
+ container = self.client.create_container(
+ BUSYBOX, ['true'],
+ host_config=self.client.create_host_config(log_config=log_config)
+ )
+ self.tmp_containers.append(container['Id'])
+ self.client.start(container)
+
+ info = self.client.inspect_container(container)
+ container_log_config = info['HostConfig']['LogConfig']
+
+ self.assertEqual(container_log_config['Type'], log_config.type)
+ self.assertEqual(container_log_config['Config'], log_config.config)
+
+ def test_invalid_log_driver_raises_exception(self):
+ log_config = docker.utils.LogConfig(
+ type='asdf-nope',
+ config={}
+ )
+
+ container = self.client.create_container(
+ BUSYBOX, ['true'],
+ host_config=self.client.create_host_config(log_config=log_config)
+ )
+
+ expected_msg = "logger: no log driver named 'asdf-nope' is registered"
+
+ with pytest.raises(APIError) as excinfo:
+ # raises an internal server error 500
+ self.client.start(container)
+
+ assert expected_msg in str(excinfo.value)
+
+ @pytest.mark.skipif(True,
+ reason="https://github.com/docker/docker/issues/15633")
+ def test_valid_no_log_driver_specified(self):
+ log_config = docker.utils.LogConfig(
+ type="",
+ config={'max-file': '100'}
+ )
+
+ container = self.client.create_container(
+ BUSYBOX, ['true'],
+ host_config=self.client.create_host_config(log_config=log_config)
+ )
+ self.tmp_containers.append(container['Id'])
+ self.client.start(container)
+
+ info = self.client.inspect_container(container)
+ container_log_config = info['HostConfig']['LogConfig']
+
+ self.assertEqual(container_log_config['Type'], "json-file")
+ self.assertEqual(container_log_config['Config'], log_config.config)
+
+ def test_valid_no_config_specified(self):
+ log_config = docker.utils.LogConfig(
+ type="json-file",
+ config=None
+ )
+
+ container = self.client.create_container(
+ BUSYBOX, ['true'],
+ host_config=self.client.create_host_config(log_config=log_config)
+ )
+ self.tmp_containers.append(container['Id'])
+ self.client.start(container)
+
+ info = self.client.inspect_container(container)
+ container_log_config = info['HostConfig']['LogConfig']
+
+ self.assertEqual(container_log_config['Type'], "json-file")
+ self.assertEqual(container_log_config['Config'], {})
+
+
+class TestCreateContainerReadOnlyFs(BaseTestCase):
+ def runTest(self):
+ if not exec_driver_is_native():
+ pytest.skip('Exec driver not native')
+
+ ctnr = self.client.create_container(
+ BUSYBOX, ['mkdir', '/shrine'],
+ host_config=self.client.create_host_config(
+ read_only=True, network_mode='none'
+ )
+ )
+ self.assertIn('Id', ctnr)
+ self.tmp_containers.append(ctnr['Id'])
+ self.client.start(ctnr)
+ res = self.client.wait(ctnr)
+ self.assertNotEqual(res, 0)
+
+
+class TestCreateContainerWithName(BaseTestCase):
+ def runTest(self):
+ res = self.client.create_container(BUSYBOX, 'true', name='foobar')
+ self.assertIn('Id', res)
+ self.tmp_containers.append(res['Id'])
+ inspect = self.client.inspect_container(res['Id'])
+ self.assertIn('Name', inspect)
+ self.assertEqual('/foobar', inspect['Name'])
+
+
+class TestRenameContainer(BaseTestCase):
+ def runTest(self):
+ version = self.client.version()['Version']
+ name = 'hong_meiling'
+ res = self.client.create_container(BUSYBOX, 'true')
+ self.assertIn('Id', res)
+ self.tmp_containers.append(res['Id'])
+ self.client.rename(res, name)
+ inspect = self.client.inspect_container(res['Id'])
+ self.assertIn('Name', inspect)
+ if version == '1.5.0':
+ self.assertEqual(name, inspect['Name'])
+ else:
+ self.assertEqual('/{0}'.format(name), inspect['Name'])
+
+
+class TestStartContainer(BaseTestCase):
+ def runTest(self):
+ res = self.client.create_container(BUSYBOX, 'true')
+ self.assertIn('Id', res)
+ self.tmp_containers.append(res['Id'])
+ self.client.start(res['Id'])
+ inspect = self.client.inspect_container(res['Id'])
+ self.assertIn('Config', inspect)
+ self.assertIn('Id', inspect)
+ self.assertTrue(inspect['Id'].startswith(res['Id']))
+ self.assertIn('Image', inspect)
+ self.assertIn('State', inspect)
+ self.assertIn('Running', inspect['State'])
+ if not inspect['State']['Running']:
+ self.assertIn('ExitCode', inspect['State'])
+ self.assertEqual(inspect['State']['ExitCode'], 0)
+
+
+class TestStartContainerWithDictInsteadOfId(BaseTestCase):
+ def runTest(self):
+ res = self.client.create_container(BUSYBOX, 'true')
+ self.assertIn('Id', res)
+ self.tmp_containers.append(res['Id'])
+ self.client.start(res)
+ inspect = self.client.inspect_container(res['Id'])
+ self.assertIn('Config', inspect)
+ self.assertIn('Id', inspect)
+ self.assertTrue(inspect['Id'].startswith(res['Id']))
+ self.assertIn('Image', inspect)
+ self.assertIn('State', inspect)
+ self.assertIn('Running', inspect['State'])
+ if not inspect['State']['Running']:
+ self.assertIn('ExitCode', inspect['State'])
+ self.assertEqual(inspect['State']['ExitCode'], 0)
+
+
+class TestCreateContainerPrivileged(BaseTestCase):
+ def runTest(self):
+ res = self.client.create_container(
+ BUSYBOX, 'true', host_config=self.client.create_host_config(
+ privileged=True, network_mode='none'
+ )
+ )
+ self.assertIn('Id', res)
+ self.tmp_containers.append(res['Id'])
+ self.client.start(res['Id'])
+ inspect = self.client.inspect_container(res['Id'])
+ self.assertIn('Config', inspect)
+ self.assertIn('Id', inspect)
+ self.assertTrue(inspect['Id'].startswith(res['Id']))
+ self.assertIn('Image', inspect)
+ self.assertIn('State', inspect)
+ self.assertIn('Running', inspect['State'])
+ if not inspect['State']['Running']:
+ self.assertIn('ExitCode', inspect['State'])
+ self.assertEqual(inspect['State']['ExitCode'], 0)
+ # Since Nov 2013, the Privileged flag is no longer part of the
+ # container's config exposed via the API (safety concerns?).
+ #
+ if 'Privileged' in inspect['Config']:
+ self.assertEqual(inspect['Config']['Privileged'], True)
+
+
+class TestWait(BaseTestCase):
+ def runTest(self):
+ res = self.client.create_container(BUSYBOX, ['sleep', '3'])
+ id = res['Id']
+ self.tmp_containers.append(id)
+ self.client.start(id)
+ exitcode = self.client.wait(id)
+ self.assertEqual(exitcode, 0)
+ inspect = self.client.inspect_container(id)
+ self.assertIn('Running', inspect['State'])
+ self.assertEqual(inspect['State']['Running'], False)
+ self.assertIn('ExitCode', inspect['State'])
+ self.assertEqual(inspect['State']['ExitCode'], exitcode)
+
+
+class TestWaitWithDictInsteadOfId(BaseTestCase):
+ def runTest(self):
+ res = self.client.create_container(BUSYBOX, ['sleep', '3'])
+ id = res['Id']
+ self.tmp_containers.append(id)
+ self.client.start(res)
+ exitcode = self.client.wait(res)
+ self.assertEqual(exitcode, 0)
+ inspect = self.client.inspect_container(res)
+ self.assertIn('Running', inspect['State'])
+ self.assertEqual(inspect['State']['Running'], False)
+ self.assertIn('ExitCode', inspect['State'])
+ self.assertEqual(inspect['State']['ExitCode'], exitcode)
+
+
+class TestLogs(BaseTestCase):
+ def runTest(self):
+ snippet = 'Flowering Nights (Sakuya Iyazoi)'
+ container = self.client.create_container(
+ BUSYBOX, 'echo {0}'.format(snippet)
+ )
+ id = container['Id']
+ self.client.start(id)
+ self.tmp_containers.append(id)
+ exitcode = self.client.wait(id)
+ self.assertEqual(exitcode, 0)
+ logs = self.client.logs(id)
+ self.assertEqual(logs, (snippet + '\n').encode(encoding='ascii'))
+
+
+class TestLogsWithTailOption(BaseTestCase):
+ def runTest(self):
+ snippet = '''Line1
+Line2'''
+ container = self.client.create_container(
+ BUSYBOX, 'echo "{0}"'.format(snippet)
+ )
+ id = container['Id']
+ self.client.start(id)
+ self.tmp_containers.append(id)
+ exitcode = self.client.wait(id)
+ self.assertEqual(exitcode, 0)
+ logs = self.client.logs(id, tail=1)
+ self.assertEqual(logs, ('Line2\n').encode(encoding='ascii'))
+
+
+# class TestLogsStreaming(BaseTestCase):
+# def runTest(self):
+# snippet = 'Flowering Nights (Sakuya Iyazoi)'
+# container = self.client.create_container(
+# BUSYBOX, 'echo {0}'.format(snippet)
+# )
+# id = container['Id']
+# self.client.start(id)
+# self.tmp_containers.append(id)
+# logs = bytes() if six.PY3 else str()
+# for chunk in self.client.logs(id, stream=True):
+# logs += chunk
+
+# exitcode = self.client.wait(id)
+# self.assertEqual(exitcode, 0)
+
+# self.assertEqual(logs, (snippet + '\n').encode(encoding='ascii'))
+
+
+class TestLogsWithDictInsteadOfId(BaseTestCase):
+ def runTest(self):
+ snippet = 'Flowering Nights (Sakuya Iyazoi)'
+ container = self.client.create_container(
+ BUSYBOX, 'echo {0}'.format(snippet)
+ )
+ id = container['Id']
+ self.client.start(id)
+ self.tmp_containers.append(id)
+ exitcode = self.client.wait(id)
+ self.assertEqual(exitcode, 0)
+ logs = self.client.logs(container)
+ self.assertEqual(logs, (snippet + '\n').encode(encoding='ascii'))
+
+
+class TestDiff(BaseTestCase):
+ def runTest(self):
+ container = self.client.create_container(BUSYBOX, ['touch', '/test'])
+ id = container['Id']
+ self.client.start(id)
+ self.tmp_containers.append(id)
+ exitcode = self.client.wait(id)
+ self.assertEqual(exitcode, 0)
+ diff = self.client.diff(id)
+ test_diff = [x for x in diff if x.get('Path', None) == '/test']
+ self.assertEqual(len(test_diff), 1)
+ self.assertIn('Kind', test_diff[0])
+ self.assertEqual(test_diff[0]['Kind'], 1)
+
+
+class TestDiffWithDictInsteadOfId(BaseTestCase):
+ def runTest(self):
+ container = self.client.create_container(BUSYBOX, ['touch', '/test'])
+ id = container['Id']
+ self.client.start(id)
+ self.tmp_containers.append(id)
+ exitcode = self.client.wait(id)
+ self.assertEqual(exitcode, 0)
+ diff = self.client.diff(container)
+ test_diff = [x for x in diff if x.get('Path', None) == '/test']
+ self.assertEqual(len(test_diff), 1)
+ self.assertIn('Kind', test_diff[0])
+ self.assertEqual(test_diff[0]['Kind'], 1)
+
+
+class TestStop(BaseTestCase):
+ def runTest(self):
+ container = self.client.create_container(BUSYBOX, ['sleep', '9999'])
+ id = container['Id']
+ self.client.start(id)
+ self.tmp_containers.append(id)
+ self.client.stop(id, timeout=2)
+ container_info = self.client.inspect_container(id)
+ self.assertIn('State', container_info)
+ state = container_info['State']
+ self.assertIn('ExitCode', state)
+ if exec_driver_is_native():
+ self.assertNotEqual(state['ExitCode'], 0)
+ self.assertIn('Running', state)
+ self.assertEqual(state['Running'], False)
+
+
+class TestStopWithDictInsteadOfId(BaseTestCase):
+ def runTest(self):
+ container = self.client.create_container(BUSYBOX, ['sleep', '9999'])
+ self.assertIn('Id', container)
+ id = container['Id']
+ self.client.start(container)
+ self.tmp_containers.append(id)
+ self.client.stop(container, timeout=2)
+ container_info = self.client.inspect_container(id)
+ self.assertIn('State', container_info)
+ state = container_info['State']
+ self.assertIn('ExitCode', state)
+ if exec_driver_is_native():
+ self.assertNotEqual(state['ExitCode'], 0)
+ self.assertIn('Running', state)
+ self.assertEqual(state['Running'], False)
+
+
+class TestKill(BaseTestCase):
+ def runTest(self):
+ container = self.client.create_container(BUSYBOX, ['sleep', '9999'])
+ id = container['Id']
+ self.client.start(id)
+ self.tmp_containers.append(id)
+ self.client.kill(id)
+ container_info = self.client.inspect_container(id)
+ self.assertIn('State', container_info)
+ state = container_info['State']
+ self.assertIn('ExitCode', state)
+ if exec_driver_is_native():
+ self.assertNotEqual(state['ExitCode'], 0)
+ self.assertIn('Running', state)
+ self.assertEqual(state['Running'], False)
+
+
+class TestKillWithDictInsteadOfId(BaseTestCase):
+ def runTest(self):
+ container = self.client.create_container(BUSYBOX, ['sleep', '9999'])
+ id = container['Id']
+ self.client.start(id)
+ self.tmp_containers.append(id)
+ self.client.kill(container)
+ container_info = self.client.inspect_container(id)
+ self.assertIn('State', container_info)
+ state = container_info['State']
+ self.assertIn('ExitCode', state)
+ if exec_driver_is_native():
+ self.assertNotEqual(state['ExitCode'], 0)
+ self.assertIn('Running', state)
+ self.assertEqual(state['Running'], False)
+
+
+class TestKillWithSignal(BaseTestCase):
+ def runTest(self):
+ container = self.client.create_container(BUSYBOX, ['sleep', '60'])
+ id = container['Id']
+ self.client.start(id)
+ self.tmp_containers.append(id)
+ self.client.kill(id, signal=signal.SIGKILL)
+ exitcode = self.client.wait(id)
+ self.assertNotEqual(exitcode, 0)
+ container_info = self.client.inspect_container(id)
+ self.assertIn('State', container_info)
+ state = container_info['State']
+ self.assertIn('ExitCode', state)
+ self.assertNotEqual(state['ExitCode'], 0)
+ self.assertIn('Running', state)
+ self.assertEqual(state['Running'], False, state)
+
+
+class TestPort(BaseTestCase):
+ def runTest(self):
+
+ port_bindings = {
+ '1111': ('127.0.0.1', '4567'),
+ '2222': ('127.0.0.1', '4568')
+ }
+
+ container = self.client.create_container(
+ BUSYBOX, ['sleep', '60'], ports=list(port_bindings.keys()),
+ host_config=self.client.create_host_config(
+ port_bindings=port_bindings, network_mode='bridge'
+ )
+ )
+ id = container['Id']
+
+ self.client.start(container)
+
+ # Call the port function on each biding and compare expected vs actual
+ for port in port_bindings:
+ actual_bindings = self.client.port(container, port)
+ port_binding = actual_bindings.pop()
+
+ ip, host_port = port_binding['HostIp'], port_binding['HostPort']
+
+ self.assertEqual(ip, port_bindings[port][0])
+ self.assertEqual(host_port, port_bindings[port][1])
+
+ self.client.kill(id)
+
+
+class TestMacAddress(BaseTestCase):
+ def runTest(self):
+ mac_address_expected = "02:42:ac:11:00:0a"
+ container = self.client.create_container(
+ BUSYBOX, ['sleep', '60'], mac_address=mac_address_expected)
+
+ id = container['Id']
+
+ self.client.start(container)
+ res = self.client.inspect_container(container['Id'])
+ self.assertEqual(mac_address_expected,
+ res['NetworkSettings']['MacAddress'])
+
+ self.client.kill(id)
+
+
+class TestRestart(BaseTestCase):
+ def runTest(self):
+ container = self.client.create_container(BUSYBOX, ['sleep', '9999'])
+ id = container['Id']
+ self.client.start(id)
+ self.tmp_containers.append(id)
+ info = self.client.inspect_container(id)
+ self.assertIn('State', info)
+ self.assertIn('StartedAt', info['State'])
+ start_time1 = info['State']['StartedAt']
+ self.client.restart(id, timeout=2)
+ info2 = self.client.inspect_container(id)
+ self.assertIn('State', info2)
+ self.assertIn('StartedAt', info2['State'])
+ start_time2 = info2['State']['StartedAt']
+ self.assertNotEqual(start_time1, start_time2)
+ self.assertIn('Running', info2['State'])
+ self.assertEqual(info2['State']['Running'], True)
+ self.client.kill(id)
+
+
+class TestRestartWithDictInsteadOfId(BaseTestCase):
+ def runTest(self):
+ container = self.client.create_container(BUSYBOX, ['sleep', '9999'])
+ self.assertIn('Id', container)
+ id = container['Id']
+ self.client.start(container)
+ self.tmp_containers.append(id)
+ info = self.client.inspect_container(id)
+ self.assertIn('State', info)
+ self.assertIn('StartedAt', info['State'])
+ start_time1 = info['State']['StartedAt']
+ self.client.restart(container, timeout=2)
+ info2 = self.client.inspect_container(id)
+ self.assertIn('State', info2)
+ self.assertIn('StartedAt', info2['State'])
+ start_time2 = info2['State']['StartedAt']
+ self.assertNotEqual(start_time1, start_time2)
+ self.assertIn('Running', info2['State'])
+ self.assertEqual(info2['State']['Running'], True)
+ self.client.kill(id)
+
+
+class TestRemoveContainer(BaseTestCase):
+ def runTest(self):
+ container = self.client.create_container(BUSYBOX, ['true'])
+ id = container['Id']
+ self.client.start(id)
+ self.client.wait(id)
+ self.client.remove_container(id)
+ containers = self.client.containers(all=True)
+ res = [x for x in containers if 'Id' in x and x['Id'].startswith(id)]
+ self.assertEqual(len(res), 0)
+
+
+class TestRemoveContainerWithDictInsteadOfId(BaseTestCase):
+ def runTest(self):
+ container = self.client.create_container(BUSYBOX, ['true'])
+ id = container['Id']
+ self.client.start(id)
+ self.client.wait(id)
+ self.client.remove_container(container)
+ containers = self.client.containers(all=True)
+ res = [x for x in containers if 'Id' in x and x['Id'].startswith(id)]
+ self.assertEqual(len(res), 0)
+
+
+class TestCreateContainerWithVolumesFrom(BaseTestCase):
+ def runTest(self):
+ vol_names = ['foobar_vol0', 'foobar_vol1']
+
+ res0 = self.client.create_container(
+ BUSYBOX, 'true', name=vol_names[0]
+ )
+ container1_id = res0['Id']
+ self.tmp_containers.append(container1_id)
+ self.client.start(container1_id)
+
+ res1 = self.client.create_container(
+ BUSYBOX, 'true', name=vol_names[1]
+ )
+ container2_id = res1['Id']
+ self.tmp_containers.append(container2_id)
+ self.client.start(container2_id)
+ with self.assertRaises(docker.errors.DockerException):
+ self.client.create_container(
+ BUSYBOX, 'cat', detach=True, stdin_open=True,
+ volumes_from=vol_names
+ )
+ res2 = self.client.create_container(
+ BUSYBOX, 'cat', detach=True, stdin_open=True,
+ host_config=self.client.create_host_config(
+ volumes_from=vol_names, network_mode='none'
+ )
+ )
+ container3_id = res2['Id']
+ self.tmp_containers.append(container3_id)
+ self.client.start(container3_id)
+
+ info = self.client.inspect_container(res2['Id'])
+ self.assertCountEqual(info['HostConfig']['VolumesFrom'], vol_names)
+
+
+class TestCreateContainerWithLinks(BaseTestCase):
+ def runTest(self):
+ res0 = self.client.create_container(
+ BUSYBOX, 'cat',
+ detach=True, stdin_open=True,
+ environment={'FOO': '1'})
+
+ container1_id = res0['Id']
+ self.tmp_containers.append(container1_id)
+
+ self.client.start(container1_id)
+
+ res1 = self.client.create_container(
+ BUSYBOX, 'cat',
+ detach=True, stdin_open=True,
+ environment={'FOO': '1'})
+
+ container2_id = res1['Id']
+ self.tmp_containers.append(container2_id)
+
+ self.client.start(container2_id)
+
+ # we don't want the first /
+ link_path1 = self.client.inspect_container(container1_id)['Name'][1:]
+ link_alias1 = 'mylink1'
+ link_env_prefix1 = link_alias1.upper()
+
+ link_path2 = self.client.inspect_container(container2_id)['Name'][1:]
+ link_alias2 = 'mylink2'
+ link_env_prefix2 = link_alias2.upper()
+
+ res2 = self.client.create_container(
+ BUSYBOX, 'env', host_config=self.client.create_host_config(
+ links={link_path1: link_alias1, link_path2: link_alias2},
+ network_mode='none'
+ )
+ )
+ container3_id = res2['Id']
+ self.tmp_containers.append(container3_id)
+ self.client.start(container3_id)
+ self.assertEqual(self.client.wait(container3_id), 0)
+
+ logs = self.client.logs(container3_id)
+ if six.PY3:
+ logs = logs.decode('utf-8')
+ self.assertIn('{0}_NAME='.format(link_env_prefix1), logs)
+ self.assertIn('{0}_ENV_FOO=1'.format(link_env_prefix1), logs)
+ self.assertIn('{0}_NAME='.format(link_env_prefix2), logs)
+ self.assertIn('{0}_ENV_FOO=1'.format(link_env_prefix2), logs)
+
+
+class TestRestartingContainer(BaseTestCase):
+ def runTest(self):
+ container = self.client.create_container(
+ BUSYBOX, ['sleep', '2'],
+ host_config=self.client.create_host_config(
+ restart_policy={"Name": "always", "MaximumRetryCount": 0},
+ network_mode='none'
+ )
+ )
+ id = container['Id']
+ self.client.start(id)
+ self.client.wait(id)
+ with self.assertRaises(docker.errors.APIError) as exc:
+ self.client.remove_container(id)
+ err = exc.exception.response.text
+ self.assertIn(
+ 'You cannot remove a running container', err
+ )
+ self.client.remove_container(id, force=True)
+
+
+class TestExecuteCommand(BaseTestCase):
+ def runTest(self):
+ if not exec_driver_is_native():
+ pytest.skip('Exec driver not native')
+
+ container = self.client.create_container(BUSYBOX, 'cat',
+ detach=True, stdin_open=True)
+ id = container['Id']
+ self.client.start(id)
+ self.tmp_containers.append(id)
+
+ res = self.client.exec_create(id, ['echo', 'hello'])
+ self.assertIn('Id', res)
+
+ exec_log = self.client.exec_start(res)
+ self.assertEqual(exec_log, b'hello\n')
+
+
+class TestExecuteCommandString(BaseTestCase):
+ def runTest(self):
+ if not exec_driver_is_native():
+ pytest.skip('Exec driver not native')
+
+ container = self.client.create_container(BUSYBOX, 'cat',
+ detach=True, stdin_open=True)
+ id = container['Id']
+ self.client.start(id)
+ self.tmp_containers.append(id)
+
+ res = self.client.exec_create(id, 'echo hello world')
+ self.assertIn('Id', res)
+
+ exec_log = self.client.exec_start(res)
+ self.assertEqual(exec_log, b'hello world\n')
+
+
+class TestExecuteCommandStringAsUser(BaseTestCase):
+ def runTest(self):
+ if not exec_driver_is_native():
+ pytest.skip('Exec driver not native')
+
+ container = self.client.create_container(BUSYBOX, 'cat',
+ detach=True, stdin_open=True)
+ id = container['Id']
+ self.client.start(id)
+ self.tmp_containers.append(id)
+
+ res = self.client.exec_create(id, 'whoami', user='default')
+ self.assertIn('Id', res)
+
+ exec_log = self.client.exec_start(res)
+ self.assertEqual(exec_log, b'default\n')
+
+
+class TestExecuteCommandStringAsRoot(BaseTestCase):
+ def runTest(self):
+ if not exec_driver_is_native():
+ pytest.skip('Exec driver not native')
+
+ container = self.client.create_container(BUSYBOX, 'cat',
+ detach=True, stdin_open=True)
+ id = container['Id']
+ self.client.start(id)
+ self.tmp_containers.append(id)
+
+ res = self.client.exec_create(id, 'whoami')
+ self.assertIn('Id', res)
+
+ exec_log = self.client.exec_start(res)
+ self.assertEqual(exec_log, b'root\n')
+
+
+class TestExecuteCommandStreaming(BaseTestCase):
+ def runTest(self):
+ if not exec_driver_is_native():
+ pytest.skip('Exec driver not native')
+
+ container = self.client.create_container(BUSYBOX, 'cat',
+ detach=True, stdin_open=True)
+ id = container['Id']
+ self.client.start(id)
+ self.tmp_containers.append(id)
+
+ exec_id = self.client.exec_create(id, ['echo', 'hello\nworld'])
+ self.assertIn('Id', exec_id)
+
+ res = b''
+ for chunk in self.client.exec_start(exec_id, stream=True):
+ res += chunk
+ self.assertEqual(res, b'hello\nworld\n')
+
+
+class TestExecInspect(BaseTestCase):
+ def runTest(self):
+ if not exec_driver_is_native():
+ pytest.skip('Exec driver not native')
+
+ container = self.client.create_container(BUSYBOX, 'cat',
+ detach=True, stdin_open=True)
+ id = container['Id']
+ self.client.start(id)
+ self.tmp_containers.append(id)
+
+ exec_id = self.client.exec_create(id, ['mkdir', '/does/not/exist'])
+ self.assertIn('Id', exec_id)
+ self.client.exec_start(exec_id)
+ exec_info = self.client.exec_inspect(exec_id)
+ self.assertIn('ExitCode', exec_info)
+ self.assertNotEqual(exec_info['ExitCode'], 0)
+
+
+class TestRunContainerStreaming(BaseTestCase):
+ def runTest(self):
+ container = self.client.create_container(BUSYBOX, '/bin/sh',
+ detach=True, stdin_open=True)
+ id = container['Id']
+ self.client.start(id)
+ self.tmp_containers.append(id)
+ sock = self.client.attach_socket(container, ws=False)
+ self.assertTrue(sock.fileno() > -1)
+
+
+class TestPauseUnpauseContainer(BaseTestCase):
+ def runTest(self):
+ container = self.client.create_container(BUSYBOX, ['sleep', '9999'])
+ id = container['Id']
+ self.tmp_containers.append(id)
+ self.client.start(container)
+ self.client.pause(id)
+ container_info = self.client.inspect_container(id)
+ self.assertIn('State', container_info)
+ state = container_info['State']
+ self.assertIn('ExitCode', state)
+ self.assertEqual(state['ExitCode'], 0)
+ self.assertIn('Running', state)
+ self.assertEqual(state['Running'], True)
+ self.assertIn('Paused', state)
+ self.assertEqual(state['Paused'], True)
+
+ self.client.unpause(id)
+ container_info = self.client.inspect_container(id)
+ self.assertIn('State', container_info)
+ state = container_info['State']
+ self.assertIn('ExitCode', state)
+ self.assertEqual(state['ExitCode'], 0)
+ self.assertIn('Running', state)
+ self.assertEqual(state['Running'], True)
+ self.assertIn('Paused', state)
+ self.assertEqual(state['Paused'], False)
+
+
+class TestCreateContainerWithHostPidMode(BaseTestCase):
+ def runTest(self):
+ ctnr = self.client.create_container(
+ BUSYBOX, 'true', host_config=self.client.create_host_config(
+ pid_mode='host', network_mode='none'
+ )
+ )
+ self.assertIn('Id', ctnr)
+ self.tmp_containers.append(ctnr['Id'])
+ self.client.start(ctnr)
+ inspect = self.client.inspect_container(ctnr)
+ self.assertIn('HostConfig', inspect)
+ host_config = inspect['HostConfig']
+ self.assertIn('PidMode', host_config)
+ self.assertEqual(host_config['PidMode'], 'host')
+
+
+#################
+# LINKS TESTS #
+#################
+
+
+class TestRemoveLink(BaseTestCase):
+ def runTest(self):
+ # Create containers
+ container1 = self.client.create_container(
+ BUSYBOX, 'cat', detach=True, stdin_open=True
+ )
+ container1_id = container1['Id']
+ self.tmp_containers.append(container1_id)
+ self.client.start(container1_id)
+
+ # Create Link
+ # we don't want the first /
+ link_path = self.client.inspect_container(container1_id)['Name'][1:]
+ link_alias = 'mylink'
+
+ container2 = self.client.create_container(
+ BUSYBOX, 'cat', host_config=self.client.create_host_config(
+ links={link_path: link_alias}, network_mode='none'
+ )
+ )
+ container2_id = container2['Id']
+ self.tmp_containers.append(container2_id)
+ self.client.start(container2_id)
+
+ # Remove link
+ linked_name = self.client.inspect_container(container2_id)['Name'][1:]
+ link_name = '%s/%s' % (linked_name, link_alias)
+ self.client.remove_container(link_name, link=True)
+
+ # Link is gone
+ containers = self.client.containers(all=True)
+ retrieved = [x for x in containers if link_name in x['Names']]
+ self.assertEqual(len(retrieved), 0)
+
+ # Containers are still there
+ retrieved = [
+ x for x in containers if x['Id'].startswith(container1_id) or
+ x['Id'].startswith(container2_id)
+ ]
+ self.assertEqual(len(retrieved), 2)
+
+##################
+# IMAGES TESTS #
+##################
+
+
+class TestPull(BaseTestCase):
+ def runTest(self):
+ try:
+ self.client.remove_image('hello-world')
+ except docker.errors.APIError:
+ pass
+ res = self.client.pull('hello-world')
+ self.tmp_imgs.append('hello-world')
+ self.assertEqual(type(res), six.text_type)
+ self.assertGreaterEqual(
+ len(self.client.images('hello-world')), 1
+ )
+ img_info = self.client.inspect_image('hello-world')
+ self.assertIn('Id', img_info)
+
+
+class TestPullStream(BaseTestCase):
+ def runTest(self):
+ try:
+ self.client.remove_image('hello-world')
+ except docker.errors.APIError:
+ pass
+ stream = self.client.pull('hello-world', stream=True)
+ self.tmp_imgs.append('hello-world')
+ for chunk in stream:
+ if six.PY3:
+ chunk = chunk.decode('utf-8')
+ json.loads(chunk) # ensure chunk is a single, valid JSON blob
+ self.assertGreaterEqual(
+ len(self.client.images('hello-world')), 1
+ )
+ img_info = self.client.inspect_image('hello-world')
+ self.assertIn('Id', img_info)
+
+
+class TestCommit(BaseTestCase):
+ def runTest(self):
+ container = self.client.create_container(BUSYBOX, ['touch', '/test'])
+ id = container['Id']
+ self.client.start(id)
+ self.tmp_containers.append(id)
+ res = self.client.commit(id)
+ self.assertIn('Id', res)
+ img_id = res['Id']
+ self.tmp_imgs.append(img_id)
+ img = self.client.inspect_image(img_id)
+ self.assertIn('Container', img)
+ self.assertTrue(img['Container'].startswith(id))
+ self.assertIn('ContainerConfig', img)
+ self.assertIn('Image', img['ContainerConfig'])
+ self.assertEqual(BUSYBOX, img['ContainerConfig']['Image'])
+ busybox_id = self.client.inspect_image(BUSYBOX)['Id']
+ self.assertIn('Parent', img)
+ self.assertEqual(img['Parent'], busybox_id)
+
+
+class TestRemoveImage(BaseTestCase):
+ def runTest(self):
+ container = self.client.create_container(BUSYBOX, ['touch', '/test'])
+ id = container['Id']
+ self.client.start(id)
+ self.tmp_containers.append(id)
+ res = self.client.commit(id)
+ self.assertIn('Id', res)
+ img_id = res['Id']
+ self.tmp_imgs.append(img_id)
+ self.client.remove_image(img_id, force=True)
+ images = self.client.images(all=True)
+ res = [x for x in images if x['Id'].startswith(img_id)]
+ self.assertEqual(len(res), 0)
+
+
+##################
+# IMPORT TESTS #
+##################
+
+
+class ImportTestCase(BaseTestCase):
+ '''Base class for `docker import` test cases.'''
+
+ TAR_SIZE = 512 * 1024
+
+ def write_dummy_tar_content(self, n_bytes, tar_fd):
+ def extend_file(f, n_bytes):
+ f.seek(n_bytes - 1)
+ f.write(bytearray([65]))
+ f.seek(0)
+
+ tar = tarfile.TarFile(fileobj=tar_fd, mode='w')
+
+ with tempfile.NamedTemporaryFile() as f:
+ extend_file(f, n_bytes)
+ tarinfo = tar.gettarinfo(name=f.name, arcname='testdata')
+ tar.addfile(tarinfo, fileobj=f)
+
+ tar.close()
+
+ @contextlib.contextmanager
+ def dummy_tar_stream(self, n_bytes):
+ '''Yields a stream that is valid tar data of size n_bytes.'''
+ with tempfile.NamedTemporaryFile() as tar_file:
+ self.write_dummy_tar_content(n_bytes, tar_file)
+ tar_file.seek(0)
+ yield tar_file
+
+ @contextlib.contextmanager
+ def dummy_tar_file(self, n_bytes):
+ '''Yields the name of a valid tar file of size n_bytes.'''
+ with tempfile.NamedTemporaryFile() as tar_file:
+ self.write_dummy_tar_content(n_bytes, tar_file)
+ tar_file.seek(0)
+ yield tar_file.name
+
+
+class TestImportFromBytes(ImportTestCase):
+ '''Tests importing an image from in-memory byte data.'''
+
+ def runTest(self):
+ with self.dummy_tar_stream(n_bytes=500) as f:
+ content = f.read()
+
+ # The generic import_image() function cannot import in-memory bytes
+ # data that happens to be represented as a string type, because
+ # import_image() will try to use it as a filename and usually then
+ # trigger an exception. So we test the import_image_from_data()
+ # function instead.
+ statuses = self.client.import_image_from_data(
+ content, repository='test/import-from-bytes')
+
+ result_text = statuses.splitlines()[-1]
+ result = json.loads(result_text)
+
+ self.assertNotIn('error', result)
+
+ img_id = result['status']
+ self.tmp_imgs.append(img_id)
+
+
+class TestImportFromFile(ImportTestCase):
+ '''Tests importing an image from a tar file on disk.'''
+
+ def runTest(self):
+ with self.dummy_tar_file(n_bytes=self.TAR_SIZE) as tar_filename:
+ # statuses = self.client.import_image(
+ # src=tar_filename, repository='test/import-from-file')
+ statuses = self.client.import_image_from_file(
+ tar_filename, repository='test/import-from-file')
+
+ result_text = statuses.splitlines()[-1]
+ result = json.loads(result_text)
+
+ self.assertNotIn('error', result)
+
+ self.assertIn('status', result)
+ img_id = result['status']
+ self.tmp_imgs.append(img_id)
+
+
+class TestImportFromStream(ImportTestCase):
+ '''Tests importing an image from a stream containing tar data.'''
+
+ def runTest(self):
+ with self.dummy_tar_stream(n_bytes=self.TAR_SIZE) as tar_stream:
+ statuses = self.client.import_image(
+ src=tar_stream, repository='test/import-from-stream')
+ # statuses = self.client.import_image_from_stream(
+ # tar_stream, repository='test/import-from-stream')
+ result_text = statuses.splitlines()[-1]
+ result = json.loads(result_text)
+
+ self.assertNotIn('error', result)
+
+ self.assertIn('status', result)
+ img_id = result['status']
+ self.tmp_imgs.append(img_id)
+
+
+class TestImportFromURL(ImportTestCase):
+ '''Tests downloading an image over HTTP.'''
+
+ @contextlib.contextmanager
+ def temporary_http_file_server(self, stream):
+ '''Serve data from an IO stream over HTTP.'''
+
+ class Handler(BaseHTTPServer.BaseHTTPRequestHandler):
+ def do_GET(self):
+ self.send_response(200)
+ self.send_header('Content-Type', 'application/x-tar')
+ self.end_headers()
+ shutil.copyfileobj(stream, self.wfile)
+
+ server = socketserver.TCPServer(('', 0), Handler)
+ thread = threading.Thread(target=server.serve_forever)
+ thread.setDaemon(True)
+ thread.start()
+
+ yield 'http://%s:%s' % (socket.gethostname(), server.server_address[1])
+
+ server.shutdown()
+
+ @pytest.mark.skipif(True, reason="Doesn't work inside a container - FIXME")
+ def runTest(self):
+ # The crappy test HTTP server doesn't handle large files well, so use
+ # a small file.
+ TAR_SIZE = 10240
+
+ with self.dummy_tar_stream(n_bytes=TAR_SIZE) as tar_data:
+ with self.temporary_http_file_server(tar_data) as url:
+ statuses = self.client.import_image(
+ src=url, repository='test/import-from-url')
+
+ result_text = statuses.splitlines()[-1]
+ result = json.loads(result_text)
+
+ self.assertNotIn('error', result)
+
+ self.assertIn('status', result)
+ img_id = result['status']
+ self.tmp_imgs.append(img_id)
+
+
+#################
+# VOLUMES TESTS #
+#################
+
+@requires_api_version('1.21')
+class TestVolumes(BaseTestCase):
+ def test_create_volume(self):
+ name = 'perfectcherryblossom'
+ self.tmp_volumes.append(name)
+ result = self.client.create_volume(name)
+ self.assertIn('Name', result)
+ self.assertEqual(result['Name'], name)
+ self.assertIn('Driver', result)
+ self.assertEqual(result['Driver'], 'local')
+
+ def test_create_volume_invalid_driver(self):
+ driver_name = 'invalid.driver'
+
+ with pytest.raises(docker.errors.NotFound):
+ self.client.create_volume('perfectcherryblossom', driver_name)
+
+ def test_list_volumes(self):
+ name = 'imperishablenight'
+ self.tmp_volumes.append(name)
+ volume_info = self.client.create_volume(name)
+ result = self.client.volumes()
+ self.assertIn('Volumes', result)
+ volumes = result['Volumes']
+ self.assertIn(volume_info, volumes)
+
+ def test_inspect_volume(self):
+ name = 'embodimentofscarletdevil'
+ self.tmp_volumes.append(name)
+ volume_info = self.client.create_volume(name)
+ result = self.client.inspect_volume(name)
+ self.assertEqual(volume_info, result)
+
+ def test_inspect_nonexistent_volume(self):
+ name = 'embodimentofscarletdevil'
+ with pytest.raises(docker.errors.NotFound):
+ self.client.inspect_volume(name)
+
+ def test_remove_volume(self):
+ name = 'shootthebullet'
+ self.tmp_volumes.append(name)
+ self.client.create_volume(name)
+ result = self.client.remove_volume(name)
+ self.assertTrue(result)
+
+ def test_remove_nonexistent_volume(self):
+ name = 'shootthebullet'
+ with pytest.raises(docker.errors.NotFound):
+ self.client.remove_volume(name)
+
+
+#################
+# BUILDER TESTS #
+#################
+
+class TestBuildStream(BaseTestCase):
+ def runTest(self):
+ script = io.BytesIO('\n'.join([
+ 'FROM busybox',
+ 'MAINTAINER docker-py',
+ 'RUN mkdir -p /tmp/test',
+ 'EXPOSE 8080',
+ 'ADD https://dl.dropboxusercontent.com/u/20637798/silence.tar.gz'
+ ' /tmp/silence.tar.gz'
+ ]).encode('ascii'))
+ stream = self.client.build(fileobj=script, stream=True)
+ logs = ''
+ for chunk in stream:
+ if six.PY3:
+ chunk = chunk.decode('utf-8')
+ json.loads(chunk) # ensure chunk is a single, valid JSON blob
+ logs += chunk
+ self.assertNotEqual(logs, '')
+
+
+class TestBuildFromStringIO(BaseTestCase):
+ def runTest(self):
+ if six.PY3:
+ return
+ script = io.StringIO(six.text_type('\n').join([
+ 'FROM busybox',
+ 'MAINTAINER docker-py',
+ 'RUN mkdir -p /tmp/test',
+ 'EXPOSE 8080',
+ 'ADD https://dl.dropboxusercontent.com/u/20637798/silence.tar.gz'
+ ' /tmp/silence.tar.gz'
+ ]))
+ stream = self.client.build(fileobj=script, stream=True)
+ logs = ''
+ for chunk in stream:
+ if six.PY3:
+ chunk = chunk.decode('utf-8')
+ logs += chunk
+ self.assertNotEqual(logs, '')
+
+
+@requires_api_version('1.8')
+class TestBuildWithDockerignore(Cleanup, BaseTestCase):
+ def runTest(self):
+ base_dir = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, base_dir)
+
+ with open(os.path.join(base_dir, 'Dockerfile'), 'w') as f:
+ f.write("\n".join([
+ 'FROM busybox',
+ 'MAINTAINER docker-py',
+ 'ADD . /test',
+ ]))
+
+ with open(os.path.join(base_dir, '.dockerignore'), 'w') as f:
+ f.write("\n".join([
+ 'ignored',
+ 'Dockerfile',
+ '.dockerignore',
+ '', # empty line
+ ]))
+
+ with open(os.path.join(base_dir, 'not-ignored'), 'w') as f:
+ f.write("this file should not be ignored")
+
+ subdir = os.path.join(base_dir, 'ignored', 'subdir')
+ os.makedirs(subdir)
+ with open(os.path.join(subdir, 'file'), 'w') as f:
+ f.write("this file should be ignored")
+
+ tag = 'docker-py-test-build-with-dockerignore'
+ stream = self.client.build(
+ path=base_dir,
+ tag=tag,
+ )
+ for chunk in stream:
+ pass
+
+ c = self.client.create_container(tag, ['ls', '-1A', '/test'])
+ self.client.start(c)
+ self.client.wait(c)
+ logs = self.client.logs(c)
+
+ if six.PY3:
+ logs = logs.decode('utf-8')
+
+ self.assertEqual(
+ list(filter(None, logs.split('\n'))),
+ ['not-ignored'],
+ )
+
+#######################
+# PY SPECIFIC TESTS #
+#######################
+
+
+class TestRunShlex(BaseTestCase):
+ def runTest(self):
+ commands = [
+ 'true',
+ 'echo "The Young Descendant of Tepes & Septette for the '
+ 'Dead Princess"',
+ 'echo -n "The Young Descendant of Tepes & Septette for the '
+ 'Dead Princess"',
+ '/bin/sh -c "echo Hello World"',
+ '/bin/sh -c \'echo "Hello World"\'',
+ 'echo "\"Night of Nights\""',
+ 'true && echo "Night of Nights"'
+ ]
+ for cmd in commands:
+ container = self.client.create_container(BUSYBOX, cmd)
+ id = container['Id']
+ self.client.start(id)
+ self.tmp_containers.append(id)
+ exitcode = self.client.wait(id)
+ self.assertEqual(exitcode, 0, msg=cmd)
+
+
+class TestLoadConfig(BaseTestCase):
+ def runTest(self):
+ folder = tempfile.mkdtemp()
+ self.tmp_folders.append(folder)
+ cfg_path = os.path.join(folder, '.dockercfg')
+ f = open(cfg_path, 'w')
+ auth_ = base64.b64encode(b'sakuya:izayoi').decode('ascii')
+ f.write('auth = {0}\n'.format(auth_))
+ f.write('email = sakuya@scarlet.net')
+ f.close()
+ cfg = docker.auth.load_config(cfg_path)
+ self.assertNotEqual(cfg[docker.auth.INDEX_NAME], None)
+ cfg = cfg[docker.auth.INDEX_NAME]
+ self.assertEqual(cfg['username'], 'sakuya')
+ self.assertEqual(cfg['password'], 'izayoi')
+ self.assertEqual(cfg['email'], 'sakuya@scarlet.net')
+ self.assertEqual(cfg.get('Auth'), None)
+
+
+class TestLoadJSONConfig(BaseTestCase):
+ def runTest(self):
+ folder = tempfile.mkdtemp()
+ self.tmp_folders.append(folder)
+ cfg_path = os.path.join(folder, '.dockercfg')
+ f = open(os.path.join(folder, '.dockercfg'), 'w')
+ auth_ = base64.b64encode(b'sakuya:izayoi').decode('ascii')
+ email_ = 'sakuya@scarlet.net'
+ f.write('{{"{0}": {{"auth": "{1}", "email": "{2}"}}}}\n'.format(
+ docker.auth.INDEX_URL, auth_, email_))
+ f.close()
+ cfg = docker.auth.load_config(cfg_path)
+ self.assertNotEqual(cfg[docker.auth.INDEX_URL], None)
+ cfg = cfg[docker.auth.INDEX_URL]
+ self.assertEqual(cfg['username'], 'sakuya')
+ self.assertEqual(cfg['password'], 'izayoi')
+ self.assertEqual(cfg['email'], 'sakuya@scarlet.net')
+ self.assertEqual(cfg.get('Auth'), None)
+
+
+class TestAutoDetectVersion(unittest.TestCase):
+ def test_client_init(self):
+ client = docker_client(version='auto')
+ client_version = client._version
+ api_version = client.version(api_version=False)['ApiVersion']
+ self.assertEqual(client_version, api_version)
+ api_version_2 = client.version()['ApiVersion']
+ self.assertEqual(client_version, api_version_2)
+ client.close()
+
+ def test_auto_client(self):
+ client = docker.AutoVersionClient(**docker_client_kwargs())
+ client_version = client._version
+ api_version = client.version(api_version=False)['ApiVersion']
+ self.assertEqual(client_version, api_version)
+ api_version_2 = client.version()['ApiVersion']
+ self.assertEqual(client_version, api_version_2)
+ client.close()
+ with self.assertRaises(docker.errors.DockerException):
+ docker.AutoVersionClient(**docker_client_kwargs(version='1.11'))
+
+
+class TestConnectionTimeout(unittest.TestCase):
+ def setUp(self):
+ self.timeout = 0.5
+ self.client = docker.client.Client(base_url='http://192.168.10.2:4243',
+ timeout=self.timeout)
+
+ def runTest(self):
+ start = time.time()
+ res = None
+ # This call isn't supposed to complete, and it should fail fast.
+ try:
+ res = self.client.inspect_container('id')
+ except:
+ pass
+ end = time.time()
+ self.assertTrue(res is None)
+ self.assertTrue(end - start < 2 * self.timeout)
+
+
+class UnixconnTestCase(unittest.TestCase):
+ """
+ Test UNIX socket connection adapter.
+ """
+
+ def test_resource_warnings(self):
+ """
+ Test no warnings are produced when using the client.
+ """
+
+ with warnings.catch_warnings(record=True) as w:
+ warnings.simplefilter('always')
+
+ client = docker_client()
+ client.images()
+ client.close()
+ del client
+
+ assert len(w) == 0, \
+ "No warnings produced: {0}".format(w[0].message)
+
+
+####################
+# REGRESSION TESTS #
+####################
+
+class TestRegressions(BaseTestCase):
+ def test_443(self):
+ dfile = io.BytesIO()
+ with self.assertRaises(docker.errors.APIError) as exc:
+ for line in self.client.build(fileobj=dfile, tag="a/b/c"):
+ pass
+ self.assertEqual(exc.exception.response.status_code, 500)
+ dfile.close()
+
+ def test_542(self):
+ self.client.start(
+ self.client.create_container(BUSYBOX, ['true'])
+ )
+ result = self.client.containers(all=True, trunc=True)
+ self.assertEqual(len(result[0]['Id']), 12)
+
+ def test_647(self):
+ with self.assertRaises(docker.errors.APIError):
+ self.client.inspect_image('gensokyo.jp//kirisame')
+
+ def test_649(self):
+ self.client.timeout = None
+ ctnr = self.client.create_container(BUSYBOX, ['sleep', '2'])
+ self.client.start(ctnr)
+ self.client.stop(ctnr)
+
+ def test_715(self):
+ ctnr = self.client.create_container(BUSYBOX, ['id', '-u'], user=1000)
+ self.client.start(ctnr)
+ self.client.wait(ctnr)
+ logs = self.client.logs(ctnr)
+ if six.PY3:
+ logs = logs.decode('utf-8')
+ assert logs == '1000\n'
diff --git a/tests/integration_test.py b/tests/integration_test.py
index 763c863..01987e7 100644
--- a/tests/integration_test.py
+++ b/tests/integration_test.py
@@ -1,1720 +1,4 @@
-# Copyright 2013 dotCloud inc.
-
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-
-# http://www.apache.org/licenses/LICENSE-2.0
-
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import base64
-import contextlib
-import json
-import io
-import os
-import shutil
-import signal
-import socket
-import tarfile
-import tempfile
-import threading
-import time
-import unittest
-import warnings
-
-import pytest
-import six
-from six.moves import BaseHTTPServer
-from six.moves import socketserver
-
-import docker
-from docker.errors import APIError, NotFound
-from docker.utils import kwargs_from_env
-
-from .base import requires_api_version
-from .test import Cleanup
-
-
-# FIXME: missing tests for
-# export; history; insert; port; push; tag; get; load; stats
-
-warnings.simplefilter('error')
-compare_version = docker.utils.compare_version
-
-EXEC_DRIVER = []
-BUSYBOX = 'busybox:buildroot-2014.02'
-
-
-def exec_driver_is_native():
- global EXEC_DRIVER
- if not EXEC_DRIVER:
- c = docker_client()
- EXEC_DRIVER = c.info()['ExecutionDriver']
- c.close()
- return EXEC_DRIVER.startswith('native')
-
-
-def docker_client(**kwargs):
- return docker.Client(**docker_client_kwargs(**kwargs))
-
-
-def docker_client_kwargs(**kwargs):
- client_kwargs = kwargs_from_env(assert_hostname=False)
- client_kwargs.update(kwargs)
- return client_kwargs
-
-
-def setup_module():
- c = docker_client()
- try:
- c.inspect_image(BUSYBOX)
- except NotFound:
- c.pull(BUSYBOX)
- c.inspect_image(BUSYBOX)
- c.close()
-
-
-class BaseTestCase(unittest.TestCase):
- tmp_imgs = []
- tmp_containers = []
- tmp_folders = []
- tmp_volumes = []
-
- def setUp(self):
- if six.PY2:
- self.assertRegex = self.assertRegexpMatches
- self.assertCountEqual = self.assertItemsEqual
- self.client = docker_client(timeout=60)
- self.tmp_imgs = []
- self.tmp_containers = []
- self.tmp_folders = []
- self.tmp_volumes = []
-
- def tearDown(self):
- for img in self.tmp_imgs:
- try:
- self.client.remove_image(img)
- except docker.errors.APIError:
- pass
- for container in self.tmp_containers:
- try:
- self.client.stop(container, timeout=1)
- self.client.remove_container(container)
- except docker.errors.APIError:
- pass
- for folder in self.tmp_folders:
- shutil.rmtree(folder)
-
- for volume in self.tmp_volumes:
- try:
- self.client.remove_volume(volume)
- except docker.errors.APIError:
- pass
-
- self.client.close()
-
- def run_container(self, *args, **kwargs):
- container = self.client.create_container(*args, **kwargs)
- self.tmp_containers.append(container)
- self.client.start(container)
- exitcode = self.client.wait(container)
-
- if exitcode != 0:
- output = self.client.logs(container)
- raise Exception(
- "Container exited with code {}:\n{}"
- .format(exitcode, output))
-
- return container
-
-
-#########################
-# INFORMATION TESTS #
-#########################
-
-
-class TestVersion(BaseTestCase):
- def runTest(self):
- res = self.client.version()
- self.assertIn('GoVersion', res)
- self.assertIn('Version', res)
- self.assertEqual(len(res['Version'].split('.')), 3)
-
-
-class TestInfo(BaseTestCase):
- def runTest(self):
- res = self.client.info()
- self.assertIn('Containers', res)
- self.assertIn('Images', res)
- self.assertIn('Debug', res)
-
-
-class TestSearch(BaseTestCase):
- def runTest(self):
- self.client = docker_client(timeout=10)
- res = self.client.search('busybox')
- self.assertTrue(len(res) >= 1)
- base_img = [x for x in res if x['name'] == 'busybox']
- self.assertEqual(len(base_img), 1)
- self.assertIn('description', base_img[0])
-
-###################
-# LISTING TESTS #
-###################
-
-
-class TestImages(BaseTestCase):
- def runTest(self):
- res1 = self.client.images(all=True)
- self.assertIn('Id', res1[0])
- res10 = res1[0]
- self.assertIn('Created', res10)
- self.assertIn('RepoTags', res10)
- distinct = []
- for img in res1:
- if img['Id'] not in distinct:
- distinct.append(img['Id'])
- self.assertEqual(len(distinct), self.client.info()['Images'])
-
-
-class TestImageIds(BaseTestCase):
- def runTest(self):
- res1 = self.client.images(quiet=True)
- self.assertEqual(type(res1[0]), six.text_type)
-
-
-class TestListContainers(BaseTestCase):
- def runTest(self):
- res0 = self.client.containers(all=True)
- size = len(res0)
- res1 = self.client.create_container(BUSYBOX, 'true')
- self.assertIn('Id', res1)
- self.client.start(res1['Id'])
- self.tmp_containers.append(res1['Id'])
- res2 = self.client.containers(all=True)
- self.assertEqual(size + 1, len(res2))
- retrieved = [x for x in res2 if x['Id'].startswith(res1['Id'])]
- self.assertEqual(len(retrieved), 1)
- retrieved = retrieved[0]
- self.assertIn('Command', retrieved)
- self.assertEqual(retrieved['Command'], six.text_type('true'))
- self.assertIn('Image', retrieved)
- self.assertRegex(retrieved['Image'], r'busybox:.*')
- self.assertIn('Status', retrieved)
-
-#####################
-# CONTAINER TESTS #
-#####################
-
-
-class TestCreateContainer(BaseTestCase):
- def runTest(self):
- res = self.client.create_container(BUSYBOX, 'true')
- self.assertIn('Id', res)
- self.tmp_containers.append(res['Id'])
-
-
-class TestCreateContainerWithBinds(BaseTestCase):
- def setUp(self):
- super(TestCreateContainerWithBinds, self).setUp()
-
- self.mount_dest = '/mnt'
-
- # Get a random pathname - we don't need it to exist locally
- self.mount_origin = tempfile.mkdtemp()
- shutil.rmtree(self.mount_origin)
-
- self.filename = 'shared.txt'
-
- self.run_with_volume(
- False,
- BUSYBOX,
- ['touch', os.path.join(self.mount_dest, self.filename)],
- )
-
- def run_with_volume(self, ro, *args, **kwargs):
- return self.run_container(
- *args,
- volumes={self.mount_dest: {}},
- host_config=self.client.create_host_config(
- binds={
- self.mount_origin: {
- 'bind': self.mount_dest,
- 'ro': ro,
- },
- },
- network_mode='none'
- ),
- **kwargs
- )
-
- def test_rw(self):
- container = self.run_with_volume(
- False,
- BUSYBOX,
- ['ls', self.mount_dest],
- )
- logs = self.client.logs(container)
-
- if six.PY3:
- logs = logs.decode('utf-8')
- self.assertIn(self.filename, logs)
- inspect_data = self.client.inspect_container(container)
- self.check_container_data(inspect_data, True)
-
- def test_ro(self):
- container = self.run_with_volume(
- True,
- BUSYBOX,
- ['ls', self.mount_dest],
- )
- logs = self.client.logs(container)
-
- if six.PY3:
- logs = logs.decode('utf-8')
- self.assertIn(self.filename, logs)
-
- inspect_data = self.client.inspect_container(container)
- self.check_container_data(inspect_data, False)
-
- def check_container_data(self, inspect_data, rw):
- if docker.utils.compare_version('1.20', self.client._version) < 0:
- self.assertIn('Volumes', inspect_data)
- self.assertIn(self.mount_dest, inspect_data['Volumes'])
- self.assertEqual(
- self.mount_origin, inspect_data['Volumes'][self.mount_dest]
- )
- self.assertIn(self.mount_dest, inspect_data['VolumesRW'])
- self.assertFalse(inspect_data['VolumesRW'][self.mount_dest])
- else:
- self.assertIn('Mounts', inspect_data)
- filtered = list(filter(
- lambda x: x['Destination'] == self.mount_dest,
- inspect_data['Mounts']
- ))
- self.assertEqual(len(filtered), 1)
- mount_data = filtered[0]
- self.assertEqual(mount_data['Source'], self.mount_origin)
- self.assertEqual(mount_data['RW'], rw)
-
-
-@requires_api_version('1.20')
-class CreateContainerWithGroupAddTest(BaseTestCase):
- def test_group_id_ints(self):
- container = self.client.create_container(
- BUSYBOX, 'id -G',
- host_config=self.client.create_host_config(group_add=[1000, 1001])
- )
- self.tmp_containers.append(container)
- self.client.start(container)
- self.client.wait(container)
-
- logs = self.client.logs(container)
- if six.PY3:
- logs = logs.decode('utf-8')
- groups = logs.strip().split(' ')
- self.assertIn('1000', groups)
- self.assertIn('1001', groups)
-
- def test_group_id_strings(self):
- container = self.client.create_container(
- BUSYBOX, 'id -G', host_config=self.client.create_host_config(
- group_add=['1000', '1001']
- )
- )
- self.tmp_containers.append(container)
- self.client.start(container)
- self.client.wait(container)
-
- logs = self.client.logs(container)
- if six.PY3:
- logs = logs.decode('utf-8')
-
- groups = logs.strip().split(' ')
- self.assertIn('1000', groups)
- self.assertIn('1001', groups)
-
-
-class CreateContainerWithLogConfigTest(BaseTestCase):
- def test_valid_log_driver_and_log_opt(self):
- log_config = docker.utils.LogConfig(
- type='json-file',
- config={'max-file': '100'}
- )
-
- container = self.client.create_container(
- BUSYBOX, ['true'],
- host_config=self.client.create_host_config(log_config=log_config)
- )
- self.tmp_containers.append(container['Id'])
- self.client.start(container)
-
- info = self.client.inspect_container(container)
- container_log_config = info['HostConfig']['LogConfig']
-
- self.assertEqual(container_log_config['Type'], log_config.type)
- self.assertEqual(container_log_config['Config'], log_config.config)
-
- def test_invalid_log_driver_raises_exception(self):
- log_config = docker.utils.LogConfig(
- type='asdf-nope',
- config={}
- )
-
- container = self.client.create_container(
- BUSYBOX, ['true'],
- host_config=self.client.create_host_config(log_config=log_config)
- )
-
- expected_msg = "logger: no log driver named 'asdf-nope' is registered"
-
- with pytest.raises(APIError) as excinfo:
- # raises an internal server error 500
- self.client.start(container)
-
- assert expected_msg in str(excinfo.value)
-
- @pytest.mark.skipif(True,
- reason="https://github.com/docker/docker/issues/15633")
- def test_valid_no_log_driver_specified(self):
- log_config = docker.utils.LogConfig(
- type="",
- config={'max-file': '100'}
- )
-
- container = self.client.create_container(
- BUSYBOX, ['true'],
- host_config=self.client.create_host_config(log_config=log_config)
- )
- self.tmp_containers.append(container['Id'])
- self.client.start(container)
-
- info = self.client.inspect_container(container)
- container_log_config = info['HostConfig']['LogConfig']
-
- self.assertEqual(container_log_config['Type'], "json-file")
- self.assertEqual(container_log_config['Config'], log_config.config)
-
- def test_valid_no_config_specified(self):
- log_config = docker.utils.LogConfig(
- type="json-file",
- config=None
- )
-
- container = self.client.create_container(
- BUSYBOX, ['true'],
- host_config=self.client.create_host_config(log_config=log_config)
- )
- self.tmp_containers.append(container['Id'])
- self.client.start(container)
-
- info = self.client.inspect_container(container)
- container_log_config = info['HostConfig']['LogConfig']
-
- self.assertEqual(container_log_config['Type'], "json-file")
- self.assertEqual(container_log_config['Config'], {})
-
-
-class TestCreateContainerReadOnlyFs(BaseTestCase):
- def runTest(self):
- if not exec_driver_is_native():
- pytest.skip('Exec driver not native')
-
- ctnr = self.client.create_container(
- BUSYBOX, ['mkdir', '/shrine'],
- host_config=self.client.create_host_config(
- read_only=True, network_mode='none'
- )
- )
- self.assertIn('Id', ctnr)
- self.tmp_containers.append(ctnr['Id'])
- self.client.start(ctnr)
- res = self.client.wait(ctnr)
- self.assertNotEqual(res, 0)
-
-
-class TestCreateContainerWithName(BaseTestCase):
- def runTest(self):
- res = self.client.create_container(BUSYBOX, 'true', name='foobar')
- self.assertIn('Id', res)
- self.tmp_containers.append(res['Id'])
- inspect = self.client.inspect_container(res['Id'])
- self.assertIn('Name', inspect)
- self.assertEqual('/foobar', inspect['Name'])
-
-
-class TestRenameContainer(BaseTestCase):
- def runTest(self):
- version = self.client.version()['Version']
- name = 'hong_meiling'
- res = self.client.create_container(BUSYBOX, 'true')
- self.assertIn('Id', res)
- self.tmp_containers.append(res['Id'])
- self.client.rename(res, name)
- inspect = self.client.inspect_container(res['Id'])
- self.assertIn('Name', inspect)
- if version == '1.5.0':
- self.assertEqual(name, inspect['Name'])
- else:
- self.assertEqual('/{0}'.format(name), inspect['Name'])
-
-
-class TestStartContainer(BaseTestCase):
- def runTest(self):
- res = self.client.create_container(BUSYBOX, 'true')
- self.assertIn('Id', res)
- self.tmp_containers.append(res['Id'])
- self.client.start(res['Id'])
- inspect = self.client.inspect_container(res['Id'])
- self.assertIn('Config', inspect)
- self.assertIn('Id', inspect)
- self.assertTrue(inspect['Id'].startswith(res['Id']))
- self.assertIn('Image', inspect)
- self.assertIn('State', inspect)
- self.assertIn('Running', inspect['State'])
- if not inspect['State']['Running']:
- self.assertIn('ExitCode', inspect['State'])
- self.assertEqual(inspect['State']['ExitCode'], 0)
-
-
-class TestStartContainerWithDictInsteadOfId(BaseTestCase):
- def runTest(self):
- res = self.client.create_container(BUSYBOX, 'true')
- self.assertIn('Id', res)
- self.tmp_containers.append(res['Id'])
- self.client.start(res)
- inspect = self.client.inspect_container(res['Id'])
- self.assertIn('Config', inspect)
- self.assertIn('Id', inspect)
- self.assertTrue(inspect['Id'].startswith(res['Id']))
- self.assertIn('Image', inspect)
- self.assertIn('State', inspect)
- self.assertIn('Running', inspect['State'])
- if not inspect['State']['Running']:
- self.assertIn('ExitCode', inspect['State'])
- self.assertEqual(inspect['State']['ExitCode'], 0)
-
-
-class TestCreateContainerPrivileged(BaseTestCase):
- def runTest(self):
- res = self.client.create_container(
- BUSYBOX, 'true', host_config=self.client.create_host_config(
- privileged=True, network_mode='none'
- )
- )
- self.assertIn('Id', res)
- self.tmp_containers.append(res['Id'])
- self.client.start(res['Id'])
- inspect = self.client.inspect_container(res['Id'])
- self.assertIn('Config', inspect)
- self.assertIn('Id', inspect)
- self.assertTrue(inspect['Id'].startswith(res['Id']))
- self.assertIn('Image', inspect)
- self.assertIn('State', inspect)
- self.assertIn('Running', inspect['State'])
- if not inspect['State']['Running']:
- self.assertIn('ExitCode', inspect['State'])
- self.assertEqual(inspect['State']['ExitCode'], 0)
- # Since Nov 2013, the Privileged flag is no longer part of the
- # container's config exposed via the API (safety concerns?).
- #
- if 'Privileged' in inspect['Config']:
- self.assertEqual(inspect['Config']['Privileged'], True)
-
-
-class TestWait(BaseTestCase):
- def runTest(self):
- res = self.client.create_container(BUSYBOX, ['sleep', '3'])
- id = res['Id']
- self.tmp_containers.append(id)
- self.client.start(id)
- exitcode = self.client.wait(id)
- self.assertEqual(exitcode, 0)
- inspect = self.client.inspect_container(id)
- self.assertIn('Running', inspect['State'])
- self.assertEqual(inspect['State']['Running'], False)
- self.assertIn('ExitCode', inspect['State'])
- self.assertEqual(inspect['State']['ExitCode'], exitcode)
-
-
-class TestWaitWithDictInsteadOfId(BaseTestCase):
- def runTest(self):
- res = self.client.create_container(BUSYBOX, ['sleep', '3'])
- id = res['Id']
- self.tmp_containers.append(id)
- self.client.start(res)
- exitcode = self.client.wait(res)
- self.assertEqual(exitcode, 0)
- inspect = self.client.inspect_container(res)
- self.assertIn('Running', inspect['State'])
- self.assertEqual(inspect['State']['Running'], False)
- self.assertIn('ExitCode', inspect['State'])
- self.assertEqual(inspect['State']['ExitCode'], exitcode)
-
-
-class TestLogs(BaseTestCase):
- def runTest(self):
- snippet = 'Flowering Nights (Sakuya Iyazoi)'
- container = self.client.create_container(
- BUSYBOX, 'echo {0}'.format(snippet)
- )
- id = container['Id']
- self.client.start(id)
- self.tmp_containers.append(id)
- exitcode = self.client.wait(id)
- self.assertEqual(exitcode, 0)
- logs = self.client.logs(id)
- self.assertEqual(logs, (snippet + '\n').encode(encoding='ascii'))
-
-
-class TestLogsWithTailOption(BaseTestCase):
- def runTest(self):
- snippet = '''Line1
-Line2'''
- container = self.client.create_container(
- BUSYBOX, 'echo "{0}"'.format(snippet)
- )
- id = container['Id']
- self.client.start(id)
- self.tmp_containers.append(id)
- exitcode = self.client.wait(id)
- self.assertEqual(exitcode, 0)
- logs = self.client.logs(id, tail=1)
- self.assertEqual(logs, ('Line2\n').encode(encoding='ascii'))
-
-
-# class TestLogsStreaming(BaseTestCase):
-# def runTest(self):
-# snippet = 'Flowering Nights (Sakuya Iyazoi)'
-# container = self.client.create_container(
-# BUSYBOX, 'echo {0}'.format(snippet)
-# )
-# id = container['Id']
-# self.client.start(id)
-# self.tmp_containers.append(id)
-# logs = bytes() if six.PY3 else str()
-# for chunk in self.client.logs(id, stream=True):
-# logs += chunk
-
-# exitcode = self.client.wait(id)
-# self.assertEqual(exitcode, 0)
-
-# self.assertEqual(logs, (snippet + '\n').encode(encoding='ascii'))
-
-
-class TestLogsWithDictInsteadOfId(BaseTestCase):
- def runTest(self):
- snippet = 'Flowering Nights (Sakuya Iyazoi)'
- container = self.client.create_container(
- BUSYBOX, 'echo {0}'.format(snippet)
- )
- id = container['Id']
- self.client.start(id)
- self.tmp_containers.append(id)
- exitcode = self.client.wait(id)
- self.assertEqual(exitcode, 0)
- logs = self.client.logs(container)
- self.assertEqual(logs, (snippet + '\n').encode(encoding='ascii'))
-
-
-class TestDiff(BaseTestCase):
- def runTest(self):
- container = self.client.create_container(BUSYBOX, ['touch', '/test'])
- id = container['Id']
- self.client.start(id)
- self.tmp_containers.append(id)
- exitcode = self.client.wait(id)
- self.assertEqual(exitcode, 0)
- diff = self.client.diff(id)
- test_diff = [x for x in diff if x.get('Path', None) == '/test']
- self.assertEqual(len(test_diff), 1)
- self.assertIn('Kind', test_diff[0])
- self.assertEqual(test_diff[0]['Kind'], 1)
-
-
-class TestDiffWithDictInsteadOfId(BaseTestCase):
- def runTest(self):
- container = self.client.create_container(BUSYBOX, ['touch', '/test'])
- id = container['Id']
- self.client.start(id)
- self.tmp_containers.append(id)
- exitcode = self.client.wait(id)
- self.assertEqual(exitcode, 0)
- diff = self.client.diff(container)
- test_diff = [x for x in diff if x.get('Path', None) == '/test']
- self.assertEqual(len(test_diff), 1)
- self.assertIn('Kind', test_diff[0])
- self.assertEqual(test_diff[0]['Kind'], 1)
-
-
-class TestStop(BaseTestCase):
- def runTest(self):
- container = self.client.create_container(BUSYBOX, ['sleep', '9999'])
- id = container['Id']
- self.client.start(id)
- self.tmp_containers.append(id)
- self.client.stop(id, timeout=2)
- container_info = self.client.inspect_container(id)
- self.assertIn('State', container_info)
- state = container_info['State']
- self.assertIn('ExitCode', state)
- if exec_driver_is_native():
- self.assertNotEqual(state['ExitCode'], 0)
- self.assertIn('Running', state)
- self.assertEqual(state['Running'], False)
-
-
-class TestStopWithDictInsteadOfId(BaseTestCase):
- def runTest(self):
- container = self.client.create_container(BUSYBOX, ['sleep', '9999'])
- self.assertIn('Id', container)
- id = container['Id']
- self.client.start(container)
- self.tmp_containers.append(id)
- self.client.stop(container, timeout=2)
- container_info = self.client.inspect_container(id)
- self.assertIn('State', container_info)
- state = container_info['State']
- self.assertIn('ExitCode', state)
- if exec_driver_is_native():
- self.assertNotEqual(state['ExitCode'], 0)
- self.assertIn('Running', state)
- self.assertEqual(state['Running'], False)
-
-
-class TestKill(BaseTestCase):
- def runTest(self):
- container = self.client.create_container(BUSYBOX, ['sleep', '9999'])
- id = container['Id']
- self.client.start(id)
- self.tmp_containers.append(id)
- self.client.kill(id)
- container_info = self.client.inspect_container(id)
- self.assertIn('State', container_info)
- state = container_info['State']
- self.assertIn('ExitCode', state)
- if exec_driver_is_native():
- self.assertNotEqual(state['ExitCode'], 0)
- self.assertIn('Running', state)
- self.assertEqual(state['Running'], False)
-
-
-class TestKillWithDictInsteadOfId(BaseTestCase):
- def runTest(self):
- container = self.client.create_container(BUSYBOX, ['sleep', '9999'])
- id = container['Id']
- self.client.start(id)
- self.tmp_containers.append(id)
- self.client.kill(container)
- container_info = self.client.inspect_container(id)
- self.assertIn('State', container_info)
- state = container_info['State']
- self.assertIn('ExitCode', state)
- if exec_driver_is_native():
- self.assertNotEqual(state['ExitCode'], 0)
- self.assertIn('Running', state)
- self.assertEqual(state['Running'], False)
-
-
-class TestKillWithSignal(BaseTestCase):
- def runTest(self):
- container = self.client.create_container(BUSYBOX, ['sleep', '60'])
- id = container['Id']
- self.client.start(id)
- self.tmp_containers.append(id)
- self.client.kill(id, signal=signal.SIGKILL)
- exitcode = self.client.wait(id)
- self.assertNotEqual(exitcode, 0)
- container_info = self.client.inspect_container(id)
- self.assertIn('State', container_info)
- state = container_info['State']
- self.assertIn('ExitCode', state)
- self.assertNotEqual(state['ExitCode'], 0)
- self.assertIn('Running', state)
- self.assertEqual(state['Running'], False, state)
-
-
-class TestPort(BaseTestCase):
- def runTest(self):
-
- port_bindings = {
- '1111': ('127.0.0.1', '4567'),
- '2222': ('127.0.0.1', '4568')
- }
-
- container = self.client.create_container(
- BUSYBOX, ['sleep', '60'], ports=list(port_bindings.keys()),
- host_config=self.client.create_host_config(
- port_bindings=port_bindings, network_mode='bridge'
- )
- )
- id = container['Id']
-
- self.client.start(container)
-
- # Call the port function on each biding and compare expected vs actual
- for port in port_bindings:
- actual_bindings = self.client.port(container, port)
- port_binding = actual_bindings.pop()
-
- ip, host_port = port_binding['HostIp'], port_binding['HostPort']
-
- self.assertEqual(ip, port_bindings[port][0])
- self.assertEqual(host_port, port_bindings[port][1])
-
- self.client.kill(id)
-
-
-class TestMacAddress(BaseTestCase):
- def runTest(self):
- mac_address_expected = "02:42:ac:11:00:0a"
- container = self.client.create_container(
- BUSYBOX, ['sleep', '60'], mac_address=mac_address_expected)
-
- id = container['Id']
-
- self.client.start(container)
- res = self.client.inspect_container(container['Id'])
- self.assertEqual(mac_address_expected,
- res['NetworkSettings']['MacAddress'])
-
- self.client.kill(id)
-
-
-class TestRestart(BaseTestCase):
- def runTest(self):
- container = self.client.create_container(BUSYBOX, ['sleep', '9999'])
- id = container['Id']
- self.client.start(id)
- self.tmp_containers.append(id)
- info = self.client.inspect_container(id)
- self.assertIn('State', info)
- self.assertIn('StartedAt', info['State'])
- start_time1 = info['State']['StartedAt']
- self.client.restart(id, timeout=2)
- info2 = self.client.inspect_container(id)
- self.assertIn('State', info2)
- self.assertIn('StartedAt', info2['State'])
- start_time2 = info2['State']['StartedAt']
- self.assertNotEqual(start_time1, start_time2)
- self.assertIn('Running', info2['State'])
- self.assertEqual(info2['State']['Running'], True)
- self.client.kill(id)
-
-
-class TestRestartWithDictInsteadOfId(BaseTestCase):
- def runTest(self):
- container = self.client.create_container(BUSYBOX, ['sleep', '9999'])
- self.assertIn('Id', container)
- id = container['Id']
- self.client.start(container)
- self.tmp_containers.append(id)
- info = self.client.inspect_container(id)
- self.assertIn('State', info)
- self.assertIn('StartedAt', info['State'])
- start_time1 = info['State']['StartedAt']
- self.client.restart(container, timeout=2)
- info2 = self.client.inspect_container(id)
- self.assertIn('State', info2)
- self.assertIn('StartedAt', info2['State'])
- start_time2 = info2['State']['StartedAt']
- self.assertNotEqual(start_time1, start_time2)
- self.assertIn('Running', info2['State'])
- self.assertEqual(info2['State']['Running'], True)
- self.client.kill(id)
-
-
-class TestRemoveContainer(BaseTestCase):
- def runTest(self):
- container = self.client.create_container(BUSYBOX, ['true'])
- id = container['Id']
- self.client.start(id)
- self.client.wait(id)
- self.client.remove_container(id)
- containers = self.client.containers(all=True)
- res = [x for x in containers if 'Id' in x and x['Id'].startswith(id)]
- self.assertEqual(len(res), 0)
-
-
-class TestRemoveContainerWithDictInsteadOfId(BaseTestCase):
- def runTest(self):
- container = self.client.create_container(BUSYBOX, ['true'])
- id = container['Id']
- self.client.start(id)
- self.client.wait(id)
- self.client.remove_container(container)
- containers = self.client.containers(all=True)
- res = [x for x in containers if 'Id' in x and x['Id'].startswith(id)]
- self.assertEqual(len(res), 0)
-
-
-class TestCreateContainerWithVolumesFrom(BaseTestCase):
- def runTest(self):
- vol_names = ['foobar_vol0', 'foobar_vol1']
-
- res0 = self.client.create_container(
- BUSYBOX, 'true', name=vol_names[0]
- )
- container1_id = res0['Id']
- self.tmp_containers.append(container1_id)
- self.client.start(container1_id)
-
- res1 = self.client.create_container(
- BUSYBOX, 'true', name=vol_names[1]
- )
- container2_id = res1['Id']
- self.tmp_containers.append(container2_id)
- self.client.start(container2_id)
- with self.assertRaises(docker.errors.DockerException):
- self.client.create_container(
- BUSYBOX, 'cat', detach=True, stdin_open=True,
- volumes_from=vol_names
- )
- res2 = self.client.create_container(
- BUSYBOX, 'cat', detach=True, stdin_open=True,
- host_config=self.client.create_host_config(
- volumes_from=vol_names, network_mode='none'
- )
- )
- container3_id = res2['Id']
- self.tmp_containers.append(container3_id)
- self.client.start(container3_id)
-
- info = self.client.inspect_container(res2['Id'])
- self.assertCountEqual(info['HostConfig']['VolumesFrom'], vol_names)
-
-
-class TestCreateContainerWithLinks(BaseTestCase):
- def runTest(self):
- res0 = self.client.create_container(
- BUSYBOX, 'cat',
- detach=True, stdin_open=True,
- environment={'FOO': '1'})
-
- container1_id = res0['Id']
- self.tmp_containers.append(container1_id)
-
- self.client.start(container1_id)
-
- res1 = self.client.create_container(
- BUSYBOX, 'cat',
- detach=True, stdin_open=True,
- environment={'FOO': '1'})
-
- container2_id = res1['Id']
- self.tmp_containers.append(container2_id)
-
- self.client.start(container2_id)
-
- # we don't want the first /
- link_path1 = self.client.inspect_container(container1_id)['Name'][1:]
- link_alias1 = 'mylink1'
- link_env_prefix1 = link_alias1.upper()
-
- link_path2 = self.client.inspect_container(container2_id)['Name'][1:]
- link_alias2 = 'mylink2'
- link_env_prefix2 = link_alias2.upper()
-
- res2 = self.client.create_container(
- BUSYBOX, 'env', host_config=self.client.create_host_config(
- links={link_path1: link_alias1, link_path2: link_alias2},
- network_mode='none'
- )
- )
- container3_id = res2['Id']
- self.tmp_containers.append(container3_id)
- self.client.start(container3_id)
- self.assertEqual(self.client.wait(container3_id), 0)
-
- logs = self.client.logs(container3_id)
- if six.PY3:
- logs = logs.decode('utf-8')
- self.assertIn('{0}_NAME='.format(link_env_prefix1), logs)
- self.assertIn('{0}_ENV_FOO=1'.format(link_env_prefix1), logs)
- self.assertIn('{0}_NAME='.format(link_env_prefix2), logs)
- self.assertIn('{0}_ENV_FOO=1'.format(link_env_prefix2), logs)
-
-
-class TestRestartingContainer(BaseTestCase):
- def runTest(self):
- container = self.client.create_container(
- BUSYBOX, ['sleep', '2'],
- host_config=self.client.create_host_config(
- restart_policy={"Name": "always", "MaximumRetryCount": 0},
- network_mode='none'
- )
- )
- id = container['Id']
- self.client.start(id)
- self.client.wait(id)
- with self.assertRaises(docker.errors.APIError) as exc:
- self.client.remove_container(id)
- err = exc.exception.response.text
- self.assertIn(
- 'You cannot remove a running container', err
- )
- self.client.remove_container(id, force=True)
-
-
-class TestExecuteCommand(BaseTestCase):
- def runTest(self):
- if not exec_driver_is_native():
- pytest.skip('Exec driver not native')
-
- container = self.client.create_container(BUSYBOX, 'cat',
- detach=True, stdin_open=True)
- id = container['Id']
- self.client.start(id)
- self.tmp_containers.append(id)
-
- res = self.client.exec_create(id, ['echo', 'hello'])
- self.assertIn('Id', res)
-
- exec_log = self.client.exec_start(res)
- self.assertEqual(exec_log, b'hello\n')
-
-
-class TestExecuteCommandString(BaseTestCase):
- def runTest(self):
- if not exec_driver_is_native():
- pytest.skip('Exec driver not native')
-
- container = self.client.create_container(BUSYBOX, 'cat',
- detach=True, stdin_open=True)
- id = container['Id']
- self.client.start(id)
- self.tmp_containers.append(id)
-
- res = self.client.exec_create(id, 'echo hello world')
- self.assertIn('Id', res)
-
- exec_log = self.client.exec_start(res)
- self.assertEqual(exec_log, b'hello world\n')
-
-
-class TestExecuteCommandStringAsUser(BaseTestCase):
- def runTest(self):
- if not exec_driver_is_native():
- pytest.skip('Exec driver not native')
-
- container = self.client.create_container(BUSYBOX, 'cat',
- detach=True, stdin_open=True)
- id = container['Id']
- self.client.start(id)
- self.tmp_containers.append(id)
-
- res = self.client.exec_create(id, 'whoami', user='default')
- self.assertIn('Id', res)
-
- exec_log = self.client.exec_start(res)
- self.assertEqual(exec_log, b'default\n')
-
-
-class TestExecuteCommandStringAsRoot(BaseTestCase):
- def runTest(self):
- if not exec_driver_is_native():
- pytest.skip('Exec driver not native')
-
- container = self.client.create_container(BUSYBOX, 'cat',
- detach=True, stdin_open=True)
- id = container['Id']
- self.client.start(id)
- self.tmp_containers.append(id)
-
- res = self.client.exec_create(id, 'whoami')
- self.assertIn('Id', res)
-
- exec_log = self.client.exec_start(res)
- self.assertEqual(exec_log, b'root\n')
-
-
-class TestExecuteCommandStreaming(BaseTestCase):
- def runTest(self):
- if not exec_driver_is_native():
- pytest.skip('Exec driver not native')
-
- container = self.client.create_container(BUSYBOX, 'cat',
- detach=True, stdin_open=True)
- id = container['Id']
- self.client.start(id)
- self.tmp_containers.append(id)
-
- exec_id = self.client.exec_create(id, ['echo', 'hello\nworld'])
- self.assertIn('Id', exec_id)
-
- res = b''
- for chunk in self.client.exec_start(exec_id, stream=True):
- res += chunk
- self.assertEqual(res, b'hello\nworld\n')
-
-
-class TestExecInspect(BaseTestCase):
- def runTest(self):
- if not exec_driver_is_native():
- pytest.skip('Exec driver not native')
-
- container = self.client.create_container(BUSYBOX, 'cat',
- detach=True, stdin_open=True)
- id = container['Id']
- self.client.start(id)
- self.tmp_containers.append(id)
-
- exec_id = self.client.exec_create(id, ['mkdir', '/does/not/exist'])
- self.assertIn('Id', exec_id)
- self.client.exec_start(exec_id)
- exec_info = self.client.exec_inspect(exec_id)
- self.assertIn('ExitCode', exec_info)
- self.assertNotEqual(exec_info['ExitCode'], 0)
-
-
-class TestRunContainerStreaming(BaseTestCase):
- def runTest(self):
- container = self.client.create_container(BUSYBOX, '/bin/sh',
- detach=True, stdin_open=True)
- id = container['Id']
- self.client.start(id)
- self.tmp_containers.append(id)
- sock = self.client.attach_socket(container, ws=False)
- self.assertTrue(sock.fileno() > -1)
-
-
-class TestPauseUnpauseContainer(BaseTestCase):
- def runTest(self):
- container = self.client.create_container(BUSYBOX, ['sleep', '9999'])
- id = container['Id']
- self.tmp_containers.append(id)
- self.client.start(container)
- self.client.pause(id)
- container_info = self.client.inspect_container(id)
- self.assertIn('State', container_info)
- state = container_info['State']
- self.assertIn('ExitCode', state)
- self.assertEqual(state['ExitCode'], 0)
- self.assertIn('Running', state)
- self.assertEqual(state['Running'], True)
- self.assertIn('Paused', state)
- self.assertEqual(state['Paused'], True)
-
- self.client.unpause(id)
- container_info = self.client.inspect_container(id)
- self.assertIn('State', container_info)
- state = container_info['State']
- self.assertIn('ExitCode', state)
- self.assertEqual(state['ExitCode'], 0)
- self.assertIn('Running', state)
- self.assertEqual(state['Running'], True)
- self.assertIn('Paused', state)
- self.assertEqual(state['Paused'], False)
-
-
-class TestCreateContainerWithHostPidMode(BaseTestCase):
- def runTest(self):
- ctnr = self.client.create_container(
- BUSYBOX, 'true', host_config=self.client.create_host_config(
- pid_mode='host', network_mode='none'
- )
- )
- self.assertIn('Id', ctnr)
- self.tmp_containers.append(ctnr['Id'])
- self.client.start(ctnr)
- inspect = self.client.inspect_container(ctnr)
- self.assertIn('HostConfig', inspect)
- host_config = inspect['HostConfig']
- self.assertIn('PidMode', host_config)
- self.assertEqual(host_config['PidMode'], 'host')
-
-
-#################
-# LINKS TESTS #
-#################
-
-
-class TestRemoveLink(BaseTestCase):
- def runTest(self):
- # Create containers
- container1 = self.client.create_container(
- BUSYBOX, 'cat', detach=True, stdin_open=True
- )
- container1_id = container1['Id']
- self.tmp_containers.append(container1_id)
- self.client.start(container1_id)
-
- # Create Link
- # we don't want the first /
- link_path = self.client.inspect_container(container1_id)['Name'][1:]
- link_alias = 'mylink'
-
- container2 = self.client.create_container(
- BUSYBOX, 'cat', host_config=self.client.create_host_config(
- links={link_path: link_alias}, network_mode='none'
- )
- )
- container2_id = container2['Id']
- self.tmp_containers.append(container2_id)
- self.client.start(container2_id)
-
- # Remove link
- linked_name = self.client.inspect_container(container2_id)['Name'][1:]
- link_name = '%s/%s' % (linked_name, link_alias)
- self.client.remove_container(link_name, link=True)
-
- # Link is gone
- containers = self.client.containers(all=True)
- retrieved = [x for x in containers if link_name in x['Names']]
- self.assertEqual(len(retrieved), 0)
-
- # Containers are still there
- retrieved = [
- x for x in containers if x['Id'].startswith(container1_id) or
- x['Id'].startswith(container2_id)
- ]
- self.assertEqual(len(retrieved), 2)
-
-##################
-# IMAGES TESTS #
-##################
-
-
-class TestPull(BaseTestCase):
- def runTest(self):
- try:
- self.client.remove_image('hello-world')
- except docker.errors.APIError:
- pass
- res = self.client.pull('hello-world')
- self.tmp_imgs.append('hello-world')
- self.assertEqual(type(res), six.text_type)
- self.assertGreaterEqual(
- len(self.client.images('hello-world')), 1
- )
- img_info = self.client.inspect_image('hello-world')
- self.assertIn('Id', img_info)
-
-
-class TestPullStream(BaseTestCase):
- def runTest(self):
- try:
- self.client.remove_image('hello-world')
- except docker.errors.APIError:
- pass
- stream = self.client.pull('hello-world', stream=True)
- self.tmp_imgs.append('hello-world')
- for chunk in stream:
- if six.PY3:
- chunk = chunk.decode('utf-8')
- json.loads(chunk) # ensure chunk is a single, valid JSON blob
- self.assertGreaterEqual(
- len(self.client.images('hello-world')), 1
- )
- img_info = self.client.inspect_image('hello-world')
- self.assertIn('Id', img_info)
-
-
-class TestCommit(BaseTestCase):
- def runTest(self):
- container = self.client.create_container(BUSYBOX, ['touch', '/test'])
- id = container['Id']
- self.client.start(id)
- self.tmp_containers.append(id)
- res = self.client.commit(id)
- self.assertIn('Id', res)
- img_id = res['Id']
- self.tmp_imgs.append(img_id)
- img = self.client.inspect_image(img_id)
- self.assertIn('Container', img)
- self.assertTrue(img['Container'].startswith(id))
- self.assertIn('ContainerConfig', img)
- self.assertIn('Image', img['ContainerConfig'])
- self.assertEqual(BUSYBOX, img['ContainerConfig']['Image'])
- busybox_id = self.client.inspect_image(BUSYBOX)['Id']
- self.assertIn('Parent', img)
- self.assertEqual(img['Parent'], busybox_id)
-
-
-class TestRemoveImage(BaseTestCase):
- def runTest(self):
- container = self.client.create_container(BUSYBOX, ['touch', '/test'])
- id = container['Id']
- self.client.start(id)
- self.tmp_containers.append(id)
- res = self.client.commit(id)
- self.assertIn('Id', res)
- img_id = res['Id']
- self.tmp_imgs.append(img_id)
- self.client.remove_image(img_id, force=True)
- images = self.client.images(all=True)
- res = [x for x in images if x['Id'].startswith(img_id)]
- self.assertEqual(len(res), 0)
-
-
-##################
-# IMPORT TESTS #
-##################
-
-
-class ImportTestCase(BaseTestCase):
- '''Base class for `docker import` test cases.'''
-
- TAR_SIZE = 512 * 1024
-
- def write_dummy_tar_content(self, n_bytes, tar_fd):
- def extend_file(f, n_bytes):
- f.seek(n_bytes - 1)
- f.write(bytearray([65]))
- f.seek(0)
-
- tar = tarfile.TarFile(fileobj=tar_fd, mode='w')
-
- with tempfile.NamedTemporaryFile() as f:
- extend_file(f, n_bytes)
- tarinfo = tar.gettarinfo(name=f.name, arcname='testdata')
- tar.addfile(tarinfo, fileobj=f)
-
- tar.close()
-
- @contextlib.contextmanager
- def dummy_tar_stream(self, n_bytes):
- '''Yields a stream that is valid tar data of size n_bytes.'''
- with tempfile.NamedTemporaryFile() as tar_file:
- self.write_dummy_tar_content(n_bytes, tar_file)
- tar_file.seek(0)
- yield tar_file
-
- @contextlib.contextmanager
- def dummy_tar_file(self, n_bytes):
- '''Yields the name of a valid tar file of size n_bytes.'''
- with tempfile.NamedTemporaryFile() as tar_file:
- self.write_dummy_tar_content(n_bytes, tar_file)
- tar_file.seek(0)
- yield tar_file.name
-
-
-class TestImportFromBytes(ImportTestCase):
- '''Tests importing an image from in-memory byte data.'''
-
- def runTest(self):
- with self.dummy_tar_stream(n_bytes=500) as f:
- content = f.read()
-
- # The generic import_image() function cannot import in-memory bytes
- # data that happens to be represented as a string type, because
- # import_image() will try to use it as a filename and usually then
- # trigger an exception. So we test the import_image_from_data()
- # function instead.
- statuses = self.client.import_image_from_data(
- content, repository='test/import-from-bytes')
-
- result_text = statuses.splitlines()[-1]
- result = json.loads(result_text)
-
- self.assertNotIn('error', result)
-
- img_id = result['status']
- self.tmp_imgs.append(img_id)
-
-
-class TestImportFromFile(ImportTestCase):
- '''Tests importing an image from a tar file on disk.'''
-
- def runTest(self):
- with self.dummy_tar_file(n_bytes=self.TAR_SIZE) as tar_filename:
- # statuses = self.client.import_image(
- # src=tar_filename, repository='test/import-from-file')
- statuses = self.client.import_image_from_file(
- tar_filename, repository='test/import-from-file')
-
- result_text = statuses.splitlines()[-1]
- result = json.loads(result_text)
-
- self.assertNotIn('error', result)
-
- self.assertIn('status', result)
- img_id = result['status']
- self.tmp_imgs.append(img_id)
-
-
-class TestImportFromStream(ImportTestCase):
- '''Tests importing an image from a stream containing tar data.'''
-
- def runTest(self):
- with self.dummy_tar_stream(n_bytes=self.TAR_SIZE) as tar_stream:
- statuses = self.client.import_image(
- src=tar_stream, repository='test/import-from-stream')
- # statuses = self.client.import_image_from_stream(
- # tar_stream, repository='test/import-from-stream')
- result_text = statuses.splitlines()[-1]
- result = json.loads(result_text)
-
- self.assertNotIn('error', result)
-
- self.assertIn('status', result)
- img_id = result['status']
- self.tmp_imgs.append(img_id)
-
-
-class TestImportFromURL(ImportTestCase):
- '''Tests downloading an image over HTTP.'''
-
- @contextlib.contextmanager
- def temporary_http_file_server(self, stream):
- '''Serve data from an IO stream over HTTP.'''
-
- class Handler(BaseHTTPServer.BaseHTTPRequestHandler):
- def do_GET(self):
- self.send_response(200)
- self.send_header('Content-Type', 'application/x-tar')
- self.end_headers()
- shutil.copyfileobj(stream, self.wfile)
-
- server = socketserver.TCPServer(('', 0), Handler)
- thread = threading.Thread(target=server.serve_forever)
- thread.setDaemon(True)
- thread.start()
-
- yield 'http://%s:%s' % (socket.gethostname(), server.server_address[1])
-
- server.shutdown()
-
- @pytest.mark.skipif(True, reason="Doesn't work inside a container - FIXME")
- def runTest(self):
- # The crappy test HTTP server doesn't handle large files well, so use
- # a small file.
- TAR_SIZE = 10240
-
- with self.dummy_tar_stream(n_bytes=TAR_SIZE) as tar_data:
- with self.temporary_http_file_server(tar_data) as url:
- statuses = self.client.import_image(
- src=url, repository='test/import-from-url')
-
- result_text = statuses.splitlines()[-1]
- result = json.loads(result_text)
-
- self.assertNotIn('error', result)
-
- self.assertIn('status', result)
- img_id = result['status']
- self.tmp_imgs.append(img_id)
-
-
-#################
-# VOLUMES TESTS #
-#################
-
-@requires_api_version('1.21')
-class TestVolumes(BaseTestCase):
- def test_create_volume(self):
- name = 'perfectcherryblossom'
- self.tmp_volumes.append(name)
- result = self.client.create_volume(name)
- self.assertIn('Name', result)
- self.assertEqual(result['Name'], name)
- self.assertIn('Driver', result)
- self.assertEqual(result['Driver'], 'local')
-
- def test_create_volume_invalid_driver(self):
- driver_name = 'invalid.driver'
-
- with pytest.raises(docker.errors.NotFound):
- self.client.create_volume('perfectcherryblossom', driver_name)
-
- def test_list_volumes(self):
- name = 'imperishablenight'
- self.tmp_volumes.append(name)
- volume_info = self.client.create_volume(name)
- result = self.client.volumes()
- self.assertIn('Volumes', result)
- volumes = result['Volumes']
- self.assertIn(volume_info, volumes)
-
- def test_inspect_volume(self):
- name = 'embodimentofscarletdevil'
- self.tmp_volumes.append(name)
- volume_info = self.client.create_volume(name)
- result = self.client.inspect_volume(name)
- self.assertEqual(volume_info, result)
-
- def test_inspect_nonexistent_volume(self):
- name = 'embodimentofscarletdevil'
- with pytest.raises(docker.errors.NotFound):
- self.client.inspect_volume(name)
-
- def test_remove_volume(self):
- name = 'shootthebullet'
- self.tmp_volumes.append(name)
- self.client.create_volume(name)
- result = self.client.remove_volume(name)
- self.assertTrue(result)
-
- def test_remove_nonexistent_volume(self):
- name = 'shootthebullet'
- with pytest.raises(docker.errors.NotFound):
- self.client.remove_volume(name)
-
-
-#################
-# BUILDER TESTS #
-#################
-
-class TestBuildStream(BaseTestCase):
- def runTest(self):
- script = io.BytesIO('\n'.join([
- 'FROM busybox',
- 'MAINTAINER docker-py',
- 'RUN mkdir -p /tmp/test',
- 'EXPOSE 8080',
- 'ADD https://dl.dropboxusercontent.com/u/20637798/silence.tar.gz'
- ' /tmp/silence.tar.gz'
- ]).encode('ascii'))
- stream = self.client.build(fileobj=script, stream=True)
- logs = ''
- for chunk in stream:
- if six.PY3:
- chunk = chunk.decode('utf-8')
- json.loads(chunk) # ensure chunk is a single, valid JSON blob
- logs += chunk
- self.assertNotEqual(logs, '')
-
-
-class TestBuildFromStringIO(BaseTestCase):
- def runTest(self):
- if six.PY3:
- return
- script = io.StringIO(six.text_type('\n').join([
- 'FROM busybox',
- 'MAINTAINER docker-py',
- 'RUN mkdir -p /tmp/test',
- 'EXPOSE 8080',
- 'ADD https://dl.dropboxusercontent.com/u/20637798/silence.tar.gz'
- ' /tmp/silence.tar.gz'
- ]))
- stream = self.client.build(fileobj=script, stream=True)
- logs = ''
- for chunk in stream:
- if six.PY3:
- chunk = chunk.decode('utf-8')
- logs += chunk
- self.assertNotEqual(logs, '')
-
-
-@requires_api_version('1.8')
-class TestBuildWithDockerignore(Cleanup, BaseTestCase):
- def runTest(self):
- base_dir = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, base_dir)
-
- with open(os.path.join(base_dir, 'Dockerfile'), 'w') as f:
- f.write("\n".join([
- 'FROM busybox',
- 'MAINTAINER docker-py',
- 'ADD . /test',
- ]))
-
- with open(os.path.join(base_dir, '.dockerignore'), 'w') as f:
- f.write("\n".join([
- 'ignored',
- 'Dockerfile',
- '.dockerignore',
- '', # empty line
- ]))
-
- with open(os.path.join(base_dir, 'not-ignored'), 'w') as f:
- f.write("this file should not be ignored")
-
- subdir = os.path.join(base_dir, 'ignored', 'subdir')
- os.makedirs(subdir)
- with open(os.path.join(subdir, 'file'), 'w') as f:
- f.write("this file should be ignored")
-
- tag = 'docker-py-test-build-with-dockerignore'
- stream = self.client.build(
- path=base_dir,
- tag=tag,
- )
- for chunk in stream:
- pass
-
- c = self.client.create_container(tag, ['ls', '-1A', '/test'])
- self.client.start(c)
- self.client.wait(c)
- logs = self.client.logs(c)
-
- if six.PY3:
- logs = logs.decode('utf-8')
-
- self.assertEqual(
- list(filter(None, logs.split('\n'))),
- ['not-ignored'],
- )
-
-#######################
-# PY SPECIFIC TESTS #
-#######################
-
-
-class TestRunShlex(BaseTestCase):
- def runTest(self):
- commands = [
- 'true',
- 'echo "The Young Descendant of Tepes & Septette for the '
- 'Dead Princess"',
- 'echo -n "The Young Descendant of Tepes & Septette for the '
- 'Dead Princess"',
- '/bin/sh -c "echo Hello World"',
- '/bin/sh -c \'echo "Hello World"\'',
- 'echo "\"Night of Nights\""',
- 'true && echo "Night of Nights"'
- ]
- for cmd in commands:
- container = self.client.create_container(BUSYBOX, cmd)
- id = container['Id']
- self.client.start(id)
- self.tmp_containers.append(id)
- exitcode = self.client.wait(id)
- self.assertEqual(exitcode, 0, msg=cmd)
-
-
-class TestLoadConfig(BaseTestCase):
- def runTest(self):
- folder = tempfile.mkdtemp()
- self.tmp_folders.append(folder)
- cfg_path = os.path.join(folder, '.dockercfg')
- f = open(cfg_path, 'w')
- auth_ = base64.b64encode(b'sakuya:izayoi').decode('ascii')
- f.write('auth = {0}\n'.format(auth_))
- f.write('email = sakuya@scarlet.net')
- f.close()
- cfg = docker.auth.load_config(cfg_path)
- self.assertNotEqual(cfg[docker.auth.INDEX_NAME], None)
- cfg = cfg[docker.auth.INDEX_NAME]
- self.assertEqual(cfg['username'], 'sakuya')
- self.assertEqual(cfg['password'], 'izayoi')
- self.assertEqual(cfg['email'], 'sakuya@scarlet.net')
- self.assertEqual(cfg.get('Auth'), None)
-
-
-class TestLoadJSONConfig(BaseTestCase):
- def runTest(self):
- folder = tempfile.mkdtemp()
- self.tmp_folders.append(folder)
- cfg_path = os.path.join(folder, '.dockercfg')
- f = open(os.path.join(folder, '.dockercfg'), 'w')
- auth_ = base64.b64encode(b'sakuya:izayoi').decode('ascii')
- email_ = 'sakuya@scarlet.net'
- f.write('{{"{0}": {{"auth": "{1}", "email": "{2}"}}}}\n'.format(
- docker.auth.INDEX_URL, auth_, email_))
- f.close()
- cfg = docker.auth.load_config(cfg_path)
- self.assertNotEqual(cfg[docker.auth.INDEX_URL], None)
- cfg = cfg[docker.auth.INDEX_URL]
- self.assertEqual(cfg['username'], 'sakuya')
- self.assertEqual(cfg['password'], 'izayoi')
- self.assertEqual(cfg['email'], 'sakuya@scarlet.net')
- self.assertEqual(cfg.get('Auth'), None)
-
-
-class TestAutoDetectVersion(unittest.TestCase):
- def test_client_init(self):
- client = docker_client(version='auto')
- client_version = client._version
- api_version = client.version(api_version=False)['ApiVersion']
- self.assertEqual(client_version, api_version)
- api_version_2 = client.version()['ApiVersion']
- self.assertEqual(client_version, api_version_2)
- client.close()
-
- def test_auto_client(self):
- client = docker.AutoVersionClient(**docker_client_kwargs())
- client_version = client._version
- api_version = client.version(api_version=False)['ApiVersion']
- self.assertEqual(client_version, api_version)
- api_version_2 = client.version()['ApiVersion']
- self.assertEqual(client_version, api_version_2)
- client.close()
- with self.assertRaises(docker.errors.DockerException):
- docker.AutoVersionClient(**docker_client_kwargs(version='1.11'))
-
-
-class TestConnectionTimeout(unittest.TestCase):
- def setUp(self):
- self.timeout = 0.5
- self.client = docker.client.Client(base_url='http://192.168.10.2:4243',
- timeout=self.timeout)
-
- def runTest(self):
- start = time.time()
- res = None
- # This call isn't supposed to complete, and it should fail fast.
- try:
- res = self.client.inspect_container('id')
- except:
- pass
- end = time.time()
- self.assertTrue(res is None)
- self.assertTrue(end - start < 2 * self.timeout)
-
-
-class UnixconnTestCase(unittest.TestCase):
- """
- Test UNIX socket connection adapter.
- """
-
- def test_resource_warnings(self):
- """
- Test no warnings are produced when using the client.
- """
-
- with warnings.catch_warnings(record=True) as w:
- warnings.simplefilter('always')
-
- client = docker_client()
- client.images()
- client.close()
- del client
-
- assert len(w) == 0, \
- "No warnings produced: {0}".format(w[0].message)
-
-
-####################
-# REGRESSION TESTS #
-####################
-
-class TestRegressions(BaseTestCase):
- def test_443(self):
- dfile = io.BytesIO()
- with self.assertRaises(docker.errors.APIError) as exc:
- for line in self.client.build(fileobj=dfile, tag="a/b/c"):
- pass
- self.assertEqual(exc.exception.response.status_code, 500)
- dfile.close()
-
- def test_542(self):
- self.client.start(
- self.client.create_container(BUSYBOX, ['true'])
- )
- result = self.client.containers(all=True, trunc=True)
- self.assertEqual(len(result[0]['Id']), 12)
-
- def test_647(self):
- with self.assertRaises(docker.errors.APIError):
- self.client.inspect_image('gensokyo.jp//kirisame')
-
- def test_649(self):
- self.client.timeout = None
- ctnr = self.client.create_container(BUSYBOX, ['sleep', '2'])
- self.client.start(ctnr)
- self.client.stop(ctnr)
-
- def test_715(self):
- ctnr = self.client.create_container(BUSYBOX, ['id', '-u'], user=1000)
- self.client.start(ctnr)
- self.client.wait(ctnr)
- logs = self.client.logs(ctnr)
- if six.PY3:
- logs = logs.decode('utf-8')
- assert logs == '1000\n'
+# FIXME: placeholder while we transition to the new folder architecture
+# Remove when merged in master and Jenkins is updated to find the tests
+# in the new location.
+from integration import * # flake8: noqa
diff --git a/tests/testdata/certs/ca.pem b/tests/unit/__init__.py
index e69de29..e69de29 100644
--- a/tests/testdata/certs/ca.pem
+++ b/tests/unit/__init__.py
diff --git a/tests/test.py b/tests/unit/api_test.py
index 9993484..e44e562 100644
--- a/tests/test.py
+++ b/tests/unit/api_test.py
@@ -34,9 +34,9 @@ import docker.efficiency
import requests
import six
-from . import base
+from .. import base
from . import fake_api
-from .helpers import make_tree
+from ..helpers import make_tree
import pytest
@@ -109,34 +109,9 @@ url_prefix = 'http+docker://localunixsocket/v{0}/'.format(
docker.constants.DEFAULT_DOCKER_API_VERSION)
-class Cleanup(object):
- if sys.version_info < (2, 7):
- # Provide a basic implementation of addCleanup for Python < 2.7
- def __init__(self, *args, **kwargs):
- super(Cleanup, self).__init__(*args, **kwargs)
- self._cleanups = []
-
- def tearDown(self):
- super(Cleanup, self).tearDown()
- ok = True
- while self._cleanups:
- fn, args, kwargs = self._cleanups.pop(-1)
- try:
- fn(*args, **kwargs)
- except KeyboardInterrupt:
- raise
- except:
- ok = False
- if not ok:
- raise
-
- def addCleanup(self, function, *args, **kwargs):
- self._cleanups.append((function, args, kwargs))
-
-
@mock.patch.multiple('docker.Client', get=fake_get, post=fake_post,
put=fake_put, delete=fake_delete)
-class DockerClientTest(Cleanup, base.BaseTestCase):
+class DockerClientTest(base.Cleanup, base.BaseTestCase):
def setUp(self):
self.client = docker.Client()
# Force-clear authconfig to avoid tampering with the tests
@@ -2376,7 +2351,7 @@ class DockerClientTest(Cleanup, base.BaseTestCase):
)
-class StreamTest(Cleanup, base.BaseTestCase):
+class StreamTest(base.Cleanup, base.BaseTestCase):
def setUp(self):
socket_dir = tempfile.mkdtemp()
diff --git a/tests/fake_api.py b/tests/unit/fake_api.py
index 5a89dee..5a89dee 100644
--- a/tests/fake_api.py
+++ b/tests/unit/fake_api.py
diff --git a/tests/fake_stat.py b/tests/unit/fake_stat.py
index a7f1029..a7f1029 100644
--- a/tests/fake_stat.py
+++ b/tests/unit/fake_stat.py
diff --git a/tests/testdata/certs/cert.pem b/tests/unit/testdata/certs/ca.pem
index e69de29..e69de29 100644
--- a/tests/testdata/certs/cert.pem
+++ b/tests/unit/testdata/certs/ca.pem
diff --git a/tests/testdata/certs/key.pem b/tests/unit/testdata/certs/cert.pem
index e69de29..e69de29 100644
--- a/tests/testdata/certs/key.pem
+++ b/tests/unit/testdata/certs/cert.pem
diff --git a/tests/unit/testdata/certs/key.pem b/tests/unit/testdata/certs/key.pem
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tests/unit/testdata/certs/key.pem
diff --git a/tests/testdata/context/Dockerfile b/tests/unit/testdata/context/Dockerfile
index d1ceac6..d1ceac6 100644
--- a/tests/testdata/context/Dockerfile
+++ b/tests/unit/testdata/context/Dockerfile
diff --git a/tests/testdata/context/ctx.tar.gz b/tests/unit/testdata/context/ctx.tar.gz
index c14e5b9..c14e5b9 100644
--- a/tests/testdata/context/ctx.tar.gz
+++ b/tests/unit/testdata/context/ctx.tar.gz
Binary files differ
diff --git a/tests/testdata/context/custom_dockerfile b/tests/unit/testdata/context/custom_dockerfile
index d1ceac6..d1ceac6 100644
--- a/tests/testdata/context/custom_dockerfile
+++ b/tests/unit/testdata/context/custom_dockerfile
diff --git a/tests/utils_test.py b/tests/unit/utils_test.py
index b67ac4e..71a382b 100644
--- a/tests/utils_test.py
+++ b/tests/unit/utils_test.py
@@ -14,8 +14,8 @@ from docker.utils import (
from docker.utils.ports import build_port_bindings, split_port
from docker.auth import resolve_repository_name, resolve_authconfig
-from . import base
-from .helpers import make_tree
+from .. import base
+from ..helpers import make_tree
import pytest
diff --git a/tox.ini b/tox.ini
index eb31bee..96b9177 100644
--- a/tox.ini
+++ b/tox.ini
@@ -5,7 +5,7 @@ skipsdist=True
[testenv]
usedevelop=True
commands =
- py.test --cov=docker tests/test.py tests/utils_test.py
+ py.test --cov=docker tests/unit/
deps =
-r{toxinidir}/test-requirements.txt
-r{toxinidir}/requirements.txt