1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
|
import base64
import os
import tempfile
import time
import unittest
import warnings
import docker
from docker.utils import kwargs_from_env
from .base import BaseAPIIntegrationTest, BUSYBOX
class InformationTest(BaseAPIIntegrationTest):
    """Integration tests for the client's version, info and search calls."""

    def test_version(self):
        """``version()`` exposes Go and engine versions.

        The engine version is expected to have exactly three
        dot-separated components.
        """
        res = self.client.version()
        self.assertIn('GoVersion', res)
        self.assertIn('Version', res)
        self.assertEqual(len(res['Version'].split('.')), 3)

    def test_info(self):
        """``info()`` reports container, image and debug entries."""
        res = self.client.info()
        self.assertIn('Containers', res)
        self.assertIn('Images', res)
        self.assertIn('Debug', res)

    def test_search(self):
        """Searching for ``busybox`` yields exactly one image of that name."""
        # A dedicated client with a 10 second timeout is used for the search
        # instead of self.client.
        client = docker.APIClient(timeout=10, **kwargs_from_env())
        try:
            res = client.search('busybox')
        finally:
            # Release the client's connections even if the search raises.
            client.close()
        self.assertGreaterEqual(len(res), 1)
        base_img = [x for x in res if x['name'] == 'busybox']
        self.assertEqual(len(base_img), 1)
        self.assertIn('description', base_img[0])
class LinkTest(BaseAPIIntegrationTest):
    """Integration test for removing a link between two containers."""

    def test_remove_link(self):
        """Removing a link drops the link name but keeps both containers."""
        # Create and start the container that will be linked to.
        container1 = self.client.create_container(
            BUSYBOX, 'cat', detach=True, stdin_open=True
        )
        container1_id = container1['Id']
        self.tmp_containers.append(container1_id)
        self.client.start(container1_id)

        # Create the link: the second container links to the first one.
        # We don't want the first '/' of the inspected name.
        link_path = self.client.inspect_container(container1_id)['Name'][1:]
        link_alias = 'mylink'
        container2 = self.client.create_container(
            BUSYBOX, 'cat', host_config=self.client.create_host_config(
                links={link_path: link_alias}
            )
        )
        container2_id = container2['Id']
        self.tmp_containers.append(container2_id)
        self.client.start(container2_id)

        # Remove the link, addressed as "<container2 name>/<alias>".
        linked_name = self.client.inspect_container(container2_id)['Name'][1:]
        link_name = '{0}/{1}'.format(linked_name, link_alias)
        self.client.remove_container(link_name, link=True)

        # Link is gone.
        # link_name was built from names with the leading '/' stripped, while
        # the names returned by the container listing are expected to carry
        # one (as the inspected 'Name' does).  Compare on the slash-stripped
        # form so the check can actually detect a surviving link.
        containers = self.client.containers(all=True)
        retrieved = [
            x for x in containers
            if any(name.lstrip('/') == link_name for name in x['Names'])
        ]
        self.assertEqual(len(retrieved), 0)

        # Containers are still there.
        retrieved = [
            x for x in containers
            if x['Id'].startswith((container1_id, container2_id))
        ]
        self.assertEqual(len(retrieved), 2)
