diff options
-rw-r--r-- | funtests/transport.py | 6 | ||||
-rw-r--r-- | kombu/tests/test_serialization.py | 4 | ||||
-rw-r--r-- | kombu/tests/transport/virtual/test_base.py | 6 | ||||
-rw-r--r-- | kombu/transport/virtual/__init__.py | 20 | ||||
-rw-r--r-- | kombu/utils/__init__.py | 19 |
5 files changed, 26 insertions, 29 deletions
diff --git a/funtests/transport.py b/funtests/transport.py index 18872336..6e943742 100644 --- a/funtests/transport.py +++ b/funtests/transport.py @@ -22,10 +22,6 @@ else: from sha import new as _digest # noqa -def say(msg): - print(msg, file=sys.stderr) - - def _nobuf(x): return [str(i) if isinstance(i, buffer) else i for i in x] @@ -51,7 +47,7 @@ def consumeN(conn, consumer, n=1, timeout=30): if seconds >= timeout: raise socket.timeout(msg) if seconds > 1: - say(msg) + print(msg) if len(messages) >= n: break diff --git a/kombu/tests/test_serialization.py b/kombu/tests/test_serialization.py index 23120711..c048f799 100644 --- a/kombu/tests/test_serialization.py +++ b/kombu/tests/test_serialization.py @@ -69,10 +69,6 @@ ApVGggcXVpY2sgYnJvd24gZm94IGp1bXBzIG92ZXIgdGggbGF6eSBkb2c=\ """)) -def say(m): - sys.stderr.write('%s\n' % (m, )) - - registry.register('testS', lambda s: s, lambda s: 'decoded', 'application/testS', 'utf-8') diff --git a/kombu/tests/transport/virtual/test_base.py b/kombu/tests/transport/virtual/test_base.py index d249c4e7..fbe8f064 100644 --- a/kombu/tests/transport/virtual/test_base.py +++ b/kombu/tests/transport/virtual/test_base.py @@ -393,8 +393,8 @@ class test_Channel(Case): self.assertFalse(q._delivered) @patch('kombu.transport.virtual.emergency_dump_state') - @patch('kombu.transport.virtual.say') - def test_restore_unacked_once_when_unrestored(self, say, + @patch('__builtin__.print') + def test_restore_unacked_once_when_unrestored(self, print_, emergency_dump_state): q = self.channel.qos q._flush = Mock() @@ -413,7 +413,7 @@ class test_Channel(Case): self.channel.do_restore = True q.restore_unacked_once() - self.assertTrue(say.called) + self.assertTrue(print_.called) self.assertTrue(emergency_dump_state.called) def test_basic_recover(self): diff --git a/kombu/transport/virtual/__init__.py b/kombu/transport/virtual/__init__.py index 35f46bd3..5a05386e 100644 --- a/kombu/transport/virtual/__init__.py +++ b/kombu/transport/virtual/__init__.py @@ -7,7 +7,7 @@ Virtual transport implementation. Emulates the AMQ API for non-AMQ transports. """ -from __future__ import absolute_import, unicode_literals +from __future__ import absolute_import, print_function, unicode_literals import base64 import socket @@ -24,7 +24,7 @@ from amqp.protocol import queue_declare_ok_t from kombu.exceptions import ResourceError, ChannelError from kombu.five import Empty, items, monotonic -from kombu.utils import emergency_dump_state, say, uuid +from kombu.utils import emergency_dump_state, uuid from kombu.utils.encoding import str_to_bytes, bytes_to_str from kombu.transport import base @@ -44,6 +44,9 @@ Cannot redeclare exchange {0!r} in vhost {1!r} with \ different type, durable, autodelete or arguments value.\ """ +RESTORING_FMT = 'Restoring {0!r} unacknowledged message(s)' +RESTORE_PANIC_FMT = 'UNABLE TO RESTORE {0} MESSAGES: {1}' + class Base64(object): @@ -196,7 +199,7 @@ class QoS(object): delivered.clear() return errors - def restore_unacked_once(self): + def restore_unacked_once(self, stderr=None): """Restores all unacknowledged messages at shutdown/gc collect. Will only be done once for each instance. @@ -204,6 +207,7 @@ class QoS(object): """ self._on_collect.cancel() self._flush() + stderr = sys.stderr if stderr is None else stderr state = self._delivered if not self.restore_at_shutdown or not self.channel.do_restore: @@ -213,15 +217,15 @@ class QoS(object): return try: if state: - say('Restoring {0!r} unacknowledged message(s).', - len(self._delivered)) + print(RESTORING_FMT.format(len(self._delivered)), + file=stderr) unrestored = self.restore_unacked() if unrestored: errors, messages = list(zip(*unrestored)) - say('UNABLE TO RESTORE {0} MESSAGES: {1}', - len(errors), errors) - emergency_dump_state(messages) + print(RESTORE_PANIC_FMT.format(len(errors), errors), + file=stderr) + emergency_dump_state(messages, stderr=stderr) finally: state.restored = True diff --git a/kombu/utils/__init__.py b/kombu/utils/__init__.py index 43e1322d..ba4f88be 100644 --- a/kombu/utils/__init__.py +++ b/kombu/utils/__init__.py @@ -5,7 +5,7 @@ kombu.utils Internal utilities. """ -from __future__ import absolute_import, print_function +from __future__ import absolute_import, print_function, unicode_literals import importlib import numbers @@ -35,7 +35,7 @@ except ImportError: # pragma: no cover FILENO_ERRORS = (AttributeError, ValueError) # noqa -__all__ = ['EqualityDict', 'say', 'uuid', 'maybe_list', +__all__ = ['EqualityDict', 'uuid', 'maybe_list', 'fxrange', 'fxrangemax', 'retry_over_time', 'emergency_dump_state', 'cached_property', 'reprkwargs', 'reprcall', 'nested', 'fileno', 'maybe_fileno'] @@ -136,10 +136,6 @@ class EqualityDict(dict): return dict.__delitem__(self, eqhash(key)) -def say(m, *fargs, **fkwargs): - print(str(m).format(*fargs, **fkwargs), file=sys.stderr) - - def uuid4(): # Workaround for http://bugs.python.org/issue4607 if ctypes and _uuid_generate_random: # pragma: no cover @@ -241,21 +237,26 @@ def retry_over_time(fun, catch, args=[], kwargs={}, errback=None, sleep(abs(int(tts) - tts)) -def emergency_dump_state(state, open_file=open, dump=None): +def emergency_dump_state(state, open_file=open, dump=None, stderr=None): from pprint import pformat from tempfile import mktemp + stderr = sys.stderr if stderr is None else stderr if dump is None: import pickle dump = pickle.dump persist = mktemp() - say('EMERGENCY DUMP STATE TO FILE -> {0} <-', persist) + print('EMERGENCY DUMP STATE TO FILE -> {0} <-'.format(persist), + file=stderr) fh = open_file(persist, 'w') try: try: dump(state, fh, protocol=0) except Exception as exc: - say('Cannot pickle state: {0!r}. Fallback to pformat.', exc) + print( + 'Cannot pickle state: {0!r}. Fallback to pformat.'.format(exc), + file=stderr, + ) fh.write(default_encode(pformat(state))) finally: fh.flush() |