Source code for assembl.views

""" App URL routing and renderers are configured in this module. 

Note that IdeaLoom is a `hybrid app`_, and combines routes and :py:mod:`traversal`.

.. _`hybrid app`: http://docs.pylonsproject.org/projects/pyramid/en/latest/narr/hybrid.html
"""
from future.standard_library import install_aliases
install_aliases()

from builtins import str
import os.path
import codecs
from collections import defaultdict
import logging
from urllib.parse import urlparse

import simplejson as json
from pyramid.view import view_config
from pyramid.response import Response
from pyramid.httpexceptions import (
    HTTPException, HTTPInternalServerError, HTTPMovedPermanently, HTTPError,
    HTTPBadRequest, HTTPFound, HTTPTemporaryRedirect as HTTPTemporaryRedirectP)
from pyramid.i18n import TranslationStringFactory
from pyramid.settings import asbool, aslist
from social_core.exceptions import AuthMissingParameter
from lxml import html

from ..lib.json import json_renderer_factory
from ..lib import config
from ..lib.clean_input import sanitize_text
from ..lib.frontend_urls import FrontendUrls
from ..lib.locale import get_language, get_country, strip_most_countries
from ..lib.utils import get_global_base_url
from ..lib.raven_client import capture_exception, flush
from ..__version__ import version

# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# Values merged into the keyword arguments of get_default_context() on every
# call; includeme() stores the 'cache_bust' entry here at start-up.
default_context = {
}


# The 'templates' directory located in the parent of the directory that
# contains this file; get_template_views() scans its 'views' sub-directory.
TEMPLATE_PATH = os.path.join(
    os.path.dirname(os.path.dirname(__file__)), 'templates')


class HTTPTemporaryRedirect(HTTPTemporaryRedirectP):
    """Temporary redirect response that always carries ``no-cache``.

    Any ``cache_control`` keyword supplied by the caller is overridden.
    """

    def __init__(self, *args, **kwargs):
        # Force the header value regardless of what the caller passed.
        kwargs.update(cache_control="no-cache")
        super().__init__(*args, **kwargs)
        # NOTE(review): presumably stops the framework from rewriting the
        # Cache-Control header automatically -- confirm against pyramid/webob.
        self.cache_control.prevent_auto = True
def backbone_include(config):
    """Register the frontend routes, then the styleguide and test routes."""
    FrontendUrls.register_frontend_routes(config)
    extra_routes = (
        ('styleguide', '/styleguide'),
        ('test', '/test'),
    )
    for route_name, pattern in extra_routes:
        config.add_route(route_name, pattern)


def get_theme_base_path():
    """Return the path of the ``static/css/themes`` directory.

    The directory is resolved relative to the parent of the directory that
    contains this file.
    """
    package_root = os.path.dirname(os.path.dirname(__file__))
    return os.path.join(package_root, 'static', 'css', 'themes')
def find_theme(theme_name):
    """
    Walk the theme base folder (following symlinks) looking for a theme
    directory with the given name.

    Only directories that contain a ``_theme.scss`` file are considered.

    @returns the theme path fragment relative to the theme base_path, or
    None if not found
    """
    base_path = get_theme_base_path()
    for dirpath, _subdirs, filenames in os.walk(base_path, followlinks=True):
        if '_theme.scss' not in filenames:
            continue
        fragment = os.path.relpath(dirpath, base_path)
        folder_name = os.path.split(dirpath)[1]
        log.debug(folder_name+" "+fragment)
        if folder_name == theme_name:
            return fragment
    return None
def get_theme_info(discussion):
    """
    Pick the theme to use for a discussion.

    A theme named after the discussion slug wins (legacy slug override);
    otherwise the configured ``default_theme`` (or ``'default'``) is looked up.

    @return (theme_name, theme_relative_path) the relative path is relative
    to the theme_base_path.  See find_theme.  Falls back to
    ``('default', 'default')`` when no theme folder is found.
    """
    configured_name = config.get('default_theme') or 'default'
    # Legacy code: Slug override
    slug_path = find_theme(discussion.slug) if discussion else None
    if slug_path:
        return (discussion.slug, slug_path)
    configured_path = find_theme(configured_name)
    if configured_path is None:
        return ('default', 'default')
    return (configured_name, configured_path)
