Source code for assembl.auth.generic_auth_backend

import base64

from social_core.backends.utils import (
    get_backend, load_backends as social_load_backends, module_member)
from social_core.backends.base import BaseAuth
from social_core.backends.oauth import BaseOAuth2
import social_pyramid.utils
import jsonpath_ng
from future.utils import string_types

from ..lib import logging

# Module-level logger registered under the 'assembl.auth' name.
# NOTE: ``logging`` here is the package-local ``..lib.logging`` module
# imported above, not the standard-library ``logging`` module.
log = logging.getLogger('assembl.auth')


class GenericAuth(BaseAuth):
    """A backend which takes all its attributes from configuration file.

    The per-backend configuration is read from the entry stored under this
    backend's ``name`` in the ``GENERICAUTH_SUBCONFIGS`` setting and copied
    onto the instance, so those values shadow the class-level attributes.
    """

    def __init__(self, strategy=None, redirect_uri=None, name=None):
        """Initialise the backend and load its named sub-configuration.

        :param strategy: forwarded to the parent constructor.
        :param redirect_uri: forwarded to the parent constructor.
        :param name: key of this backend in ``GENERICAUTH_SUBCONFIGS``.
        :raises AssertionError: if ``name`` has no entry in that setting.
        """
        # The name is assigned before the parent constructor is invoked.
        self.name = name
        super(GenericAuth, self).__init__(strategy, redirect_uri)
        data = self.setting('GENERICAUTH_SUBCONFIGS')
        assert name in data, (
            "No GENERICAUTH_SUBCONFIGS entry for backend %r" % (name,))
        # override class variables with local
        self.__dict__.update(data[name])

    def setting(self, name, default=None):
        """Look in data first.

        Values that were copied onto the instance take precedence; any other
        name is delegated to the parent class's ``setting``.

        :param name: setting name to look up.
        :param default: passed to the parent lookup when the name is not an
            instance attribute.
        """
        if name in self.__dict__:
            return getattr(self, name)
        return super(GenericAuth, self).setting(name, default)

    @staticmethod
    def extract_path(data, path):
        """Return the value of the first match of ``path`` in ``data``.

        ``path`` is parsed with ``jsonpath_ng``. When nothing matches, a
        warning is logged and ``None`` is returned.
        """
        matches = jsonpath_ng.parse(path).find(data)
        if matches:
            return matches[0].value
        # ``warn`` is a deprecated alias of ``warning``. The message is still
        # formatted eagerly because ``log`` comes from the project's logging
        # wrapper, whose positional-argument handling is not visible here.
        log.warning("Could not find %s in %s" % (path, data))
        return None

    def format_json_vals(self, json, data):
        """Return a copy of ``json`` with its string values formatted.

        Each string value is treated as a ``str.format`` template and expanded
        with ``data`` as keyword arguments; non-string values are kept as is.
        """
        return {
            k: v.format(**data) if isinstance(v, string_types) else v
            for (k, v) in json.items()}

    def get_response_info(self, prop_name, response):
        """Extract the piece of ``response`` configured for ``prop_name``.

        The ``<PROP_NAME>_PROP`` setting, when set, names a key read directly
        from ``response``. Otherwise the ``<PROP_NAME>_PATH`` setting, when
        set, is used as a path for :meth:`extract_path`. Returns ``None`` when
        neither setting is configured.
        """
        setting_prefix = prop_name.upper()
        # Simple case
        direct_key = self.setting(setting_prefix + "_PROP", None)
        if direct_key:
            return response.get(direct_key, None)
        # Complex case: use paths
        json_path = self.setting(setting_prefix + "_PATH", None)
        if json_path:
            return self.extract_path(response, json_path)
        return None
