summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAsk Solem <ask@celeryproject.org>2014-01-16 15:01:19 +0000
committerAsk Solem <ask@celeryproject.org>2014-01-16 15:01:19 +0000
commit97abf06e054398f22b7bf77aa6349d8a2ccd860b (patch)
treed4cabb26972ef44dbd836d3f7d43470654a50bd2
parentbff6ddb0ea116e7c3b06f6cf2509c6411b3e4803 (diff)
parent07a312582d90b1303ff5d675613935392f6a6283 (diff)
downloadpy-amqp-readwrite.tar.gz
Merge branch 'master' into readwritereadwrite
Conflicts: amqp/transport.py
-rw-r--r--AUTHORS2
-rw-r--r--Changelog58
-rw-r--r--README.rst5
-rw-r--r--amqp/__init__.py2
-rw-r--r--amqp/abstract_channel.py6
-rw-r--r--amqp/channel.py12
-rw-r--r--amqp/connection.py69
-rw-r--r--amqp/five.py55
-rw-r--r--amqp/method_framing.py6
-rw-r--r--amqp/serialization.py18
-rw-r--r--amqp/transport.py100
-rw-r--r--amqp/utils.py5
-rwxr-xr-xdemo/amqp_clock.py17
-rwxr-xr-xdemo/demo_receive.py24
-rwxr-xr-xdemo/demo_send.py12
-rw-r--r--docs/_ext/applyxrefs.py4
-rw-r--r--docs/_ext/literals_to_xrefs.py4
-rw-r--r--docs/conf.py6
-rwxr-xr-xextra/generate_skeleton_0_8.py17
-rwxr-xr-xfuntests/test_channel.py56
-rwxr-xr-xfuntests/test_serialization.py59
-rw-r--r--pavement.py123
-rw-r--r--setup.py7
23 files changed, 371 insertions, 296 deletions
diff --git a/AUTHORS b/AUTHORS
index 780d505..ad92086 100644
--- a/AUTHORS
+++ b/AUTHORS
@@ -8,3 +8,5 @@ Adam Wentz
Adrien Guinet <aguinet@quarkslab.com>
Tommie Gannert <tommie@spotify.com>
Dong Weiming <ciici123@gmail.com>
+Dominik Fässler <dfa@bezono.org>
+Dustin J. Mitchell <dustin@mozilla.com>
diff --git a/Changelog b/Changelog
index 77fc48d..42dcd41 100644
--- a/Changelog
+++ b/Changelog
@@ -5,11 +5,61 @@ py-amqp is fork of amqplib used by Kombu containing additional features and impr
The previous amqplib changelog is here:
http://code.google.com/p/py-amqplib/source/browse/CHANGES
+.. _version-1.4.1:
+
+1.4.1
+=====
+:release-date: 2014-01-14 09:30 P.M UTC
+
+- Fixed error occurring when heartbeats disabled.
+
+.. _version-1.4.0:
+
+1.4.0
+=====
+:release-date: 2014-01-13 03:00 P.M UTC
+
+- Heartbeat implementation improved (Issue #6).
+
+ The new heartbeat behavior is the same approach as taken by the
+ RabbitMQ java library.
+
+ This also means that clients should preferably call the ``heartbeat_tick``
+ method more frequently (like every second) instead of using the old
+ ``rate`` argument (which is now ignored).
+
+ - Heartbeat interval is negotiated with the server.
+ - Some delay is allowed if the heartbeat is late.
+ - Monotonic time is used to keep track of the heartbeat
+ instead of relying on the caller to call the checking function
+ at the right time.
+
+ Contributed by Dustin J. Mitchell.
+
+- NoneType is now supported in tables and arrays.
+
+ Contributed by Dominik Fässler.
+
+- SSLTransport: Now handles ``ENOENT``.
+
+ Fix contributed by Adrien Guinet.
+
+.. _version-1.3.3:
+
+1.3.3
+=====
+:release-date: 2013-11-11 03:30 P.M UTC
+
+- SSLTransport: Now keeps read buffer if an exception is raised
+ (Issue #26).
+
+ Fix contributed by Tommie Gannert.
+
.. _version-1.3.2:
1.3.2
=====
-:release-date: 2013-10-29 2:00 P.M UTC
+:release-date: 2013-10-29 02:00 P.M UTC
- Message.channel is now a channel object (not the channel id).
@@ -130,7 +180,7 @@ http://code.google.com/p/py-amqplib/source/browse/CHANGES
1.0.13
======
-:release-date: 2013-07-31 16:00 P.M BST
+:release-date: 2013-07-31 04:00 P.M BST
- Fixed problems with the SSL transport (Issue #15).
@@ -142,7 +192,7 @@ http://code.google.com/p/py-amqplib/source/browse/CHANGES
1.0.12
======
-:release-date: 2013-06-25 14:00 P.M BST
+:release-date: 2013-06-25 02:00 P.M BST
- Fixed another Python 3 compatibility problem.
@@ -150,7 +200,7 @@ http://code.google.com/p/py-amqplib/source/browse/CHANGES
1.0.11
======
-:release-date: 2013-04-11 18:00 P.M BST
+:release-date: 2013-04-11 06:00 P.M BST
- Fixed Python 3 incompatibility in ``amqp/transport.py``.
diff --git a/README.rst b/README.rst
index db54285..e313cc5 100644
--- a/README.rst
+++ b/README.rst
@@ -2,7 +2,7 @@
Python AMQP 0.9.1 client library
=====================================================================
-:Version: 1.3.2
+:Version: 1.4.1
:Web: http://amqp.readthedocs.org/
:Download: http://pypi.python.org/pypi/amqp/
:Source: http://github.com/celery/py-amqp/
@@ -99,3 +99,6 @@ Further
http://www.rabbitmq.com/devtools.html#python-dev
+.. image:: https://d2weczhvl823v0.cloudfront.net/celery/celery/trend.png
+ :alt: Bitdeli badge
+ :target: https://bitdeli.com/free
diff --git a/amqp/__init__.py b/amqp/__init__.py
index de7259b..3a5ba82 100644
--- a/amqp/__init__.py
+++ b/amqp/__init__.py
@@ -16,7 +16,7 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
from __future__ import absolute_import
-VERSION = (1, 3, 2)
+VERSION = (1, 4, 1)
__version__ = '.'.join(map(str, VERSION[0:3])) + ''.join(VERSION[3:])
__author__ = 'Barry Pederson'
__maintainer__ = 'Ask Solem'
diff --git a/amqp/abstract_channel.py b/amqp/abstract_channel.py
index 5e37bf9..28cfe13 100644
--- a/amqp/abstract_channel.py
+++ b/amqp/abstract_channel.py
@@ -19,12 +19,6 @@ from __future__ import absolute_import
from .exceptions import AMQPNotImplementedError, RecoverableConnectionError
from .serialization import AMQPWriter
-try:
- bytes
-except NameError:
- # Python 2.5 and lower
- bytes = str
-
__all__ = ['AbstractChannel']
diff --git a/amqp/channel.py b/amqp/channel.py
index ea59f0c..05eb09a 100644
--- a/amqp/channel.py
+++ b/amqp/channel.py
@@ -31,6 +31,15 @@ __all__ = ['Channel']
AMQP_LOGGER = logging.getLogger('amqp')
+EXCHANGE_AUTODELETE_DEPRECATED = """\
+The auto_delete flag for exchanges has been deprecated and will be removed
+from py-amqp v1.5.0.\
+"""
+
+
+class VDeprecationWarning(DeprecationWarning):
+ pass
+
class Channel(AbstractChannel):
"""Work with channels
@@ -604,8 +613,7 @@ class Channel(AbstractChannel):
self._send_method((40, 10), args)
if auto_delete:
- warn(DeprecationWarning(
- 'auto_delete exchanges has been deprecated'))
+ warn(VDeprecationWarning(EXCHANGE_AUTODELETE_DEPRECATED))
if not nowait:
return self.wait(allowed_methods=[
diff --git a/amqp/connection.py b/amqp/connection.py
index 8808a58..1d4980f 100644
--- a/amqp/connection.py
+++ b/amqp/connection.py
@@ -34,7 +34,7 @@ from .exceptions import (
ConnectionForced, ConnectionError, error_for_code,
RecoverableConnectionError, RecoverableChannelError,
)
-from .five import items, range, values
+from .five import items, range, values, monotonic
from .method_framing import MethodReader, MethodWriter
from .serialization import AMQPWriter
from .transport import create_transport
@@ -80,9 +80,26 @@ class Connection(AbstractChannel):
"""
Channel = Channel
+ #: Final heartbeat interval value (in float seconds) after negotiation
+ heartbeat = None
+
+ #: Original heartbeat interval value proposed by client.
+ client_heartbeat = None
+
+ #: Original heartbeat interval proposed by server.
+ server_heartbeat = None
+
+ #: Time of last heartbeat sent (in monotonic time, if available).
+ last_heartbeat_sent = 0
+
+ #: Time of last heartbeat received (in monotonic time, if available).
+ last_heartbeat_received = 0
+
+ #: Number of bytes sent to socket at the last heartbeat check.
prev_sent = None
+
+ #: Number of bytes received from socket at the last heartbeat check.
prev_recv = None
- missed_heartbeats = 0
def __init__(self, host='localhost', userid='guest', password='guest',
login_method='AMQPLAIN', login_response=None,
@@ -125,7 +142,7 @@ class Connection(AbstractChannel):
# Properties set in the Tune method
self.channel_max = channel_max
self.frame_max = frame_max
- self.heartbeat = heartbeat
+ self.client_heartbeat = heartbeat
self.confirm_publish = confirm_publish
@@ -841,10 +858,18 @@ class Connection(AbstractChannel):
want a heartbeat.
"""
+ client_heartbeat = self.client_heartbeat or 0
self.channel_max = args.read_short() or self.channel_max
self.frame_max = args.read_long() or self.frame_max
self.method_writer.frame_max = self.frame_max
- heartbeat = args.read_short() # noqa
+ self.server_heartbeat = args.read_short() or 0
+
+ # negotiate the heartbeat interval to the smaller of the
+ # specified values
+ if self.server_heartbeat == 0 or client_heartbeat == 0:
+ self.heartbeat = max(self.server_heartbeat, client_heartbeat)
+ else:
+ self.heartbeat = min(self.server_heartbeat, client_heartbeat)
self._x_tune_ok(self.channel_max, self.frame_max, self.heartbeat)
@@ -852,28 +877,34 @@ class Connection(AbstractChannel):
self.transport.write_frame(8, 0, bytes())
def heartbeat_tick(self, rate=2):
- """Verify that hartbeats are sent and received.
-
- :keyword rate: Rate is how often the tick is called
- compared to the actual heartbeat value. E.g. if
- the heartbeat is set to 3 seconds, and the tick
- is called every 3 / 2 seconds, then the rate is 2.
+ """Send heartbeat packets, if necessary, and fail if none have been
+ received recently. This should be called frequently, on the order of
+ once per second.
+ :keyword rate: Ignored
"""
+ if not self.heartbeat:
+ return
+
+ # treat actual data exchange in either direction as a heartbeat
sent_now = self.method_writer.bytes_sent
recv_now = self.method_reader.bytes_recv
+ if self.prev_sent is None or self.prev_sent != sent_now:
+ self.last_heartbeat_sent = monotonic()
+ if self.prev_recv is None or self.prev_recv != recv_now:
+ self.last_heartbeat_received = monotonic()
+ self.prev_sent, self.prev_recv = sent_now, recv_now
- if self.prev_sent is not None and self.prev_sent == sent_now:
+ # send a heartbeat if it's time to do so
+ if monotonic() > self.last_heartbeat_sent + self.heartbeat:
self.send_heartbeat()
+ self.last_heartbeat_sent = monotonic()
- if self.prev_recv is not None and self.prev_recv == recv_now:
- self.missed_heartbeats += 1
- else:
- self.missed_heartbeats = 0
-
- self.prev_sent, self.prev_recv = sent_now, recv_now
-
- if self.missed_heartbeats >= rate:
+ # if we've missed two intervals' heartbeats, fail; this gives the
+ # server enough time to send heartbeats a little late
+ if (self.last_heartbeat_received and
+ self.last_heartbeat_received + 2 *
+ self.heartbeat < monotonic()):
raise ConnectionForced('Too many heartbeats missed')
def _x_tune_ok(self, channel_max, frame_max, heartbeat):
diff --git a/amqp/five.py b/amqp/five.py
index 25b83fc..5157df5 100644
--- a/amqp/five.py
+++ b/amqp/five.py
@@ -131,3 +131,58 @@ def with_metaclass(Type, skip_attrs=set(['__dict__', '__weakref__'])):
return Type(Class.__name__, Class.__bases__, attrs)
return _clone_with_metaclass
+
+############## time.monotonic ################################################
+
+if sys.version_info < (3, 3):
+
+ import platform
+ SYSTEM = platform.system()
+
+ if SYSTEM == 'Darwin':
+ import ctypes
+ from ctypes.util import find_library
+ libSystem = ctypes.CDLL('libSystem.dylib')
+ CoreServices = ctypes.CDLL(find_library('CoreServices'),
+ use_errno=True)
+ mach_absolute_time = libSystem.mach_absolute_time
+ mach_absolute_time.restype = ctypes.c_uint64
+ absolute_to_nanoseconds = CoreServices.AbsoluteToNanoseconds
+ absolute_to_nanoseconds.restype = ctypes.c_uint64
+ absolute_to_nanoseconds.argtypes = [ctypes.c_uint64]
+
+ def _monotonic():
+ return absolute_to_nanoseconds(mach_absolute_time()) * 1e-9
+
+ elif SYSTEM == 'Linux':
+ # from stackoverflow:
+ # questions/1205722/how-do-i-get-monotonic-time-durations-in-python
+ import ctypes
+ import os
+
+ CLOCK_MONOTONIC = 1 # see <linux/time.h>
+
+ class timespec(ctypes.Structure):
+ _fields_ = [
+ ('tv_sec', ctypes.c_long),
+ ('tv_nsec', ctypes.c_long),
+ ]
+
+ librt = ctypes.CDLL('librt.so.1', use_errno=True)
+ clock_gettime = librt.clock_gettime
+ clock_gettime.argtypes = [
+ ctypes.c_int, ctypes.POINTER(timespec),
+ ]
+
+ def _monotonic(): # noqa
+ t = timespec()
+ if clock_gettime(CLOCK_MONOTONIC, ctypes.pointer(t)) != 0:
+ errno_ = ctypes.get_errno()
+ raise OSError(errno_, os.strerror(errno_))
+ return t.tv_sec + t.tv_nsec * 1e-9
+ else:
+ from time import time as _monotonic
+try:
+ from time import monotonic
+except ImportError:
+ monotonic = _monotonic # noqa
diff --git a/amqp/method_framing.py b/amqp/method_framing.py
index 85fbfba..b454524 100644
--- a/amqp/method_framing.py
+++ b/amqp/method_framing.py
@@ -19,12 +19,6 @@ from __future__ import absolute_import
from collections import defaultdict, deque
from struct import pack, unpack
-try:
- bytes
-except NameError:
- # Python 2.5 and lower
- bytes = str
-
from .basic_message import Message
from .exceptions import AMQPError, UnexpectedFrame
from .five import range, string
diff --git a/amqp/serialization.py b/amqp/serialization.py
index 6a74702..528d0b7 100644
--- a/amqp/serialization.py
+++ b/amqp/serialization.py
@@ -25,6 +25,7 @@ import sys
from datetime import datetime
from decimal import Decimal
+from io import BytesIO
from struct import pack, unpack
from time import mktime
@@ -39,19 +40,6 @@ if IS_PY3K:
else:
byte = chr
-try:
- from io import BytesIO
-except ImportError: # Py2.5
- try:
- from cStringIO import StringIO as BytesIO # noqa
- except ImportError:
- from StringIO import StringIO as BytesIO # noqa
-
-try:
- bytes
-except NameError:
- # Python 2.5 and lower
- bytes = str
ILLEGAL_TABLE_TYPE_WITH_KEY = """\
Table type {0!r} for key {1!r} not handled by amqp. [value: {2!r}]
@@ -174,6 +162,8 @@ class AMQPReader(object):
val = self.read_bit()
elif ftype == 100:
val = self.read_float()
+ elif ftype == 86: # 'V'
+ val = None
else:
raise FrameSyntaxError(
'Unknown value in table: {0!r} ({1!r})'.format(
@@ -357,6 +347,8 @@ class AMQPWriter(object):
elif isinstance(v, (list, tuple)):
self.write(b'A')
self.write_array(v)
+ elif v is None:
+ self.write(b'V')
else:
err = (ILLEGAL_TABLE_TYPE_WITH_KEY.format(type(v), k, v) if k
else ILLEGAL_TABLE_TYPE.format(type(v), v))
diff --git a/amqp/transport.py b/amqp/transport.py
index 3123072..b567d79 100644
--- a/amqp/transport.py
+++ b/amqp/transport.py
@@ -1,9 +1,3 @@
-"""
-Read/Write AMQP frames over network transports.
-
-2009-01-14 Barry Pederson <bp@barryp.org>
-
-"""
# Copyright (C) 2009 Barry Pederson <bp@barryp.org>
#
# This library is free software; you can redistribute it and/or
@@ -25,6 +19,7 @@ import errno
import re
import select
import socket
+import ssl
# Jython does not have this attribute
try:
@@ -32,29 +27,12 @@ try:
except ImportError: # pragma: no cover
from socket import IPPROTO_TCP as SOL_TCP # noqa
-#
-# See if Python 2.6+ SSL support is available
-#
-try:
- import ssl
- HAVE_PY26_SSL = True
-except:
- HAVE_PY26_SSL = False
-
-try:
- bytes
-except:
- # Python 2.5 and lower
- bytes = str
-
-UNAVAIL = frozenset([errno.EAGAIN, errno.EINTR])
-
from struct import pack, unpack
from .exceptions import UnexpectedFrame
from .utils import get_errno, set_cloexec
-_UNAVAIL = errno.EAGAIN, errno.EINTR
+_UNAVAIL = frozenset([errno.EAGAIN, errno.EINTR, errno.ENOENT])
AMQP_PORT = 5672
@@ -206,22 +184,17 @@ class SSLTransport(_AbstractTransport):
super(SSLTransport, self).__init__(host, connect_timeout)
def _setup_transport(self):
- """Wrap the socket in an SSL object, either the
- new Python 2.6 version, or the older Python 2.5 and
- lower version."""
- if HAVE_PY26_SSL:
- if hasattr(self, 'sslopts'):
- self.sock = ssl.wrap_socket(self.sock, **self.sslopts)
- else:
- self.sock = ssl.wrap_socket(self.sock)
- self.sock.do_handshake()
+ """Wrap the socket in an SSL object."""
+ if hasattr(self, 'sslopts'):
+ self.sock = ssl.wrap_socket(self.sock, **self.sslopts)
else:
- self.sock = socket.ssl(self.sock)
+ self.sock = ssl.wrap_socket(self.sock)
+ self.sock.do_handshake()
self._quick_recv = self.sock.read
def _shutdown_transport(self):
"""Unwrap a Python 2.6 SSL socket, so we can call shutdown()"""
- if HAVE_PY26_SSL and self.sock is not None:
+ if self.sock is not None:
try:
unwrap = self.sock.unwrap
except AttributeError:
@@ -235,19 +208,22 @@ class SSLTransport(_AbstractTransport):
# to get the exact number of bytes wanted.
recv = self._quick_recv
rbuf = self._read_buffer
- while len(rbuf) < n:
- try:
- s = recv(131072) # see note above
- except socket.error as exc:
- # ssl.sock.read may cause ENOENT if the
- # operation couldn't be performed (Issue celery#1414).
- if not initial and exc.errno in _errnos:
- continue
- raise exc
- if not s:
- raise IOError('Socket closed')
- rbuf += s
-
+ try:
+ while len(rbuf) < n:
+ try:
+ s = recv(131072) # see note above
+ except socket.error as exc:
+ # ssl.sock.read may cause ENOENT if the
+ # operation couldn't be performed (Issue celery#1414).
+ if not initial and exc.errno in _errnos:
+ continue
+ raise
+ if not s:
+ raise IOError('Socket closed')
+ rbuf += s
+ except:
+ self._read_buffer = rbuf
+ raise
result, self._read_buffer = rbuf[:n], rbuf[n:]
return result
@@ -270,7 +246,7 @@ class TCPTransport(_AbstractTransport):
self._read_buffer = EMPTY_BUFFER
self._quick_recv = self.sock.recv
- def _write(self, s, select=select.select, unavail=UNAVAIL):
+ def _write(self, s, select=select.select, unavail=_UNAVAIL):
write = self.sock.send
while s:
r, w, e = select([self.sock], [self.sock], [self.sock], 1.0)
@@ -291,20 +267,24 @@ class TCPTransport(_AbstractTransport):
raise IOError('Socket closed')
s = s[n:]
- def _read(self, n, initial=False, _errnos=UNAVAIL):
+ def _read(self, n, initial=False, _errnos=_UNAVAIL):
"""Read exactly n bytes from the socket"""
recv = self._quick_recv
rbuf = self._read_buffer
- while len(rbuf) < n:
- try:
- s = recv(131072)
- except socket.error as exc:
- if not initial and exc.errno in _errnos:
- continue
- raise
- if not s:
- raise IOError('Socket closed')
- rbuf += s
+ try:
+ while len(rbuf) < n:
+ try:
+ s = recv(131072)
+ except socket.error as exc:
+ if not initial and exc.errno in _errnos:
+ continue
+ raise
+ if not s:
+ raise IOError('Socket closed')
+ rbuf += s
+ except:
+ self._read_buffer = rbuf
+ raise
result, self._read_buffer = rbuf[:n], rbuf[n:]
return result
diff --git a/amqp/utils.py b/amqp/utils.py
index 994030b..900d2aa 100644
--- a/amqp/utils.py
+++ b/amqp/utils.py
@@ -11,7 +11,8 @@ except ImportError:
class promise(object):
if not hasattr(sys, 'pypy_version_info'):
__slots__ = tuple(
- 'fun args kwargs value ready failed on_success on_error'.split()
+ 'fun args kwargs value ready failed '
+ ' on_success on_error calls'.split()
)
def __init__(self, fun, args=(), kwargs=(),
@@ -24,6 +25,7 @@ class promise(object):
self.on_success = on_success
self.on_error = on_error
self.value = None
+ self.calls = 0
def __repr__(self):
return '<$: {0.fun.__name__}(*{0.args!r}, **{0.kwargs!r})'.format(
@@ -43,6 +45,7 @@ class promise(object):
self.on_success(self.value)
finally:
self.ready = True
+ self.calls += 1
def then(self, callback=None, on_error=None):
self.on_success = callback
diff --git a/demo/amqp_clock.py b/demo/amqp_clock.py
index d842296..c718266 100755
--- a/demo/amqp_clock.py
+++ b/demo/amqp_clock.py
@@ -27,18 +27,23 @@ TOPIC_PATTERN = '%Y.%m.%d.%w.%H.%M' # Python datetime.strftime() pattern
def main():
parser = OptionParser()
- parser.add_option('--host', dest='host',
- help='AMQP server to connect to (default: %default)',
- default='localhost')
- parser.add_option('-u', '--userid', dest='userid',
+ parser.add_option(
+ '--host', dest='host',
+ help='AMQP server to connect to (default: %default)',
+ default='localhost',
+ )
+ parser.add_option(
+ '-u', '--userid', dest='userid',
help='AMQP userid to authenticate as (default: %default)',
default='guest',
)
- parser.add_option('-p', '--password', dest='password',
+ parser.add_option(
+ '-p', '--password', dest='password',
help='AMQP password to authenticate with (default: %default)',
default='guest',
)
- parser.add_option('--ssl', dest='ssl', action='store_true',
+ parser.add_option(
+ '--ssl', dest='ssl', action='store_true',
help='Enable SSL with AMQP server (default: not enabled)',
default=False,
)
diff --git a/demo/demo_receive.py b/demo/demo_receive.py
index 7382604..bfda624 100755
--- a/demo/demo_receive.py
+++ b/demo/demo_receive.py
@@ -16,14 +16,14 @@ import amqp
def callback(channel, msg):
for key, val in msg.properties.items():
- print ('%s: %s' % (key, str(val)))
+ print('%s: %s' % (key, str(val)))
for key, val in msg.delivery_info.items():
- print ('> %s: %s' % (key, str(val)))
+ print('> %s: %s' % (key, str(val)))
- print ('')
- print (msg.body)
- print ('-------')
- print msg.delivery_tag
+ print('')
+ print(msg.body)
+ print('-------')
+ print(msg.delivery_tag)
channel.basic_ack(msg.delivery_tag)
#
@@ -35,19 +35,23 @@ def callback(channel, msg):
def main():
parser = OptionParser()
- parser.add_option('--host', dest='host',
+ parser.add_option(
+ '--host', dest='host',
help='AMQP server to connect to (default: %default)',
default='localhost',
)
- parser.add_option('-u', '--userid', dest='userid',
+ parser.add_option(
+ '-u', '--userid', dest='userid',
help='userid to authenticate as (default: %default)',
default='guest',
)
- parser.add_option('-p', '--password', dest='password',
+ parser.add_option(
+ '-p', '--password', dest='password',
help='password to authenticate with (default: %default)',
default='guest',
)
- parser.add_option('--ssl', dest='ssl', action='store_true',
+ parser.add_option(
+ '--ssl', dest='ssl', action='store_true',
help='Enable SSL (default: not enabled)',
default=False,
)
diff --git a/demo/demo_send.py b/demo/demo_send.py
index 0084815..27bb1b1 100755
--- a/demo/demo_send.py
+++ b/demo/demo_send.py
@@ -19,19 +19,23 @@ def main():
parser = OptionParser(
usage='usage: %prog [options] message\nexample: %prog hello world',
)
- parser.add_option('--host', dest='host',
+ parser.add_option(
+ '--host', dest='host',
help='AMQP server to connect to (default: %default)',
default='localhost',
)
- parser.add_option('-u', '--userid', dest='userid',
+ parser.add_option(
+ '-u', '--userid', dest='userid',
help='userid to authenticate as (default: %default)',
default='guest',
)
- parser.add_option('-p', '--password', dest='password',
+ parser.add_option(
+ '-p', '--password', dest='password',
help='password to authenticate with (default: %default)',
default='guest',
)
- parser.add_option('--ssl', dest='ssl', action='store_true',
+ parser.add_option(
+ '--ssl', dest='ssl', action='store_true',
help='Enable SSL (default: not enabled)',
default=False,
)
diff --git a/docs/_ext/applyxrefs.py b/docs/_ext/applyxrefs.py
index 027490b..deed5d9 100644
--- a/docs/_ext/applyxrefs.py
+++ b/docs/_ext/applyxrefs.py
@@ -6,8 +6,8 @@ import os
testing = False
DONT_TOUCH = (
- './index.txt',
- )
+ './index.txt',
+)
def target_name(fn):
diff --git a/docs/_ext/literals_to_xrefs.py b/docs/_ext/literals_to_xrefs.py
index e9dc7ca..41aa616 100644
--- a/docs/_ext/literals_to_xrefs.py
+++ b/docs/_ext/literals_to_xrefs.py
@@ -95,8 +95,8 @@ def fixliterals(fname):
replace_type in ("class", "func", "meth"):
default = default[:-2]
replace_value = raw_input(
- colorize("Text <target> [", fg="yellow") + default + \
- colorize("]: ", fg="yellow")).strip()
+ colorize("Text <target> [", fg="yellow") + default +
+ colorize("]: ", fg="yellow")).strip()
if not replace_value:
replace_value = default
new.append(":%s:`%s`" % (replace_type, replace_value))
diff --git a/docs/conf.py b/docs/conf.py
index 3b6f524..1191395 100644
--- a/docs/conf.py
+++ b/docs/conf.py
@@ -70,8 +70,8 @@ html_use_modindex = True
html_use_index = True
latex_documents = [
- ('index', 'py-amqp.tex', ur'py-amqp Documentation',
- ur'Ask Solem & Contributors', 'manual'),
+ ('index', 'py-amqp.tex', ur'py-amqp Documentation',
+ ur'Ask Solem & Contributors', 'manual'),
]
html_theme = "celery"
@@ -89,7 +89,7 @@ if False:
issuetracker_project = "celery/py-amqp"
issuetracker_issue_pattern = r'[Ii]ssue #(\d+)'
-# -- Options for Epub output ---------------------------------------------------
+# -- Options for Epub output ------------------------------------------------
# Bibliographic Dublin Core info.
epub_title = 'py-amqp Manual, Version 1.0'
diff --git a/extra/generate_skeleton_0_8.py b/extra/generate_skeleton_0_8.py
index 442bc83..3bf4482 100755
--- a/extra/generate_skeleton_0_8.py
+++ b/extra/generate_skeleton_0_8.py
@@ -204,9 +204,10 @@ def generate_methods(class_element, out):
if 'content' in amqp_method.attrib:
params.append('msg')
- out.write(' def %s(%s):\n' %
- (_fixup_method_name(class_element, amqp_method),
- ', '.join(params + fieldnames)))
+ out.write(' def %s(%s):\n' % (
+ _fixup_method_name(class_element, amqp_method),
+ ', '.join(params + fieldnames)),
+ )
s = generate_docstr(amqp_method, ' ', ' """')
if s:
@@ -231,7 +232,7 @@ def generate_methods(class_element, out):
if 'synchronous' in amqp_method.attrib:
responses = [x.attrib['name']
- for x in amqp_method.findall('response')]
+ for x in amqp_method.findall('response')]
out.write(' return self.wait(allowed_methods=[\n')
for r in responses:
resp = method_name_map[(class_element.attrib['name'], r)]
@@ -274,7 +275,7 @@ def generate_class(spec, class_element, out):
#
for amqp_class in spec.findall('class'):
if (amqp_class.attrib['handler'] == class_element.attrib['name']) and \
- (amqp_class.attrib['name'] != class_element.attrib['name']):
+ (amqp_class.attrib['name'] != class_element.attrib['name']):
out.write(' #############\n')
out.write(' #\n')
out.write(' # %s\n' % amqp_class.attrib['name'].capitalize())
@@ -315,8 +316,8 @@ def generate_module(spec, out):
(
amqp_class.attrib['index'],
amqp_method.attrib['index'],
- amqp_class.attrib['handler'].capitalize() + '.' +
- _fixup_method_name(amqp_class, amqp_method),
+ (amqp_class.attrib['handler'].capitalize() + '.' +
+ _fixup_method_name(amqp_class, amqp_method)),
)
#### Actually generate output
@@ -335,7 +336,7 @@ def generate_module(spec, out):
# for chassis in amqp_method.findall('chassis'):
# print ' ', chassis.attrib
chassis = [x.attrib['name']
- for x in amqp_method.findall('chassis')]
+ for x in amqp_method.findall('chassis')]
if 'client' in chassis:
out.write(" (%s, %s): (%s, %s._%s),\n" % (
amqp_class.attrib['index'],
diff --git a/funtests/test_channel.py b/funtests/test_channel.py
index 503a48e..10b860f 100755
--- a/funtests/test_channel.py
+++ b/funtests/test_channel.py
@@ -22,15 +22,8 @@ Test amqp.channel module
import sys
import unittest
-try:
- bytes
-except NameError:
- # Python 2.5 and lower
- bytes = str
-
import settings
-
from amqp import ChannelError, Connection, Message, FrameSyntaxError
@@ -163,14 +156,19 @@ class TestChannel(unittest.TestCase):
self.assertTrue(isinstance(msg2.body, bytes))
self.assertEqual(msg2.body, u'hello w\xf6rld'.encode('latin_1'))
- def test_exception(self):
- """
- Check that Channel exceptions are actually raised as Python
- exceptions.
+ def test_queue_delete_empty(self):
+ self.assertFalse(
+ self.ch.queue_delete('bogus_queue_that_does_not_exist')
+ )
- """
+ def test_survives_channel_error(self):
with self.assertRaises(ChannelError):
- self.ch.queue_delete('bogus_queue_that_does_not_exist')
+ self.ch.queue_declare('krjqheewq_bogus', passive=True)
+ self.ch.queue_declare('funtest_survive')
+ self.ch.queue_declare('funtest_survive', passive=True)
+ self.assertEqual(
+ 0, self.ch.queue_delete('funtest_survive'),
+ )
def test_invalid_header(self):
"""
@@ -181,7 +179,7 @@ class TestChannel(unittest.TestCase):
"""
qname, _, _ = self.ch.queue_declare()
- msg = Message(application_headers={'test': None})
+ msg = Message(application_headers={'test': object()})
self.assertRaises(
FrameSyntaxError, self.ch.basic_publish, msg, routing_key=qname,
@@ -248,10 +246,10 @@ class TestChannel(unittest.TestCase):
content_type='text/plain',
application_headers={'foo': 7, 'bar': 'baz'})
- self.ch.basic_publish(msg, 'unittest.fanout')
- self.ch.basic_publish(msg, 'unittest.fanout', mandatory=True)
- self.ch.basic_publish(msg, 'unittest.fanout', mandatory=True)
- self.ch.basic_publish(msg, 'unittest.fanout', mandatory=True)
+ self.ch.basic_publish(msg, 'funtest.fanout')
+ self.ch.basic_publish(msg, 'funtest.fanout', mandatory=True)
+ self.ch.basic_publish(msg, 'funtest.fanout', mandatory=True)
+ self.ch.basic_publish(msg, 'funtest.fanout', mandatory=True)
self.ch.close()
#
@@ -264,38 +262,37 @@ class TestChannel(unittest.TestCase):
Network configuration is as follows (-> is forwards to :
source_exchange -> dest_exchange -> queue
The test checks that once the message is publish to the
- destination exchange(unittest.topic_dest) it is delivered to the queue.
+ destination exchange(funtest.topic_dest) it is delivered to the queue.
"""
test_routing_key = 'unit_test__key'
- dest_exchange = 'unittest.topic_dest_bind'
- source_exchange = 'unittest.topic_source_bind'
+ dest_exchange = 'funtest.topic_dest_bind'
+ source_exchange = 'funtest.topic_source_bind'
self.ch.exchange_declare(dest_exchange, 'topic', auto_delete=True)
self.ch.exchange_declare(source_exchange, 'topic', auto_delete=True)
qname, _, _ = self.ch.queue_declare()
- self.ch.exchange_bind(destination = dest_exchange,
- source = source_exchange,
- routing_key = test_routing_key)
+ self.ch.exchange_bind(destination=dest_exchange,
+ source=source_exchange,
+ routing_key=test_routing_key)
self.ch.queue_bind(qname, dest_exchange,
routing_key=test_routing_key)
- msg = Message('unittest message',
+ msg = Message('funtest message',
content_type='text/plain',
application_headers={'foo': 7, 'bar': 'baz'})
-
self.ch.basic_publish(msg, source_exchange,
- routing_key = test_routing_key)
+ routing_key=test_routing_key)
msg2 = self.ch.basic_get(qname, no_ack=True)
self.assertEqual(msg, msg2)
def test_exchange_unbind(self):
- dest_exchange = 'unittest.topic_dest_unbind'
- source_exchange = 'unittest.topic_source_unbind'
+ dest_exchange = 'funtest.topic_dest_unbind'
+ source_exchange = 'funtest.topic_source_unbind'
test_routing_key = 'unit_test__key'
self.ch.exchange_declare(dest_exchange,
@@ -311,6 +308,7 @@ class TestChannel(unittest.TestCase):
source=source_exchange,
routing_key=test_routing_key)
+
def main():
suite = unittest.TestLoader().loadTestsFromTestCase(TestChannel)
unittest.TextTestRunner(**settings.test_args).run(suite)
diff --git a/funtests/test_serialization.py b/funtests/test_serialization.py
index 9a8793c..ce70438 100755
--- a/funtests/test_serialization.py
+++ b/funtests/test_serialization.py
@@ -26,12 +26,6 @@ from random import randint
import sys
import unittest
-try:
- bytes
-except NameError:
- # Python 2.5 and lower
- bytes = str
-
import settings
from amqp.serialization import (
@@ -316,7 +310,7 @@ class TestSerialization(unittest.TestCase):
Check that an un-serializable table entry raises a ValueError
"""
- val = {'test': None}
+ val = {'test': object()}
w = AMQPWriter()
self.assertRaises(FrameSyntaxError, w.write_table, val)
@@ -327,6 +321,7 @@ class TestSerialization(unittest.TestCase):
'baz': 'this is some random string I typed',
'ubaz': u'And something in unicode',
'dday_aniv': datetime(1994, 6, 6),
+ 'nothing': None,
'more': {
'abc': -123,
'def': 'hello world',
@@ -351,26 +346,26 @@ class TestSerialization(unittest.TestCase):
# Array
#
def test_array_from_list(self):
- val = [1, 'foo']
+ val = [1, 'foo', None]
w = AMQPWriter()
w.write_array(val)
s = w.getvalue()
self.assertEqualBinary(
- s, '\x00\x00\x00\x0DI\x00\x00\x00\x01S\x00\x00\x00\x03foo',
+ s, '\x00\x00\x00\x0EI\x00\x00\x00\x01S\x00\x00\x00\x03fooV',
)
r = AMQPReader(s)
self.assertEqual(r.read_array(), val)
def test_array_from_tuple(self):
- val = (1, 'foo')
+ val = (1, 'foo', None)
w = AMQPWriter()
w.write_array(val)
s = w.getvalue()
self.assertEqualBinary(
- s, '\x00\x00\x00\x0DI\x00\x00\x00\x01S\x00\x00\x00\x03foo',
+ s, '\x00\x00\x00\x0EI\x00\x00\x00\x01S\x00\x00\x00\x03fooV',
)
r = AMQPReader(s)
@@ -394,48 +389,6 @@ class TestSerialization(unittest.TestCase):
self.assertEqual(r.read_table(), val)
#
- # Array
- #
- def test_array_from_list(self):
- val = [1, 'foo']
- w = AMQPWriter()
- w.write_array(val)
- s = w.getvalue()
-
- self.assertEqualBinary(s, '\x00\x00\x00\x0DI\x00\x00\x00\x01S\x00\x00\x00\x03foo')
-
- r = AMQPReader(s)
- self.assertEqual(r.read_array(), val)
-
- def test_array_from_tuple(self):
- val = (1, 'foo')
- w = AMQPWriter()
- w.write_array(val)
- s = w.getvalue()
-
- self.assertEqualBinary(s, '\x00\x00\x00\x0DI\x00\x00\x00\x01S\x00\x00\x00\x03foo')
-
- r = AMQPReader(s)
- self.assertEqual(r.read_array(), list(val))
-
- def test_table_with_array(self):
- val = {
- 'foo': 7,
- 'bar': Decimal('123345.1234'),
- 'baz': 'this is some random string I typed',
- 'blist': [1,2,3],
- 'nlist': [1, [2,3,4]],
- 'ndictl': {'nfoo': 8, 'nblist': [5,6,7] }
- }
-
- w = AMQPWriter()
- w.write_table(val)
- s = w.getvalue()
-
- r = AMQPReader(s)
- self.assertEqual(r.read_table(), val)
-
- #
# GenericContent
#
def test_generic_content_eq(self):
diff --git a/pavement.py b/pavement.py
index 685ca15..f3a4175 100644
--- a/pavement.py
+++ b/pavement.py
@@ -1,13 +1,11 @@
-import os
-import sys
-from paver.easy import *
-from paver import doctools
-from paver.setuputils import setup
-sys.path.insert(0, os.path.abspath('.'))
+from paver.easy import * # noqa
+from paver import doctools # noqa
+from paver.setuputils import setup # noqa
+
PYCOMPILE_CACHES = ['*.pyc', '*$py.class']
options(
- sphinx=Bunch(builddir='.build'),
+ sphinx=Bunch(builddir='.build'),
)
@@ -34,7 +32,7 @@ def html(options):
def qhtml(options):
destdir = path('Documentation')
builtdocs = sphinx_builddir(options)
- sh('rsync -az {0}/ {1}'.format(builtdocs, destdir))
+ sh('rsync -az %s/ %s' % (builtdocs, destdir))
@task
@@ -42,18 +40,17 @@ def qhtml(options):
def ghdocs(options):
builtdocs = sphinx_builddir(options)
sh("git checkout gh-pages && \
- cp -r {0}/* . && \
+ cp -r %s/* . && \
git commit . -m 'Rendered documentation for Github Pages.' && \
git push origin gh-pages && \
- git checkout master".format(builtdocs))
+ git checkout master" % builtdocs)
@task
@needs('clean_docs', 'paver.doctools.html')
def upload_pypi_docs(options):
builtdocs = path('docs') / options.builddir / 'html'
- sh("{0} setup.py upload_sphinx --upload-dir='{1}'".format(
- sys.executable, builtdocs))
+ sh("python setup.py upload_sphinx --upload-dir='%s'" % (builtdocs))
@task
@@ -73,38 +70,6 @@ def verifyindex(options):
@task
-@cmdopts([
- ('noerror', 'E', 'Ignore errors'),
-])
-def flake8(options):
- noerror = getattr(options, 'noerror', False)
- complexity = getattr(options, 'complexity', 22)
- sh("""flake8 amqp | perl -mstrict -mwarnings -nle'
- my $ignore = m/too complex \((\d+)\)/ && $1 le {0};
- if (! $ignore) {{ print STDERR; our $FOUND_FLAKE = 1 }}
- }}{{exit $FOUND_FLAKE;
- '""".format(complexity), ignore_error=noerror)
-
-
-@task
-@cmdopts([
- ('noerror', 'E', 'Ignore errors'),
-])
-def flakeplus(options):
- noerror = getattr(options, 'noerror', False)
- sh('flakeplus amqp', ignore_error=noerror)
-
-
-@task
-@cmdopts([
- ('noerror', 'E', 'Ignore errors')
-])
-def flakes(options):
- flake8(options)
- flakeplus(options)
-
-
-@task
def clean_readme(options):
path('README').unlink()
path('README.rst').unlink()
@@ -113,15 +78,21 @@ def clean_readme(options):
@task
@needs('clean_readme')
def readme(options):
- sh('{0} extra/release/sphinx-to-rst.py docs/templates/readme.txt \
- > README.rst'.format(sys.executable))
+ sh('python extra/release/sphinx-to-rst.py docs/templates/readme.txt \
+ > README.rst')
+ sh('ln -sf README.rst README')
@task
+@cmdopts([
+ ('custom=', 'C', 'custom version'),
+])
def bump(options):
- sh("extra/release/bump_version.py \
- amqp/__init__.py docs/includes/intro.txt \
- --before-commit='paver readme'")
+ s = ("-- '%s'" % (options.custom, ) if getattr(options, 'custom', None)
+ else '')
+ sh('extra/release/bump_version.py \
+ amqp/__init__.py README.rst %s' % (s, ))
+
@task
@cmdopts([
@@ -133,6 +104,8 @@ def test(options):
cmd = 'nosetests'
if getattr(options, 'coverage', False):
cmd += ' --with-coverage3'
+ if getattr(options, 'quick', False):
+ cmd = 'QUICKTEST=1 SKIP_RLIMITS=1 %s' % cmd
if getattr(options, 'verbose', False):
cmd += ' --verbosity=2'
sh(cmd)
@@ -142,18 +115,52 @@ def test(options):
@cmdopts([
('noerror', 'E', 'Ignore errors'),
])
+def flake8(options):
+ noerror = getattr(options, 'noerror', False)
+ complexity = getattr(options, 'complexity', 22)
+ sh("""flake8 amqp | perl -mstrict -mwarnings -nle'
+ my $ignore = (m/too complex \((\d+)\)/ && $1 le %s);
+ if (! $ignore) { print STDERR; our $FOUND_FLAKE = 1 }
+ }{exit $FOUND_FLAKE;
+ '""" % (complexity, ), ignore_error=noerror)
+
+
+@task
+@cmdopts([
+ ('noerror', 'E', 'Ignore errors'),
+])
+def flakeplus(options):
+ noerror = getattr(options, 'noerror', False)
+ sh('flakeplus amqp --2.6',
+ ignore_error=noerror)
+
+
+@task
+@cmdopts([
+ ('noerror', 'E', 'Ignore errors'),
+])
+def flakes(options):
+ flake8(options)
+ flakeplus(options)
+
+
+@task
+@cmdopts([
+ ('noerror', 'E', 'Ignore errors'),
+])
def pep8(options):
noerror = getattr(options, 'noerror', False)
- return sh("""find . -name "*.py" | xargs pep8 | perl -nle'\
+ return sh("""find amqp -name "*.py" | xargs pep8 | perl -nle'\
print; $a=1 if $_}{exit($a)'""", ignore_error=noerror)
@task
def removepyc(options):
- sh('find . -type f -a \\( {0} \\) | xargs rm'.format(
- ' -o '.join("-name '{0}'".format(pat) for pat in PYCOMPILE_CACHES)))
+ sh('find . -type f -a \\( %s \\) | xargs rm' % (
+ ' -o '.join("-name '%s'" % (pat, ) for pat in PYCOMPILE_CACHES), ))
sh('find . -type d -name "__pycache__" | xargs rm -r')
+
@task
@needs('removepyc')
def gitclean(options):
@@ -167,7 +174,7 @@ def gitcleanforce(options):
@task
-@needs('flakes', 'autodoc', 'verifyindex', 'gitclean')
+@needs('flakes', 'autodoc', 'verifyindex', 'test', 'gitclean')
def releaseok(options):
pass
@@ -176,13 +183,3 @@ def releaseok(options):
@needs('releaseok', 'removepyc', 'upload_docs')
def release(options):
pass
-
-
-@task
-def testloc(options):
- sh('sloccount tests')
-
-
-@task
-def loc(options):
- sh('sloccount amqp')
diff --git a/setup.py b/setup.py
index ec4420e..a1cfed5 100644
--- a/setup.py
+++ b/setup.py
@@ -15,8 +15,8 @@ import os
import sys
import codecs
-if sys.version_info < (2, 5):
- raise Exception('amqp requires Python 2.5 or higher.')
+if sys.version_info < (2, 6):
+ raise Exception('amqp requires Python 2.6 or higher.')
NAME = 'amqp'
entrypoints = {}
@@ -35,7 +35,8 @@ classes = """
Programming Language :: Python :: 3.1
Programming Language :: Python :: 3.2
Programming Language :: Python :: 3.3
- License :: OSI Approved :: GNU Library or Lesser General Public License (LGPL)
+ License :: OSI Approved :: GNU Library or \
+Lesser General Public License (LGPL)
Intended Audience :: Developers
License :: OSI Approved :: BSD License
Operating System :: OS Independent