def get_provider_data(get_route, providers=None):
    """Describe the enabled login providers for the templates.

    :param get_route: callable ``get_route(name, **kwargs)`` returning a URL
        path (see :py:func:`create_get_route`).
    :param providers: optional iterable of provider type names; defaults to
        the ``login_providers`` configuration value.  The argument itself is
        never modified.
    :return: a list of dicts with the keys ``name``, ``type``, ``extra``,
        ``add_social_account`` and ``login``; one per known identity provider
        whose type is enabled, plus one per enabled SAML IdP.
    """
    from assembl.models.auth import IdentityProvider
    if providers is None:
        providers = aslist(config.get('login_providers'))
    else:
        # Work on a copy: 'saml' is removed below and the caller's
        # sequence must not change under its feet.
        providers = list(providers)
    providers_by_name = IdentityProvider.default_db.query(
        IdentityProvider.name, IdentityProvider.provider_type
    ).order_by(IdentityProvider.id).all()
    saml_providers = {}
    if 'saml' in providers:
        providers.remove('saml')
        # The setting may be absent, already a dict, or a JSON string.
        saml_providers = config.get('SOCIAL_AUTH_SAML_ENABLED_IDPS') or {}
        if not isinstance(saml_providers, dict):
            saml_providers = json.loads(saml_providers)
    provider_data = [
        {
            "name": name.capitalize(),
            "type": ptype,
            "extra": {},
            "add_social_account": get_route(
                'add_social_account', backend=ptype),
            "login": get_route('social.auth', backend=ptype),
        } for (name, ptype) in providers_by_name
        if ptype in providers
    ]
    if 'yahoo' in providers:
        for provider in provider_data:
            if provider['type'] == 'yahoo':
                provider['extra'] = {
                    "oauth": True,
                    "openid_identifier": 'yahoo.com',
                }
    if saml_providers:
        provider_data.extend([
            {
                "name": data["description"],
                "type": "saml",
                "add_social_account": get_route(
                    'add_social_account', backend='saml'),
                "login": get_route('social.auth', backend='saml'),
                "extra": {
                    "idp": prov_id
                }
            }
            for prov_id, data in saml_providers.items()
        ])
    return provider_data


def create_get_route(request, discussion=0):
    """Build a ``get_route(name, **kwargs)`` helper bound to a request.

    :param request: the current request; only ``route_path`` is used here.
    :param discussion: the current discussion, ``None`` for a known absence,
        or the default ``0`` meaning "not looked up yet", in which case the
        discussion is derived from the request.
    :return: a function returning the route path for ``name``.  With a
        discussion, the ``contextual_<name>`` route is tried first and the
        plain ``name`` route is used when that raises ``KeyError``; the
        discussion slug is always supplied.  Without one, ``discussion_slug``
        defaults to the empty string.
    """
    # 0 is the "not computed" sentinel; None would be a known absence, don't
    # recalculate.  Compare by value: identity against an int literal is
    # implementation-dependent and triggers a SyntaxWarning.
    if isinstance(discussion, int) and discussion == 0:
        from assembl.auth.util import discussion_from_request
        discussion = discussion_from_request(request)
    if discussion:
        def get_route(name, **kwargs):
            try:
                return request.route_path(
                    'contextual_' + name,
                    discussion_slug=discussion.slug, **kwargs)
            except KeyError:
                return request.route_path(
                    name, discussion_slug=discussion.slug, **kwargs)
    else:
        def get_route(name, **kwargs):
            kwargs['discussion_slug'] = kwargs.get('discussion_slug', '')
            return request.route_path(name, **kwargs)
    return get_route


# Parsed build pages, keyed by the ``testing`` flag.
_RES_FILE_CACHE = {}


def get_res_file(testing):
    """Return the parsed HTML build page listing the frontend assets.

    The page is ``static/js/build/test.html`` when ``testing`` is true and
    ``index.html`` otherwise, with a ``live_`` file-name prefix when the
    ``use_webpack_server`` setting is on.  The result is cached per
    ``testing`` value, except that the cache entry is dropped on every call
    while the webpack server is in use.
    """
    use_webpack_server = asbool(config.get('use_webpack_server'))
    if use_webpack_server:
        # reset the cache every time
        _RES_FILE_CACHE[testing] = None
    if _RES_FILE_CACHE.get(testing) is None:
        res_name = os.path.dirname(os.path.dirname(__file__)) \
            + "/static/js/build/"
        if use_webpack_server:
            res_name += 'live_'
        res_name += "test.html" if testing else "index.html"
        _RES_FILE_CACHE[testing] = html.parse(res_name)
    return _RES_FILE_CACHE[testing]


