Source code for assembl.tasks.imapclient_source_reader
import logging
from datetime import datetime, timedelta
from imapclient import IMAPClient
from imapclient.exceptions import (
IMAPClientAbortError, IMAPClientError, ProtocolError)
from sqlalchemy.orm import undefer
import ssl
import certifi
from assembl.lib.raven_client import capture_exception
from assembl.models import ContentSource, AbstractMailbox, ImportedPost, Email
from .source_reader import (
ReaderStatus, SourceReader, ReaderError, ClientError, IrrecoverableError)
log = logging.getLogger(__name__)
[docs]class IMAPReader(SourceReader):
"""A :py:class:`assembl.tasks.source_reader.SourceReader`
subclass for reading IMAP messages with IMAPClient. Can wait for push."""
max_idle_period = timedelta(minutes=29)
def __init__(self, source_id):
super(IMAPReader, self).__init__(source_id)
self.selected_folder = False
self.mailbox = None
self.idling = False
self.aborted = False
log.disabled = False
def login(self):
try:
context = ssl.create_default_context(cafile=certifi.where())
# context.check_hostname = False
# context.verify_mode = ssl.CERT_NONE
mailbox = IMAPClient(
self.source.host,
port=self.source.port,
use_uid=True,
ssl=self.source.use_ssl,
ssl_context=context)
# mailbox.debug = 5
capabilities = mailbox.capabilities()
if b'STARTTLS' in capabilities:
# Always use starttls if server supports it
mailbox.starttls(context)
if b'IDLE' in capabilities:
self.can_push = True
log.debug("login")
mailbox.login(self.source.username, self.source.password)
mailbox.select_folder(self.source.folder)
self.selected_folder = True
self.aborted = False
self.mailbox = mailbox
except (IMAPClientAbortError, ProtocolError) as e:
capture_exception(e)
self.aborted = True
raise IrrecoverableError(e)
except IMAPClientError as e:
capture_exception(e)
raise ClientError(e)
def wait_for_push(self):
assert self.can_push
try:
start_time = datetime.now()
found_emails = []
limit = start_time + self.max_idle_period
if not self.idling:
self.mailbox.idle()
self.idling = True
self.set_status(ReaderStatus.WAIT_FOR_PUSH)
while (datetime.now() < limit) and not found_emails and (
self.status == ReaderStatus.WAIT_FOR_PUSH):
elapsed = datetime.now() - start_time
timeout = max(
1, int((self.max_idle_period - elapsed).total_seconds()))
log.debug("idle_check")
resps = self.mailbox.idle_check(timeout)
if not resps:
log.debug("timeout?")
break
for resp in resps:
if (resp[0] == b'OK' and resp[1] == b'Still here'):
continue
if resp[1] == b'EXISTS':
found_emails.append(resp[0])
if self.status == ReaderStatus.WAIT_FOR_PUSH:
self.end_wait_for_push()
if found_emails:
self.process_email_ids(found_emails)
self.set_status(ReaderStatus.WAIT_FOR_PUSH)
except (IMAPClientAbortError, ProtocolError) as e:
capture_exception(e)
self.aborted = True
raise ClientError(e)
except IMAPClientError as e:
capture_exception(e)
raise ReaderError(e)
except AssertionError as e:
# Case where we're closing from another thread
pass
def end_wait_for_push(self):
if self.idling and self.status in (
ReaderStatus.WAIT_FOR_PUSH, ReaderStatus.CLOSED,
ReaderStatus.SHUTDOWN):
try:
self.mailbox.idle_done()
except (IMAPClientAbortError, ProtocolError) as e:
capture_exception(e)
self.aborted = True
raise ClientError(e)
except IMAPClientError as e:
log.warning(e)
finally:
self.idling = False
super(IMAPReader, self).end_wait_for_push()
def shutdown(self):
super(IMAPReader, self).shutdown()
if self.idling:
self.end_wait_for_push()
def do_close(self):
exc = None
self.idling = False
if self.selected_folder:
try:
self.mailbox.close_folder()
except (IMAPClientAbortError, ProtocolError) as e:
capture_exception(e)
self.aborted = True
exc = ClientError(e)
except IMAPClientError as e:
capture_exception(e)
exc = ReaderError(e)
finally:
self.selected_folder = False
if self.mailbox:
if not self.aborted:
try:
log.debug("logout")
self.mailbox.logout()
except (IMAPClientAbortError, ProtocolError) as e:
capture_exception(e)
self.aborted = True
exc = ClientError(e)
except IMAPClientError as e:
capture_exception(e)
exc = ReaderError(e)
finally:
self.mailbox = None
self.mailbox = None
if exc is not None:
raise exc
def import_email(self, email_id):
mailbox = self.mailbox
# log.debug( "running fetch for message: "+email_id)
try:
messages = self.mailbox.fetch([email_id], [b"RFC822"])
# log.debug( repr(messages))
message_string = messages[email_id][b"RFC822"]
assert message_string
message_string = AbstractMailbox.guess_encoding(message_string)
try:
if self.source.message_ok_to_import(message_string):
(email_object, dummy, error) = self.source.parse_email(message_string)
if error:
raise ReaderError(error)
self.source.db.add(email_object)
else:
log.info("Skipped message with imap id %s (bounce or vacation message)" % (email_id))
# log.debug( "Setting self.source.last_imported_email_uid to "+email_id)
self.source.last_imported_email_uid = email_id
self.source.db.commit()
finally:
self.source = ContentSource.get(self.source.id)
except (IMAPClientAbortError, ProtocolError) as e:
capture_exception(e)
self.aborted = True
raise ClientError(e)
except IMAPClientError as e:
capture_exception(e)
raise ReaderError(e)
def process_email_ids(self, email_ids):
self.set_status(ReaderStatus.READING)
self.refresh_source()
log.info("Processing messages from IMAP: %d "% (len(email_ids)))
for email_id in email_ids:
self.import_email(email_id)
if self.status != ReaderStatus.READING:
break
# We imported mails, we need to re-thread
self.source.db.flush()
# Rethread emails globally (sigh)
emails = self.source.db.query(Email).filter_by(
discussion_id=self.source.discussion_id
).options(undefer(ImportedPost.imported_blob)).all()
AbstractMailbox.thread_mails(emails)
self.source.db.commit()
def do_read(self):
only_new = not self.reimporting
try:
self.set_status(ReaderStatus.READING)
mailbox = self.mailbox
command = b"ALL"
search_status = None
email_ids = None
if only_new and self.source.last_imported_email_uid:
command = "%s:*" % self.source.last_imported_email_uid
email_ids = mailbox.search(command, 'utf-8')
#log.debug(email_ids)
if (only_new and search_status == b'OK' and email_ids
and email_ids[0] == self.source.last_imported_email_uid):
# Note: the email_ids[0]==self.source.last_imported_email_uid test is
# necessary beacuse according to https://tools.ietf.org/html/rfc3501
# seq-range like "3291:* includes the UID of the last message in
# the mailbox, even if that value is less than 3291."
# discard the first message, it should be the last imported email.
del email_ids[0]
else:
# Either:
# a) we don't import only new messages or
# b) the message with self.source.last_imported_email_uid hasn't been found
# (may have been deleted)
# In this case we request all messages and rely on duplicate
# detection
command = b"ALL"
email_ids = mailbox.search(b"ALL", 'utf-8')
if len(email_ids):
self.process_email_ids(email_ids)
else:
log.debug("No IMAP messages to process")
self.successful_read()
self.set_status(ReaderStatus.PAUSED)
except (IMAPClientAbortError, ProtocolError) as e:
capture_exception(e)
self.aborted = True
raise ClientError(e)
except IMAPClientError as e:
capture_exception(e)
raise ReaderError(e)