diff options
-rw-r--r-- | gear/__init__.py | 111 | ||||
-rw-r--r-- | gear/cmd/geard.py | 29 | ||||
-rw-r--r-- | gear/tests/__init__.py | 6 | ||||
-rw-r--r-- | gear/tests/test_functional.py | 12 | ||||
-rw-r--r-- | gear/tests/test_gear.py | 34 | ||||
-rw-r--r-- | other-requirements.txt | 11 | ||||
-rw-r--r-- | requirements.txt | 4 | ||||
-rw-r--r-- | setup.cfg | 4 | ||||
-rw-r--r-- | test-requirements.txt | 2 | ||||
-rw-r--r-- | tox.ini | 11 |
10 files changed, 168 insertions, 56 deletions
diff --git a/gear/__init__.py b/gear/__init__.py index f820aa2..fecce32 100644 --- a/gear/__init__.py +++ b/gear/__init__.py @@ -314,7 +314,8 @@ class Connection(object): admin_request = self._getAdminRequest() if admin: complete, remainder = admin_request.isComplete(raw_bytes) - raw_bytes = remainder + if remainder is not None: + raw_bytes = remainder if complete: return admin_request else: @@ -431,6 +432,7 @@ class AdminRequest(object): command = None arguments = [] response = None + _complete_position = 0 def __init__(self, *arguments): self.wait_event = threading.Event() @@ -451,8 +453,10 @@ class AdminRequest(object): def isComplete(self, data): x = -1 - end_index_newline = data.find(b'\n.\n') - end_index_return = data.find(b'\r\n.\r\n') + start = self._complete_position + start = max(self._complete_position - 4, 0) + end_index_newline = data.find(b'\n.\n', start) + end_index_return = data.find(b'\r\n.\r\n', start) if end_index_newline != -1: x = end_index_newline + 3 elif end_index_return != -1: @@ -461,11 +465,12 @@ class AdminRequest(object): x = 2 elif data.startswith(b'.\r\n'): x = 3 + self._complete_position = len(data) if x != -1: self.response = data[:x] return (True, data[x:]) else: - return (False, data) + return (False, None) def setComplete(self): self.wait_event.set() @@ -530,7 +535,7 @@ class CancelJobAdminRequest(AdminRequest): self.response = data[:x] return (True, data[x:]) else: - return (False, data) + return (False, None) class VersionAdminRequest(AdminRequest): @@ -551,7 +556,7 @@ class VersionAdminRequest(AdminRequest): self.response = data[:x] return (True, data[x:]) else: - return (False, data) + return (False, None) class WorkersAdminRequest(AdminRequest): @@ -797,7 +802,11 @@ class BaseClientServer(object): self.log.debug("Marking %s as disconnected" % conn) self.connections_condition.acquire() try: - jobs = conn.related_jobs.values() + # NOTE(notmorgan): In the loop below it is possible to change the + # jobs list on the connection. In python 3 .values() is an iter not + # a static list, meaning that a change will break the for loop + # as the object being iterated on will have changed in size. + jobs = list(conn.related_jobs.values()) if conn in self.active_connections: self.active_connections.remove(conn) if conn not in self.inactive_connections: @@ -1105,11 +1114,15 @@ class BaseClientServer(object): The object may no longer be used after shutdown is called. """ - self.log.debug("Beginning shutdown") - self._shutdown() - self.log.debug("Beginning cleanup") - self._cleanup() - self.log.debug("Finished shutdown") + if self.running: + self.log.debug("Beginning shutdown") + self._shutdown() + self.log.debug("Beginning cleanup") + self._cleanup() + self.log.debug("Finished shutdown") + else: + self.log.warning("Shutdown called when not currently running. " + "Ignoring.") def _shutdown(self): # The first part of the shutdown process where all threads @@ -2284,7 +2297,7 @@ class ServerAdminRequest(AdminRequest): x = end_index_newline + 1 return (True, data[x:]) else: - return (False, data) + return (False, None) class NonBlockingConnection(Connection): @@ -2343,28 +2356,32 @@ class NonBlockingConnection(Connection): def sendQueuedData(self): """Send previously queued data to the socket.""" - while len(self.send_queue): - data = self.send_queue.pop(0) - r = 0 - try: - r = self.conn.send(data) - except ssl.SSLError as e: - if e.errno == ssl.SSL_ERROR_WANT_READ: - raise RetryIOError() - elif e.errno == ssl.SSL_ERROR_WANT_WRITE: - raise RetryIOError() - else: - raise - except socket.error as e: - if e.errno == errno.EAGAIN: - self.log.debug("Write operation on %s would block" - % self) - raise RetryIOError() - raise - finally: - data = data[r:] - if data: - self.send_queue.insert(0, data) + try: + while len(self.send_queue): + data = self.send_queue.pop(0) + r = 0 + try: + r = self.conn.send(data) + except ssl.SSLError as e: + if e.errno == ssl.SSL_ERROR_WANT_READ: + raise RetryIOError() + elif e.errno == ssl.SSL_ERROR_WANT_WRITE: + raise RetryIOError() + else: + raise + except socket.error as e: + if e.errno == errno.EAGAIN: + self.log.debug("Write operation on %s would block" + % self) + raise RetryIOError() + else: + raise + finally: + data = data[r:] + if data: + self.send_queue.insert(0, data) + except RetryIOError: + pass class ServerConnection(NonBlockingConnection): @@ -2429,6 +2446,10 @@ class Server(BaseClientServer): access control rules to its connections. :arg str host: Host name or IPv4/IPv6 address to bind to. Defaults to "whatever getaddrinfo() returns", which might be IPv4-only. + :arg bool keepalive: Whether to use TCP keepalives + :arg int tcp_keepidle: Idle time after which to start keepalives sending + :arg int tcp_keepintvl: Interval in seconds between TCP keepalives + :arg int tcp_keepcnt: Count of TCP keepalives to send before disconnect """ edge_bitmask = select.EPOLLET @@ -2438,7 +2459,8 @@ class Server(BaseClientServer): def __init__(self, port=4730, ssl_key=None, ssl_cert=None, ssl_ca=None, statsd_host=None, statsd_port=8125, statsd_prefix=None, - server_id=None, acl=None, host=None): + server_id=None, acl=None, host=None, keepalive=False, + tcp_keepidle=7200, tcp_keepintvl=75, tcp_keepcnt=9): self.port = port self.ssl_key = ssl_key self.ssl_cert = ssl_cert @@ -2467,6 +2489,15 @@ class Server(BaseClientServer): self.socket = socket.socket(af, socktype, proto) self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + if keepalive: + self.socket.setsockopt(socket.SOL_SOCKET, + socket.SO_KEEPALIVE, 1) + self.socket.setsockopt(socket.IPPROTO_TCP, + socket.TCP_KEEPIDLE, tcp_keepidle) + self.socket.setsockopt(socket.IPPROTO_TCP, + socket.TCP_KEEPINTVL, tcp_keepintvl) + self.socket.setsockopt(socket.IPPROTO_TCP, + socket.TCP_KEEPCNT, tcp_keepcnt) except socket.error: self.socket = None continue @@ -2692,7 +2723,11 @@ class Server(BaseClientServer): self.connections_condition.acquire() self._unregisterConnection(conn) try: - jobs = conn.related_jobs.values() + # NOTE(notmorgan): In the loop below it is possible to change the + # jobs list on the connection. In python 3 .values() is an iter not + # a static list, meaning that a change will break the for loop + # as the object being iterated on will have changed in size. + jobs = list(conn.related_jobs.values()) if conn in self.active_connections: self.active_connections.remove(conn) finally: @@ -2938,7 +2973,7 @@ class Server(BaseClientServer): functions = self._getFunctionStats() for name, values in functions.items(): request.connection.sendRaw(("%s\t%s\t%s\t%s\n" % - (name, values[0], values[1], + (name.decode('utf-8'), values[0], values[1], values[2])).encode('utf8')) request.connection.sendRaw(b'.\n') diff --git a/gear/cmd/geard.py b/gear/cmd/geard.py index 227f1cd..d35dc25 100644 --- a/gear/cmd/geard.py +++ b/gear/cmd/geard.py @@ -1,4 +1,3 @@ -#!/usr/bin/env python # Copyright 2013 OpenStack Foundation # # Licensed under the Apache License, Version 2.0 (the "License"); you may @@ -14,7 +13,6 @@ # under the License. import argparse -import ConfigParser import daemon import extras import gear @@ -25,6 +23,9 @@ import pbr.version import signal import sys +from six.moves import configparser as ConfigParser + + pid_file_module = extras.try_imports(['daemon.pidlockfile', 'daemon.pidfile']) @@ -59,6 +60,18 @@ support. help='path to SSL private key') parser.add_argument('--acl', dest='acl', metavar='PATH', help='path to ACL file') + parser.add_argument('--keepalive', dest='keepalive', default=False, + action='store_true', + help='enable TCP keepalives in socket') + parser.add_argument('--keepalive-idle', dest='tcp_keepidle', type=int, + default=7200, action='store', + help='TCP keepalive idle time') + parser.add_argument('--keepalive-interval', dest='tcp_keepintvl', type=int, + default=75, action='store', + help='TCP keepalive probe interval') + parser.add_argument('--keepalive-count', dest='tcp_keepcnt', type=int, + default=9, action='store', + help='TCP keepalive probes count') parser.add_argument('--version', dest='version', action='store_true', help='show version') self.args = parser.parse_args() @@ -108,7 +121,12 @@ support. statsd_host, statsd_port, statsd_prefix, - acl=acl) + acl=acl, + keepalive=self.args.keepalive, + tcp_keepidle=self.args.tcp_keepidle, + tcp_keepintvl=self.args.tcp_keepintvl, + tcp_keepcnt=self.args.tcp_keepcnt + ) signal.pause() @@ -127,8 +145,3 @@ def main(): pid = pid_file_module.TimeoutPIDLockFile(server.args.pidfile, 10) with daemon.DaemonContext(pidfile=pid): server.main() - - -if __name__ == "__main__": - sys.path.insert(0, '.') - main() diff --git a/gear/tests/__init__.py b/gear/tests/__init__.py index 834b0f0..6d5edb4 100644 --- a/gear/tests/__init__.py +++ b/gear/tests/__init__.py @@ -17,6 +17,7 @@ """Common utilities used in testing""" import errno +import logging import os import socket @@ -49,7 +50,10 @@ class BaseTestCase(testtools.TestCase, testresources.ResourcedTestCase): stderr = self.useFixture(fixtures.StringStream('stderr')).stream self.useFixture(fixtures.MonkeyPatch('sys.stderr', stderr)) - self.useFixture(fixtures.FakeLogger()) + self.useFixture(fixtures.FakeLogger( + level=logging.DEBUG, + format='%(asctime)s %(name)-32s ' + '%(levelname)-8s %(message)s')) self.useFixture(fixtures.NestedTempfile()) diff --git a/gear/tests/test_functional.py b/gear/tests/test_functional.py index 247649a..26e72e3 100644 --- a/gear/tests/test_functional.py +++ b/gear/tests/test_functional.py @@ -14,11 +14,13 @@ # limitations under the License. import os +import threading import time from OpenSSL import crypto import fixtures import testscenarios +import testtools import gear from gear import tests @@ -125,6 +127,16 @@ class TestFunctional(tests.BaseTestCase): self.assertTrue(job.complete) self.assertEqual(job.data, [b'workdata']) + def test_worker_termination(self): + def getJob(): + with testtools.ExpectedException(gear.InterruptedError): + self.worker.getJob() + self.worker.registerFunction('test') + jobthread = threading.Thread(target=getJob) + jobthread.daemon = True + jobthread.start() + self.worker.stopWaitingForJobs() + def load_tests(loader, in_tests, pattern): return testscenarios.load_tests_apply_scenarios(loader, in_tests, pattern) diff --git a/gear/tests/test_gear.py b/gear/tests/test_gear.py index c16df04..546d4c4 100644 --- a/gear/tests/test_gear.py +++ b/gear/tests/test_gear.py @@ -42,6 +42,35 @@ class ConnectionTestCase(tests.BaseTestCase): 'host: %s port: %s>' % (self.host, self.port))) +class AdminRequestTestCase(tests.BaseTestCase): + scenarios = [ + ('empty newline', dict(response=b'.\n', remainder=b'')), + ('empty return', dict(response=b'.\r\n', remainder=b'')), + ('data newline', dict(response=b'foo\n.\n', remainder=b'')), + ('data return', dict(response=b'foo\r\n.\r\n', remainder=b'')), + ('pipeline newline', dict(response=b'foo\n.\nbar', + remainder=b'bar')), + ('pipeline return', dict(response=b'foo\r\n.\r\nbar', + remainder=b'bar')), + ] + + def test_full_packet(self): + req = gear.StatusAdminRequest() + ret = req.isComplete(self.response) + self.assertTrue(ret[0]) + self.assertEqual(ret[1], self.remainder) + + def test_partial_packet(self): + req = gear.StatusAdminRequest() + for i in range(len(self.response)-len(self.remainder)): + ret = req.isComplete(self.response[:i]) + self.assertFalse(ret[0]) + self.assertIsNone(ret[1]) + ret = req.isComplete(self.response) + self.assertTrue(ret[0]) + self.assertEqual(ret[1], self.remainder) + + class TestConnection(tests.BaseTestCase): def setUp(self): @@ -293,6 +322,11 @@ class TestClient(tests.BaseTestCase): acl.revokeGrant('manager') self.assertFalse(acl.canGrant('manager')) + def test_double_shutdown(self): + client = gear.Client() + client.shutdown() + client.shutdown() + def load_tests(loader, in_tests, pattern): return testscenarios.load_tests_apply_scenarios(loader, in_tests, pattern) diff --git a/other-requirements.txt b/other-requirements.txt new file mode 100644 index 0000000..fa5c3fb --- /dev/null +++ b/other-requirements.txt @@ -0,0 +1,11 @@ +# This is a cross-platform list tracking distribution packages needed by tests; +# see http://docs.openstack.org/infra/bindep/ for additional information. + +python-dev [platform:dpkg] +python-devel [platform:rpm] +python3-all-dev [platform:ubuntu !platform:ubuntu-precise] +python3-dev [platform:dpkg] +python3-devel [platform:fedora] +python3.4 [platform:ubuntu-trusty] +python3.5 [platform:ubuntu-xenial] +python34-devel [platform:centos] diff --git a/requirements.txt b/requirements.txt index c575dc6..a35fc4c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ -pbr>=0.6,!=0.7,<1.0 -argparse +pbr>=1.8.0,<2.0 +six>=1.5.2 extras python-daemon>=2.0.4 @@ -20,10 +20,6 @@ classifier = packages = gear -[global] -setup-hooks = - pbr.hooks.setup_hook - [entry_points] console_scripts = geard = gear.cmd.geard:main diff --git a/test-requirements.txt b/test-requirements.txt index abf85cb..98bcf41 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -1,4 +1,4 @@ -hacking>=0.5.3,<0.8 +hacking>=0.10.0,<0.11 coverage>=3.6 discover @@ -1,7 +1,7 @@ [tox] minversion = 1.6 skipsdist = True -envlist = py26,py27,pep8 +envlist = py27,py34,pep8 [testenv] setenv = VIRTUAL_ENV={envdir} @@ -14,7 +14,6 @@ commands = [tox:jenkins] sitepackages = True -downloadcache = ~/cache/pip [testenv:pep8] commands = flake8 @@ -36,3 +35,11 @@ select = H231 [testenv:docs] commands = python setup.py build_sphinx + +[testenv:bindep] +# Do not install any requirements. We want this to be fast and work even if +# system dependencies are missing, since it's used to tell you what system +# dependencies are missing! This also means that bindep must be installed +# separately, outside of the requirements files. +deps = bindep +commands = bindep test |