Source code for assembl.models.feed_parsing

"""Utilities for extracting posts and from a RSS or Atom feed."""
from future import standard_library
standard_library.install_aliases()
from builtins import str
from builtins import object
from io import BytesIO
from importlib import import_module
from datetime import datetime
from calendar import timegm
import logging

from future.utils import string_types
from sqlalchemy import (
    Column,
    ForeignKey,
    Integer,
    String,
)
from pyisemail import is_email
import feedparser
import requests
from urllib.parse import urlparse, quote_plus

from ..lib.sqla_types import URLString
from .langstrings import LangString, LocaleLabel
from .generic import PostSource
from .post import ImportedPost
from .auth import AbstractAgentAccount, AgentProfile
from ..tasks.source_reader import PullSourceReader, ReaderError, ReaderStatus


log = logging.getLogger(__name__)


class FeedFetcher(object):
    """
    A thin wrapper around requests, used to fetch a feed from a web resource
    address and return it either as a bytestring (the preferred input for
    feedparser) or as a file-like object (which is what speedparser likes).
    """

    def return_string(self, uri):
        """Returns the raw bytes of the XML feed at the given URI."""
        resp = requests.get(uri)
        return resp.content

    def return_file(self, uri):
        """Returns a file-like object with the XML inside."""
        output = BytesIO()
        resp = requests.get(uri)
        output.write(resp.content)
        output.seek(0)  # rewind so the parser reads from the start
        return output
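
# Example (an illustrative sketch, not part of the original module):
# fetching a feed document directly. The URL is hypothetical.
#
#     fetcher = FeedFetcher()
#     raw_bytes = fetcher.return_string("https://example.com/feed.atom")
#     file_like = fetcher.return_file("https://example.com/feed.atom")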

class ParserWrapper(object):
    """
    A thin wrapper around whichever parsing module is used (feedparser vs
    speedparser).
    """

    def __init__(self, fetcher, parser, parser_can_read_file=False):
        self.fetcher = fetcher
        self.parser = parser
        self.parser_can_read_file = parser_can_read_file

    def parse(self, uri):
        if self.parser_can_read_file:
            return self.parser.parse(self.fetcher.return_file(uri))
        else:
            return self.parser.parse(self.fetcher.return_string(uri))
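
# Example (an illustrative sketch, not part of the original module): wiring
# a fetcher to a parser. feedparser accepts the fetched bytes directly; a
# file-reading parser such as speedparser would be wrapped with
# parser_can_read_file=True so it receives the file-like object instead.
#
#     wrapper = ParserWrapper(FeedFetcher(), feedparser)
#     parsed = wrapper.parse("https://example.com/feed.atom")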

class ParsedData(object):
    """
    For every atom feed URL that is fetched, a ParsedData object is generated
    to handle retrieving the feed, the entries, or other fields as needed.
    This object is the base "Data Getter" object for atom feeds.

    @TODO: Extend this class to support RSS feeds as well.
    """

    def __init__(self, url, parser_wrapper=None):
        self.url = url
        self._parse_agent = parser_wrapper or \
            ParserWrapper(FeedFetcher(), feedparser)
        self._feed = None

    def _fetch_source(self):
        if not self._feed:
            self._feed = self._parse_agent.parse(self.url)

    def _update_feed(self, url):
        self._feed = self._parse_agent.parse(url)

    def refetch_source(self):
        self._feed = self._parse_agent.parse(self.url)

    def get_parsed_feed(self):
        """Returns the entire parsed feed as a dict"""
        self._fetch_source()
        return self._feed

    def get_feed(self):
        """Returns the feed summary from the entire parsed feed as a dict"""
        self._fetch_source()
        return self.get_parsed_feed()['feed']

    # Does not update the source
    def get_feed_forced(self, url):
        return self._parse_agent.parse(url)

    def get_feed_title(self):
        return self.get_feed()['title']

    def get_entries(self):
        self._fetch_source()
        return iter(self.get_parsed_feed()['entries'])
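
# Example (an illustrative sketch, not part of the original module): reading
# entries from a feed. The URL is hypothetical.
#
#     data = ParsedData("https://example.com/feed.atom")
#     print(data.get_feed_title())
#     for entry in data.get_entries():
#         print(entry['id'], entry['title'])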

class PaginatedParsedData(ParsedData):
    """
    Extension of the "Data Getter" object, which supports basic pagination
    of data.

    @TODO: Extend this object to support variable key pagination, rather
    than simple integer incrementation.
    """

    def __init__(self, url, parser_wrapper=None,
                 page_key='page', start_page=1):
        self.page_number = start_page
        self.page_key = page_key
        self.url = url
        self._new_source_fetched = False
        self._parse_wrapper = parser_wrapper or \
            ParserWrapper(FeedFetcher(), feedparser)
        super(PaginatedParsedData, self).__init__(url, self._parse_wrapper)

    def _check_empty_entries(self, feed):
        """Checks if the currently fetched feed has any entries or not"""
        return feed['entries'] == []

    def _update_url(self):
        """Generates the URL of the 'next page' by appending the page key
        and an incrementing page number to the base URL.
        @override this to implement specific logic"""
        next_page = self.page_number
        while True:
            next_url = self.url + "?" + self.page_key + "=" + str(next_page)
            next_page += 1
            yield next_url

    def get_next_feed(self):
        for url in self._update_url():
            feed = super(PaginatedParsedData, self).get_feed_forced(url)
            self._feed = feed
            if feed['entries'] == []:
                # PEP 479: raising StopIteration inside a generator becomes
                # a RuntimeError; return to end the iteration instead.
                return
            yield feed

    def _get_entry_per_feed(self, feed):
        return iter(feed['entries'])

    def reset(self):
        self.page_number = 1

    def get_entries(self):
        for feed in self.get_next_feed():
            for entry in self._get_entry_per_feed(feed):
                yield entry
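
# Example (an illustrative sketch, not part of the original module):
# iterating over a paginated feed. With page_key='page' and start_page=1,
# successive fetches hit hypothetical URLs like
# https://example.com/feed?page=1, ?page=2, ... until a page with no
# entries is returned.
#
#     data = PaginatedParsedData("https://example.com/feed")
#     for entry in data.get_entries():
#         print(entry['id'])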

class FeedPost(ImportedPost):
    """
    A discussion post that is imported from an external feed source.
    """
    __mapper_args__ = {
        'polymorphic_identity': 'feed_imported_posts',
    }

class LoomioFeedPost(FeedPost):
    """
    A discussion post that is imported from a feed extracted from Loomio.
    """
    __mapper_args__ = {
        'polymorphic_identity': 'loomio_feed_post'
    }

class WebLinkAccount(AbstractAgentAccount):
    """
    An imported name that has not been validated nor authenticated within the
    platform. This is to keep track of an imported post's ownership.
    """
    __tablename__ = 'weblink_user'

    id = Column(Integer, ForeignKey(
        'abstract_agent_account.id',
        onupdate='CASCADE', ondelete='CASCADE'), primary_key=True)
    user_link = Column(URLString, unique=True)

    __mapper_args__ = {
        'polymorphic_identity': 'weblink_user'
    }

    def get_user_link(self):
        return self.user_link

    def unique_query(self):
        # Uniqueness does not care about subclasses,
        # so query on this class rather than self's class.
        return self.db.query(WebLinkAccount).filter_by(
            user_link=self.user_link), True
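
# Example (an illustrative sketch, not part of the original module):
# unique_query returns a (query, is_unique) pair; callers such as
# find_duplicate use the query to look up an existing account with the same
# user_link. This assumes the account is attached to a database session.
#
#     account = WebLinkAccount(user_link="https://example.com/users/alice")
#     query, _ = account.unique_query()
#     existing = query.first()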

class LoomioAccount(WebLinkAccount):
    """
    An imported Loomio name and address. This is not an authenticated user.
    """
    __mapper_args__ = {
        'polymorphic_identity': 'loomio_user'
    }

class FeedPostSource(PostSource):
    """
    The source of an imported feed, be it Atom, RSS, or any other type of
    feed protocol.
    """
    __tablename__ = 'feed_posts_source'

    id = Column(Integer, ForeignKey(
        'post_source.id',
        ondelete='CASCADE',
        onupdate='CASCADE'), primary_key=True)

    url = Column(URLString, nullable=False)

    # For parameter-free calling to parse posts from this source.
    parser_full_class_name = Column(String(512), nullable=False)

    __mapper_args__ = {
        'polymorphic_identity': 'feed_posts_source'
    }

    post_type = FeedPost  # for db querying
    user_type = WebLinkAccount

    def make_reader(self):
        return FeedSourceReader(self.id)

    # eg. create_from(d, "www...xml", "A valid name", PaginatedFeedParser)
    @classmethod
    def create_from(cls, discussion, url, source_name, parse_config_class):
        encoded_name = source_name
        encoded_url = url
        created_date = datetime.utcnow()
        # Extract the dotted path from repr(parse_config_class),
        # e.g. "<class 'module.Class'>" -> "module.Class".
        parser_name = str(parse_config_class).split("'")[1]
        return cls(name=encoded_name, creation_date=created_date,
                   discussion=discussion, url=encoded_url,
                   parser_full_class_name=parser_name)

    def send_post(self, post):
        # TODO?
        log.warning(
            "TODO?: FeedPostSource::send_post(): Actually send the post")

    def generate_message_id(self, source_post_id):
        # Feed post ids are supposed to be globally unique.
        # They may or may not be emails.
        if isinstance(source_post_id, string_types) \
                and is_email(source_post_id):
            return source_post_id
        # Invalid source_post_id. Fall back to a synthetic email-like id.
        return "%s_feed@%s" % (
            self.flatten_source_post_id(str(source_post_id), 5),
            urlparse(self.url).hostname)
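
# Example (an illustrative sketch, not part of the original module):
# registering a paginated feed source on a discussion. The URL and name are
# hypothetical; PaginatedParsedData is used here as the parser class, since
# the PaginatedFeedParser named in the comment above is not defined in this
# module.
#
#     source = FeedPostSource.create_from(
#         discussion, "https://example.com/feed.xml",
#         "Example feed", PaginatedParsedData)
#     reader = source.make_reader()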

class LoomioPostSource(FeedPostSource):
    """
    The source of an imported feed that came directly from Loomio.
    """
    __mapper_args__ = {
        'polymorphic_identity': 'feed_posts_source_loomio'
    }

    post_type = LoomioFeedPost
    user_type = LoomioAccount

    def make_reader(self):
        return LoomioSourceReader(self.id)

    def send_post(self, post):
        # TODO?
        log.warning(
            "TODO?: LoomioPostSource::send_post(): Actually send the post")

class FeedSourceReader(PullSourceReader):

    def __init__(self, source_id):
        super(FeedSourceReader, self).__init__(source_id)
        self._parse_agent = None

    def do_read(self):
        self._check_parser_loaded()
        if self.reimporting:
            self._re_import()
        else:
            self._add_entries()

    def _re_import(self, discussion=None):
        sess = self.source.db
        for entry in self._parse_agent.get_entries():
            try:
                post_id = self._get_entry_id(entry)
                user_link = self._get_author_link(entry)
                persisted_post = self._return_existing_post(post_id)
                account = self._create_account_from_entry(entry)
                other_account = account.find_duplicate(True, True)
                if other_account is not account and other_account is not None:
                    account = other_account
                    self._process_reimport_user(entry, account)
                else:
                    sess.add(account)
                if persisted_post is not None:
                    self._process_reimport_post(
                        entry, persisted_post, discussion)
                    persisted_post.creator = account.profile
                    sess.commit()
                else:
                    persisted_post = self._convert_to_post(entry, account)
                    sess.add(persisted_post)
                    sess.commit()
                self.handle_new_content(persisted_post)
            except Exception as e:
                sess.rollback()
                raise ReaderError(e)
            finally:
                self.source = FeedPostSource.get(self.source_id, sess)
            if self.status != ReaderStatus.READING:
                break

    def _process_reimport_post(self, entry, post, discussion=None):
        post.import_date = datetime.utcnow()
        post.source_post_id = self._get_entry_id(entry)
        post.source = self.source
        post.body_mime_type = self._get_body_mime_type(entry)
        post.creation_date = self._get_creation_date(entry)
        post.subject = self._get_subject(entry)
        post.body = self._get_body(entry)

    def _process_reimport_user(self, entry, user, user_desc=None):
        if not user.profile.name:
            user.profile.name = self._get_author(entry)
        if not user.profile.description and user_desc is not None:
            user.profile.description = user_desc

    def _add_entries(self):
        for post, account in self._generate_post_stream():
            try:
                if not account.find_duplicate(True, True):
                    self.source.db.add(account)
                if not post.find_duplicate(True, True):
                    self.source.db.add(post)
                    self.source.db.commit()
                    self.handle_new_content(post)
            except Exception as e:
                self.source.db.rollback()
                raise ReaderError(e)
            finally:
                self.source = FeedPostSource.get(
                    self.source_id, self.source.db)

    def _check_parser_loaded(self):
        if not self._parse_agent:
            module, parse_cls = \
                self.source.parser_full_class_name.rsplit(".", 1)
            mod = import_module(module)
            parser_class = getattr(mod, parse_cls)
            self._parse_agent = parser_class(self.source.url)

    def _create_account_from_entry(self, entry):
        author_name = self._get_author(entry)
        author_link = self._get_author_link(entry)
        agent_profile = AgentProfile(name=author_name)
        return self.source.user_type(
            user_link=author_link, profile=agent_profile)

    def _generate_post_stream(self):
        self._check_parser_loaded()
        for entry in self._parse_agent.get_entries():
            account = self._create_account_from_entry(entry)
            account = account.get_unique_from_db()
            yield self._convert_to_post(entry, account), account
            if self.status != ReaderStatus.READING:
                break

    def _return_existing_post(self, post_id):
        cls = self.source.post_type
        return self.source.db.query(cls).filter_by(
            source_post_id=post_id, source_id=self.source_id).first()

    def _get_title_from_feed(self):
        self._check_parser_loaded()
        return self._parse_agent.get_feed_title()

    def _get_creation_date(self, entry):
        return datetime.fromtimestamp(timegm(entry['updated_parsed']))

    def _get_entry_id(self, entry):
        return entry['id']

    def _get_body_mime_type(self, entry):
        content = entry.get('content', None)
        if content:
            return content[0]['type']
        return 'text/html'  # assumed when rss

    def _get_subject(self, entry):
        return LangString.create(entry['title'])

    def _get_body(self, entry):
        body = entry.get('content', None)
        if body:
            body = body[0]
            language = body.get('language', None)
            if not language or language.lower() == 'none':
                language = LocaleLabel.UNDEFINED
            return LangString.create(body['value'], language)
        else:
            body = entry.get('description', None)
            if body:
                return LangString.create(body)
        raise RuntimeError("could not find description for entry %s" % entry)

    def _get_author(self, entry):
        return entry['author']

    def _get_author_link(self, entry):
        if 'author_detail' in entry and 'href' in entry['author_detail']:
            return entry['author_detail']['href']
        return "%s#%s" % (self.source.url, self._get_author(entry))

    def _convert_to_post(self, entry, account):
        source_post_id = self._get_entry_id(entry)
        source = self.source
        body_mime_type = self._get_body_mime_type(entry)
        subject = self._get_subject(entry)
        body = self._get_body(entry)
        imported_date = datetime.utcnow()
        user = account.profile
        # TODO AY: Can we get the locale?
        return source.post_type(
            creation_date=self._get_creation_date(entry),
            import_date=imported_date,
            source=source,
            source_post_id=source_post_id,
            discussion=source.discussion,
            body_mime_type=body_mime_type,
            creator=user,
            subject=subject,
            body=body)


class LoomioSourceReader(FeedSourceReader):

    def __init__(self, source_id):
        super(LoomioSourceReader, self).__init__(source_id)

    def _process_reimport_post(self, entry, post, discussion):
        super(LoomioSourceReader, self).\
            _process_reimport_post(entry, post, discussion)
        post.subject = self._get_title_from_feed()

    def _convert_to_post(self, entry, account):
        post = super(LoomioSourceReader, self)._convert_to_post(
            entry, account)
        post.subject = self._get_title_from_feed()
        return post
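
# Example (an illustrative sketch, not part of the original module): the
# typical pull flow. A FeedPostSource builds its reader, which imports the
# parser class named in parser_full_class_name and converts each feed entry
# into an ImportedPost with a WebLinkAccount creator. This assumes a
# configured discussion and that the source-reader task machinery has set up
# the reader's source and status.
#
#     source = FeedPostSource.create_from(
#         discussion, "https://example.com/feed.xml",
#         "Example feed", ParsedData)
#     reader = source.make_reader()
#     reader.do_read()  # fetches entries, persists new posts and accounts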