"""Tests for the UDP, TCP and Unix-socket statsd clients.

Every client under test has its ``_sock`` attribute replaced by a
``mock.Mock``, so the tests only inspect what would have been written to
the socket.
"""
import asyncio
import functools
import random
import re
import socket
from datetime import timedelta
from unittest import SkipTest, mock

from statsd import StatsClient
from statsd import TCPStatsClient
from statsd import UnixSocketStatsClient


ADDR = (socket.gethostbyname('localhost'), 8125)
UNIX_SOCKET = 'tmp.socket'


# proto specific methods to get the socket method to send data
send_method = {
    'udp': lambda x: x.sendto,
    'tcp': lambda x: x.sendall,
    'unix': lambda x: x.sendall,
}


# proto specific methods to create the expected value
make_val = {
    'udp': lambda x, addr: mock.call(str.encode(x), addr),
    'tcp': lambda x, addr: mock.call(str.encode(x + '\n')),
    'unix': lambda x, addr: mock.call(str.encode(x + '\n')),
}


def eq_(a, b, msg=None):
    """Assert that ``a == b``, using ``msg`` as the failure message."""
    assert a == b, msg


def _udp_client(prefix=None, addr=None, port=None, ipv6=False):
    """Return a StatsClient whose socket is replaced by a mock.

    ``addr`` and ``port`` fall back to the module-level ``ADDR``.
    """
    if not addr:
        addr = ADDR[0]
    if not port:
        port = ADDR[1]
    sc = StatsClient(host=addr, port=port, prefix=prefix, ipv6=ipv6)
    sc._sock = mock.Mock()
    return sc


def _tcp_client(prefix=None, addr=None, port=None, timeout=None, ipv6=False):
    """Return a TCPStatsClient whose socket is replaced by a mock.

    ``addr`` and ``port`` fall back to the module-level ``ADDR``.
    """
    if not addr:
        addr = ADDR[0]
    if not port:
        port = ADDR[1]
    sc = TCPStatsClient(host=addr, port=port, prefix=prefix, timeout=timeout,
                        ipv6=ipv6)
    sc._sock = mock.Mock()
    return sc


def _unix_socket_client(prefix=None, socket_path=None):
    """Return a UnixSocketStatsClient whose socket is replaced by a mock.

    ``socket_path`` falls back to the module-level ``UNIX_SOCKET``.
    """
    if not socket_path:
        socket_path = UNIX_SOCKET
    sc = UnixSocketStatsClient(socket_path=socket_path, prefix=prefix)
    sc._sock = mock.Mock()
    return sc


def _timer_check(sock, count, proto, start, end):
    """Assert the last packet sent on ``sock`` is a timing for ``start``.

    ``count`` is the expected number of send calls so far.  The last
    payload must look like ``<start>:<number>|<end>``, where ``end`` is the
    unit plus an optional rate suffix (e.g. ``'ms'`` or ``'ms|@0.5'``).
    """
    send = send_method[proto](sock)
    eq_(send.call_count, count)
    value = send.call_args[0][0].decode('ascii')
    # The pipe is escaped (it is a literal separator, not an alternation)
    # and ``start``/``end`` are escaped so characters such as '|' and '.'
    # in them are matched literally.  ``$`` still matches before the
    # trailing newline appended for the stream protocols.
    pattern = re.compile(
        r'^{}:\d+(?:\.\d+)?\|{}$'.format(re.escape(start), re.escape(end))
    )
    assert pattern.match(value), (
        '{!r} is not a timing for {!r} ending in {!r}'.format(
            value, start, end)
    )


def _sock_check(sock, count, proto, val=None, addr=None):
    """Assert ``sock`` was sent to ``count`` times, last with ``val``.

    When ``val`` is None only the call count is checked.  ``addr`` defaults
    to ``ADDR`` and only matters for the 'udp' protocol.
    """
    send = send_method[proto](sock)
    eq_(send.call_count, count)
    if not addr:
        addr = ADDR
    if val is not None:
        eq_(
            send.call_args,
            make_val[proto](val, addr),
        )


