From 9cc13cebe127c04fb165be17c4fbe650e4609685 Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Thu, 15 Nov 2012 15:59:33 +0000 Subject: Removes pika transport, will wait until pika comes out of experimental its state --- README.rst | 3 - docs/reference/index.rst | 4 +- docs/reference/kombu.transport.pika.rst | 46 ------ docs/reference/kombu.transport.pika2.rst | 34 ---- docs/userguide/connections.rst | 7 +- funtests/setup.py | 1 - funtests/tests/test_pika.py | 38 ----- kombu/transport/__init__.py | 2 - kombu/transport/pika.py | 262 ------------------------------- kombu/transport/pika2.py | 235 --------------------------- requirements/funtest.txt | 3 - setup.cfg | 1 - tox.ini | 2 - 13 files changed, 2 insertions(+), 636 deletions(-) delete mode 100644 docs/reference/kombu.transport.pika.rst delete mode 100644 docs/reference/kombu.transport.pika2.rst delete mode 100644 funtests/tests/test_pika.py delete mode 100644 kombu/transport/pika.py delete mode 100644 kombu/transport/pika2.py diff --git a/README.rst b/README.rst index b40e5cb5..b55cc8a4 100644 --- a/README.rst +++ b/README.rst @@ -68,7 +68,6 @@ and the `Wikipedia article about AMQP`_. .. _`Beanstalk`: http://kr.github.com/beanstalkd/ .. _`Rabbits and warrens`: http://blogs.digitar.com/jjww/2009/01/rabbits-and-warrens/ .. _`amqplib`: http://barryp.org/software/py-amqplib/ -.. _`pika`: http://github.com/pika/pika .. _`Wikipedia article about AMQP`: http://en.wikipedia.org/wiki/AMQP .. _`carrot`: http://pypi.python.org/pypi/carrot/ .. _`librabbitmq`: http://pypi.python.org/pypi/librabbitmq @@ -90,8 +89,6 @@ Transport Comparison +---------------+----------+------------+------------+---------------+ | *SQS* | Virtual | Yes | Yes [#f1]_ | Yes [#f2]_ | +---------------+----------+------------+------------+---------------+ -| *pika* | Native | Yes | Yes | Yes | -+---------------+----------+------------+------------+---------------+ | *couchdb* | Virtual | Yes | Yes [#f1]_ | No | +---------------+----------+------------+------------+---------------+ | *zookeeper* | Virtual | Yes | Yes [#f1]_ | No | diff --git a/docs/reference/index.rst b/docs/reference/index.rst index f55ca799..44e18f53 100644 --- a/docs/reference/index.rst +++ b/docs/reference/index.rst @@ -25,9 +25,6 @@ kombu.transport kombu.transport.pyamqp kombu.transport.librabbitmq - kombu.transport.pika - kombu.transport.pika2 - kombu.transport.amqplib kombu.transport.memory kombu.transport.redis kombu.transport.zmq @@ -43,6 +40,7 @@ kombu.transport.sqlalchemy kombu.transport.sqlalchemy.models kombu.transport.SQS + kombu.transport.amqplib kombu.transport.base kombu.transport.virtual kombu.transport.virtual.exchange diff --git a/docs/reference/kombu.transport.pika.rst b/docs/reference/kombu.transport.pika.rst deleted file mode 100644 index 783af0d6..00000000 --- a/docs/reference/kombu.transport.pika.rst +++ /dev/null @@ -1,46 +0,0 @@ -======================================= - kombu.transport.pika -======================================= - -.. currentmodule:: kombu.transport.pika - -.. automodule:: kombu.transport.pika - - .. contents:: - :local: - - Transports - ---------- - - .. autoclass:: AsyncoreTransport - :members: - :undoc-members: - - .. autoclass:: SyncTransport - :members: - :undoc-members: - - Connections - ----------- - - .. autoclass:: AsyncoreConnection - :members: - :undoc-members: - - .. autoclass:: BlockingConnection - :members: - :undoc-members: - - Channel - ------- - - .. autoclass:: Channel - :members: - :undoc-members: - - Message - ------- - - .. autoclass:: Message - :members: - :undoc-members: diff --git a/docs/reference/kombu.transport.pika2.rst b/docs/reference/kombu.transport.pika2.rst deleted file mode 100644 index b06dd0a7..00000000 --- a/docs/reference/kombu.transport.pika2.rst +++ /dev/null @@ -1,34 +0,0 @@ -.. currentmodule:: kombu.transport.pika2 - -.. automodule:: kombu.transport.pika2 - - .. contents:: - :local: - - Transport - --------- - - .. autoclass:: Transport - :members: - :undoc-members: - - Connection - ---------- - - .. autoclass:: Connection - :members: - :undoc-members: - - Channel - ------- - - .. autoclass:: Channel - :members: - :undoc-members: - - Message - ------- - - .. autoclass:: Message - :members: - :undoc-members: diff --git a/docs/userguide/connections.rst b/docs/userguide/connections.rst index d7275ea4..aa88add8 100644 --- a/docs/userguide/connections.rst +++ b/docs/userguide/connections.rst @@ -116,10 +116,7 @@ keyword arguments, these are: :ssl: Use SSL to connect to the server. Default is ``False``. Only supported by the amqp transport. :insist: Insist on connecting to a server. - In a configuration with multiple load-sharing servers, the insist - option tells the server that the client is insisting on a connection - to the specified server. Default is ``False``. - Only supported by the amqp and pika transports, and not by AMQP 0-9-1. + *No longer supported, relic from AMQP 0.8* :connect_timeout: Timeout in seconds for connecting to the server. May not be supported by the specified transport. :transport_options: A dict of additional connection arguments to @@ -142,8 +139,6 @@ Transport Comparison +---------------+----------+------------+------------+---------------+ | *SQS* | Virtual | Yes | Yes [#f1]_ | Yes [#f2]_ | +---------------+----------+------------+------------+---------------+ -| *pika* | Native | Yes | Yes | Yes | -+---------------+----------+------------+------------+---------------+ | *couchdb* | Virtual | Yes | Yes [#f1]_ | No | +---------------+----------+------------+------------+---------------+ | *zookeeper* | Virtual | Yes | Yes [#f1]_ | No | diff --git a/funtests/setup.py b/funtests/setup.py index b024edff..fdb7a98c 100644 --- a/funtests/setup.py +++ b/funtests/setup.py @@ -53,7 +53,6 @@ setup( "pymongo", "couchdb", "kazoo", - "pika", "beanstalkc", "kombu-sqlalchemy", "django", diff --git a/funtests/tests/test_pika.py b/funtests/tests/test_pika.py deleted file mode 100644 index 4dd357b6..00000000 --- a/funtests/tests/test_pika.py +++ /dev/null @@ -1,38 +0,0 @@ -from funtests import transport -from nose import SkipTest - -from kombu.exceptions import VersionMismatch - - -class test_pika_blocking(transport.TransportCase): - transport = "syncpika" - prefix = "syncpika" - - def before_connect(self): - try: - from kombu.transport import pika - except VersionMismatch: - raise SkipTest("Pika version mismatch") - - def test_produce__consume_large_messages(self, *args, **kwargs): - raise SkipTest("test currently fails for sync pika") - - def test_cyclic_reference_channel(self, *args, **kwargs): - raise SkipTest("known memory leak") - - -class test_pika_async(transport.TransportCase): - transport = "pika" - prefix = "pika" - - def before_connect(self): - try: - from kombu.transport import pika - except VersionMismatch: - raise SkipTest("Pika version mismatch") - - def test_produce__consume_large_messages(self, *args, **kwargs): - raise SkipTest("test currently fails for async pika") - - def test_cyclic_reference_channel(self, *args, **kwargs): - raise SkipTest("known memory leak") diff --git a/kombu/transport/__init__.py b/kombu/transport/__init__.py index 4c3efcfa..8c3b3506 100644 --- a/kombu/transport/__init__.py +++ b/kombu/transport/__init__.py @@ -48,8 +48,6 @@ TRANSPORT_ALIASES = { 'amqp': 'kombu.transport.pyamqp:Transport', 'pyamqp': 'kombu.transport.pyamqp:Transport', 'librabbitmq': 'kombu.transport.librabbitmq:Transport', - 'pika': 'kombu.transport.pika2:Transport', - 'oldpika': 'kombu.transport.pika:SyncTransport', 'memory': 'kombu.transport.memory:Transport', 'redis': 'kombu.transport.redis:Transport', 'SQS': 'kombu.transport.SQS:Transport', diff --git a/kombu/transport/pika.py b/kombu/transport/pika.py deleted file mode 100644 index ce60dd6c..00000000 --- a/kombu/transport/pika.py +++ /dev/null @@ -1,262 +0,0 @@ -""" -kombu.transport.pika -==================== - -Pika transport. - -:copyright: (c) 2009 - 2012 by Ask Solem. -:license: BSD, see LICENSE for more details. - -""" -from __future__ import absolute_import - -import socket - -from operator import attrgetter - -from kombu.exceptions import ( - StdConnectionError, - StdChannelError, - VersionMismatch, -) - -from . import base - -from pika import channel # must be here to raise import error -try: - from pika import asyncore_adapter -except ImportError: - raise VersionMismatch('Kombu only works with pika version 0.5.2') -from pika import blocking_adapter -from pika import connection -from pika import exceptions -from pika.spec import Basic, BasicProperties - - -DEFAULT_PORT = 5672 - - -BASIC_PROPERTIES = ('content_type', 'content_encoding', - 'headers', 'delivery_mode', 'priority', - 'correlation_id', 'reply_to', 'expiration', - 'message_id', 'timestamp', 'type', 'user_id', - 'app_id', 'cluster_id') - - -class Message(base.Message): - - def __init__(self, channel, amqp_message, **kwargs): - channel_id, method, props, body = amqp_message - propdict = dict(zip(BASIC_PROPERTIES, - attrgetter(*BASIC_PROPERTIES)(props))) - - kwargs.update({'body': body, - 'delivery_tag': method.delivery_tag, - 'content_type': props.content_type, - 'content_encoding': props.content_encoding, - 'headers': props.headers, - 'properties': propdict, - 'delivery_info': dict( - consumer_tag=getattr(method, 'consumer_tag', None), - routing_key=method.routing_key, - delivery_tag=method.delivery_tag, - redelivered=method.redelivered, - exchange=method.exchange)}) - - super(Message, self).__init__(channel, **kwargs) - - -class Channel(channel.Channel, base.StdChannel): - Message = Message - - def basic_get(self, queue, no_ack): - method = channel.Channel.basic_get(self, queue=queue, no_ack=no_ack) - # pika returns semi-predicates (GetEmpty/GetOk). - if isinstance(method, Basic.GetEmpty): - return - return None, method, method._properties, method._body - - def queue_purge(self, queue=None, nowait=False): - return channel.Channel.queue_purge(self, queue=queue, nowait=nowait) \ - .message_count - - def basic_publish(self, message, exchange, routing_key, mandatory=False, - immediate=False): - message_data, properties = message - try: - return channel.Channel.basic_publish(self, - exchange, - routing_key, - message_data, - properties, - mandatory, - immediate) - finally: - # Pika does not automatically flush the outbound buffer - # TODO async: Needs to support `nowait`. - self.handler.connection.flush_outbound() - - def basic_consume(self, queue, no_ack=False, consumer_tag=None, - callback=None, nowait=False): - - # Kombu callbacks only take a single `message` argument, - # but pika applies with 4 arguments, so need to wrap - # these into a single tuple. - def _callback_decode(channel, method, header, body): - return callback((channel, method, header, body)) - - return channel.Channel.basic_consume(self, _callback_decode, - queue, no_ack, - False, consumer_tag) - - def prepare_message(self, body, priority=None, - content_type=None, content_encoding=None, headers=None, - properties=None): - properties = BasicProperties(priority=priority, - content_type=content_type, - content_encoding=content_encoding, - headers=headers, - **properties) - return body, properties - - def message_to_python(self, raw_message): - return self.Message(channel=self, amqp_message=raw_message) - - def basic_ack(self, delivery_tag): - return channel.Channel.basic_ack(self, delivery_tag) - - def __enter__(self): - return self - - def __exit__(self, *exc_info): - self.close() - - def close(self): - super(Channel, self).close() - if getattr(self, 'handler', None): - if getattr(self.handler, 'connection', None): - self.handler.connection.channels.pop( - self.handler.channel_number, None) - self.handler.connection = None - self.handler = None - - @property - def channel_id(self): - return self.channel_number - - -class BlockingConnection(blocking_adapter.BlockingConnection): - Super = blocking_adapter.BlockingConnection - - def __init__(self, client, *args, **kwargs): - self.client = client - self.Super.__init__(self, *args, **kwargs) - - def channel(self): - c = Channel(channel.ChannelHandler(self)) - c.connection = self - return c - - def close(self): - self.client = None - self.Super.close(self) - - def ensure_drain_events(self, timeout=None): - return self.drain_events(timeout=timeout) - - -class AsyncoreConnection(asyncore_adapter.AsyncoreConnection): - _event_counter = 0 - Super = asyncore_adapter.AsyncoreConnection - - def __init__(self, client, *args, **kwargs): - self.client = client - self.Super.__init__(self, *args, **kwargs) - - def channel(self): - c = Channel(channel.ChannelHandler(self)) - c.connection = self - return c - - def ensure_drain_events(self, timeout=None): - # asyncore connection does not raise socket.timeout when timing out - # so need to do a little trick here to mimic the behavior - # of sync connection. - current_events = self._event_counter - self.drain_events(timeout=timeout) - if timeout and self._event_counter <= current_events: - raise socket.timeout('timed out') - - def on_data_available(self, buf): - self._event_counter += 1 - self.Super.on_data_available(self, buf) - - def close(self): - self.client = None - self.Super.close(self) - - -class SyncTransport(base.Transport): - Message = Message - Connection = BlockingConnection - - default_port = DEFAULT_PORT - connection_errors = (StdConnectionError, - socket.error, - exceptions.ConnectionClosed, - exceptions.ChannelClosed, - exceptions.LoginError, - exceptions.NoFreeChannels, - exceptions.DuplicateConsumerTag, - exceptions.UnknownConsumerTag, - exceptions.RecursiveOperationDetected, - exceptions.ContentTransmissionForbidden, - exceptions.ProtocolSyntaxError) - channel_errors = (StdChannelError, - exceptions.ChannelClosed, - exceptions.DuplicateConsumerTag, - exceptions.UnknownConsumerTag, - exceptions.ProtocolSyntaxError) - driver_type = 'amqp' - driver_name = 'pika' - - def __init__(self, client, **kwargs): - self.client = client - self.default_port = kwargs.get('default_port', self.default_port) - - def driver_version(self): - import pika - return pika.__version__ - - def create_channel(self, connection): - return connection.channel() - - def drain_events(self, connection, **kwargs): - return connection.ensure_drain_events(**kwargs) - - def establish_connection(self): - """Establish connection to the AMQP broker.""" - conninfo = self.client - for name, default_value in self.default_connection_params.items(): - if not getattr(conninfo, name, None): - setattr(conninfo, name, default_value) - credentials = connection.PlainCredentials(conninfo.userid, - conninfo.password) - return self.Connection(self.client, - connection.ConnectionParameters( - conninfo.hostname, port=conninfo.port, - virtual_host=conninfo.virtual_host, - credentials=credentials)) - - def close_connection(self, connection): - """Close the AMQP broker connection.""" - connection.close() - - @property - def default_connection_params(self): - return {'hostname': 'localhost', 'port': self.default_port, - 'userid': 'guest', 'password': 'guest'} - - -class AsyncoreTransport(SyncTransport): - Connection = AsyncoreConnection diff --git a/kombu/transport/pika2.py b/kombu/transport/pika2.py deleted file mode 100644 index 4015ecf5..00000000 --- a/kombu/transport/pika2.py +++ /dev/null @@ -1,235 +0,0 @@ -""" -kombu.transport.pika -==================== - -Pika transport. - -:copyright: (c) 2009 - 2012 by Ask Solem. -:license: BSD, see LICENSE for more details. - -""" -from __future__ import absolute_import - -import socket - -from operator import attrgetter - -from kombu.exceptions import StdConnectionError, StdChannelError -from kombu.utils.amq_manager import get_manager - -from . import base - -import pika -from pika import spec -from pika.adapters import blocking_connection as blocking -from pika import exceptions - -DEFAULT_PORT = 5672 -BASIC_PROPERTIES = ('content_type', 'content_encoding', - 'headers', 'delivery_mode', 'priority', - 'correlation_id', 'reply_to', 'expiration', - 'message_id', 'timestamp', 'type', 'user_id', - 'app_id', 'cluster_id') - - -class Message(base.Message): - - def __init__(self, channel, amqp_message, **kwargs): - channel_id, method, props, body = amqp_message - propdict = dict(zip(BASIC_PROPERTIES, - attrgetter(*BASIC_PROPERTIES)(props))) - - kwargs.update({'body': body, - 'delivery_tag': method.delivery_tag, - 'content_type': props.content_type, - 'content_encoding': props.content_encoding, - 'headers': props.headers, - 'properties': propdict, - 'delivery_info': dict( - consumer_tag=getattr(method, 'consumer_tag', None), - routing_key=method.routing_key, - delivery_tag=method.delivery_tag, - redelivered=method.redelivered, - exchange=method.exchange)}) - - super(Message, self).__init__(channel, **kwargs) - - -class Channel(blocking.BlockingChannel, base.StdChannel): - Message = Message - - def basic_get(self, queue, no_ack): - method = super(Channel, self).basic_get(self, queue=queue, - no_ack=no_ack) - # pika returns semi-predicates (GetEmpty/GetOk). - if isinstance(method, spec.Basic.GetEmpty): - return - return None, method, method._properties, method._body - - def queue_purge(self, queue=None, nowait=False): - return super(Channel, self).\ - queue_purge(queue=queue, nowait=nowait).method.message_count - - def basic_publish(self, message, exchange, routing_key, mandatory=False, - immediate=False): - body, properties = message - try: - return super(Channel, self).basic_publish(exchange, - routing_key, - body, - properties, - mandatory, - immediate) - finally: - # Pika does not automatically flush the outbound buffer - # TODO async: Needs to support `nowait`. - self.connection._flush_outbound() - - def basic_consume(self, queue, no_ack=False, consumer_tag=None, - callback=None, nowait=False): - - # Kombu callbacks only take a single `message` argument, - # but pika applies with 4 arguments, so need to wrap - # these into a single tuple. - def _callback_decode(channel, method, header, body): - return callback((channel, method, header, body)) - - return super(Channel, self).basic_consume( - _callback_decode, queue, no_ack, False, consumer_tag) - - def prepare_message(self, body, priority=None, - content_type=None, content_encoding=None, headers=None, - properties=None): - properties = spec.BasicProperties(priority=priority, - content_type=content_type, - content_encoding=content_encoding, - headers=headers) - return body, properties - - def message_to_python(self, raw_message): - return self.Message(channel=self, amqp_message=raw_message) - - def basic_qos(self, prefetch_size, prefetch_count, a_global=False): - return super(Channel, self).basic_qos(prefetch_size=prefetch_size, - prefetch_count=prefetch_count, - global_=a_global) - - def __enter__(self): - return self - - def __exit__(self, *exc_info): - self.close() - - def close(self, *args): - super(Channel, self).close(*args) - self.connection = None - if getattr(self, 'handler', None): - if getattr(self.handler, 'connection', None): - self.handler.connection.channels.pop( - self.handler.channel_number, None) - self.handler.connection = None - self.handler = None - - @property - def channel_id(self): - return self.channel_number - - -class Connection(blocking.BlockingConnection): - Channel = Channel - - def __init__(self, client, *args, **kwargs): - self.client = client - super(Connection, self).__init__(*args, **kwargs) - - def channel(self): - self._channel_open = False - cid = self._next_channel_number() - - self.callbacks.add(cid, spec.Channel.CloseOk, self._on_channel_close) - transport = blocking.BlockingChannelTransport(self, cid) - channel = self._channels[cid] = self.Channel(self, cid, transport) - channel.connection = self - return channel - - def drain_events(self, timeout=None): - if timeout: - prev = self.socket.gettimeout() - self.socket.settimeout(timeout) - try: - self._handle_read() - finally: - if timeout: - self.socket.settimeout(prev) - self._flush_outbound() - - def close(self, *args): - self.client = None - super(Connection, self).close(*args) - - -AuthenticationError = getattr(exceptions, 'AuthenticationError', - getattr(exceptions, 'LoginError')) - - -class Transport(base.Transport): - Message = Message - Connection = Connection - - default_port = DEFAULT_PORT - connection_errors = (StdConnectionError, - socket.error, - exceptions.ConnectionClosed, - exceptions.ChannelClosed, - AuthenticationError, - exceptions.NoFreeChannels, - exceptions.DuplicateConsumerTag, - exceptions.UnknownConsumerTag, - exceptions.RecursiveOperationDetected, - exceptions.ProtocolSyntaxError) - channel_errors = (StdChannelError, - exceptions.ChannelClosed, - exceptions.DuplicateConsumerTag, - exceptions.UnknownConsumerTag, - exceptions.ProtocolSyntaxError) - driver_type = 'amqp' - driver_name = 'pika' - - def __init__(self, client, **kwargs): - self.client = client - self.default_port = kwargs.get('default_port', self.default_port) - - def driver_version(self): - return pika.__version__ - - def create_channel(self, connection): - return connection.channel() - - def drain_events(self, connection, **kwargs): - return connection.drain_events(**kwargs) - - def establish_connection(self): - """Establish connection to the AMQP broker.""" - conninfo = self.client - for name, default_value in self.default_connection_params.items(): - if not getattr(conninfo, name, None): - setattr(conninfo, name, default_value) - credentials = pika.PlainCredentials(conninfo.userid, - conninfo.password) - return self.Connection(self.client, - pika.ConnectionParameters( - conninfo.hostname, port=conninfo.port, - virtual_host=conninfo.virtual_host, - credentials=credentials)) - - def close_connection(self, connection): - """Close the AMQP broker connection.""" - connection.close() - - def get_manager(self, *args, **kwargs): - return get_manager(self.client, *args, **kwargs) - - @property - def default_connection_params(self): - return {'hostname': 'localhost', 'port': self.default_port, - 'userid': 'guest', 'password': 'guest'} diff --git a/requirements/funtest.txt b/requirements/funtest.txt index 8fdd9bef..6ac859b7 100644 --- a/requirements/funtest.txt +++ b/requirements/funtest.txt @@ -7,9 +7,6 @@ pymongo # CouchDB transport couchdb -# Pika transport -pika==0.5.2 - # Beanstalk transport beanstalkc diff --git a/setup.cfg b/setup.cfg index 89bd1cd4..88ad9aa1 100644 --- a/setup.cfg +++ b/setup.cfg @@ -9,7 +9,6 @@ cover3-exclude = kombu kombu.utils.compat kombu.utils.eventio kombu.utils.finalize - kombu.transport.pika kombu.transport.couchdb kombu.transport.mongodb kombu.transport.beanstalk diff --git a/tox.ini b/tox.ini index b595f16a..162e9c79 100644 --- a/tox.ini +++ b/tox.ini @@ -28,8 +28,6 @@ commands = {toxinidir}/extra/release/removepyc.sh {toxinidir} #--cover3-html-dir={toxinidir}/cover \ --cover3-package=kombu \ --cover3-exclude="kombu kombu.utils.* \ - kombu.transport.pypika \ - kombu.transport.pycouchdb \ kombu.transport.mongodb \ kombu.transport.beanstalk \ kombu.transport.zookeeper" \ -- cgit v1.2.1