class GenericOAuth2(GenericAuth, BaseOAuth2):
    """A Generic OAuth2 client"""
    # properties and settings from BaseAuth
    # EXTRA_DATA: None
    # REQUIRES_EMAIL_VALIDATION: False
    # SEND_USER_AGENT: False
    # SSL_PROTOCOL: None

    # properties and settings from OAuthAuth
    # AUTHORIZATION_URL: ''
    # ACCESS_TOKEN_URL: ''
    # ACCESS_TOKEN_METHOD: 'GET'
    # REVOKE_TOKEN_URL: None
    # REVOKE_TOKEN_METHOD: 'POST'
    # ID_KEY: 'id'
    # SCOPE_PARAMETER_NAME: 'scope'
    # DEFAULT_SCOPE: None
    # SCOPE_SEPARATOR: ' '

    # properties and settings from BaseOAuth2
    # REFRESH_TOKEN_URL: None
    # REFRESH_TOKEN_METHOD: 'POST'
    # RESPONSE_TYPE: 'code'
    # REDIRECT_STATE: True
    # STATE_PARAMETER: True

    def get_user_details(self, response):
        """Return user details from arbitrary account.

        The name parts, username and email are pulled out of ``response``
        through :meth:`get_response_info`; name parts that are not found are
        left to the defaults of ``get_user_names``.

        :return: dict with ``username``, ``email``, ``fullname``,
            ``first_name`` and ``last_name`` keys.
        """
        name_info = {}
        for part_name in ('first_name', 'last_name', 'fullname'):
            info = self.get_response_info(part_name, response)
            if info is not None:
                name_info[part_name] = info
        fullname, first_name, last_name = self.get_user_names(**name_info)
        return {
            'username': (self.get_response_info('username', response)
                         or first_name or ''),
            'email': self.get_response_info('email', response) or '',
            'fullname': fullname,
            'first_name': first_name,
            'last_name': last_name,
        }

    def auth_headers(self):
        """Build the authentication headers from the ``AUTH_HEADERS`` setting.

        When the setting is empty the parent implementation is used.
        Otherwise each string value of the setting is a ``str.format``
        template that may reference ``{key}``, ``{secret}``,
        ``{combo_key_secret}`` (``key:secret``) and ``{b64_combo_key_secret}``
        (the URL-safe base64 encoding of ``combo_key_secret``).
        """
        headers = self.setting("AUTH_HEADERS")
        if not headers:
            return super(GenericOAuth2, self).auth_headers()
        key, secret = self.get_key_and_secret()
        combo_key_secret = ':'.join((key, secret))
        # base64 works on bytes: on Python 3 a text string must be encoded
        # first, and the encoded result decoded again so that it formats as
        # plain text rather than as "b'...'". Both steps are no-ops for
        # Python 2 byte strings.
        raw_combo = combo_key_secret
        if not isinstance(raw_combo, bytes):
            raw_combo = raw_combo.encode('utf-8')
        b64_combo_key_secret = base64.urlsafe_b64encode(raw_combo)
        if not isinstance(b64_combo_key_secret, str):
            b64_combo_key_secret = b64_combo_key_secret.decode('ascii')
        format_data = dict(
            key=key, secret=secret, combo_key_secret=combo_key_secret,
            b64_combo_key_secret=b64_combo_key_secret)
        return self.format_json_vals(headers, format_data)

    def user_data(self, access_token, *args, **kwargs):
        """Loads user data from service.

        The ``USER_INFO_PARAMS`` and ``USER_INFO_HEADERS`` settings are
        templates expanded with the token response fields plus
        ``access_token``, ``key`` and ``secret``; the result is sent to
        ``USER_INFO_URL``. When ``USER_INFO_SUBPATH`` is set and the service
        returned something, only that sub-path of the result is returned.

        :param access_token: token made available to the templates.
        :param kwargs: may carry ``response``, the token response mapping.
        """
        params = self.setting("USER_INFO_PARAMS")
        headers = self.setting("USER_INFO_HEADERS")
        data_subpath = self.setting("USER_INFO_SUBPATH")
        key, secret = self.get_key_and_secret()
        # Tolerate callers that only have an access token and therefore pass
        # no (or an empty) token response.
        response = kwargs.pop('response', None) or {}
        format_data = dict(
            response, access_token=access_token, key=key, secret=secret)
        if params:
            params = self.format_json_vals(params, format_data)
        if headers:
            headers = self.format_json_vals(headers, format_data)
        result = self.get_json(
            self.setting("USER_INFO_URL"),
            params=params,
            headers=headers
        )
        if result and data_subpath:
            return self.extract_path(result, data_subpath)
        return result
def load_backends(backends, force_load=False):
    """Load plain and parametrized authentication backends.

    An entry containing ``':'`` is treated as ``"dotted.path.Class:name"``:
    the class is imported and, if it derives from :class:`GenericAuth`,
    registered under ``name``. Every other entry is handed to social_core's
    ``load_backends``.

    :param backends: iterable of backend specification strings.
    :param force_load: forwarded to social_core's ``load_backends``.
    :return: the mapping returned by social_core's loader, with the generic
        backends added to it.
    """
    # Partition in a single pass so a one-shot iterable is not consumed
    # by the first scan and found empty by the second.
    parametrized_backends = []
    simple_backends = []
    for spec in backends:
        if ':' in spec:
            parametrized_backends.append(spec)
        else:
            simple_backends.append(spec)
    # force_load used to be accepted but silently ignored; pass it through.
    # NOTE(review): social_core appears to return its shared cache mapping
    # here, so the additions below are presumably what get_backend later
    # finds -- confirm against the installed social_core version.
    loaded = social_load_backends(simple_backends, force_load=force_load)
    for fullname in parametrized_backends:
        class_path, name = fullname.split(':')
        backend = module_member(class_path)
        if issubclass(backend, GenericAuth):
            loaded[name] = backend
    return loaded


def load_backend(strategy, name, redirect_uri):
    """Instantiate the backend registered under ``name``.

    The backend class is resolved from the ``AUTHENTICATION_BACKENDS``
    helper value. :class:`GenericAuth` subclasses additionally receive
    ``name`` so they can pick their own sub-configuration.
    """
    backends = social_pyramid.utils.get_helper('AUTHENTICATION_BACKENDS')
    Backend = get_backend(backends, name)
    init_kwargs = dict(strategy=strategy, redirect_uri=redirect_uri)
    if issubclass(Backend, GenericAuth):
        init_kwargs['name'] = name
    return Backend(**init_kwargs)
def includeme(config):
    """Monkey-patch load_backend.

    Replaces ``social_pyramid.utils.load_backend`` with this module's
    :func:`load_backend`. The ``config`` argument is accepted but not used.
    """
    pyramid_utils = social_pyramid.utils
    pyramid_utils.load_backend = load_backend