summaryrefslogtreecommitdiff
path: root/kafka
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-04-05 08:10:00 -0700
committerDana Powers <dana.powers@gmail.com>2016-04-05 08:10:00 -0700
commit679d7485b1fdf049879099404dcecc8496569d2b (patch)
treec83a38ee159dadf55b0f88ae579b64f6d4bb1885 /kafka
parentd81963a919fa8161c94b5bef5e6de0697b91c4a6 (diff)
parentbb25469fdaf6e0bfe929f12173578e8fdf114094 (diff)
downloadkafka-python-679d7485b1fdf049879099404dcecc8496569d2b.tar.gz
Merge pull request #611 from dpkp/sock_send_bytes
Handle partial socket send()
Diffstat (limited to 'kafka')
-rw-r--r--kafka/conn.py10
-rw-r--r--kafka/future.py4
2 files changed, 8 insertions, 6 deletions
diff --git a/kafka/conn.py b/kafka/conn.py
index 2b82b6d..ffc839e 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -188,10 +188,12 @@ class BrokerConnection(object):
# and send bytes asynchronously. For now, just block
# sending each request payload
self._sock.setblocking(True)
- sent_bytes = self._sock.send(size)
- assert sent_bytes == len(size)
- sent_bytes = self._sock.send(message)
- assert sent_bytes == len(message)
+ for data in (size, message):
+ total_sent = 0
+ while total_sent < len(data):
+ sent_bytes = self._sock.send(data[total_sent:])
+ total_sent += sent_bytes
+ assert total_sent == len(data)
self._sock.setblocking(False)
except (AssertionError, ConnectionError) as e:
log.exception("Error sending %s to %s", request, self)
diff --git a/kafka/future.py b/kafka/future.py
index 06b8c3a..c7e0b14 100644
--- a/kafka/future.py
+++ b/kafka/future.py
@@ -15,10 +15,10 @@ class Future(object):
self._errbacks = []
def succeeded(self):
- return self.is_done and not self.exception
+ return self.is_done and not bool(self.exception)
def failed(self):
- return self.is_done and self.exception
+ return self.is_done and bool(self.exception)
def retriable(self):
try: