diff options
Diffstat (limited to 'amqp/transport.py')
-rw-r--r-- | amqp/transport.py | 100 |
1 files changed, 40 insertions, 60 deletions
diff --git a/amqp/transport.py b/amqp/transport.py index 3123072..b567d79 100644 --- a/amqp/transport.py +++ b/amqp/transport.py @@ -1,9 +1,3 @@ -""" -Read/Write AMQP frames over network transports. - -2009-01-14 Barry Pederson <bp@barryp.org> - -""" # Copyright (C) 2009 Barry Pederson <bp@barryp.org> # # This library is free software; you can redistribute it and/or @@ -25,6 +19,7 @@ import errno import re import select import socket +import ssl # Jython does not have this attribute try: @@ -32,29 +27,12 @@ try: except ImportError: # pragma: no cover from socket import IPPROTO_TCP as SOL_TCP # noqa -# -# See if Python 2.6+ SSL support is available -# -try: - import ssl - HAVE_PY26_SSL = True -except: - HAVE_PY26_SSL = False - -try: - bytes -except: - # Python 2.5 and lower - bytes = str - -UNAVAIL = frozenset([errno.EAGAIN, errno.EINTR]) - from struct import pack, unpack from .exceptions import UnexpectedFrame from .utils import get_errno, set_cloexec -_UNAVAIL = errno.EAGAIN, errno.EINTR +_UNAVAIL = frozenset([errno.EAGAIN, errno.EINTR, errno.ENOENT]) AMQP_PORT = 5672 @@ -206,22 +184,17 @@ class SSLTransport(_AbstractTransport): super(SSLTransport, self).__init__(host, connect_timeout) def _setup_transport(self): - """Wrap the socket in an SSL object, either the - new Python 2.6 version, or the older Python 2.5 and - lower version.""" - if HAVE_PY26_SSL: - if hasattr(self, 'sslopts'): - self.sock = ssl.wrap_socket(self.sock, **self.sslopts) - else: - self.sock = ssl.wrap_socket(self.sock) - self.sock.do_handshake() + """Wrap the socket in an SSL object.""" + if hasattr(self, 'sslopts'): + self.sock = ssl.wrap_socket(self.sock, **self.sslopts) else: - self.sock = socket.ssl(self.sock) + self.sock = ssl.wrap_socket(self.sock) + self.sock.do_handshake() self._quick_recv = self.sock.read def _shutdown_transport(self): """Unwrap a Python 2.6 SSL socket, so we can call shutdown()""" - if HAVE_PY26_SSL and self.sock is not None: + if self.sock is not None: try: unwrap = self.sock.unwrap except AttributeError: @@ -235,19 +208,22 @@ class SSLTransport(_AbstractTransport): # to get the exact number of bytes wanted. recv = self._quick_recv rbuf = self._read_buffer - while len(rbuf) < n: - try: - s = recv(131072) # see note above - except socket.error as exc: - # ssl.sock.read may cause ENOENT if the - # operation couldn't be performed (Issue celery#1414). - if not initial and exc.errno in _errnos: - continue - raise exc - if not s: - raise IOError('Socket closed') - rbuf += s - + try: + while len(rbuf) < n: + try: + s = recv(131072) # see note above + except socket.error as exc: + # ssl.sock.read may cause ENOENT if the + # operation couldn't be performed (Issue celery#1414). + if not initial and exc.errno in _errnos: + continue + raise + if not s: + raise IOError('Socket closed') + rbuf += s + except: + self._read_buffer = rbuf + raise result, self._read_buffer = rbuf[:n], rbuf[n:] return result @@ -270,7 +246,7 @@ class TCPTransport(_AbstractTransport): self._read_buffer = EMPTY_BUFFER self._quick_recv = self.sock.recv - def _write(self, s, select=select.select, unavail=UNAVAIL): + def _write(self, s, select=select.select, unavail=_UNAVAIL): write = self.sock.send while s: r, w, e = select([self.sock], [self.sock], [self.sock], 1.0) @@ -291,20 +267,24 @@ class TCPTransport(_AbstractTransport): raise IOError('Socket closed') s = s[n:] - def _read(self, n, initial=False, _errnos=UNAVAIL): + def _read(self, n, initial=False, _errnos=_UNAVAIL): """Read exactly n bytes from the socket""" recv = self._quick_recv rbuf = self._read_buffer - while len(rbuf) < n: - try: - s = recv(131072) - except socket.error as exc: - if not initial and exc.errno in _errnos: - continue - raise - if not s: - raise IOError('Socket closed') - rbuf += s + try: + while len(rbuf) < n: + try: + s = recv(131072) + except socket.error as exc: + if not initial and exc.errno in _errnos: + continue + raise + if not s: + raise IOError('Socket closed') + rbuf += s + except: + self._read_buffer = rbuf + raise result, self._read_buffer = rbuf[:n], rbuf[n:] return result |