summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2012-08-03 12:13:32 +0000
committerKim van der Riet <kpvdr@apache.org>2012-08-03 12:13:32 +0000
commitd43d1912b376322e27fdcda551a73f9ff5487972 (patch)
treece493e10baa95f44be8beb5778ce51783463196d /python
parent04877fec0c6346edec67072d7f2d247740cf2af5 (diff)
downloadqpid-python-d43d1912b376322e27fdcda551a73f9ff5487972.tar.gz
QPID-3858: Updated branch - merged from trunk r.1368650
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1368910 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python')
-rw-r--r--python/qpid/connection.py5
-rw-r--r--python/qpid/delegates.py67
-rw-r--r--python/qpid/framer.py14
-rw-r--r--python/qpid/messaging/util.py9
-rw-r--r--python/qpid/sasl.py3
-rw-r--r--python/qpid/selector.py24
-rw-r--r--python/qpid/tests/messaging/endpoints.py12
-rw-r--r--python/qpid/util.py4
-rwxr-xr-xpython/setup.py2
9 files changed, 78 insertions, 62 deletions
diff --git a/python/qpid/connection.py b/python/qpid/connection.py
index 66e1cb49be..2453f38c34 100644
--- a/python/qpid/connection.py
+++ b/python/qpid/connection.py
@@ -166,8 +166,9 @@ class Connection(Framer):
# If we have a security layer and it sends us no decoded data,
# that's OK as long as its return code is happy.
if self.security_layer_rx:
- status, data = self.security_layer_rx.decode(data)
- if not status:
+ try:
+ data = self.security_layer_rx.decode(data)
+ except:
self.detach_all()
break
# When we do not use SSL transport, we get periodic
diff --git a/python/qpid/delegates.py b/python/qpid/delegates.py
index 685cf49f54..5e44a3a6dc 100644
--- a/python/qpid/delegates.py
+++ b/python/qpid/delegates.py
@@ -24,13 +24,7 @@ from exceptions import VersionError, Closed
from logging import getLogger
from ops import Control
import sys
-
-_have_sasl = None
-try:
- import saslwrapper
- _have_sasl = True
-except:
- pass
+from qpid import sasl
log = getLogger("qpid.io.ctl")
@@ -172,20 +166,19 @@ class Client(Delegate):
self.username = username
self.password = password
- if _have_sasl:
- self.sasl = saslwrapper.Client()
- if username and len(username) > 0:
- self.sasl.setAttr("username", str(username))
- if password and len(password) > 0:
- self.sasl.setAttr("password", str(password))
- self.sasl.setAttr("service", str(kwargs.get("service", "qpidd")))
- if "host" in kwargs:
- self.sasl.setAttr("host", str(kwargs["host"]))
- if "min_ssf" in kwargs:
- self.sasl.setAttr("minssf", kwargs["min_ssf"])
- if "max_ssf" in kwargs:
- self.sasl.setAttr("maxssf", kwargs["max_ssf"])
- self.sasl.init()
+ self.sasl = sasl.Client()
+ if username and len(username) > 0:
+ self.sasl.setAttr("username", str(username))
+ if password and len(password) > 0:
+ self.sasl.setAttr("password", str(password))
+ self.sasl.setAttr("service", str(kwargs.get("service", "qpidd")))
+ if "host" in kwargs:
+ self.sasl.setAttr("host", str(kwargs["host"]))
+ if "min_ssf" in kwargs:
+ self.sasl.setAttr("minssf", kwargs["min_ssf"])
+ if "max_ssf" in kwargs:
+ self.sasl.setAttr("maxssf", kwargs["max_ssf"])
+ self.sasl.init()
def start(self):
# XXX
@@ -204,39 +197,29 @@ class Client(Delegate):
mech_list += str(mech) + " "
mech = None
initial = None
- if _have_sasl:
- status, mech, initial = self.sasl.start(mech_list)
- if status == False:
- raise Closed("SASL error: %s" % self.sasl.getError())
- else:
- if self.username and self.password and ("PLAIN" in mech_list):
- mech = "PLAIN"
- initial = "\0%s\0%s" % (self.username, self.password)
- else:
- mech = "ANONYMOUS"
- if not mech in mech_list:
- raise Closed("No acceptable SASL authentication mechanism available")
+ try:
+ mech, initial = self.sasl.start(mech_list)
+ except Exception, e:
+ raise Closed(str(e))
ch.connection_start_ok(client_properties=self.client_properties,
mechanism=mech, response=initial)
def connection_secure(self, ch, secure):
resp = None
- if _have_sasl:
- status, resp = self.sasl.step(secure.challenge)
- if status == False:
- raise Closed("SASL error: %s" % self.sasl.getError())
+ try:
+ resp = self.sasl.step(secure.challenge)
+ except Exception, e:
+ raise Closed(str(e))
ch.connection_secure_ok(response=resp)
def connection_tune(self, ch, tune):
ch.connection_tune_ok(heartbeat=self.heartbeat)
ch.connection_open()
- if _have_sasl:
- self.connection.user_id = self.sasl.getUserId()
- self.connection.security_layer_tx = self.sasl
+ self.connection.user_id = self.sasl.auth_username()
+ self.connection.security_layer_tx = self.sasl
def connection_open_ok(self, ch, open_ok):
- if _have_sasl:
- self.connection.security_layer_rx = self.sasl
+ self.connection.security_layer_rx = self.sasl
self.connection.opened = True
notify(self.connection.condition)
diff --git a/python/qpid/framer.py b/python/qpid/framer.py
index 47f57cf649..8e4ef014f1 100644
--- a/python/qpid/framer.py
+++ b/python/qpid/framer.py
@@ -51,9 +51,10 @@ class Framer(Packer):
self.sock_lock.acquire()
try:
if self.security_layer_tx:
- status, cipher_buf = self.security_layer_tx.encode(self.tx_buf)
- if status == False:
- raise Closed(self.security_layer_tx.getError())
+ try:
+ cipher_buf = self.security_layer_tx.encode(self.tx_buf)
+ except SASLError, e:
+ raise Closed(str(e))
self._write(cipher_buf)
else:
self._write(self.tx_buf)
@@ -91,9 +92,10 @@ class Framer(Packer):
try:
s = self.sock.recv(n) # NOTE: instead of "n", arg should be "self.maxbufsize"
if self.security_layer_rx:
- status, s = self.security_layer_rx.decode(s)
- if status == False:
- raise Closed(self.security_layer_tx.getError())
+ try:
+ s = self.security_layer_rx.decode(s)
+ except SASLError, e:
+ raise Closed(str(e))
except socket.timeout:
if self.aborted():
raise Closed()
diff --git a/python/qpid/messaging/util.py b/python/qpid/messaging/util.py
index 265cf7d51f..726cfd5172 100644
--- a/python/qpid/messaging/util.py
+++ b/python/qpid/messaging/util.py
@@ -50,10 +50,13 @@ def set_reconnect_urls(conn, msg):
reconnect_urls = []
urls = msg.properties["amq.failover"]
for u in urls:
+ # FIXME aconway 2012-06-12: Nasty hack parsing of the C++ broker's URL format.
if u.startswith("amqp:"):
- for p in u[5:].split(","):
- parts = p.split(":")
- host, port = parts[1:3]
+ for a in u[5:].split(","):
+ parts = a.split(":")
+ # Handle IPv6 addresses which have : in the host part.
+ port = parts[-1] # Last : separated field is port
+ host = ":".join(parts[1:-1]) # First : separated field is protocol, host is the rest.
reconnect_urls.append("%s:%s" % (host, port))
conn.reconnect_urls = reconnect_urls
log.warn("set reconnect_urls for conn %s: %s", conn, reconnect_urls)
diff --git a/python/qpid/sasl.py b/python/qpid/sasl.py
index 677a5e4e22..25de6dec45 100644
--- a/python/qpid/sasl.py
+++ b/python/qpid/sasl.py
@@ -29,6 +29,9 @@ class WrapperClient:
def setAttr(self, name, value):
status = self._cli.setAttr(str(name), str(value))
+ if status and name == 'username':
+ status = self._cli.setAttr('externaluser', str(value))
+
if not status:
raise SASLError(self._cli.getError())
diff --git a/python/qpid/selector.py b/python/qpid/selector.py
index ca5946c3f9..ff94091da0 100644
--- a/python/qpid/selector.py
+++ b/python/qpid/selector.py
@@ -16,7 +16,7 @@
# specific language governing permissions and limitations
# under the License.
#
-import atexit, time
+import atexit, time, errno
from compat import select, set, selectable_waiter
from threading import Thread, Lock
@@ -111,12 +111,24 @@ class Selector:
else:
wakeup = min(wakeup, t)
- if wakeup is None:
- timeout = None
- else:
- timeout = max(0, wakeup - time.time())
+ rd = []
+ wr = []
+ ex = []
- rd, wr, ex = select(self.reading, self.writing, (), timeout)
+ while True:
+ try:
+ if wakeup is None:
+ timeout = None
+ else:
+ timeout = max(0, wakeup - time.time())
+ rd, wr, ex = select(self.reading, self.writing, (), timeout)
+ break
+ except Exception, (err, strerror):
+ # Repeat the select call if we were interrupted.
+ if err == errno.EINTR:
+ continue
+ else:
+ raise
for sel in wr:
if sel.writing():
diff --git a/python/qpid/tests/messaging/endpoints.py b/python/qpid/tests/messaging/endpoints.py
index 62deacd0bd..a82a9e95ed 100644
--- a/python/qpid/tests/messaging/endpoints.py
+++ b/python/qpid/tests/messaging/endpoints.py
@@ -1333,3 +1333,15 @@ class SenderTests(Base):
self.drain(self.rcv, expected=msgs)
self.ssn.acknowledge()
assert caught, "did not exceed capacity"
+
+ def testEINTR(self):
+ m1 = self.content("testEINTR", 0)
+ m2 = self.content("testEINTR", 1)
+
+ self.snd.send(m1, timeout=self.timeout())
+ try:
+ os.setuid(500)
+ assert False, "setuid should fail"
+ except:
+ pass
+ self.snd.send(m2, timeout=self.timeout())
diff --git a/python/qpid/util.py b/python/qpid/util.py
index 89677289e2..7541595453 100644
--- a/python/qpid/util.py
+++ b/python/qpid/util.py
@@ -25,9 +25,9 @@ except ImportError:
from socket import ssl as wrap_socket
class ssl:
- def __init__(self, sock):
+ def __init__(self, sock, keyfile=None, certfile=None, trustfile=None):
self.sock = sock
- self.ssl = wrap_socket(sock)
+ self.ssl = wrap_socket(sock, keyfile=keyfile, certfile=certfile, ca_certs=trustfile)
def recv(self, n):
return self.ssl.read(n)
diff --git a/python/setup.py b/python/setup.py
index 225ee44b91..0b9d99a1af 100755
--- a/python/setup.py
+++ b/python/setup.py
@@ -298,7 +298,7 @@ class install_lib(_install_lib):
return outfiles + extra
setup(name="qpid-python",
- version="0.17",
+ version="0.19",
author="Apache Qpid",
author_email="dev@qpid.apache.org",
packages=["mllib", "qpid", "qpid.messaging", "qpid.tests",