Source code for assembl.lib.raven_client
"""Abstract the existence and use of ravenJS"""
from __future__ import print_function
from future import standard_library
standard_library.install_aliases()
from configparser import ConfigParser
from sentry_sdk import (
init as sentry_init, capture_message, capture_exception, configure_scope, Hub)
from sentry_sdk.integrations.pyramid import PyramidIntegration
from sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration
from sentry_sdk.integrations.redis import RedisIntegration
from ..__version__ import version
def sentry_context(user_id=None, **kwargs):
with configure_scope() as scope:
if user_id:
scope.user = {"id": user_id}
if kwargs:
for k, v in kwargs.items():
if v is None:
continue
scope.set_extra(k, v)
def flush(timeout=2.0):
client = Hub.current.client
if client is not None:
client.flush(timeout=timeout)
[docs]def setup_raven(settings, settings_file=None, use_async=False, celery=False):
"""Setup raven client.
Sentry is automatically setup in assembl,
this is useful for other processes."""
if isinstance(settings, ConfigParser):
raven_url = settings.get('app:idealoom', 'raven_url')
else:
raven_url = settings.get('raven_url', '')
if raven_url and len(raven_url) > 12:
integrations = integrations = [
PyramidIntegration(),
SqlalchemyIntegration(),
RedisIntegration(),
]
if use_async:
from sentry_sdk.integrations.aiohttp import AioHttpIntegration
integrations.append(AioHttpIntegration())
if celery:
from sentry_sdk.integrations.celery import CeleryIntegration
integrations.append(CeleryIntegration())
sentry_init(
raven_url, integrations=integrations, release=version(),
server_name=settings.get('app:idealoom', 'public_hostname'))
def includeme(config):
setup_raven(config.registry.settings)