def get_js_links(static_url, testing=False):
    """Return the ``<script>`` tags for the built JavaScript, one per line.

    Script sources are read from the build page.  Sources containing
    ``main`` are skipped when testing, sources containing ``test``
    (case-insensitive) are skipped otherwise, and ``notification`` /
    ``annotator_ext`` sources are always skipped.  ``static_url`` is
    prepended unless the webpack server is in use.
    """
    res_file = get_res_file(testing)
    links = res_file.xpath("//script/@src")
    # excludeChunks fails in webpack server?
    if testing:
        links = [link for link in links if 'main' not in link]
    else:
        links = [link for link in links if 'test' not in link.lower()]
    links = [link for link in links
             if 'notification' not in link and 'annotator_ext' not in link]
    if not asbool(config.get('use_webpack_server')):
        links = [static_url + link for link in links]
    links = ['<script type="text/javascript" src="%s"></script>' % link
             for link in links]
    return "\n".join(links)


def get_css_links(static_url, testing=False):
    """Return the stylesheet ``<link>`` tags for the built CSS, one per line.

    Stylesheet hrefs are read from the ``<head>`` of the build page;
    ``notification`` / ``annotator_ext`` entries are skipped and
    ``static_url`` is prepended unless the webpack server is in use.
    """
    res_file = get_res_file(testing)
    links = res_file.xpath("//head/link[@rel='stylesheet']/@href")
    links = [link for link in links
             if 'notification' not in link and 'annotator_ext' not in link]
    if not asbool(config.get('use_webpack_server')):
        links = [static_url + link for link in links]
    links = ['<link type="text/css" rel="stylesheet" href="%s"></link>' % link
             for link in links]
    return "\n".join(links)


def get_service_url(config, service, secure_req):
    """Return the base URL of an auxiliary service.

    :param config: mapping holding the ``<service>_proxied``,
        ``<service>_port`` and ``<service>_prefix`` settings, as well as
        the secure-connection settings.
    :param service: settings prefix of the service.
    :param secure_req: whether the current request came in over https.
    :return: the global base URL (secure only for a proxied service, when a
        secure connection is required, or accepted and in use; with the
        service port only for a non-proxied service) followed by the
        service prefix.
    """
    proxied = asbool(config.get(f'{service}_proxied'))
    port = None if proxied else config.get(f'{service}_port')
    use_secure = proxied and (
        asbool(config.get("require_secure_connection")) or
        (asbool(config.get("accept_secure_connection")) and secure_req))
    return get_global_base_url(
        use_secure, port) + config.get(f'{service}_prefix')


