# Source code for perfmetrics.statsd

# -*- coding: utf-8 -*-
"""
Statsd client implementations.

"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import logging
import random
import socket

try:
    # Python 3
    from urllib.parse import urlsplit
    from urllib.parse import parse_qsl
    from urllib.parse import uses_query

    basestring = str

except ImportError: # pragma: no cover
    # Python 2
    from urlparse import urlsplit
    from urlparse import parse_qsl
    from urlparse import uses_query

from .interfaces import IStatsdClient
from .interfaces import implementer

# Module-level logger; StatsdClient stores it as ``self.log`` and uses it to
# report failed UDP sends.
logger = logging.getLogger(__name__)

# Public names of this module.
__all__ = [
    'StatsdClient',
    'StatsdClientMod',
    'NullStatsdClient',
    'statsd_client_from_uri',
]

# Add the 'statsd' scheme to the URL-parsing module's ``uses_query`` list
# (only once).  NOTE(review): presumably needed so that older urlparse
# versions split the query string of statsd:// URIs -- confirm whether
# this is still required on supported Python versions.
if 'statsd' not in uses_query:  # pragma: no cover
    uses_query.append('statsd')

def statsd_client_from_uri(uri):
    """
    Create and return :class:`perfmetrics.statsd.StatsdClient`.

    A typical URI is ``statsd://localhost:8125``. An optional
    query parameter is ``prefix``. The default prefix is an empty string.

    The host and the port may be left out of the URI; a missing host
    falls back to ``localhost`` and a missing port to ``8125`` (the same
    defaults ``StatsdClient`` declares for its ``host`` and ``port``
    parameters).

    :param str uri: A URI using the ``statsd`` scheme.
    :return: The client built from the host, port and query parameters
        of *uri*.
    :raises ValueError: If the scheme of *uri* is not ``statsd``.
    """
    parts = urlsplit(uri)
    if parts.scheme != 'statsd':
        raise ValueError("URI scheme not supported: %s" % uri)

    # Every query parameter is forwarded as a keyword argument.
    kw = {}
    if parts.query:
        kw.update(parse_qsl(parts.query))

    # urlsplit() reports an absent host or port as None.  Passing None
    # through would override the client's defaults and make it fail on
    # int(None), so substitute the defaults here instead.
    host = parts.hostname if parts.hostname is not None else 'localhost'
    port = parts.port if parts.port is not None else 8125
    return StatsdClient(host, port, **kw)
@implementer(IStatsdClient)
class StatsdClient(object):
    """
    Statsd client that sends each metric as a UDP packet.

    This is the default implementation of
    :class:`perfmetrics.interfaces.IStatsdClient`.

    Derived from statsd.py by Steve Ivy <steveivy@gmail.com>.
    """

    def __init__(self, host='localhost', port=8125, prefix=''):
        # Look the address up once, at construction time, and reuse it
        # for every packet.
        resolved = socket.getaddrinfo(host, int(port), 0, socket.SOCK_DGRAM)
        family, socktype, proto, _canonname, addr = resolved[0]
        self.addr = addr
        self.log = logger
        self.udp_sock = socket.socket(family, socktype, proto)
        self.random = random.random  # Testing hook
        # A non-empty prefix always ends with a single separating dot.
        needs_dot = prefix and not prefix.endswith('.')
        self.prefix = prefix + '.' if needs_dot else prefix

    def close(self):
        """
        See :meth:`perfmetrics.interfaces.IStatsdClient.close`.

        Closes the UDP socket, if there is one, and forgets it.

        .. versionadded:: 3.0
        """
        sock = self.udp_sock
        if sock:
            sock.close()
            self.udp_sock = None

    def timing(self, stat, value, rate=1, buf=None, rate_applied=False):
        """
        See :meth:`perfmetrics.interfaces.IStatsdClient.timing`.

        Emits ``<prefix><stat>:<value>|ms``; *value* is formatted
        as an integer.
        """
        if not (rate >= 1 or rate_applied or self.random() < rate):
            return
        self._deliver('%s%s:%d|ms' % (self.prefix, stat, value), buf)

    def gauge(self, stat, value, rate=1, buf=None, rate_applied=False):
        """
        See :meth:`perfmetrics.interfaces.IStatsdClient.gauge`.

        Emits ``<prefix><stat>:<value>|g``.
        """
        if not (rate >= 1 or rate_applied or self.random() < rate):
            return
        self._deliver('%s%s:%s|g' % (self.prefix, stat, value), buf)

    def incr(self, stat, count=1, rate=1, buf=None, rate_applied=False):
        """
        See :meth:`perfmetrics.interfaces.IStatsdClient.incr`.

        Emits ``<prefix><stat>:<count>|c``, with a trailing
        ``|@<rate>`` when the counter is sampled (``rate`` below 1).
        """
        if rate >= 1:
            template = '%s%s:%s|c'
            fields = (self.prefix, stat, count)
        elif rate_applied or self.random() < rate:
            template = '%s%s:%s|c|@%s'
            fields = (self.prefix, stat, count, rate)
        else:
            # Sampled out: nothing is sent or buffered.
            return
        self._deliver(template % fields, buf)

    def decr(self, stat, count=1, rate=1, buf=None, rate_applied=False):
        """
        See :meth:`perfmetrics.interfaces.IStatsdClient.decr`.

        Implemented as :meth:`incr` with the negated *count*.
        """
        negated = -count
        self.incr(stat, negated, rate=rate, buf=buf,
                  rate_applied=rate_applied)

    def set_add(self, stat, value, rate=1, buf=None, rate_applied=False):
        """
        See :meth:`perfmetrics.interfaces.IStatsdClient.set_add`.

        Emits ``<prefix><stat>:<value>|s``.
        """
        if not (rate >= 1 or rate_applied or self.random() < rate):
            return
        self._deliver('%s%s:%s|s' % (self.prefix, stat, value), buf)

    def _deliver(self, packet, buf):
        """Send *packet* right away, or append it to *buf* when one is given."""
        if buf is None:
            self._send(packet)
        else:
            buf.append(packet)

    def _send(self, data):
        """
        Send the string *data*, ASCII-encoded, as one UDP packet.

        An ``IOError`` raised while sending is logged rather than
        propagated.
        """
        try:
            self.udp_sock.sendto(data.encode('ascii'), self.addr)
        except IOError:
            self.log.exception("Failed to send UDP packet")

    def sendbuf(self, buf):
        """
        See :meth:`perfmetrics.interfaces.IStatsdClient.sendbuf`.

        Joins the buffered packets with newlines and sends them as a
        single packet; an empty buffer sends nothing.
        """
        if not buf:
            return
        self._send('\n'.join(buf))
@implementer(IStatsdClient)
class StatsdClientMod(object):
    """
    Wrap `StatsdClient`, modifying all stat names in context.

    Each stat name is rewritten as ``format % stat`` before the call is
    forwarded to the wrapped client. Attributes that are not slots of
    this object are read from, and written to, the wrapped client.

    :param wrapped: The client that calls are forwarded to.
    :param format: A ``%``-style format string applied to every stat
        name.

    .. versionchanged:: 3.0

       The wrapped object's attributes are now accessible on this
       object.

       This object now uses ``__slots__``.
    """

    __slots__ = (
        '_wrapped',
        'format',
    )

    def __init__(self, wrapped, format):
        self._wrapped = wrapped
        self.format = format

    def close(self):
        """Close the wrapped client."""
        self._wrapped.close()

    def __getattr__(self, name):
        # __getattr__ only runs after normal lookup has failed.  If the
        # missing name is one of our own slots, the slot is simply unset
        # (e.g. an instance made with __new__ and not yet initialised);
        # delegating would read self._wrapped again and recurse without
        # end, so report the attribute as missing instead.
        if name in self.__slots__:
            raise AttributeError(name)
        return getattr(self._wrapped, name)

    def __setattr__(self, name, value):
        # Our own slots live on this object; everything else is set on
        # the wrapped client.
        if name in self.__slots__:
            object.__setattr__(self, name, value)
        else:
            setattr(self._wrapped, name, value)

    def timing(self, stat, *args, **kw):
        """Forward to the wrapped client's ``timing`` with the formatted name."""
        self._wrapped.timing(self.format % stat, *args, **kw)

    def gauge(self, stat, *args, **kw):
        """Forward to the wrapped client's ``gauge`` with the formatted name."""
        self._wrapped.gauge(self.format % stat, *args, **kw)

    def incr(self, stat, *args, **kw):
        """Forward to the wrapped client's ``incr`` with the formatted name."""
        self._wrapped.incr(self.format % stat, *args, **kw)

    def decr(self, stat, *args, **kw):
        """Forward to the wrapped client's ``decr`` with the formatted name."""
        self._wrapped.decr(self.format % stat, *args, **kw)

    def set_add(self, stat, *args, **kw):
        """Forward to the wrapped client's ``set_add`` with the formatted name."""
        self._wrapped.set_add(self.format % stat, *args, **kw)

    def sendbuf(self, buf):
        """Forward *buf* unchanged to the wrapped client's ``sendbuf``."""
        self._wrapped.sendbuf(buf)
@implementer(IStatsdClient)
class NullStatsdClient(object):
    """A statsd client whose methods all do nothing."""

    def close(self):
        """No-op; there is nothing to release."""

    def timing(self, stat, *args, **kw):
        """No-op; the timing is discarded."""

    def gauge(self, stat, *args, **kw):
        """No-op; the gauge value is discarded."""

    def incr(self, stat, *args, **kw):
        """No-op; the increment is discarded."""

    def decr(self, stat, *args, **kw):
        """No-op; the decrement is discarded."""

    def set_add(self, stat, value, *args, **kw):
        """No-op; the set member is discarded."""

    def sendbuf(self, buf):
        """No-op; the buffer is neither sent nor modified."""
# Module-level instance of the no-op client, created at import time.
null_client = NullStatsdClient()