Source code for assembl.views.auth.views

from future import standard_library
standard_library.install_aliases()
from builtins import str
from datetime import datetime
import simplejson as json
from urllib.parse import quote
from smtplib import SMTPRecipientsRefused
from email.header import Header
import logging

from future.utils import string_types
from pyramid.i18n import TranslationStringFactory
from pyramid.view import view_config
from pyramid_mailer import get_mailer
from pyramid_mailer.message import Message
from pyramid.renderers import render_to_response
from pyramid.security import (
    remember,
    forget,
    Everyone,
    Authenticated,
    authenticated_userid,
    NO_PERMISSION_REQUIRED)
from pyramid.httpexceptions import (
    HTTPUnauthorized,
    HTTPFound,
    HTTPNotFound,
    HTTPBadRequest,
    HTTPServerError)
from pyramid.settings import asbool, aslist
from sqlalchemy import desc
from pyisemail import is_email
from social_core.actions import do_auth
from social_pyramid.utils import psa
from social_core.exceptions import (
    AuthException, AuthFailed, AuthCanceled, AuthUnknownError,
    AuthMissingParameter, AuthStateMissing, AuthStateForbidden,
    AuthTokenError)


from assembl.models import (
    EmailAccount, IdentityProvider, SocialAuthAccount,
    AgentProfile, User, Role, LocalUserRole, Preferences,
    AbstractAgentAccount, Discussion, AgentStatusInDiscussion)
from assembl.auth import (
    P_READ, R_PARTICIPANT, P_SELF_REGISTER, P_SELF_REGISTER_REQUEST)
from assembl.auth.password import (
    verify_email_token, verify_password_change_token,
    password_change_token, Validity, get_data_token_time)
from assembl.auth.util import (
    discussion_from_request, roles_with_permissions, maybe_auto_subscribe,
    get_permissions)
from assembl.auth.social_auth import maybe_social_logout
from ...lib import config
from assembl.lib.sqla_types import EmailString
from assembl.lib.utils import normalize_email_name, get_global_base_url
from .. import (
    get_default_context, JSONError, get_provider_data,
    HTTPTemporaryRedirect, create_get_route, sanitize_next_view)

_ = TranslationStringFactory('assembl')
log = logging.getLogger(__name__)


public_roles = {Everyone, Authenticated}


def get_login_context(request, force_show_providers=False):
    slug = request.matchdict.get('discussion_slug', None)
    if slug:
        request.session['discussion'] = slug
    else:
        request.session.pop('discussion')
    discussion = discussion_from_request(request)
    get_routes = create_get_route(request, discussion)
    providers = get_provider_data(get_routes)
    hide_registration = (discussion
        and not public_roles.intersection(set(roles_with_permissions(
            discussion, P_READ)))
        and not roles_with_permissions(
            discussion, P_SELF_REGISTER_REQUEST, P_SELF_REGISTER))
    if not force_show_providers:
        hide_providers = aslist(request.registry.settings.get(
            'hide_login_providers', ()))

        if isinstance(hide_providers, string_types):
            hide_providers = (hide_providers, )
        providers = [p for p in providers if p['type'] not in hide_providers]

    return dict(get_default_context(request),
                providers=providers,
                providers_json=json.dumps(providers),
                saml_providers=request.registry.settings.get(
                    'SOCIAL_AUTH_SAML_ENABLED_IDPS', {}),
                hide_registration=hide_registration,
                identifier = request.params.get('identifier', ''),
                google_consumer_key=request.registry.settings.get(
                    'google.consumer_key', ''),
                next=handle_next_view(request),
                get_route=get_routes)

def _get_route_from_path(request, path):
    from pyramid.urldispatch import IRoutesMapper
    rm = request.registry.getUtility(IRoutesMapper)
    for route in rm.routelist:
        match = route.match(path)
        if match is not None:
            return route, match
    return None, {}


def handle_next_view(request, consume=False, default_suffix=''):
    slug = request.matchdict.get('discussion_slug', None)
    default = "/".join((x for x in ('', slug, default_suffix)
                        if x is not None))
    return sanitize_next_view(request.params.get('next', None)) or default


def maybe_contextual_route(request, route_name, **args):
    discussion_slug = None
    if request.matchdict:
        discussion_slug = request.matchdict.get('discussion_slug', None)
    if discussion_slug is None:
        discussion = discussion_from_request(request)
        if discussion:
            discussion_slug = discussion.slug
    if discussion_slug is None:
        return request.route_url(route_name, **args)
    else:
        return request.route_url(
            'contextual_' + route_name,
            discussion_slug=discussion_slug, **args)


