Source code for assembl.tasks
"""Background tasks for running IdeaLoom.
Tasks are kept running by Circus_.
Short-lived tasks are written as Celery_ tasks; long-running tasks are
mostly ad hoc at this point: the :py:mod:`source_reader`
and :py:mod:`changes_router`.
.. _Circus: http://circus.readthedocs.io/en/latest/
.. _Celery: http://www.celeryproject.org/
"""
from __future__ import absolute_import
from future import standard_library
standard_library.install_aliases()
standard_library.install_hooks()
from os import getcwd
from os.path import join, dirname, realpath, exists
import logging
import configparser
from pyramid.paster import get_appsettings
from pyramid.path import DottedNameResolver
from datetime import timedelta
from celery import Celery
from pyramid_mailer import mailer_factory_from_settings
from ..lib.sqla import configure_engine
from ..lib.zmqlib import configure_zmq
from ..lib.raven_client import setup_raven
from ..lib.config import get, set_config
from zope.component import getGlobalSiteManager
from ..lib.model_watcher import configure_model_watcher
from ..lib.logging import getLogger
_settings = None
log = logging.getLogger(__name__)
resolver = DottedNameResolver(__package__)
def configure(registry, task_name):
global _settings, celery
from .threaded_model_watcher import configure_threaded_watcher
settings = registry.settings
if _settings is None:
_settings = settings
# temporary solution
configure_threaded_watcher(settings)
configure_model_watcher(registry, task_name)
region = get('aws_region', 'us-east-1')
config = {
"task_serializer": 'json',
"task_acks_late": True,
"cache_backend": settings.get('celery_tasks.broker', ''),
"result_backend": settings.get('celery_tasks.broker', ''),
"task_store_errors_even_if_ignored": True,
"broker_transport_options": {'region': region},
}
config['broker_url'] = settings.get('celery_tasks.broker')
celery.config_from_object(config, force=True)
CELERYBEAT_SCHEDULE = {
'resend-every-10-minutes': {
'task': 'assembl.tasks.notify.process_pending_notifications',
'schedule': timedelta(seconds=600),
'options': {
'routing_key': 'notify',
'exchange': 'notify'
}
},
}
# Minimum delay between emails sent to a domain.
# For this to work, you need to have a SINGLE celery process for notification.
SMTP_DOMAIN_DELAYS = {
'': timedelta(0)
}
# INI file values with this prefix will be used to populate SMTP_DOMAIN_DELAYS.
# Anything after the last dot is a domain name (including empty).
# Use seconds (float) as values.
SETTINGS_SMTP_DELAY = "celery_tasks.notify.smtp_delay."
[docs]class CeleryWithConfig(Celery):
"A Celery task that can receive settings"
_preconf = {
"CELERYBEAT_SCHEDULE": CELERYBEAT_SCHEDULE
}
def on_configure(self):
global _settings
if _settings is None:
# i.e. includeme not called, i.e. not from pyramid
self.init_from_celery()
def init_from_celery(self):
# A task is called through celery, so it may not have basic
# configuration setup. Go through that setup the first time.
global _settings, SMTP_DOMAIN_DELAYS
rootdir = getcwd()
settings_file = join(rootdir, 'local.ini')
if not exists(settings_file):
settings_file = join(rootdir, 'production.ini')
if not exists(settings_file):
rootdir = dirname(dirname(dirname(realpath(__file__))))
settings_file = join(rootdir, 'local.ini')
if not exists(settings_file):
settings_file = join(rootdir, 'production.ini')
if not exists(settings_file):
raise RuntimeError("Missing settings file")
_settings = settings = get_appsettings(settings_file, 'idealoom')
configure_zmq(settings['changes_socket'], False)
config = configparser.SafeConfigParser()
config.read(settings_file)
registry = getGlobalSiteManager()
registry.settings = settings
setup_raven(settings, settings_file, celery=True)
set_config(settings)
configure_engine(settings, True)
if settings.get('celery_tasks_debug_signal', False):
from assembl.lib import signals
signals.listen()
configure(registry, 'celery_tasks')
from .threaded_model_watcher import ThreadDispatcher
threaded_watcher_class_name = settings.get(
'celery_tasks.threadedmodelwatcher',
"assembl.lib.model_watcher.BaseModelEventWatcher")
ThreadDispatcher.mw_class = resolver.resolve(
threaded_watcher_class_name)
self.mailer = mailer_factory_from_settings(settings)
# setup SETTINGS_SMTP_DELAY
for name, val in settings.items():
if name.startswith(SETTINGS_SMTP_DELAY):
try:
val = timedelta(seconds=float(val))
except ValueError:
log.error("Not a valid value for %s: %s" % (name, val))
continue
SMTP_DOMAIN_DELAYS[name[len(SETTINGS_SMTP_DELAY):]] = val
getLogger().info("SMTP_DOMAIN_DELAYS", delays=SMTP_DOMAIN_DELAYS)
import assembl.tasks.imap
import assembl.tasks.notify
import assembl.tasks.notification_dispatch
import assembl.tasks.translate
celery = CeleryWithConfig('celery_tasks')
def includeme(config):
global _settings
_settings = config.registry.settings
config.include('.threaded_model_watcher')
configure(config.registry, 'assembl')
config.include('.source_reader')