diff options
author | Mauro Stettler <mauro.stettler@gmail.com> | 2015-03-16 23:37:12 +0900 |
---|---|---|
committer | Mauro Stettler <mauro.stettler@gmail.com> | 2015-03-17 01:26:19 +0900 |
commit | 82a543aa0b5c609c456f94e86ff263cb30e90997 (patch) | |
tree | e7f19fea018b61e30f3b6a73db6363ccc7364f2b | |
parent | 6857690ae34b87f11572f64c5f93938acff42809 (diff) | |
download | pystatsd-82a543aa0b5c609c456f94e86ff263cb30e90997.tar.gz |
Add TCPStatsClient and TCPPipeline
-rw-r--r-- | statsd/__init__.py | 3 | ||||
-rw-r--r-- | statsd/client.py | 49 | ||||
-rw-r--r-- | statsd/tests.py | 258 |
3 files changed, 306 insertions, 4 deletions
diff --git a/statsd/__init__.py b/statsd/__init__.py index a4f0372..79f54f5 100644 --- a/statsd/__init__.py +++ b/statsd/__init__.py @@ -1,8 +1,9 @@ from __future__ import absolute_import from .client import StatsClient +from .client import TCPStatsClient VERSION = (3, 0, 1) __version__ = '.'.join(map(str, VERSION)) -__all__ = ['StatsClient'] +__all__ = ['StatsClient', 'TCPStatsClient'] diff --git a/statsd/client.py b/statsd/client.py index 3f0da06..a423dae 100644 --- a/statsd/client.py +++ b/statsd/client.py @@ -7,7 +7,7 @@ import time import abc -__all__ = ['StatsClient'] +__all__ = ['StatsClient', 'TCPStatsClient'] class Timer(object): @@ -154,6 +154,46 @@ class StatsClient(StatsClientBase): return Pipeline(self) +class TCPStatsClient(StatsClientBase): + """TCP version of StatsClient.""" + + def __init__(self, host='localhost', port=8125, prefix=None, timeout=None): + """Create a new client.""" + self._host = host + self._port = port + self._timeout = timeout + self._prefix = prefix + self._sock = None + + def _send(self, data): + """Send data to statsd.""" + if not self._sock: + self.connect() + self._do_send(data) + + def _do_send(self, data): + self._sock.sendall(data.encode('ascii') + b'\n') + + def close(self): + if self._sock and hasattr(self._sock, 'close'): + self._sock.close() + self._sock = None + + def connect(self): + family, _, _, _, addr = socket.getaddrinfo( + self._host, self._port, 0, socket.SOCK_STREAM)[0] + self._sock = socket.socket(family, socket.SOCK_STREAM) + self._sock.settimeout(self._timeout) + self._sock.connect(addr) + + def pipeline(self): + return TCPPipeline(self) + + def reconnect(self, data): + self.close() + self.connect() + + class PipelineBase(StatsClientBase): __metaclass__ = abc.ABCMeta @@ -203,3 +243,10 @@ class Pipeline(PipelineBase): else: data += '\n' + stat self._client._after(data) + + +class TCPPipeline(PipelineBase): + + def _send(self): + self._client._after('\n'.join(self._stats)) + self._stats.clear() diff --git a/statsd/tests.py b/statsd/tests.py index f3f0a54..a6a525c 100644 --- a/statsd/tests.py +++ b/statsd/tests.py @@ -7,6 +7,7 @@ import mock from nose.tools import eq_ from statsd import StatsClient +from statsd import TCPStatsClient ADDR = (socket.gethostbyname('localhost'), 8125) @@ -15,12 +16,14 @@ ADDR = (socket.gethostbyname('localhost'), 8125) # proto specific methods to get the socket method to send data send_method = { 'udp': lambda x: x.sendto, + 'tcp': lambda x: x.sendall, } # proto specific methods to create the expected value make_val = { 'udp': lambda x, addr: mock.call(str.encode(x), addr), + 'tcp': lambda x, addr: mock.call(str.encode(x + '\n')), } @@ -34,6 +37,16 @@ def _udp_client(prefix=None, addr=None, port=None): return sc +def _tcp_client(prefix=None, addr=None, port=None, timeout=None): + if not addr: + addr = ADDR[0] + if not port: + port = ADDR[1] + sc = TCPStatsClient(host=addr, port=port, prefix=prefix, timeout=timeout) + sc._sock = mock.Mock() + return sc + + def _timer_check(sock, count, proto, start, end): send = send_method[proto](sock) eq_(send.call_count, count) @@ -132,6 +145,13 @@ def test_incr_udp(): _test_incr(cl, 'udp') +@mock.patch.object(random, 'random', lambda: -1) +def test_incr_tcp(): + """TCPStatsClient.incr works.""" + cl = _tcp_client() + _test_incr(cl, 'tcp') + + def _test_decr(cl, proto): cl.decr('foo') _sock_check(cl._sock, 1, proto, 'foo:-1|c') @@ -153,6 +173,13 @@ def test_decr_udp(): _test_decr(cl, 'udp') +@mock.patch.object(random, 'random', lambda: -1) +def test_decr_tcp(): + """TCPStatsClient.decr works.""" + cl = _tcp_client() + _test_decr(cl, 'tcp') + + def _test_gauge(cl, proto): cl.gauge('foo', 30) _sock_check(cl._sock, 1, proto, 'foo:30|g') @@ -171,6 +198,13 @@ def test_gauge_udp(): _test_gauge(cl, 'udp') +@mock.patch.object(random, 'random', lambda: -1) +def test_gauge_tcp(): + """TCPStatsClient.gauge works.""" + cl = _tcp_client() + _test_gauge(cl, 'tcp') + + def _test_ipv6(cl, proto, addr): cl.gauge('foo', 30) _sock_check(cl._sock, 1, proto, 'foo:30|g', addr=addr) @@ -183,6 +217,13 @@ def test_ipv6_udp(): cl = _udp_client(addr=addr[0]) _test_ipv6(cl, 'udp', addr) +@mock.patch.object(random, 'random', lambda: -1) +def test_ipv6_tcp(): + """TCPStatsClient can use to IPv6 address.""" + addr = ('::1', 8125, 0, 0) + cl = _tcp_client(addr=addr[0]) + _test_ipv6(cl, 'tcp', addr) + def _test_gauge_delta(cl, proto): tests = ( @@ -208,6 +249,13 @@ def test_gauge_delta_udp(): _test_gauge_delta(cl, 'udp') +@mock.patch.object(random, 'random', lambda: -1) +def test_gauge_delta_tcp(): + """TCPStatsClient.gauge works with delta values.""" + cl = _tcp_client() + _test_gauge_delta(cl, 'tcp') + + def _test_gauge_absolute_negative(cl, proto): cl.gauge('foo', -5, delta=False) _sock_check(cl._sock, 1, 'foo:0|g\nfoo:-5|g') @@ -220,6 +268,13 @@ def test_gauge_absolute_negative_udp(): _test_gauge_delta(cl, 'udp') +@mock.patch.object(random, 'random', lambda: -1) +def test_gauge_absolute_negative_tcp(): + """TCPStatsClient.gauge works with absolute negative value.""" + cl = _tcp_client() + _test_gauge_delta(cl, 'tcp') + + def _test_gauge_absolute_negative_rate(cl, proto, mock_random): mock_random.return_value = -1 cl.gauge('foo', -1, rate=0.5, delta=False) @@ -238,6 +293,13 @@ def test_gauge_absolute_negative_rate_udp(mock_random): _test_gauge_absolute_negative_rate(cl, 'udp', mock_random) +@mock.patch.object(random, 'random') +def test_gauge_absolute_negative_rate_tcp(mock_random): + """TCPStatsClient.gauge works with absolute negative value and rate.""" + cl = _tcp_client() + _test_gauge_absolute_negative_rate(cl, 'tcp', mock_random) + + def _test_set(cl, proto): cl.set('foo', 10) _sock_check(cl._sock, 1, proto, 'foo:10|s') @@ -259,6 +321,13 @@ def test_set_udp(): _test_set(cl, 'udp') +@mock.patch.object(random, 'random', lambda: -1) +def test_set_tcp(): + """TCPStatsClient.set works.""" + cl = _tcp_client() + _test_set(cl, 'tcp') + + def _test_timing(cl, proto): cl.timing('foo', 100) _sock_check(cl._sock, 1, proto, 'foo:100|ms') @@ -277,6 +346,13 @@ def test_timing_udp(): _test_timing(cl, 'udp') +@mock.patch.object(random, 'random', lambda: -1) +def test_timing_tcp(): + """TCPStatsClient.timing works.""" + cl = _tcp_client() + _test_timing(cl, 'tcp') + + def _test_prepare(cl, proto): tests = ( ('foo:1|c', ('foo', '1|c', 1)), @@ -299,6 +375,13 @@ def test_prepare_udp(): _test_prepare(cl, 'udp') +@mock.patch.object(random, 'random', lambda: -1) +def test_prepare_tcp(): + """Test TCPStatsClient._prepare method.""" + cl = _tcp_client() + _test_prepare(cl, 'tcp') + + def _test_prefix(cl, proto): cl.incr('bar') _sock_check(cl._sock, 1, proto, 'foo.bar:1|c') @@ -311,6 +394,13 @@ def test_prefix_udp(): _test_prefix(cl, 'udp') +@mock.patch.object(random, 'random', lambda: -1) +def test_prefix_tcp(): + """TCPStatsClient.incr works.""" + cl = _tcp_client(prefix='foo') + _test_prefix(cl, 'tcp') + + def _test_timer_manager(cl, proto): with cl.timer('foo'): pass @@ -324,6 +414,12 @@ def test_timer_manager_udp(): _test_timer_manager(cl, 'udp') +def test_timer_manager_tcp(): + """TCPStatsClient.timer can be used as manager.""" + cl = _tcp_client() + _test_timer_manager(cl, 'tcp') + + def _test_timer_decorator(cl, proto): @cl.timer('foo') def foo(a, b): @@ -351,6 +447,12 @@ def test_timer_decorator_udp(): _test_timer_decorator(cl, 'udp') +def test_timer_decorator_tcp(): + """StatsClient.timer is a thread-safe decorator (TCP).""" + cl = _tcp_client() + _test_timer_decorator(cl, 'tcp') + + def _test_timer_capture(cl, proto): with cl.timer('woo') as result: eq_(result.ms, None) @@ -363,6 +465,12 @@ def test_timer_capture_udp(): _test_timer_capture(cl, 'udp') +def test_timer_capture_tcp(): + """You can capture the output of StatsClient.timer (TCP).""" + cl = _tcp_client() + _test_timer_capture(cl, 'tcp') + + def _test_timer_context_rate(cl, proto): with cl.timer('foo', rate=0.5): pass @@ -377,6 +485,13 @@ def test_timer_context_rate_udp(): _test_timer_context_rate(cl, 'udp') +@mock.patch.object(random, 'random', lambda: -1) +def test_timer_context_rate_tcp(): + """TCPStatsClient.timer can be used as manager with rate.""" + cl = _tcp_client() + _test_timer_context_rate(cl, 'tcp') + + def _test_timer_decorator_rate(cl, proto): @cl.timer('foo', rate=0.1) def foo(a, b): @@ -400,6 +515,13 @@ def test_timer_decorator_rate_udp(): _test_timer_decorator_rate(cl, 'udp') +@mock.patch.object(random, 'random', lambda: -1) +def test_timer_decorator_rate_tcp(): + """TCPStatsClient.timer can be used as decorator with rate.""" + cl = _tcp_client() + _test_timer_decorator_rate(cl, 'tcp') + + def _test_timer_context_exceptions(cl, proto): with assert_raises(socket.timeout): with cl.timer('foo'): @@ -409,11 +531,17 @@ def _test_timer_context_exceptions(cl, proto): def test_timer_context_exceptions_udp(): - """Exceptions within a managed block should get logged and propagate.""" + """Exceptions within a managed block should get logged and propagate (UDP).""" cl = _udp_client() _test_timer_context_exceptions(cl, 'udp') +def test_timer_context_exceptions_tcp(): + """Exceptions within a managed block should get logged and propagate (TCP).""" + cl = _tcp_client() + _test_timer_context_exceptions(cl, 'tcp') + + def _test_timer_decorator_exceptions(cl, proto): @cl.timer('foo') def foo(): @@ -426,11 +554,17 @@ def _test_timer_decorator_exceptions(cl, proto): def test_timer_decorator_exceptions_udp(): - """Exceptions from wrapped methods should get logged and propagate.""" + """Exceptions from wrapped methods should get logged and propagate (UDP).""" cl = _udp_client() _test_timer_decorator_exceptions(cl, 'udp') +def test_timer_decorator_exceptions_tcp(): + """Exceptions from wrapped methods should get logged and propagate (TCP).""" + cl = _tcp_client() + _test_timer_decorator_exceptions(cl, 'tcp') + + def _test_timer_object(cl, proto): t = cl.timer('foo').start() t.stop() @@ -444,6 +578,12 @@ def test_timer_object_udp(): _test_timer_object(cl, 'udp') +def test_timer_object_tcp(): + """TCPStatsClient.timer works.""" + cl = _tcp_client() + _test_timer_object(cl, 'tcp') + + def _test_timer_object_no_send(cl, proto): t = cl.timer('foo').start() t.stop(send=False) @@ -459,6 +599,12 @@ def test_timer_object_no_send_udp(): _test_timer_object_no_send(cl, 'udp') +def test_timer_object_no_send_tcp(): + """Stop TCPStatsClient.timer without sending.""" + cl = _tcp_client() + _test_timer_object_no_send(cl, 'tcp') + + def _test_timer_object_rate(cl, proto): t = cl.timer('foo', rate=0.5) t.start() @@ -474,6 +620,13 @@ def test_timer_object_rate_udp(): _test_timer_object_rate(cl, 'udp') +@mock.patch.object(random, 'random', lambda: -1) +def test_timer_object_rate_tcp(): + """TCPStatsClient.timer works with rate.""" + cl = _tcp_client() + _test_timer_object_rate(cl, 'tcp') + + def _test_timer_object_no_send_twice(cl): t = cl.timer('foo').start() t.stop() @@ -488,6 +641,12 @@ def test_timer_object_no_send_twice_udp(): _test_timer_object_no_send_twice(cl) +def test_timer_object_no_send_twice_tcp(): + """TCPStatsClient.timer raises RuntimeError if send is called twice.""" + cl = _tcp_client() + _test_timer_object_no_send_twice(cl) + + def _test_timer_send_without_stop(cl): with cl.timer('foo') as t: assert t.ms is None @@ -506,6 +665,12 @@ def test_timer_send_without_stop_udp(): _test_timer_send_without_stop(cl) +def test_timer_send_without_stop_tcp(): + """TCPStatsClient.timer raises error if send is called before stop.""" + cl = _tcp_client() + _test_timer_send_without_stop(cl) + + def _test_timer_object_stop_without_start(cl): with assert_raises(RuntimeError): cl.timer('foo').stop() @@ -517,6 +682,12 @@ def test_timer_object_stop_without_start_udp(): _test_timer_object_stop_without_start(cl) +def test_timer_object_stop_without_start_tcp(): + """TCPStatsClient.timer raises error if stop is called before start.""" + cl = _tcp_client() + _test_timer_object_stop_without_start(cl) + + def _test_pipeline(cl, proto): pipe = cl.pipeline() pipe.incr('foo') @@ -532,6 +703,12 @@ def test_pipeline_udp(): _test_pipeline(cl, 'udp') +def test_pipeline_tcp(): + """TCPStatsClient.pipeline works.""" + cl = _tcp_client() + _test_pipeline(cl, 'tcp') + + def _test_pipeline_null(cl, proto): pipe = cl.pipeline() pipe.send() @@ -544,6 +721,12 @@ def test_pipeline_null_udp(): _test_pipeline_null(cl, 'udp') +def test_pipeline_null_tcp(): + """Ensure we don't error on an empty pipeline (TCP).""" + cl = _tcp_client() + _test_pipeline_null(cl, 'tcp') + + def _test_pipeline_manager(cl, proto): with cl.pipeline() as pipe: pipe.incr('foo') @@ -558,6 +741,12 @@ def test_pipeline_manager_udp(): _test_pipeline_manager(cl, 'udp') +def test_pipeline_manager_tcp(): + """TCPStatsClient.pipeline can be used as manager.""" + cl = _tcp_client() + _test_pipeline_manager(cl, 'tcp') + + def _test_pipeline_timer_manager(cl, proto): with cl.pipeline() as pipe: with pipe.timer('foo'): @@ -571,6 +760,12 @@ def test_pipeline_timer_manager_udp(): _test_pipeline_timer_manager(cl, 'udp') +def test_pipeline_timer_manager_tcp(): + """Timer manager can be retrieve from TCP Pipeline manager.""" + cl = _tcp_client() + _test_pipeline_timer_manager(cl, 'tcp') + + def _test_pipeline_timer_decorator(cl, proto): with cl.pipeline() as pipe: @pipe.timer('foo') @@ -586,6 +781,12 @@ def test_pipeline_timer_decorator_udp(): _test_pipeline_timer_decorator(cl, 'udp') +def test_pipeline_timer_decorator_tcp(): + """TCP Pipeline manager can be used as decorator.""" + cl = _tcp_client() + _test_pipeline_timer_decorator(cl, 'tcp') + + def _test_pipeline_timer_object(cl, proto): with cl.pipeline() as pipe: t = pipe.timer('foo').start() @@ -600,6 +801,12 @@ def test_pipeline_timer_object_udp(): _test_pipeline_timer_object(cl, 'udp') +def test_pipeline_timer_object_tcp(): + """Timer from TCP Pipeline manager works.""" + cl = _tcp_client() + _test_pipeline_timer_object(cl, 'tcp') + + def _test_pipeline_empty(cl): with cl.pipeline() as pipe: pipe.incr('foo') @@ -613,6 +820,12 @@ def test_pipeline_empty_udp(): _test_pipeline_empty(cl) +def test_pipeline_empty_tcp(): + """Pipelines should be empty after a send() call (TCP).""" + cl = _tcp_client() + _test_pipeline_empty(cl) + + def _test_pipeline_negative_absolute_gauge(cl, proto): with cl.pipeline() as pipe: pipe.gauge('foo', -10, delta=False) @@ -626,6 +839,12 @@ def test_pipeline_negative_absolute_gauge_udp(): _test_pipeline_negative_absolute_gauge(cl, 'udp') +def test_pipeline_negative_absolute_gauge_tcp(): + """Negative absolute gauges use an internal pipeline (TCP).""" + cl = _tcp_client() + _test_pipeline_negative_absolute_gauge(cl, 'tcp') + + def _test_big_numbers(cl, proto): num = 1234568901234 result = 'foo:1234568901234|%s' @@ -651,6 +870,12 @@ def test_big_numbers_udp(): _test_big_numbers(cl, 'udp') +def test_big_numbers_tcp(): + """Test big numbers with TCP client.""" + cl = _tcp_client() + _test_big_numbers(cl, 'tcp') + + def _test_rate_no_send(cl, proto): cl.incr('foo', rate=0.5) _sock_check(cl._sock, 0, proto) @@ -663,6 +888,13 @@ def test_rate_no_send_udp(): _test_rate_no_send(cl, 'udp') +@mock.patch.object(random, 'random', lambda: 2) +def test_rate_no_send_tcp(): + """Rate below random value prevents sending with TCPStatsClient.incr.""" + cl = _tcp_client() + _test_rate_no_send(cl, 'tcp') + + def test_socket_error(): """Socket error on StatsClient should be ignored.""" cl = _udp_client() @@ -682,3 +914,25 @@ def test_pipeline_packet_size(): eq_(2, sc._sock.sendto.call_count) assert len(sc._sock.sendto.call_args_list[0][0][0]) <= 512 assert len(sc._sock.sendto.call_args_list[1][0][0]) <= 512 + + +@mock.patch.object(socket, 'socket') +def test_tcp_raises_exception_to_user(mock_socket): + """Socket errors in TCPStatsClient should be raised to user.""" + addr = ('127.0.0.1', 1234) + cl = _tcp_client(addr=addr[0], port=addr[1]) + cl.incr('foo') + cl._sock.sendall.assert_called_once() + cl._sock.sendall.side_effect = socket.error + with assert_raises(socket.error): + cl.incr('foo') + + +@mock.patch.object(socket, 'socket') +def test_tcp_timeout(mock_socket): + """Timeout on TCPStatsClient should be set on socket.""" + test_timeout = 321 + cl = TCPStatsClient(timeout=test_timeout) + cl.incr('foo') + cl._sock.settimeout.assert_called_once() + cl._sock.settimeout.assert_called_with(test_timeout) |