[docs]def get_social_autologin(request, discussion=None, next_view=None): """Look for a mandatory social login""" discussion = discussion or discussion_from_request(request) if discussion: preferences = discussion.preferences else: preferences = Preferences.get_default_preferences() auto_login_backend = preferences['authorization_server_backend'] if not auto_login_backend: return None next_view = next_view or sanitize_next_view( request.params.get('next', None)) if discussion and not next_view: next_view = request.route_path('home', discussion_slug=discussion.slug) query = {"next": next_view} if ":" in auto_login_backend: auto_login_backend, provider = auto_login_backend.split(":", 1) query['idp'] = provider if discussion: return request.route_url( "contextual_social.auth", discussion_slug=discussion.slug, backend=auto_login_backend, _query=query) else: return request.route_url( "social.auth", backend=auto_login_backend, _query=query)
@view_config( route_name='logout', request_method='GET', renderer='assembl:templates/login.jinja2', ) @view_config( route_name='contextual_logout', request_method='GET', renderer='assembl:templates/login.jinja2', ) def logout(request): logout_url = maybe_social_logout(request) forget(request) if logout_url: return HTTPFound(location=logout_url) next_view = handle_next_view(request, True) return HTTPFound(location=next_view) @view_config( route_name='login', request_method='GET', http_cache=60, renderer='assembl:templates/login.jinja2', ) @view_config( route_name='contextual_login', request_method='GET', http_cache=60, renderer='assembl:templates/login.jinja2', ) @view_config( route_name='login_forceproviders', request_method='GET', http_cache=60, renderer='assembl:templates/login.jinja2', ) @view_config( route_name='contextual_login_forceproviders', request_method='GET', http_cache=60, renderer='assembl:templates/login.jinja2', ) def login_view(request): if request.scheme == "http"\ and asbool(config.get("accept_secure_connection")): return HTTPFound(get_global_base_url(True) + request.path_qs) force_providers = request.matched_route.name.endswith('_forceproviders') if request.matched_route.name == 'contextual_login': contextual_login = get_social_autologin(request) if contextual_login: return HTTPFound(contextual_login) return get_login_context(request, force_providers) def get_profile(request): id_type = request.matchdict.get('type').strip() identifier = request.matchdict.get('identifier').strip() session = AgentProfile.default_db if id_type == 'u': profile = session.query(User).filter_by( username=identifier).first() if not profile: raise HTTPNotFound() elif id_type == 'id': try: id = int(identifier) except: raise HTTPNotFound() profile = session.query(AgentProfile).get(id) if not profile: raise HTTPNotFound() elif id_type == 'email': account = session.query(AbstractAgentAccount).filter_by( email_ci=identifier).order_by(desc( AbstractAgentAccount.verified)).first() if not account: raise HTTPNotFound() profile = account.profile else: # TODO: CHECK if we're looking at username or uid account = session.query(SocialAuthAccount).join( IdentityProvider).filter( SocialAuthAccount.username == identifier and IdentityProvider.type == id_type).first() if not account: raise HTTPNotFound() profile = account.profile return profile @view_config(route_name='profile_user', request_method=("GET", "POST")) def assembl_profile(request): session = AgentProfile.default_db localizer = request.localizer profile = get_profile(request) id_type = request.matchdict.get('type').strip() logged_in = authenticated_userid(request) save = request.method == 'POST' # if some other user if not profile or not logged_in or logged_in != profile.id: if save: raise HTTPUnauthorized() # Add permissions to view a profile? return render_to_response( 'assembl:templates/view_profile.jinja2', dict(get_default_context(request), profile=profile, user=logged_in and session.query(User).get(logged_in))) confirm_email = request.params.get('confirm_email', None) if confirm_email: return HTTPTemporaryRedirect(location=request.route_url( 'confirm_emailid_sent', email_account_id=int(confirm_email))) errors = [] if save: user_id = profile.id redirect = False username = request.params.get('username', '').strip() if username and ( profile.username is None or username != profile.username): # check if exists if session.query(User).filter_by(username=username).count(): errors.append(localizer.translate(_( 'The username %s is already used')) % (username,)) else: profile.username = username if id_type == 'u': redirect = True name = request.params.get('name', '').strip() if name: profile.name = name p1, p2 = (request.params.get('password1', '').strip(), request.params.get('password2', '').strip()) if p1 != p2: errors.append(localizer.translate(_( 'The passwords are not identical'))) elif p1: profile.password_p = p1 add_email = request.params.get('add_email', '').strip() if add_email: if not is_email(add_email): return dict(get_default_context(request), error=localizer.translate(_( "This is not a valid email"))) # No need to check presence since not validated yet email = EmailAccount( email=add_email, profile=profile) session.add(email) if redirect: return HTTPFound(location=request.route_url( 'profile_user', type='u', identifier=username)) profile = session.query(User).get(user_id) unverified_emails = [ (ea, session.query(AbstractAgentAccount).filter_by( email_ci=ea.email_ci, verified=True).first()) for ea in profile.email_accounts if not ea.verified] get_route = create_get_route(request) return render_to_response( 'assembl:templates/profile.jinja2', dict(get_default_context(request), error='<br />'.join(errors), unverified_emails=unverified_emails, providers=get_provider_data(get_route), google_consumer_key=request.registry.settings.get( 'google.consumer_key', ''), the_user=profile, user=session.query(User).get(logged_in))) @view_config(route_name='avatar', request_method="GET") def avatar(request): profile = get_profile(request) size = int(request.matchdict.get('size')) if profile: gravatar_url = profile.avatar_url(size, request.application_url) return HTTPFound(location=gravatar_url) default = request.registry.settings.get( 'avatar.default_image_url', '') or \ request.application_url+'/static/img/icon/user.png' return HTTPFound(location=default) @view_config( route_name='register', request_method=("GET", "POST"), permission=NO_PERMISSION_REQUIRED, renderer='assembl:templates/register.jinja2' ) @view_config( route_name='contextual_register', request_method=("GET", "POST"), permission=NO_PERMISSION_REQUIRED, renderer='assembl:templates/register.jinja2' ) def assembl_register_view(request): slug = request.matchdict.get('discussion_slug', "") next_view = handle_next_view(request) if not request.params.get('email'): if request.scheme == "http"\ and asbool(config.get("accept_secure_connection")): return HTTPFound(get_global_base_url(True) + request.path_qs) response = get_login_context(request) return response forget(request) session = AgentProfile.default_db localizer = request.localizer name = request.params.get('name', '').strip() if not name or len(name) < 3: return dict(get_default_context(request), error=localizer.translate(_( "Please use a name of at least 3 characters"))) password = request.params.get('password', '').strip() password2 = request.params.get('password2', '').strip() email = request.params.get('email', '').strip() if not is_email(email): return dict(get_default_context(request), error=localizer.translate(_( "This is not a valid email"))) email = EmailString.normalize_email_case(email) # Find agent account to avoid duplicates! if session.query(AbstractAgentAccount).filter_by( email_ci=email, verified=True).count(): return dict(get_default_context(request), error=localizer.translate(_( "We already have a user with this email."))) if password != password2: return dict(get_default_context(request), error=localizer.translate(_( "The passwords should be identical"))) # TODO: Validate password quality # otherwise create. validate_registration = asbool(config.get( 'idealoom_validate_registration_emails')) user = User( name=name, password=password, verified=not validate_registration, creation_date=datetime.utcnow() ) email_account = EmailAccount( email=email, verified=not validate_registration, profile=user ) session.add(user) session.add(email_account) discussion = discussion_from_request(request) if discussion: permissions = get_permissions(Everyone, discussion.id) if not (P_SELF_REGISTER in permissions or P_SELF_REGISTER_REQUEST in permissions): discussion = None if discussion: _now = datetime.utcnow() agent_status = AgentStatusInDiscussion( agent_profile=user, discussion=discussion, first_visit=_now, last_visit=_now, user_created_on_this_discussion=True) session.add(agent_status) session.flush() if not validate_registration: if asbool(config.get('pyramid.debug_authorization')): # for debugging purposes from assembl.auth.password import email_token log.debug("email token: " + request.route_url( 'user_confirm_email', token=email_token(email_account))) headers = remember(request, user.id) user.successful_login() request.response.headerlist.extend(headers) if discussion: maybe_auto_subscribe(user, discussion) # TODO: Tell them to expect an email. return HTTPFound(location=next_view) return HTTPFound(location=maybe_contextual_route( request, 'confirm_emailid_sent', email_account_id=email_account.id)) @view_config(context=SMTPRecipientsRefused) def smtp_error_view(exc, request): path_info = request.environ['PATH_INFO'] localizer = request.localizer message = localizer.translate(_( "Your email was refused by the SMTP server. You probably entered an email that does not exist.")) if path_info.startswith('/data/') or path_info.startswith('/api/'): return JSONError(message) referrer = request.environ['HTTP_REFERER'] request.session.flash(message) referrer = referrer.split('?')[0] return HTTPFound(location=referrer) def from_identifier(identifier): session = AgentProfile.default_db if '@' in identifier: identifier = EmailString.normalize_email_case(identifier) account = session.query(AbstractAgentAccount).filter_by( email_ci=identifier).order_by(AbstractAgentAccount.verified.desc()).first() if account: user = account.profile return (user, account) else: user = session.query(User).filter_by( username=identifier).first() if user: return (user, None) return None, None @view_config( route_name='login', request_method='POST', permission=NO_PERMISSION_REQUIRED, ) @view_config( route_name='contextual_login', request_method='POST', permission=NO_PERMISSION_REQUIRED, ) def assembl_login_complete_view(request): # Check if proper authorization. Otherwise send to another page. session = AgentProfile.default_db # POST before GET identifier = (request.POST.get('identifier').strip() or request.GET.get('identifier').strip() or '') password = request.params.get('password', '').strip() next_view = handle_next_view(request, True) logged_in = authenticated_userid(request) localizer = request.localizer user = None user, account = from_identifier(identifier) if not user: error_message = localizer.translate(_("This user cannot be found")) request.session.flash(error_message) return HTTPFound(location=maybe_contextual_route( request, 'login', _query={"identifier": identifier} if identifier else None)) if account and not account.verified: return HTTPFound(location=maybe_contextual_route( request, 'confirm_emailid_sent', email_account_id=account.id)) if logged_in: if user.id != logged_in: # logging in as a different user # Could I be combining account? forget(request) else: # re-logging in? Why? return HTTPFound(location=next_view) if not user.check_password(password): error_message = localizer.translate(_("Invalid user and password")) user.login_failures += 1 # TODO: handle high failure count request.session.flash(error_message) return HTTPFound(location=maybe_contextual_route( request, 'login', _query={"identifier": identifier} if identifier else None)) user.successful_login() headers = remember(request, user.id) request.response.headerlist.extend(headers) discussion = discussion_from_request(request) if discussion: maybe_auto_subscribe(user, discussion) return HTTPFound(location=next_view) @view_config(route_name="contextual_social.auth", request_method=('GET', 'POST')) @psa('social.complete') def auth(request): request.session['discussion'] = request.matchdict['discussion_slug'] request.session['add_account'] = False return do_auth(request.backend, redirect_name='next') @view_config(route_name="add_social_account", request_method=('GET', 'POST')) @view_config( route_name="contextual_add_social_account", request_method=('GET', 'POST')) @psa('social.complete') def add_social_account(request): request.session['discussion'] = request.matchdict['discussion_slug'] request.session['add_account'] = True # TODO: Make False later. return do_auth(request.backend, redirect_name='next') @view_config( route_name='confirm_emailid_sent', request_method=("GET", "POST"), renderer='assembl:templates/confirm.jinja2', permission=NO_PERMISSION_REQUIRED ) @view_config( route_name='contextual_confirm_emailid_sent', request_method=("GET", "POST"), renderer='assembl:templates/confirm.jinja2', permission=NO_PERMISSION_REQUIRED ) def confirm_emailid_sent(request): # TODO: How to make this not become a spambot? id = int(request.matchdict.get('email_account_id')) email = AbstractAgentAccount.get(id) if not email: raise HTTPNotFound() localizer = request.localizer context = get_default_context(request) if email.verified: # Your email is fine, why do you want to confirm it? # Temporary: explain, but it's a dead-end. # TODO: Unlog and redirect to login. return dict( context, profile_id=email.profile_id, action = context['get_route']("confirm_emailid_sent", email_account_id=id), email_account_id=str(id), title=localizer.translate(_('This email address is already confirmed')), description=localizer.translate(_( 'You do not need to confirm this email address, it is already confirmed.'))) send_confirmation_email(request, email) return dict( get_default_context(request), action = context['get_route']("confirm_emailid_sent", email_account_id=id), profile_id=email.profile_id, email_account_id=request.matchdict.get('email_account_id'), title=localizer.translate(_('Confirmation requested')), description=localizer.translate(_( 'A confirmation e-mail has been sent to your account and should be in your inbox in a few minutes. ' 'It contains a confirmation link, please click on it in order to confirm your e-mail address. ' 'If you did not receive any confirmation e-mail (check your spams), click here.'))) @view_config( route_name='user_confirm_email', request_method="GET", renderer='assembl:templates/email_confirmed.jinja2', permission=NO_PERMISSION_REQUIRED ) @view_config( route_name='contextual_user_confirm_email', request_method="GET", renderer='assembl:templates/email_confirmed.jinja2', permission=NO_PERMISSION_REQUIRED ) def user_confirm_email(request): token = request.matchdict.get('token') or '' account, validity = verify_email_token(token) session = AbstractAgentAccount.default_db logged_in = authenticated_userid(request) # if mismatch? localizer = request.localizer if account and account.profile_id != logged_in: # token for someone else: forget login. logged_in = None forget(request) token_date = get_data_token_time(token) old_token = ( account is None or token_date is None or ( account.profile.last_login and token_date < account.profile.last_login)) inferred_discussion = discussion = discussion_from_request(request) if account and not discussion: # We do not know from which discussion the user started to log in; # See if only involved in one discussion discussions = account.profile.involved_in_discussion if len(discussions) == 1: inferred_discussion = discussions[0] if account and account.verified and logged_in: # no need to revalidate, just send to discussion. # Question: maybe_auto_subscribe? Doubt it. request.session.flash(localizer.translate( _("Email <%s> already confirmed")) % (account.email,)) return HTTPFound(location=request.route_url( 'home' if inferred_discussion else 'discussion_list', discussion_slug=inferred_discussion.slug)) if validity != Validity.VALID or old_token: # V-, B-: Invalid or obsolete token # Offer to send a new token if account and not account.verified: # bad token, unverified account... offer a new token if validity != Validity.VALID: error = localizer.translate(_( "This link was not valid. We sent another.")) else: error = localizer.translate(_( "This link has been used. We sent another.")) request.session.flash(error) return HTTPFound(location=maybe_contextual_route( request, 'confirm_emailid_sent', email_account_id=account.id)) else: if account and account.verified: # bad token, verified account... send them to login error = localizer.translate( _("Email <%s> already confirmed")) % (account.email,) else: # now what? We do not have the email. # Just send to login for now error = localizer.translate(_( "This link is not valid. Please attempt to login to get another one.")) request.session.flash(error) return HTTPFound(location=maybe_contextual_route( request, 'login', _query=dict( identifier=account.email if account else None))) # By now we know we have a good token; make it login-equivalent. user = account.profile assert isinstance(user, User) # accounts should not get here. OK to fail. headers = remember(request, user.id) request.response.headerlist.extend(headers) user.successful_login() username = user.username next_view = handle_next_view(request, False) if account.verified: message = localizer.translate( _("Email <%s> already confirmed")) % (account.email,) else: # maybe another profile already verified that email other_account = session.query(AbstractAgentAccount).filter_by( email_ci=account.email_ci, verified=True).first() if other_account: # We have two versions of the email, delete the unverified one session.delete(account) if other_account.profile != user: # Give priority to the one where the email was verified last. other_profile = other_account.profile user.merge(other_profile) session.delete(other_profile) if user.username: username = user.username account = other_account account.verified = True user.verified = True # do not use inferred discussion for auto_subscribe user.successful_login() if discussion and maybe_auto_subscribe(user, discussion): message = localizer.translate(_( "Your email address %s has been confirmed, " "and you are now subscribed to discussion's " "default notifications.")) % (account.email,) else: message = localizer.translate(_( "Your email address %s has been confirmed." )) % (account.email,) if inferred_discussion: request.session.flash(message, 'message') return HTTPFound(location=request.route_url( 'home', discussion_slug=inferred_discussion.slug)) else: return HTTPFound( location=request.route_url('discussion_list')) @view_config( context=AuthException, # maybe more specific? renderer='assembl:templates/login.jinja2', ) def login_denied_view(request): # TODO: Go to appropriate login page, and flash error message. localizer = request.localizer request.session.flash(localizer.translate(_('Login failed, try again'))) get_route = create_get_route(request) return HTTPFound(location=get_route('login', _query=request.GET or None)) @view_config( route_name='confirm_email_sent', request_method=("GET", "POST"), renderer='assembl:templates/confirm.jinja2', permission=NO_PERMISSION_REQUIRED ) @view_config( route_name='contextual_confirm_email_sent', request_method=("GET", "POST"), renderer='assembl:templates/confirm.jinja2', permission=NO_PERMISSION_REQUIRED ) def confirm_email_sent(request): localizer = request.localizer # TODO: How to make this not become a spambot? email = request.matchdict.get('email') if not email: raise HTTPNotFound() if '@' not in email: raise HTTPBadRequest("Not an email") email = EmailString.normalize_email_case(email) email_objects = AbstractAgentAccount.default_db.query( AbstractAgentAccount).filter_by(email_ci=email) verified_emails = [e for e in email_objects if e.verified] unverified_emails = [e for e in email_objects if not e.verified] if len(verified_emails) > 1: # TODO!: Merge accounts. raise HTTPServerError("Multiple verified emails") elif len(verified_emails): if len(unverified_emails): # TODO!: Send an email, mention duplicates, and... # offer to merge accounts? # Send an email to other emails of the duplicate? Sigh! pass return HTTPFound(location=maybe_contextual_route( request, 'login', _query=dict( identifer=email, error=localizer.translate(_( "This email is already confirmed."))))) else: if len(unverified_emails): # Normal case: Send an email. May be spamming for email_account in unverified_emails: send_confirmation_email(request, email_account) context = get_default_context(request) return dict( context, action=context['get_route']("confirm_email_sent", email=email), email=email, title=localizer.translate(_('Confirmation requested')), description=localizer.translate(_( 'A confirmation e-mail has been sent to your account and should be in your inbox in a few minutes. ' 'It contains a confirmation link, please click on it in order to confirm your e-mail address. ' 'If you did not receive any confirmation e-mail (check your spams), click here.'))) else: # We do not have an email to this name. return HTTPFound(location=maybe_contextual_route( request, 'register', email=email, _query=dict( error=localizer.translate(_( "We do not know about this email."))))) @view_config( route_name='request_password_change', request_method=("GET", "POST"), renderer='assembl:templates/request_password_change.jinja2', permission=NO_PERMISSION_REQUIRED ) @view_config( route_name='contextual_request_password_change', request_method=("GET", "POST"), renderer='assembl:templates/request_password_change.jinja2', permission=NO_PERMISSION_REQUIRED ) def request_password_change(request): localizer = request.localizer identifier = request.params.get('identifier') or '' user_id = request.params.get('user_id') or '' error = request.params.get('error') or '' user = None if user_id: try: user = User.get(int(user_id)) identifier = identifier or user.get_preferred_email() or '' except: error = error or localizer.translate(_("This user cannot be found")) elif identifier: user, account = from_identifier(identifier) if user: user_id = user.id else: error = error or localizer.translate(_("This user cannot be found")) if error or not user: return dict( get_default_context(request), error=error, user_id=user_id, identifier=identifier, title=localizer.translate(_('I forgot my password'))) discussion_slug = request.matchdict.get('discussion_slug', None) route = 'password_change_sent' if discussion_slug: route = 'contextual_' + route return HTTPFound(location=maybe_contextual_route( request, 'password_change_sent', profile_id=user_id, _query=dict(email=identifier if '@' in identifier else ''))) @view_config( route_name='password_change_sent', request_method=("GET", "POST"), renderer='assembl:templates/confirm.jinja2', permission=NO_PERMISSION_REQUIRED ) @view_config( route_name='contextual_password_change_sent', request_method=("GET", "POST"), renderer='assembl:templates/confirm.jinja2', permission=NO_PERMISSION_REQUIRED ) def password_change_sent(request): localizer = request.localizer if not request.params.get('sent', False): profile_id = int(request.matchdict.get('profile_id')) profile = AgentProfile.get(profile_id) email = request.params.get('email') if not profile: raise HTTPNotFound("No profile "+str(profile_id)) else: email = email or profile.get_preferred_email() discussion = discussion_from_request(request) send_change_password_email(request, profile, email, discussion=discussion) profile_id=int(request.matchdict.get('profile_id')) context = get_default_context(request) return dict( context, profile_id=profile_id, action = context['get_route']("password_change_sent", profile_id=profile_id), error=request.params.get('error'), title=localizer.translate(_('Password change requested')), description=localizer.translate(_( 'We have sent you an email with a temporary connection link. ' 'Please use that link to log in and change your password.')))
[docs]@view_config( route_name='welcome', request_method="GET", renderer='assembl:templates/do_password_change.jinja2', permission=NO_PERMISSION_REQUIRED ) @view_config( route_name='contextual_welcome', request_method="GET", permission=NO_PERMISSION_REQUIRED ) @view_config( route_name='do_password_change', request_method="GET", permission=NO_PERMISSION_REQUIRED ) @view_config( route_name='contextual_do_password_change', request_method="GET", permission=NO_PERMISSION_REQUIRED ) def do_password_change(request): "Validate the change_password token, and react accordingly." # Codes below refer to those cases: # V. token Valid(+) or invalid(-)? (Possibly expired through internal date) # P. user has(+) a Password or not (-)? # W. Welcome(+) vs change password(-) # B. last login absent, or Before token created (+) vs last login after token created (-) # L. user is already Logged in(+) or not(-)? welcome = 'welcome' in request.matched_route.name localizer = request.localizer discussion = discussion_from_request(request) token = request.matchdict.get('token') user, validity = verify_password_change_token(token) logged_in = authenticated_userid(request) if user and user.id != logged_in: # token for someone else: forget login. logged_in = None forget(request) lacking_password = user is not None and user.password is None token_date = get_data_token_time(token) old_token = ( user is None or token_date is None or ( user.last_login and token_date < user.last_login)) log.debug("pwc V%sP%sW%sB%sL%s" % tuple(["-" if b else "+" for b in ( validity != Validity.VALID, lacking_password, not welcome, old_token, logged_in is None)])) if welcome and not lacking_password: # W+P+: welcome link sends onwards irrespective of token if logged_in: # L+: send onwards to discussion return HTTPFound(location=request.route_url( 'home' if discussion else 'discussion_list', discussion_slug=discussion.slug)) else: # L-: offer to login return HTTPFound(location=maybe_contextual_route( request, 'login', _query=dict( identifier=user.get_preferred_email() if user else None))) if (validity != Validity.VALID or old_token) and not logged_in: # V-, V+P+W-B-L-: Invalid or obsolete token (obsolete+logged in treated later.) # Offer to send a new token if validity != Validity.VALID: error = localizer.translate(_( "This link is not valid. Do you want us to send another?")) else: error = localizer.translate(_( "This link has been used. Do you want us to send another?")) request.session.flash(error) return HTTPFound(location=maybe_contextual_route( request, 'request_password_change', _query=dict( user_id=user.id if user else ''))) # V+: Valid token (encompasses P-B+, W-, B-L+); ALSO V-L+ # V+P-B- should not happen, but we'll treat it the same. # go through password change dialog. We'll complete login afterwards. if welcome: platform_name = config.get("platform_name") if discussion: request.session.flash(localizer.translate(_( "You will enter the discussion as <b>{name}</b>.") ).format(name=user.name), 'message') else: discussion_topic = platform_name request.session.flash(localizer.translate(_( "You will enter {platform_name} as <b>{name}</b>.") ).format(platform_name=platform_name, name=user.name), 'message') request.session.flash(localizer.translate(_( "Please choose your password for security reasons.") ).format(name=user.name), 'message') return HTTPFound(location=maybe_contextual_route( request, 'finish_password_change', _query=dict( token=token, welcome=welcome)))
@view_config( route_name='finish_password_change', request_method=("GET", "POST"), renderer='assembl:templates/do_password_change.jinja2', permission=NO_PERMISSION_REQUIRED ) @view_config( route_name='contextual_finish_password_change', request_method=("GET", "POST"), permission=NO_PERMISSION_REQUIRED, renderer='assembl:templates/do_password_change.jinja2', ) def finish_password_change(request): localizer = request.localizer token = request.params.get('token') title = request.params.get('title') welcome = asbool(request.params.get('welcome')) discussion = discussion_from_request(request) if welcome: title = localizer.translate(_( 'Welcome to {discussion_topic}.')).format( discussion_topic=discussion.topic if discussion else config.get("platform_name")) else: title = localizer.translate(_('Change your password')) user, validity = verify_password_change_token(token) logged_in = authenticated_userid(request) # if mismatch? if user and user.id != logged_in: # token for someone else: forget login. logged_in = None forget(request) token_date = get_data_token_time(token) old_token = ( user is None or token_date is None or ( user.last_login and token_date < user.last_login)) if (validity != Validity.VALID or old_token) and not logged_in: # V-, V+P+W-B-L-: Invalid or obsolete token (obsolete+logged in treated later.) # Offer to send a new token if validity != Validity.VALID: error = localizer.translate(_( "This link is not valid. Do you want us to send another?")) else: error = localizer.translate(_( "This link has been used. Do you want us to send another?")) request.session.flash(error) return HTTPFound(location=maybe_contextual_route( request, 'request_password_change', _query=dict( user_id=user.id if user else ''))) error = None p1, p2 = (request.params.get('password1', '').strip(), request.params.get('password2', '').strip()) if p1 != p2: error = localizer.translate(_('The passwords are not identical')) elif p1: user.password_p = p1 user.successful_login() headers = remember(request, user.id) request.response.headerlist.extend(headers) if discussion: maybe_auto_subscribe(user, discussion) request.session.flash(localizer.translate(_( "Password changed")), 'message') return HTTPFound(location=request.route_url( 'home' if discussion else 'discussion_list', discussion_slug=discussion.slug)) return dict( get_default_context(request), title=title, token=token, error=error) def send_confirmation_email(request, email, immediate=False): mailer = get_mailer(request) localizer = request.localizer confirm_what = localizer.translate(_('email')) subject = localizer.translate(_("Please confirm your {confirm_what} with {idealoom}")) if isinstance(email.profile, User) and not email.profile.verified: confirm_what = localizer.translate(_('account')) text_message = localizer.translate(_(u"""Hello, {name}, and welcome to {idealoom}! Please confirm your email address &lt;{email}&gt; and complete your registration by clicking the link below. <{confirm_url}> Best regards, The {idealoom} Team""")) html_message = localizer.translate(_(u"""<p>Hello, {name}, and welcome to {idealoom}!</p> <p>Please <a href="{confirm_url}">click here to confirm your email address</a> &lt;{email}&gt; and complete your registration.</p> <p>Best regards,<br />The {idealoom} Team</p>""")) else: text_message = localizer.translate(_(u"""Hello, {name}! Please confirm your new email address <{email}> on your {idealoom} account by clicking the link below. <{confirm_url}> Best regards, The {idealoom} Team""")) html_message = localizer.translate(_(u"""<p>Hello, {name}!</p> <p>Please <a href="{confirm_url}">click here to confirm your new email address</a> &lt;{email}&gt; on your {idealoom} account.</p> <p>Best regards,<br />The {idealoom} Team</p>""")) from assembl.auth.password import email_token data = dict( name=email.profile.name, email=email.email, idealoom=config.get("platform_name"), confirm_what=confirm_what, confirm_url=maybe_contextual_route( request, 'user_confirm_email', token=email_token(email)) ) message = Message( subject=subject.format(**data), sender=config.get('idealoom_admin_email'), recipients=["%s <%s>" % (email.profile.name, email.email)], body=text_message.format(**data), html=html_message.format(**data)) message.extra_headers['Date'] = datetime.utcnow().strftime( '%a, %d %b %Y %T %z (+0000)') # TODO: message ID. # TODO: create my own message subclass that autofills this. if immediate: mailer.send_immediately(message) else: mailer.send(message) def send_change_password_email( request, profile, email=None, subject=None, text_body=None, html_body=None, discussion=None, sender_name=None, welcome=False): mailer = get_mailer(request) localizer = request.localizer data = dict( idealoom=config.get("platform_name"), name=profile.name, confirm_url=maybe_contextual_route( request, 'welcome' if welcome else 'do_password_change', token=password_change_token(profile))) sender_email = config.get('idealoom_admin_email') if discussion: data.update(dict( discussion_topic=discussion.topic, discussion_url=discussion.get_url())) sender_name = sender_name or discussion.topic if sender_name: sender_name = normalize_email_name(sender_name) sender = '"%s" <%s>' % (sender_name, sender_email) sender_name = Header(sender_name, 'utf-8').encode() if len(sender) > 255: sender = sender_email else: sender = sender_email subject = (subject or localizer.translate( _("Request for password change"))).format(**data) #subject = Header(subject, 'utf-8').encode() # Fails in some cases??? if text_body is None or html_body is not None: # if text_body and no html_body, html_body remains None. html_body = html_body or localizer.translate(_(u"""<p>Hello, {name}!</p> <p>We have received a request to change the password on your {idealoom} account. Please <a href="{confirm_url}">click here to confirm your password change</a>.</p> <p>If you did not ask to reset your password please disregard this email.</p> <p>Best regards,<br />The {idealoom} Team</p> """)) text_body = text_body or localizer.translate(_(u"""Hello, {name}! We have received a request to change the password on your {idealoom} account. To confirm your password change please click on the link below. <{confirm_url}> If you did not ask to reset your password please disregard this email. Best regards, The {idealoom} Team """)) message = Message( subject=subject, sender=sender, recipients=["%s <%s>" % ( profile.name, email or profile.get_preferred_email())], body=text_body.format(**data), html=html_body.format(**data)) message.extra_headers['Date'] = datetime.utcnow().strftime( '%a, %d %b %Y %T %z (+0000)') mailer.send(message)