Source code for gordon.metrics.ffwd

# -*- coding: utf-8 -*-
# Copyright (c) 2018 Spotify AB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""

Gordon ships with a simple `ffwd <https://github.com/spotify/ffwd>`_
metrics implementation, which can be enabled via configuration. This
module contains the SimpleFfwdRelay, and all required classes that it
uses to send messsages to the ffwd daemon via UDP.

The `SimpleFfwdRelay` requires no configuration, but can be customized.
The defaults that may be overridden are shown below.

.. code-block:: ini

    [ffwd]
    # to identify the service creating metrics
    key = 'gordon-service'

    # the address of the ffwd daemon (see: UDPClient)
    ip = "127.0.0.9"
    port = 19000

    # a scaling factor for timing (see: FfwdTimer)
    time_unit = 1E9

"""

import asyncio
import json
import time

import zope.interface

from gordon import interfaces


[docs]class UDPClientProtocol(asyncio.DatagramProtocol): """Protocol for sending one-off messages via UDP. Args: message (bytes): Message for ffwd agent. """ def __init__(self, message): self.message = message self.transport = None
[docs] def connection_made(self, transport): """Create connection, use to send message and close. Args: transport (asyncio.DatagramTransport): Transport used for sending. """ self.transport = transport self.transport.sendto(self.message) self.transport.close()
[docs]class UDPClient: """Client for sending UDP datagrams. Args: ip (str): (optional) Destination IP address (default: 127.0.0.1). port (int): (optional) Destination port (default: 9000). loop (asyncio.AbstractEventLoop impl): (optional) Event loop. """ DEFAULT_IP = '127.0.0.1' DEFAULT_PORT = 19000 def __init__(self, ip=None, port=None, loop=None): self.ip = ip or self.DEFAULT_IP self.port = port or self.DEFAULT_PORT self.loop = loop or asyncio.get_event_loop()
[docs] async def send(self, metric): """Transform metric to JSON bytestring and send to server. Args: metric (dict): Complete metric to send as JSON. """ message = json.dumps(metric).encode('utf-8') await self.loop.create_datagram_endpoint( lambda: UDPClientProtocol(message), remote_addr=(self.ip, self.port))
[docs]@zope.interface.implementer(interfaces.ITimer) class FfwdTimer: """Timer which sends UDP messages to FFWD on completion. Args: metric (dict): Dict representation of the metric to send. udp_client (UDPClient): A metric sending client. time_unit (number): (optional) Scale time unit for use with time.perf_counter(), for example: 1E9 to send nanoseconds. """ def __init__(self, metric, udp_client, time_unit=None): self.metric = metric self.udp_client = udp_client self.time_unit = time_unit or 1 async def __aenter__(self): """Enter context manager to start timing.""" await self.start() return self async def __aexit__(self, *args): """Exit context manager to stop timing.""" await self.stop()
[docs] async def start(self): """Start timer.""" self._start_time = time.perf_counter()
[docs] async def stop(self): """Stop timer.""" time_elapsed = time.perf_counter() - self._start_time self.metric['value'] = time_elapsed * self.time_unit await self.udp_client.send(self.metric)
[docs]@zope.interface.implementer(interfaces.IMetricRelay) class SimpleFfwdRelay: """Metrics relay which sends to FFWD immediately. The relay does no client-side aggregation and metrics are emitted immediately. The relay uses a combination of the `key and attributes fields <https://github.com/spotify/ffwd/tree/master/ modules/json>`_ to semantically identify metrics in ffwd. Args: config (dict): Configuration with optional keys described above. """ def __init__(self, config): self.key = config.get('key', 'gordon-service') self.udp_client = UDPClient( config.get('ffwd_ip'), config.get('ffwd_port')) self.time_unit = config.get('time_unit') def _create_metric(self, metric_type, metric_name, value, context, **kwargs): attrs = {'what': metric_name, 'metric_type': metric_type} if context: attrs.update(context) metric = { 'key': self.key, 'attributes': attrs, 'value': value, 'type': 'metric' } return metric
[docs] async def incr(self, metric_name, value=1, context=None, **kwargs): """Increase a metric by 1 or a given amount. Args: metric_name (str): Identifier of the metric. value (int): (optional) Value with which to increase the metric (default: 1). context (dict): (optional) Additional key-value pairs which further describe the metric, for example: {'remote-host': '1.2.3.4'} """ await self.set(metric_name, value, context, **kwargs)
[docs] def timer(self, metric_name, context=None, **kwargs): """Create a FfwdTimer. Args: metric_name (str): Identifier of the metric. context (dict): (optional) Additional key-value pairs which further describe the metric, for example: {'unit': 'seconds'} """ metric = self._create_metric('timer', metric_name, None, context, **kwargs) return FfwdTimer(metric, self.udp_client, self.time_unit)
[docs] async def set(self, metric_name, value, context=None, **kwargs): """Set a metric to a given value. Args: metric_name (str): Identifier of the metric. value (number): The value of the metric. context (dict): (optional) Additional key-value pairs which further describe the metric, for example: {'app-version': '1.5.3'} """ metric = self._create_metric('meter', metric_name, value, context, **kwargs) await self.udp_client.send(metric)
[docs] async def cleanup(self): """Not used.""" pass