"""IMAP source reader built on IMAPClient.

Module ``assembl.tasks.imapclient_source_reader``.
"""

import logging
from datetime import datetime, timedelta

from imapclient import IMAPClient
from imapclient.exceptions import (
    IMAPClientAbortError, IMAPClientError, ProtocolError)
from sqlalchemy.orm import undefer

import ssl
import certifi

from assembl.lib.raven_client import capture_exception
from assembl.models import ContentSource, AbstractMailbox, ImportedPost, Email
from .source_reader import (
    ReaderStatus, SourceReader, ReaderError, ClientError, IrrecoverableError)

log = logging.getLogger(__name__)


class IMAPReader(SourceReader):
    """A :py:class:`assembl.tasks.source_reader.SourceReader` subclass for
    reading IMAP messages with IMAPClient. Can wait for push.

    State kept on the instance:

    * ``mailbox`` -- the logged-in ``IMAPClient`` connection, or ``None``.
    * ``selected_folder`` -- True while a folder is selected on ``mailbox``.
    * ``idling`` -- True between ``idle()`` and ``idle_done()``.
    * ``aborted`` -- True once the connection raised an abort/protocol
      error; ``do_close`` then skips the ``logout`` round trip.
    """

    # Upper bound on one IDLE session before wait_for_push returns.
    # NOTE(review): presumably follows RFC 2177's advice to re-issue IDLE
    # at least every 29 minutes -- confirm.
    max_idle_period = timedelta(minutes=29)

    def __init__(self, source_id):
        """Create a reader for the content source ``source_id``.

        No connection is opened here; see :py:meth:`login`.
        """
        super(IMAPReader, self).__init__(source_id)
        self.selected_folder = False
        self.mailbox = None
        self.idling = False
        self.aborted = False
        # Force the module-level logger on.
        # NOTE(review): presumably works around a logging configuration
        # that disables pre-existing loggers -- confirm.
        log.disabled = False

    def login(self):
        """Connect, authenticate and select the source's folder.

        Uses STARTTLS whenever the server advertises it and records push
        support (``self.can_push``) when the server advertises IDLE.

        :raises IrrecoverableError: on an IMAP abort or protocol error.
        :raises ClientError: on any other IMAPClient error.
        """
        try:
            # Verify server certificates against certifi's CA bundle.
            context = ssl.create_default_context(cafile=certifi.where())
            mailbox = IMAPClient(
                self.source.host,
                port=self.source.port,
                use_uid=True,
                ssl=self.source.use_ssl,
                ssl_context=context)
            capabilities = mailbox.capabilities()
            if b'STARTTLS' in capabilities:
                # Always use starttls if server supports it
                mailbox.starttls(context)
            if b'IDLE' in capabilities:
                self.can_push = True
            log.debug("login")
            mailbox.login(self.source.username, self.source.password)
            mailbox.select_folder(self.source.folder)
            self.selected_folder = True
            self.aborted = False
            # Only publish the connection once it is fully usable.
            self.mailbox = mailbox
        except (IMAPClientAbortError, ProtocolError) as e:
            capture_exception(e)
            self.aborted = True
            raise IrrecoverableError(e)
        except IMAPClientError as e:
            capture_exception(e)
            raise ClientError(e)

    def wait_for_push(self):
        """Block in IMAP IDLE until mail arrives, the idle period elapses
        or the reader status changes, then import what was announced.

        :raises ClientError: on an IMAP abort or protocol error.
        :raises ReaderError: on any other IMAPClient error.
        """
        assert self.can_push
        try:
            start_time = datetime.now()
            found_emails = []
            limit = start_time + self.max_idle_period
            if not self.idling:
                self.mailbox.idle()
                self.idling = True
            self.set_status(ReaderStatus.WAIT_FOR_PUSH)
            while (datetime.now() < limit) and not found_emails and (
                    self.status == ReaderStatus.WAIT_FOR_PUSH):
                elapsed = datetime.now() - start_time
                # Wait for whatever remains of the idle period, at least 1s.
                timeout = max(
                    1, int((self.max_idle_period - elapsed).total_seconds()))
                log.debug("idle_check")
                resps = self.mailbox.idle_check(timeout)
                if not resps:
                    log.debug("timeout?")
                    break
                for resp in resps:
                    if resp[0] == b'OK' and resp[1] == b'Still here':
                        # Server keep-alive, not a mailbox change.
                        continue
                    if resp[1] == b'EXISTS':
                        # NOTE(review): per RFC 3501 the number in an
                        # untagged EXISTS response is the mailbox message
                        # count, yet it is passed on below as a message id
                        # and the connection uses use_uid=True. Confirm
                        # this is intended; it only matches the UID when
                        # UIDs are contiguous from 1.
                        found_emails.append(resp[0])
            if self.status == ReaderStatus.WAIT_FOR_PUSH:
                self.end_wait_for_push()
            if found_emails:
                self.process_email_ids(found_emails)
                # process_email_ids switched the status to READING.
                self.set_status(ReaderStatus.WAIT_FOR_PUSH)
        except (IMAPClientAbortError, ProtocolError) as e:
            capture_exception(e)
            self.aborted = True
            raise ClientError(e)
        except IMAPClientError as e:
            capture_exception(e)
            raise ReaderError(e)
        except AssertionError:
            # Case where we're closing from another thread
            # NOTE(review): this also swallows the ``assert message_string``
            # raised by import_email when called from here.
            pass

    def end_wait_for_push(self):
        """Leave IDLE mode if it is active, then run the base-class hook.

        :raises ClientError: on an IMAP abort or protocol error.
        """
        if self.idling and self.status in (
                ReaderStatus.WAIT_FOR_PUSH, ReaderStatus.CLOSED,
                ReaderStatus.SHUTDOWN):
            try:
                self.mailbox.idle_done()
            except (IMAPClientAbortError, ProtocolError) as e:
                capture_exception(e)
                self.aborted = True
                raise ClientError(e)
            except IMAPClientError as e:
                # Best effort: a failed idle_done is only logged.
                log.warning(e)
            finally:
                self.idling = False
        super(IMAPReader, self).end_wait_for_push()

    def shutdown(self):
        """Shut the reader down, leaving IDLE mode if still idling."""
        super(IMAPReader, self).shutdown()
        if self.idling:
            self.end_wait_for_push()

    def do_close(self):
        """Close the selected folder and log out.

        Both steps are always attempted and the local state is always
        reset; the last error encountered, if any, is raised at the end.

        :raises ClientError: on an IMAP abort or protocol error.
        :raises ReaderError: on any other IMAPClient error.
        """
        exc = None
        self.idling = False
        if self.selected_folder:
            try:
                self.mailbox.close_folder()
            except (IMAPClientAbortError, ProtocolError) as e:
                capture_exception(e)
                self.aborted = True
                exc = ClientError(e)
            except IMAPClientError as e:
                capture_exception(e)
                exc = ReaderError(e)
            finally:
                self.selected_folder = False
        if self.mailbox:
            try:
                # Skip the logout once the connection is known to be aborted.
                if not self.aborted:
                    log.debug("logout")
                    self.mailbox.logout()
            except (IMAPClientAbortError, ProtocolError) as e:
                capture_exception(e)
                self.aborted = True
                exc = ClientError(e)
            except IMAPClientError as e:
                capture_exception(e)
                exc = ReaderError(e)
            finally:
                self.mailbox = None
        if exc is not None:
            raise exc

    def import_email(self, email_id):
        """Fetch one message and store it through the content source.

        Messages rejected by ``message_ok_to_import`` are skipped, but
        ``last_imported_email_uid`` is advanced either way.

        :param email_id: id of the message to fetch from the server.
        :raises ClientError: on an IMAP abort or protocol error.
        :raises ReaderError: on any other IMAPClient error, or when the
            source fails to parse the message.
        """
        try:
            messages = self.mailbox.fetch([email_id], [b"RFC822"])
            message_string = messages[email_id][b"RFC822"]
            assert message_string
            message_string = AbstractMailbox.guess_encoding(message_string)
            try:
                if self.source.message_ok_to_import(message_string):
                    (email_object, dummy, error) = self.source.parse_email(
                        message_string)
                    if error:
                        raise ReaderError(error)
                    self.source.db.add(email_object)
                else:
                    log.info(
                        "Skipped message with imap id %s "
                        "(bounce or vacation message)", email_id)
                self.source.last_imported_email_uid = email_id
                self.source.db.commit()
            finally:
                # Re-fetch the source whether or not the import succeeded.
                # NOTE(review): presumably to obtain a fresh instance after
                # the commit or a failure -- confirm.
                self.source = ContentSource.get(self.source.id)
        except (IMAPClientAbortError, ProtocolError) as e:
            capture_exception(e)
            self.aborted = True
            raise ClientError(e)
        except IMAPClientError as e:
            capture_exception(e)
            raise ReaderError(e)

    def process_email_ids(self, email_ids):
        """Import the given messages, then re-thread the discussion.

        Importing stops early as soon as the reader status is no longer
        ``READING``; re-threading runs in either case.

        :param email_ids: sized iterable of message ids to import.
        """
        self.set_status(ReaderStatus.READING)
        self.refresh_source()
        log.info("Processing messages from IMAP: %d ", len(email_ids))
        for email_id in email_ids:
            self.import_email(email_id)
            if self.status != ReaderStatus.READING:
                break
        # We imported mails, we need to re-thread
        self.source.db.flush()
        # Rethread emails globally (sigh)
        emails = self.source.db.query(Email).filter_by(
            discussion_id=self.source.discussion_id
        ).options(undefer(ImportedPost.imported_blob)).all()
        AbstractMailbox.thread_mails(emails)
        self.source.db.commit()

    def _search_email_ids(self, only_new):
        """Return the ids of the messages that should be imported.

        When ``only_new`` is true and the UID of the last imported message
        is still present on the server, only messages with a greater UID
        are returned. Otherwise every message is returned and the caller
        relies on duplicate detection.
        """
        last_uid = None
        if only_new:
            try:
                # Search results are ints; normalise the stored value.
                last_uid = int(self.source.last_imported_email_uid)
            except (TypeError, ValueError):
                last_uid = None
        if last_uid:
            # The UID key is required: a bare "n:*" criterion selects by
            # message sequence number, not by UID.
            email_ids = self.mailbox.search("UID %d:*" % last_uid, 'utf-8')
            # Note: the membership test is necessary because according to
            # https://tools.ietf.org/html/rfc3501 a seq-range like
            # "3291:*" includes the UID of the last message in the
            # mailbox, even if that value is less than 3291.
            if last_uid in email_ids:
                # Discard the last imported message and anything older.
                return [uid for uid in email_ids if uid > last_uid]
        # Either:
        # a) we don't import only new messages or
        # b) the message with self.source.last_imported_email_uid hasn't
        #    been found (may have been deleted)
        # In this case we request all messages and rely on duplicate
        # detection
        return self.mailbox.search(b"ALL", 'utf-8')

    def do_read(self):
        """Import pending messages and leave the reader ``PAUSED``.

        Only messages newer than the last imported one are requested,
        unless the reader is reimporting.

        :raises ClientError: on an IMAP abort or protocol error.
        :raises ReaderError: on any other IMAPClient error.
        """
        only_new = not self.reimporting
        try:
            self.set_status(ReaderStatus.READING)
            email_ids = self._search_email_ids(only_new)
            if email_ids:
                self.process_email_ids(email_ids)
            else:
                log.debug("No IMAP messages to process")
            self.successful_read()
            self.set_status(ReaderStatus.PAUSED)
        except (IMAPClientAbortError, ProtocolError) as e:
            capture_exception(e)
            self.aborted = True
            raise ClientError(e)
        except IMAPClientError as e:
            capture_exception(e)
            raise ReaderError(e)