# -*- coding: utf-8 -*-
#
# Copyright 2017 Spotify AB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Main module to run the Gordon service.
The service expects a ``gordon.toml`` and/or a ``gordon-user.toml``
file for configuration in the current working directory, or in a
provided root directory.
Any configuration defined in ``gordon-user.toml`` overwrites those in
``gordon.toml``.
Example:
.. code-block:: bash
$ python gordon/main.py
$ python gordon/main.py -c /etc/default/
$ python gordon/main.py --config-root /etc/default/
"""
import asyncio
import logging
import os
import signal
import click
import toml
import ulogger
from gordon import __version__ as version
from gordon import exceptions
from gordon import interfaces
from gordon import plugins_loader
from gordon import router
async def shutdown(sig, loop):
"""Gracefully cancel current tasks when app receives a shutdown signal."""
logging.info(f'Received exit signal {sig.name}...')
tasks = [task for task in asyncio.Task.all_tasks() if task is not
asyncio.tasks.Task.current_task()]
for task in tasks:
logging.debug(f'Cancelling task: {task}')
task.cancel()
results = await asyncio.gather(*tasks, return_exceptions=True)
logging.debug(f'Done awaiting cancelled tasks, results: {results}')
loop.stop()
logging.info('Shutdown complete.')
def _deep_merge_dict(a, b):
"""Additively merge right side dict into left side dict."""
for k, v in b.items():
if k in a and isinstance(a[k], dict) and isinstance(v, dict):
_deep_merge_dict(a[k], v)
else:
a[k] = v
def _load_config(root=''):
conf, error = {}, False
conf_files = ['gordon.toml', 'gordon-user.toml']
for conf_file in conf_files:
try:
with open(os.path.join(root, conf_file), 'r') as f:
_deep_merge_dict(conf, (toml.load(f)))
except IOError as e:
error = e
if error and conf == {}:
raise IOError(
f'Cannot load Gordon configuration file from "{root}": {error}.')
return conf
[docs]def setup(config_root=''):
"""
Service configuration and logging setup.
Configuration defined in ``gordon-user.toml`` will overwrite
``gordon.toml``.
Args:
config_root (str): Where configuration should load from,
defaults to current working directory.
Returns:
A dict for Gordon service configuration.
"""
config = _load_config(root=config_root)
logging_config = config.get('core', {}).get('logging', {}).copy()
log_level = logging_config.pop('level', 'INFO').upper()
log_handlers = logging_config.pop('handlers', ['syslog'])
ulogger.setup_logging(
progname='gordon', level=log_level, handlers=log_handlers,
**logging_config)
return config
def _log_or_exit_on_exceptions(base_msg, exc, debug):
log_level_func = logging.warn
if not debug:
log_level_func = logging.error
if isinstance(exc, list):
for exception in exc:
log_level_func(base_msg, exc_info=exception)
else:
log_level_func(base_msg, exc_info=exc)
if not debug:
raise SystemExit(1)
def _gather_plugins_by_type(plugins, debug):
runnable_plugins = []
message_handling_plugins = []
for plugin in plugins:
# TODO (lynn): these should be switched out for adding
# the "verify interface implementation" ability. See
# https://docs.zope.org/zope.interface/verify.html
if interfaces.IRunnable.providedBy(plugin):
if not hasattr(plugin, 'run') or \
not asyncio.iscoroutinefunction(plugin.run):
msg = (f'Implemention "{plugin}" of the required '
'"IRunnable" interface does not have the '
'necessary `run` method.')
exc = exceptions.InvalidPluginError(msg)
_log_or_exit_on_exceptions(msg, exc, debug)
continue
runnable_plugins.append(plugin)
if interfaces.IMessageHandler.providedBy(plugin):
if not hasattr(plugin, 'handle_message') or \
not asyncio.iscoroutinefunction(plugin.handle_message):
msg = (f'Implemention "{plugin}" of the required '
'"IMessageHandler" interface does not have the '
'necessary `handle_message` method.')
exc = exceptions.InvalidPluginError(msg)
_log_or_exit_on_exceptions(msg, exc, debug)
continue
message_handling_plugins.append(plugin)
if not runnable_plugins or not message_handling_plugins:
msg = (f'At least one runnable plugin is required.')
exc = exceptions.MissingPluginError(msg)
_log_or_exit_on_exceptions(msg, [exc], debug=debug)
return runnable_plugins, message_handling_plugins
def _setup_router(config, plugins, metrics, success_channel, error_channel):
msg_router = router.GordonRouter(
config, success_channel, error_channel, plugins, metrics)
return msg_router
async def _run(runnable_plugins, msg_router, debug):
tasks = [p.run() for p in runnable_plugins]
tasks.append(msg_router.run())
await asyncio.gather(*tasks)
@click.command()
@click.option('-c', '--config-root',
type=click.Path(exists=True), required=False, default='.',
help='Directory where to find service configuration.')
def run(config_root):
config = setup(os.path.abspath(config_root))
debug_mode = config.get('core', {}).get('debug', False)
plugin_kwargs = {
'success_channel': asyncio.Queue(),
'error_channel': asyncio.Queue(),
}
plugin_names, plugins, errors, plugin_kwargs = plugins_loader.load_plugins(
config, plugin_kwargs)
for err_plugin, exc in errors:
base_msg = f'Plugin was not loaded: {err_plugin}'
_log_or_exit_on_exceptions(base_msg, exc, debug=debug_mode)
if plugin_names:
logging.info(f'Loaded {len(plugin_names)} plugins: {plugin_names}.')
runnables, message_handlers = _gather_plugins_by_type(plugins, debug_mode)
route_config = config.get('core', {}).get('route', {})
msg_router = _setup_router(
route_config, message_handlers, **plugin_kwargs)
logging.info(f'Starting gordon v{version}...')
loop = asyncio.get_event_loop()
# Register shutdown to signals
for signame in (signal.SIGHUP, signal.SIGTERM, signal.SIGINT):
loop.add_signal_handler(
signame, lambda: asyncio.ensure_future(shutdown(signame, loop)))
try:
loop.create_task(_run(runnables, msg_router, debug_mode))
loop.run_forever()
finally:
loop.close()
if __name__ == '__main__':
run()