Source code for assembl.tasks.threaded_model_watcher
"""Dispatch model events to another thread, instead of through Celery.
Note that the ModelEventWatcher are Mapper-level flush events, so they cannot
create objects. This pushes the logic on another thread, so we're already
using another thread-specific session."""
from __future__ import print_function
from future import standard_library
standard_library.install_aliases()
from builtins import object
from threading import Thread
from queue import Queue
from zope import interface
from ..lib.model_watcher import IModelEventWatcher
[docs]class ThreadDispatcher(Thread):
    """A thread that will receive CRUD events and hand them to another model watcher."""
    singleton = None
    daemon = True
    "The class of the model watcher"
    mw_class = None
    @classmethod
    def get_instance(cls):
        if not cls.singleton:
            cls.singleton = cls()
            cls.singleton.start()
        return cls.singleton
    def __init__(self):
        super(ThreadDispatcher, self).__init__()
        self.queue = Queue()
        self.dying = False
        self.mw = self.mw_class()
[docs]    def run(self):
        while not self.dying:
            print("*"*20, end=' ')
            event = self.queue.get()
            print(event)
            method_name = event[0]
            method = getattr(self.mw, method_name)
            method(*event[1:]) 
    @classmethod
    def start_dispatcher(cls):
        cls.get_instance() 
[docs]@interface.implementer(IModelEventWatcher)
class ThreadedModelEventWatcher(object):
    """A IModelEventWatcher that will dispatch events to its
    :py:class:`ThreadDispatcher`"""
    def __init__(self):
        self.queue = ThreadDispatcher.get_instance().queue
    def processPostCreated(self, id):
        self.queue.put(('processPostCreated', id))
    def processIdeaCreated(self, id):
        self.queue.put(('processIdeaCreated', id))
    def processIdeaModified(self, id, version):
        self.queue.put(('processIdeaModified', id, version))
    def processIdeaDeleted(self, id):
        self.queue.put(('processIdeaDeleted', id))
    def processExtractCreated(self, id):
        self.queue.put(('processExtractCreated', id))
    def processExtractModified(self, id, version):
        self.queue.put(('processExtractModified', id, version))
    def processExtractDeleted(self, id):
        self.queue.put(('processExtractDeleted', id))
    def processAccountCreated(self, id):
        self.queue.put(('processAccountCreated', id))
    def processAccountModified(self, id):
        self.queue.put(('processAccountModified', id)) 
def configure_threaded_watcher(settings):
    from . import resolver
    class_name = settings.get(
        'assembl.threadedmodelwatcher',
        "assembl.lib.model_watcher.BaseModelEventWatcher")
    ThreadDispatcher.mw_class = resolver.resolve(class_name)
def includeme(config):
    configure_threaded_watcher(config.get_settings())