From 2f9eb53128b780628fb753d4851331c6aea19510 Mon Sep 17 00:00:00 2001 From: Arcadiy Ivanov Date: Sat, 13 Jan 2018 11:39:34 -0500 Subject: Virtual transport timeout does not supesede period polling (#689) fixes #688 --- AUTHORS | 1 + kombu/transport/virtual/base.py | 2 ++ t/unit/transport/virtual/test_base.py | 9 +++++++++ 3 files changed, 12 insertions(+) diff --git a/AUTHORS b/AUTHORS index df265337..5813c597 100644 --- a/AUTHORS +++ b/AUTHORS @@ -144,3 +144,4 @@ Vincent Driessen Wido den Hollander Zach Smith Zhao Xiaohong +Arcadiy Ivanov diff --git a/kombu/transport/virtual/base.py b/kombu/transport/virtual/base.py index 7f2c61ad..8ccb8f6c 100644 --- a/kombu/transport/virtual/base.py +++ b/kombu/transport/virtual/base.py @@ -956,6 +956,8 @@ class Transport(base.Transport): time_start = monotonic() get = self.cycle.get polling_interval = self.polling_interval + if timeout and polling_interval and polling_interval > timeout: + polling_interval = timeout while 1: try: get(self._deliver, timeout=timeout) diff --git a/t/unit/transport/virtual/test_base.py b/t/unit/transport/virtual/test_base.py index 69c1eafe..003c8d53 100644 --- a/t/unit/transport/virtual/test_base.py +++ b/t/unit/transport/virtual/test_base.py @@ -4,6 +4,7 @@ import io import pytest import sys import warnings +import socket from case import MagicMock, Mock, patch @@ -12,6 +13,7 @@ from kombu.compression import compress from kombu.exceptions import ResourceError, ChannelError from kombu.transport import virtual from kombu.utils.uuid import uuid +from kombu.five import monotonic PY3 = sys.version_info[0] == 3 PRINT_FQDN = 'builtins.print' if PY3 else '__builtin__.print' @@ -556,6 +558,13 @@ class test_Transport: x = client(transport_options={'polling_interval': 32.3}) assert x.transport.polling_interval == 32.3 + def test_timeout_over_polling_interval(self): + x = client(transport_options=dict(polling_interval=60)) + start = monotonic() + with pytest.raises(socket.timeout): + x.transport.drain_events(x, timeout=.5) + assert monotonic() - start < 60 + def test_close_connection(self): c1 = self.transport.create_channel(self.transport) c2 = self.transport.create_channel(self.transport) -- cgit v1.2.1