From 4a04a09ef1a7abc9085ab8208b62fbbfa6fc64bb Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Mon, 4 Apr 2016 22:51:33 -0700 Subject: Coerce exceptions to bool in Future.succeeded() and Future.failed() --- kafka/future.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'kafka') diff --git a/kafka/future.py b/kafka/future.py index 06b8c3a..c7e0b14 100644 --- a/kafka/future.py +++ b/kafka/future.py @@ -15,10 +15,10 @@ class Future(object): self._errbacks = [] def succeeded(self): - return self.is_done and not self.exception + return self.is_done and not bool(self.exception) def failed(self): - return self.is_done and self.exception + return self.is_done and bool(self.exception) def retriable(self): try: -- cgit v1.2.1 From bb25469fdaf6e0bfe929f12173578e8fdf114094 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 23 Mar 2016 09:59:52 -0700 Subject: Handle partial socket send() --- kafka/conn.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) (limited to 'kafka') diff --git a/kafka/conn.py b/kafka/conn.py index 2b82b6d..ffc839e 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -188,10 +188,12 @@ class BrokerConnection(object): # and send bytes asynchronously. For now, just block # sending each request payload self._sock.setblocking(True) - sent_bytes = self._sock.send(size) - assert sent_bytes == len(size) - sent_bytes = self._sock.send(message) - assert sent_bytes == len(message) + for data in (size, message): + total_sent = 0 + while total_sent < len(data): + sent_bytes = self._sock.send(data[total_sent:]) + total_sent += sent_bytes + assert total_sent == len(data) self._sock.setblocking(False) except (AssertionError, ConnectionError) as e: log.exception("Error sending %s to %s", request, self) -- cgit v1.2.1