summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJoffrey F <joffrey@docker.com>2015-09-10 16:05:27 -0700
committerJoffrey F <joffrey@docker.com>2015-09-11 16:19:29 -0700
commit7a6980d4794f3efaf85ad65458fbfac4c3afdec7 (patch)
treeae8d53efc9b99d5ba5a6582e8c7573824bd69df0
parentc735cc69a02f6a34417a24d37a7934bc235bc99e (diff)
downloaddocker-py-api_splitup.tar.gz
Move client internals back to Client class. Remove obsolete ClientBase classapi_splitup
Signed-off-by: Joffrey F <joffrey@docker.com>
-rw-r--r--docker/api/build.py2
-rw-r--r--docker/client.py290
-rw-r--r--docker/clientbase.py288
3 files changed, 287 insertions, 293 deletions
diff --git a/docker/api/build.py b/docker/api/build.py
index 99f72b0..ce6fd46 100644
--- a/docker/api/build.py
+++ b/docker/api/build.py
@@ -77,7 +77,7 @@ class BuildApiMixin(object):
headers['Content-Encoding'] = encoding
if utils.compare_version('1.9', self._version) >= 0:
- self._set_auth_headers(self, headers)
+ self._set_auth_headers(headers)
response = self._post(
u,
diff --git a/docker/client.py b/docker/client.py
index 65777b7..2eb859c 100644
--- a/docker/client.py
+++ b/docker/client.py
@@ -12,19 +12,301 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from . import clientbase
-from . import errors
+import json
+import struct
+import sys
+
+import requests
+import requests.exceptions
+import six
+import websocket
+
+
from . import api
+from . import constants
+from . import errors
+from .auth import auth
+from .unixconn import unixconn
+from .ssladapter import ssladapter
+from .utils import utils, check_resource
+from .tls import TLSConfig
class Client(
- clientbase.ClientBase,
+ requests.Session,
api.BuildApiMixin,
api.ContainerApiMixin,
api.DaemonApiMixin,
api.ExecApiMixin,
api.ImageApiMixin):
- pass
+ def __init__(self, base_url=None, version=None,
+ timeout=constants.DEFAULT_TIMEOUT_SECONDS, tls=False):
+ super(Client, self).__init__()
+
+ if tls and not base_url.startswith('https://'):
+ raise errors.TLSParameterError(
+ 'If using TLS, the base_url argument must begin with '
+ '"https://".')
+
+ self.base_url = base_url
+ self.timeout = timeout
+
+ self._auth_configs = auth.load_config()
+
+ base_url = utils.parse_host(base_url, sys.platform)
+ if base_url.startswith('http+unix://'):
+ self._custom_adapter = unixconn.UnixAdapter(base_url, timeout)
+ self.mount('http+docker://', self._custom_adapter)
+ self.base_url = 'http+docker://localunixsocket'
+ else:
+ # Use SSLAdapter for the ability to specify SSL version
+ if isinstance(tls, TLSConfig):
+ tls.configure_client(self)
+ elif tls:
+ self._custom_adapter = ssladapter.SSLAdapter()
+ self.mount('https://', self._custom_adapter)
+ self.base_url = base_url
+
+ # version detection needs to be after unix adapter mounting
+ if version is None:
+ self._version = constants.DEFAULT_DOCKER_API_VERSION
+ elif isinstance(version, six.string_types):
+ if version.lower() == 'auto':
+ self._version = self._retrieve_server_version()
+ else:
+ self._version = version
+ else:
+ raise errors.DockerException(
+ 'Version parameter must be a string or None. Found {0}'.format(
+ type(version).__name__
+ )
+ )
+
+ def _retrieve_server_version(self):
+ try:
+ return self.version(api_version=False)["ApiVersion"]
+ except KeyError:
+ raise errors.DockerException(
+ 'Invalid response from docker daemon: key "ApiVersion"'
+ ' is missing.'
+ )
+ except Exception as e:
+ raise errors.DockerException(
+ 'Error while fetching server API version: {0}'.format(e)
+ )
+
+ def _set_request_timeout(self, kwargs):
+ """Prepare the kwargs for an HTTP request by inserting the timeout
+ parameter, if not already present."""
+ kwargs.setdefault('timeout', self.timeout)
+ return kwargs
+
+ def _post(self, url, **kwargs):
+ return self.post(url, **self._set_request_timeout(kwargs))
+
+ def _get(self, url, **kwargs):
+ return self.get(url, **self._set_request_timeout(kwargs))
+
+ def _delete(self, url, **kwargs):
+ return self.delete(url, **self._set_request_timeout(kwargs))
+
+ def _url(self, pathfmt, resource_id=None, versioned_api=True):
+ if resource_id and not isinstance(resource_id, six.string_types):
+ raise ValueError(
+ 'Expected a resource ID string but found {0} ({1}) '
+ 'instead'.format(resource_id, type(resource_id))
+ )
+ elif resource_id:
+ resource_id = six.moves.urllib.parse.quote_plus(resource_id)
+
+ if versioned_api:
+ return '{0}/v{1}{2}'.format(
+ self.base_url, self._version, pathfmt.format(resource_id)
+ )
+ else:
+ return '{0}{1}'.format(self.base_url, pathfmt.format(resource_id))
+
+ def _raise_for_status(self, response, explanation=None):
+ """Raises stored :class:`APIError`, if one occurred."""
+ try:
+ response.raise_for_status()
+ except requests.exceptions.HTTPError as e:
+ if e.response.status_code == 404:
+ raise errors.NotFound(e, response, explanation=explanation)
+ raise errors.APIError(e, response, explanation=explanation)
+
+ def _result(self, response, json=False, binary=False):
+ assert not (json and binary)
+ self._raise_for_status(response)
+
+ if json:
+ return response.json()
+ if binary:
+ return response.content
+ return response.text
+
+ def _post_json(self, url, data, **kwargs):
+ # Go <1.1 can't unserialize null to a string
+ # so we do this disgusting thing here.
+ data2 = {}
+ if data is not None:
+ for k, v in six.iteritems(data):
+ if v is not None:
+ data2[k] = v
+
+ if 'headers' not in kwargs:
+ kwargs['headers'] = {}
+ kwargs['headers']['Content-Type'] = 'application/json'
+ return self._post(url, data=json.dumps(data2), **kwargs)
+
+ def _attach_params(self, override=None):
+ return override or {
+ 'stdout': 1,
+ 'stderr': 1,
+ 'stream': 1
+ }
+
+ @check_resource
+ def _attach_websocket(self, container, params=None):
+ url = self._url("/containers/{0}/attach/ws", container)
+ req = requests.Request("POST", url, params=self._attach_params(params))
+ full_url = req.prepare().url
+ full_url = full_url.replace("http://", "ws://", 1)
+ full_url = full_url.replace("https://", "wss://", 1)
+ return self._create_websocket_connection(full_url)
+
+ def _create_websocket_connection(self, url):
+ return websocket.create_connection(url)
+
+ def _get_raw_response_socket(self, response):
+ self._raise_for_status(response)
+ if six.PY3:
+ sock = response.raw._fp.fp.raw
+ else:
+ sock = response.raw._fp.fp._sock
+ try:
+ # Keep a reference to the response to stop it being garbage
+ # collected. If the response is garbage collected, it will
+ # close TLS sockets.
+ sock._response = response
+ except AttributeError:
+ # UNIX sockets can't have attributes set on them, but that's
+ # fine because we won't be doing TLS over them
+ pass
+
+ return sock
+
+ def _stream_helper(self, response, decode=False):
+ """Generator for data coming from a chunked-encoded HTTP response."""
+ if response.raw._fp.chunked:
+ reader = response.raw
+ while not reader.closed:
+ # this read call will block until we get a chunk
+ data = reader.read(1)
+ if not data:
+ break
+ if reader._fp.chunk_left:
+ data += reader.read(reader._fp.chunk_left)
+ if decode:
+ if six.PY3:
+ data = data.decode('utf-8')
+ data = json.loads(data)
+ yield data
+ else:
+ # Response isn't chunked, meaning we probably
+ # encountered an error immediately
+ yield self._result(response)
+
+ def _multiplexed_buffer_helper(self, response):
+ """A generator of multiplexed data blocks read from a buffered
+ response."""
+ buf = self._result(response, binary=True)
+ walker = 0
+ while True:
+ if len(buf[walker:]) < 8:
+ break
+ _, length = struct.unpack_from('>BxxxL', buf[walker:])
+ start = walker + constants.STREAM_HEADER_SIZE_BYTES
+ end = start + length
+ walker = end
+ yield buf[start:end]
+
+ def _multiplexed_response_stream_helper(self, response):
+ """A generator of multiplexed data blocks coming from a response
+ stream."""
+
+ # Disable timeout on the underlying socket to prevent
+ # Read timed out(s) for long running processes
+ socket = self._get_raw_response_socket(response)
+ if six.PY3:
+ socket._sock.settimeout(None)
+ else:
+ socket.settimeout(None)
+
+ while True:
+ header = response.raw.read(constants.STREAM_HEADER_SIZE_BYTES)
+ if not header:
+ break
+ _, length = struct.unpack('>BxxxL', header)
+ if not length:
+ continue
+ data = response.raw.read(length)
+ if not data:
+ break
+ yield data
+
+ def _stream_raw_result_old(self, response):
+ ''' Stream raw output for API versions below 1.6 '''
+ self._raise_for_status(response)
+ for line in response.iter_lines(chunk_size=1,
+ decode_unicode=True):
+ # filter out keep-alive new lines
+ if line:
+ yield line
+
+ def _stream_raw_result(self, response):
+ ''' Stream result for TTY-enabled container above API 1.6 '''
+ self._raise_for_status(response)
+ for out in response.iter_content(chunk_size=1, decode_unicode=True):
+ yield out
+
+ def _get_result(self, container, stream, res):
+ cont = self.inspect_container(container)
+ return self._get_result_tty(stream, res, cont['Config']['Tty'])
+
+ def _get_result_tty(self, stream, res, is_tty):
+ # Stream multi-plexing was only introduced in API v1.6. Anything
+ # before that needs old-style streaming.
+ if utils.compare_version('1.6', self._version) < 0:
+ return self._stream_raw_result_old(res)
+
+ # We should also use raw streaming (without keep-alives)
+ # if we're dealing with a tty-enabled container.
+ if is_tty:
+ return self._stream_raw_result(res) if stream else \
+ self._result(res, binary=True)
+
+ self._raise_for_status(res)
+ sep = six.binary_type()
+ if stream:
+ return self._multiplexed_response_stream_helper(res)
+ else:
+ return sep.join(
+ [x for x in self._multiplexed_buffer_helper(res)]
+ )
+
+ def get_adapter(self, url):
+ try:
+ return super(Client, self).get_adapter(url)
+ except requests.exceptions.InvalidSchema as e:
+ if self._custom_adapter:
+ return self._custom_adapter
+ else:
+ raise e
+
+ @property
+ def api_version(self):
+ return self._version
class AutoVersionClient(Client):
diff --git a/docker/clientbase.py b/docker/clientbase.py
deleted file mode 100644
index a70339a..0000000
--- a/docker/clientbase.py
+++ /dev/null
@@ -1,288 +0,0 @@
-import json
-import struct
-import sys
-
-import requests
-import requests.exceptions
-import six
-import websocket
-
-
-from . import constants
-from . import errors
-from .auth import auth
-from .unixconn import unixconn
-from .ssladapter import ssladapter
-from .utils import utils, check_resource
-from .tls import TLSConfig
-
-
-class ClientBase(requests.Session):
- def __init__(self, base_url=None, version=None,
- timeout=constants.DEFAULT_TIMEOUT_SECONDS, tls=False):
- super(ClientBase, self).__init__()
-
- if tls and not base_url.startswith('https://'):
- raise errors.TLSParameterError(
- 'If using TLS, the base_url argument must begin with '
- '"https://".')
-
- self.base_url = base_url
- self.timeout = timeout
-
- self._auth_configs = auth.load_config()
-
- base_url = utils.parse_host(base_url, sys.platform)
- if base_url.startswith('http+unix://'):
- self._custom_adapter = unixconn.UnixAdapter(base_url, timeout)
- self.mount('http+docker://', self._custom_adapter)
- self.base_url = 'http+docker://localunixsocket'
- else:
- # Use SSLAdapter for the ability to specify SSL version
- if isinstance(tls, TLSConfig):
- tls.configure_client(self)
- elif tls:
- self._custom_adapter = ssladapter.SSLAdapter()
- self.mount('https://', self._custom_adapter)
- self.base_url = base_url
-
- # version detection needs to be after unix adapter mounting
- if version is None:
- self._version = constants.DEFAULT_DOCKER_API_VERSION
- elif isinstance(version, six.string_types):
- if version.lower() == 'auto':
- self._version = self._retrieve_server_version()
- else:
- self._version = version
- else:
- raise errors.DockerException(
- 'Version parameter must be a string or None. Found {0}'.format(
- type(version).__name__
- )
- )
-
- def _retrieve_server_version(self):
- try:
- return self.version(api_version=False)["ApiVersion"]
- except KeyError:
- raise errors.DockerException(
- 'Invalid response from docker daemon: key "ApiVersion"'
- ' is missing.'
- )
- except Exception as e:
- raise errors.DockerException(
- 'Error while fetching server API version: {0}'.format(e)
- )
-
- def _set_request_timeout(self, kwargs):
- """Prepare the kwargs for an HTTP request by inserting the timeout
- parameter, if not already present."""
- kwargs.setdefault('timeout', self.timeout)
- return kwargs
-
- def _post(self, url, **kwargs):
- return self.post(url, **self._set_request_timeout(kwargs))
-
- def _get(self, url, **kwargs):
- return self.get(url, **self._set_request_timeout(kwargs))
-
- def _delete(self, url, **kwargs):
- return self.delete(url, **self._set_request_timeout(kwargs))
-
- def _url(self, pathfmt, resource_id=None, versioned_api=True):
- if resource_id and not isinstance(resource_id, six.string_types):
- raise ValueError(
- 'Expected a resource ID string but found {0} ({1}) '
- 'instead'.format(resource_id, type(resource_id))
- )
- elif resource_id:
- resource_id = six.moves.urllib.parse.quote_plus(resource_id)
-
- if versioned_api:
- return '{0}/v{1}{2}'.format(
- self.base_url, self._version, pathfmt.format(resource_id)
- )
- else:
- return '{0}{1}'.format(self.base_url, pathfmt.format(resource_id))
-
- def _raise_for_status(self, response, explanation=None):
- """Raises stored :class:`APIError`, if one occurred."""
- try:
- response.raise_for_status()
- except requests.exceptions.HTTPError as e:
- if e.response.status_code == 404:
- raise errors.NotFound(e, response, explanation=explanation)
- raise errors.APIError(e, response, explanation=explanation)
-
- def _result(self, response, json=False, binary=False):
- assert not (json and binary)
- self._raise_for_status(response)
-
- if json:
- return response.json()
- if binary:
- return response.content
- return response.text
-
- def _post_json(self, url, data, **kwargs):
- # Go <1.1 can't unserialize null to a string
- # so we do this disgusting thing here.
- data2 = {}
- if data is not None:
- for k, v in six.iteritems(data):
- if v is not None:
- data2[k] = v
-
- if 'headers' not in kwargs:
- kwargs['headers'] = {}
- kwargs['headers']['Content-Type'] = 'application/json'
- return self._post(url, data=json.dumps(data2), **kwargs)
-
- def _attach_params(self, override=None):
- return override or {
- 'stdout': 1,
- 'stderr': 1,
- 'stream': 1
- }
-
- @check_resource
- def _attach_websocket(self, container, params=None):
- url = self._url("/containers/{0}/attach/ws", container)
- req = requests.Request("POST", url, params=self._attach_params(params))
- full_url = req.prepare().url
- full_url = full_url.replace("http://", "ws://", 1)
- full_url = full_url.replace("https://", "wss://", 1)
- return self._create_websocket_connection(full_url)
-
- def _create_websocket_connection(self, url):
- return websocket.create_connection(url)
-
- def _get_raw_response_socket(self, response):
- self._raise_for_status(response)
- if six.PY3:
- sock = response.raw._fp.fp.raw
- else:
- sock = response.raw._fp.fp._sock
- try:
- # Keep a reference to the response to stop it being garbage
- # collected. If the response is garbage collected, it will
- # close TLS sockets.
- sock._response = response
- except AttributeError:
- # UNIX sockets can't have attributes set on them, but that's
- # fine because we won't be doing TLS over them
- pass
-
- return sock
-
- def _stream_helper(self, response, decode=False):
- """Generator for data coming from a chunked-encoded HTTP response."""
- if response.raw._fp.chunked:
- reader = response.raw
- while not reader.closed:
- # this read call will block until we get a chunk
- data = reader.read(1)
- if not data:
- break
- if reader._fp.chunk_left:
- data += reader.read(reader._fp.chunk_left)
- if decode:
- if six.PY3:
- data = data.decode('utf-8')
- data = json.loads(data)
- yield data
- else:
- # Response isn't chunked, meaning we probably
- # encountered an error immediately
- yield self._result(response)
-
- def _multiplexed_buffer_helper(self, response):
- """A generator of multiplexed data blocks read from a buffered
- response."""
- buf = self._result(response, binary=True)
- walker = 0
- while True:
- if len(buf[walker:]) < 8:
- break
- _, length = struct.unpack_from('>BxxxL', buf[walker:])
- start = walker + constants.STREAM_HEADER_SIZE_BYTES
- end = start + length
- walker = end
- yield buf[start:end]
-
- def _multiplexed_response_stream_helper(self, response):
- """A generator of multiplexed data blocks coming from a response
- stream."""
-
- # Disable timeout on the underlying socket to prevent
- # Read timed out(s) for long running processes
- socket = self._get_raw_response_socket(response)
- if six.PY3:
- socket._sock.settimeout(None)
- else:
- socket.settimeout(None)
-
- while True:
- header = response.raw.read(constants.STREAM_HEADER_SIZE_BYTES)
- if not header:
- break
- _, length = struct.unpack('>BxxxL', header)
- if not length:
- continue
- data = response.raw.read(length)
- if not data:
- break
- yield data
-
- def _stream_raw_result_old(self, response):
- ''' Stream raw output for API versions below 1.6 '''
- self._raise_for_status(response)
- for line in response.iter_lines(chunk_size=1,
- decode_unicode=True):
- # filter out keep-alive new lines
- if line:
- yield line
-
- def _stream_raw_result(self, response):
- ''' Stream result for TTY-enabled container above API 1.6 '''
- self._raise_for_status(response)
- for out in response.iter_content(chunk_size=1, decode_unicode=True):
- yield out
-
- def _get_result(self, container, stream, res):
- cont = self.inspect_container(container)
- return self._get_result_tty(stream, res, cont['Config']['Tty'])
-
- def _get_result_tty(self, stream, res, is_tty):
- # Stream multi-plexing was only introduced in API v1.6. Anything
- # before that needs old-style streaming.
- if utils.compare_version('1.6', self._version) < 0:
- return self._stream_raw_result_old(res)
-
- # We should also use raw streaming (without keep-alives)
- # if we're dealing with a tty-enabled container.
- if is_tty:
- return self._stream_raw_result(res) if stream else \
- self._result(res, binary=True)
-
- self._raise_for_status(res)
- sep = six.binary_type()
- if stream:
- return self._multiplexed_response_stream_helper(res)
- else:
- return sep.join(
- [x for x in self._multiplexed_buffer_helper(res)]
- )
-
- def get_adapter(self, url):
- try:
- return super(ClientBase, self).get_adapter(url)
- except requests.exceptions.InvalidSchema as e:
- if self._custom_adapter:
- return self._custom_adapter
- else:
- raise e
-
- @property
- def api_version(self):
- return self._version