class assert_raises:
    """A context manager that asserts a given exception was raised.

    >>> with assert_raises(TypeError):
    ...     raise TypeError

    >>> with assert_raises(TypeError):
    ...     raise ValueError
    AssertionError: ValueError not in ['TypeError']

    >>> with assert_raises(TypeError):
    ...     pass
    AssertionError: No exception raised.

    Or you can specify any of a number of exceptions:

    >>> with assert_raises(TypeError, ValueError):
    ...     raise ValueError

    >>> with assert_raises(TypeError, ValueError):
    ...     raise KeyError
    AssertionError: KeyError not in ['TypeError', 'ValueError']

    You can also get the exception back later:

    >>> with assert_raises(TypeError) as cm:
    ...     raise TypeError('bad type!')
    >>> cm.exception
    TypeError('bad type!')
    >>> cm.exc_type
    TypeError

    ``cm.traceback`` holds the traceback passed to ``__exit__``.

    The exception type must be exactly one of the given classes; subclasses
    are not accepted.

    Lowercase name because the fact that it's a class is an implementation
    detail.

    """

    def __init__(self, *exc_cls):
        self.exc_cls = exc_cls

    def __enter__(self):
        # For access to the exception later.
        return self

    def __exit__(self, typ, value, tb):
        assert typ, 'No exception raised.'
        assert typ in self.exc_cls, '{} not in {}'.format(
            typ.__name__, [e.__name__ for e in self.exc_cls])
        self.exc_type = typ
        self.exception = value
        self.traceback = tb

        # Swallow expected exceptions.
        return True


def _test_incr(cl, proto):
    cl.incr('foo')
    _sock_check(cl._sock, 1, proto, val='foo:1|c')

    cl.incr('foo', 10)
    _sock_check(cl._sock, 2, proto, val='foo:10|c')

    cl.incr('foo', 1.2)
    _sock_check(cl._sock, 3, proto, val='foo:1.2|c')

    cl.incr('foo', 10, rate=0.5)
    _sock_check(cl._sock, 4, proto, val='foo:10|c|@0.5')


@mock.patch.object(random, 'random', lambda: -1)
def test_incr_udp():
    """StatsClient.incr works."""
    cl = _udp_client()
    _test_incr(cl, 'udp')


@mock.patch.object(random, 'random', lambda: -1)
def test_incr_tcp():
    """TCPStatsClient.incr works."""
    cl = _tcp_client()
    _test_incr(cl, 'tcp')


@mock.patch.object(random, 'random', lambda: -1)
def test_incr_unix_socket():
    """UnixSocketStatsClient.incr works."""
    cl = _unix_socket_client()
    _test_incr(cl, 'unix')


def _test_decr(cl, proto):
    cl.decr('foo')
    _sock_check(cl._sock, 1, proto, 'foo:-1|c')

    cl.decr('foo', 10)
    _sock_check(cl._sock, 2, proto, 'foo:-10|c')

    cl.decr('foo', 1.2)
    _sock_check(cl._sock, 3, proto, 'foo:-1.2|c')

    cl.decr('foo', 1, rate=0.5)
    _sock_check(cl._sock, 4, proto, 'foo:-1|c|@0.5')


@mock.patch.object(random, 'random', lambda: -1)
def test_decr_udp():
    """StatsClient.decr works."""
    cl = _udp_client()
    _test_decr(cl, 'udp')


@mock.patch.object(random, 'random', lambda: -1)
def test_decr_tcp():
    """TCPStatsClient.decr works."""
    cl = _tcp_client()
    _test_decr(cl, 'tcp')


@mock.patch.object(random, 'random', lambda: -1)
def test_decr_unix_socket():
    """UnixSocketStatsClient.decr works."""
    cl = _unix_socket_client()
    _test_decr(cl, 'unix')


def _test_gauge(cl, proto):
    cl.gauge('foo', 30)
    _sock_check(cl._sock, 1, proto, 'foo:30|g')

    cl.gauge('foo', 1.2)
    _sock_check(cl._sock, 2, proto, 'foo:1.2|g')

    cl.gauge('foo', 70, rate=0.5)
    _sock_check(cl._sock, 3, proto, 'foo:70|g|@0.5')


@mock.patch.object(random, 'random', lambda: -1)
def test_gauge_udp():
    """StatsClient.gauge works."""
    cl = _udp_client()
    _test_gauge(cl, 'udp')


@mock.patch.object(random, 'random', lambda: -1)
def test_gauge_tcp():
    """TCPStatsClient.gauge works."""
    cl = _tcp_client()
    _test_gauge(cl, 'tcp')


@mock.patch.object(random, 'random', lambda: -1)
def test_gauge_unix_socket():
    """UnixSocketStatsClient.gauge works."""
    cl = _unix_socket_client()
    _test_gauge(cl, 'unix')


