summaryrefslogtreecommitdiff
path: root/kombu/transport/pyamqplib.py
diff options
context:
space:
mode:
authorAsk Solem <ask@celeryproject.org>2011-09-07 15:21:47 +0100
committerAsk Solem <ask@celeryproject.org>2011-09-07 15:21:47 +0100
commit66ef95038ffb3351592623309119946418c639bf (patch)
treec8189a6f578d6ee9196e1968f6c15be460fbc0bd /kombu/transport/pyamqplib.py
parent0b29991566b3d510d39ac61861576ec3e2ef1ae7 (diff)
downloadkombu-66ef95038ffb3351592623309119946418c639bf.tar.gz
2.0-devel: No longer supports Python 2.4
Diffstat (limited to 'kombu/transport/pyamqplib.py')
-rw-r--r--kombu/transport/pyamqplib.py270
1 files changed, 0 insertions, 270 deletions
diff --git a/kombu/transport/pyamqplib.py b/kombu/transport/pyamqplib.py
deleted file mode 100644
index 12bcd672..00000000
--- a/kombu/transport/pyamqplib.py
+++ /dev/null
@@ -1,270 +0,0 @@
-"""
-kombu.transport.pyamqplib
-=========================
-
-amqplib transport.
-
-:copyright: (c) 2009 - 2011 by Ask Solem.
-:license: BSD, see LICENSE for more details.
-
-"""
-import socket
-
-try:
- from ssl import SSLError
-except ImportError:
- class SSLError(Exception): # noqa
- pass
-
-from amqplib import client_0_8 as amqp
-from amqplib.client_0_8 import transport
-from amqplib.client_0_8.channel import Channel as _Channel
-from amqplib.client_0_8.exceptions import AMQPConnectionException
-from amqplib.client_0_8.exceptions import AMQPChannelException
-
-from kombu.transport import base
-
-DEFAULT_PORT = 5672
-
-# amqplib's handshake mistakenly identifies as protocol version 1191,
-# this breaks in RabbitMQ tip, which no longer falls back to
-# 0-8 for unknown ids.
-transport.AMQP_PROTOCOL_HEADER = "AMQP\x01\x01\x08\x00"
-
-
-class Connection(amqp.Connection): # pragma: no cover
-
- def _dispatch_basic_return(self, channel, args, msg):
- reply_code = args.read_short()
- reply_text = args.read_shortstr()
- exchange = args.read_shortstr()
- routing_key = args.read_shortstr()
-
- exc = AMQPChannelException(reply_code, reply_text, (50, 60))
- if channel.events["basic_return"]:
- for callback in channel.events["basic_return"]:
- callback(exc, exchange, routing_key, msg)
- else:
- raise exc
-
- def __init__(self, *args, **kwargs):
- super(Connection, self).__init__(*args, **kwargs)
- self._method_override = {(60, 50): self._dispatch_basic_return}
-
- def drain_events(self, allowed_methods=None, timeout=None):
- """Wait for an event on any channel."""
- return self.wait_multi(self.channels.values(), timeout=timeout)
-
- def wait_multi(self, channels, allowed_methods=None, timeout=None):
- """Wait for an event on a channel."""
- chanmap = dict((chan.channel_id, chan) for chan in channels)
- chanid, method_sig, args, content = self._wait_multiple(
- chanmap.keys(), allowed_methods, timeout=timeout)
-
- channel = chanmap[chanid]
-
- if content \
- and channel.auto_decode \
- and hasattr(content, 'content_encoding'):
- try:
- content.body = content.body.decode(content.content_encoding)
- except Exception:
- pass
-
- amqp_method = self._method_override.get(method_sig) or \
- channel._METHOD_MAP.get(method_sig, None)
-
- if amqp_method is None:
- raise Exception('Unknown AMQP method (%d, %d)' % method_sig)
-
- if content is None:
- return amqp_method(channel, args)
- else:
- return amqp_method(channel, args, content)
-
- def read_timeout(self, timeout=None):
- if timeout is None:
- return self.method_reader.read_method()
- sock = self.transport.sock
- prev = sock.gettimeout()
- sock.settimeout(timeout)
- try:
- try:
- return self.method_reader.read_method()
- except SSLError, exc:
- # http://bugs.python.org/issue10272
- if "timed out" in str(exc):
- raise socket.timeout()
- raise
- finally:
- sock.settimeout(prev)
-
- def _wait_multiple(self, channel_ids, allowed_methods, timeout=None):
- for channel_id in channel_ids:
- method_queue = self.channels[channel_id].method_queue
- for queued_method in method_queue:
- method_sig = queued_method[0]
- if (allowed_methods is None) \
- or (method_sig in allowed_methods) \
- or (method_sig == (20, 40)):
- method_queue.remove(queued_method)
- method_sig, args, content = queued_method
- return channel_id, method_sig, args, content
-
- # Nothing queued, need to wait for a method from the peer
- read_timeout = self.read_timeout
- channels = self.channels
- wait = self.wait
- while 1:
- channel, method_sig, args, content = read_timeout(timeout)
-
- if (channel in channel_ids) \
- and ((allowed_methods is None) \
- or (method_sig in allowed_methods) \
- or (method_sig == (20, 40))):
- return channel, method_sig, args, content
-
- # Not the channel and/or method we were looking for. Queue
- # this method for later
- channels[channel].method_queue.append((method_sig, args, content))
-
- #
- # If we just queued up a method for channel 0 (the Connection
- # itself) it's probably a close method in reaction to some
- # error, so deal with it right away.
- #
- if channel == 0:
- wait()
-
- def channel(self, channel_id=None):
- try:
- return self.channels[channel_id]
- except KeyError:
- return Channel(self, channel_id)
-
-
-class Message(base.Message):
- """A message received by the broker.
-
- .. attribute:: body
-
- The message body.
-
- .. attribute:: delivery_tag
-
- The message delivery tag, uniquely identifying this message.
-
- .. attribute:: channel
-
- The channel instance the message was received on.
-
- """
-
- def __init__(self, channel, msg, **kwargs):
- props = msg.properties
- super(Message, self).__init__(channel,
- body=msg.body,
- delivery_tag=msg.delivery_tag,
- content_type=props.get("content_type"),
- content_encoding=props.get("content_encoding"),
- delivery_info=msg.delivery_info,
- properties=msg.properties,
- headers=props.get("application_headers"),
- **kwargs)
-
-
-class Channel(_Channel, base.StdChannel):
- Message = Message
- events = {"basic_return": []}
-
- def __init__(self, *args, **kwargs):
- self.no_ack_consumers = set()
- super(Channel, self).__init__(*args, **kwargs)
-
- def prepare_message(self, message_data, priority=None,
- content_type=None, content_encoding=None, headers=None,
- properties=None):
- """Encapsulate data into a AMQP message."""
- return amqp.Message(message_data, priority=priority,
- content_type=content_type,
- content_encoding=content_encoding,
- application_headers=headers,
- **properties)
-
- def message_to_python(self, raw_message):
- """Convert encoded message body back to a Python value."""
- return self.Message(self, raw_message)
-
- def close(self):
- try:
- super(Channel, self).close()
- finally:
- self.connection = None
-
- def basic_consume(self, *args, **kwargs):
- consumer_tag = super(Channel, self).basic_consume(*args, **kwargs)
- if kwargs["no_ack"]:
- self.no_ack_consumers.add(consumer_tag)
- return consumer_tag
-
- def basic_cancel(self, consumer_tag, **kwargs):
- self.no_ack_consumers.discard(consumer_tag)
- return super(Channel, self).basic_cancel(consumer_tag, **kwargs)
-
-
-class Transport(base.Transport):
- Connection = Connection
-
- default_port = DEFAULT_PORT
-
- # it's very annoying that amqplib sometimes raises AttributeError
- # if the connection is lost, but nothing we can do about that here.
- connection_errors = (AMQPConnectionException,
- socket.error,
- IOError,
- OSError,
- AttributeError)
- channel_errors = (AMQPChannelException, )
-
- def __init__(self, client, **kwargs):
- self.client = client
- self.default_port = kwargs.get("default_port") or self.default_port
-
- def create_channel(self, connection):
- return connection.channel()
-
- def drain_events(self, connection, **kwargs):
- return connection.drain_events(**kwargs)
-
- def establish_connection(self):
- """Establish connection to the AMQP broker."""
- conninfo = self.client
- for name, default_value in self.default_connection_params.items():
- if not getattr(conninfo, name, None):
- setattr(conninfo, name, default_value)
- if conninfo.hostname == "localhost":
- conninfo.hostname = "127.0.0.1"
- conn = self.Connection(host=conninfo.host,
- userid=conninfo.userid,
- password=conninfo.password,
- login_method=conninfo.login_method,
- virtual_host=conninfo.virtual_host,
- insist=conninfo.insist,
- ssl=conninfo.ssl,
- connect_timeout=conninfo.connect_timeout)
- conn.client = self.client
- return conn
-
- def close_connection(self, connection):
- """Close the AMQP broker connection."""
- connection.client = None
- connection.close()
-
- def verify_connection(self, connection):
- return connection.channels is not None
-
- @property
- def default_connection_params(self):
- return {"userid": "guest", "password": "guest",
- "port": self.default_port,
- "hostname": "localhost", "login_method": "AMQPLAIN"}