summaryrefslogtreecommitdiff
path: root/amqp/transport.py
diff options
context:
space:
mode:
Diffstat (limited to 'amqp/transport.py')
-rw-r--r--amqp/transport.py100
1 files changed, 40 insertions, 60 deletions
diff --git a/amqp/transport.py b/amqp/transport.py
index 3123072..b567d79 100644
--- a/amqp/transport.py
+++ b/amqp/transport.py
@@ -1,9 +1,3 @@
-"""
-Read/Write AMQP frames over network transports.
-
-2009-01-14 Barry Pederson <bp@barryp.org>
-
-"""
# Copyright (C) 2009 Barry Pederson <bp@barryp.org>
#
# This library is free software; you can redistribute it and/or
@@ -25,6 +19,7 @@ import errno
import re
import select
import socket
+import ssl
# Jython does not have this attribute
try:
@@ -32,29 +27,12 @@ try:
except ImportError: # pragma: no cover
from socket import IPPROTO_TCP as SOL_TCP # noqa
-#
-# See if Python 2.6+ SSL support is available
-#
-try:
- import ssl
- HAVE_PY26_SSL = True
-except:
- HAVE_PY26_SSL = False
-
-try:
- bytes
-except:
- # Python 2.5 and lower
- bytes = str
-
-UNAVAIL = frozenset([errno.EAGAIN, errno.EINTR])
-
from struct import pack, unpack
from .exceptions import UnexpectedFrame
from .utils import get_errno, set_cloexec
-_UNAVAIL = errno.EAGAIN, errno.EINTR
+_UNAVAIL = frozenset([errno.EAGAIN, errno.EINTR, errno.ENOENT])
AMQP_PORT = 5672
@@ -206,22 +184,17 @@ class SSLTransport(_AbstractTransport):
super(SSLTransport, self).__init__(host, connect_timeout)
def _setup_transport(self):
- """Wrap the socket in an SSL object, either the
- new Python 2.6 version, or the older Python 2.5 and
- lower version."""
- if HAVE_PY26_SSL:
- if hasattr(self, 'sslopts'):
- self.sock = ssl.wrap_socket(self.sock, **self.sslopts)
- else:
- self.sock = ssl.wrap_socket(self.sock)
- self.sock.do_handshake()
+ """Wrap the socket in an SSL object."""
+ if hasattr(self, 'sslopts'):
+ self.sock = ssl.wrap_socket(self.sock, **self.sslopts)
else:
- self.sock = socket.ssl(self.sock)
+ self.sock = ssl.wrap_socket(self.sock)
+ self.sock.do_handshake()
self._quick_recv = self.sock.read
def _shutdown_transport(self):
"""Unwrap a Python 2.6 SSL socket, so we can call shutdown()"""
- if HAVE_PY26_SSL and self.sock is not None:
+ if self.sock is not None:
try:
unwrap = self.sock.unwrap
except AttributeError:
@@ -235,19 +208,22 @@ class SSLTransport(_AbstractTransport):
# to get the exact number of bytes wanted.
recv = self._quick_recv
rbuf = self._read_buffer
- while len(rbuf) < n:
- try:
- s = recv(131072) # see note above
- except socket.error as exc:
- # ssl.sock.read may cause ENOENT if the
- # operation couldn't be performed (Issue celery#1414).
- if not initial and exc.errno in _errnos:
- continue
- raise exc
- if not s:
- raise IOError('Socket closed')
- rbuf += s
-
+ try:
+ while len(rbuf) < n:
+ try:
+ s = recv(131072) # see note above
+ except socket.error as exc:
+ # ssl.sock.read may cause ENOENT if the
+ # operation couldn't be performed (Issue celery#1414).
+ if not initial and exc.errno in _errnos:
+ continue
+ raise
+ if not s:
+ raise IOError('Socket closed')
+ rbuf += s
+ except:
+ self._read_buffer = rbuf
+ raise
result, self._read_buffer = rbuf[:n], rbuf[n:]
return result
@@ -270,7 +246,7 @@ class TCPTransport(_AbstractTransport):
self._read_buffer = EMPTY_BUFFER
self._quick_recv = self.sock.recv
- def _write(self, s, select=select.select, unavail=UNAVAIL):
+ def _write(self, s, select=select.select, unavail=_UNAVAIL):
write = self.sock.send
while s:
r, w, e = select([self.sock], [self.sock], [self.sock], 1.0)
@@ -291,20 +267,24 @@ class TCPTransport(_AbstractTransport):
raise IOError('Socket closed')
s = s[n:]
- def _read(self, n, initial=False, _errnos=UNAVAIL):
+ def _read(self, n, initial=False, _errnos=_UNAVAIL):
"""Read exactly n bytes from the socket"""
recv = self._quick_recv
rbuf = self._read_buffer
- while len(rbuf) < n:
- try:
- s = recv(131072)
- except socket.error as exc:
- if not initial and exc.errno in _errnos:
- continue
- raise
- if not s:
- raise IOError('Socket closed')
- rbuf += s
+ try:
+ while len(rbuf) < n:
+ try:
+ s = recv(131072)
+ except socket.error as exc:
+ if not initial and exc.errno in _errnos:
+ continue
+ raise
+ if not s:
+ raise IOError('Socket closed')
+ rbuf += s
+ except:
+ self._read_buffer = rbuf
+ raise
result, self._read_buffer = rbuf[:n], rbuf[n:]
return result