diff options
author | Ask Solem <ask@celeryproject.org> | 2013-03-08 11:15:44 +0000 |
---|---|---|
committer | Ask Solem <ask@celeryproject.org> | 2013-03-08 11:15:44 +0000 |
commit | 4518aa8be2a72f1a41bddc90be455fe73a4b5b11 (patch) | |
tree | 71bb998e97e0f55bc2b6e2db47eac10e9e2fed8a | |
parent | c28665b0463e59b1a9ba515546547aa4e3b748b7 (diff) | |
parent | f8ea0375779c43516bb2f41207879faa210431e9 (diff) | |
download | py-amqp-4518aa8be2a72f1a41bddc90be455fe73a4b5b11.tar.gz |
Merge branch '1.0'
Conflicts:
Changelog
README.rst
amqp/__init__.py
funtests/test_channel.py
-rw-r--r-- | Changelog | 13 | ||||
-rw-r--r-- | amqp/channel.py | 8 | ||||
-rw-r--r-- | amqp/connection.py | 7 | ||||
-rw-r--r-- | amqp/transport.py | 10 | ||||
-rwxr-xr-x | funtests/test_channel.py | 42 |
5 files changed, 52 insertions, 28 deletions
@@ -54,6 +54,19 @@ http://code.google.com/p/py-amqplib/source/browse/CHANGES - Python 3 related fixes. +.. _version-1.0.9: + +1.0.9 +===== +:release-date: 2013-03-08 10:40 A.M UTC + +- Publisher ack callbacks should now work after typo fix (Issue #9). + +- ``channel(explicit_id)`` will now claim that id from the array + of unused channel ids. + +- Fixes Jython compatibility. + .. _version-1.0.8: 1.0.8 diff --git a/amqp/channel.py b/amqp/channel.py index a5b8635..48188a5 100644 --- a/amqp/channel.py +++ b/amqp/channel.py @@ -62,8 +62,11 @@ class Channel(AbstractChannel): is left as plain bytes. """ - if channel_id is None: + if channel_id: + connection._claim_channel_id(channel_id) + else: channel_id = connection._get_free_channel_id() + AMQP_LOGGER.debug('using channel_id: %d', channel_id) super(Channel, self).__init__(connection, channel_id) @@ -1885,7 +1888,6 @@ class Channel(AbstractChannel): fun(msg) def basic_get(self, queue='', no_ack=False): - print('BASIC GET: %r' % (queue, )) """Direct access to a queue This method provides a direct access to the messages in a @@ -2467,7 +2469,7 @@ class Channel(AbstractChannel): def _basic_ack_recv(self, args): delivery_tag = args.read_longlong() multiple = args.read_bit() - for callback in self.handlers['basic_ack']: + for callback in self.events['basic_ack']: callback(delivery_tag, multiple) _METHOD_MAP = { diff --git a/amqp/connection.py b/amqp/connection.py index e6b7a49..f09cd22 100644 --- a/amqp/connection.py +++ b/amqp/connection.py @@ -178,6 +178,13 @@ class Connection(AbstractChannel): 'No free channel ids, current={0}, channel_max={1}'.format( len(self.channels), self.channel_max), (20, 10)) + def _claim_channel_id(self, channel_id): + try: + return self._avail_channel_ids.remove(channel_id) + except ValueError: + raise ConnectionError( + 'Channel %r already open' % (channel_id, )) + def _wait_method(self, channel_id, allowed_methods): """Wait for a method from the server destined for a particular channel.""" diff --git a/amqp/transport.py b/amqp/transport.py index a9a7b2c..6563bd2 100644 --- a/amqp/transport.py +++ b/amqp/transport.py @@ -25,6 +25,12 @@ import errno import re import socket +# Jython does not have this attribute +try: + from socket import SOL_TCP +except ImportError: # pragma: no cover + from socket import IPPROTO_TCP as SOL_TCP # noqa + # # See if Python 2.6+ SSL support is available # @@ -72,7 +78,7 @@ class _AbstractTransport(object): self.sock = None for res in socket.getaddrinfo(host, port, 0, - socket.SOCK_STREAM, socket.SOL_TCP): + socket.SOCK_STREAM, SOL_TCP): af, socktype, proto, canonname, sa = res try: self.sock = socket.socket(af, socktype, proto) @@ -91,7 +97,7 @@ class _AbstractTransport(object): raise socket.error(msg) self.sock.settimeout(None) - self.sock.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1) + self.sock.setsockopt(SOL_TCP, socket.TCP_NODELAY, 1) self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) self._setup_transport() diff --git a/funtests/test_channel.py b/funtests/test_channel.py index 43918a9..503a48e 100755 --- a/funtests/test_channel.py +++ b/funtests/test_channel.py @@ -246,14 +246,12 @@ class TestChannel(unittest.TestCase): msg = Message( 'funtest message', content_type='text/plain', - application_headers={'foo': 7, 'bar': 'baz'}, - ) + application_headers={'foo': 7, 'bar': 'baz'}) - self.ch.basic_publish(msg, 'funtest.fanout') - self.ch.basic_publish(msg, 'funtest.fanout', immediate=True) - self.ch.basic_publish(msg, 'funtest.fanout', mandatory=True) - self.ch.basic_publish(msg, 'funtest.fanout', - immediate=True, mandatory=True) + self.ch.basic_publish(msg, 'unittest.fanout') + self.ch.basic_publish(msg, 'unittest.fanout', mandatory=True) + self.ch.basic_publish(msg, 'unittest.fanout', mandatory=True) + self.ch.basic_publish(msg, 'unittest.fanout', mandatory=True) self.ch.close() # @@ -266,40 +264,39 @@ class TestChannel(unittest.TestCase): Network configuration is as follows (-> is forwards to : source_exchange -> dest_exchange -> queue The test checks that once the message is publish to the - destination exchange(funtest.topic_dest) it is delivered to the queue. + destination exchange(unittest.topic_dest) it is delivered to the queue. """ test_routing_key = 'unit_test__key' - dest_exchange = 'funtest.topic_dest_bind' - source_exchange = 'funtest.topic_source_bind' + dest_exchange = 'unittest.topic_dest_bind' + source_exchange = 'unittest.topic_source_bind' self.ch.exchange_declare(dest_exchange, 'topic', auto_delete=True) self.ch.exchange_declare(source_exchange, 'topic', auto_delete=True) qname, _, _ = self.ch.queue_declare() - self.ch.exchange_bind(destination=dest_exchange, - source=source_exchange, - routing_key=test_routing_key) + self.ch.exchange_bind(destination = dest_exchange, + source = source_exchange, + routing_key = test_routing_key) self.ch.queue_bind(qname, dest_exchange, routing_key=test_routing_key) - msg = Message( - 'funtest message', - content_type='text/plain', - application_headers={'foo': 7, 'bar': 'baz'}, - ) + msg = Message('unittest message', + content_type='text/plain', + application_headers={'foo': 7, 'bar': 'baz'}) + self.ch.basic_publish(msg, source_exchange, - routing_key=test_routing_key) + routing_key = test_routing_key) msg2 = self.ch.basic_get(qname, no_ack=True) self.assertEqual(msg, msg2) def test_exchange_unbind(self): - dest_exchange = 'funtest.topic_dest_unbind' - source_exchange = 'funtest.topic_source_unbind' - test_routing_key = 'fun_test__key' + dest_exchange = 'unittest.topic_dest_unbind' + source_exchange = 'unittest.topic_source_unbind' + test_routing_key = 'unit_test__key' self.ch.exchange_declare(dest_exchange, 'topic', auto_delete=True) @@ -314,7 +311,6 @@ class TestChannel(unittest.TestCase): source=source_exchange, routing_key=test_routing_key) - def main(): suite = unittest.TestLoader().loadTestsFromTestCase(TestChannel) unittest.TextTestRunner(**settings.test_args).run(suite) |