def _test_ipv6(cl, proto, addr):
    cl.gauge('foo', 30)
    _sock_check(cl._sock, 1, proto, 'foo:30|g', addr=addr)


def test_ipv6_udp():
    """StatsClient can use an IPv6 address."""
    addr = ('::1', 8125, 0, 0)
    cl = _udp_client(addr=addr[0], ipv6=True)
    _test_ipv6(cl, 'udp', addr)


def test_ipv6_tcp():
    """TCPStatsClient can use an IPv6 address."""
    addr = ('::1', 8125, 0, 0)
    cl = _tcp_client(addr=addr[0], ipv6=True)
    _test_ipv6(cl, 'tcp', addr)


def _test_resolution(cl, proto, addr):
    cl.incr('foo')
    _sock_check(cl._sock, 1, proto, 'foo:1|c', addr=addr)


def test_ipv6_resolution_udp():
    # Deliberately skipped; the body below is kept so the test can be
    # re-enabled by removing the raise.
    raise SkipTest('IPv6 resolution is broken on Travis')
    cl = _udp_client(addr='localhost', ipv6=True)
    _test_resolution(cl, 'udp', ('::1', 8125, 0, 0))


def test_ipv6_resolution_tcp():
    cl = _tcp_client(addr='localhost', ipv6=True)
    _test_resolution(cl, 'tcp', ('::1', 8125, 0, 0))


def test_ipv4_resolution_udp():
    cl = _udp_client(addr='localhost')
    _test_resolution(cl, 'udp', ('127.0.0.1', 8125))


def test_ipv4_resolution_tcp():
    cl = _tcp_client(addr='localhost')
    _test_resolution(cl, 'tcp', ('127.0.0.1', 8125))


def _test_gauge_delta(cl, proto):
    tests = (
        (12, '+12'),
        (-13, '-13'),
        (1.2, '+1.2'),
        (-1.3, '-1.3'),
    )

    def _check(num, result):
        cl._sock.reset_mock()
        cl.gauge('foo', num, delta=True)
        _sock_check(cl._sock, 1, proto, 'foo:%s|g' % result)

    for num, result in tests:
        _check(num, result)


@mock.patch.object(random, 'random', lambda: -1)
def test_gauge_delta_udp():
    """StatsClient.gauge works with delta values."""
    cl = _udp_client()
    _test_gauge_delta(cl, 'udp')


@mock.patch.object(random, 'random', lambda: -1)
def test_gauge_delta_tcp():
    """TCPStatsClient.gauge works with delta values."""
    cl = _tcp_client()
    _test_gauge_delta(cl, 'tcp')


def _test_gauge_absolute_negative(cl, proto):
    cl.gauge('foo', -5, delta=False)
    _sock_check(cl._sock, 1, proto, 'foo:0|g\nfoo:-5|g')


@mock.patch.object(random, 'random', lambda: -1)
def test_gauge_absolute_negative_udp():
    """StatsClient.gauge works with absolute negative value."""
    cl = _udp_client()
    _test_gauge_absolute_negative(cl, 'udp')


@mock.patch.object(random, 'random', lambda: -1)
def test_gauge_absolute_negative_tcp():
    """TCPStatsClient.gauge works with absolute negative value."""
    cl = _tcp_client()
    _test_gauge_absolute_negative(cl, 'tcp')


def _test_gauge_absolute_negative_rate(cl, proto, mock_random):
    mock_random.return_value = -1
    cl.gauge('foo', -1, rate=0.5, delta=False)
    _sock_check(cl._sock, 1, proto, 'foo:0|g\nfoo:-1|g')

    mock_random.return_value = 2
    cl.gauge('foo', -2, rate=0.5, delta=False)
    # Should not have changed.
    _sock_check(cl._sock, 1, proto, 'foo:0|g\nfoo:-1|g')


@mock.patch.object(random, 'random')
def test_gauge_absolute_negative_rate_udp(mock_random):
    """StatsClient.gauge works with absolute negative value and rate."""
    cl = _udp_client()
    _test_gauge_absolute_negative_rate(cl, 'udp', mock_random)


@mock.patch.object(random, 'random')
def test_gauge_absolute_negative_rate_tcp(mock_random):
    """TCPStatsClient.gauge works with absolute negative value and rate."""
    cl = _tcp_client()
    _test_gauge_absolute_negative_rate(cl, 'tcp', mock_random)


