summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMauro Stettler <mauro.stettler@gmail.com>2015-03-16 23:37:12 +0900
committerMauro Stettler <mauro.stettler@gmail.com>2015-03-17 01:26:19 +0900
commit82a543aa0b5c609c456f94e86ff263cb30e90997 (patch)
treee7f19fea018b61e30f3b6a73db6363ccc7364f2b
parent6857690ae34b87f11572f64c5f93938acff42809 (diff)
downloadpystatsd-82a543aa0b5c609c456f94e86ff263cb30e90997.tar.gz
Add TCPStatsClient and TCPPipeline
-rw-r--r--statsd/__init__.py3
-rw-r--r--statsd/client.py49
-rw-r--r--statsd/tests.py258
3 files changed, 306 insertions, 4 deletions
diff --git a/statsd/__init__.py b/statsd/__init__.py
index a4f0372..79f54f5 100644
--- a/statsd/__init__.py
+++ b/statsd/__init__.py
@@ -1,8 +1,9 @@
from __future__ import absolute_import
from .client import StatsClient
+from .client import TCPStatsClient
VERSION = (3, 0, 1)
__version__ = '.'.join(map(str, VERSION))
-__all__ = ['StatsClient']
+__all__ = ['StatsClient', 'TCPStatsClient']
diff --git a/statsd/client.py b/statsd/client.py
index 3f0da06..a423dae 100644
--- a/statsd/client.py
+++ b/statsd/client.py
@@ -7,7 +7,7 @@ import time
import abc
-__all__ = ['StatsClient']
+__all__ = ['StatsClient', 'TCPStatsClient']
class Timer(object):
@@ -154,6 +154,46 @@ class StatsClient(StatsClientBase):
return Pipeline(self)
+class TCPStatsClient(StatsClientBase):
+ """TCP version of StatsClient."""
+
+ def __init__(self, host='localhost', port=8125, prefix=None, timeout=None):
+ """Create a new client."""
+ self._host = host
+ self._port = port
+ self._timeout = timeout
+ self._prefix = prefix
+ self._sock = None
+
+ def _send(self, data):
+ """Send data to statsd."""
+ if not self._sock:
+ self.connect()
+ self._do_send(data)
+
+ def _do_send(self, data):
+ self._sock.sendall(data.encode('ascii') + b'\n')
+
+ def close(self):
+ if self._sock and hasattr(self._sock, 'close'):
+ self._sock.close()
+ self._sock = None
+
+ def connect(self):
+ family, _, _, _, addr = socket.getaddrinfo(
+ self._host, self._port, 0, socket.SOCK_STREAM)[0]
+ self._sock = socket.socket(family, socket.SOCK_STREAM)
+ self._sock.settimeout(self._timeout)
+ self._sock.connect(addr)
+
+ def pipeline(self):
+ return TCPPipeline(self)
+
+ def reconnect(self, data):
+ self.close()
+ self.connect()
+
+
class PipelineBase(StatsClientBase):
__metaclass__ = abc.ABCMeta
@@ -203,3 +243,10 @@ class Pipeline(PipelineBase):
else:
data += '\n' + stat
self._client._after(data)
+
+
+class TCPPipeline(PipelineBase):
+
+ def _send(self):
+ self._client._after('\n'.join(self._stats))
+ self._stats.clear()
diff --git a/statsd/tests.py b/statsd/tests.py
index f3f0a54..a6a525c 100644
--- a/statsd/tests.py
+++ b/statsd/tests.py
@@ -7,6 +7,7 @@ import mock
from nose.tools import eq_
from statsd import StatsClient
+from statsd import TCPStatsClient
ADDR = (socket.gethostbyname('localhost'), 8125)
@@ -15,12 +16,14 @@ ADDR = (socket.gethostbyname('localhost'), 8125)
# proto specific methods to get the socket method to send data
send_method = {
'udp': lambda x: x.sendto,
+ 'tcp': lambda x: x.sendall,
}
# proto specific methods to create the expected value
make_val = {
'udp': lambda x, addr: mock.call(str.encode(x), addr),
+ 'tcp': lambda x, addr: mock.call(str.encode(x + '\n')),
}
@@ -34,6 +37,16 @@ def _udp_client(prefix=None, addr=None, port=None):
return sc
+def _tcp_client(prefix=None, addr=None, port=None, timeout=None):
+ if not addr:
+ addr = ADDR[0]
+ if not port:
+ port = ADDR[1]
+ sc = TCPStatsClient(host=addr, port=port, prefix=prefix, timeout=timeout)
+ sc._sock = mock.Mock()
+ return sc
+
+
def _timer_check(sock, count, proto, start, end):
send = send_method[proto](sock)
eq_(send.call_count, count)
@@ -132,6 +145,13 @@ def test_incr_udp():
_test_incr(cl, 'udp')
+@mock.patch.object(random, 'random', lambda: -1)
+def test_incr_tcp():
+ """TCPStatsClient.incr works."""
+ cl = _tcp_client()
+ _test_incr(cl, 'tcp')
+
+
def _test_decr(cl, proto):
cl.decr('foo')
_sock_check(cl._sock, 1, proto, 'foo:-1|c')
@@ -153,6 +173,13 @@ def test_decr_udp():
_test_decr(cl, 'udp')
+@mock.patch.object(random, 'random', lambda: -1)
+def test_decr_tcp():
+ """TCPStatsClient.decr works."""
+ cl = _tcp_client()
+ _test_decr(cl, 'tcp')
+
+
def _test_gauge(cl, proto):
cl.gauge('foo', 30)
_sock_check(cl._sock, 1, proto, 'foo:30|g')
@@ -171,6 +198,13 @@ def test_gauge_udp():
_test_gauge(cl, 'udp')
+@mock.patch.object(random, 'random', lambda: -1)
+def test_gauge_tcp():
+ """TCPStatsClient.gauge works."""
+ cl = _tcp_client()
+ _test_gauge(cl, 'tcp')
+
+
def _test_ipv6(cl, proto, addr):
cl.gauge('foo', 30)
_sock_check(cl._sock, 1, proto, 'foo:30|g', addr=addr)
@@ -183,6 +217,13 @@ def test_ipv6_udp():
cl = _udp_client(addr=addr[0])
_test_ipv6(cl, 'udp', addr)
+@mock.patch.object(random, 'random', lambda: -1)
+def test_ipv6_tcp():
+ """TCPStatsClient can use to IPv6 address."""
+ addr = ('::1', 8125, 0, 0)
+ cl = _tcp_client(addr=addr[0])
+ _test_ipv6(cl, 'tcp', addr)
+
def _test_gauge_delta(cl, proto):
tests = (
@@ -208,6 +249,13 @@ def test_gauge_delta_udp():
_test_gauge_delta(cl, 'udp')
+@mock.patch.object(random, 'random', lambda: -1)
+def test_gauge_delta_tcp():
+ """TCPStatsClient.gauge works with delta values."""
+ cl = _tcp_client()
+ _test_gauge_delta(cl, 'tcp')
+
+
def _test_gauge_absolute_negative(cl, proto):
cl.gauge('foo', -5, delta=False)
_sock_check(cl._sock, 1, 'foo:0|g\nfoo:-5|g')
@@ -220,6 +268,13 @@ def test_gauge_absolute_negative_udp():
_test_gauge_delta(cl, 'udp')
+@mock.patch.object(random, 'random', lambda: -1)
+def test_gauge_absolute_negative_tcp():
+ """TCPStatsClient.gauge works with absolute negative value."""
+ cl = _tcp_client()
+ _test_gauge_delta(cl, 'tcp')
+
+
def _test_gauge_absolute_negative_rate(cl, proto, mock_random):
mock_random.return_value = -1
cl.gauge('foo', -1, rate=0.5, delta=False)
@@ -238,6 +293,13 @@ def test_gauge_absolute_negative_rate_udp(mock_random):
_test_gauge_absolute_negative_rate(cl, 'udp', mock_random)
+@mock.patch.object(random, 'random')
+def test_gauge_absolute_negative_rate_tcp(mock_random):
+ """TCPStatsClient.gauge works with absolute negative value and rate."""
+ cl = _tcp_client()
+ _test_gauge_absolute_negative_rate(cl, 'tcp', mock_random)
+
+
def _test_set(cl, proto):
cl.set('foo', 10)
_sock_check(cl._sock, 1, proto, 'foo:10|s')
@@ -259,6 +321,13 @@ def test_set_udp():
_test_set(cl, 'udp')
+@mock.patch.object(random, 'random', lambda: -1)
+def test_set_tcp():
+ """TCPStatsClient.set works."""
+ cl = _tcp_client()
+ _test_set(cl, 'tcp')
+
+
def _test_timing(cl, proto):
cl.timing('foo', 100)
_sock_check(cl._sock, 1, proto, 'foo:100|ms')
@@ -277,6 +346,13 @@ def test_timing_udp():
_test_timing(cl, 'udp')
+@mock.patch.object(random, 'random', lambda: -1)
+def test_timing_tcp():
+ """TCPStatsClient.timing works."""
+ cl = _tcp_client()
+ _test_timing(cl, 'tcp')
+
+
def _test_prepare(cl, proto):
tests = (
('foo:1|c', ('foo', '1|c', 1)),
@@ -299,6 +375,13 @@ def test_prepare_udp():
_test_prepare(cl, 'udp')
+@mock.patch.object(random, 'random', lambda: -1)
+def test_prepare_tcp():
+ """Test TCPStatsClient._prepare method."""
+ cl = _tcp_client()
+ _test_prepare(cl, 'tcp')
+
+
def _test_prefix(cl, proto):
cl.incr('bar')
_sock_check(cl._sock, 1, proto, 'foo.bar:1|c')
@@ -311,6 +394,13 @@ def test_prefix_udp():
_test_prefix(cl, 'udp')
+@mock.patch.object(random, 'random', lambda: -1)
+def test_prefix_tcp():
+ """TCPStatsClient.incr works."""
+ cl = _tcp_client(prefix='foo')
+ _test_prefix(cl, 'tcp')
+
+
def _test_timer_manager(cl, proto):
with cl.timer('foo'):
pass
@@ -324,6 +414,12 @@ def test_timer_manager_udp():
_test_timer_manager(cl, 'udp')
+def test_timer_manager_tcp():
+ """TCPStatsClient.timer can be used as manager."""
+ cl = _tcp_client()
+ _test_timer_manager(cl, 'tcp')
+
+
def _test_timer_decorator(cl, proto):
@cl.timer('foo')
def foo(a, b):
@@ -351,6 +447,12 @@ def test_timer_decorator_udp():
_test_timer_decorator(cl, 'udp')
+def test_timer_decorator_tcp():
+ """StatsClient.timer is a thread-safe decorator (TCP)."""
+ cl = _tcp_client()
+ _test_timer_decorator(cl, 'tcp')
+
+
def _test_timer_capture(cl, proto):
with cl.timer('woo') as result:
eq_(result.ms, None)
@@ -363,6 +465,12 @@ def test_timer_capture_udp():
_test_timer_capture(cl, 'udp')
+def test_timer_capture_tcp():
+ """You can capture the output of StatsClient.timer (TCP)."""
+ cl = _tcp_client()
+ _test_timer_capture(cl, 'tcp')
+
+
def _test_timer_context_rate(cl, proto):
with cl.timer('foo', rate=0.5):
pass
@@ -377,6 +485,13 @@ def test_timer_context_rate_udp():
_test_timer_context_rate(cl, 'udp')
+@mock.patch.object(random, 'random', lambda: -1)
+def test_timer_context_rate_tcp():
+ """TCPStatsClient.timer can be used as manager with rate."""
+ cl = _tcp_client()
+ _test_timer_context_rate(cl, 'tcp')
+
+
def _test_timer_decorator_rate(cl, proto):
@cl.timer('foo', rate=0.1)
def foo(a, b):
@@ -400,6 +515,13 @@ def test_timer_decorator_rate_udp():
_test_timer_decorator_rate(cl, 'udp')
+@mock.patch.object(random, 'random', lambda: -1)
+def test_timer_decorator_rate_tcp():
+ """TCPStatsClient.timer can be used as decorator with rate."""
+ cl = _tcp_client()
+ _test_timer_decorator_rate(cl, 'tcp')
+
+
def _test_timer_context_exceptions(cl, proto):
with assert_raises(socket.timeout):
with cl.timer('foo'):
@@ -409,11 +531,17 @@ def _test_timer_context_exceptions(cl, proto):
def test_timer_context_exceptions_udp():
- """Exceptions within a managed block should get logged and propagate."""
+ """Exceptions within a managed block should get logged and propagate (UDP)."""
cl = _udp_client()
_test_timer_context_exceptions(cl, 'udp')
+def test_timer_context_exceptions_tcp():
+ """Exceptions within a managed block should get logged and propagate (TCP)."""
+ cl = _tcp_client()
+ _test_timer_context_exceptions(cl, 'tcp')
+
+
def _test_timer_decorator_exceptions(cl, proto):
@cl.timer('foo')
def foo():
@@ -426,11 +554,17 @@ def _test_timer_decorator_exceptions(cl, proto):
def test_timer_decorator_exceptions_udp():
- """Exceptions from wrapped methods should get logged and propagate."""
+ """Exceptions from wrapped methods should get logged and propagate (UDP)."""
cl = _udp_client()
_test_timer_decorator_exceptions(cl, 'udp')
+def test_timer_decorator_exceptions_tcp():
+ """Exceptions from wrapped methods should get logged and propagate (TCP)."""
+ cl = _tcp_client()
+ _test_timer_decorator_exceptions(cl, 'tcp')
+
+
def _test_timer_object(cl, proto):
t = cl.timer('foo').start()
t.stop()
@@ -444,6 +578,12 @@ def test_timer_object_udp():
_test_timer_object(cl, 'udp')
+def test_timer_object_tcp():
+ """TCPStatsClient.timer works."""
+ cl = _tcp_client()
+ _test_timer_object(cl, 'tcp')
+
+
def _test_timer_object_no_send(cl, proto):
t = cl.timer('foo').start()
t.stop(send=False)
@@ -459,6 +599,12 @@ def test_timer_object_no_send_udp():
_test_timer_object_no_send(cl, 'udp')
+def test_timer_object_no_send_tcp():
+ """Stop TCPStatsClient.timer without sending."""
+ cl = _tcp_client()
+ _test_timer_object_no_send(cl, 'tcp')
+
+
def _test_timer_object_rate(cl, proto):
t = cl.timer('foo', rate=0.5)
t.start()
@@ -474,6 +620,13 @@ def test_timer_object_rate_udp():
_test_timer_object_rate(cl, 'udp')
+@mock.patch.object(random, 'random', lambda: -1)
+def test_timer_object_rate_tcp():
+ """TCPStatsClient.timer works with rate."""
+ cl = _tcp_client()
+ _test_timer_object_rate(cl, 'tcp')
+
+
def _test_timer_object_no_send_twice(cl):
t = cl.timer('foo').start()
t.stop()
@@ -488,6 +641,12 @@ def test_timer_object_no_send_twice_udp():
_test_timer_object_no_send_twice(cl)
+def test_timer_object_no_send_twice_tcp():
+ """TCPStatsClient.timer raises RuntimeError if send is called twice."""
+ cl = _tcp_client()
+ _test_timer_object_no_send_twice(cl)
+
+
def _test_timer_send_without_stop(cl):
with cl.timer('foo') as t:
assert t.ms is None
@@ -506,6 +665,12 @@ def test_timer_send_without_stop_udp():
_test_timer_send_without_stop(cl)
+def test_timer_send_without_stop_tcp():
+ """TCPStatsClient.timer raises error if send is called before stop."""
+ cl = _tcp_client()
+ _test_timer_send_without_stop(cl)
+
+
def _test_timer_object_stop_without_start(cl):
with assert_raises(RuntimeError):
cl.timer('foo').stop()
@@ -517,6 +682,12 @@ def test_timer_object_stop_without_start_udp():
_test_timer_object_stop_without_start(cl)
+def test_timer_object_stop_without_start_tcp():
+ """TCPStatsClient.timer raises error if stop is called before start."""
+ cl = _tcp_client()
+ _test_timer_object_stop_without_start(cl)
+
+
def _test_pipeline(cl, proto):
pipe = cl.pipeline()
pipe.incr('foo')
@@ -532,6 +703,12 @@ def test_pipeline_udp():
_test_pipeline(cl, 'udp')
+def test_pipeline_tcp():
+ """TCPStatsClient.pipeline works."""
+ cl = _tcp_client()
+ _test_pipeline(cl, 'tcp')
+
+
def _test_pipeline_null(cl, proto):
pipe = cl.pipeline()
pipe.send()
@@ -544,6 +721,12 @@ def test_pipeline_null_udp():
_test_pipeline_null(cl, 'udp')
+def test_pipeline_null_tcp():
+ """Ensure we don't error on an empty pipeline (TCP)."""
+ cl = _tcp_client()
+ _test_pipeline_null(cl, 'tcp')
+
+
def _test_pipeline_manager(cl, proto):
with cl.pipeline() as pipe:
pipe.incr('foo')
@@ -558,6 +741,12 @@ def test_pipeline_manager_udp():
_test_pipeline_manager(cl, 'udp')
+def test_pipeline_manager_tcp():
+ """TCPStatsClient.pipeline can be used as manager."""
+ cl = _tcp_client()
+ _test_pipeline_manager(cl, 'tcp')
+
+
def _test_pipeline_timer_manager(cl, proto):
with cl.pipeline() as pipe:
with pipe.timer('foo'):
@@ -571,6 +760,12 @@ def test_pipeline_timer_manager_udp():
_test_pipeline_timer_manager(cl, 'udp')
+def test_pipeline_timer_manager_tcp():
+ """Timer manager can be retrieve from TCP Pipeline manager."""
+ cl = _tcp_client()
+ _test_pipeline_timer_manager(cl, 'tcp')
+
+
def _test_pipeline_timer_decorator(cl, proto):
with cl.pipeline() as pipe:
@pipe.timer('foo')
@@ -586,6 +781,12 @@ def test_pipeline_timer_decorator_udp():
_test_pipeline_timer_decorator(cl, 'udp')
+def test_pipeline_timer_decorator_tcp():
+ """TCP Pipeline manager can be used as decorator."""
+ cl = _tcp_client()
+ _test_pipeline_timer_decorator(cl, 'tcp')
+
+
def _test_pipeline_timer_object(cl, proto):
with cl.pipeline() as pipe:
t = pipe.timer('foo').start()
@@ -600,6 +801,12 @@ def test_pipeline_timer_object_udp():
_test_pipeline_timer_object(cl, 'udp')
+def test_pipeline_timer_object_tcp():
+ """Timer from TCP Pipeline manager works."""
+ cl = _tcp_client()
+ _test_pipeline_timer_object(cl, 'tcp')
+
+
def _test_pipeline_empty(cl):
with cl.pipeline() as pipe:
pipe.incr('foo')
@@ -613,6 +820,12 @@ def test_pipeline_empty_udp():
_test_pipeline_empty(cl)
+def test_pipeline_empty_tcp():
+ """Pipelines should be empty after a send() call (TCP)."""
+ cl = _tcp_client()
+ _test_pipeline_empty(cl)
+
+
def _test_pipeline_negative_absolute_gauge(cl, proto):
with cl.pipeline() as pipe:
pipe.gauge('foo', -10, delta=False)
@@ -626,6 +839,12 @@ def test_pipeline_negative_absolute_gauge_udp():
_test_pipeline_negative_absolute_gauge(cl, 'udp')
+def test_pipeline_negative_absolute_gauge_tcp():
+ """Negative absolute gauges use an internal pipeline (TCP)."""
+ cl = _tcp_client()
+ _test_pipeline_negative_absolute_gauge(cl, 'tcp')
+
+
def _test_big_numbers(cl, proto):
num = 1234568901234
result = 'foo:1234568901234|%s'
@@ -651,6 +870,12 @@ def test_big_numbers_udp():
_test_big_numbers(cl, 'udp')
+def test_big_numbers_tcp():
+ """Test big numbers with TCP client."""
+ cl = _tcp_client()
+ _test_big_numbers(cl, 'tcp')
+
+
def _test_rate_no_send(cl, proto):
cl.incr('foo', rate=0.5)
_sock_check(cl._sock, 0, proto)
@@ -663,6 +888,13 @@ def test_rate_no_send_udp():
_test_rate_no_send(cl, 'udp')
+@mock.patch.object(random, 'random', lambda: 2)
+def test_rate_no_send_tcp():
+ """Rate below random value prevents sending with TCPStatsClient.incr."""
+ cl = _tcp_client()
+ _test_rate_no_send(cl, 'tcp')
+
+
def test_socket_error():
"""Socket error on StatsClient should be ignored."""
cl = _udp_client()
@@ -682,3 +914,25 @@ def test_pipeline_packet_size():
eq_(2, sc._sock.sendto.call_count)
assert len(sc._sock.sendto.call_args_list[0][0][0]) <= 512
assert len(sc._sock.sendto.call_args_list[1][0][0]) <= 512
+
+
+@mock.patch.object(socket, 'socket')
+def test_tcp_raises_exception_to_user(mock_socket):
+ """Socket errors in TCPStatsClient should be raised to user."""
+ addr = ('127.0.0.1', 1234)
+ cl = _tcp_client(addr=addr[0], port=addr[1])
+ cl.incr('foo')
+ cl._sock.sendall.assert_called_once()
+ cl._sock.sendall.side_effect = socket.error
+ with assert_raises(socket.error):
+ cl.incr('foo')
+
+
+@mock.patch.object(socket, 'socket')
+def test_tcp_timeout(mock_socket):
+ """Timeout on TCPStatsClient should be set on socket."""
+ test_timeout = 321
+ cl = TCPStatsClient(timeout=test_timeout)
+ cl.incr('foo')
+ cl._sock.settimeout.assert_called_once()
+ cl._sock.settimeout.assert_called_with(test_timeout)