summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--funtests/transport.py6
-rw-r--r--kombu/tests/test_serialization.py4
-rw-r--r--kombu/tests/transport/virtual/test_base.py6
-rw-r--r--kombu/transport/virtual/__init__.py20
-rw-r--r--kombu/utils/__init__.py19
5 files changed, 26 insertions, 29 deletions
diff --git a/funtests/transport.py b/funtests/transport.py
index 18872336..6e943742 100644
--- a/funtests/transport.py
+++ b/funtests/transport.py
@@ -22,10 +22,6 @@ else:
from sha import new as _digest # noqa
-def say(msg):
- print(msg, file=sys.stderr)
-
-
def _nobuf(x):
return [str(i) if isinstance(i, buffer) else i for i in x]
@@ -51,7 +47,7 @@ def consumeN(conn, consumer, n=1, timeout=30):
if seconds >= timeout:
raise socket.timeout(msg)
if seconds > 1:
- say(msg)
+ print(msg)
if len(messages) >= n:
break
diff --git a/kombu/tests/test_serialization.py b/kombu/tests/test_serialization.py
index 23120711..c048f799 100644
--- a/kombu/tests/test_serialization.py
+++ b/kombu/tests/test_serialization.py
@@ -69,10 +69,6 @@ ApVGggcXVpY2sgYnJvd24gZm94IGp1bXBzIG92ZXIgdGggbGF6eSBkb2c=\
"""))
-def say(m):
- sys.stderr.write('%s\n' % (m, ))
-
-
registry.register('testS', lambda s: s, lambda s: 'decoded',
'application/testS', 'utf-8')
diff --git a/kombu/tests/transport/virtual/test_base.py b/kombu/tests/transport/virtual/test_base.py
index d249c4e7..fbe8f064 100644
--- a/kombu/tests/transport/virtual/test_base.py
+++ b/kombu/tests/transport/virtual/test_base.py
@@ -393,8 +393,8 @@ class test_Channel(Case):
self.assertFalse(q._delivered)
@patch('kombu.transport.virtual.emergency_dump_state')
- @patch('kombu.transport.virtual.say')
- def test_restore_unacked_once_when_unrestored(self, say,
+ @patch('__builtin__.print')
+ def test_restore_unacked_once_when_unrestored(self, print_,
emergency_dump_state):
q = self.channel.qos
q._flush = Mock()
@@ -413,7 +413,7 @@ class test_Channel(Case):
self.channel.do_restore = True
q.restore_unacked_once()
- self.assertTrue(say.called)
+ self.assertTrue(print_.called)
self.assertTrue(emergency_dump_state.called)
def test_basic_recover(self):
diff --git a/kombu/transport/virtual/__init__.py b/kombu/transport/virtual/__init__.py
index 35f46bd3..5a05386e 100644
--- a/kombu/transport/virtual/__init__.py
+++ b/kombu/transport/virtual/__init__.py
@@ -7,7 +7,7 @@ Virtual transport implementation.
Emulates the AMQ API for non-AMQ transports.
"""
-from __future__ import absolute_import, unicode_literals
+from __future__ import absolute_import, print_function, unicode_literals
import base64
import socket
@@ -24,7 +24,7 @@ from amqp.protocol import queue_declare_ok_t
from kombu.exceptions import ResourceError, ChannelError
from kombu.five import Empty, items, monotonic
-from kombu.utils import emergency_dump_state, say, uuid
+from kombu.utils import emergency_dump_state, uuid
from kombu.utils.encoding import str_to_bytes, bytes_to_str
from kombu.transport import base
@@ -44,6 +44,9 @@ Cannot redeclare exchange {0!r} in vhost {1!r} with \
different type, durable, autodelete or arguments value.\
"""
+RESTORING_FMT = 'Restoring {0!r} unacknowledged message(s)'
+RESTORE_PANIC_FMT = 'UNABLE TO RESTORE {0} MESSAGES: {1}'
+
class Base64(object):
@@ -196,7 +199,7 @@ class QoS(object):
delivered.clear()
return errors
- def restore_unacked_once(self):
+ def restore_unacked_once(self, stderr=None):
"""Restores all unacknowledged messages at shutdown/gc collect.
Will only be done once for each instance.
@@ -204,6 +207,7 @@ class QoS(object):
"""
self._on_collect.cancel()
self._flush()
+ stderr = sys.stderr if stderr is None else stderr
state = self._delivered
if not self.restore_at_shutdown or not self.channel.do_restore:
@@ -213,15 +217,15 @@ class QoS(object):
return
try:
if state:
- say('Restoring {0!r} unacknowledged message(s).',
- len(self._delivered))
+ print(RESTORING_FMT.format(len(self._delivered)),
+ file=stderr)
unrestored = self.restore_unacked()
if unrestored:
errors, messages = list(zip(*unrestored))
- say('UNABLE TO RESTORE {0} MESSAGES: {1}',
- len(errors), errors)
- emergency_dump_state(messages)
+ print(RESTORE_PANIC_FMT.format(len(errors), errors),
+ file=stderr)
+ emergency_dump_state(messages, stderr=stderr)
finally:
state.restored = True
diff --git a/kombu/utils/__init__.py b/kombu/utils/__init__.py
index 43e1322d..ba4f88be 100644
--- a/kombu/utils/__init__.py
+++ b/kombu/utils/__init__.py
@@ -5,7 +5,7 @@ kombu.utils
Internal utilities.
"""
-from __future__ import absolute_import, print_function
+from __future__ import absolute_import, print_function, unicode_literals
import importlib
import numbers
@@ -35,7 +35,7 @@ except ImportError: # pragma: no cover
FILENO_ERRORS = (AttributeError, ValueError) # noqa
-__all__ = ['EqualityDict', 'say', 'uuid', 'maybe_list',
+__all__ = ['EqualityDict', 'uuid', 'maybe_list',
'fxrange', 'fxrangemax', 'retry_over_time',
'emergency_dump_state', 'cached_property',
'reprkwargs', 'reprcall', 'nested', 'fileno', 'maybe_fileno']
@@ -136,10 +136,6 @@ class EqualityDict(dict):
return dict.__delitem__(self, eqhash(key))
-def say(m, *fargs, **fkwargs):
- print(str(m).format(*fargs, **fkwargs), file=sys.stderr)
-
-
def uuid4():
# Workaround for http://bugs.python.org/issue4607
if ctypes and _uuid_generate_random: # pragma: no cover
@@ -241,21 +237,26 @@ def retry_over_time(fun, catch, args=[], kwargs={}, errback=None,
sleep(abs(int(tts) - tts))
-def emergency_dump_state(state, open_file=open, dump=None):
+def emergency_dump_state(state, open_file=open, dump=None, stderr=None):
from pprint import pformat
from tempfile import mktemp
+ stderr = sys.stderr if stderr is None else stderr
if dump is None:
import pickle
dump = pickle.dump
persist = mktemp()
- say('EMERGENCY DUMP STATE TO FILE -> {0} <-', persist)
+ print('EMERGENCY DUMP STATE TO FILE -> {0} <-'.format(persist),
+ file=stderr)
fh = open_file(persist, 'w')
try:
try:
dump(state, fh, protocol=0)
except Exception as exc:
- say('Cannot pickle state: {0!r}. Fallback to pformat.', exc)
+ print(
+ 'Cannot pickle state: {0!r}. Fallback to pformat.'.format(exc),
+ file=stderr,
+ )
fh.write(default_encode(pformat(state)))
finally:
fh.flush()