def get_default_context(request, **kwargs):
    """Build the context dictionary handed to the page templates.

    :param request: the current request.
    :param kwargs: extra context entries; ``error`` and ``message`` are
        merged with the session flash queues (and, for messages, with the
        ``message`` query parameters).
    :return: ``kwargs`` updated with ``default_context`` and extended with
        the URLs, user, discussion, locale, theme, analytics, provider and
        asset information read by the templates.
    :raises HTTPFound: to redirect to the global base URL when the request
        came over plain http and a secure connection is required.
    """
    kwargs.update(default_context)
    from ..auth.util import get_current_discussion
    application_url = get_global_base_url()
    if request.scheme == "http"\
            and asbool(config.get("require_secure_connection")):
        raise HTTPFound(application_url + request.path_qs)
    secure_req = request.url.startswith('https:')
    socket_url = get_service_url(config, 'changes_websocket', secure_req)
    oembed_url = get_service_url(config, 'oembed', secure_req)

    localizer = request.localizer

    user = request.user
    if user and user.username:
        user_profile_edit_url = request.route_url(
            'profile_user', type='u', identifier=user.username)
    elif user:
        user_profile_edit_url = request.route_url(
            'profile_user', type='id', identifier=user.id)
    else:
        user_profile_edit_url = None

    web_analytics_piwik_script = config.get(
        'web_analytics_piwik_script') or False
    discussion = get_current_discussion()
    if (web_analytics_piwik_script and discussion
            and discussion.web_analytics_piwik_id_site):
        # The script template takes the site id twice.
        web_analytics_piwik_script = web_analytics_piwik_script % (
            discussion.web_analytics_piwik_id_site,
            discussion.web_analytics_piwik_id_site)
    else:
        web_analytics_piwik_script = False

    # A discussion-specific help URL overrides the configured one.
    help_url = config.get('help_url') or ''
    if discussion and discussion.help_url:
        help_url = discussion.help_url
    if help_url and "%s" in help_url:
        help_url = help_url % strip_most_countries(localizer.locale_name)

    first_login_after_auto_subscribe_to_notifications = False
    if (user and discussion and discussion.id and user.is_first_visit
            and discussion.subscribe_to_notifications_on_signup
            and user.is_participant(discussion.id)):
        first_login_after_auto_subscribe_to_notifications = True

    locales = config.get('available_languages').split()
    countries_for_locales = defaultdict(set)
    for locale in locales:
        countries_for_locales[get_language(locale)].add(get_country(locale))
    # Show the country only for languages available in several variants.
    show_locale_country = {
        locale: (len(countries_for_locales[get_language(locale)]) > 1)
        for locale in locales}

    # Translation catalogue for the locale, falling back to the bare
    # language when there is no catalogue for the language_COUNTRY variant.
    jedfilename = os.path.join(
        os.path.dirname(__file__), '..', 'locale',
        localizer.locale_name, 'LC_MESSAGES', 'assembl.jed.json')
    if not os.path.exists(jedfilename) and '_' in localizer.locale_name:
        jedfilename = os.path.join(
            os.path.dirname(__file__), '..', 'locale',
            get_language(localizer.locale_name), 'LC_MESSAGES',
            'assembl.jed.json')
    assert os.path.exists(jedfilename)

    from ..models.facebook_integration import language_sdk_existance
    fb_lang_exists, fb_locale = language_sdk_existance(
        get_language(localizer.locale_name), countries_for_locales)

    def process_export_list(ls):
        return [s.strip() for s in ls.split(",")]

    social_settings = {
        'fb_export_permissions': config.get('facebook.export_permissions'),
        'fb_debug': asbool(config.get('facebook.debug_mode')),
        'fb_app_id': config.get('facebook.consumer_key'),
        'fb_api_version': config.get('facebook.api_version') or '2.2',
        'supported_exports': process_export_list(
            config.get('supported_exports_list'))
    }

    # A container for all analytics related settings. All future
    # analytics based settings that will be exposed to the templates
    # should be included in this dictionary
    analytics_settings = {
        'enabled': True if web_analytics_piwik_script else False,
    }

    if analytics_settings.get('enabled', False):
        analytics_settings['piwik'] = {
            'script': web_analytics_piwik_script
        }

    use_webpack_server = asbool(config.get("use_webpack_server"))
    static_url = '/static'
    widget_url = '/static/widget'
    if use_webpack_server:
        webpack_host = config.get(
            'webpack_host', config.get('public_hostname', 'localhost'))
        static_url = 'http://%s:%d' % (
            webpack_host, int(config.get('webpack_port', 8080)))

    get_route = create_get_route(request, discussion)
    providers = get_provider_data(get_route)

    errors = request.session.pop_flash()
    if kwargs.get('error', None):
        errors.append(kwargs['error'])
    if errors:
        kwargs['error'] = '<br />'.join(errors)

    messages = request.session.pop_flash('message')
    if not messages:
        messages = request.GET.getall('message')
        if messages:
            # defend against xss: these come straight from the query string
            messages = [sanitize_text(m) for m in messages]
            log.debug("messages from query string: %r", messages)
    if messages:
        kwargs['message'] = '<br />'.join(messages)

    (theme_name, theme_relative_path) = get_theme_info(discussion)

    # Read the catalogue inside a context manager so the handle is closed.
    with codecs.open(jedfilename, encoding='utf-8') as jedfile:
        translations = jedfile.read()

    return dict(
        kwargs,
        STATIC_URL=static_url,
        WIDGET_URL=widget_url,
        request=request,
        application_url=application_url,
        get_route=get_route,
        user=user,
        templates=get_template_views(),
        # Templates won't load without a discussion object
        discussion=discussion or {},
        preferences=discussion.preferences if discussion else {},
        user_profile_edit_url=user_profile_edit_url,
        locale=localizer.locale_name,
        locales=locales,
        fb_lang_exists=fb_lang_exists,
        fb_locale=fb_locale,
        social_settings=social_settings,
        show_locale_country=show_locale_country,
        theme_name=theme_name,
        theme_relative_path=theme_relative_path,
        minified_js=config.get('minified_js') or False,
        platform_name=config.get('platform_name') or "IdeaLoom",
        web_analytics=analytics_settings,
        help_url=help_url,
        socket_url=socket_url,
        oembed_url=oembed_url,
        first_login_after_auto_subscribe_to_notifications=first_login_after_auto_subscribe_to_notifications,
        raven_url=config.get('raven_url') or '',
        activate_tour=str(config.get('activate_tour') or False).lower(),
        providers=providers,
        providers_json=json.dumps(providers),
        js_links=get_js_links(static_url),
        css_links=get_css_links(static_url),
        version=version(),
        translations=translations
    )