def _test_set(cl, proto):
    cl.set('foo', 10)
    _sock_check(cl._sock, 1, proto, 'foo:10|s')

    cl.set('foo', 2.3)
    _sock_check(cl._sock, 2, proto, 'foo:2.3|s')

    cl.set('foo', 'bar')
    _sock_check(cl._sock, 3, proto, 'foo:bar|s')

    cl.set('foo', 2.3, 0.5)
    _sock_check(cl._sock, 4, proto, 'foo:2.3|s|@0.5')


@mock.patch.object(random, 'random', lambda: -1)
def test_set_udp():
    """StatsClient.set works."""
    cl = _udp_client()
    _test_set(cl, 'udp')


@mock.patch.object(random, 'random', lambda: -1)
def test_set_tcp():
    """TCPStatsClient.set works."""
    cl = _tcp_client()
    _test_set(cl, 'tcp')


def _test_timing(cl, proto):
    cl.timing('foo', 100)
    _sock_check(cl._sock, 1, proto, 'foo:100.000000|ms')

    cl.timing('foo', 350)
    _sock_check(cl._sock, 2, proto, 'foo:350.000000|ms')

    cl.timing('foo', 100, rate=0.5)
    _sock_check(cl._sock, 3, proto, 'foo:100.000000|ms|@0.5')


@mock.patch.object(random, 'random', lambda: -1)
def test_timing_udp():
    """StatsClient.timing works."""
    cl = _udp_client()
    _test_timing(cl, 'udp')


@mock.patch.object(random, 'random', lambda: -1)
def test_timing_tcp():
    """TCPStatsClient.timing works."""
    cl = _tcp_client()
    _test_timing(cl, 'tcp')


def test_timing_supports_timedelta():
    cl = _udp_client()
    proto = 'udp'

    cl.timing('foo', timedelta(seconds=1.5))
    _sock_check(cl._sock, 1, proto, 'foo:1500.000000|ms')

    cl.timing('foo', timedelta(days=1.5))
    _sock_check(cl._sock, 2, proto, 'foo:129600000.000000|ms')


@mock.patch.object(random, 'random', lambda: -1)
def test_timing_unix_socket():
    """UnixSocketStatsClient.timing works."""
    cl = _unix_socket_client()
    _test_timing(cl, 'unix')


def _test_prepare(cl, proto):
    tests = (
        ('foo:1|c', ('foo', '1|c', 1)),
        ('bar:50|ms|@0.5', ('bar', '50|ms', 0.5)),
        ('baz:23|g', ('baz', '23|g', 1)),
    )

    def _check(o, s, v, r):
        with mock.patch.object(random, 'random', lambda: -1):
            eq_(o, cl._prepare(s, v, r))

    for o, (s, v, r) in tests:
        _check(o, s, v, r)


@mock.patch.object(random, 'random', lambda: -1)
def test_prepare_udp():
    """Test StatsClient._prepare method."""
    cl = _udp_client()
    _test_prepare(cl, 'udp')


@mock.patch.object(random, 'random', lambda: -1)
def test_prepare_tcp():
    """Test TCPStatsClient._prepare method."""
    cl = _tcp_client()
    _test_prepare(cl, 'tcp')


def _test_prefix(cl, proto):
    cl.incr('bar')
    _sock_check(cl._sock, 1, proto, 'foo.bar:1|c')


@mock.patch.object(random, 'random', lambda: -1)
def test_prefix_udp():
    """StatsClient.incr works with a prefix."""
    cl = _udp_client(prefix='foo')
    _test_prefix(cl, 'udp')


@mock.patch.object(random, 'random', lambda: -1)
def test_prefix_tcp():
    """TCPStatsClient.incr works with a prefix."""
    cl = _tcp_client(prefix='foo')
    _test_prefix(cl, 'tcp')


@mock.patch.object(random, 'random', lambda: -1)
def test_prefix_unix_socket():
    """UnixSocketStatsClient.incr works with a prefix."""
    cl = _unix_socket_client(prefix='foo')
    _test_prefix(cl, 'unix')


def _test_timer_manager(cl, proto):
    with cl.timer('foo'):
        pass

    _timer_check(cl._sock, 1, proto, 'foo', 'ms')


def test_timer_manager_udp():
    """StatsClient.timer can be used as manager."""
    cl = _udp_client()
    _test_timer_manager(cl, 'udp')


