summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAsk Solem <ask@celeryproject.org>2013-03-08 11:15:44 +0000
committerAsk Solem <ask@celeryproject.org>2013-03-08 11:15:44 +0000
commit4518aa8be2a72f1a41bddc90be455fe73a4b5b11 (patch)
tree71bb998e97e0f55bc2b6e2db47eac10e9e2fed8a
parentc28665b0463e59b1a9ba515546547aa4e3b748b7 (diff)
parentf8ea0375779c43516bb2f41207879faa210431e9 (diff)
downloadpy-amqp-4518aa8be2a72f1a41bddc90be455fe73a4b5b11.tar.gz
Merge branch '1.0'
Conflicts: Changelog README.rst amqp/__init__.py funtests/test_channel.py
-rw-r--r--Changelog13
-rw-r--r--amqp/channel.py8
-rw-r--r--amqp/connection.py7
-rw-r--r--amqp/transport.py10
-rwxr-xr-xfuntests/test_channel.py42
5 files changed, 52 insertions, 28 deletions
diff --git a/Changelog b/Changelog
index cafdd26..14cc7e9 100644
--- a/Changelog
+++ b/Changelog
@@ -54,6 +54,19 @@ http://code.google.com/p/py-amqplib/source/browse/CHANGES
- Python 3 related fixes.
+.. _version-1.0.9:
+
+1.0.9
+=====
+:release-date: 2013-03-08 10:40 A.M UTC
+
+- Publisher ack callbacks should now work after typo fix (Issue #9).
+
+- ``channel(explicit_id)`` will now claim that id from the array
+ of unused channel ids.
+
+- Fixes Jython compatibility.
+
.. _version-1.0.8:
1.0.8
diff --git a/amqp/channel.py b/amqp/channel.py
index a5b8635..48188a5 100644
--- a/amqp/channel.py
+++ b/amqp/channel.py
@@ -62,8 +62,11 @@ class Channel(AbstractChannel):
is left as plain bytes.
"""
- if channel_id is None:
+ if channel_id:
+ connection._claim_channel_id(channel_id)
+ else:
channel_id = connection._get_free_channel_id()
+
AMQP_LOGGER.debug('using channel_id: %d', channel_id)
super(Channel, self).__init__(connection, channel_id)
@@ -1885,7 +1888,6 @@ class Channel(AbstractChannel):
fun(msg)
def basic_get(self, queue='', no_ack=False):
- print('BASIC GET: %r' % (queue, ))
"""Direct access to a queue
This method provides a direct access to the messages in a
@@ -2467,7 +2469,7 @@ class Channel(AbstractChannel):
def _basic_ack_recv(self, args):
delivery_tag = args.read_longlong()
multiple = args.read_bit()
- for callback in self.handlers['basic_ack']:
+ for callback in self.events['basic_ack']:
callback(delivery_tag, multiple)
_METHOD_MAP = {
diff --git a/amqp/connection.py b/amqp/connection.py
index e6b7a49..f09cd22 100644
--- a/amqp/connection.py
+++ b/amqp/connection.py
@@ -178,6 +178,13 @@ class Connection(AbstractChannel):
'No free channel ids, current={0}, channel_max={1}'.format(
len(self.channels), self.channel_max), (20, 10))
+ def _claim_channel_id(self, channel_id):
+ try:
+ return self._avail_channel_ids.remove(channel_id)
+ except ValueError:
+ raise ConnectionError(
+ 'Channel %r already open' % (channel_id, ))
+
def _wait_method(self, channel_id, allowed_methods):
"""Wait for a method from the server destined for
a particular channel."""
diff --git a/amqp/transport.py b/amqp/transport.py
index a9a7b2c..6563bd2 100644
--- a/amqp/transport.py
+++ b/amqp/transport.py
@@ -25,6 +25,12 @@ import errno
import re
import socket
+# Jython does not have this attribute
+try:
+ from socket import SOL_TCP
+except ImportError: # pragma: no cover
+ from socket import IPPROTO_TCP as SOL_TCP # noqa
+
#
# See if Python 2.6+ SSL support is available
#
@@ -72,7 +78,7 @@ class _AbstractTransport(object):
self.sock = None
for res in socket.getaddrinfo(host, port, 0,
- socket.SOCK_STREAM, socket.SOL_TCP):
+ socket.SOCK_STREAM, SOL_TCP):
af, socktype, proto, canonname, sa = res
try:
self.sock = socket.socket(af, socktype, proto)
@@ -91,7 +97,7 @@ class _AbstractTransport(object):
raise socket.error(msg)
self.sock.settimeout(None)
- self.sock.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1)
+ self.sock.setsockopt(SOL_TCP, socket.TCP_NODELAY, 1)
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
self._setup_transport()
diff --git a/funtests/test_channel.py b/funtests/test_channel.py
index 43918a9..503a48e 100755
--- a/funtests/test_channel.py
+++ b/funtests/test_channel.py
@@ -246,14 +246,12 @@ class TestChannel(unittest.TestCase):
msg = Message(
'funtest message',
content_type='text/plain',
- application_headers={'foo': 7, 'bar': 'baz'},
- )
+ application_headers={'foo': 7, 'bar': 'baz'})
- self.ch.basic_publish(msg, 'funtest.fanout')
- self.ch.basic_publish(msg, 'funtest.fanout', immediate=True)
- self.ch.basic_publish(msg, 'funtest.fanout', mandatory=True)
- self.ch.basic_publish(msg, 'funtest.fanout',
- immediate=True, mandatory=True)
+ self.ch.basic_publish(msg, 'unittest.fanout')
+ self.ch.basic_publish(msg, 'unittest.fanout', mandatory=True)
+ self.ch.basic_publish(msg, 'unittest.fanout', mandatory=True)
+ self.ch.basic_publish(msg, 'unittest.fanout', mandatory=True)
self.ch.close()
#
@@ -266,40 +264,39 @@ class TestChannel(unittest.TestCase):
Network configuration is as follows (-> is forwards to :
source_exchange -> dest_exchange -> queue
The test checks that once the message is publish to the
- destination exchange(funtest.topic_dest) it is delivered to the queue.
+ destination exchange(unittest.topic_dest) it is delivered to the queue.
"""
test_routing_key = 'unit_test__key'
- dest_exchange = 'funtest.topic_dest_bind'
- source_exchange = 'funtest.topic_source_bind'
+ dest_exchange = 'unittest.topic_dest_bind'
+ source_exchange = 'unittest.topic_source_bind'
self.ch.exchange_declare(dest_exchange, 'topic', auto_delete=True)
self.ch.exchange_declare(source_exchange, 'topic', auto_delete=True)
qname, _, _ = self.ch.queue_declare()
- self.ch.exchange_bind(destination=dest_exchange,
- source=source_exchange,
- routing_key=test_routing_key)
+ self.ch.exchange_bind(destination = dest_exchange,
+ source = source_exchange,
+ routing_key = test_routing_key)
self.ch.queue_bind(qname, dest_exchange,
routing_key=test_routing_key)
- msg = Message(
- 'funtest message',
- content_type='text/plain',
- application_headers={'foo': 7, 'bar': 'baz'},
- )
+ msg = Message('unittest message',
+ content_type='text/plain',
+ application_headers={'foo': 7, 'bar': 'baz'})
+
self.ch.basic_publish(msg, source_exchange,
- routing_key=test_routing_key)
+ routing_key = test_routing_key)
msg2 = self.ch.basic_get(qname, no_ack=True)
self.assertEqual(msg, msg2)
def test_exchange_unbind(self):
- dest_exchange = 'funtest.topic_dest_unbind'
- source_exchange = 'funtest.topic_source_unbind'
- test_routing_key = 'fun_test__key'
+ dest_exchange = 'unittest.topic_dest_unbind'
+ source_exchange = 'unittest.topic_source_unbind'
+ test_routing_key = 'unit_test__key'
self.ch.exchange_declare(dest_exchange,
'topic', auto_delete=True)
@@ -314,7 +311,6 @@ class TestChannel(unittest.TestCase):
source=source_exchange,
routing_key=test_routing_key)
-
def main():
suite = unittest.TestLoader().loadTestsFromTestCase(TestChannel)
unittest.TextTestRunner(**settings.test_args).run(suite)