Source code for gordon.router

# -*- coding: utf-8 -*-
#
# Copyright 2018 Spotify AB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Core message routing logic for the plugins within Gordon Service.

Messages received on the success channel will be routed to the next
designated plugin phase. For example, a message that has a ``consume``
phase will be routed to the installed enricher provider (or publisher
provider if no enricher provider is installed).

If a message fails its next phase, its phase will be updated to ``drop``
and routed to the event consumer provider for cleanup.

.. attention::

    The :class:`GordonRouter` only supports the following two phase
    routes:

    1. consume -> enrich -> publish -> done
    2. consume -> publish -> done

    Future releases may support more configurable phase routings.

"""

import asyncio
import logging

from gordon import interfaces


[docs]class GordonRouter: """Route messages to the appropriate plugin destination. .. attention:: `error_channel` is currently not used in this router, and may be removed entirely from all interface definitions. Args: phase_route (dict(str, str)): The route messages should follow. success_channel (asyncio.Queue): A sink for successfully processed :class:`gordon.interfaces.IEventMessage` s. error_channel (asyncio.Queue): A sink for :class:`gordon.interfaces.IEventMessage` s that were not processed due to problems. plugins (list): Instantiated message handling plugins. metrics (obj): Implemented :class:`IMetricRelay` interface to emit metrics. """ # TODO (lynn): Ideally this is configurable/extendable in future # iterations of gordon. FINAL_PHASES = ('cleanup',) def __init__(self, phase_route, success_channel, error_channel, plugins, metrics): self.success_channel = success_channel self.error_channel = error_channel self.phase_plugin_map = self._get_phase_plugin_map(plugins) self.phase_route = phase_route self.metrics = metrics self._messages_in_flight = {} def _get_phase_plugin_map(self, plugins): phase_map = {p.phase: p for p in plugins} if not set(self.FINAL_PHASES) & set(phase_map): msg = ('No plugin implements a final phase, defaulting to ' 'drop messages that are routed to these phases: ' f'{",".join(self.FINAL_PHASES)}.') logging.warn(msg) return phase_map def _get_next_phase(self, event_msg): try: next_phase = self.phase_route[event_msg.phase] msg = f'Routing message {event_msg.msg_id} to "{next_phase}".' logging.debug(msg) except KeyError: msg = (f'Message "{event_msg.msg_id}" has an unknown phase: ' f'"{event_msg.phase}", routing to "cleanup".') logging.error(msg) next_phase = 'cleanup' return next_phase async def _update_messages_in_flight(self): in_flight_count = len(self._messages_in_flight) await self.metrics.set( 'router-messages-in-flight', value=in_flight_count) async def _add_message_in_flight(self, event_msg): if event_msg.msg_id not in self._messages_in_flight: context = {'unit': 'seconds'} timer = self.metrics.timer( 'router-message-flight-duration', context=context) await timer.start() self._messages_in_flight[event_msg.msg_id] = timer await self._update_messages_in_flight() async def _remove_message_in_flight(self, event_msg): if event_msg.phase in self.FINAL_PHASES: timer = self._messages_in_flight.pop(event_msg.msg_id) await timer.stop() await self._update_messages_in_flight() async def _route(self, event_msg, force_cleanup=False): if force_cleanup: next_phase = 'cleanup' else: next_phase = self._get_next_phase(event_msg) context = { 'current': event_msg.phase, 'next': next_phase, } await self.metrics.incr('router-update-message-phase', context=context) event_msg.update_phase(next_phase) next_plugin = self.phase_plugin_map.get(next_phase) if next_phase in self.FINAL_PHASES and not next_plugin: msg = (f'Dropping message "{event_msg.msg_id}", final phase ' f'"{next_phase}" not implemented.') logging.debug(msg) return try: await next_plugin.handle_message(event_msg) # don't add a successfully dropped message back onto channel if next_phase not in self.FINAL_PHASES: msg = f'Adding message "{event_msg}" to success channel.' event_msg.append_to_history(msg, event_msg.phase) await self.success_channel.put(event_msg) elif next_phase in self.FINAL_PHASES and not force_cleanup: # if we're here, can assume message successfully went thru # the entire phase route and not dropped await self.metrics.incr('router-message-completed') except Exception as e: msg = (f'Routing message "{event_msg}" to cleanup due to exception:' f' "{e}"') event_msg.append_to_history(msg, event_msg.phase) logging.warn(msg, exc_info=e) context = {'error': e.__class__.__name__} await self.metrics.incr('router-message-dropped', context=context) await self._route(event_msg, force_cleanup=True) async def _verify_message_impl(self, event_msg): if not interfaces.IEventMessage.providedBy(event_msg): msg = (f'Ignoring message "{event_msg.msg_id}". Does not correctly' ' implement `IEventMessage`.') logging.warn(msg) context = {'error': 'invalid-message-provider'} await self.metrics.incr('router-message-dropped', context=context) return False return True async def _poll_channel(self): if not self.success_channel.empty(): event_msg = await self.success_channel.get() # graceful shutdown if event_msg is None: msg = 'Received TERM signal, shutting down router...' logging.info(msg) # TODO (lynn): potentially propagate signal all plugins # to clean up rather than just breaking return if not await self._verify_message_impl(event_msg): return await self.metrics.incr('router-message-consumed') await self._add_message_in_flight(event_msg) await self._route(event_msg) await self._remove_message_in_flight(event_msg) async def run(self): """Entrypoint to route messages between plugins.""" logging.info('Starting message router...') coroutines = set() while True: coro = self._poll_channel() coroutines.add(coro) _, coroutines = await asyncio.wait(coroutines, timeout=0.1)