def test_timer_manager_tcp():
    """TCPStatsClient.timer can be used as manager."""
    cl = _tcp_client()
    _test_timer_manager(cl, 'tcp')


def _test_timer_decorator(cl, proto):
    @cl.timer('foo')
    def foo(a, b):
        return [a, b]

    @cl.timer('bar')
    def bar(a, b):
        return [b, a]

    # make sure it works with more than one decorator, called multiple
    # times, and that parameters are handled correctly
    eq_([4, 2], foo(4, 2))
    _timer_check(cl._sock, 1, proto, 'foo', 'ms')

    eq_([2, 4], bar(4, 2))
    _timer_check(cl._sock, 2, proto, 'bar', 'ms')

    eq_([6, 5], bar(5, 6))
    _timer_check(cl._sock, 3, proto, 'bar', 'ms')


def test_timer_decorator_udp():
    """StatsClient.timer is a thread-safe decorator (UDP)."""
    cl = _udp_client()
    _test_timer_decorator(cl, 'udp')


def test_timer_decorator_tcp():
    """StatsClient.timer is a thread-safe decorator (TCP)."""
    cl = _tcp_client()
    _test_timer_decorator(cl, 'tcp')


def _test_timer_capture(cl, proto):
    with cl.timer('woo') as result:
        eq_(result.ms, None)
    assert isinstance(result.ms, float)


def test_timer_capture_udp():
    """You can capture the output of StatsClient.timer (UDP)."""
    cl = _udp_client()
    _test_timer_capture(cl, 'udp')


def test_timer_capture_tcp():
    """You can capture the output of StatsClient.timer (TCP)."""
    cl = _tcp_client()
    _test_timer_capture(cl, 'tcp')


def _test_timer_context_rate(cl, proto):
    with cl.timer('foo', rate=0.5):
        pass

    _timer_check(cl._sock, 1, proto, 'foo', 'ms|@0.5')


@mock.patch.object(random, 'random', lambda: -1)
def test_timer_context_rate_udp():
    """StatsClient.timer can be used as manager with rate."""
    cl = _udp_client()
    _test_timer_context_rate(cl, 'udp')


@mock.patch.object(random, 'random', lambda: -1)
def test_timer_context_rate_tcp():
    """TCPStatsClient.timer can be used as manager with rate."""
    cl = _tcp_client()
    _test_timer_context_rate(cl, 'tcp')


def test_timer_decorator_partial_function():
    """TCPStatsClient.timer can be used as decorator on a partial function."""
    cl = _tcp_client()

    foo = functools.partial(lambda x: x * x, 2)
    func = cl.timer('foo')(foo)

    eq_(4, func())

    # The timer is created without a rate, so no '|@rate' suffix is sent.
    _timer_check(cl._sock, 1, 'tcp', 'foo', 'ms')


def _test_timer_decorator_rate(cl, proto):
    @cl.timer('foo', rate=0.1)
    def foo(a, b):
        return [b, a]

    @cl.timer('bar', rate=0.2)
    def bar(a, b=2, c=3):
        return [c, b, a]

    eq_([2, 4], foo(4, 2))
    _timer_check(cl._sock, 1, proto, 'foo', 'ms|@0.1')

    eq_([3, 2, 5], bar(5))
    _timer_check(cl._sock, 2, proto, 'bar', 'ms|@0.2')


@mock.patch.object(random, 'random', lambda: -1)
def test_timer_decorator_rate_udp():
    """StatsClient.timer can be used as decorator with rate."""
    cl = _udp_client()
    _test_timer_decorator_rate(cl, 'udp')


@mock.patch.object(random, 'random', lambda: -1)
def test_timer_decorator_rate_tcp():
    """TCPStatsClient.timer can be used as decorator with rate."""
    cl = _tcp_client()
    _test_timer_decorator_rate(cl, 'tcp')


def _test_timer_context_exceptions(cl, proto):
    with assert_raises(socket.timeout):
        with cl.timer('foo'):
            raise socket.timeout()

    _timer_check(cl._sock, 1, proto, 'foo', 'ms')


def test_timer_context_exceptions_udp():
    cl = _udp_client()
    _test_timer_context_exceptions(cl, 'udp')


def test_timer_context_exceptions_tcp():
    cl = _tcp_client()
    _test_timer_context_exceptions(cl, 'tcp')