def get_template_views():
    """
    Collect the view names of every ``.tmpl`` file found (recursively) under
    the ``views`` sub-directory of the templates folder.

    A view name is the part of the file name before its first dot.
    """
    views_root = os.path.join(TEMPLATE_PATH, 'views')
    return [
        fname.partition('.')[0]
        for _dirpath, _dirnames, fnames in os.walk(views_root)
        for fname in fnames
        if fname.endswith('.tmpl')
    ]
class JSONError(HTTPError):
    """HTTP error carrying a list of error dictionaries for JSON rendering.

    Each entry of ``errors`` is a dict with a ``message`` key and, when an
    error type was given, a ``type`` key holding the type's name.  The
    instance is falsy while it holds no errors.

    :param detail: message of the first error; also passed to the base class.
    :param error_type: optional error type of the first error.
    :param code: HTTP status code of the response (bad request by default).
    :param headers: passed through to the base class.
    :param comment: passed through to the base class.
    :param body_template: passed through to the base class.
    """

    def __init__(self, detail=None, error_type=None, code=HTTPBadRequest.code,
                 headers=None, comment=None, body_template=None, **kw):
        # error_type should be from .errors.ErrorTypes
        self.errors = []
        # Apply the requested status code before the base class builds the
        # response; it used to be accepted and then silently dropped.
        self.code = code
        if detail:
            self.add_error(detail, error_type)
        super(JSONError, self).__init__(
            detail, headers, comment, body_template=body_template, **kw)

    @staticmethod
    def create_dict(message, error_type=None):
        """Return the dict representation of a single error."""
        if error_type:
            return dict(message=message, type=error_type.name)
        return dict(message=message)

    def add_error(self, message, error_type=None, code=None):
        """Append an error and, when ``code`` is given, update the status code."""
        self.errors.append(self.create_dict(message, error_type))
        if code is not None:
            self.code = code

    def __bool__(self):
        return bool(self.errors)
@view_config(context=HTTPError,
             renderer='assembl:templates/includes/404.jinja2')
def not_found(context, request):
    """Render an HTTP error with the error page template.

    The response status is copied from the error; the template receives the
    error's message and status code.
    """
    request.response.status = context.status_code
    return {"message": context.message, "code": context.status_code}


@view_config(context=JSONError, renderer='json')
def json_error_view(request):
    """Render a :py:class:`JSONError` as its list of errors, with its code."""
    exc = request.exception
    request.response.status_code = exc.code
    return exc.errors


# TODO social_auth: Test the heck out of this.
@view_config(context=AuthMissingParameter)
def csrf_error_view(exc, request):
    """Handle a missing authentication parameter during social login.

    When the request carries no cookies and comes from Safari on the
    ``social.complete`` route, a page that reloads the same URL once (with a
    ``reload`` parameter) is returned.  Every other case raises.

    :raises HTTPBadRequest: "Missing cookies" when no cookie was sent,
        "CSRF error" otherwise.
    """
    if "HTTP_COOKIE" not in request.environ:
        # The User-Agent header may be absent.
        user_agent = request.user_agent or ''
        is_safari = 'Safari' in user_agent and 'Chrome' not in user_agent
        matched_route = request.matched_route
        route_name = matched_route.name if matched_route else None
        is_login_callback = (route_name == 'social.complete')
        if is_safari and is_login_callback:
            # This is an absolutely horrible hack, but depending on some
            # settings, Safari does not give cookies on a redirect, so we
            # lose session info.
            if 'reload' not in request.GET:
                # So first make sure the new session does not kill the old one
                def callback(request, response):
                    response._headerlist = [
                        (h, v) for (h, v) in response._headerlist
                        if h != 'Set-Cookie']
                    # The header list is not a string: let logging format it.
                    log.debug("headerlist: %r", response._headerlist)
                request.add_response_callback(callback)
                # And return a page that will reload the same request, NOT
                # through a 303.  Also add a "reload" parameter to avoid
                # doing it twice if it failed.
                template = ('<html><head><script>document.location = "' +
                            request.path_info + '?' + request.query_string +
                            '&reload=true"</script></head></html>')
                return Response(
                    template, content_type='text/html', charset="ascii")
            else:
                # The hack failed. Tell the user what to do.
                raise HTTPBadRequest(
                    explanation="Missing cookies",
                    detail=(
                        'Note that we need active cookies. On Safari, the '
                        '"Allow from current website only" option in the '
                        'Privacy tab of preferences is too restrictive; '
                        'use "Allow from websites I visit" and try again. '
                        'Simply reloading may work.'))
        raise HTTPBadRequest(explanation="Missing cookies",
                             detail=repr(request.exception))
    raise HTTPBadRequest(explanation="CSRF error",
                         detail=repr(request.exception))


