From 894cfec656e44f92766775baf84cfd1d2e257901 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 12 Mar 2017 19:57:36 -0700 Subject: Derive all api classes from Request / Response base classes --- kafka/conn.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) (limited to 'kafka/conn.py') diff --git a/kafka/conn.py b/kafka/conn.py index 29f6911..d5b7c50 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -525,7 +525,7 @@ class BrokerConnection(object): ifr.future.failure(error) self.config['state_change_callback'](self) - def send(self, request, expect_response=True): + def send(self, request): """send request, return Future() Can block on network if request is larger than send_buffer_bytes @@ -537,9 +537,9 @@ class BrokerConnection(object): return future.failure(Errors.ConnectionError(str(self))) elif not self.can_send_more(): return future.failure(Errors.TooManyInFlightRequests(str(self))) - return self._send(request, expect_response=expect_response) + return self._send(request) - def _send(self, request, expect_response=True): + def _send(self, request): assert self.state in (ConnectionStates.AUTHENTICATING, ConnectionStates.CONNECTED) future = Future() correlation_id = self._next_correlation_id() @@ -569,7 +569,7 @@ class BrokerConnection(object): return future.failure(error) log.debug('%s Request %d: %s', self, correlation_id, request) - if expect_response: + if request.expect_response(): ifr = InFlightRequest(request=request, correlation_id=correlation_id, response_type=request.RESPONSE_TYPE, -- cgit v1.2.1