def _test_timer_decorator_exceptions(cl, proto):
    @cl.timer('foo')
    def foo():
        raise ValueError()

    with assert_raises(ValueError):
        foo()

    _timer_check(cl._sock, 1, proto, 'foo', 'ms')


def test_coroutine_timer_decorator():
    """The timer decorator sends only after a coroutine has completed."""
    event_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(event_loop)
    already = False
    cl = _udp_client()

    def _send(*_):
        assert already is True, '_send called before coroutine completed'

    cl._send = _send

    @cl.timer('bar')
    async def inner():
        nonlocal already
        await asyncio.sleep(0)
        already = True
        return None

    try:
        event_loop.run_until_complete(inner())
    finally:
        # Close the loop even when the coroutine or the assertion fails.
        event_loop.close()


def test_timer_decorator_exceptions_udp():
    cl = _udp_client()
    _test_timer_decorator_exceptions(cl, 'udp')


def test_timer_decorator_exceptions_tcp():
    cl = _tcp_client()
    _test_timer_decorator_exceptions(cl, 'tcp')


def _test_timer_object(cl, proto):
    t = cl.timer('foo').start()
    t.stop()

    _timer_check(cl._sock, 1, proto, 'foo', 'ms')


def test_timer_object_udp():
    """StatsClient.timer works."""
    cl = _udp_client()
    _test_timer_object(cl, 'udp')


def test_timer_object_tcp():
    """TCPStatsClient.timer works."""
    cl = _tcp_client()
    _test_timer_object(cl, 'tcp')


def _test_timer_object_no_send(cl, proto):
    t = cl.timer('foo').start()
    t.stop(send=False)
    _sock_check(cl._sock, 0, proto)

    t.send()
    _timer_check(cl._sock, 1, proto, 'foo', 'ms')


def test_timer_object_no_send_udp():
    """Stop StatsClient.timer without sending."""
    cl = _udp_client()
    _test_timer_object_no_send(cl, 'udp')


def test_timer_object_no_send_tcp():
    """Stop TCPStatsClient.timer without sending."""
    cl = _tcp_client()
    _test_timer_object_no_send(cl, 'tcp')


def _test_timer_object_rate(cl, proto):
    t = cl.timer('foo', rate=0.5)
    t.start()
    t.stop()

    _timer_check(cl._sock, 1, proto, 'foo', 'ms|@0.5')


@mock.patch.object(random, 'random', lambda: -1)
def test_timer_object_rate_udp():
    """StatsClient.timer works with rate."""
    cl = _udp_client()
    _test_timer_object_rate(cl, 'udp')


@mock.patch.object(random, 'random', lambda: -1)
def test_timer_object_rate_tcp():
    """TCPStatsClient.timer works with rate."""
    cl = _tcp_client()
    _test_timer_object_rate(cl, 'tcp')


def _test_timer_object_no_send_twice(cl):
    t = cl.timer('foo').start()
    t.stop()

    with assert_raises(RuntimeError):
        t.send()


def test_timer_object_no_send_twice_udp():
    """StatsClient.timer raises RuntimeError if send is called twice."""
    cl = _udp_client()
    _test_timer_object_no_send_twice(cl)


def test_timer_object_no_send_twice_tcp():
    """TCPStatsClient.timer raises RuntimeError if send is called twice."""
    cl = _tcp_client()
    _test_timer_object_no_send_twice(cl)


def _test_timer_send_without_stop(cl):
    with cl.timer('foo') as t:
        assert t.ms is None
        with assert_raises(RuntimeError):
            t.send()

    t = cl.timer('bar').start()
    assert t.ms is None
    with assert_raises(RuntimeError):
        t.send()


def test_timer_send_without_stop_udp():
    """StatsClient.timer raises error if send is called before stop."""
    cl = _udp_client()
    _test_timer_send_without_stop(cl)


def test_timer_send_without_stop_tcp():
    """TCPStatsClient.timer raises error if send is called before stop."""
    cl = _tcp_client()
    _test_timer_send_without_stop(cl)


def _test_timer_object_stop_without_start(cl):
    with assert_raises(RuntimeError):
        cl.timer('foo').stop()


def test_timer_object_stop_without_start_udp():
    """StatsClient.timer raises error if stop is called before start."""
    cl = _udp_client()
    _test_timer_object_stop_without_start(cl)


