Source code for assembl.models.social_auth

"""Models for integration of `Python Social Auth`_.

.. _`Python Social Auth`: http://psa.matiasaguirre.net/
"""
from future import standard_library
standard_library.install_aliases()
from builtins import str
import logging
import re
from datetime import datetime, timedelta

from future.utils import string_types
import transaction
from sqlalchemy import (
    Boolean,
    Column,
    String,
    ForeignKey,
    Integer,
    Unicode,
    UnicodeText,
    DateTime,
    Time,
    Binary,
    Text,
    inspect,
    desc,
    event,
    Index,
    UniqueConstraint
)
import simplejson as json
from sqlalchemy.orm import (
    relationship, backref, deferred)
from social_core.exceptions import MissingBackend
from social_sqlalchemy.storage import (
    SQLAlchemyMixin, SQLAlchemyNonceMixin, UserMixin,
    SQLAlchemyAssociationMixin, SQLAlchemyCodeMixin, BaseSQLAlchemyStorage)
from sqlalchemy.ext.mutable import MutableDict
from urllib.parse import quote, unquote

from ..lib import config
from ..lib.sqla_types import URLString, JSONType
from .auth import (
    AbstractAgentAccount, IdentityProvider, AgentProfile, User)
from ..auth.generic_auth_backend import GenericAuth
from . import Base
from ..semantic.namespaces import (
    SIOC, ASSEMBL, QUADNAMES, FOAF, DCTERMS, RDF)
from ..semantic.virtuoso_mapping import (
    QuadMapPatternS, USER_SECTION, PRIVATE_USER_SECTION)


log = logging.getLogger(__name__)