def error_view(exc, request):
    """Report an unhandled exception and answer with an internal error.

    The exception info found on the request (if any) is captured and flushed
    to the error tracker before raising.

    :raises HTTPInternalServerError: always; the detail holds the UTC
        timestamp and the repr of the exception.
    """
    # from traceback import format_exc
    from datetime import datetime
    capture_exception(getattr(request, "exc_info", None))
    flush()  # make sure it got to sentry
    raise HTTPInternalServerError(
        explanation="Sorry, IdeaLoom had an internal issue and you have to reload. Please send this to a discussion administrator.",
        detail=datetime.utcnow().isoformat()+"\n"+repr(request.exception))
    # format_exc(request.exception))


def redirector(request):
    """Permanently redirect to the ``home`` route of the matched discussion."""
    return HTTPMovedPermanently(request.route_url(
        'home', discussion_slug=request.matchdict.get('discussion_slug')))


def sanitize_next_view(next_view):
    """Return ``next_view`` if it is an acceptable redirect target.

    Values without a scheme separator are treated as local and returned
    unchanged, as are empty values.  Absolute URLs must point at the
    configured ``public_hostname`` and use ``http`` (unless a secure
    connection is required) or ``https`` (if secure connections are
    accepted).  Scheme-relative targets such as ``//host/path`` are refused.

    :return: ``next_view``, or ``None`` when the target is not acceptable.
    """
    if not next_view:
        return next_view
    # Browsers resolve "//host" and "/\host" against the current scheme, so
    # they leave the site even though they contain no ":/".
    if next_view.startswith(('//', '/\\')):
        return None
    if ':/' not in next_view:
        return next_view
    parsed = urlparse(next_view)
    if parsed.netloc != config.get("public_hostname"):
        return None
    if parsed.scheme == 'http':
        if asbool(config.get("require_secure_connection")):
            return None
    elif parsed.scheme == 'https':
        if not asbool(config.get("accept_secure_connection")):
            return None
    else:
        return None
    return next_view
def includeme(config):
    """ Initialize views and renderers at app start-up time.

    Registers the JSON renderer, the root route (a discussion list, or a
    redirect to the configured default discussion), the optional catch-all
    exception view, the sub-packages and finally the per-discussion routes.
    """
    app_settings = config.get_settings()

    config.add_renderer('json', json_renderer_factory)
    config.include('.traversal')

    default_discussion = app_settings.get('default_discussion', None)
    if not default_discussion:
        config.add_route('discussion_list', '/')
    else:
        # The root redirects to the default discussion; the list moves aside.
        config.add_route('discussion_list', '/discussions')
        config.add_view(
            lambda req: HTTPFound('/'+default_discussion),
            route_name='default_disc_redirect')
        config.add_route('default_disc_redirect', '/')

    handle_exceptions = asbool(
        config.get_settings().get('idealoom_handle_exceptions', 'true'))
    if handle_exceptions:
        config.add_view(error_view, context=Exception)

    # authentication first, then the other sub-packages, in this order
    for submodule in ('.auth', '.api', '.api2', '.discussion_list', '.admin'):
        config.include(submodule)

    config.add_route('home-auto', '/{discussion_slug}/')
    config.add_view(redirector, route_name='home-auto')
    cache_bust = config.registry.settings['requirejs.cache_bust']
    default_context['cache_bust'] = cache_bust

    # Scan now, to get cornice views
    config.scan('.')
    # make sure this comes last to avoid conflicts
    config.add_route('home', '/{discussion_slug}')
    config.include(backbone_include, route_prefix='/{discussion_slug}')