def test_timer_object_stop_without_start_tcp():
    """TCPStatsClient.timer raises error if stop is called before start."""
    cl = _tcp_client()
    _test_timer_object_stop_without_start(cl)


def _test_pipeline(cl, proto):
    pipe = cl.pipeline()
    pipe.incr('foo')
    pipe.decr('bar')
    pipe.timing('baz', 320)
    pipe.send()
    _sock_check(cl._sock, 1, proto, 'foo:1|c\nbar:-1|c\nbaz:320.000000|ms')


def test_pipeline_udp():
    """StatsClient.pipeline works."""
    cl = _udp_client()
    _test_pipeline(cl, 'udp')


def test_pipeline_tcp():
    """TCPStatsClient.pipeline works."""
    cl = _tcp_client()
    _test_pipeline(cl, 'tcp')


def _test_pipeline_null(cl, proto):
    pipe = cl.pipeline()
    pipe.send()
    _sock_check(cl._sock, 0, proto)


def test_pipeline_null_udp():
    """Ensure we don't error on an empty pipeline (UDP)."""
    cl = _udp_client()
    _test_pipeline_null(cl, 'udp')


def test_pipeline_null_tcp():
    """Ensure we don't error on an empty pipeline (TCP)."""
    cl = _tcp_client()
    _test_pipeline_null(cl, 'tcp')


def _test_pipeline_manager(cl, proto):
    with cl.pipeline() as pipe:
        pipe.incr('foo')
        pipe.decr('bar')
        pipe.gauge('baz', 15)
    _sock_check(cl._sock, 1, proto, 'foo:1|c\nbar:-1|c\nbaz:15|g')


def test_pipeline_manager_udp():
    """StatsClient.pipeline can be used as manager."""
    cl = _udp_client()
    _test_pipeline_manager(cl, 'udp')


def test_pipeline_manager_tcp():
    """TCPStatsClient.pipeline can be used as manager."""
    cl = _tcp_client()
    _test_pipeline_manager(cl, 'tcp')


def _test_pipeline_timer_manager(cl, proto):
    with cl.pipeline() as pipe:
        with pipe.timer('foo'):
            pass
    _timer_check(cl._sock, 1, proto, 'foo', 'ms')


def test_pipeline_timer_manager_udp():
    """Timer manager can be retrieved from UDP Pipeline manager."""
    cl = _udp_client()
    _test_pipeline_timer_manager(cl, 'udp')


def test_pipeline_timer_manager_tcp():
    """Timer manager can be retrieved from TCP Pipeline manager."""
    cl = _tcp_client()
    _test_pipeline_timer_manager(cl, 'tcp')


def _test_pipeline_timer_decorator(cl, proto):
    with cl.pipeline() as pipe:
        @pipe.timer('foo')
        def foo():
            pass
        foo()
    _timer_check(cl._sock, 1, proto, 'foo', 'ms')


def test_pipeline_timer_decorator_udp():
    """UDP Pipeline manager can be used as decorator."""
    cl = _udp_client()
    _test_pipeline_timer_decorator(cl, 'udp')


def test_pipeline_timer_decorator_tcp():
    """TCP Pipeline manager can be used as decorator."""
    cl = _tcp_client()
    _test_pipeline_timer_decorator(cl, 'tcp')


def _test_pipeline_timer_object(cl, proto):
    with cl.pipeline() as pipe:
        t = pipe.timer('foo').start()
        t.stop()
        # Nothing is written to the socket until the pipeline exits.
        _sock_check(cl._sock, 0, proto)
    _timer_check(cl._sock, 1, proto, 'foo', 'ms')


def test_pipeline_timer_object_udp():
    """Timer from UDP Pipeline manager works."""
    cl = _udp_client()
    _test_pipeline_timer_object(cl, 'udp')


def test_pipeline_timer_object_tcp():
    """Timer from TCP Pipeline manager works."""
    cl = _tcp_client()
    _test_pipeline_timer_object(cl, 'tcp')


def _test_pipeline_empty(cl):
    with cl.pipeline() as pipe:
        pipe.incr('foo')
        eq_(1, len(pipe._stats))
    eq_(0, len(pipe._stats))


def test_pipeline_empty_udp():
    """Pipelines should be empty after a send() call (UDP)."""
    cl = _udp_client()
    _test_pipeline_empty(cl)


def test_pipeline_empty_tcp():
    """Pipelines should be empty after a send() call (TCP)."""
    cl = _tcp_client()
    _test_pipeline_empty(cl)


