diff options
author | Ask Solem <ask@celeryproject.org> | 2014-01-16 15:01:19 +0000 |
---|---|---|
committer | Ask Solem <ask@celeryproject.org> | 2014-01-16 15:01:19 +0000 |
commit | 97abf06e054398f22b7bf77aa6349d8a2ccd860b (patch) | |
tree | d4cabb26972ef44dbd836d3f7d43470654a50bd2 | |
parent | bff6ddb0ea116e7c3b06f6cf2509c6411b3e4803 (diff) | |
parent | 07a312582d90b1303ff5d675613935392f6a6283 (diff) | |
download | py-amqp-readwrite.tar.gz |
Merge branch 'master' into readwritereadwrite
Conflicts:
amqp/transport.py
-rw-r--r-- | AUTHORS | 2 | ||||
-rw-r--r-- | Changelog | 58 | ||||
-rw-r--r-- | README.rst | 5 | ||||
-rw-r--r-- | amqp/__init__.py | 2 | ||||
-rw-r--r-- | amqp/abstract_channel.py | 6 | ||||
-rw-r--r-- | amqp/channel.py | 12 | ||||
-rw-r--r-- | amqp/connection.py | 69 | ||||
-rw-r--r-- | amqp/five.py | 55 | ||||
-rw-r--r-- | amqp/method_framing.py | 6 | ||||
-rw-r--r-- | amqp/serialization.py | 18 | ||||
-rw-r--r-- | amqp/transport.py | 100 | ||||
-rw-r--r-- | amqp/utils.py | 5 | ||||
-rwxr-xr-x | demo/amqp_clock.py | 17 | ||||
-rwxr-xr-x | demo/demo_receive.py | 24 | ||||
-rwxr-xr-x | demo/demo_send.py | 12 | ||||
-rw-r--r-- | docs/_ext/applyxrefs.py | 4 | ||||
-rw-r--r-- | docs/_ext/literals_to_xrefs.py | 4 | ||||
-rw-r--r-- | docs/conf.py | 6 | ||||
-rwxr-xr-x | extra/generate_skeleton_0_8.py | 17 | ||||
-rwxr-xr-x | funtests/test_channel.py | 56 | ||||
-rwxr-xr-x | funtests/test_serialization.py | 59 | ||||
-rw-r--r-- | pavement.py | 123 | ||||
-rw-r--r-- | setup.py | 7 |
23 files changed, 371 insertions, 296 deletions
@@ -8,3 +8,5 @@ Adam Wentz Adrien Guinet <aguinet@quarkslab.com> Tommie Gannert <tommie@spotify.com> Dong Weiming <ciici123@gmail.com> +Dominik Fässler <dfa@bezono.org> +Dustin J. Mitchell <dustin@mozilla.com> @@ -5,11 +5,61 @@ py-amqp is fork of amqplib used by Kombu containing additional features and impr The previous amqplib changelog is here: http://code.google.com/p/py-amqplib/source/browse/CHANGES +.. _version-1.4.1: + +1.4.1 +===== +:release-date: 2014-01-14 09:30 P.M UTC + +- Fixed error occurring when heartbeats disabled. + +.. _version-1.4.0: + +1.4.0 +===== +:release-date: 2014-01-13 03:00 P.M UTC + +- Heartbeat implementation improved (Issue #6). + + The new heartbeat behavior is the same approach as taken by the + RabbitMQ java library. + + This also means that clients should preferably call the ``heartbeat_tick`` + method more frequently (like every second) instead of using the old + ``rate`` argument (which is now ignored). + + - Heartbeat interval is negotiated with the server. + - Some delay is allowed if the heartbeat is late. + - Monotonic time is used to keep track of the heartbeat + instead of relying on the caller to call the checking function + at the right time. + + Contributed by Dustin J. Mitchell. + +- NoneType is now supported in tables and arrays. + + Contributed by Dominik Fässler. + +- SSLTransport: Now handles ``ENOENT``. + + Fix contributed by Adrien Guinet. + +.. _version-1.3.3: + +1.3.3 +===== +:release-date: 2013-11-11 03:30 P.M UTC + +- SSLTransport: Now keeps read buffer if an exception is raised + (Issue #26). + + Fix contributed by Tommie Gannert. + .. _version-1.3.2: 1.3.2 ===== -:release-date: 2013-10-29 2:00 P.M UTC +:release-date: 2013-10-29 02:00 P.M UTC - Message.channel is now a channel object (not the channel id). @@ -130,7 +180,7 @@ http://code.google.com/p/py-amqplib/source/browse/CHANGES 1.0.13 ====== -:release-date: 2013-07-31 16:00 P.M BST +:release-date: 2013-07-31 04:00 P.M BST - Fixed problems with the SSL transport (Issue #15). @@ -142,7 +192,7 @@ http://code.google.com/p/py-amqplib/source/browse/CHANGES 1.0.12 ====== -:release-date: 2013-06-25 14:00 P.M BST +:release-date: 2013-06-25 02:00 P.M BST - Fixed another Python 3 compatibility problem. @@ -150,7 +200,7 @@ http://code.google.com/p/py-amqplib/source/browse/CHANGES 1.0.11 ====== -:release-date: 2013-04-11 18:00 P.M BST +:release-date: 2013-04-11 06:00 P.M BST - Fixed Python 3 incompatibility in ``amqp/transport.py``. @@ -2,7 +2,7 @@ Python AMQP 0.9.1 client library ===================================================================== -:Version: 1.3.2 +:Version: 1.4.1 :Web: http://amqp.readthedocs.org/ :Download: http://pypi.python.org/pypi/amqp/ :Source: http://github.com/celery/py-amqp/ @@ -99,3 +99,6 @@ Further http://www.rabbitmq.com/devtools.html#python-dev +.. image:: https://d2weczhvl823v0.cloudfront.net/celery/celery/trend.png + :alt: Bitdeli badge + :target: https://bitdeli.com/free diff --git a/amqp/__init__.py b/amqp/__init__.py index de7259b..3a5ba82 100644 --- a/amqp/__init__.py +++ b/amqp/__init__.py @@ -16,7 +16,7 @@ # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 from __future__ import absolute_import -VERSION = (1, 3, 2) +VERSION = (1, 4, 1) __version__ = '.'.join(map(str, VERSION[0:3])) + ''.join(VERSION[3:]) __author__ = 'Barry Pederson' __maintainer__ = 'Ask Solem' diff --git a/amqp/abstract_channel.py b/amqp/abstract_channel.py index 5e37bf9..28cfe13 100644 --- a/amqp/abstract_channel.py +++ b/amqp/abstract_channel.py @@ -19,12 +19,6 @@ from __future__ import absolute_import from .exceptions import AMQPNotImplementedError, RecoverableConnectionError from .serialization import AMQPWriter -try: - bytes -except NameError: - # Python 2.5 and lower - bytes = str - __all__ = ['AbstractChannel'] diff --git a/amqp/channel.py b/amqp/channel.py index ea59f0c..05eb09a 100644 --- a/amqp/channel.py +++ b/amqp/channel.py @@ -31,6 +31,15 @@ __all__ = ['Channel'] AMQP_LOGGER = logging.getLogger('amqp') +EXCHANGE_AUTODELETE_DEPRECATED = """\ +The auto_delete flag for exchanges has been deprecated and will be removed +from py-amqp v1.5.0.\ +""" + + +class VDeprecationWarning(DeprecationWarning): + pass + class Channel(AbstractChannel): """Work with channels @@ -604,8 +613,7 @@ class Channel(AbstractChannel): self._send_method((40, 10), args) if auto_delete: - warn(DeprecationWarning( - 'auto_delete exchanges has been deprecated')) + warn(VDeprecationWarning(EXCHANGE_AUTODELETE_DEPRECATED)) if not nowait: return self.wait(allowed_methods=[ diff --git a/amqp/connection.py b/amqp/connection.py index 8808a58..1d4980f 100644 --- a/amqp/connection.py +++ b/amqp/connection.py @@ -34,7 +34,7 @@ from .exceptions import ( ConnectionForced, ConnectionError, error_for_code, RecoverableConnectionError, RecoverableChannelError, ) -from .five import items, range, values +from .five import items, range, values, monotonic from .method_framing import MethodReader, MethodWriter from .serialization import AMQPWriter from .transport import create_transport @@ -80,9 +80,26 @@ class Connection(AbstractChannel): """ Channel = Channel + #: Final heartbeat interval value (in float seconds) after negotiation + heartbeat = None + + #: Original heartbeat interval value proposed by client. + client_heartbeat = None + + #: Original heartbeat interval proposed by server. + server_heartbeat = None + + #: Time of last heartbeat sent (in monotonic time, if available). + last_heartbeat_sent = 0 + + #: Time of last heartbeat received (in monotonic time, if available). + last_heartbeat_received = 0 + + #: Number of bytes sent to socket at the last heartbeat check. prev_sent = None + + #: Number of bytes received from socket at the last heartbeat check. prev_recv = None - missed_heartbeats = 0 def __init__(self, host='localhost', userid='guest', password='guest', login_method='AMQPLAIN', login_response=None, @@ -125,7 +142,7 @@ class Connection(AbstractChannel): # Properties set in the Tune method self.channel_max = channel_max self.frame_max = frame_max - self.heartbeat = heartbeat + self.client_heartbeat = heartbeat self.confirm_publish = confirm_publish @@ -841,10 +858,18 @@ class Connection(AbstractChannel): want a heartbeat. """ + client_heartbeat = self.client_heartbeat or 0 self.channel_max = args.read_short() or self.channel_max self.frame_max = args.read_long() or self.frame_max self.method_writer.frame_max = self.frame_max - heartbeat = args.read_short() # noqa + self.server_heartbeat = args.read_short() or 0 + + # negotiate the heartbeat interval to the smaller of the + # specified values + if self.server_heartbeat == 0 or client_heartbeat == 0: + self.heartbeat = max(self.server_heartbeat, client_heartbeat) + else: + self.heartbeat = min(self.server_heartbeat, client_heartbeat) self._x_tune_ok(self.channel_max, self.frame_max, self.heartbeat) @@ -852,28 +877,34 @@ class Connection(AbstractChannel): self.transport.write_frame(8, 0, bytes()) def heartbeat_tick(self, rate=2): - """Verify that hartbeats are sent and received. - - :keyword rate: Rate is how often the tick is called - compared to the actual heartbeat value. E.g. if - the heartbeat is set to 3 seconds, and the tick - is called every 3 / 2 seconds, then the rate is 2. + """Send heartbeat packets, if necessary, and fail if none have been + received recently. This should be called frequently, on the order of + once per second. + :keyword rate: Ignored """ + if not self.heartbeat: + return + + # treat actual data exchange in either direction as a heartbeat sent_now = self.method_writer.bytes_sent recv_now = self.method_reader.bytes_recv + if self.prev_sent is None or self.prev_sent != sent_now: + self.last_heartbeat_sent = monotonic() + if self.prev_recv is None or self.prev_recv != recv_now: + self.last_heartbeat_received = monotonic() + self.prev_sent, self.prev_recv = sent_now, recv_now - if self.prev_sent is not None and self.prev_sent == sent_now: + # send a heartbeat if it's time to do so + if monotonic() > self.last_heartbeat_sent + self.heartbeat: self.send_heartbeat() + self.last_heartbeat_sent = monotonic() - if self.prev_recv is not None and self.prev_recv == recv_now: - self.missed_heartbeats += 1 - else: - self.missed_heartbeats = 0 - - self.prev_sent, self.prev_recv = sent_now, recv_now - - if self.missed_heartbeats >= rate: + # if we've missed two intervals' heartbeats, fail; this gives the + # server enough time to send heartbeats a little late + if (self.last_heartbeat_received and + self.last_heartbeat_received + 2 * + self.heartbeat < monotonic()): raise ConnectionForced('Too many heartbeats missed') def _x_tune_ok(self, channel_max, frame_max, heartbeat): diff --git a/amqp/five.py b/amqp/five.py index 25b83fc..5157df5 100644 --- a/amqp/five.py +++ b/amqp/five.py @@ -131,3 +131,58 @@ def with_metaclass(Type, skip_attrs=set(['__dict__', '__weakref__'])): return Type(Class.__name__, Class.__bases__, attrs) return _clone_with_metaclass + +############## time.monotonic ################################################ + +if sys.version_info < (3, 3): + + import platform + SYSTEM = platform.system() + + if SYSTEM == 'Darwin': + import ctypes + from ctypes.util import find_library + libSystem = ctypes.CDLL('libSystem.dylib') + CoreServices = ctypes.CDLL(find_library('CoreServices'), + use_errno=True) + mach_absolute_time = libSystem.mach_absolute_time + mach_absolute_time.restype = ctypes.c_uint64 + absolute_to_nanoseconds = CoreServices.AbsoluteToNanoseconds + absolute_to_nanoseconds.restype = ctypes.c_uint64 + absolute_to_nanoseconds.argtypes = [ctypes.c_uint64] + + def _monotonic(): + return absolute_to_nanoseconds(mach_absolute_time()) * 1e-9 + + elif SYSTEM == 'Linux': + # from stackoverflow: + # questions/1205722/how-do-i-get-monotonic-time-durations-in-python + import ctypes + import os + + CLOCK_MONOTONIC = 1 # see <linux/time.h> + + class timespec(ctypes.Structure): + _fields_ = [ + ('tv_sec', ctypes.c_long), + ('tv_nsec', ctypes.c_long), + ] + + librt = ctypes.CDLL('librt.so.1', use_errno=True) + clock_gettime = librt.clock_gettime + clock_gettime.argtypes = [ + ctypes.c_int, ctypes.POINTER(timespec), + ] + + def _monotonic(): # noqa + t = timespec() + if clock_gettime(CLOCK_MONOTONIC, ctypes.pointer(t)) != 0: + errno_ = ctypes.get_errno() + raise OSError(errno_, os.strerror(errno_)) + return t.tv_sec + t.tv_nsec * 1e-9 + else: + from time import time as _monotonic +try: + from time import monotonic +except ImportError: + monotonic = _monotonic # noqa diff --git a/amqp/method_framing.py b/amqp/method_framing.py index 85fbfba..b454524 100644 --- a/amqp/method_framing.py +++ b/amqp/method_framing.py @@ -19,12 +19,6 @@ from __future__ import absolute_import from collections import defaultdict, deque from struct import pack, unpack -try: - bytes -except NameError: - # Python 2.5 and lower - bytes = str - from .basic_message import Message from .exceptions import AMQPError, UnexpectedFrame from .five import range, string diff --git a/amqp/serialization.py b/amqp/serialization.py index 6a74702..528d0b7 100644 --- a/amqp/serialization.py +++ b/amqp/serialization.py @@ -25,6 +25,7 @@ import sys from datetime import datetime from decimal import Decimal +from io import BytesIO from struct import pack, unpack from time import mktime @@ -39,19 +40,6 @@ if IS_PY3K: else: byte = chr -try: - from io import BytesIO -except ImportError: # Py2.5 - try: - from cStringIO import StringIO as BytesIO # noqa - except ImportError: - from StringIO import StringIO as BytesIO # noqa - -try: - bytes -except NameError: - # Python 2.5 and lower - bytes = str ILLEGAL_TABLE_TYPE_WITH_KEY = """\ Table type {0!r} for key {1!r} not handled by amqp. [value: {2!r}] @@ -174,6 +162,8 @@ class AMQPReader(object): val = self.read_bit() elif ftype == 100: val = self.read_float() + elif ftype == 86: # 'V' + val = None else: raise FrameSyntaxError( 'Unknown value in table: {0!r} ({1!r})'.format( @@ -357,6 +347,8 @@ class AMQPWriter(object): elif isinstance(v, (list, tuple)): self.write(b'A') self.write_array(v) + elif v is None: + self.write(b'V') else: err = (ILLEGAL_TABLE_TYPE_WITH_KEY.format(type(v), k, v) if k else ILLEGAL_TABLE_TYPE.format(type(v), v)) diff --git a/amqp/transport.py b/amqp/transport.py index 3123072..b567d79 100644 --- a/amqp/transport.py +++ b/amqp/transport.py @@ -1,9 +1,3 @@ -""" -Read/Write AMQP frames over network transports. - -2009-01-14 Barry Pederson <bp@barryp.org> - -""" # Copyright (C) 2009 Barry Pederson <bp@barryp.org> # # This library is free software; you can redistribute it and/or @@ -25,6 +19,7 @@ import errno import re import select import socket +import ssl # Jython does not have this attribute try: @@ -32,29 +27,12 @@ try: except ImportError: # pragma: no cover from socket import IPPROTO_TCP as SOL_TCP # noqa -# -# See if Python 2.6+ SSL support is available -# -try: - import ssl - HAVE_PY26_SSL = True -except: - HAVE_PY26_SSL = False - -try: - bytes -except: - # Python 2.5 and lower - bytes = str - -UNAVAIL = frozenset([errno.EAGAIN, errno.EINTR]) - from struct import pack, unpack from .exceptions import UnexpectedFrame from .utils import get_errno, set_cloexec -_UNAVAIL = errno.EAGAIN, errno.EINTR +_UNAVAIL = frozenset([errno.EAGAIN, errno.EINTR, errno.ENOENT]) AMQP_PORT = 5672 @@ -206,22 +184,17 @@ class SSLTransport(_AbstractTransport): super(SSLTransport, self).__init__(host, connect_timeout) def _setup_transport(self): - """Wrap the socket in an SSL object, either the - new Python 2.6 version, or the older Python 2.5 and - lower version.""" - if HAVE_PY26_SSL: - if hasattr(self, 'sslopts'): - self.sock = ssl.wrap_socket(self.sock, **self.sslopts) - else: - self.sock = ssl.wrap_socket(self.sock) - self.sock.do_handshake() + """Wrap the socket in an SSL object.""" + if hasattr(self, 'sslopts'): + self.sock = ssl.wrap_socket(self.sock, **self.sslopts) else: - self.sock = socket.ssl(self.sock) + self.sock = ssl.wrap_socket(self.sock) + self.sock.do_handshake() self._quick_recv = self.sock.read def _shutdown_transport(self): """Unwrap a Python 2.6 SSL socket, so we can call shutdown()""" - if HAVE_PY26_SSL and self.sock is not None: + if self.sock is not None: try: unwrap = self.sock.unwrap except AttributeError: @@ -235,19 +208,22 @@ class SSLTransport(_AbstractTransport): # to get the exact number of bytes wanted. recv = self._quick_recv rbuf = self._read_buffer - while len(rbuf) < n: - try: - s = recv(131072) # see note above - except socket.error as exc: - # ssl.sock.read may cause ENOENT if the - # operation couldn't be performed (Issue celery#1414). - if not initial and exc.errno in _errnos: - continue - raise exc - if not s: - raise IOError('Socket closed') - rbuf += s - + try: + while len(rbuf) < n: + try: + s = recv(131072) # see note above + except socket.error as exc: + # ssl.sock.read may cause ENOENT if the + # operation couldn't be performed (Issue celery#1414). + if not initial and exc.errno in _errnos: + continue + raise + if not s: + raise IOError('Socket closed') + rbuf += s + except: + self._read_buffer = rbuf + raise result, self._read_buffer = rbuf[:n], rbuf[n:] return result @@ -270,7 +246,7 @@ class TCPTransport(_AbstractTransport): self._read_buffer = EMPTY_BUFFER self._quick_recv = self.sock.recv - def _write(self, s, select=select.select, unavail=UNAVAIL): + def _write(self, s, select=select.select, unavail=_UNAVAIL): write = self.sock.send while s: r, w, e = select([self.sock], [self.sock], [self.sock], 1.0) @@ -291,20 +267,24 @@ class TCPTransport(_AbstractTransport): raise IOError('Socket closed') s = s[n:] - def _read(self, n, initial=False, _errnos=UNAVAIL): + def _read(self, n, initial=False, _errnos=_UNAVAIL): """Read exactly n bytes from the socket""" recv = self._quick_recv rbuf = self._read_buffer - while len(rbuf) < n: - try: - s = recv(131072) - except socket.error as exc: - if not initial and exc.errno in _errnos: - continue - raise - if not s: - raise IOError('Socket closed') - rbuf += s + try: + while len(rbuf) < n: + try: + s = recv(131072) + except socket.error as exc: + if not initial and exc.errno in _errnos: + continue + raise + if not s: + raise IOError('Socket closed') + rbuf += s + except: + self._read_buffer = rbuf + raise result, self._read_buffer = rbuf[:n], rbuf[n:] return result diff --git a/amqp/utils.py b/amqp/utils.py index 994030b..900d2aa 100644 --- a/amqp/utils.py +++ b/amqp/utils.py @@ -11,7 +11,8 @@ except ImportError: class promise(object): if not hasattr(sys, 'pypy_version_info'): __slots__ = tuple( - 'fun args kwargs value ready failed on_success on_error'.split() + 'fun args kwargs value ready failed ' + ' on_success on_error calls'.split() ) def __init__(self, fun, args=(), kwargs=(), @@ -24,6 +25,7 @@ class promise(object): self.on_success = on_success self.on_error = on_error self.value = None + self.calls = 0 def __repr__(self): return '<$: {0.fun.__name__}(*{0.args!r}, **{0.kwargs!r})'.format( @@ -43,6 +45,7 @@ class promise(object): self.on_success(self.value) finally: self.ready = True + self.calls += 1 def then(self, callback=None, on_error=None): self.on_success = callback diff --git a/demo/amqp_clock.py b/demo/amqp_clock.py index d842296..c718266 100755 --- a/demo/amqp_clock.py +++ b/demo/amqp_clock.py @@ -27,18 +27,23 @@ TOPIC_PATTERN = '%Y.%m.%d.%w.%H.%M' # Python datetime.strftime() pattern def main(): parser = OptionParser() - parser.add_option('--host', dest='host', - help='AMQP server to connect to (default: %default)', - default='localhost') - parser.add_option('-u', '--userid', dest='userid', + parser.add_option( + '--host', dest='host', + help='AMQP server to connect to (default: %default)', + default='localhost', + ) + parser.add_option( + '-u', '--userid', dest='userid', help='AMQP userid to authenticate as (default: %default)', default='guest', ) - parser.add_option('-p', '--password', dest='password', + parser.add_option( + '-p', '--password', dest='password', help='AMQP password to authenticate with (default: %default)', default='guest', ) - parser.add_option('--ssl', dest='ssl', action='store_true', + parser.add_option( + '--ssl', dest='ssl', action='store_true', help='Enable SSL with AMQP server (default: not enabled)', default=False, ) diff --git a/demo/demo_receive.py b/demo/demo_receive.py index 7382604..bfda624 100755 --- a/demo/demo_receive.py +++ b/demo/demo_receive.py @@ -16,14 +16,14 @@ import amqp def callback(channel, msg): for key, val in msg.properties.items(): - print ('%s: %s' % (key, str(val))) + print('%s: %s' % (key, str(val))) for key, val in msg.delivery_info.items(): - print ('> %s: %s' % (key, str(val))) + print('> %s: %s' % (key, str(val))) - print ('') - print (msg.body) - print ('-------') - print msg.delivery_tag + print('') + print(msg.body) + print('-------') + print(msg.delivery_tag) channel.basic_ack(msg.delivery_tag) # @@ -35,19 +35,23 @@ def callback(channel, msg): def main(): parser = OptionParser() - parser.add_option('--host', dest='host', + parser.add_option( + '--host', dest='host', help='AMQP server to connect to (default: %default)', default='localhost', ) - parser.add_option('-u', '--userid', dest='userid', + parser.add_option( + '-u', '--userid', dest='userid', help='userid to authenticate as (default: %default)', default='guest', ) - parser.add_option('-p', '--password', dest='password', + parser.add_option( + '-p', '--password', dest='password', help='password to authenticate with (default: %default)', default='guest', ) - parser.add_option('--ssl', dest='ssl', action='store_true', + parser.add_option( + '--ssl', dest='ssl', action='store_true', help='Enable SSL (default: not enabled)', default=False, ) diff --git a/demo/demo_send.py b/demo/demo_send.py index 0084815..27bb1b1 100755 --- a/demo/demo_send.py +++ b/demo/demo_send.py @@ -19,19 +19,23 @@ def main(): parser = OptionParser( usage='usage: %prog [options] message\nexample: %prog hello world', ) - parser.add_option('--host', dest='host', + parser.add_option( + '--host', dest='host', help='AMQP server to connect to (default: %default)', default='localhost', ) - parser.add_option('-u', '--userid', dest='userid', + parser.add_option( + '-u', '--userid', dest='userid', help='userid to authenticate as (default: %default)', default='guest', ) - parser.add_option('-p', '--password', dest='password', + parser.add_option( + '-p', '--password', dest='password', help='password to authenticate with (default: %default)', default='guest', ) - parser.add_option('--ssl', dest='ssl', action='store_true', + parser.add_option( + '--ssl', dest='ssl', action='store_true', help='Enable SSL (default: not enabled)', default=False, ) diff --git a/docs/_ext/applyxrefs.py b/docs/_ext/applyxrefs.py index 027490b..deed5d9 100644 --- a/docs/_ext/applyxrefs.py +++ b/docs/_ext/applyxrefs.py @@ -6,8 +6,8 @@ import os testing = False DONT_TOUCH = ( - './index.txt', - ) + './index.txt', +) def target_name(fn): diff --git a/docs/_ext/literals_to_xrefs.py b/docs/_ext/literals_to_xrefs.py index e9dc7ca..41aa616 100644 --- a/docs/_ext/literals_to_xrefs.py +++ b/docs/_ext/literals_to_xrefs.py @@ -95,8 +95,8 @@ def fixliterals(fname): replace_type in ("class", "func", "meth"): default = default[:-2] replace_value = raw_input( - colorize("Text <target> [", fg="yellow") + default + \ - colorize("]: ", fg="yellow")).strip() + colorize("Text <target> [", fg="yellow") + default + + colorize("]: ", fg="yellow")).strip() if not replace_value: replace_value = default new.append(":%s:`%s`" % (replace_type, replace_value)) diff --git a/docs/conf.py b/docs/conf.py index 3b6f524..1191395 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -70,8 +70,8 @@ html_use_modindex = True html_use_index = True latex_documents = [ - ('index', 'py-amqp.tex', ur'py-amqp Documentation', - ur'Ask Solem & Contributors', 'manual'), + ('index', 'py-amqp.tex', ur'py-amqp Documentation', + ur'Ask Solem & Contributors', 'manual'), ] html_theme = "celery" @@ -89,7 +89,7 @@ if False: issuetracker_project = "celery/py-amqp" issuetracker_issue_pattern = r'[Ii]ssue #(\d+)' -# -- Options for Epub output --------------------------------------------------- +# -- Options for Epub output ------------------------------------------------ # Bibliographic Dublin Core info. epub_title = 'py-amqp Manual, Version 1.0' diff --git a/extra/generate_skeleton_0_8.py b/extra/generate_skeleton_0_8.py index 442bc83..3bf4482 100755 --- a/extra/generate_skeleton_0_8.py +++ b/extra/generate_skeleton_0_8.py @@ -204,9 +204,10 @@ def generate_methods(class_element, out): if 'content' in amqp_method.attrib: params.append('msg') - out.write(' def %s(%s):\n' % - (_fixup_method_name(class_element, amqp_method), - ', '.join(params + fieldnames))) + out.write(' def %s(%s):\n' % ( + _fixup_method_name(class_element, amqp_method), + ', '.join(params + fieldnames)), + ) s = generate_docstr(amqp_method, ' ', ' """') if s: @@ -231,7 +232,7 @@ def generate_methods(class_element, out): if 'synchronous' in amqp_method.attrib: responses = [x.attrib['name'] - for x in amqp_method.findall('response')] + for x in amqp_method.findall('response')] out.write(' return self.wait(allowed_methods=[\n') for r in responses: resp = method_name_map[(class_element.attrib['name'], r)] @@ -274,7 +275,7 @@ def generate_class(spec, class_element, out): # for amqp_class in spec.findall('class'): if (amqp_class.attrib['handler'] == class_element.attrib['name']) and \ - (amqp_class.attrib['name'] != class_element.attrib['name']): + (amqp_class.attrib['name'] != class_element.attrib['name']): out.write(' #############\n') out.write(' #\n') out.write(' # %s\n' % amqp_class.attrib['name'].capitalize()) @@ -315,8 +316,8 @@ def generate_module(spec, out): ( amqp_class.attrib['index'], amqp_method.attrib['index'], - amqp_class.attrib['handler'].capitalize() + '.' + - _fixup_method_name(amqp_class, amqp_method), + (amqp_class.attrib['handler'].capitalize() + '.' + + _fixup_method_name(amqp_class, amqp_method)), ) #### Actually generate output @@ -335,7 +336,7 @@ def generate_module(spec, out): # for chassis in amqp_method.findall('chassis'): # print ' ', chassis.attrib chassis = [x.attrib['name'] - for x in amqp_method.findall('chassis')] + for x in amqp_method.findall('chassis')] if 'client' in chassis: out.write(" (%s, %s): (%s, %s._%s),\n" % ( amqp_class.attrib['index'], diff --git a/funtests/test_channel.py b/funtests/test_channel.py index 503a48e..10b860f 100755 --- a/funtests/test_channel.py +++ b/funtests/test_channel.py @@ -22,15 +22,8 @@ Test amqp.channel module import sys import unittest -try: - bytes -except NameError: - # Python 2.5 and lower - bytes = str - import settings - from amqp import ChannelError, Connection, Message, FrameSyntaxError @@ -163,14 +156,19 @@ class TestChannel(unittest.TestCase): self.assertTrue(isinstance(msg2.body, bytes)) self.assertEqual(msg2.body, u'hello w\xf6rld'.encode('latin_1')) - def test_exception(self): - """ - Check that Channel exceptions are actually raised as Python - exceptions. + def test_queue_delete_empty(self): + self.assertFalse( + self.ch.queue_delete('bogus_queue_that_does_not_exist') + ) - """ + def test_survives_channel_error(self): with self.assertRaises(ChannelError): - self.ch.queue_delete('bogus_queue_that_does_not_exist') + self.ch.queue_declare('krjqheewq_bogus', passive=True) + self.ch.queue_declare('funtest_survive') + self.ch.queue_declare('funtest_survive', passive=True) + self.assertEqual( + 0, self.ch.queue_delete('funtest_survive'), + ) def test_invalid_header(self): """ @@ -181,7 +179,7 @@ class TestChannel(unittest.TestCase): """ qname, _, _ = self.ch.queue_declare() - msg = Message(application_headers={'test': None}) + msg = Message(application_headers={'test': object()}) self.assertRaises( FrameSyntaxError, self.ch.basic_publish, msg, routing_key=qname, @@ -248,10 +246,10 @@ class TestChannel(unittest.TestCase): content_type='text/plain', application_headers={'foo': 7, 'bar': 'baz'}) - self.ch.basic_publish(msg, 'unittest.fanout') - self.ch.basic_publish(msg, 'unittest.fanout', mandatory=True) - self.ch.basic_publish(msg, 'unittest.fanout', mandatory=True) - self.ch.basic_publish(msg, 'unittest.fanout', mandatory=True) + self.ch.basic_publish(msg, 'funtest.fanout') + self.ch.basic_publish(msg, 'funtest.fanout', mandatory=True) + self.ch.basic_publish(msg, 'funtest.fanout', mandatory=True) + self.ch.basic_publish(msg, 'funtest.fanout', mandatory=True) self.ch.close() # @@ -264,38 +262,37 @@ class TestChannel(unittest.TestCase): Network configuration is as follows (-> is forwards to : source_exchange -> dest_exchange -> queue The test checks that once the message is publish to the - destination exchange(unittest.topic_dest) it is delivered to the queue. + destination exchange(funtest.topic_dest) it is delivered to the queue. """ test_routing_key = 'unit_test__key' - dest_exchange = 'unittest.topic_dest_bind' - source_exchange = 'unittest.topic_source_bind' + dest_exchange = 'funtest.topic_dest_bind' + source_exchange = 'funtest.topic_source_bind' self.ch.exchange_declare(dest_exchange, 'topic', auto_delete=True) self.ch.exchange_declare(source_exchange, 'topic', auto_delete=True) qname, _, _ = self.ch.queue_declare() - self.ch.exchange_bind(destination = dest_exchange, - source = source_exchange, - routing_key = test_routing_key) + self.ch.exchange_bind(destination=dest_exchange, + source=source_exchange, + routing_key=test_routing_key) self.ch.queue_bind(qname, dest_exchange, routing_key=test_routing_key) - msg = Message('unittest message', + msg = Message('funtest message', content_type='text/plain', application_headers={'foo': 7, 'bar': 'baz'}) - self.ch.basic_publish(msg, source_exchange, - routing_key = test_routing_key) + routing_key=test_routing_key) msg2 = self.ch.basic_get(qname, no_ack=True) self.assertEqual(msg, msg2) def test_exchange_unbind(self): - dest_exchange = 'unittest.topic_dest_unbind' - source_exchange = 'unittest.topic_source_unbind' + dest_exchange = 'funtest.topic_dest_unbind' + source_exchange = 'funtest.topic_source_unbind' test_routing_key = 'unit_test__key' self.ch.exchange_declare(dest_exchange, @@ -311,6 +308,7 @@ class TestChannel(unittest.TestCase): source=source_exchange, routing_key=test_routing_key) + def main(): suite = unittest.TestLoader().loadTestsFromTestCase(TestChannel) unittest.TextTestRunner(**settings.test_args).run(suite) diff --git a/funtests/test_serialization.py b/funtests/test_serialization.py index 9a8793c..ce70438 100755 --- a/funtests/test_serialization.py +++ b/funtests/test_serialization.py @@ -26,12 +26,6 @@ from random import randint import sys import unittest -try: - bytes -except NameError: - # Python 2.5 and lower - bytes = str - import settings from amqp.serialization import ( @@ -316,7 +310,7 @@ class TestSerialization(unittest.TestCase): Check that an un-serializable table entry raises a ValueError """ - val = {'test': None} + val = {'test': object()} w = AMQPWriter() self.assertRaises(FrameSyntaxError, w.write_table, val) @@ -327,6 +321,7 @@ class TestSerialization(unittest.TestCase): 'baz': 'this is some random string I typed', 'ubaz': u'And something in unicode', 'dday_aniv': datetime(1994, 6, 6), + 'nothing': None, 'more': { 'abc': -123, 'def': 'hello world', @@ -351,26 +346,26 @@ class TestSerialization(unittest.TestCase): # Array # def test_array_from_list(self): - val = [1, 'foo'] + val = [1, 'foo', None] w = AMQPWriter() w.write_array(val) s = w.getvalue() self.assertEqualBinary( - s, '\x00\x00\x00\x0DI\x00\x00\x00\x01S\x00\x00\x00\x03foo', + s, '\x00\x00\x00\x0EI\x00\x00\x00\x01S\x00\x00\x00\x03fooV', ) r = AMQPReader(s) self.assertEqual(r.read_array(), val) def test_array_from_tuple(self): - val = (1, 'foo') + val = (1, 'foo', None) w = AMQPWriter() w.write_array(val) s = w.getvalue() self.assertEqualBinary( - s, '\x00\x00\x00\x0DI\x00\x00\x00\x01S\x00\x00\x00\x03foo', + s, '\x00\x00\x00\x0EI\x00\x00\x00\x01S\x00\x00\x00\x03fooV', ) r = AMQPReader(s) @@ -394,48 +389,6 @@ class TestSerialization(unittest.TestCase): self.assertEqual(r.read_table(), val) # - # Array - # - def test_array_from_list(self): - val = [1, 'foo'] - w = AMQPWriter() - w.write_array(val) - s = w.getvalue() - - self.assertEqualBinary(s, '\x00\x00\x00\x0DI\x00\x00\x00\x01S\x00\x00\x00\x03foo') - - r = AMQPReader(s) - self.assertEqual(r.read_array(), val) - - def test_array_from_tuple(self): - val = (1, 'foo') - w = AMQPWriter() - w.write_array(val) - s = w.getvalue() - - self.assertEqualBinary(s, '\x00\x00\x00\x0DI\x00\x00\x00\x01S\x00\x00\x00\x03foo') - - r = AMQPReader(s) - self.assertEqual(r.read_array(), list(val)) - - def test_table_with_array(self): - val = { - 'foo': 7, - 'bar': Decimal('123345.1234'), - 'baz': 'this is some random string I typed', - 'blist': [1,2,3], - 'nlist': [1, [2,3,4]], - 'ndictl': {'nfoo': 8, 'nblist': [5,6,7] } - } - - w = AMQPWriter() - w.write_table(val) - s = w.getvalue() - - r = AMQPReader(s) - self.assertEqual(r.read_table(), val) - - # # GenericContent # def test_generic_content_eq(self): diff --git a/pavement.py b/pavement.py index 685ca15..f3a4175 100644 --- a/pavement.py +++ b/pavement.py @@ -1,13 +1,11 @@ -import os -import sys -from paver.easy import * -from paver import doctools -from paver.setuputils import setup -sys.path.insert(0, os.path.abspath('.')) +from paver.easy import * # noqa +from paver import doctools # noqa +from paver.setuputils import setup # noqa + PYCOMPILE_CACHES = ['*.pyc', '*$py.class'] options( - sphinx=Bunch(builddir='.build'), + sphinx=Bunch(builddir='.build'), ) @@ -34,7 +32,7 @@ def html(options): def qhtml(options): destdir = path('Documentation') builtdocs = sphinx_builddir(options) - sh('rsync -az {0}/ {1}'.format(builtdocs, destdir)) + sh('rsync -az %s/ %s' % (builtdocs, destdir)) @task @@ -42,18 +40,17 @@ def qhtml(options): def ghdocs(options): builtdocs = sphinx_builddir(options) sh("git checkout gh-pages && \ - cp -r {0}/* . && \ + cp -r %s/* . && \ git commit . -m 'Rendered documentation for Github Pages.' && \ git push origin gh-pages && \ - git checkout master".format(builtdocs)) + git checkout master" % builtdocs) @task @needs('clean_docs', 'paver.doctools.html') def upload_pypi_docs(options): builtdocs = path('docs') / options.builddir / 'html' - sh("{0} setup.py upload_sphinx --upload-dir='{1}'".format( - sys.executable, builtdocs)) + sh("python setup.py upload_sphinx --upload-dir='%s'" % (builtdocs)) @task @@ -73,38 +70,6 @@ def verifyindex(options): @task -@cmdopts([ - ('noerror', 'E', 'Ignore errors'), -]) -def flake8(options): - noerror = getattr(options, 'noerror', False) - complexity = getattr(options, 'complexity', 22) - sh("""flake8 amqp | perl -mstrict -mwarnings -nle' - my $ignore = m/too complex \((\d+)\)/ && $1 le {0}; - if (! $ignore) {{ print STDERR; our $FOUND_FLAKE = 1 }} - }}{{exit $FOUND_FLAKE; - '""".format(complexity), ignore_error=noerror) - - -@task -@cmdopts([ - ('noerror', 'E', 'Ignore errors'), -]) -def flakeplus(options): - noerror = getattr(options, 'noerror', False) - sh('flakeplus amqp', ignore_error=noerror) - - -@task -@cmdopts([ - ('noerror', 'E', 'Ignore errors') -]) -def flakes(options): - flake8(options) - flakeplus(options) - - -@task def clean_readme(options): path('README').unlink() path('README.rst').unlink() @@ -113,15 +78,21 @@ def clean_readme(options): @task @needs('clean_readme') def readme(options): - sh('{0} extra/release/sphinx-to-rst.py docs/templates/readme.txt \ - > README.rst'.format(sys.executable)) + sh('python extra/release/sphinx-to-rst.py docs/templates/readme.txt \ + > README.rst') + sh('ln -sf README.rst README') @task +@cmdopts([ + ('custom=', 'C', 'custom version'), +]) def bump(options): - sh("extra/release/bump_version.py \ - amqp/__init__.py docs/includes/intro.txt \ - --before-commit='paver readme'") + s = ("-- '%s'" % (options.custom, ) if getattr(options, 'custom', None) + else '') + sh('extra/release/bump_version.py \ + amqp/__init__.py README.rst %s' % (s, )) + @task @cmdopts([ @@ -133,6 +104,8 @@ def test(options): cmd = 'nosetests' if getattr(options, 'coverage', False): cmd += ' --with-coverage3' + if getattr(options, 'quick', False): + cmd = 'QUICKTEST=1 SKIP_RLIMITS=1 %s' % cmd if getattr(options, 'verbose', False): cmd += ' --verbosity=2' sh(cmd) @@ -142,18 +115,52 @@ def test(options): @cmdopts([ ('noerror', 'E', 'Ignore errors'), ]) +def flake8(options): + noerror = getattr(options, 'noerror', False) + complexity = getattr(options, 'complexity', 22) + sh("""flake8 amqp | perl -mstrict -mwarnings -nle' + my $ignore = (m/too complex \((\d+)\)/ && $1 le %s); + if (! $ignore) { print STDERR; our $FOUND_FLAKE = 1 } + }{exit $FOUND_FLAKE; + '""" % (complexity, ), ignore_error=noerror) + + +@task +@cmdopts([ + ('noerror', 'E', 'Ignore errors'), +]) +def flakeplus(options): + noerror = getattr(options, 'noerror', False) + sh('flakeplus amqp --2.6', + ignore_error=noerror) + + +@task +@cmdopts([ + ('noerror', 'E', 'Ignore errors'), +]) +def flakes(options): + flake8(options) + flakeplus(options) + + +@task +@cmdopts([ + ('noerror', 'E', 'Ignore errors'), +]) def pep8(options): noerror = getattr(options, 'noerror', False) - return sh("""find . -name "*.py" | xargs pep8 | perl -nle'\ + return sh("""find amqp -name "*.py" | xargs pep8 | perl -nle'\ print; $a=1 if $_}{exit($a)'""", ignore_error=noerror) @task def removepyc(options): - sh('find . -type f -a \\( {0} \\) | xargs rm'.format( - ' -o '.join("-name '{0}'".format(pat) for pat in PYCOMPILE_CACHES))) + sh('find . -type f -a \\( %s \\) | xargs rm' % ( + ' -o '.join("-name '%s'" % (pat, ) for pat in PYCOMPILE_CACHES), )) sh('find . -type d -name "__pycache__" | xargs rm -r') + @task @needs('removepyc') def gitclean(options): @@ -167,7 +174,7 @@ def gitcleanforce(options): @task -@needs('flakes', 'autodoc', 'verifyindex', 'gitclean') +@needs('flakes', 'autodoc', 'verifyindex', 'test', 'gitclean') def releaseok(options): pass @@ -176,13 +183,3 @@ def releaseok(options): @needs('releaseok', 'removepyc', 'upload_docs') def release(options): pass - - -@task -def testloc(options): - sh('sloccount tests') - - -@task -def loc(options): - sh('sloccount amqp') @@ -15,8 +15,8 @@ import os import sys import codecs -if sys.version_info < (2, 5): - raise Exception('amqp requires Python 2.5 or higher.') +if sys.version_info < (2, 6): + raise Exception('amqp requires Python 2.6 or higher.') NAME = 'amqp' entrypoints = {} @@ -35,7 +35,8 @@ classes = """ Programming Language :: Python :: 3.1 Programming Language :: Python :: 3.2 Programming Language :: Python :: 3.3 - License :: OSI Approved :: GNU Library or Lesser General Public License (LGPL) + License :: OSI Approved :: GNU Library or \ +Lesser General Public License (LGPL) Intended Audience :: Developers License :: OSI Approved :: BSD License Operating System :: OS Independent |