summaryrefslogtreecommitdiff
path: root/tests/integration/image_test.py
diff options
context:
space:
mode:
authorJoffrey F <joffrey@docker.com>2015-09-23 17:42:29 -0700
committerJoffrey F <joffrey@docker.com>2015-10-21 16:02:09 -0700
commit93a296fb0448d9fccdf9f40f7a9996f49ea22c48 (patch)
tree633371cbdf88435664b530c4ca835c4929d92274 /tests/integration/image_test.py
parent5a1c7ed8bf0ac9a3914de7c80c1c29c13f6a62ea (diff)
downloaddocker-py-reorganize_tests.tar.gz
Reorganize test directoriesreorganize_tests
More clearly separate unit and integration tests Allow splitting into multiple files Cleaner Signed-off-by: Joffrey F <joffrey@docker.com>
Diffstat (limited to 'tests/integration/image_test.py')
-rw-r--r--tests/integration/image_test.py235
1 files changed, 235 insertions, 0 deletions
diff --git a/tests/integration/image_test.py b/tests/integration/image_test.py
new file mode 100644
index 0000000..6c186e4
--- /dev/null
+++ b/tests/integration/image_test.py
@@ -0,0 +1,235 @@
+import contextlib
+import json
+import shutil
+import socket
+import tarfile
+import tempfile
+import threading
+
+import pytest
+import six
+from six.moves import BaseHTTPServer
+from six.moves import socketserver
+
+
+import docker
+
+from . import api_test
+
+BUSYBOX = api_test.BUSYBOX
+
+
+class ListImagesTest(api_test.BaseTestCase):
+ def test_images(self):
+ res1 = self.client.images(all=True)
+ self.assertIn('Id', res1[0])
+ res10 = res1[0]
+ self.assertIn('Created', res10)
+ self.assertIn('RepoTags', res10)
+ distinct = []
+ for img in res1:
+ if img['Id'] not in distinct:
+ distinct.append(img['Id'])
+ self.assertEqual(len(distinct), self.client.info()['Images'])
+
+ def test_images_quiet(self):
+ res1 = self.client.images(quiet=True)
+ self.assertEqual(type(res1[0]), six.text_type)
+
+
+class PullImageTest(api_test.BaseTestCase):
+ def test_pull(self):
+ try:
+ self.client.remove_image('hello-world')
+ except docker.errors.APIError:
+ pass
+ res = self.client.pull('hello-world')
+ self.tmp_imgs.append('hello-world')
+ self.assertEqual(type(res), six.text_type)
+ self.assertGreaterEqual(
+ len(self.client.images('hello-world')), 1
+ )
+ img_info = self.client.inspect_image('hello-world')
+ self.assertIn('Id', img_info)
+
+ def test_pull_streaming(self):
+ try:
+ self.client.remove_image('hello-world')
+ except docker.errors.APIError:
+ pass
+ stream = self.client.pull('hello-world', stream=True)
+ self.tmp_imgs.append('hello-world')
+ for chunk in stream:
+ if six.PY3:
+ chunk = chunk.decode('utf-8')
+ json.loads(chunk) # ensure chunk is a single, valid JSON blob
+ self.assertGreaterEqual(
+ len(self.client.images('hello-world')), 1
+ )
+ img_info = self.client.inspect_image('hello-world')
+ self.assertIn('Id', img_info)
+
+
+class CommitTest(api_test.BaseTestCase):
+ def test_commit(self):
+ container = self.client.create_container(BUSYBOX, ['touch', '/test'])
+ id = container['Id']
+ self.client.start(id)
+ self.tmp_containers.append(id)
+ res = self.client.commit(id)
+ self.assertIn('Id', res)
+ img_id = res['Id']
+ self.tmp_imgs.append(img_id)
+ img = self.client.inspect_image(img_id)
+ self.assertIn('Container', img)
+ self.assertTrue(img['Container'].startswith(id))
+ self.assertIn('ContainerConfig', img)
+ self.assertIn('Image', img['ContainerConfig'])
+ self.assertEqual(BUSYBOX, img['ContainerConfig']['Image'])
+ busybox_id = self.client.inspect_image(BUSYBOX)['Id']
+ self.assertIn('Parent', img)
+ self.assertEqual(img['Parent'], busybox_id)
+
+
+class RemoveImageTest(api_test.BaseTestCase):
+ def test_remove(self):
+ container = self.client.create_container(BUSYBOX, ['touch', '/test'])
+ id = container['Id']
+ self.client.start(id)
+ self.tmp_containers.append(id)
+ res = self.client.commit(id)
+ self.assertIn('Id', res)
+ img_id = res['Id']
+ self.tmp_imgs.append(img_id)
+ self.client.remove_image(img_id, force=True)
+ images = self.client.images(all=True)
+ res = [x for x in images if x['Id'].startswith(img_id)]
+ self.assertEqual(len(res), 0)
+
+
+class ImportImageTest(api_test.BaseTestCase):
+ '''Base class for `docker import` test cases.'''
+
+ TAR_SIZE = 512 * 1024
+
+ def write_dummy_tar_content(self, n_bytes, tar_fd):
+ def extend_file(f, n_bytes):
+ f.seek(n_bytes - 1)
+ f.write(bytearray([65]))
+ f.seek(0)
+
+ tar = tarfile.TarFile(fileobj=tar_fd, mode='w')
+
+ with tempfile.NamedTemporaryFile() as f:
+ extend_file(f, n_bytes)
+ tarinfo = tar.gettarinfo(name=f.name, arcname='testdata')
+ tar.addfile(tarinfo, fileobj=f)
+
+ tar.close()
+
+ @contextlib.contextmanager
+ def dummy_tar_stream(self, n_bytes):
+ '''Yields a stream that is valid tar data of size n_bytes.'''
+ with tempfile.NamedTemporaryFile() as tar_file:
+ self.write_dummy_tar_content(n_bytes, tar_file)
+ tar_file.seek(0)
+ yield tar_file
+
+ @contextlib.contextmanager
+ def dummy_tar_file(self, n_bytes):
+ '''Yields the name of a valid tar file of size n_bytes.'''
+ with tempfile.NamedTemporaryFile() as tar_file:
+ self.write_dummy_tar_content(n_bytes, tar_file)
+ tar_file.seek(0)
+ yield tar_file.name
+
+ def test_import_from_bytes(self):
+ with self.dummy_tar_stream(n_bytes=500) as f:
+ content = f.read()
+
+ # The generic import_image() function cannot import in-memory bytes
+ # data that happens to be represented as a string type, because
+ # import_image() will try to use it as a filename and usually then
+ # trigger an exception. So we test the import_image_from_data()
+ # function instead.
+ statuses = self.client.import_image_from_data(
+ content, repository='test/import-from-bytes')
+
+ result_text = statuses.splitlines()[-1]
+ result = json.loads(result_text)
+
+ self.assertNotIn('error', result)
+
+ img_id = result['status']
+ self.tmp_imgs.append(img_id)
+
+ def test_import_from_file(self):
+ with self.dummy_tar_file(n_bytes=self.TAR_SIZE) as tar_filename:
+ # statuses = self.client.import_image(
+ # src=tar_filename, repository='test/import-from-file')
+ statuses = self.client.import_image_from_file(
+ tar_filename, repository='test/import-from-file')
+
+ result_text = statuses.splitlines()[-1]
+ result = json.loads(result_text)
+
+ self.assertNotIn('error', result)
+
+ self.assertIn('status', result)
+ img_id = result['status']
+ self.tmp_imgs.append(img_id)
+
+ def test_import_from_stream(self):
+ with self.dummy_tar_stream(n_bytes=self.TAR_SIZE) as tar_stream:
+ statuses = self.client.import_image(
+ src=tar_stream, repository='test/import-from-stream')
+ # statuses = self.client.import_image_from_stream(
+ # tar_stream, repository='test/import-from-stream')
+ result_text = statuses.splitlines()[-1]
+ result = json.loads(result_text)
+
+ self.assertNotIn('error', result)
+
+ self.assertIn('status', result)
+ img_id = result['status']
+ self.tmp_imgs.append(img_id)
+
+ @contextlib.contextmanager
+ def temporary_http_file_server(self, stream):
+ '''Serve data from an IO stream over HTTP.'''
+
+ class Handler(BaseHTTPServer.BaseHTTPRequestHandler):
+ def do_GET(self):
+ self.send_response(200)
+ self.send_header('Content-Type', 'application/x-tar')
+ self.end_headers()
+ shutil.copyfileobj(stream, self.wfile)
+
+ server = socketserver.TCPServer(('', 0), Handler)
+ thread = threading.Thread(target=server.serve_forever)
+ thread.setDaemon(True)
+ thread.start()
+
+ yield 'http://%s:%s' % (socket.gethostname(), server.server_address[1])
+
+ server.shutdown()
+
+ @pytest.mark.skipif(True, reason="Doesn't work inside a container - FIXME")
+ def test_import_from_url(self):
+ # The crappy test HTTP server doesn't handle large files well, so use
+ # a small file.
+ tar_size = 10240
+
+ with self.dummy_tar_stream(n_bytes=tar_size) as tar_data:
+ with self.temporary_http_file_server(tar_data) as url:
+ statuses = self.client.import_image(
+ src=url, repository='test/import-from-url')
+
+ result_text = statuses.splitlines()[-1]
+ result = json.loads(result_text)
+
+ self.assertNotIn('error', result)
+
+ self.assertIn('status', result)
+ img_id = result['status']
+ self.tmp_imgs.append(img_id)