diff options
author | Maxime Petazzoni <maxime.petazzoni@bulix.org> | 2015-07-01 17:55:59 -0700 |
---|---|---|
committer | Maxime Petazzoni <maxime.petazzoni@bulix.org> | 2015-07-01 17:55:59 -0700 |
commit | f3453aa23fca93209fcac881865e24f524090276 (patch) | |
tree | f27668e4b3fe47650d673bfaa0fcf159bb57c393 | |
parent | 2d7f1cfa1e3a93fe1aab67a64965fcbeda2fc08d (diff) | |
parent | 4f89ca73da90f4dc30b5b278f474bf5850e75488 (diff) | |
download | docker-py-f3453aa23fca93209fcac881865e24f524090276.tar.gz |
Merge pull request #652 from docker/clientbase_extraction
ClientBase extraction
-rw-r--r-- | docker/client.py | 232 | ||||
-rw-r--r-- | docker/clientbase.py | 235 |
2 files changed, 240 insertions, 227 deletions
diff --git a/docker/client.py b/docker/client.py index 321989f..bb12e00 100644 --- a/docker/client.py +++ b/docker/client.py @@ -12,246 +12,22 @@ # See the License for the specific language governing permissions and # limitations under the License. -import json import os import re import shlex -import struct import warnings from datetime import datetime -import requests -import requests.exceptions import six -import websocket - +from . import clientbase from . import constants from . import errors from .auth import auth -from .unixconn import unixconn -from .ssladapter import ssladapter from .utils import utils, check_resource -from .tls import TLSConfig - - -class Client(requests.Session): - def __init__(self, base_url=None, version=None, - timeout=constants.DEFAULT_TIMEOUT_SECONDS, tls=False): - super(Client, self).__init__() - - if tls and not base_url.startswith('https://'): - raise errors.TLSParameterError( - 'If using TLS, the base_url argument must begin with ' - '"https://".') - - self.base_url = base_url - self.timeout = timeout - - self._auth_configs = auth.load_config() - - base_url = utils.parse_host(base_url) - if base_url.startswith('http+unix://'): - self._adapter = unixconn.UnixAdapter(base_url, timeout) - self.mount('http+docker://', self._adapter) - self.base_url = 'http+docker://localunixsocket' - else: - # Use SSLAdapter for the ability to specify SSL version - if isinstance(tls, TLSConfig): - tls.configure_client(self) - elif tls: - self._adapter = ssladapter.SSLAdapter() - self.mount('https://', self._adapter) - self.base_url = base_url - - # version detection needs to be after unix adapter mounting - if version is None: - self._version = constants.DEFAULT_DOCKER_API_VERSION - elif isinstance(version, six.string_types): - if version.lower() == 'auto': - self._version = self._retrieve_server_version() - else: - self._version = version - else: - raise errors.DockerException( - 'Version parameter must be a string or None. Found {0}'.format( - type(version).__name__ - ) - ) - - def _retrieve_server_version(self): - try: - return self.version(api_version=False)["ApiVersion"] - except KeyError: - raise errors.DockerException( - 'Invalid response from docker daemon: key "ApiVersion"' - ' is missing.' - ) - except Exception as e: - raise errors.DockerException( - 'Error while fetching server API version: {0}'.format(e) - ) - - def _set_request_timeout(self, kwargs): - """Prepare the kwargs for an HTTP request by inserting the timeout - parameter, if not already present.""" - kwargs.setdefault('timeout', self.timeout) - return kwargs - - def _post(self, url, **kwargs): - return self.post(url, **self._set_request_timeout(kwargs)) - - def _get(self, url, **kwargs): - return self.get(url, **self._set_request_timeout(kwargs)) - - def _delete(self, url, **kwargs): - return self.delete(url, **self._set_request_timeout(kwargs)) - def _url(self, path, versioned_api=True): - if versioned_api: - return '{0}/v{1}{2}'.format(self.base_url, self._version, path) - else: - return '{0}{1}'.format(self.base_url, path) - - def _raise_for_status(self, response, explanation=None): - """Raises stored :class:`APIError`, if one occurred.""" - try: - response.raise_for_status() - except requests.exceptions.HTTPError as e: - raise errors.APIError(e, response, explanation=explanation) - - def _result(self, response, json=False, binary=False): - assert not (json and binary) - self._raise_for_status(response) - - if json: - return response.json() - if binary: - return response.content - return response.text - - def _post_json(self, url, data, **kwargs): - # Go <1.1 can't unserialize null to a string - # so we do this disgusting thing here. - data2 = {} - if data is not None: - for k, v in six.iteritems(data): - if v is not None: - data2[k] = v - - if 'headers' not in kwargs: - kwargs['headers'] = {} - kwargs['headers']['Content-Type'] = 'application/json' - return self._post(url, data=json.dumps(data2), **kwargs) - - def _attach_params(self, override=None): - return override or { - 'stdout': 1, - 'stderr': 1, - 'stream': 1 - } - - @check_resource - def _attach_websocket(self, container, params=None): - url = self._url("/containers/{0}/attach/ws".format(container)) - req = requests.Request("POST", url, params=self._attach_params(params)) - full_url = req.prepare().url - full_url = full_url.replace("http://", "ws://", 1) - full_url = full_url.replace("https://", "wss://", 1) - return self._create_websocket_connection(full_url) - - def _create_websocket_connection(self, url): - return websocket.create_connection(url) - - def _get_raw_response_socket(self, response): - self._raise_for_status(response) - if six.PY3: - sock = response.raw._fp.fp.raw - else: - sock = response.raw._fp.fp._sock - try: - # Keep a reference to the response to stop it being garbage - # collected. If the response is garbage collected, it will - # close TLS sockets. - sock._response = response - except AttributeError: - # UNIX sockets can't have attributes set on them, but that's - # fine because we won't be doing TLS over them - pass - - return sock - - def _stream_helper(self, response, decode=False): - """Generator for data coming from a chunked-encoded HTTP response.""" - if response.raw._fp.chunked: - reader = response.raw - while not reader.closed: - # this read call will block until we get a chunk - data = reader.read(1) - if not data: - break - if reader._fp.chunk_left: - data += reader.read(reader._fp.chunk_left) - if decode: - if six.PY3: - data = data.decode('utf-8') - data = json.loads(data) - yield data - else: - # Response isn't chunked, meaning we probably - # encountered an error immediately - yield self._result(response) - - def _multiplexed_buffer_helper(self, response): - """A generator of multiplexed data blocks read from a buffered - response.""" - buf = self._result(response, binary=True) - walker = 0 - while True: - if len(buf[walker:]) < 8: - break - _, length = struct.unpack_from('>BxxxL', buf[walker:]) - start = walker + constants.STREAM_HEADER_SIZE_BYTES - end = start + length - walker = end - yield buf[start:end] - - def _multiplexed_response_stream_helper(self, response): - """A generator of multiplexed data blocks coming from a response - stream.""" - - # Disable timeout on the underlying socket to prevent - # Read timed out(s) for long running processes - socket = self._get_raw_response_socket(response) - if six.PY3: - socket._sock.settimeout(None) - else: - socket.settimeout(None) - - while True: - header = response.raw.read(constants.STREAM_HEADER_SIZE_BYTES) - if not header: - break - _, length = struct.unpack('>BxxxL', header) - if not length: - break - data = response.raw.read(length) - if not data: - break - yield data - - @property - def api_version(self): - return self._version - - def get_adapter(self, url): - try: - return super(Client, self).get_adapter(url) - except requests.exceptions.InvalidSchema as e: - if self._adapter: - return self._adapter - raise e +class Client(clientbase.ClientBase): @check_resource def attach(self, container, stdout=True, stderr=True, stream=False, logs=False): @@ -745,7 +521,9 @@ class Client(requests.Session): @check_resource def inspect_image(self, image): return self._result( - self._get(self._url("/images/{0}/json".format(image))), + self._get( + self._url("/images/{0}/json".format(image.replace('/', '%2F'))) + ), True ) diff --git a/docker/clientbase.py b/docker/clientbase.py new file mode 100644 index 0000000..e51bf3e --- /dev/null +++ b/docker/clientbase.py @@ -0,0 +1,235 @@ +import json +import struct + +import requests +import requests.exceptions +import six +import websocket + + +from . import constants +from . import errors +from .auth import auth +from .unixconn import unixconn +from .ssladapter import ssladapter +from .utils import utils, check_resource +from .tls import TLSConfig + + +class ClientBase(requests.Session): + def __init__(self, base_url=None, version=None, + timeout=constants.DEFAULT_TIMEOUT_SECONDS, tls=False): + super(ClientBase, self).__init__() + + if tls and not base_url.startswith('https://'): + raise errors.TLSParameterError( + 'If using TLS, the base_url argument must begin with ' + '"https://".') + + self.base_url = base_url + self.timeout = timeout + + self._auth_configs = auth.load_config() + + base_url = utils.parse_host(base_url) + if base_url.startswith('http+unix://'): + self._custom_adapter = unixconn.UnixAdapter(base_url, timeout) + self.mount('http+docker://', self._custom_adapter) + self.base_url = 'http+docker://localunixsocket' + else: + # Use SSLAdapter for the ability to specify SSL version + if isinstance(tls, TLSConfig): + tls.configure_client(self) + elif tls: + self._custom_adapter = ssladapter.SSLAdapter() + self.mount('https://', self._custom_adapter) + self.base_url = base_url + + # version detection needs to be after unix adapter mounting + if version is None: + self._version = constants.DEFAULT_DOCKER_API_VERSION + elif isinstance(version, six.string_types): + if version.lower() == 'auto': + self._version = self._retrieve_server_version() + else: + self._version = version + else: + raise errors.DockerException( + 'Version parameter must be a string or None. Found {0}'.format( + type(version).__name__ + ) + ) + + def _retrieve_server_version(self): + try: + return self.version(api_version=False)["ApiVersion"] + except KeyError: + raise errors.DockerException( + 'Invalid response from docker daemon: key "ApiVersion"' + ' is missing.' + ) + except Exception as e: + raise errors.DockerException( + 'Error while fetching server API version: {0}'.format(e) + ) + + def _set_request_timeout(self, kwargs): + """Prepare the kwargs for an HTTP request by inserting the timeout + parameter, if not already present.""" + kwargs.setdefault('timeout', self.timeout) + return kwargs + + def _post(self, url, **kwargs): + return self.post(url, **self._set_request_timeout(kwargs)) + + def _get(self, url, **kwargs): + return self.get(url, **self._set_request_timeout(kwargs)) + + def _delete(self, url, **kwargs): + return self.delete(url, **self._set_request_timeout(kwargs)) + + def _url(self, path, versioned_api=True): + if versioned_api: + return '{0}/v{1}{2}'.format(self.base_url, self._version, path) + else: + return '{0}{1}'.format(self.base_url, path) + + def _raise_for_status(self, response, explanation=None): + """Raises stored :class:`APIError`, if one occurred.""" + try: + response.raise_for_status() + except requests.exceptions.HTTPError as e: + raise errors.APIError(e, response, explanation=explanation) + + def _result(self, response, json=False, binary=False): + assert not (json and binary) + self._raise_for_status(response) + + if json: + return response.json() + if binary: + return response.content + return response.text + + def _post_json(self, url, data, **kwargs): + # Go <1.1 can't unserialize null to a string + # so we do this disgusting thing here. + data2 = {} + if data is not None: + for k, v in six.iteritems(data): + if v is not None: + data2[k] = v + + if 'headers' not in kwargs: + kwargs['headers'] = {} + kwargs['headers']['Content-Type'] = 'application/json' + return self._post(url, data=json.dumps(data2), **kwargs) + + def _attach_params(self, override=None): + return override or { + 'stdout': 1, + 'stderr': 1, + 'stream': 1 + } + + @check_resource + def _attach_websocket(self, container, params=None): + url = self._url("/containers/{0}/attach/ws".format(container)) + req = requests.Request("POST", url, params=self._attach_params(params)) + full_url = req.prepare().url + full_url = full_url.replace("http://", "ws://", 1) + full_url = full_url.replace("https://", "wss://", 1) + return self._create_websocket_connection(full_url) + + def _create_websocket_connection(self, url): + return websocket.create_connection(url) + + def _get_raw_response_socket(self, response): + self._raise_for_status(response) + if six.PY3: + sock = response.raw._fp.fp.raw + else: + sock = response.raw._fp.fp._sock + try: + # Keep a reference to the response to stop it being garbage + # collected. If the response is garbage collected, it will + # close TLS sockets. + sock._response = response + except AttributeError: + # UNIX sockets can't have attributes set on them, but that's + # fine because we won't be doing TLS over them + pass + + return sock + + def _stream_helper(self, response, decode=False): + """Generator for data coming from a chunked-encoded HTTP response.""" + if response.raw._fp.chunked: + reader = response.raw + while not reader.closed: + # this read call will block until we get a chunk + data = reader.read(1) + if not data: + break + if reader._fp.chunk_left: + data += reader.read(reader._fp.chunk_left) + if decode: + if six.PY3: + data = data.decode('utf-8') + data = json.loads(data) + yield data + else: + # Response isn't chunked, meaning we probably + # encountered an error immediately + yield self._result(response) + + def _multiplexed_buffer_helper(self, response): + """A generator of multiplexed data blocks read from a buffered + response.""" + buf = self._result(response, binary=True) + walker = 0 + while True: + if len(buf[walker:]) < 8: + break + _, length = struct.unpack_from('>BxxxL', buf[walker:]) + start = walker + constants.STREAM_HEADER_SIZE_BYTES + end = start + length + walker = end + yield buf[start:end] + + def _multiplexed_response_stream_helper(self, response): + """A generator of multiplexed data blocks coming from a response + stream.""" + + # Disable timeout on the underlying socket to prevent + # Read timed out(s) for long running processes + socket = self._get_raw_response_socket(response) + if six.PY3: + socket._sock.settimeout(None) + else: + socket.settimeout(None) + + while True: + header = response.raw.read(constants.STREAM_HEADER_SIZE_BYTES) + if not header: + break + _, length = struct.unpack('>BxxxL', header) + if not length: + break + data = response.raw.read(length) + if not data: + break + yield data + + def get_adapter(self, url): + try: + return super(ClientBase, self).get_adapter(url) + except requests.exceptions.InvalidSchema as e: + if self._custom_adapter: + return self._custom_adapter + else: + raise e + + @property + def api_version(self): + return self._version |