# Source code for assembl.tasks.threaded_model_watcher

"""Dispatch model events to another thread, instead of through Celery.

Note that the ModelEventWatcher methods are called from Mapper-level flush
events, so they cannot create objects. This pushes the logic onto another
thread, so we are already using another thread-specific session."""
from __future__ import print_function

from future import standard_library
standard_library.install_aliases()
from builtins import object
from threading import Thread
from queue import Queue

from zope import interface

from ..lib.model_watcher import IModelEventWatcher

class ThreadDispatcher(Thread):
    """A thread that will receive CRUD events and hand them to another
    model watcher.

    Events are tuples of the form ``(method_name, arg, ...)`` placed on
    ``queue``. The thread takes them one at a time and calls the method of
    that name on ``mw`` with the remaining items as positional arguments.
    """
    #: The shared dispatcher instance, created lazily by ``get_instance``.
    singleton = None
    #: Class-level override of ``Thread.daemon``; intended to make the
    #: dispatcher a daemon thread.
    daemon = True
    #: The class of the model watcher. It is called with no arguments in
    #: ``__init__``, so it must be set before the first instance is created.
    mw_class = None

    @classmethod
    def get_instance(cls):
        """Return the singleton dispatcher, creating and starting it on
        first use."""
        if cls.singleton is None:
            cls.singleton = cls()
            cls.singleton.start()
        return cls.singleton

    def __init__(self):
        """Create the event queue and instantiate the model watcher."""
        super(ThreadDispatcher, self).__init__()
        self.queue = Queue()
        # Checked at the top of every iteration of the loop in ``run``.
        self.dying = False
        self.mw = self.mw_class()

    def run(self):
        """Consume events from the queue until ``dying`` is set.

        A failure while handling one event (unknown method name, malformed
        event, or an exception raised by the watcher) is logged and the loop
        carries on. Otherwise a single bad event would end this thread and
        every later event would stay in the queue unprocessed.
        """
        # Imported here so the block does not depend on a module-level import.
        import logging
        log = logging.getLogger(__name__)
        while not self.dying:
            # Debug trace kept from the original implementation.
            print("*"*20, end=' ')
            event = self.queue.get()
            print(event)
            try:
                method_name = event[0]
                method = getattr(self.mw, method_name)
                method(*event[1:])
            except Exception:
                # Thread boundary: report the error and keep dispatching.
                log.exception(
                    "Error while dispatching model event %r", event)

    @classmethod
    def start_dispatcher(cls):
        """Make sure the singleton dispatcher has been created and started."""
        cls.get_instance()
@interface.implementer(IModelEventWatcher)
class ThreadedModelEventWatcher(object):
    """A IModelEventWatcher that does no processing itself: each notification
    is packed into a ``(method_name, arg, ...)`` tuple and put on the queue
    of the :py:class:`ThreadDispatcher` singleton."""

    def __init__(self):
        dispatcher = ThreadDispatcher.get_instance()
        self.queue = dispatcher.queue

    def _enqueue(self, *event):
        """Put ``event`` (a method name followed by its arguments) on the
        dispatcher queue as a single tuple."""
        self.queue.put(event)

    # Post events

    def processPostCreated(self, id):
        self._enqueue('processPostCreated', id)

    # Idea events

    def processIdeaCreated(self, id):
        self._enqueue('processIdeaCreated', id)

    def processIdeaModified(self, id, version):
        self._enqueue('processIdeaModified', id, version)

    def processIdeaDeleted(self, id):
        self._enqueue('processIdeaDeleted', id)

    # Extract events

    def processExtractCreated(self, id):
        self._enqueue('processExtractCreated', id)

    def processExtractModified(self, id, version):
        self._enqueue('processExtractModified', id, version)

    def processExtractDeleted(self, id):
        self._enqueue('processExtractDeleted', id)

    # Account events

    def processAccountCreated(self, id):
        self._enqueue('processAccountCreated', id)

    def processAccountModified(self, id):
        self._enqueue('processAccountModified', id)
def configure_threaded_watcher(settings):
    """Choose the watcher class that ``ThreadDispatcher`` will instantiate.

    The dotted name is read from the ``assembl.threadedmodelwatcher`` entry
    of ``settings`` (falling back to
    ``assembl.lib.model_watcher.BaseModelEventWatcher``), resolved with
    ``resolver.resolve`` and stored on ``ThreadDispatcher.mw_class``.
    """
    from . import resolver
    fallback = "assembl.lib.model_watcher.BaseModelEventWatcher"
    dotted_name = settings.get('assembl.threadedmodelwatcher', fallback)
    watcher_class = resolver.resolve(dotted_name)
    ThreadDispatcher.mw_class = watcher_class


def includeme(config):
    """Read the settings from ``config`` and configure the threaded
    watcher with them."""
    app_settings = config.get_settings()
    configure_threaded_watcher(app_settings)