summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTobias Henkel <tobias.henkel@bmw.de>2018-09-04 13:30:40 +0200
committerTobias Henkel <tobias.henkel@bmw.de>2018-09-04 13:50:04 +0200
commit71dbac070c7e6a74a192197ce6a6ce9cc65ee0f9 (patch)
treebb321409476b7518660aed661cc5309676750ac3
parentc00ca944db0d6dc6ef90859b0b9b7f3a58196fb0 (diff)
downloadgear-71dbac070c7e6a74a192197ce6a6ce9cc65ee0f9.tar.gz
Add support for keepalive to client
A gearman client only waiting for jobs will wait indefinitely if the gearman server vanishes (e.g. due to a VM crash). In this case there is no traffic on the connection and the client blocks forever if there is nothing in between that forcefully terminates the connection. Adding tcp keepalive can mitigate that and the connection will be terminated by the kernel in this situation which then triggers a reconnect. Change-Id: I8589cd45450245a25539c051355b38d16ee9f4b9
-rw-r--r--gear/__init__.py31
1 files changed, 28 insertions, 3 deletions
diff --git a/gear/__init__.py b/gear/__init__.py
index 4ed674e..57d80e3 100644
--- a/gear/__init__.py
+++ b/gear/__init__.py
@@ -124,16 +124,25 @@ class Connection(object):
:arg str client_id: The client ID associated with this connection.
It will be appending to the name of the logger (e.g.,
gear.Connection.client_id). Defaults to 'unknown'.
+ :arg bool keepalive: Whether to use TCP keepalives
+ :arg int tcp_keepidle: Idle time after which to start keepalives sending
+ :arg int tcp_keepintvl: Interval in seconds between TCP keepalives
+ :arg int tcp_keepcnt: Count of TCP keepalives to send before disconnect
"""
def __init__(self, host, port, ssl_key=None, ssl_cert=None, ssl_ca=None,
- client_id='unknown'):
+ client_id='unknown', keepalive=False, tcp_keepidle=7200,
+ tcp_keepintvl=75, tcp_keepcnt=9):
self.log = logging.getLogger("gear.Connection.%s" % (client_id,))
self.host = host
self.port = port
self.ssl_key = ssl_key
self.ssl_cert = ssl_cert
self.ssl_ca = ssl_ca
+ self.keepalive = keepalive
+ self.tcp_keepcnt = tcp_keepcnt
+ self.tcp_keepintvl = tcp_keepintvl
+ self.tcp_keepidle = tcp_keepidle
self.use_ssl = False
if all([self.ssl_key, self.ssl_cert, self.ssl_ca]):
@@ -182,6 +191,14 @@ class Connection(object):
af, socktype, proto, canonname, sa = res
try:
s = socket.socket(af, socktype, proto)
+ if self.keepalive:
+ s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
+ s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE,
+ self.tcp_keepidle)
+ s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL,
+ self.tcp_keepintvl)
+ s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT,
+ self.tcp_keepcnt)
except socket.error:
s = None
continue
@@ -1162,7 +1179,9 @@ class BaseClient(BaseClientServer):
self.broadcast_lock = threading.RLock()
def addServer(self, host, port=4730,
- ssl_key=None, ssl_cert=None, ssl_ca=None):
+ ssl_key=None, ssl_cert=None, ssl_ca=None,
+ keepalive=False, tcp_keepidle=7200, tcp_keepintvl=75,
+ tcp_keepcnt=9):
"""Add a server to the client's connection pool.
Any number of Gearman servers may be added to a client. The
@@ -1184,6 +1203,11 @@ class BaseClient(BaseClientServer):
:arg str ssl_key: Path to the SSL private key.
:arg str ssl_cert: Path to the SSL certificate.
:arg str ssl_ca: Path to the CA certificate.
+ :arg bool keepalive: Whether to use TCP keepalives
+ :arg int tcp_keepidle: Idle time after which to start keepalives
+ sending
+ :arg int tcp_keepintvl: Interval in seconds between TCP keepalives
+ :arg int tcp_keepcnt: Count of TCP keepalives to send before disconnect
:raises ConfigurationError: If the host/port combination has
already been added to the client.
"""
@@ -1196,7 +1220,8 @@ class BaseClient(BaseClientServer):
if conn.host == host and conn.port == port:
raise ConfigurationError("Host/port already specified")
conn = Connection(host, port, ssl_key, ssl_cert, ssl_ca,
- self.client_id)
+ self.client_id, keepalive, tcp_keepidle,
+ tcp_keepintvl, tcp_keepcnt)
self.inactive_connections.append(conn)
self.connections_condition.notifyAll()
finally: