summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-08-01 08:53:17 -0700
committerGitHub <noreply@github.com>2016-08-01 08:53:17 -0700
commit4162989b77e44a47ecd0cc3db8788233251a4fb4 (patch)
tree23981a905d92c19dc81574906d0beb443f9f152b
parent64d3607b8796f6ef1cf71fbecfc6887b3b15c700 (diff)
downloadkafka-python-4162989b77e44a47ecd0cc3db8788233251a4fb4.tar.gz
Use socket_options configuration to setsockopts(). Default TCP_NODELAY (#783)
-rw-r--r--kafka/client_async.py18
-rw-r--r--kafka/conn.py20
-rw-r--r--kafka/consumer/group.py13
-rw-r--r--kafka/producer/kafka.py5
4 files changed, 39 insertions, 17 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index dee4a12..6bffa9e 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -54,6 +54,7 @@ class KafkaClient(object):
'max_in_flight_requests_per_connection': 5,
'receive_buffer_bytes': None,
'send_buffer_bytes': None,
+ 'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)],
'retry_backoff_ms': 100,
'metadata_max_age_ms': 300000,
'security_protocol': 'PLAINTEXT',
@@ -93,26 +94,29 @@ class KafkaClient(object):
server-side log entries that correspond to this client. Also
submitted to GroupCoordinator for logging with respect to
consumer group administration. Default: 'kafka-python-{version}'
- request_timeout_ms (int): Client request timeout in milliseconds.
- Default: 40000.
reconnect_backoff_ms (int): The amount of time in milliseconds to
wait before attempting to reconnect to a given host.
Default: 50.
+ request_timeout_ms (int): Client request timeout in milliseconds.
+ Default: 40000.
+ retry_backoff_ms (int): Milliseconds to backoff when retrying on
+ errors. Default: 100.
max_in_flight_requests_per_connection (int): Requests are pipelined
to kafka brokers up to this number of maximum requests per
broker connection. Default: 5.
- send_buffer_bytes (int): The size of the TCP send buffer
- (SO_SNDBUF) to use when sending data. Default: None (relies on
- system defaults). Java client defaults to 131072.
receive_buffer_bytes (int): The size of the TCP receive buffer
(SO_RCVBUF) to use when reading data. Default: None (relies on
system defaults). Java client defaults to 32768.
+ send_buffer_bytes (int): The size of the TCP send buffer
+ (SO_SNDBUF) to use when sending data. Default: None (relies on
+ system defaults). Java client defaults to 131072.
+ socket_options (list): List of tuple-arguments to socket.setsockopt
+ to apply to broker connection sockets. Default:
+ [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)]
metadata_max_age_ms (int): The period of time in milliseconds after
which we force a refresh of metadata even if we haven't seen any
partition leadership changes to proactively discover any new
brokers or partitions. Default: 300000
- retry_backoff_ms (int): Milliseconds to backoff when retrying on
- errors. Default: 100.
security_protocol (str): Protocol used to communicate with brokers.
Valid values are: PLAINTEXT, SSL. Default: PLAINTEXT.
ssl_context (ssl.SSLContext): pre-configured SSLContext for wrapping
diff --git a/kafka/conn.py b/kafka/conn.py
index 5489d1f..da98028 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -60,6 +60,7 @@ class BrokerConnection(object):
'max_in_flight_requests_per_connection': 5,
'receive_buffer_bytes': None,
'send_buffer_bytes': None,
+ 'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)],
'security_protocol': 'PLAINTEXT',
'ssl_context': None,
'ssl_check_hostname': True,
@@ -84,6 +85,15 @@ class BrokerConnection(object):
if key in configs:
self.config[key] = configs[key]
+ if self.config['receive_buffer_bytes'] is not None:
+ self.config['socket_options'].append(
+ (socket.SOL_SOCKET, socket.SO_RCVBUF,
+ self.config['receive_buffer_bytes']))
+ if self.config['send_buffer_bytes'] is not None:
+ self.config['socket_options'].append(
+ (socket.SOL_SOCKET, socket.SO_SNDBUF,
+ self.config['send_buffer_bytes']))
+
self.state = ConnectionStates.DISCONNECTED
self._sock = None
self._ssl_context = None
@@ -144,12 +154,10 @@ class BrokerConnection(object):
self._sock = socket.socket(afi, socket.SOCK_STREAM)
else:
self._sock = socket.socket(self.afi, socket.SOCK_STREAM)
- if self.config['receive_buffer_bytes'] is not None:
- self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF,
- self.config['receive_buffer_bytes'])
- if self.config['send_buffer_bytes'] is not None:
- self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF,
- self.config['send_buffer_bytes'])
+
+ for option in self.config['socket_options']:
+ self._sock.setsockopt(*option)
+
self._sock.setblocking(False)
if self.config['security_protocol'] in ('SSL', 'SASL_SSL'):
self._wrap_ssl()
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index ed28573..fcd5ede 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -2,6 +2,7 @@ from __future__ import absolute_import
import copy
import logging
+import socket
import time
import six
@@ -114,12 +115,15 @@ class KafkaConsumer(six.Iterator):
rebalances. Default: 3000
session_timeout_ms (int): The timeout used to detect failures when
using Kafka's group managementment facilities. Default: 30000
- send_buffer_bytes (int): The size of the TCP send buffer
- (SO_SNDBUF) to use when sending data. Default: None (relies on
- system defaults). The java client defaults to 131072.
receive_buffer_bytes (int): The size of the TCP receive buffer
(SO_RCVBUF) to use when reading data. Default: None (relies on
system defaults). The java client defaults to 32768.
+ send_buffer_bytes (int): The size of the TCP send buffer
+ (SO_SNDBUF) to use when sending data. Default: None (relies on
+ system defaults). The java client defaults to 131072.
+ socket_options (list): List of tuple-arguments to socket.setsockopt
+ to apply to broker connection sockets. Default:
+ [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)]
consumer_timeout_ms (int): number of milliseconds to block during
message iteration before raising StopIteration (i.e., ending the
iterator). Default -1 (block forever).
@@ -209,8 +213,9 @@ class KafkaConsumer(six.Iterator):
'partition_assignment_strategy': (RangePartitionAssignor, RoundRobinPartitionAssignor),
'heartbeat_interval_ms': 3000,
'session_timeout_ms': 30000,
- 'send_buffer_bytes': None,
'receive_buffer_bytes': None,
+ 'send_buffer_bytes': None,
+ 'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)],
'consumer_timeout_ms': -1,
'skip_double_compressed_messages': False,
'security_protocol': 'PLAINTEXT',
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py
index 02e4621..b91ba24 100644
--- a/kafka/producer/kafka.py
+++ b/kafka/producer/kafka.py
@@ -3,6 +3,7 @@ from __future__ import absolute_import
import atexit
import copy
import logging
+import socket
import threading
import time
import weakref
@@ -188,6 +189,9 @@ class KafkaProducer(object):
send_buffer_bytes (int): The size of the TCP send buffer
(SO_SNDBUF) to use when sending data. Default: None (relies on
system defaults). Java client defaults to 131072.
+ socket_options (list): List of tuple-arguments to socket.setsockopt
+ to apply to broker connection sockets. Default:
+ [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)]
reconnect_backoff_ms (int): The amount of time in milliseconds to
wait before attempting to reconnect to a given host.
Default: 50.
@@ -256,6 +260,7 @@ class KafkaProducer(object):
'request_timeout_ms': 30000,
'receive_buffer_bytes': None,
'send_buffer_bytes': None,
+ 'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)],
'reconnect_backoff_ms': 50,
'max_in_flight_requests_per_connection': 5,
'security_protocol': 'PLAINTEXT',