1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
|
import json
import struct
import requests
import requests.exceptions
import six
import websocket
from . import constants
from . import errors
from .auth import auth
from .unixconn import unixconn
from .ssladapter import ssladapter
from .utils import utils, check_resource
from .tls import TLSConfig
class ClientBase(requests.Session):
    """``requests.Session`` subclass holding the low-level client plumbing.

    It mounts a custom transport adapter for UNIX-socket or TLS endpoints,
    selects the API version used to build URLs, and provides private helpers
    for issuing requests with a default timeout and for decoding plain,
    chunked and multiplexed responses.
    """

    def __init__(self, base_url=None, version=None,
                 timeout=constants.DEFAULT_TIMEOUT_SECONDS, tls=False):
        """Configure adapters, base URL, timeout and API version.

        :param base_url: Endpoint address; normalised by
            ``utils.parse_host``. Must begin with ``https://`` when ``tls``
            is truthy.
        :param version: ``None`` to use
            ``constants.DEFAULT_DOCKER_API_VERSION``, the string ``'auto'``
            (case-insensitive) to ask the server, or any other string to be
            used verbatim.
        :param timeout: Default ``timeout`` applied by ``_get``, ``_post``
            and ``_delete``; also handed to the UNIX-socket adapter.
        :param tls: A ``TLSConfig`` instance (which configures this client
            itself) or a truthy value to mount a default ``SSLAdapter``.
        :raises errors.TLSParameterError: ``tls`` is truthy but ``base_url``
            is missing or does not begin with ``https://``.
        :raises errors.DockerException: ``version`` is neither a string nor
            ``None``, or the server version could not be retrieved.
        """
        super(ClientBase, self).__init__()

        # Always define the attribute: get_adapter() inspects it, and not
        # every code path below assigns a custom adapter.
        self._custom_adapter = None

        # ``base_url or ''`` keeps a missing base_url from blowing up with an
        # AttributeError; it is reported as the TLS parameter error instead.
        if tls and not (base_url or '').startswith('https://'):
            raise errors.TLSParameterError(
                'If using TLS, the base_url argument must begin with '
                '"https://".')

        self.base_url = base_url
        self.timeout = timeout
        self._auth_configs = auth.load_config()

        base_url = utils.parse_host(base_url)
        if base_url.startswith('http+unix://'):
            self._custom_adapter = unixconn.UnixAdapter(base_url, timeout)
            self.mount('http+docker://', self._custom_adapter)
            self.base_url = 'http+docker://localunixsocket'
        else:
            # Use SSLAdapter for the ability to specify SSL version
            if isinstance(tls, TLSConfig):
                tls.configure_client(self)
            elif tls:
                self._custom_adapter = ssladapter.SSLAdapter()
                self.mount('https://', self._custom_adapter)
            self.base_url = base_url

        # version detection needs to be after unix adapter mounting
        if version is None:
            self._version = constants.DEFAULT_DOCKER_API_VERSION
        elif isinstance(version, six.string_types):
            if version.lower() == 'auto':
                self._version = self._retrieve_server_version()
            else:
                self._version = version
        else:
            raise errors.DockerException(
                'Version parameter must be a string or None. Found {0}'.format(
                    type(version).__name__
                )
            )

    def _retrieve_server_version(self):
        """Return the ``"ApiVersion"`` entry of ``self.version()``.

        ``self.version`` is not defined in this class and must be supplied
        elsewhere (e.g. by a subclass).

        :raises errors.DockerException: the key is missing from the reply,
            or any other exception occurred while fetching it.
        """
        try:
            return self.version(api_version=False)["ApiVersion"]
        except KeyError:
            raise errors.DockerException(
                'Invalid response from docker daemon: key "ApiVersion"'
                ' is missing.'
            )
        except Exception as e:
            raise errors.DockerException(
                'Error while fetching server API version: {0}'.format(e)
            )

    def _set_request_timeout(self, kwargs):
        """Prepare the kwargs for an HTTP request by inserting the timeout
        parameter, if not already present.

        The dict is modified in place and also returned.
        """
        kwargs.setdefault('timeout', self.timeout)
        return kwargs

    def _post(self, url, **kwargs):
        """``self.post`` with the default timeout applied."""
        return self.post(url, **self._set_request_timeout(kwargs))

    def _get(self, url, **kwargs):
        """``self.get`` with the default timeout applied."""
        return self.get(url, **self._set_request_timeout(kwargs))

    def _delete(self, url, **kwargs):
        """``self.delete`` with the default timeout applied."""
        return self.delete(url, **self._set_request_timeout(kwargs))

    def _url(self, path, versioned_api=True):
        """Build a full URL for ``path``.

        :param path: Path appended to the base URL (callers in this class
            pass it with a leading ``/``).
        :param versioned_api: When true, insert ``/v<version>`` between the
            base URL and ``path``.
        """
        if versioned_api:
            return '{0}/v{1}{2}'.format(self.base_url, self._version, path)
        return '{0}{1}'.format(self.base_url, path)

    def _raise_for_status(self, response, explanation=None):
        """Raise :class:`errors.APIError` if ``response`` carries an HTTP
        error status; otherwise do nothing."""
        try:
            response.raise_for_status()
        except requests.exceptions.HTTPError as e:
            raise errors.APIError(e, response, explanation=explanation)

    def _result(self, response, json=False, binary=False):
        """Check ``response`` for errors and return its payload.

        :param json: Return the decoded JSON body.
        :param binary: Return the raw bytes (``response.content``).
        :returns: Decoded JSON, bytes, or ``response.text`` when neither
            flag is set. ``json`` and ``binary`` are mutually exclusive.
        :raises errors.APIError: the response has an HTTP error status.
        """
        assert not (json and binary)
        self._raise_for_status(response)

        if json:
            return response.json()
        if binary:
            return response.content
        return response.text

    def _post_json(self, url, data, **kwargs):
        """POST ``data`` as a JSON body, dropping keys whose value is None.

        A ``Content-Type: application/json`` header is set on the request
        (written into the caller-supplied ``headers`` mapping when given).
        """
        # Go <1.1 can't unserialize null to a string
        # so we do this disgusting thing here.
        payload = {}
        if data is not None:
            payload = {
                key: value
                for key, value in six.iteritems(data)
                if value is not None
            }

        kwargs.setdefault('headers', {})['Content-Type'] = 'application/json'
        return self._post(url, data=json.dumps(payload), **kwargs)

    def _attach_params(self, override=None):
        """Return ``override`` if truthy, else the default attach params
        (stdout, stderr and stream all enabled)."""
        return override or {
            'stdout': 1,
            'stderr': 1,
            'stream': 1
        }

    @check_resource
    def _attach_websocket(self, container, params=None):
        """Open a websocket connection to the container's attach endpoint.

        The attach URL is built as for an HTTP request and its scheme is
        then rewritten from ``http``/``https`` to ``ws``/``wss``.
        """
        url = self._url("/containers/{0}/attach/ws".format(container))
        req = requests.Request("POST", url, params=self._attach_params(params))
        full_url = req.prepare().url
        full_url = full_url.replace("http://", "ws://", 1)
        full_url = full_url.replace("https://", "wss://", 1)
        return self._create_websocket_connection(full_url)

    def _create_websocket_connection(self, url):
        """Thin wrapper around ``websocket.create_connection``."""
        return websocket.create_connection(url)

    def _get_raw_response_socket(self, response):
        """Return the socket object underlying ``response``.

        The lookup goes through private attributes of ``response.raw`` and
        differs between Python 2 and Python 3.

        :raises errors.APIError: the response has an HTTP error status.
        """
        self._raise_for_status(response)
        if six.PY3:
            sock = response.raw._fp.fp.raw
        else:
            sock = response.raw._fp.fp._sock
        try:
            # Keep a reference to the response to stop it being garbage
            # collected. If the response is garbage collected, it will
            # close TLS sockets.
            sock._response = response
        except AttributeError:
            # UNIX sockets can't have attributes set on them, but that's
            # fine because we won't be doing TLS over them
            pass

        return sock

    def _stream_helper(self, response, decode=False):
        """Generator for data coming from a chunked-encoded HTTP response.

        :param decode: When true, each chunk is parsed as JSON (after UTF-8
            decoding on Python 3) before being yielded.
        """
        if response.raw._fp.chunked:
            reader = response.raw
            while not reader.closed:
                # this read call will block until we get a chunk
                data = reader.read(1)
                if not data:
                    break
                # After the first byte, chunk_left holds what remains of the
                # current chunk; read exactly that much to finish it.
                if reader._fp.chunk_left:
                    data += reader.read(reader._fp.chunk_left)
                if decode:
                    if six.PY3:
                        data = data.decode('utf-8')
                    data = json.loads(data)
                yield data
        else:
            # Response isn't chunked, meaning we probably
            # encountered an error immediately
            yield self._result(response)

    def _multiplexed_buffer_helper(self, response):
        """A generator of multiplexed data blocks read from a buffered
        response.

        Each frame starts with a header unpacked as ``'>BxxxL'``; the last
        field is the payload length. Only the payloads are yielded.
        """
        buf = self._result(response, binary=True)
        header_format = '>BxxxL'
        # Same value as the literal 8 this check used before.
        min_header = struct.calcsize(header_format)
        buf_len = len(buf)
        walker = 0
        while buf_len - walker >= min_header:
            # unpack_from with an offset avoids copying the rest of the
            # buffer for every frame.
            _, length = struct.unpack_from(header_format, buf, walker)
            start = walker + constants.STREAM_HEADER_SIZE_BYTES
            end = start + length
            walker = end
            yield buf[start:end]

    def _multiplexed_response_stream_helper(self, response):
        """A generator of multiplexed data blocks coming from a response
        stream.

        Frames are read one at a time from ``response.raw``; iteration ends
        when the header or the payload cannot be read.
        """
        # Disable timeout on the underlying socket to prevent
        # Read timed out(s) for long running processes
        sock = self._get_raw_response_socket(response)
        if six.PY3:
            sock._sock.settimeout(None)
        else:
            sock.settimeout(None)

        header_format = '>BxxxL'
        min_header = struct.calcsize(header_format)
        while True:
            header = response.raw.read(constants.STREAM_HEADER_SIZE_BYTES)
            # An empty or truncated header means the stream has ended; a
            # short read would otherwise make struct.unpack raise.
            if len(header) < min_header:
                break
            _, length = struct.unpack(header_format, header)
            if not length:
                # An empty frame carries no payload but does not mark the
                # end of the stream; move on to the next header.
                continue
            data = response.raw.read(length)
            if not data:
                break
            yield data

    def get_adapter(self, url):
        """Return the transport adapter for ``url``.

        Falls back to the custom adapter mounted by ``__init__`` (if any)
        when the session has no adapter registered for the URL's scheme.

        :raises requests.exceptions.InvalidSchema: no adapter matches and no
            custom adapter is configured.
        """
        try:
            return super(ClientBase, self).get_adapter(url)
        except requests.exceptions.InvalidSchema:
            # getattr guards against instances whose __init__ was bypassed.
            custom_adapter = getattr(self, '_custom_adapter', None)
            if custom_adapter:
                return custom_adapter
            # Bare raise keeps the original traceback intact.
            raise

    @property
    def api_version(self):
        """The API version string used when building versioned URLs."""
        return self._version
|