diff options
author | Corentin Henry <corentinhenry@gmail.com> | 2018-10-08 11:02:31 -0700 |
---|---|---|
committer | Corentin Henry <corentinhenry@gmail.com> | 2018-11-27 17:01:48 -0800 |
commit | 5f157bbaca5ae62a5bb71547106beb6ef02bc485 (patch) | |
tree | 4c70fc861915e84f5aa752cf860bdfa1a3d4e150 | |
parent | e1e4048753aafc96571752cf54d96df7b24156d3 (diff) | |
download | docker-py-5f157bbaca5ae62a5bb71547106beb6ef02bc485.tar.gz |
implement stream demultiplexing for exec commands
fixes https://github.com/docker/docker-py/issues/1952
Signed-off-by: Corentin Henry <corentinhenry@gmail.com>
-rw-r--r-- | docker/api/client.py | 18 | ||||
-rw-r--r-- | docker/api/container.py | 13 | ||||
-rw-r--r-- | docker/api/exec_api.py | 13 | ||||
-rw-r--r-- | docker/models/containers.py | 70 | ||||
-rw-r--r-- | docker/utils/socket.py | 89 | ||||
-rw-r--r-- | tests/integration/api_container_test.py | 5 | ||||
-rw-r--r-- | tests/integration/api_exec_test.py | 5 | ||||
-rw-r--r-- | tests/unit/api_test.py | 2 | ||||
-rw-r--r-- | tests/unit/models_containers_test.py | 6 |
9 files changed, 181 insertions, 40 deletions
diff --git a/docker/api/client.py b/docker/api/client.py index 197846d..8a5a60b 100644 --- a/docker/api/client.py +++ b/docker/api/client.py @@ -32,7 +32,7 @@ from ..errors import ( from ..tls import TLSConfig from ..transport import SSLAdapter, UnixAdapter from ..utils import utils, check_resource, update_headers, config -from ..utils.socket import frames_iter, socket_raw_iter +from ..utils.socket import frames_iter, consume_socket_output, demux_adaptor from ..utils.json_stream import json_stream try: from ..transport import NpipeAdapter @@ -381,19 +381,23 @@ class APIClient( for out in response.iter_content(chunk_size, decode): yield out - def _read_from_socket(self, response, stream, tty=False): + def _read_from_socket(self, response, stream, tty=True, demux=False): socket = self._get_raw_response_socket(response) - gen = None - if tty is False: - gen = frames_iter(socket) + gen = frames_iter(socket, tty) + + if demux: + # The generator will output tuples (stdout, stderr) + gen = (demux_adaptor(*frame) for frame in gen) else: - gen = socket_raw_iter(socket) + # The generator will output strings + gen = (data for (_, data) in gen) if stream: return gen else: - return six.binary_type().join(gen) + # Wait for all the frames, concatenate them, and return the result + return consume_socket_output(gen, demux=demux) def _disable_socket_timeout(self, socket): """ Depending on the combination of python version and whether we're diff --git a/docker/api/container.py b/docker/api/container.py index fce73af..ab3b1cf 100644 --- a/docker/api/container.py +++ b/docker/api/container.py @@ -13,7 +13,7 @@ from ..types import ( class ContainerApiMixin(object): @utils.check_resource('container') def attach(self, container, stdout=True, stderr=True, - stream=False, logs=False): + stream=False, logs=False, demux=False): """ Attach to a container. @@ -28,11 +28,15 @@ class ContainerApiMixin(object): stream (bool): Return container output progressively as an iterator of strings, rather than a single string. logs (bool): Include the container's previous output. + demux (bool): Keep stdout and stderr separate. Returns: - By default, the container's output as a single string. + By default, the container's output as a single string (two if + ``demux=True``: one for stdout and one for stderr). - If ``stream=True``, an iterator of output strings. + If ``stream=True``, an iterator of output strings. If + ``demux=True``, two iterators are returned: one for stdout and one + for stderr. Raises: :py:class:`docker.errors.APIError` @@ -54,8 +58,7 @@ class ContainerApiMixin(object): response = self._post(u, headers=headers, params=params, stream=True) output = self._read_from_socket( - response, stream, self._check_is_tty(container) - ) + response, stream, self._check_is_tty(container), demux=demux) if stream: return CancellableStream(output, response) diff --git a/docker/api/exec_api.py b/docker/api/exec_api.py index 986d87f..3950991 100644 --- a/docker/api/exec_api.py +++ b/docker/api/exec_api.py @@ -118,7 +118,7 @@ class ExecApiMixin(object): @utils.check_resource('exec_id') def exec_start(self, exec_id, detach=False, tty=False, stream=False, - socket=False): + socket=False, demux=False): """ Start a previously set up exec instance. @@ -130,11 +130,14 @@ class ExecApiMixin(object): stream (bool): Stream response data. Default: False socket (bool): Return the connection socket to allow custom read/write operations. + demux (bool): Separate return stdin, stdout and stderr separately Returns: - (generator or str): If ``stream=True``, a generator yielding - response chunks. If ``socket=True``, a socket object for the - connection. A string containing response data otherwise. + + (generator or str or tuple): If ``stream=True``, a generator + yielding response chunks. If ``socket=True``, a socket object for + the connection. A string containing response data otherwise. If + ``demux=True``, stdin, stdout and stderr are separated. Raises: :py:class:`docker.errors.APIError` @@ -162,4 +165,4 @@ class ExecApiMixin(object): return self._result(res) if socket: return self._get_raw_response_socket(res) - return self._read_from_socket(res, stream, tty) + return self._read_from_socket(res, stream, tty=tty, demux=demux) diff --git a/docker/models/containers.py b/docker/models/containers.py index 9d6f2cc..15cc212 100644 --- a/docker/models/containers.py +++ b/docker/models/containers.py @@ -144,7 +144,7 @@ class Container(Model): def exec_run(self, cmd, stdout=True, stderr=True, stdin=False, tty=False, privileged=False, user='', detach=False, stream=False, - socket=False, environment=None, workdir=None): + socket=False, environment=None, workdir=None, demux=False): """ Run a command inside this container. Similar to ``docker exec``. @@ -166,6 +166,7 @@ class Container(Model): the following format ``["PASSWORD=xxx"]`` or ``{"PASSWORD": "xxx"}``. workdir (str): Path to working directory for this exec session + demux (bool): Return stdout and stderr separately Returns: (ExecResult): A tuple of (exit_code, output) @@ -180,6 +181,70 @@ class Container(Model): Raises: :py:class:`docker.errors.APIError` If the server returns an error. + + Example: + + Create a container that runs in the background + + >>> client = docker.from_env() + >>> container = client.containers.run( + ... 'bfirsh/reticulate-splines', detach=True) + + Prepare the command we are going to use. It prints "hello stdout" + in `stdout`, followed by "hello stderr" in `stderr`: + + >>> cmd = '/bin/sh -c "echo hello stdout ; echo hello stderr >&2"' + + We'll run this command with all four the combinations of ``stream`` + and ``demux``. + + With ``stream=False`` and ``demux=False``, the output is a string + that contains both the `stdout` and the `stderr` output: + + >>> res = container.exec_run(cmd, stream=False, demux=False) + >>> res.output + b'hello stderr\nhello stdout\n' + + With ``stream=True``, and ``demux=False``, the output is a + generator that yields strings containing the output of both + `stdout` and `stderr`: + + >>> res = container.exec_run(cmd, stream=True, demux=False) + >>> next(res.output) + b'hello stdout\n' + >>> next(res.output) + b'hello stderr\n' + >>> next(res.output) + Traceback (most recent call last): + File "<stdin>", line 1, in <module> + StopIteration + + With ``stream=True`` and ``demux=True``, the generator now + separates the streams, and yield tuples + ``(stdout, stderr)``: + + >>> res = container.exec_run(cmd, stream=True, demux=True) + >>> next(res.output) + (b'hello stdout\n', None) + >>> next(res.output) + (None, b'hello stderr\n') + >>> next(res.output) + Traceback (most recent call last): + File "<stdin>", line 1, in <module> + StopIteration + + Finally, with ``stream=False`` and ``demux=True``, the whole output + is returned, but the streams are still separated: + + >>> res = container.exec_run(cmd, stream=True, demux=True) + >>> next(res.output) + (b'hello stdout\n', None) + >>> next(res.output) + (None, b'hello stderr\n') + >>> next(res.output) + Traceback (most recent call last): + File "<stdin>", line 1, in <module> + StopIteration """ resp = self.client.api.exec_create( self.id, cmd, stdout=stdout, stderr=stderr, stdin=stdin, tty=tty, @@ -187,7 +252,8 @@ class Container(Model): workdir=workdir ) exec_output = self.client.api.exec_start( - resp['Id'], detach=detach, tty=tty, stream=stream, socket=socket + resp['Id'], detach=detach, tty=tty, stream=stream, socket=socket, + demux=demux ) if socket or stream: return ExecResult(None, exec_output) diff --git a/docker/utils/socket.py b/docker/utils/socket.py index 7b96d4f..fe4a332 100644 --- a/docker/utils/socket.py +++ b/docker/utils/socket.py @@ -12,6 +12,10 @@ except ImportError: NpipeSocket = type(None) +STDOUT = 1 +STDERR = 2 + + class SocketError(Exception): pass @@ -51,28 +55,43 @@ def read_exactly(socket, n): return data -def next_frame_size(socket): +def next_frame_header(socket): """ - Returns the size of the next frame of data waiting to be read from socket, - according to the protocol defined here: + Returns the stream and size of the next frame of data waiting to be read + from socket, according to the protocol defined here: - https://docs.docker.com/engine/reference/api/docker_remote_api_v1.24/#/attach-to-a-container + https://docs.docker.com/engine/api/v1.24/#attach-to-a-container """ try: data = read_exactly(socket, 8) except SocketError: - return -1 + return (-1, -1) + + stream, actual = struct.unpack('>BxxxL', data) + return (stream, actual) + - _, actual = struct.unpack('>BxxxL', data) - return actual +def frames_iter(socket, tty): + """ + Return a generator of frames read from socket. A frame is a tuple where + the first item is the stream number and the second item is a chunk of data. + + If the tty setting is enabled, the streams are multiplexed into the stdout + stream. + """ + if tty: + return ((STDOUT, frame) for frame in frames_iter_tty(socket)) + else: + return frames_iter_no_tty(socket) -def frames_iter(socket): +def frames_iter_no_tty(socket): """ - Returns a generator of frames read from socket + Returns a generator of data read from the socket when the tty setting is + not enabled. """ while True: - n = next_frame_size(socket) + (stream, n) = next_frame_header(socket) if n < 0: break while n > 0: @@ -84,13 +103,13 @@ def frames_iter(socket): # We have reached EOF return n -= data_length - yield result + yield (stream, result) -def socket_raw_iter(socket): +def frames_iter_tty(socket): """ - Returns a generator of data read from the socket. - This is used for non-multiplexed streams. + Return a generator of data read from the socket when the tty setting is + enabled. """ while True: result = read(socket) @@ -98,3 +117,45 @@ def socket_raw_iter(socket): # We have reached EOF return yield result + + +def consume_socket_output(frames, demux=False): + """ + Iterate through frames read from the socket and return the result. + + Args: + + demux (bool): + If False, stdout and stderr are multiplexed, and the result is the + concatenation of all the frames. If True, the streams are + demultiplexed, and the result is a 2-tuple where each item is the + concatenation of frames belonging to the same stream. + """ + if demux is False: + # If the streams are multiplexed, the generator returns strings, that + # we just need to concatenate. + return six.binary_type().join(frames) + + # If the streams are demultiplexed, the generator returns tuples + # (stdin, stdout, stderr) + out = [six.binary_type(), six.binary_type()] + for frame in frames: + for stream_id in [STDOUT, STDERR]: + # It is guaranteed that for each frame, one and only one stream + # is not None. + if frame[stream_id] is not None: + out[stream_id] += frame[stream_id] + return tuple(out) + + +def demux_adaptor(stream_id, data): + """ + Utility to demultiplex stdout and stderr when reading frames from the + socket. + """ + if stream_id == STDOUT: + return (data, None) + elif stream_id == STDERR: + return (None, data) + else: + raise ValueError('{0} is not a valid stream'.format(stream_id)) diff --git a/tests/integration/api_container_test.py b/tests/integration/api_container_test.py index 02f3603..83df342 100644 --- a/tests/integration/api_container_test.py +++ b/tests/integration/api_container_test.py @@ -7,7 +7,7 @@ from datetime import datetime import docker from docker.constants import IS_WINDOWS_PLATFORM -from docker.utils.socket import next_frame_size +from docker.utils.socket import next_frame_header from docker.utils.socket import read_exactly import pytest @@ -1242,7 +1242,8 @@ class AttachContainerTest(BaseAPIIntegrationTest): self.client.start(container) - next_size = next_frame_size(pty_stdout) + (stream, next_size) = next_frame_header(pty_stdout) + assert stream == 1 # correspond to stdout assert next_size == len(line) data = read_exactly(pty_stdout, next_size) assert data.decode('utf-8') == line diff --git a/tests/integration/api_exec_test.py b/tests/integration/api_exec_test.py index 1a5a4e5..ac64af7 100644 --- a/tests/integration/api_exec_test.py +++ b/tests/integration/api_exec_test.py @@ -1,4 +1,4 @@ -from docker.utils.socket import next_frame_size +from docker.utils.socket import next_frame_header from docker.utils.socket import read_exactly from .base import BaseAPIIntegrationTest, BUSYBOX @@ -91,7 +91,8 @@ class ExecTest(BaseAPIIntegrationTest): socket = self.client.exec_start(exec_id, socket=True) self.addCleanup(socket.close) - next_size = next_frame_size(socket) + (stream, next_size) = next_frame_header(socket) + assert stream == 1 # stdout (0 = stdin, 1 = stdout, 2 = stderr) assert next_size == len(line) data = read_exactly(socket, next_size) assert data.decode('utf-8') == line diff --git a/tests/unit/api_test.py b/tests/unit/api_test.py index af2bb1c..ccddbb1 100644 --- a/tests/unit/api_test.py +++ b/tests/unit/api_test.py @@ -83,7 +83,7 @@ def fake_delete(self, url, *args, **kwargs): return fake_request('DELETE', url, *args, **kwargs) -def fake_read_from_socket(self, response, stream, tty=False): +def fake_read_from_socket(self, response, stream, tty=False, demux=False): return six.binary_type() diff --git a/tests/unit/models_containers_test.py b/tests/unit/models_containers_test.py index 22dd241..24f6316 100644 --- a/tests/unit/models_containers_test.py +++ b/tests/unit/models_containers_test.py @@ -417,7 +417,8 @@ class ContainerTest(unittest.TestCase): workdir=None ) client.api.exec_start.assert_called_with( - FAKE_EXEC_ID, detach=False, tty=False, stream=True, socket=False + FAKE_EXEC_ID, detach=False, tty=False, stream=True, socket=False, + demux=False, ) def test_exec_run_failure(self): @@ -430,7 +431,8 @@ class ContainerTest(unittest.TestCase): workdir=None ) client.api.exec_start.assert_called_with( - FAKE_EXEC_ID, detach=False, tty=False, stream=False, socket=False + FAKE_EXEC_ID, detach=False, tty=False, stream=False, socket=False, + demux=False, ) def test_export(self): |