[docs]class AppSocialAuthMixin(Base, SQLAlchemyMixin): __abstract__ = True @classmethod def _session(cls): return cls.default_db() @classmethod def _query(cls): return cls._session().query(cls) @classmethod def _new_instance(cls, model, *args, **kwargs): session = kwargs.pop('session', cls.default_db()) instance = model(*args, **kwargs) session.add(instance) session.flush() return instance @classmethod def _save_instance(cls, instance): instance.session().add(instance) instance.session().flush() return instance @classmethod def _flush(cls): try: cls._session().flush() except AssertionError: with transaction.manager as manager: manager.commit()
[docs] def save(self): self.session.add(self) self.session.flush()
[docs]class Nonce(AppSocialAuthMixin, SQLAlchemyNonceMixin): pass
[docs]class Association(AppSocialAuthMixin, SQLAlchemyAssociationMixin): pass
[docs]class Code(AppSocialAuthMixin, SQLAlchemyCodeMixin): pass
[docs]class SocialAuthAccount( AbstractAgentAccount, AppSocialAuthMixin, UserMixin): """An account with an external :py:class:`.auth.IdentityProvider`""" __tablename__ = "social_auth_account" __mapper_args__ = { 'polymorphic_identity': 'social_auth_account', } __table_args__ = ( UniqueConstraint('provider_id', 'provider_domain', 'uid'), ) UID_LENGTH = config.get('UID_LENGTH', 255) id = Column(Integer, ForeignKey( 'abstract_agent_account.id', ondelete='CASCADE', onupdate='CASCADE' ), primary_key=True) provider_id = Column( Integer, ForeignKey('identity_provider.id', ondelete='CASCADE', onupdate='CASCADE'), nullable=False, info={'rdf': QuadMapPatternS(None, SIOC.member_of)}) identity_provider = relationship(IdentityProvider) username = Column(Unicode(200)) # info={'rdf': QuadMapPatternS(None, SIOC.name)}) provider_domain = Column(String(255)) uid = Column(String(UID_LENGTH), nullable=False) # info={'rdf': QuadMapPatternS(None, SIOC.id)}) extra_data = Column(MutableDict.as_mutable(JSONType)) picture_url = Column(URLString) user = relationship(AgentProfile, backref='identity_accounts') last_checked = Column(DateTime) def successful_login(self): self.last_checked = datetime.utcnow() def login_expiry(self): if self.last_checked is None: return datetime.utcnow() - timedelta(seconds=1) expiry = self.login_duration() if not expiry: return None return self.last_checked + timedelta(expiry) @property def provider(self): return self.identity_provider.provider_type @property def provider_with_idp(self): provider = self.provider if provider == 'saml': # PSA prefixes SAML uids with the idp_name idp_name = self.uid.split(':')[0] # Also available as self.extra_data['idp_name'] return ':'.join((provider, idp_name)) return provider @provider.setter def provider(self, value): self.identity_provider = IdentityProvider.get_by_type(value) def __init__(self, **kwargs): super(SocialAuthAccount, self).__init__(**kwargs) self.interpret_profile(self.extra_data) # reimplementation of UserSocialAuth
[docs] @classmethod def username_max_length(cls): return User.__table__.columns.get('username').type.length
[docs] @classmethod def user_model(cls): return User
# reimplementation of SQLAlchemyUserMixin
[docs] @classmethod def changed(cls, user): cls._save_instance(user)
def set_extra_data(self, extra_data=None): if super(SocialAuthAccount, self).set_extra_data(extra_data): self.interpret_profile(self.extra_data)
[docs] @classmethod def allowed_to_disconnect(cls, user, backend_name, association_id=None): if association_id is not None: qs = cls._query().filter(cls.id != association_id) else: qs = cls._query().join(cls.identity_provider).filter(IdentityProvider.provider_type != backend_name) qs = qs.filter(cls.user == user) if hasattr(user, 'has_usable_password'): # TODO valid_password = user.has_usable_password() else: valid_password = True return valid_password or qs.count() > 0
[docs] @classmethod def disconnect(cls, entry): cls._session().delete(entry) cls._flush()
@classmethod def user_query(cls): return cls._session().query(cls.user_model())
[docs] @classmethod def user_exists(cls, *args, **kwargs): """ Return True/False if a User instance exists with the given arguments. Arguments are directly passed to filter() manager method. """ query = cls.user_query() username = kwargs.pop('username', None) if username: query = query.filter(User.username == username) return query.filter_by(*args, **kwargs).count() > 0
[docs] @classmethod def get_username(cls, user): """Return the username for given user""" # assume user is a User, not an AgentProfile return user.username
[docs] @classmethod def create_user(cls, email=None, username=None, fullname=None, *args, **kwargs): if fullname: kwargs['name'] = fullname user = cls._new_instance(cls.user_model(), *args, **kwargs) return user
[docs] @classmethod def get_user(cls, pk): return cls.user_query().get(pk)
[docs] @classmethod def get_users_by_email(cls, email): # Find users with similar email. # Only use if social provider is trusted to have verified email. users = cls.default_db().query(User).join( User.accounts).filter( AbstractAgentAccount.email_ci == email, ).all() # choose best known profile for base_account # prefer profiles with verified users, then users, then oldest profiles users.sort(key=lambda p: ( isinstance(p, User) and p.verified, isinstance(p, User), -p.id), reverse=True) return users
[docs] @classmethod def get_social_auth(cls, provider, uid): if not isinstance(uid, string_types): uid = str(uid) return cls._query().join( cls.identity_provider).filter( IdentityProvider.provider_type == provider, cls.uid == uid).first()
[docs] @classmethod def get_social_auth_for_user( cls, user, provider=None, id=None): qs = cls._query().filter_by(profile_id=user.id) if provider: qs = qs.join( cls.identity_provider).filter( IdentityProvider.provider_type == provider) if id: qs = qs.filter(cls.id == id) return qs
[docs] @classmethod def create_social_auth(cls, user, uid, provider): if not isinstance(uid, string_types): uid = str(uid) id_provider = IdentityProvider.get_by_type(provider) return cls._new_instance( cls, profile=user, uid=uid, identity_provider=id_provider, verified=id_provider.trust_emails)
# override social_core.storage.UserMixin.get_backend_instance def get_backend_instance(self, strategy): try: backend_class = self.get_backend(strategy) except MissingBackend: return None else: if issubclass(backend_class, GenericAuth): return backend_class(strategy=strategy, name=self.provider) else: return backend_class(strategy=strategy) # Lifted from IdentityProviderAccount
[docs] def signature(self): return ('idprovider_agent_account', self.provider_id, self.username, self.uid)
def interpret_profile(self, profile=None): profile = profile or self.extra_data if profile: self.populate_picture(profile) if not self.email: # May be missed by social auth. compensate. emails = profile.get('emails', []) if emails: self.email = emails[0].get('value', '') def interpret_social_auth_details(self, details): self.email = details.get("email", self.email) self.username = details.get('username', self.username) # TODO: Maybe see if username usable for user? fullname = details.get("fullname") if not fullname: first_name = details.get('first_name', None) last_name = details.get('last_name', None) if first_name and last_name: fullname = ' '.join((first_name, last_name)) if fullname and not self.user.name: self.user.name = fullname def display_name(self): # TODO: format according to provider, ie @ for twitter. if self.username: name = self.username else: name = self.uid return ":".join((self.identity_provider.provider_type, name)) def get_provider_name(self): return self.identity_provider.name def get_provider_type(self): return self.identity_provider.provider_type def real_name(self): if not self.full_name: info = self.extra_data name = info.get('name', None) or {} if isinstance(name, str): self.fullname = name elif name.get('formatted', None): self.full_name = name['formatted'] elif 'givenName' in name and 'familyName' in name: self.full_name = ' '.join( (name['givenName'], name['familyName'])) return self.full_name def populate_picture(self, profile): if 'photos' in profile: # google, facebook photos = [x.get('value', None) for x in profile['photos']] photos = [x for x in photos if x] if photos: self.picture_url = photos[0] elif 'image' in profile: # google photo = profile['image'].get('url', None) if photo: self.picture_url = photo elif profile.get('user', {}).get('mugshot_url_template', None): # yammer self.picture_url = profile['user']['mugshot_url_template'] elif profile.get('user', {}).get('mugshot_url', None): # yammer self.picture_url = profile['user']['mugshot_url'] elif profile.get('mugshot_url', None): # yammer self.picture_url = profile['mugshot_url'] elif self.identity_provider.provider_type.startswith('facebook'): account = profile.get('id', None) if account is None: accounts = [x.get('uid') for x in profile.get('accounts', ())] accounts = [x for x in accounts if x] if not accounts: return account = accounts[0] self.picture_url = 'http://graph.facebook.com/%s/picture' % (account) facebook_sizes = ( ('square', 50), ('small', 50), ('normal', 100), ('large', 200)) twitter_sizes = ( ('_mini', 25), ('_normal', 48), ('_bigger', 73), ('', 1000)) def avatar_url(self, size=32): picture_url = self.picture_url if not picture_url: return if config.get("accept_secure_connection"): # Make the connection https, known services can handle both. # Ideally we should check which ones work. picture_url = "https://" + picture_url.split("://", 1)[-1] if "{width}" in unquote(picture_url): # yammer picture_url = unquote(picture_url).format(width=size, height=size) return picture_url if self.identity_provider.provider_type.startswith('google'): modified = re.sub( r"((\?|&)(size|sz))=(\d+)", r"\1=%d" % (size,), picture_url) if modified == picture_url: separator = "&" if "?" in picture_url else "?" modified = picture_url + separator + 'sz=' + str(size) return modified elif self.identity_provider.provider_type.startswith('facebook'): for (size_name, name_size) in self.facebook_sizes: if size <= name_size: break return '%s?type=%s' % (picture_url, size_name) elif self.identity_provider.provider_type == 'twitter': for (size_name, name_size) in self.twitter_sizes: if size <= name_size: break return size_name.join(picture_url.split('_normal')) # @classmethod # def special_quad_patterns(cls, alias_maker, discussion_id): # return [QuadMapPatternS(AgentProfile.iri_class().apply( # SocialAuthAccount.profile_id), # FOAF.img, SocialAuthAccount.picture_url, # name=QUADNAMES.foaf_image, # conditions=(SocialAuthAccount.picture_url != None), # sections=(PRIVATE_USER_SECTION,))]
[docs] def unique_query(self): query, _ = super(SocialAuthAccount, self).unique_query() return query.filter_by( type=self.type, provider_id=self.provider_id, uid=self.uid), True
@classmethod def find_accounts(cls, provider, social_account): # Probably deprecated if 'email' in social_account: return provider.db.query(cls).filter_by( provider=provider, domain=social_account['domain'], email_ci=social_account['email']).all() elif 'uid' in social_account: return provider.db.query(cls).filter_by( provider=provider, domain=social_account['domain'], uid=social_account['uid']).all() elif 'username' in social_account: return provider.db.query(cls).filter_by( provider=provider, domain=social_account['domain'], username=social_account['username']).all() else: log.error("account needs username, uid or email" + social_account) raise RuntimeError("account needs username uid or email") def login_duration(self): data = self.extra_data intrinsic = None if 'expires' in data: intrinsic = data['expires'] elif 'expires_in' in data: intrinsic = data['expires_in'] provider = self.provider_with_idp provider = '_'.join(provider.split(':')) config_t = config.get('login_expiry_' + provider, None) if config_t is None and '_' in provider: config_t = config.get( 'login_expiry_' + provider.split('_')[0], None) if config_t is None: config_t = config.get('login_expiry_default', None) if intrinsic is not None: # convert to days intrinsic = float(intrinsic) / 864000 if config_t is not None: # take minimum of intrinsic or config. intrinsic = min(float(config_t), intrinsic) return float(intrinsic or config_t or 0) # temporary shims @property def profile_info_json(self): return self.extra_data @profile_info_json.setter def profile_info_json(self, val): self.extra_data = val self.interpret_profile(val)
class AppStorage(BaseSQLAlchemyStorage): user = SocialAuthAccount nonce = Nonce association = Association code = Code