summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorCorentin Henry <corentinhenry@gmail.com>2018-10-08 11:02:31 -0700
committerCorentin Henry <corentinhenry@gmail.com>2018-11-27 17:01:48 -0800
commit5f157bbaca5ae62a5bb71547106beb6ef02bc485 (patch)
tree4c70fc861915e84f5aa752cf860bdfa1a3d4e150
parente1e4048753aafc96571752cf54d96df7b24156d3 (diff)
downloaddocker-py-5f157bbaca5ae62a5bb71547106beb6ef02bc485.tar.gz
implement stream demultiplexing for exec commands
fixes https://github.com/docker/docker-py/issues/1952 Signed-off-by: Corentin Henry <corentinhenry@gmail.com>
-rw-r--r--docker/api/client.py18
-rw-r--r--docker/api/container.py13
-rw-r--r--docker/api/exec_api.py13
-rw-r--r--docker/models/containers.py70
-rw-r--r--docker/utils/socket.py89
-rw-r--r--tests/integration/api_container_test.py5
-rw-r--r--tests/integration/api_exec_test.py5
-rw-r--r--tests/unit/api_test.py2
-rw-r--r--tests/unit/models_containers_test.py6
9 files changed, 181 insertions, 40 deletions
diff --git a/docker/api/client.py b/docker/api/client.py
index 197846d..8a5a60b 100644
--- a/docker/api/client.py
+++ b/docker/api/client.py
@@ -32,7 +32,7 @@ from ..errors import (
from ..tls import TLSConfig
from ..transport import SSLAdapter, UnixAdapter
from ..utils import utils, check_resource, update_headers, config
-from ..utils.socket import frames_iter, socket_raw_iter
+from ..utils.socket import frames_iter, consume_socket_output, demux_adaptor
from ..utils.json_stream import json_stream
try:
from ..transport import NpipeAdapter
@@ -381,19 +381,23 @@ class APIClient(
for out in response.iter_content(chunk_size, decode):
yield out
- def _read_from_socket(self, response, stream, tty=False):
+ def _read_from_socket(self, response, stream, tty=True, demux=False):
socket = self._get_raw_response_socket(response)
- gen = None
- if tty is False:
- gen = frames_iter(socket)
+ gen = frames_iter(socket, tty)
+
+ if demux:
+ # The generator will output tuples (stdout, stderr)
+ gen = (demux_adaptor(*frame) for frame in gen)
else:
- gen = socket_raw_iter(socket)
+ # The generator will output strings
+ gen = (data for (_, data) in gen)
if stream:
return gen
else:
- return six.binary_type().join(gen)
+ # Wait for all the frames, concatenate them, and return the result
+ return consume_socket_output(gen, demux=demux)
def _disable_socket_timeout(self, socket):
""" Depending on the combination of python version and whether we're
diff --git a/docker/api/container.py b/docker/api/container.py
index fce73af..ab3b1cf 100644
--- a/docker/api/container.py
+++ b/docker/api/container.py
@@ -13,7 +13,7 @@ from ..types import (
class ContainerApiMixin(object):
@utils.check_resource('container')
def attach(self, container, stdout=True, stderr=True,
- stream=False, logs=False):
+ stream=False, logs=False, demux=False):
"""
Attach to a container.
@@ -28,11 +28,15 @@ class ContainerApiMixin(object):
stream (bool): Return container output progressively as an iterator
of strings, rather than a single string.
logs (bool): Include the container's previous output.
+ demux (bool): Keep stdout and stderr separate.
Returns:
- By default, the container's output as a single string.
+ By default, the container's output as a single string (two if
+ ``demux=True``: one for stdout and one for stderr).
- If ``stream=True``, an iterator of output strings.
+ If ``stream=True``, an iterator of output strings. If
+ ``demux=True``, two iterators are returned: one for stdout and one
+ for stderr.
Raises:
:py:class:`docker.errors.APIError`
@@ -54,8 +58,7 @@ class ContainerApiMixin(object):
response = self._post(u, headers=headers, params=params, stream=True)
output = self._read_from_socket(
- response, stream, self._check_is_tty(container)
- )
+ response, stream, self._check_is_tty(container), demux=demux)
if stream:
return CancellableStream(output, response)
diff --git a/docker/api/exec_api.py b/docker/api/exec_api.py
index 986d87f..3950991 100644
--- a/docker/api/exec_api.py
+++ b/docker/api/exec_api.py
@@ -118,7 +118,7 @@ class ExecApiMixin(object):
@utils.check_resource('exec_id')
def exec_start(self, exec_id, detach=False, tty=False, stream=False,
- socket=False):
+ socket=False, demux=False):
"""
Start a previously set up exec instance.
@@ -130,11 +130,14 @@ class ExecApiMixin(object):
stream (bool): Stream response data. Default: False
socket (bool): Return the connection socket to allow custom
read/write operations.
+ demux (bool): Separate return stdin, stdout and stderr separately
Returns:
- (generator or str): If ``stream=True``, a generator yielding
- response chunks. If ``socket=True``, a socket object for the
- connection. A string containing response data otherwise.
+
+ (generator or str or tuple): If ``stream=True``, a generator
+ yielding response chunks. If ``socket=True``, a socket object for
+ the connection. A string containing response data otherwise. If
+ ``demux=True``, stdin, stdout and stderr are separated.
Raises:
:py:class:`docker.errors.APIError`
@@ -162,4 +165,4 @@ class ExecApiMixin(object):
return self._result(res)
if socket:
return self._get_raw_response_socket(res)
- return self._read_from_socket(res, stream, tty)
+ return self._read_from_socket(res, stream, tty=tty, demux=demux)
diff --git a/docker/models/containers.py b/docker/models/containers.py
index 9d6f2cc..15cc212 100644
--- a/docker/models/containers.py
+++ b/docker/models/containers.py
@@ -144,7 +144,7 @@ class Container(Model):
def exec_run(self, cmd, stdout=True, stderr=True, stdin=False, tty=False,
privileged=False, user='', detach=False, stream=False,
- socket=False, environment=None, workdir=None):
+ socket=False, environment=None, workdir=None, demux=False):
"""
Run a command inside this container. Similar to
``docker exec``.
@@ -166,6 +166,7 @@ class Container(Model):
the following format ``["PASSWORD=xxx"]`` or
``{"PASSWORD": "xxx"}``.
workdir (str): Path to working directory for this exec session
+ demux (bool): Return stdout and stderr separately
Returns:
(ExecResult): A tuple of (exit_code, output)
@@ -180,6 +181,70 @@ class Container(Model):
Raises:
:py:class:`docker.errors.APIError`
If the server returns an error.
+
+ Example:
+
+ Create a container that runs in the background
+
+ >>> client = docker.from_env()
+ >>> container = client.containers.run(
+ ... 'bfirsh/reticulate-splines', detach=True)
+
+ Prepare the command we are going to use. It prints "hello stdout"
+ in `stdout`, followed by "hello stderr" in `stderr`:
+
+ >>> cmd = '/bin/sh -c "echo hello stdout ; echo hello stderr >&2"'
+
+ We'll run this command with all four the combinations of ``stream``
+ and ``demux``.
+
+ With ``stream=False`` and ``demux=False``, the output is a string
+ that contains both the `stdout` and the `stderr` output:
+
+ >>> res = container.exec_run(cmd, stream=False, demux=False)
+ >>> res.output
+ b'hello stderr\nhello stdout\n'
+
+ With ``stream=True``, and ``demux=False``, the output is a
+ generator that yields strings containing the output of both
+ `stdout` and `stderr`:
+
+ >>> res = container.exec_run(cmd, stream=True, demux=False)
+ >>> next(res.output)
+ b'hello stdout\n'
+ >>> next(res.output)
+ b'hello stderr\n'
+ >>> next(res.output)
+ Traceback (most recent call last):
+ File "<stdin>", line 1, in <module>
+ StopIteration
+
+ With ``stream=True`` and ``demux=True``, the generator now
+ separates the streams, and yield tuples
+ ``(stdout, stderr)``:
+
+ >>> res = container.exec_run(cmd, stream=True, demux=True)
+ >>> next(res.output)
+ (b'hello stdout\n', None)
+ >>> next(res.output)
+ (None, b'hello stderr\n')
+ >>> next(res.output)
+ Traceback (most recent call last):
+ File "<stdin>", line 1, in <module>
+ StopIteration
+
+ Finally, with ``stream=False`` and ``demux=True``, the whole output
+ is returned, but the streams are still separated:
+
+ >>> res = container.exec_run(cmd, stream=True, demux=True)
+ >>> next(res.output)
+ (b'hello stdout\n', None)
+ >>> next(res.output)
+ (None, b'hello stderr\n')
+ >>> next(res.output)
+ Traceback (most recent call last):
+ File "<stdin>", line 1, in <module>
+ StopIteration
"""
resp = self.client.api.exec_create(
self.id, cmd, stdout=stdout, stderr=stderr, stdin=stdin, tty=tty,
@@ -187,7 +252,8 @@ class Container(Model):
workdir=workdir
)
exec_output = self.client.api.exec_start(
- resp['Id'], detach=detach, tty=tty, stream=stream, socket=socket
+ resp['Id'], detach=detach, tty=tty, stream=stream, socket=socket,
+ demux=demux
)
if socket or stream:
return ExecResult(None, exec_output)
diff --git a/docker/utils/socket.py b/docker/utils/socket.py
index 7b96d4f..fe4a332 100644
--- a/docker/utils/socket.py
+++ b/docker/utils/socket.py
@@ -12,6 +12,10 @@ except ImportError:
NpipeSocket = type(None)
+STDOUT = 1
+STDERR = 2
+
+
class SocketError(Exception):
pass
@@ -51,28 +55,43 @@ def read_exactly(socket, n):
return data
-def next_frame_size(socket):
+def next_frame_header(socket):
"""
- Returns the size of the next frame of data waiting to be read from socket,
- according to the protocol defined here:
+ Returns the stream and size of the next frame of data waiting to be read
+ from socket, according to the protocol defined here:
- https://docs.docker.com/engine/reference/api/docker_remote_api_v1.24/#/attach-to-a-container
+ https://docs.docker.com/engine/api/v1.24/#attach-to-a-container
"""
try:
data = read_exactly(socket, 8)
except SocketError:
- return -1
+ return (-1, -1)
+
+ stream, actual = struct.unpack('>BxxxL', data)
+ return (stream, actual)
+
- _, actual = struct.unpack('>BxxxL', data)
- return actual
+def frames_iter(socket, tty):
+ """
+ Return a generator of frames read from socket. A frame is a tuple where
+ the first item is the stream number and the second item is a chunk of data.
+
+ If the tty setting is enabled, the streams are multiplexed into the stdout
+ stream.
+ """
+ if tty:
+ return ((STDOUT, frame) for frame in frames_iter_tty(socket))
+ else:
+ return frames_iter_no_tty(socket)
-def frames_iter(socket):
+def frames_iter_no_tty(socket):
"""
- Returns a generator of frames read from socket
+ Returns a generator of data read from the socket when the tty setting is
+ not enabled.
"""
while True:
- n = next_frame_size(socket)
+ (stream, n) = next_frame_header(socket)
if n < 0:
break
while n > 0:
@@ -84,13 +103,13 @@ def frames_iter(socket):
# We have reached EOF
return
n -= data_length
- yield result
+ yield (stream, result)
-def socket_raw_iter(socket):
+def frames_iter_tty(socket):
"""
- Returns a generator of data read from the socket.
- This is used for non-multiplexed streams.
+ Return a generator of data read from the socket when the tty setting is
+ enabled.
"""
while True:
result = read(socket)
@@ -98,3 +117,45 @@ def socket_raw_iter(socket):
# We have reached EOF
return
yield result
+
+
+def consume_socket_output(frames, demux=False):
+ """
+ Iterate through frames read from the socket and return the result.
+
+ Args:
+
+ demux (bool):
+ If False, stdout and stderr are multiplexed, and the result is the
+ concatenation of all the frames. If True, the streams are
+ demultiplexed, and the result is a 2-tuple where each item is the
+ concatenation of frames belonging to the same stream.
+ """
+ if demux is False:
+ # If the streams are multiplexed, the generator returns strings, that
+ # we just need to concatenate.
+ return six.binary_type().join(frames)
+
+ # If the streams are demultiplexed, the generator returns tuples
+ # (stdin, stdout, stderr)
+ out = [six.binary_type(), six.binary_type()]
+ for frame in frames:
+ for stream_id in [STDOUT, STDERR]:
+ # It is guaranteed that for each frame, one and only one stream
+ # is not None.
+ if frame[stream_id] is not None:
+ out[stream_id] += frame[stream_id]
+ return tuple(out)
+
+
+def demux_adaptor(stream_id, data):
+ """
+ Utility to demultiplex stdout and stderr when reading frames from the
+ socket.
+ """
+ if stream_id == STDOUT:
+ return (data, None)
+ elif stream_id == STDERR:
+ return (None, data)
+ else:
+ raise ValueError('{0} is not a valid stream'.format(stream_id))
diff --git a/tests/integration/api_container_test.py b/tests/integration/api_container_test.py
index 02f3603..83df342 100644
--- a/tests/integration/api_container_test.py
+++ b/tests/integration/api_container_test.py
@@ -7,7 +7,7 @@ from datetime import datetime
import docker
from docker.constants import IS_WINDOWS_PLATFORM
-from docker.utils.socket import next_frame_size
+from docker.utils.socket import next_frame_header
from docker.utils.socket import read_exactly
import pytest
@@ -1242,7 +1242,8 @@ class AttachContainerTest(BaseAPIIntegrationTest):
self.client.start(container)
- next_size = next_frame_size(pty_stdout)
+ (stream, next_size) = next_frame_header(pty_stdout)
+ assert stream == 1 # correspond to stdout
assert next_size == len(line)
data = read_exactly(pty_stdout, next_size)
assert data.decode('utf-8') == line
diff --git a/tests/integration/api_exec_test.py b/tests/integration/api_exec_test.py
index 1a5a4e5..ac64af7 100644
--- a/tests/integration/api_exec_test.py
+++ b/tests/integration/api_exec_test.py
@@ -1,4 +1,4 @@
-from docker.utils.socket import next_frame_size
+from docker.utils.socket import next_frame_header
from docker.utils.socket import read_exactly
from .base import BaseAPIIntegrationTest, BUSYBOX
@@ -91,7 +91,8 @@ class ExecTest(BaseAPIIntegrationTest):
socket = self.client.exec_start(exec_id, socket=True)
self.addCleanup(socket.close)
- next_size = next_frame_size(socket)
+ (stream, next_size) = next_frame_header(socket)
+ assert stream == 1 # stdout (0 = stdin, 1 = stdout, 2 = stderr)
assert next_size == len(line)
data = read_exactly(socket, next_size)
assert data.decode('utf-8') == line
diff --git a/tests/unit/api_test.py b/tests/unit/api_test.py
index af2bb1c..ccddbb1 100644
--- a/tests/unit/api_test.py
+++ b/tests/unit/api_test.py
@@ -83,7 +83,7 @@ def fake_delete(self, url, *args, **kwargs):
return fake_request('DELETE', url, *args, **kwargs)
-def fake_read_from_socket(self, response, stream, tty=False):
+def fake_read_from_socket(self, response, stream, tty=False, demux=False):
return six.binary_type()
diff --git a/tests/unit/models_containers_test.py b/tests/unit/models_containers_test.py
index 22dd241..24f6316 100644
--- a/tests/unit/models_containers_test.py
+++ b/tests/unit/models_containers_test.py
@@ -417,7 +417,8 @@ class ContainerTest(unittest.TestCase):
workdir=None
)
client.api.exec_start.assert_called_with(
- FAKE_EXEC_ID, detach=False, tty=False, stream=True, socket=False
+ FAKE_EXEC_ID, detach=False, tty=False, stream=True, socket=False,
+ demux=False,
)
def test_exec_run_failure(self):
@@ -430,7 +431,8 @@ class ContainerTest(unittest.TestCase):
workdir=None
)
client.api.exec_start.assert_called_with(
- FAKE_EXEC_ID, detach=False, tty=False, stream=False, socket=False
+ FAKE_EXEC_ID, detach=False, tty=False, stream=False, socket=False,
+ demux=False,
)
def test_export(self):