Source code for assembl.tasks.source_reader

#!/usr/bin/python
"""A long-running process that receives requests to read data from various ContentSources,
and reads at reasonable intervals. It can also handle sources that can push changes. """
from __future__ import print_function
from builtins import str
import signal
from random import uniform
from time import sleep
from threading import Thread, Event, currentThread
from traceback import print_stack
from datetime import datetime, timedelta
from abc import ABCMeta, abstractmethod
import logging
from logging.config import fileConfig
import pdb

from future.utils import as_native_str, with_metaclass
from pyramid.paster import get_appsettings
from zope.component import getGlobalSiteManager
from kombu import BrokerConnection, Exchange, Queue
from kombu.mixins import ConsumerMixin
from kombu.utils.debug import setup_logging
from sqlalchemy import inspect
from sqlalchemy.exc import TimeoutError

from . import configure
from ..lib.raven_client import setup_raven, capture_exception
from ..lib.config import set_config
from ..lib.enum import OrderedEnum
from ..lib.sqla import configure_engine
from ..lib.zmqlib import configure_zmq

log = logging.getLogger(__name__)
pool_counter = 0


def fmax(*args):
    return max(filter(None, args))


class ReaderStatus(OrderedEnum):
    # See doc/sourcereader.dot
    CREATED = 0
    READING = 1
    WAIT_FOR_PUSH = 2  # A state where new data will come without prompting
    PAUSED = 3  # A state where new data will come when prompted
    CLOSED = 4
    SHUTDOWN = 5
    TRANSIENT_ERROR = 10  # Try again later (same connection)
    CLIENT_ERROR = 11  # Make a new connection to re-try
    IRRECOVERABLE_ERROR = 12  # This server will never work.
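
# Illustrative note: ReaderStatus is an OrderedEnum, so statuses support
# ordering comparisons (by value, assuming the usual OrderedEnum recipe), e.g.
# ``ReaderStatus.CLIENT_ERROR > ReaderStatus.TRANSIENT_ERROR`` is True.
# ``SourceReader.new_error`` below relies on such comparisons to decide when
# an error is severe enough to change the reader's state.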


known_transitions = {
    ReaderStatus.CREATED: {
        ReaderStatus.READING,
        ReaderStatus.CLOSED,
        ReaderStatus.CLIENT_ERROR,
        ReaderStatus.IRRECOVERABLE_ERROR,
    },
    ReaderStatus.READING: {
        ReaderStatus.CLIENT_ERROR,
        ReaderStatus.READING,
        ReaderStatus.IRRECOVERABLE_ERROR,
        ReaderStatus.PAUSED,
        ReaderStatus.TRANSIENT_ERROR,
        ReaderStatus.WAIT_FOR_PUSH,
        ReaderStatus.CLOSED,
        ReaderStatus.SHUTDOWN,  # Should this not go to closed first?
    },
    ReaderStatus.PAUSED: {
        ReaderStatus.CLOSED,
        ReaderStatus.READING,
        ReaderStatus.SHUTDOWN,
        ReaderStatus.WAIT_FOR_PUSH,
    },
    ReaderStatus.CLIENT_ERROR: {
        ReaderStatus.SHUTDOWN,
        ReaderStatus.CLIENT_ERROR,
        ReaderStatus.IRRECOVERABLE_ERROR,
        ReaderStatus.READING,
    },
    ReaderStatus.CLOSED: {
        ReaderStatus.CLIENT_ERROR,
        ReaderStatus.SHUTDOWN,
    },
    ReaderStatus.IRRECOVERABLE_ERROR: {
        ReaderStatus.IRRECOVERABLE_ERROR,
        ReaderStatus.READING,
        ReaderStatus.SHUTDOWN,
    },
    ReaderStatus.TRANSIENT_ERROR: {
        ReaderStatus.READING,
    },
    ReaderStatus.WAIT_FOR_PUSH: {
        ReaderStatus.CLIENT_ERROR,
        ReaderStatus.CLOSED,
        ReaderStatus.PAUSED,
        ReaderStatus.READING,
        ReaderStatus.TRANSIENT_ERROR,
        ReaderStatus.IRRECOVERABLE_ERROR,
        ReaderStatus.WAIT_FOR_PUSH,
        ReaderStatus.SHUTDOWN,
    },
    ReaderStatus.SHUTDOWN: {
        ReaderStatus.SHUTDOWN,
    },
}

disconnected_states = set((
    ReaderStatus.CLIENT_ERROR,
    ReaderStatus.IRRECOVERABLE_ERROR,
    ReaderStatus.CLOSED,
    ReaderStatus.SHUTDOWN))


# Connection constants
QUEUE_NAME = "source_reader"
ROUTING_KEY = QUEUE_NAME


class ReaderError(RuntimeError):
    status = ReaderStatus.TRANSIENT_ERROR


class ClientError(RuntimeError):
    status = ReaderStatus.CLIENT_ERROR


class IrrecoverableError(ClientError):
    status = ReaderStatus.IRRECOVERABLE_ERROR


class ReadingForTooLong(ClientError):
    pass


class SourceReader(with_metaclass(ABCMeta, Thread)):
    """A thread that reads data from a single ContentSource."""
    daemon = True

    # Timings. Those should vary per source type, maybe even by source?
    min_time_between_reads = timedelta(minutes=1)
    time_between_reads = timedelta(minutes=10)
    max_idle_period = timedelta(hours=3)

    transient_error_backoff = timedelta(seconds=10)
    transient_error_numlimit = 10
    client_error_backoff = timedelta(minutes=15)
    client_error_numlimit = 3
    irrecoverable_error_backoff = timedelta(days=1)
    reading_takes_too_long = timedelta(minutes=15)

    def __init__(self, source_id):
        super(SourceReader, self).__init__()
        log.disabled = False
        self.source_id = source_id
        self.source = None
        self.status = ReaderStatus.CREATED
        self.last_prod = datetime.utcnow()
        self.last_read_started = datetime.fromtimestamp(0)
        self.last_read = datetime.fromtimestamp(0)
        self.last_successful_login = datetime.fromtimestamp(0)
        self.last_error_status = None
        self.reimporting = False
        self.can_push = False  # Set to true for, e.g., imap with polling.
        self.event = Event()
        self.debug = False

    def set_status(self, status):
        lvl = (logging.INFO if status in known_transitions[self.status]
               else logging.ERROR)
        log.log(lvl, "%s %d: %s -> %s" % (
            self.__class__.__name__, self.source_id,
            self.status.name, status.name))
        if status == ReaderStatus.READING and self.status != ReaderStatus.READING:
            self.source.connection_error = self.status.value
            self.source.db.commit()
            self.refresh_source()
        self.status = status

    def successful_login(self):
        self.last_successful_login = datetime.utcnow()
        self.source.db.commit()
        self.after_login = True

    def successful_read(self):
        from assembl.models import ContentSource
        self.last_read = datetime.utcnow()
        self.reset_errors()
        self.reimporting = False
        self.source.db.commit()
        self.refresh_source()

    def reset_errors(self):
        self.error_count = 0
        self.last_error_status = None
        self.error_backoff_until = None
        self.source.reset_errors()

    def new_error(self, reader_error, status=None, expected=True):
        import traceback
        from assembl.models import ContentSource
        log.error(traceback.format_exc())
        if self.debug:
            pdb.post_mortem()
        if not expected:
            capture_exception()
        status = status or getattr(
            reader_error, 'status', ReaderStatus.TRANSIENT_ERROR)
        if status != self.last_error_status:
            # Counter-intuitive, but either lighter or more severe errors
            # reset the count.
            self.error_count = 1
        elif status == self.last_error_status:
            self.error_count += 1
            # escalate errors with repetition
            if status == ReaderStatus.TRANSIENT_ERROR:
                if self.error_count > self.transient_error_numlimit:
                    status = ReaderStatus.CLIENT_ERROR
                    self.error_count = 1
            elif status == ReaderStatus.CLIENT_ERROR:
                if self.error_count > self.client_error_numlimit:
                    status = ReaderStatus.IRRECOVERABLE_ERROR
                    self.error_count = 1
            else:
                assert False
        if status == ReaderStatus.TRANSIENT_ERROR:
            error_backoff = self.transient_error_backoff
        elif status == ReaderStatus.CLIENT_ERROR:
            error_backoff = self.client_error_backoff
        elif status == ReaderStatus.IRRECOVERABLE_ERROR:
            error_backoff = self.irrecoverable_error_backoff
        else:
            assert False
        # double backoff every time
        error_backoff *= 2 ** (self.error_count - 1)
        self.last_error_status = status
        self.source.db.rollback()
        self.refresh_source()
        self.source.connection_error = status.value
        self.source.error_description = str(reader_error)
        if (status > ReaderStatus.TRANSIENT_ERROR
                and self.status != ReaderStatus.SHUTDOWN):
            self.set_status(status)
            self.reimporting = False
        self.error_backoff_until = datetime.utcnow() + error_backoff
        self.source.error_backoff_until = self.error_backoff_until
        self.source.db.commit()
        self.refresh_source()

    def refresh_source(self):
        from assembl.models import ContentSource
        # after a commit, refresh the source so it's still usable
        self.source = ContentSource.get(self.source_id)

    def is_in_error(self):
        return self.last_error_status is not None

    def is_connected(self):
        return self.status not in disconnected_states

    def setup_read(self, reimport, **kwargs):
        self.reimporting = reimport
        self.extra_args = kwargs

    def handle_new_content(self, content):
        from .translate import translate_content
        translate_content(content)  # should delay

    def wake(self):
        log.debug("SourceReader.wake")
        if self.status in (ReaderStatus.PAUSED, ReaderStatus.CLOSED) and (
                datetime.utcnow() - fmax(self.last_prod, self.last_read)
                > self.min_time_between_reads):
            self.event.set()
        elif self.status == ReaderStatus.TRANSIENT_ERROR and (
                datetime.utcnow() - fmax(self.last_prod, self.last_error_status)
                > self.transient_error_backoff):
            # Exception: transient backoff escalation can be cancelled by wake
            self.event.set()
        elif (self.status in (ReaderStatus.READING, ReaderStatus.WAIT_FOR_PUSH)
                and (datetime.utcnow() - self.last_read_started)
                > self.reading_takes_too_long):
            try:
                self.do_close()
                self.new_error(ReadingForTooLong())
            except ReaderError as e:
                self.new_error(e)
        self.last_prod = datetime.utcnow()

    def run(self):
        self.setup()
        while self.status not in (
                ReaderStatus.SHUTDOWN, ReaderStatus.IRRECOVERABLE_ERROR):
            if self.error_backoff_until:
                interval = (
                    self.error_backoff_until - datetime.utcnow()
                ).total_seconds()
                if interval > 0:
                    self.event.wait(interval)
                    self.event.clear()
            try:
                self.login()
                self.successful_login()
            except ReaderError as e:
                self.new_error(e)
                if self.status > ReaderStatus.TRANSIENT_ERROR:
                    self.try_close()
                continue
            except Exception as e:
                self.new_error(e, ReaderStatus.CLIENT_ERROR, expected=False)
                self.try_close()
                break
            while self.after_login or self.is_connected():
                if self.status == ReaderStatus.SHUTDOWN:
                    break
                self.after_login = False
                # Read in all cases
                try:
                    self.read()
                except ReaderError as e:
                    self.new_error(e)
                    if self.status > ReaderStatus.TRANSIENT_ERROR:
                        self.try_close()
                except Exception as e:
                    self.new_error(e, ReaderStatus.CLIENT_ERROR, expected=False)
                    self.try_close()
                    break
                if not self.is_connected():
                    break
                if self.can_push:
                    self.set_status(ReaderStatus.WAIT_FOR_PUSH)
                    while self.status == ReaderStatus.WAIT_FOR_PUSH:
                        try:
                            # This is not a final close, but sends it back
                            # to the QueuePool
                            self.source.db.close()
                            self.wait_for_push()
                        except ReaderError as e:
                            self.new_error(e)
                            if self.status > ReaderStatus.TRANSIENT_ERROR:
                                self.try_close()
                            else:
                                self.end_wait_for_push()
                            break
                        except Exception as e:
                            self.new_error(
                                e, ReaderStatus.CLIENT_ERROR, expected=False)
                            self.try_close()
                            break
                        if not self.is_connected():
                            break
                        if self.status == ReaderStatus.READING:
                            self.set_status(ReaderStatus.WAIT_FOR_PUSH)
                        if self.status == ReaderStatus.PAUSED:
                            # If wait_for_push leaves us in PAUSED state,
                            # restart reading cycle
                            break
                    if not self.is_connected():
                        break
                    continue  # to next read cycle
                if not self.is_connected():
                    break
                if (self.last_read - self.last_prod) > self.max_idle_period:
                    # Nobody cares, I can stop reading
                    try:
                        if self.status == ReaderStatus.WAIT_FOR_PUSH:
                            self.end_wait_for_push()
                    finally:
                        self.close()
                    if self.status != ReaderStatus.SHUTDOWN:
                        self.event.wait(0)
                        self.event.clear()
                else:
                    self.event.wait(self.time_between_reads.total_seconds())
                    self.event.clear()
        if self.status == ReaderStatus.SHUTDOWN or self.is_connected():
            self.close()
        if self.source and not inspect(self.source).detached:
            self.source.db.close()

    @abstractmethod
    def login(self):
        pass

    @abstractmethod
    def wait_for_push(self):
        # redefine in push-capable readers
        assert self.can_push
        # Leave a non-error status as either PAUSED or READING;
        # the latter will loop.

    @abstractmethod
    def end_wait_for_push(self):
        # redefine in push-capable readers
        self.source.db.close()

    def close(self):
        if self.status == ReaderStatus.WAIT_FOR_PUSH:
            try:
                self.end_wait_for_push()
            except ReaderError as e:
                self.new_error(e, min(e.status, ReaderStatus.CLIENT_ERROR))
        if self.status == ReaderStatus.SHUTDOWN:
            return
        self.set_status(ReaderStatus.CLOSED)
        try:
            self.do_close()
        except ReaderError as e:
            self.new_error(e, min(e.status, ReaderStatus.CLIENT_ERROR))
        finally:
            self.set_status(ReaderStatus.SHUTDOWN)
            if self.source:
                self.source.db.close()

    def try_close(self):
        try:
            self.do_close()
        except ReaderError as e:
            self.new_error(e, min(e.status, ReaderStatus.CLIENT_ERROR))

    @abstractmethod
    def do_close(self):
        pass

    def setup(self):
        from assembl.models import ContentSource
        backoff = 0.5 + uniform(0, 0.5)
        while self.source is None:
            try:
                self.source = ContentSource.get(self.source_id)
            except TimeoutError:
                # Ran out of connection pool
                log.error("TimeoutError for " + str(self.source_id))
                sleep(backoff)
                backoff *= 2
        connection_error = (
            ReaderStatus(self.source.connection_error)
            if self.source.connection_error else None)
        self.error_backoff_until = self.source.error_backoff_until
        if connection_error:
            self.status = connection_error

    def read(self):
        self.set_status(ReaderStatus.READING)
        self.last_read_started = datetime.utcnow()
        self.do_read()
        self.successful_read()
        if self.status in (ReaderStatus.READING, ReaderStatus.WAIT_FOR_PUSH):
            self.set_status(ReaderStatus.PAUSED)  # or WAIT_FOR_PUSH

    @abstractmethod
    def do_read(self):
        pass

    def shutdown(self):
        # Must return quickly
        self.set_status(ReaderStatus.SHUTDOWN)
        self.event.set()

    @as_native_str()
    def __repr__(self):
        return "<%s.%d in %s>" % (
            self.__class__.__name__, self.source_id, self.name)


class PullSourceReader(SourceReader):
    # Simple reader, no wait for push, just redefine do_read

    def login(self):
        pass

    def wait_for_push(self):
        assert False, "This reader cannot wait for push"

    def end_wait_for_push(self):
        assert False, "This reader cannot wait for push"

    def do_close(self):
        pass


# Kombu communication. Does not work yet.

_exchange = Exchange(QUEUE_NAME)
_queue = Queue(
    QUEUE_NAME, exchange=_exchange, routing_key=ROUTING_KEY)
_producer_connection = None


def wake(source_id, reimport=False, force_restart=False, **kwargs):
    global _producer_connection
    from kombu.common import maybe_declare
    from kombu.pools import producers
    with producers[_producer_connection].acquire(block=True) as producer:
        maybe_declare(_exchange, producer.channel)
        kwargs.update(dict(
            source=source_id, reimport=reimport, force_restart=force_restart))
        producer.publish(kwargs, serializer="json", routing_key=ROUTING_KEY)


def external_shutdown():
    global _producer_connection
    from kombu.common import maybe_declare
    from kombu.pools import producers
    with producers[_producer_connection].acquire(block=True) as producer:
        maybe_declare(_exchange, producer.channel)
        producer.publish(
            {"shutdown": True}, serializer="json", routing_key=ROUTING_KEY)
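
# Illustrative sketch (not part of this module): a minimal pull-only reader
# only needs to implement do_read(); ``fetch_new_items`` is a hypothetical
# helper standing in for whatever a concrete source would poll.
#
#     class ExamplePullReader(PullSourceReader):
#         def do_read(self):
#             for content in fetch_new_items(self.source):
#                 self.handle_new_content(content)
#
# Once the Kombu path above works, a process that has run ``includeme`` could
# prompt a (re)read of a source through the broker with, e.g.:
#
#     wake(source_id, reimport=True)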


class SourceDispatcher(ConsumerMixin):
    """Receives requests from the message queue and dispatches them to readers."""

    def __init__(self, connection, debug=False):
        super(SourceDispatcher, self).__init__()
        self.connection = connection
        self.readers = {}
        self.debug = debug
        log.disabled = False

    def get_consumers(self, Consumer, channel):
        global _queue
        return [Consumer(queues=(_queue,), callbacks=[self.callback])]

    def callback(self, body, message):
        if isinstance(body, list):
            message.ack()
            return  # old message
        if body.get("shutdown", False):
            self.shutdown()
        else:
            source_id = body.get("source", None)
            if not source_id:
                raise ValueError("source not defined in " + repr(body))
            self.read(source_id, **body)
        message.ack()

    def read(self, source_id, reimport=False, force_restart=False, **kwargs):
        from assembl.models import ContentSource
        reader = self.readers.get(source_id, None)
        if force_restart and reader is not None:
            reader.shutdown()
            self.readers.pop(source_id)
            reader = None
        if not (reader and reader.is_connected()):
            source = ContentSource.get(source_id)
            if not source:
                return False
            if (source.connection_error == ReaderStatus.IRRECOVERABLE_ERROR
                    and not force_restart):
                return False
            if force_restart:
                source.reset_errors()
            reader = source.make_reader()
            if not reader:
                return False
            reader.debug = self.debug
            self.readers[source_id] = reader
            if reader is None:
                return False
            reader.setup_read(reimport, **kwargs)
            reader.start()
            return True
        if reader is None:
            return False
        # We know it is connected by now.
        reader.setup_read(reimport, **kwargs)
        reader.wake()
        return True

    def shutdown(self):
        self.should_stop = True
        for reader in self.readers.values():
            if reader is not None:
                reader.shutdown()
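
# Illustrative note (values are hypothetical): the JSON bodies published by
# ``wake()`` and consumed by ``SourceDispatcher.callback`` look like
#
#     {"source": 42, "reimport": false, "force_restart": false}
#     {"shutdown": true}
#
# Any extra keys are forwarded to ``SourceReader.setup_read`` as keyword
# arguments.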


def includeme(config):
    global _producer_connection, _exchange
    setup_logging(loglevel='DEBUG')
    url = (config.registry.settings.get('celery_tasks.broker')
           or config.registry.settings.get('celery_tasks.imap.broker'))
    _producer_connection = BrokerConnection(url)


if __name__ == '__main__':
    import argparse
    parser = argparse.ArgumentParser(
        description='Process to read from sources')
    parser.add_argument('--debug', '-d', action='store_true',
                        help='enter debugger on exception')
    parser.add_argument('config', help='configuration ini file')
    args = parser.parse_args()
    config_file_name = args.config
    settings = get_appsettings(config_file_name, 'idealoom')
    registry = getGlobalSiteManager()
    registry.settings = settings
    set_config(settings)
    setup_raven(settings, config_file_name)
    fileConfig(config_file_name)
    # set the basic session maker without zope or autoflush
    engine = configure_engine(
        settings, False, autoflush=False, max_overflow=20)

    # @event.listens_for(engine, "checkin")
    def show_checkin(*args):
        global pool_counter
        pool_counter -= 1
        log.debug("checkin pool: %d in %s" % (pool_counter, currentThread()))
        print_stack()

    # @event.listens_for(engine, "checkout")
    def show_checkout(*args):
        global pool_counter
        pool_counter += 1
        log.debug("checkout pool: %d in %s" % (pool_counter, currentThread()))
        print_stack()

    configure(registry, 'source_reader')
    configure_zmq(settings['changes_socket'], True)
    from assembl.models.import_records import includeme
    includeme(None)
    log.disabled = False
    url = (settings.get('celery_tasks.broker')
           or settings.get('celery_tasks.imap.broker'))
    with BrokerConnection(url) as conn:
        sourcedispatcher = SourceDispatcher(conn, args.debug)

        def shutdown(*args):
            sourcedispatcher.shutdown()

        signal.signal(signal.SIGTERM, shutdown)
        try:
            sourcedispatcher.run()
        except KeyboardInterrupt:
            shutdown()
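
# Example invocation (a sketch; the configuration file name is hypothetical):
#
#     python -m assembl.tasks.source_reader --debug local.ini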