"""The IdeaLoom-specific configuration of PythonSocialAuth_
.. _PythonSocialAuth: http://psa.matiasaguirre.net/
"""
import re
from datetime import datetime
import logging
from pprint import pprint
from pyramid.events import subscriber, BeforeRender
from pyramid.security import (
remember,
forget,
Everyone,
authenticated_userid)
from pyramid.config import aslist
import simplejson as json
from social_pyramid.utils import backends
from social_pyramid.strategy import PyramidStrategy
from social_core.utils import to_setting_name, setting_name, SETTING_PREFIX
from assembl.models import (
User, Preferences, AbstractAgentAccount, IdentityProvider)
from .util import discussion_from_request, maybe_auto_subscribe
from ..lib import config
from .generic_auth_backend import load_backends, GenericAuth
log = logging.getLogger(__name__)
def login_user(backend, user, user_social_auth):
remember(backend.strategy.request, user.id)
def login_required(request):
logged_in = authenticated_userid(request)
return logged_in is None
@subscriber(BeforeRender)
def add_social(event):
request = event['request']
event['social'] = backends(request, request.user)
def user_details(
strategy, details, response, social=None, *args, **kwargs):
if social:
social.interpret_social_auth_details(details)
social.set_extra_data(response)
if social.verified:
social.profile.verified = True
[docs]def associate_by_email(backend, details, provider=None, user=None, *args, **kwargs):
"""
Find other users of the same email. One of them may be appropriate.
Taken from social_core.pipeline.social_auth.associate_by_email and rewritten
"""
email = details.get('email')
provider = IdentityProvider.get_by_type(backend.name)
if email and provider.trust_emails:
# Try to associate accounts registered with the same email address,
# only if it's a single object. AuthException is raised if multiple
# objects are returned.
users = list(backend.strategy.storage.user.get_users_by_email(email))
if user and user not in users:
users.insert(0, user)
if len(users) == 0:
return None
user = users.pop(0)
if not isinstance(user, User):
# Assume it's safe to upgrade to user status already
user = user.change_class(User, None, verified=True)
return {'user': user, "other_users": users}
def social_user(backend, uid, user=None, *args, **kwargs):
provider = backend.name
social = backend.strategy.storage.user.get_social_auth(
provider, uid)
user = social.user if social else None
return {'social': social,
'user': user,
'is_new': user is None,
'new_association': False}
def maybe_merge(
backend, details, user=None, other_users=None,
*args, **kwargs):
# If we do not already have a user, see if we're in a situation
# where we're adding an account to an existing user, and maybe
# even merging
request = backend.strategy.request
adding_account = request.session.get("add_account", None)
if adding_account is not None:
del request.session["add_account"]
# current discussion and next?
logged_in = authenticated_userid(request)
if logged_in:
logged_in = User.get(logged_in)
if adding_account:
if user and user != logged_in:
# logged_in presumably newer?
logged_in.merge(user)
logged_in.db.delete(user)
logged_in.db.flush()
user = logged_in
else:
forget(request)
if other_users:
if not user:
user = other_users.pop(0)
# Merge other accounts with same verified email
for profile in other_users:
user.merge(profile)
profile.delete()
return {"user": user}
def associate_user(backend, uid, user=None, social=None, details=None,
*args, **kwargs):
results = None
if not social:
from social_core.pipeline.social_auth import \
associate_user as psa_associate_user
results = psa_associate_user(
backend, uid, user, social, *args, **kwargs)
# User has logged in with this account
social = results.get('social', social)
if social:
social.successful_login()
# Delete old email accounts
email = (details or {}).get('email', None)
if email and results and results['new_association']:
for acc in user.email_accounts:
if acc.email_ci == email:
acc.delete()
return results
def auto_subscribe(backend, social, user, *args, **kwargs):
if not user:
return
if user and social.email:
# Remove pure-email account if found social.
for email_account in user.email_accounts:
if email_account.email_ci == social.email:
social.verified |= email_account.verified
email_account.delete()
break
request = backend.strategy.request
discussion = discussion_from_request(request)
# Maybe discussion slug is in the 'next' parameter
if not discussion:
next_param = request.GET.get('next', request.POST.get('next', None))
if next_param:
next_param = next_param.strip('/').split('/')
if (len(next_param) == 2 and
next_param[1] == 'home' or next_param[0] == 'debate'):
from assembl.models import Discussion
slug = next_param[0] if next_param[1] == 'home' else next_param[1]
discussion = Discussion.default_db.query(
Discussion).filter_by(slug=slug).first()
if discussion:
user.successful_social_login()
maybe_auto_subscribe(user, discussion)
return {"discussion": discussion}
def print_details(backend, details, *args, **kwargs):
pprint(details, args, kwargs)
[docs]def maybe_social_logout(request):
"""If the user has a an account with the default social provider,
and that account has a logout URL, redirect there.
Maybe the next argument should be carried?"""
discussion = discussion_from_request(request)
if not discussion:
return
backend_name = discussion.preferences['authorization_server_backend']
if not backend_name:
return
user_id = request.authenticated_userid
if not user_id:
return
user = User.get(user_id)
for account in user.accounts:
if getattr(account, 'provider_with_idp', None) == backend_name:
break
else:
return
# TODO: tell the account that the login has expired.
# Also check if already expired?
return config.get('SOCIAL_AUTH_%s_LOGOUT_URL' % (
account.provider.upper(),))
# Here, I thought of using the PSA disconnect.
# But actually the default pipeline destroys the entry!
#
# backend_cls = get_backend(
# load_backends(config.get('SOCIAL_AUTH_AUTHENTICATION_BACKENDS')),
# account.provider)
# strategy = load_strategy(request)
# backend = backend_cls(strategy)
# backend.disconnect(user=user)
class AppStrategy(PyramidStrategy):
def request_is_secure(self):
return self.request.scheme == 'https' or config.get('secure_proxy')
def request_path(self):
return self.request.path
def request_port(self):
return self.request.host_port
def request_get(self):
return self.request.GET
def request_post(self):
return self.request.POST
def get_preferences(self):
discussion = discussion_from_request(self.request)
if discussion:
return discussion.preferences
else:
return Preferences.get_default_preferences()
def get_setting(self, name):
"""Return value for given setting name. May extract from discussion prefs"""
# TODO: Add WHITELISTED_DOMAINS
# TODO: Obsolete code: those preferences are gone.
if name.split("_")[-1] in ('KEY', 'SECRET', 'SERVER'):
prefs = self.get_preferences()
backend = prefs["authorization_server_backend"]
if backend:
m = re.match((
r"^(?:SOCIAL_AUTH_)?(?:%s_)?(KEY|SECRET|SERVER)$"
% to_setting_name(backend)), name)
if m:
val = prefs.get("authorization_" + m.group(1).lower(), None)
if val is not None:
return val
return super(AppStrategy, self).get_setting(name)
# def partial_from_session(self, session):
# from social_core.pipeline.utils import partial_from_session
# return partial_from_session(self, session)
def build_absolute_uri(self, path=None):
path = super(AppStrategy, self).build_absolute_uri(path)
if self.request_is_secure() and path.startswith('http:'):
path = 'https' + path[4:]
return path
def get_pipeline(self, backend=None):
return (
# Optional step: print details so we see what's going on
# 'assembl.auth.social_auth.print_details',
# Get the information we can about the user and return it in a simple
# format to create the user instance later. On some cases the details are
# already part of the auth response from the provider, but sometimes this
# could hit a provider API.
'social_core.pipeline.social_auth.social_details',
# Get the social uid from whichever service we're authing thru. The uid is
# the unique identifier of the given user in the provider.
'social_core.pipeline.social_auth.social_uid',
# Verifies that the current auth process is valid within the current
# project, this is were emails and domains whitelists are applied (if
# defined).
'social_core.pipeline.social_auth.auth_allowed',
# Checks if the current social-account is already associated in the site.
'assembl.auth.social_auth.social_user',
# Make up a username for this person, appends a random string at the end if
# there's any collision.
'social_core.pipeline.user.get_username',
# Send a validation email to the user to verify its email address.
# 'social_core.pipeline.mail.mail_validation',
# Associates the current social details with another user account with
# a similar email address.
'assembl.auth.social_auth.associate_by_email',
# If we do not already have a user, see if we're in a situation
# where we're adding an account to an existing user, and maybe
# even merging. We may also forget the logged in user.
'assembl.auth.social_auth.maybe_merge',
# Create a user account if we haven't found one yet.
'social_core.pipeline.user.create_user',
# Create the record that associated the social account with this user.
'assembl.auth.social_auth.associate_user',
# Populate the extra_data field in the social record with the values
# specified by settings (and the default ones like access_token, etc).
'social_core.pipeline.social_auth.load_extra_data',
# Update the user record with any changed info from the auth service.
'assembl.auth.social_auth.user_details',
# Autosubscribe if appropriate
'assembl.auth.social_auth.auto_subscribe',
)
[docs]def get_active_auth_strategies(settings):
"""Give the list of available social auth providers.
Includes multiple instances if a provider can have multiple servers.
This currently includes SAML, and eventually wordpress.
TODO: Should replace the login_providers config variable"""
all_backends = load_backends(settings.get('SOCIAL_AUTH_AUTHENTICATION_BACKENDS'))
for backend_name in all_backends:
def get_setting(name):
return (settings.get(setting_name(SETTING_PREFIX, backend_name, name), None)
or settings.get(setting_name(backend_name, name), None))
if backend_name == 'wordpress-oauth2':
# TODO: This special case needs to be treated the same as saml asap.
# Also: maybe check preferences
yield backend_name
elif backend_name == 'saml':
# special case: Multiple IDPs
idps = get_setting('ENABLED_IDPS') or {}
for idp in idps.keys():
yield 'saml:' + idp
elif issubclass(all_backends[backend_name], GenericAuth):
if backend_name in settings.get('SOCIAL_AUTH_GENERICAUTH_SUBCONFIGS'):
yield backend_name
elif get_setting('key'):
yield backend_name
def adjust_settings(settings):
settings['login_providers'] = aslist(settings.get('login_providers', ''))
settings['trusted_login_providers'] = aslist(settings.get('trusted_login_providers', ''))
if not any(settings['login_providers']):
log.warning('no login providers configured, double check '
'your ini file and add a few')
for k, v in settings.items():
if k.startswith("SOCIAL_AUTH_"):
if k.endswith("_SCOPE") or k.endswith("_FIELD_SELECTORS"):
settings[k] = aslist(v)
elif isinstance(v, str) and v.lstrip().startswith('{'):
settings[k] = json.loads(v)
for name in ('SOCIAL_AUTH_AUTHENTICATION_BACKENDS',
'SOCIAL_AUTH_USER_FIELDS',
'SOCIAL_AUTH_PROTECTED_USER_FIELDS',
'SOCIAL_AUTH_FIELDS_STORED_IN_SESSION'):
settings[name] = aslist(settings.get(name, ''))
[docs]def includeme(config):
"""Pre-parse certain settings for python_social_auth, then load it."""
adjust_settings(config.get_settings())