summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--gear/__init__.py111
-rw-r--r--gear/cmd/geard.py29
-rw-r--r--gear/tests/__init__.py6
-rw-r--r--gear/tests/test_functional.py12
-rw-r--r--gear/tests/test_gear.py34
-rw-r--r--other-requirements.txt11
-rw-r--r--requirements.txt4
-rw-r--r--setup.cfg4
-rw-r--r--test-requirements.txt2
-rw-r--r--tox.ini11
10 files changed, 168 insertions, 56 deletions
diff --git a/gear/__init__.py b/gear/__init__.py
index f820aa2..fecce32 100644
--- a/gear/__init__.py
+++ b/gear/__init__.py
@@ -314,7 +314,8 @@ class Connection(object):
admin_request = self._getAdminRequest()
if admin:
complete, remainder = admin_request.isComplete(raw_bytes)
- raw_bytes = remainder
+ if remainder is not None:
+ raw_bytes = remainder
if complete:
return admin_request
else:
@@ -431,6 +432,7 @@ class AdminRequest(object):
command = None
arguments = []
response = None
+ _complete_position = 0
def __init__(self, *arguments):
self.wait_event = threading.Event()
@@ -451,8 +453,10 @@ class AdminRequest(object):
def isComplete(self, data):
x = -1
- end_index_newline = data.find(b'\n.\n')
- end_index_return = data.find(b'\r\n.\r\n')
+ start = self._complete_position
+ start = max(self._complete_position - 4, 0)
+ end_index_newline = data.find(b'\n.\n', start)
+ end_index_return = data.find(b'\r\n.\r\n', start)
if end_index_newline != -1:
x = end_index_newline + 3
elif end_index_return != -1:
@@ -461,11 +465,12 @@ class AdminRequest(object):
x = 2
elif data.startswith(b'.\r\n'):
x = 3
+ self._complete_position = len(data)
if x != -1:
self.response = data[:x]
return (True, data[x:])
else:
- return (False, data)
+ return (False, None)
def setComplete(self):
self.wait_event.set()
@@ -530,7 +535,7 @@ class CancelJobAdminRequest(AdminRequest):
self.response = data[:x]
return (True, data[x:])
else:
- return (False, data)
+ return (False, None)
class VersionAdminRequest(AdminRequest):
@@ -551,7 +556,7 @@ class VersionAdminRequest(AdminRequest):
self.response = data[:x]
return (True, data[x:])
else:
- return (False, data)
+ return (False, None)
class WorkersAdminRequest(AdminRequest):
@@ -797,7 +802,11 @@ class BaseClientServer(object):
self.log.debug("Marking %s as disconnected" % conn)
self.connections_condition.acquire()
try:
- jobs = conn.related_jobs.values()
+ # NOTE(notmorgan): In the loop below it is possible to change the
+ # jobs list on the connection. In python 3 .values() is an iter not
+ # a static list, meaning that a change will break the for loop
+ # as the object being iterated on will have changed in size.
+ jobs = list(conn.related_jobs.values())
if conn in self.active_connections:
self.active_connections.remove(conn)
if conn not in self.inactive_connections:
@@ -1105,11 +1114,15 @@ class BaseClientServer(object):
The object may no longer be used after shutdown is called.
"""
- self.log.debug("Beginning shutdown")
- self._shutdown()
- self.log.debug("Beginning cleanup")
- self._cleanup()
- self.log.debug("Finished shutdown")
+ if self.running:
+ self.log.debug("Beginning shutdown")
+ self._shutdown()
+ self.log.debug("Beginning cleanup")
+ self._cleanup()
+ self.log.debug("Finished shutdown")
+ else:
+ self.log.warning("Shutdown called when not currently running. "
+ "Ignoring.")
def _shutdown(self):
# The first part of the shutdown process where all threads
@@ -2284,7 +2297,7 @@ class ServerAdminRequest(AdminRequest):
x = end_index_newline + 1
return (True, data[x:])
else:
- return (False, data)
+ return (False, None)
class NonBlockingConnection(Connection):
@@ -2343,28 +2356,32 @@ class NonBlockingConnection(Connection):
def sendQueuedData(self):
"""Send previously queued data to the socket."""
- while len(self.send_queue):
- data = self.send_queue.pop(0)
- r = 0
- try:
- r = self.conn.send(data)
- except ssl.SSLError as e:
- if e.errno == ssl.SSL_ERROR_WANT_READ:
- raise RetryIOError()
- elif e.errno == ssl.SSL_ERROR_WANT_WRITE:
- raise RetryIOError()
- else:
- raise
- except socket.error as e:
- if e.errno == errno.EAGAIN:
- self.log.debug("Write operation on %s would block"
- % self)
- raise RetryIOError()
- raise
- finally:
- data = data[r:]
- if data:
- self.send_queue.insert(0, data)
+ try:
+ while len(self.send_queue):
+ data = self.send_queue.pop(0)
+ r = 0
+ try:
+ r = self.conn.send(data)
+ except ssl.SSLError as e:
+ if e.errno == ssl.SSL_ERROR_WANT_READ:
+ raise RetryIOError()
+ elif e.errno == ssl.SSL_ERROR_WANT_WRITE:
+ raise RetryIOError()
+ else:
+ raise
+ except socket.error as e:
+ if e.errno == errno.EAGAIN:
+ self.log.debug("Write operation on %s would block"
+ % self)
+ raise RetryIOError()
+ else:
+ raise
+ finally:
+ data = data[r:]
+ if data:
+ self.send_queue.insert(0, data)
+ except RetryIOError:
+ pass
class ServerConnection(NonBlockingConnection):
@@ -2429,6 +2446,10 @@ class Server(BaseClientServer):
access control rules to its connections.
:arg str host: Host name or IPv4/IPv6 address to bind to. Defaults
to "whatever getaddrinfo() returns", which might be IPv4-only.
+ :arg bool keepalive: Whether to use TCP keepalives
+ :arg int tcp_keepidle: Idle time after which to start keepalives sending
+ :arg int tcp_keepintvl: Interval in seconds between TCP keepalives
+ :arg int tcp_keepcnt: Count of TCP keepalives to send before disconnect
"""
edge_bitmask = select.EPOLLET
@@ -2438,7 +2459,8 @@ class Server(BaseClientServer):
def __init__(self, port=4730, ssl_key=None, ssl_cert=None, ssl_ca=None,
statsd_host=None, statsd_port=8125, statsd_prefix=None,
- server_id=None, acl=None, host=None):
+ server_id=None, acl=None, host=None, keepalive=False,
+ tcp_keepidle=7200, tcp_keepintvl=75, tcp_keepcnt=9):
self.port = port
self.ssl_key = ssl_key
self.ssl_cert = ssl_cert
@@ -2467,6 +2489,15 @@ class Server(BaseClientServer):
self.socket = socket.socket(af, socktype, proto)
self.socket.setsockopt(socket.SOL_SOCKET,
socket.SO_REUSEADDR, 1)
+ if keepalive:
+ self.socket.setsockopt(socket.SOL_SOCKET,
+ socket.SO_KEEPALIVE, 1)
+ self.socket.setsockopt(socket.IPPROTO_TCP,
+ socket.TCP_KEEPIDLE, tcp_keepidle)
+ self.socket.setsockopt(socket.IPPROTO_TCP,
+ socket.TCP_KEEPINTVL, tcp_keepintvl)
+ self.socket.setsockopt(socket.IPPROTO_TCP,
+ socket.TCP_KEEPCNT, tcp_keepcnt)
except socket.error:
self.socket = None
continue
@@ -2692,7 +2723,11 @@ class Server(BaseClientServer):
self.connections_condition.acquire()
self._unregisterConnection(conn)
try:
- jobs = conn.related_jobs.values()
+ # NOTE(notmorgan): In the loop below it is possible to change the
+ # jobs list on the connection. In python 3 .values() is an iter not
+ # a static list, meaning that a change will break the for loop
+ # as the object being iterated on will have changed in size.
+ jobs = list(conn.related_jobs.values())
if conn in self.active_connections:
self.active_connections.remove(conn)
finally:
@@ -2938,7 +2973,7 @@ class Server(BaseClientServer):
functions = self._getFunctionStats()
for name, values in functions.items():
request.connection.sendRaw(("%s\t%s\t%s\t%s\n" %
- (name, values[0], values[1],
+ (name.decode('utf-8'), values[0], values[1],
values[2])).encode('utf8'))
request.connection.sendRaw(b'.\n')
diff --git a/gear/cmd/geard.py b/gear/cmd/geard.py
index 227f1cd..d35dc25 100644
--- a/gear/cmd/geard.py
+++ b/gear/cmd/geard.py
@@ -1,4 +1,3 @@
-#!/usr/bin/env python
# Copyright 2013 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@@ -14,7 +13,6 @@
# under the License.
import argparse
-import ConfigParser
import daemon
import extras
import gear
@@ -25,6 +23,9 @@ import pbr.version
import signal
import sys
+from six.moves import configparser as ConfigParser
+
+
pid_file_module = extras.try_imports(['daemon.pidlockfile', 'daemon.pidfile'])
@@ -59,6 +60,18 @@ support.
help='path to SSL private key')
parser.add_argument('--acl', dest='acl', metavar='PATH',
help='path to ACL file')
+ parser.add_argument('--keepalive', dest='keepalive', default=False,
+ action='store_true',
+ help='enable TCP keepalives in socket')
+ parser.add_argument('--keepalive-idle', dest='tcp_keepidle', type=int,
+ default=7200, action='store',
+ help='TCP keepalive idle time')
+ parser.add_argument('--keepalive-interval', dest='tcp_keepintvl', type=int,
+ default=75, action='store',
+ help='TCP keepalive probe interval')
+ parser.add_argument('--keepalive-count', dest='tcp_keepcnt', type=int,
+ default=9, action='store',
+ help='TCP keepalive probes count')
parser.add_argument('--version', dest='version', action='store_true',
help='show version')
self.args = parser.parse_args()
@@ -108,7 +121,12 @@ support.
statsd_host,
statsd_port,
statsd_prefix,
- acl=acl)
+ acl=acl,
+ keepalive=self.args.keepalive,
+ tcp_keepidle=self.args.tcp_keepidle,
+ tcp_keepintvl=self.args.tcp_keepintvl,
+ tcp_keepcnt=self.args.tcp_keepcnt
+ )
signal.pause()
@@ -127,8 +145,3 @@ def main():
pid = pid_file_module.TimeoutPIDLockFile(server.args.pidfile, 10)
with daemon.DaemonContext(pidfile=pid):
server.main()
-
-
-if __name__ == "__main__":
- sys.path.insert(0, '.')
- main()
diff --git a/gear/tests/__init__.py b/gear/tests/__init__.py
index 834b0f0..6d5edb4 100644
--- a/gear/tests/__init__.py
+++ b/gear/tests/__init__.py
@@ -17,6 +17,7 @@
"""Common utilities used in testing"""
import errno
+import logging
import os
import socket
@@ -49,7 +50,10 @@ class BaseTestCase(testtools.TestCase, testresources.ResourcedTestCase):
stderr = self.useFixture(fixtures.StringStream('stderr')).stream
self.useFixture(fixtures.MonkeyPatch('sys.stderr', stderr))
- self.useFixture(fixtures.FakeLogger())
+ self.useFixture(fixtures.FakeLogger(
+ level=logging.DEBUG,
+ format='%(asctime)s %(name)-32s '
+ '%(levelname)-8s %(message)s'))
self.useFixture(fixtures.NestedTempfile())
diff --git a/gear/tests/test_functional.py b/gear/tests/test_functional.py
index 247649a..26e72e3 100644
--- a/gear/tests/test_functional.py
+++ b/gear/tests/test_functional.py
@@ -14,11 +14,13 @@
# limitations under the License.
import os
+import threading
import time
from OpenSSL import crypto
import fixtures
import testscenarios
+import testtools
import gear
from gear import tests
@@ -125,6 +127,16 @@ class TestFunctional(tests.BaseTestCase):
self.assertTrue(job.complete)
self.assertEqual(job.data, [b'workdata'])
+ def test_worker_termination(self):
+ def getJob():
+ with testtools.ExpectedException(gear.InterruptedError):
+ self.worker.getJob()
+ self.worker.registerFunction('test')
+ jobthread = threading.Thread(target=getJob)
+ jobthread.daemon = True
+ jobthread.start()
+ self.worker.stopWaitingForJobs()
+
def load_tests(loader, in_tests, pattern):
return testscenarios.load_tests_apply_scenarios(loader, in_tests, pattern)
diff --git a/gear/tests/test_gear.py b/gear/tests/test_gear.py
index c16df04..546d4c4 100644
--- a/gear/tests/test_gear.py
+++ b/gear/tests/test_gear.py
@@ -42,6 +42,35 @@ class ConnectionTestCase(tests.BaseTestCase):
'host: %s port: %s>' % (self.host, self.port)))
+class AdminRequestTestCase(tests.BaseTestCase):
+ scenarios = [
+ ('empty newline', dict(response=b'.\n', remainder=b'')),
+ ('empty return', dict(response=b'.\r\n', remainder=b'')),
+ ('data newline', dict(response=b'foo\n.\n', remainder=b'')),
+ ('data return', dict(response=b'foo\r\n.\r\n', remainder=b'')),
+ ('pipeline newline', dict(response=b'foo\n.\nbar',
+ remainder=b'bar')),
+ ('pipeline return', dict(response=b'foo\r\n.\r\nbar',
+ remainder=b'bar')),
+ ]
+
+ def test_full_packet(self):
+ req = gear.StatusAdminRequest()
+ ret = req.isComplete(self.response)
+ self.assertTrue(ret[0])
+ self.assertEqual(ret[1], self.remainder)
+
+ def test_partial_packet(self):
+ req = gear.StatusAdminRequest()
+ for i in range(len(self.response)-len(self.remainder)):
+ ret = req.isComplete(self.response[:i])
+ self.assertFalse(ret[0])
+ self.assertIsNone(ret[1])
+ ret = req.isComplete(self.response)
+ self.assertTrue(ret[0])
+ self.assertEqual(ret[1], self.remainder)
+
+
class TestConnection(tests.BaseTestCase):
def setUp(self):
@@ -293,6 +322,11 @@ class TestClient(tests.BaseTestCase):
acl.revokeGrant('manager')
self.assertFalse(acl.canGrant('manager'))
+ def test_double_shutdown(self):
+ client = gear.Client()
+ client.shutdown()
+ client.shutdown()
+
def load_tests(loader, in_tests, pattern):
return testscenarios.load_tests_apply_scenarios(loader, in_tests, pattern)
diff --git a/other-requirements.txt b/other-requirements.txt
new file mode 100644
index 0000000..fa5c3fb
--- /dev/null
+++ b/other-requirements.txt
@@ -0,0 +1,11 @@
+# This is a cross-platform list tracking distribution packages needed by tests;
+# see http://docs.openstack.org/infra/bindep/ for additional information.
+
+python-dev [platform:dpkg]
+python-devel [platform:rpm]
+python3-all-dev [platform:ubuntu !platform:ubuntu-precise]
+python3-dev [platform:dpkg]
+python3-devel [platform:fedora]
+python3.4 [platform:ubuntu-trusty]
+python3.5 [platform:ubuntu-xenial]
+python34-devel [platform:centos]
diff --git a/requirements.txt b/requirements.txt
index c575dc6..a35fc4c 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,4 +1,4 @@
-pbr>=0.6,!=0.7,<1.0
-argparse
+pbr>=1.8.0,<2.0
+six>=1.5.2
extras
python-daemon>=2.0.4
diff --git a/setup.cfg b/setup.cfg
index e103276..f11f162 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -20,10 +20,6 @@ classifier =
packages =
gear
-[global]
-setup-hooks =
- pbr.hooks.setup_hook
-
[entry_points]
console_scripts =
geard = gear.cmd.geard:main
diff --git a/test-requirements.txt b/test-requirements.txt
index abf85cb..98bcf41 100644
--- a/test-requirements.txt
+++ b/test-requirements.txt
@@ -1,4 +1,4 @@
-hacking>=0.5.3,<0.8
+hacking>=0.10.0,<0.11
coverage>=3.6
discover
diff --git a/tox.ini b/tox.ini
index daf5cc5..d99ad32 100644
--- a/tox.ini
+++ b/tox.ini
@@ -1,7 +1,7 @@
[tox]
minversion = 1.6
skipsdist = True
-envlist = py26,py27,pep8
+envlist = py27,py34,pep8
[testenv]
setenv = VIRTUAL_ENV={envdir}
@@ -14,7 +14,6 @@ commands =
[tox:jenkins]
sitepackages = True
-downloadcache = ~/cache/pip
[testenv:pep8]
commands = flake8
@@ -36,3 +35,11 @@ select = H231
[testenv:docs]
commands = python setup.py build_sphinx
+
+[testenv:bindep]
+# Do not install any requirements. We want this to be fast and work even if
+# system dependencies are missing, since it's used to tell you what system
+# dependencies are missing! This also means that bindep must be installed
+# separately, outside of the requirements files.
+deps = bindep
+commands = bindep test