def _test_pipeline_negative_absolute_gauge(cl, proto):
    with cl.pipeline() as pipe:
        pipe.gauge('foo', -10, delta=False)
        pipe.incr('bar')
    _sock_check(cl._sock, 1, proto, 'foo:0|g\nfoo:-10|g\nbar:1|c')


def test_pipeline_negative_absolute_gauge_udp():
    """Negative absolute gauges use an internal pipeline (UDP)."""
    cl = _udp_client()
    _test_pipeline_negative_absolute_gauge(cl, 'udp')


def test_pipeline_negative_absolute_gauge_tcp():
    """Negative absolute gauges use an internal pipeline (TCP)."""
    cl = _tcp_client()
    _test_pipeline_negative_absolute_gauge(cl, 'tcp')


def _test_big_numbers(cl, proto):
    num = 1234568901234
    tests = (
        # Explicitly create strings so we avoid the bug we're trying to test.
        ('gauge', 'foo:1234568901234|g'),
        ('incr', 'foo:1234568901234|c'),
        ('timing', 'foo:1234568901234.000000|ms'),
    )

    def _check(method, result):
        cl._sock.reset_mock()
        getattr(cl, method)('foo', num)
        _sock_check(cl._sock, 1, proto, result)

    for method, result in tests:
        _check(method, result)


def test_big_numbers_udp():
    """Test big numbers with UDP client."""
    cl = _udp_client()
    _test_big_numbers(cl, 'udp')


def test_big_numbers_tcp():
    """Test big numbers with TCP client."""
    cl = _tcp_client()
    _test_big_numbers(cl, 'tcp')


def _test_rate_no_send(cl, proto):
    cl.incr('foo', rate=0.5)
    _sock_check(cl._sock, 0, proto)


@mock.patch.object(random, 'random', lambda: 2)
def test_rate_no_send_udp():
    """Rate below random value prevents sending with StatsClient.incr."""
    cl = _udp_client()
    _test_rate_no_send(cl, 'udp')


@mock.patch.object(random, 'random', lambda: 2)
def test_rate_no_send_tcp():
    """Rate below random value prevents sending with TCPStatsClient.incr."""
    cl = _tcp_client()
    _test_rate_no_send(cl, 'tcp')


def test_socket_error():
    """Socket error on StatsClient should be ignored."""
    cl = _udp_client()
    cl._sock.sendto.side_effect = socket.timeout()
    cl.incr('foo')
    _sock_check(cl._sock, 1, 'udp', 'foo:1|c')


def test_pipeline_packet_size():
    """Pipelines shouldn't send packets larger than 512 bytes (UDP only)."""
    sc = _udp_client()
    pipe = sc.pipeline()
    for x in range(32):
        # Each stat is 'sixteen_char_str:1|c' (20 bytes), so 32 of them
        # exceed 512 bytes and need 2 packets.
        pipe.incr('sixteen_char_str')
    pipe.send()
    eq_(2, sc._sock.sendto.call_count)
    assert len(sc._sock.sendto.call_args_list[0][0][0]) <= 512
    assert len(sc._sock.sendto.call_args_list[1][0][0]) <= 512


@mock.patch.object(socket, 'socket')
def test_tcp_raises_exception_to_user(mock_socket):
    """Socket errors in TCPStatsClient should be raised to user."""
    addr = ('127.0.0.1', 1234)
    cl = _tcp_client(addr=addr[0], port=addr[1])
    cl.incr('foo')
    eq_(1, cl._sock.sendall.call_count)
    cl._sock.sendall.side_effect = socket.error
    with assert_raises(socket.error):
        cl.incr('foo')


@mock.patch.object(socket, 'socket')
def test_tcp_timeout(mock_socket):
    """Timeout on TCPStatsClient should be set on socket."""
    test_timeout = 321
    cl = TCPStatsClient(timeout=test_timeout)
    cl.incr('foo')
    cl._sock.settimeout.assert_called_once_with(test_timeout)


@mock.patch.object(socket, 'socket')
def test_unix_socket_timeout(mock_socket):
    """Timeout on UnixSocketStatsClient should be set on socket."""
    test_timeout = 321
    cl = UnixSocketStatsClient(UNIX_SOCKET, timeout=test_timeout)
    cl.incr('foo')
    cl._sock.settimeout.assert_called_once_with(test_timeout)