class LoadConfigTest(BaseAPIIntegrationTest):
    """Check that ``docker.auth.load_config`` reads both config formats."""

    def _write_dockercfg(self, content):
        """Write *content* to ``.dockercfg`` in a new temp folder.

        The folder is appended to ``self.tmp_folders`` (presumably removed
        by the base class on teardown -- not visible from this file).

        Returns the path of the written file.
        """
        folder = tempfile.mkdtemp()
        self.tmp_folders.append(folder)
        cfg_path = os.path.join(folder, '.dockercfg')
        # The context manager closes the file even if the write fails.
        with open(cfg_path, 'w') as f:
            f.write(content)
        return cfg_path

    def _assert_sakuya_credentials(self, entry):
        """Assert that *entry* holds the decoded test credentials."""
        self.assertEqual(entry['username'], 'sakuya')
        self.assertEqual(entry['password'], 'izayoi')
        self.assertEqual(entry['email'], 'sakuya@scarlet.net')
        # The raw 'Auth' value must not be present in the parsed entry.
        self.assertIsNone(entry.get('Auth'))

    def test_load_legacy_config(self):
        """A ``key = value`` style file is parsed under ``INDEX_NAME``."""
        auth_ = base64.b64encode(b'sakuya:izayoi').decode('ascii')
        content = 'auth = {0}\nemail = sakuya@scarlet.net'.format(auth_)
        cfg_path = self._write_dockercfg(content)

        cfg = docker.auth.load_config(cfg_path)
        self.assertIsNotNone(cfg[docker.auth.INDEX_NAME])
        self._assert_sakuya_credentials(cfg[docker.auth.INDEX_NAME])

    def test_load_json_config(self):
        """A JSON file keyed by ``INDEX_URL`` is parsed under that key."""
        auth_ = base64.b64encode(b'sakuya:izayoi').decode('ascii')
        email_ = 'sakuya@scarlet.net'
        content = '{{"{0}": {{"auth": "{1}", "email": "{2}"}}}}\n'.format(
            docker.auth.INDEX_URL, auth_, email_)
        cfg_path = self._write_dockercfg(content)

        cfg = docker.auth.load_config(cfg_path)
        self.assertIsNotNone(cfg[docker.auth.INDEX_URL])
        self._assert_sakuya_credentials(cfg[docker.auth.INDEX_URL])
class AutoDetectVersionTest(unittest.TestCase):
    """Check that ``version='auto'`` adopts the reported API version."""

    def test_client_init(self):
        """The client's ``_version`` equals the reported ``ApiVersion``.

        The version endpoint is queried twice, once with
        ``api_version=False`` and once with the default arguments; both
        answers must match the version the client settled on.
        """
        client = docker.APIClient(version='auto', **kwargs_from_env())
        # Registered up front so the client is closed even when one of the
        # assertions below fails.
        self.addCleanup(client.close)

        client_version = client._version
        api_version = client.version(api_version=False)['ApiVersion']
        self.assertEqual(client_version, api_version)
        api_version_2 = client.version()['ApiVersion']
        self.assertEqual(client_version, api_version_2)
class ConnectionTimeoutTest(unittest.TestCase):
    """Check that a request honours the client's timeout."""

    def setUp(self):
        """Create a client with a 0.5 second timeout for a fixed address."""
        self.timeout = 0.5
        self.client = docker.api.APIClient(
            base_url='http://192.168.10.2:4243', timeout=self.timeout
        )
        # Make sure the client is closed once the test is over.
        self.addCleanup(self.client.close)

    def test_timeout(self):
        """The call fails without a result in under twice the timeout."""
        start = time.time()
        res = None
        # This call isn't supposed to complete, and it should fail fast.
        try:
            res = self.client.inspect_container('id')
        except Exception:
            # A failure is the expected outcome here.  Only regular
            # exceptions are swallowed, so KeyboardInterrupt and SystemExit
            # still propagate.
            pass
        end = time.time()
        self.assertIsNone(res)
        self.assertLess(end - start, 2 * self.timeout)
class UnixconnTest(unittest.TestCase):
    """
    Test UNIX socket connection adapter.

    The connection settings come from ``kwargs_from_env()``.
    """

    def test_resource_warnings(self):
        """
        Test no warnings are produced when using the client.
        """
        with warnings.catch_warnings(record=True) as w:
            # Record every warning, including ones ignored by default.
            warnings.simplefilter('always')

            client = docker.APIClient(**kwargs_from_env())
            client.images()
            client.close()
            # Drop the last reference inside the recording block so that
            # warnings emitted on destruction are captured as well.
            del client

        # Report every captured warning.  assertEqual is used instead of a
        # bare assert so the check is not stripped under ``python -O``.
        self.assertEqual(
            len(w), 0,
            "Unexpected warnings produced: {0}".format(
                [str(item.message) for item in w]
            )
        )
|