Source code for assembl.views.traversal

"""This defines Context objects for traversal of the magic api.

Pyramid allows to use model objects as Context objects, but in our cases they're surrogates for model objects.
"""
from builtins import str
from builtins import next
from builtins import object
from traceback import print_exc
import logging

from future.utils import string_types, as_native_str
from sqlalchemy.orm import aliased
from sqlalchemy.orm.attributes import InstrumentedAttribute
from sqlalchemy.orm.properties import RelationshipProperty
from sqlalchemy.sql.expression import and_
from sqlalchemy.inspection import inspect as sqlainspect
from sqlalchemy.exc import InvalidRequestError
from pyramid.security import Allow, Everyone, ALL_PERMISSIONS, DENY_ALL
from pyramid.settings import asbool
from pyramid.httpexceptions import HTTPNotFound
from abc import ABCMeta, abstractmethod
import reg

from assembl.auth import P_READ, R_SYSADMIN
from assembl.auth.util import discussion_from_request
from assembl.lib.sqla import uses_list, get_named_class, Base
from assembl.lib.logging import getLogger
from assembl.lib.decl_enums import DeclEnumType
from future.utils import with_metaclass


[docs]class BaseContext(object): """The base class for all traversal contexts. Delegate everything to parent by default.""" def __init__(self, parent=None, acl=None): self.__parent__ = parent if parent: self.__acl__ = acl or parent.__acl__ self.depth = getattr(parent, "depth", 0) + 1 elif acl: self.acl = acl self.depth = 0
[docs] def find_collection(self, collection_class_name): """Find a collection by name""" return self.__parent__.find_collection(collection_class_name)
[docs] def get_discussion_id(self): """Get the current discussion_id somehow often from a :py:class:`DiscussionBoundBase` instance""" return self.__parent__.get_discussion_id()
[docs] def get_request(self): """Get the request from the context, if known""" return self.__parent__.get_request()
def get_all_instances(self): # Should be a yield from for i in self.__parent__.get_all_instances(): yield i def get_first_instance(self): gen = self.get_all_instances() try: return next(gen) except StopIteration: return None finally: gen.close() def get_user_id(self): return self.__parent__.get_user_id() def context_chain(self): yield self for ctx in self.__parent__.context_chain(): yield ctx
[docs] def get_instance_ctx_of_class(self, cls): """Look in the context chain for a model instance of a given class, and return that context""" return self.__parent__.get_instance_ctx_of_class(cls)
[docs] def get_instance_of_class(self, cls): """Look in the context chain for a model instance of a given class""" return self.__parent__.get_instance_of_class(cls)
[docs] def get_permissions(self, discussion_id=None): """Get the permissions from the request, maybe altering on the way. See e.g. in :py:class:`assembl.models.widgets.IdeaCreatingWidget.BaseIdeaHidingCollection`""" discussion_id = discussion_id or self.get_discussion_id() return self.__parent__.get_permissions(discussion_id)
def __eq__(self, other): return (self.__class__ is other.__class__ and self.__parent__ == other.__parent__) def __hash__(self): h = hash(self.__class__) if self.__parent__: h = h % hash(self.__parent__)
[docs]class DictContext(BaseContext): """A Context defined using a simple dictionary""" def __init__(self, name, acl, subobjects=None): super(DictContext, self).__init__(None, acl) self.__name__ = name subobjects = subobjects or () self.subobjects = { ctx.__name__: ctx for ctx in subobjects} for context in subobjects: context.__parent__ = self if acl: self.__acl__ = acl def __getitem__(self, key): return self.subobjects[key]
ACL_READABLE = [(Allow, R_SYSADMIN, ALL_PERMISSIONS), (Allow, Everyone, P_READ), DENY_ALL] ACL_RESTRICTIVE = [(Allow, R_SYSADMIN, ALL_PERMISSIONS), DENY_ALL]
[docs]@reg.dispatch( reg.match_instance('inst_ctx', lambda inst_ctx, ctx: inst_ctx._instance), reg.match_key('ctx', lambda inst_ctx, ctx: ctx.collection.qual_name())) def collection_creation_side_effects(inst_ctx, ctx): """Multiple dispatch adapter for collection-related side effects""" return ()
[docs]class AppRoot(DictContext): """The root context. Anything not defined by a root comes here.""" def __init__(self, request=None, user_id=None): self.request = request self.user_id = user_id self._permissions = None self._user_cache = None super(AppRoot, self).__init__('', ACL_READABLE, [ Api2Context(self, ACL_RESTRICTIVE), DictContext('admin', ACL_RESTRICTIVE, [ DictContext('permissions', None, [ DiscussionsContext()])]), DictContext('api', ACL_RESTRICTIVE, [ DictContext('v1', None, [ DiscussionsContext(), DictContext('token', ACL_READABLE)])])]) def __getitem__(self, key): if key in self.subobjects: return self.subobjects[key] from assembl.models import Discussion discussion = Discussion.default_db.query(Discussion).filter_by( slug=key).first() if not discussion: raise KeyError() return discussion # Base of recursion for methods defined in BaseContext
[docs] def get_request(self): """Get the request from the context, if known""" return self.request
def context_chain(self): yield self
[docs] def find_collection(self, collection_class_name): """Find a collection by name""" return None
[docs] def get_discussion_id(self): return None
def get_user_id(self): if self.user_id: return self.user_id request = self.get_request() if request: return request.authenticated_userid
[docs] def get_instance_ctx_of_class(self, cls): return None # I'm the holder of the user, but not the user's context
[docs] def get_instance_of_class(self, cls): from assembl.models import AgentProfile if issubclass(cls, AgentProfile): user_id = self.get_user_id() if user_id and user_id != Everyone: if self._user_cache is None: self._user_cache = AgentProfile.get(user_id) if isinstance(self._user_cache, cls): return self._user_cache
def get_all_instances(self): from assembl.models import User user = self.get_instance_of_class(User) if user is not None: yield user
[docs] def get_permissions(self, discussion_id=None): if self.user_id: if self._permissions is None: from assembl.auth.util import get_permissions discussion_id = discussion_id or self.get_discussion_id() self._permissions = get_permissions( self.user_id, discussion_id) return self._permissions elif self.request: # only use request if it knows the user return self.request.permissions return []
[docs]class DiscussionsContext(BaseContext): """A context where discussions, named by id, are sub-contexts""" __name__ = 'discussion' def __getitem__(self, key): from assembl.models import Discussion discussion = Discussion.get(int(key)) if not discussion: raise KeyError() return discussion
[docs]class TraversalContext(BaseContext): """The base class for the magic API""" def __init__(self, parent, acl=None): self.__parent__ = parent self.__acl__ = acl or parent.__acl__ self.depth = getattr(parent, "depth", 0) + 1
[docs] def decorate_query(self, query, ctx, tombstones=False): """Given a SQLAlchemy query, add joins and filters that correspond to this step in the traversal path.""" return self.__parent__.decorate_query(query, ctx, tombstones)
[docs] def creation_side_effects_rec(self, inst_ctx, top_ctx): """Recursion""" for inst in self.__parent__.creation_side_effects_rec(inst_ctx, top_ctx): yield inst
[docs] def creation_side_effects(self, inst_ctx=None): """Generator for objects that are created as side-effect of another object's creation. They can have their own side-effect. """ for sub_inst_ctx in self.creation_side_effects_rec(inst_ctx, self): yield sub_inst_ctx for sub_sub in self.creation_side_effects(sub_inst_ctx): yield sub_sub
[docs] def on_new_instance(self, instance): """If a model instance was created in this context, let the context learn about it. Exists mostly for :py:meth:`RelationCollectionDefinition.on_new_instance`""" self.__parent__.on_new_instance(instance)
[docs] def get_target_class(self): """What is the model class we can expect to find at this context?""" return None
[docs]class Api2Context(TraversalContext): """The root class for the magic API (``/data``) Sub-contexts are :py:class:`ClassContext`""" _class_cache = {} __name__ = 'data' def __init__(self, parent, acl=None): super(Api2Context, self).__init__(parent, acl) def get_default_view(self): pass def __getitem__(self, key): cls = get_named_class(key) if not cls: raise KeyError() return ClassContext(self, cls) def all_class_names(self): return [k.external_typename() for k in Base._decl_class_registry.values() if getattr(k, 'external_typename', False)] # Base of recursion for methods defined in TraversalContext
[docs] def decorate_query(self, query, ctx, tombstones=False): return query
[docs] def on_new_instance(self, instance): pass
[docs] def creation_side_effects_rec(self, inst_ctx, top_ctx): """Apply simple side-effects from the instance""" for inst in inst_ctx._instance.creation_side_effects(top_ctx): yield inst
def process_args(args, cls): mapper = sqlainspect(cls) for key, value in args.items(): column = mapper.c.get(key) if column is not None: if isinstance(column.type, DeclEnumType): yield (key, column.type.enum.from_string(value)) elif column.type.python_type == int: yield (key, int(value)) elif column.type.python_type == float: yield (key, float(value)) elif column.type.python_type == bool: yield (key, asbool(value)) else: yield (key, value) continue reln = mapper.relationships.get(key) if (reln is not None and reln.direction.name == 'MANYTOONE' and isinstance(value, string_types)): assert(len(reln.local_columns) == 1) key = next(reln.local_columns.__iter__()).key yield (key, reln.mapper.class_.get_database_id(value)) continue attribute = getattr(cls, key, None) if isinstance(attribute, property) and attribute.fset is not None: yield (key, value) continue
[docs]class ClassContext(TraversalContext): """A context that represents a given model class (e.g. ``/data/Idea``) Sub-contexts are :py:class:`InstanceContext`, given by numeric ID.""" def __init__(self, parent, cls): # permission on class context are quite restrictive. review. super(ClassContext, self).__init__(parent) self._class = cls self.__name__ = cls.external_typename() self.class_alias = aliased(cls, name="alias_%s" % (cls.__name__)) def __getitem__(self, key): from assembl.models import Preferences instance = self._class.get_instance(key) if not instance: raise KeyError() # TODO: use a protocol for this if isinstance(instance, Preferences): return PreferenceContext(self, instance) return InstanceContext(self, instance) def get_default_view(self): my_default = getattr(self._class, 'default_view', None) if my_default: return my_default return self.__parent__.get_default_view() def create_query(self, id_only=True, tombstones=False): from assembl.models import TombstonableMixin cls = self._class alias = self.class_alias if id_only: query = self._class.default_db.query(alias.id) else: query = self._class.default_db.query(alias) # TODO: Distinguish tombstone condition from other base_conditions if issubclass(cls, TombstonableMixin) and not tombstones: query = query.filter(and_(*cls.base_conditions(alias))) query = cls.query_filter_with_crud_op_req( self.get_request(), query=query, clsAlias=alias) return query
[docs] def get_class(self, typename=None): """Returns the collection class, or subclass designated by typename""" cls = self._class if typename is not None: other_cls = get_named_class(typename) if other_cls and issubclass(other_cls, cls): return other_cls return cls
[docs] def get_target_class(self): return self._class
def get_target_alias(self): return self.class_alias def create_object(self, typename=None, json=None): cls = self.get_class(typename) with self._class.default_db.no_autoflush: inst_ctx = cls.create_from_json(json, self) return [inst_ctx._instance] def __eq__(self, other): return (super(ClassContext, self).__eq__(other) and self._class == other._class) def __hash__(self): return super(ClassContext, self).hash() % hash(self._class)
[docs]class JsonLdPredicate(object): """A `view predicate factory`_ that checks that a request really asks for jsonld .. _`view predicate factory`: http://docs.pylonsproject.org/projects/pyramid/en/latest/narr/hooks.html#view-and-route-predicates """ def __init__(self, val, config): self.val = val def text(self): return 'jsonld' phash = text def __call__(self, context, request): return (request.accept.quality('application/ld+json') or 0) > ( request.accept.quality('application/json') or 0)
[docs]class ClassContextPredicate(object): """A `view predicate factory`_ that checks that a given traversal context is a :py:class:`ClassContext` and represents the given class. .. _`view predicate factory`: http://docs.pylonsproject.org/projects/pyramid/en/latest/narr/hooks.html#view-and-route-predicates """ def __init__(self, val, config): self.val = val def text(self): return 'class_context = %s' % (self.val,) phash = text def __call__(self, context, request): return isinstance(context, ClassContext) and context._class == self.val
[docs]class InstanceContext(TraversalContext): """A context that represents a given model instance (e.g. ``/data/Idea/12``) Sub-contexts are :py:class:`CollectionContext`, given by relationship name or taken from :py:meth:`assembl.lib.sqla.Base.extra_collections_dict`. """ def __init__(self, parent, instance): # Do not call super, because it will set the acl. self._instance = instance self.__name__ = str(instance.id) self.__parent__ = parent # relations = instance.__class__.__mapper__.relationships _collections_by_class = {} @classmethod def _get_collections(cls, for_class): return for_class.get_collections() def get_collection_names(self): return list(self._instance.get_collections().keys()) def get_default_view(self): my_default = getattr(self._instance, 'default_view', None) if my_default: return my_default return self.__parent__.get_default_view() @property def __acl__(self): # TODO: publication_state if getattr(self._instance, '__acl__', None): return self._instance.__acl__ if getattr(self._instance, 'discussion', None): return self._instance.discussion.__acl__ discussion_id = self.get_discussion_id() if discussion_id: from assembl.models import Discussion return Discussion.get(discussion_id).__acl__ return self.__parent__.__acl__ def __getitem__(self, key): cls = self._instance.__class__ collection = self._get_collections(cls).get(key, None) if not collection: raise KeyError() return collection.make_context(self)
[docs] def find_collection(self, collection_class_name): return self.__parent__.find_collection(collection_class_name)
[docs] def get_discussion_id(self): from assembl.models import DiscussionBoundBase if isinstance(self._instance, DiscussionBoundBase): try: discussion_id = self._instance.get_discussion_id() if discussion_id: return discussion_id except Exception: pass return super(InstanceContext, self).get_discussion_id()
[docs] def get_instance_of_class(self, cls): if isinstance(self._instance, cls): return self._instance return self.__parent__.get_instance_of_class(cls)
[docs] def get_instance_ctx_of_class(self, cls): if isinstance(self._instance, cls): return self return self.__parent__.get_instance_ctx_of_class(cls)
def get_all_instances(self): yield self._instance # Should be a yield from for i in self.__parent__.get_all_instances(): yield i
[docs] def get_target_class(self): return self._instance.__class__
def get_target_alias(self): return self.__parent__.get_target_alias() @as_native_str() def __repr__(self): return "<InstanceContext (%s)>" % (self._instance,) def __eq__(self, other): return (super(InstanceContext, self).__eq__(other) and self._instance is other._instance) def __hash__(self): return super(ClassContext, self).hash() % id(self._instance)
[docs]class InstanceContextPredicate(object): """A `view predicate factory`_ that checks that a given traversal context is a :py:class:`InstanceContext`, and that the instance is of the given class. .. _`view predicate factory`: http://docs.pylonsproject.org/projects/pyramid/en/latest/narr/hooks.html#view-and-route-predicates """ def __init__(self, val, config): self.val = val def text(self): return 'instance_context = %s' % (self.val,) phash = text def __call__(self, context, request): return isinstance(context, InstanceContext) and\ isinstance(context._instance, self.val)
[docs]class LocalPermissionPredicate(object): """A `view predicate factory`_ that checks that a given traversal context is an :py:class:`InstanceContext` and the user has a certain local permission on it .. _`view predicate factory`: http://docs.pylonsproject.org/projects/pyramid/en/latest/narr/hooks.html#view-and-route-predicates """ def __init__(self, permission, config): self.permission = permission def text(self): return 'local_permission = %s' % (', '.join(self.permission)) phash = text def __call__(self, context, request): return isinstance(context, InstanceContext) and get_permissions( request.authenticated_userid, request.discussion_id, context._instance)
[docs]class InstanceContextPredicateWithExceptions(object): """A `view predicate factory`_ that checks that a given traversal context is a :py:class:`InstanceContext`, and that the instance is of the given class, but not of one of a given set of subclass exceptions. .. _`view predicate factory`: http://docs.pylonsproject.org/projects/pyramid/en/latest/narr/hooks.html#view-and-route-predicates """ def __init__(self, val, config): cls, cls_exceptions = val self.val = cls self.cls_exceptions = cls_exceptions def text(self): return 'instance_context = %s except %s' % ( self.val, repr(self.cls_exceptions)) phash = text def __call__(self, context, request): return isinstance(context, InstanceContext) and\ isinstance(context._instance, self.val) and\ not isinstance(context._instance, self.cls_exceptions)
[docs]class CollectionContext(TraversalContext): """A context that represents a collection of model objects related to the model object of the parent :py:class:`InstanceContext`. The collection itself is embodied by a :py:class:`AbstractCollectionDefinition` object, often backed by a SQLA relationship. Sub-contexts are :py:class:`InstanceContext`, indexed by Id. """ def __init__(self, parent, collection, instance): super(CollectionContext, self).__init__(parent) if isinstance(collection, InstrumentedAttribute): collection = collection.property # permission on class context are quite restrictive. review. self.collection = collection self.parent_instance = instance self.collection_class = self.collection.collection_class self.class_alias = aliased(self.collection_class) # TODO: This makes some tests fail, and I need to understand why. # name="alias_%s_%d" % (self.collection_class.__name__, self.depth)) def get_default_view(self): my_default = self.collection.get_default_view() if my_default: return my_default return self.__parent__.get_default_view() def __getitem__(self, key): instance = self.collection.get_instance(key, self.parent_instance) if not instance: raise KeyError() return InstanceContext(self, instance) @property def __name__(self): return self.collection.name
[docs] def get_collection_class(self, typename=None): """Returns the collection class, or subclass designated by typename""" cls = self.collection_class if typename is not None: other_cls = get_named_class(typename) if other_cls and issubclass(other_cls, cls): return other_cls return cls
[docs] def get_target_class(self): return self.collection_class
def get_target_alias(self): return self.class_alias
[docs] def get_instance_of_class(self, cls): if isinstance(self.parent_instance, cls): return self.parent_instance return self.__parent__.get_instance_of_class(cls)
def create_query(self, id_only=True, tombstones=False): alias = self.class_alias if id_only: query = self.parent_instance.db.query(alias.id) return self.decorate_query(query, self, tombstones).distinct() else: # There will be duplicates. But sqla takes care of them, # virtuoso won't allow distinct on full query, # and a distinct subquery takes forever. # Oh, and quietcast loses the distinct. Just great. query = self.parent_instance.db.query(alias) return self.decorate_query(query, self, tombstones)
[docs] def decorate_query(self, query, ctx, tombstones=False): # This will decorate a query with a join on the relation. from assembl.models import TombstonableMixin query = self.collection.decorate_query( query, self.__parent__.get_target_alias(), self.get_target_alias(), self.parent_instance, ctx) cls = self.collection_class query = cls.query_filter_with_crud_op_req( self.get_request(), query=query, clsAlias=self.class_alias) if issubclass(cls, TombstonableMixin) and not tombstones: query = query.filter(cls.tombstone_condition(self.class_alias)) return super(CollectionContext, self).decorate_query( query, ctx, tombstones=False)
[docs] def on_new_instance(self, instance): self.collection.on_new_instance(instance, self.parent_instance) super(CollectionContext, self).on_new_instance(instance)
[docs] def get_permissions(self, discussion_id=None): discussion_id = discussion_id or self.get_discussion_id() permissions = super( CollectionContext, self).get_permissions(discussion_id) new_permissions = self.collection.extra_permissions(permissions) if new_permissions: permissions = new_permissions.extend(permissions) return permissions
def create_object(self, typename=None, json=None): cls = self.get_collection_class(typename) with self.parent_instance.db.no_autoflush: try: inst_ctx = cls.create_from_json(json, self) if inst_ctx: return [inst_ctx._instance] except Exception as e: # import pdb # pdb.post_mortem() print_exc() raise e
[docs] def creation_side_effects_rec(self, inst_ctx, top_ctx): """Apply side-effects through multiple dispatch on the collection""" for ins in self.__parent__.creation_side_effects_rec(inst_ctx, top_ctx): yield ins assert isinstance(top_ctx, CollectionContext) assert isinstance(inst_ctx, InstanceContext) for ins in collection_creation_side_effects(inst_ctx, self): yield ins
@as_native_str() def __repr__(self): return "<CollectionContext (%s)>" % ( self.collection,)
[docs] def find_collection(self, collection_class_name): if self.collection.qual_name() == collection_class_name: return self return self.__parent__.find_collection(collection_class_name)
def __eq__(self, other): return (super(CollectionContext, self).__eq__(other) and self.collection is other.collection) def __hash__(self): return super(ClassContext, self).hash() % hash( self.collection.qual_name())
[docs]class NamedCollectionContextPredicate(object): """A `view predicate factory`_ that checks that a given traversal context is a :py:class:`CollectionContext`, whose collection's :py:meth:`AbstractCollectionDefinition.name` is as given. .. _`view predicate factory`: http://docs.pylonsproject.org/projects/pyramid/en/latest/narr/hooks.html#view-and-route-predicates """ def __init__(self, val, config): self.val = val def text(self): return 'collection_context_name = %s' % (self.val,) phash = text def __call__(self, context, request): return (isinstance(context, CollectionContext) and self.val == context.collection.qual_name())
[docs]class NamedCollectionInstancePredicate(object): """A `view predicate factory`_ that checks that a given traversal context is an :py:class:`InstanceContext` under a :py:class:`CollectionContext` whose collection's :py:meth:`AbstractCollectionDefinition.name` is as given. .. _`view predicate factory`: http://docs.pylonsproject.org/projects/pyramid/en/latest/narr/hooks.html#view-and-route-predicates """ def __init__(self, val, config): self.val = val def text(self): return 'collection_instance_context_name = %s' % (self.val,) phash = text def __call__(self, context, request): parent = context.__parent__ return (isinstance(context, InstanceContext) and isinstance(parent, CollectionContext) and self.val == parent.collection.qual_name())
[docs]class SecureConnectionPredicate(object): """A `view predicate factory`_ that checks that the connection is secure (https). .. _`view predicate factory`: http://docs.pylonsproject.org/projects/pyramid/en/latest/narr/hooks.html#view-and-route-predicates """ def __init__(self, val, config): self.val = bool(val) def text(self): return 'secure_connection' phash = text def __call__(self, context, request): return self.val == ( request.environ['wsgi.url_scheme'] == 'https')
[docs]class CollectionContextClassPredicate(object): """A `view predicate factory`_ that checks that a given traversal context is a :py:class:`CollectionContext`, where the class of the targets of the relationship is as given. .. _`view predicate factory`: http://docs.pylonsproject.org/projects/pyramid/en/latest/narr/hooks.html#view-and-route-predicates """ def __init__(self, val, config): self.val = val def text(self): return 'collection_context_class = %s' % (self.val,) phash = text def __call__(self, context, request): return isinstance(context, CollectionContext) and\ issubclass(context.collection_class, self.val)
[docs]class AbstractCollectionDefinition(with_metaclass(ABCMeta, object)): """Represents a collection of objects related to an instance.""" def __init__(self, owner_class, name, collection_class): self.owner_class = owner_class self.collection_class = collection_class self.name = name def make_context(self, parent_ctx): return CollectionContext(parent_ctx, self, parent_ctx._instance) def get_instance(self, key, parent_instance): instance = self.collection_class.get_instance(key) # Validate that the instance belongs to the collection... if instance and not self.contains(parent_instance, instance): raise KeyError("This instance does not live in this collection.") return instance @abstractmethod def decorate_query( self, query, owner_alias, coll_alias, parent_instance, ctx): pass def on_new_instance( self, instance, parent_instance): pass @abstractmethod def contains(self, parent_instance, instance): pass def get_default_view(self): pass
[docs] def qual_name(self): """The fully qualified name of the collection, including owning class name. Used in :py:class:`NamedCollectionContextPredicate` and :py:meth:`TraversalContext.find_collection`.""" return ".".join((self.owner_class.__name__, self.name))
[docs] def extra_permissions(self, permissions): """Return a list of extra permissions that could be added along the way. See e.g. in :py:class:`assembl.models.widgets.IdeaCreatingWidget.BaseIdeaHidingCollection`""" return None
@as_native_str() def __repr__(self): return "<%s %s -(%s)-> %s>" % ( self.__class__.__name__, self.owner_class.__name__, self.name, self.collection_class.__name__)
[docs]class RelationCollectionDefinition(AbstractCollectionDefinition): """A collection of objects related to an instance through a relationship.""" back_relation = None def __init__(self, owner_class, relationship, name=None): super(RelationCollectionDefinition, self).__init__( owner_class, name or relationship.key, relationship.mapper.class_) self.relationship = relationship back_properties = list(getattr(relationship, '_reverse_property', ())) if back_properties: # TODO: How to chose? self.back_relation = back_properties.pop() self.owner_class = self.back_relation.mapper.class_ elif relationship.info.get('backref', None): back_ref = relationship.info['backref'] if isinstance(back_ref, InstrumentedAttribute): back_ref = back_ref.class_.__mapper__.relationships.get(back_ref.key) if isinstance(back_ref, str): # TODO: This is probably doable with SQLA machinery... cl_name, reln_name = back_ref.split(".", 1) if cl_name == self.collection_class.__name__: back_ref = self.collection_class.__mapper__.relationships.get(reln_name, None) if isinstance(back_ref, RelationshipProperty): self.back_relation = back_ref def decorate_query( self, query, owner_alias, coll_alias, parent_instance, ctx): # This will decorate a query with a join on the relation. inv = self.back_relation if inv: query = query.join(owner_alias, getattr(coll_alias, inv.key)) else: # hope for the best try: query = query.join(owner_alias) except InvalidRequestError: getLogger().error("Could not join %s to %s" % (owner_alias, query)) # This is very likely to fail downstream return query found_key = False if inv and not uses_list(inv): # Try to constrain on coll_alias's key vs owner_alias. # Difficult cases happen when tombstone is part of the # reln's columns for column in inv.local_columns: for fk in column.foreign_keys: if fk.column.table == parent_instance.__class__.__table__: query = query.filter( getattr(coll_alias, column.name) == parent_instance.id) found_key = True if not found_key: query = query.filter(owner_alias.id == parent_instance.id) return query def on_new_instance(self, instance, parent_instance): if not isinstance(instance, self.collection_class): return # if the relation is through a helper class, # create that and add to assocs (TODO) # otherwise set the appropriate relationship (below.) # Prefer non-list properties because we can check if they're set. if not uses_list(self.relationship): if getattr(parent_instance, self.relationship.key, None) is None: # print "Setting1 ", parent_instance, self.relationship.key, instance setattr(parent_instance, self.relationship.key, instance) elif self.back_relation and not uses_list(self.back_relation): inv = self.back_relation if getattr(instance, inv.key, None) is None: # print "Setting2 ", instance, inv.key, parent_instance setattr(instance, inv.key, parent_instance) elif self.back_relation: inv = self.back_relation # print "Adding1 ", instance, inv.key, parent_instance getattr(instance, inv.key).append(parent_instance) else: # print "Adding2 ", parent_instance, self.relationship.key, instance getattr(parent_instance, self.relationship.key).append(instance) def get_attribute(self, instance, property=None): # What we have is a property, not an instrumented attribute; # but they share the same key. property = property or self.relationship return getattr(instance, property.key) def contains(self, parent_instance, instance): if uses_list(self.relationship): if self.back_relation and not uses_list(self.back_relation): return self.get_attribute( instance, self.back_relation) == parent_instance return instance in self.get_attribute(parent_instance) else: return instance == self.get_attribute(parent_instance) def get_instance(self, key, parent_instance): from ..models import NamedClassMixin instance = None if key == '-': if not uses_list(self.relationship): instance = getattr( parent_instance, self.relationship.key, None) else: # Allow if it happens to be a singleton. instances = getattr(parent_instance, self.relationship.key) if len(instances) == 1: return instances[0] raise KeyError() elif issubclass(self.collection_class, NamedClassMixin): instance = self.collection_class.getByName(key, parent_object=parent_instance) if not instance: instance = self.collection_class.get_instance(key) # Validate that the instance belongs to the collection... if instance and not self.contains(parent_instance, instance): raise KeyError("This instance does not live in this collection.") return instance @as_native_str() def __repr__(self): if self.back_relation: return "<%s %s <-(%s/%s)-> %s>" % ( self.__class__.__name__, self.owner_class.__name__, self.back_relation.key if self.back_relation else '', self.name, self.collection_class.__name__) else: return super(RelationCollectionDefinition, self).__repr__()
[docs]class UserBoundNamespacedDictContext(TraversalContext): """Represents the set of user-bound namespace-K-V items""" def __init__(self, parent, collection): # Do not call super, because it will set the acl. self.collection = collection self.__parent__ = parent self._instance = parent._instance @property def __acl__(self): return self.__parent__.__acl__ def as_collection(self): return self.collection.as_collection(self._instance) def __getitem__(self, namespace): user_ns_b_kvdict = self.collection.get_instance( namespace, self._instance) return UserNSBoundDictContext(user_ns_b_kvdict, self)
[docs] def get_target_class(self): from assembl.models.user_key_values import UserNsDict return UserNsDict
[docs]class UserNSBoundDictContext(TraversalContext): """Represents the set of user-bound, namespace-bound K-V items""" def __init__(self, user_ns_b_kvdict, parent): # Do not call super, because it will set the acl. self.collection = user_ns_b_kvdict self.__parent__ = parent self.parent_instance = parent._instance @property def __acl__(self): return self.__parent__.__acl__ def __getitem__(self, key): return UserNSKeyBoundDictItemContext(self.collection, self, key)
[docs] def get_target_class(self): from assembl.models.user_key_values import NamespacedUserKVCollection return NamespacedUserKVCollection
[docs]class UserNSKeyBoundDictItemContext(TraversalContext): """Represents a value which is bound to a user, namespace and key""" def __init__(self, user_ns_b_kvdict, parent, key): # Do not call super, because it will set the acl. self.collection = user_ns_b_kvdict self.__parent__ = parent self.parent_instance = parent.parent_instance self.key = key @property def __acl__(self): return self.__parent__.__acl__ def __getitem__(self, key): return None
[docs] def get_target_class(self): return None
[docs]class UserNsDictCollection(AbstractCollectionDefinition): def __init__(self, cls): from assembl.models.user_key_values import NamespacedUserKVCollection super(UserNsDictCollection, self).__init__( cls, 'user_ns_kv', NamespacedUserKVCollection) def make_context(self, parent_context): return UserBoundNamespacedDictContext(parent_context, self) def decorate_query( self, query, owner_alias, last_alias, parent_instance, ctx): # No clue what to do here; UserKVCollection is not a sqla object return query.outerjoin( owner_alias, owner_alias.id != None) def contains(self, parent_instance, namespace): # all namespaces exist return True def as_collection(self, parent_instance): from pyramid.threadlocal import get_current_request from pyramid.httpexceptions import HTTPUnauthorized from assembl.models.user_key_values import UserNsDict request = get_current_request() if request is not None: user_id = request.unauthenticated_userid # Check again downstream for real userid if user_id is None: raise HTTPUnauthorized() else: raise RuntimeError() return UserNsDict(parent_instance, user_id) def get_instance(self, namespace, parent_instance): c = self.as_collection(parent_instance) return c[namespace]
[docs]class NamespacedDictContext(TraversalContext): """Represents the set of namespace-K-V items""" def __init__(self, parent, collection): # Do not call super, because it will set the acl. self.collection = collection self.__parent__ = parent self._instance = parent._instance @property def __acl__(self): return self.__parent__.__acl__ def as_collection(self): return self.collection.as_collection(self._instance) def __getitem__(self, namespace): ns_kvdict = self.collection.get_instance( namespace, self._instance) return NSBoundDictContext(ns_kvdict, self)
[docs] def get_target_class(self): from assembl.models.user_key_values import NsDict return NsDict
[docs]class NSBoundDictContext(TraversalContext): """Represents the set of namespace-bound K-V items""" def __init__(self, ns_kvdict, parent): # Do not call super, because it will set the acl. self.collection = ns_kvdict self.__parent__ = parent self.parent_instance = parent._instance @property def __acl__(self): return self.__parent__.__acl__ def __getitem__(self, key): return NSKeyBoundDictItemContext(self.collection, self, key)
[docs] def get_target_class(self): from assembl.models.user_key_values import NamespacedKVCollection return NamespacedKVCollection
[docs]class NSKeyBoundDictItemContext(TraversalContext): """Represents a value which is bound to a namespace and key""" def __init__(self, ns_kvdict, parent, key): # Do not call super, because it will set the acl. self.collection = ns_kvdict self.__parent__ = parent self.parent_instance = parent.parent_instance self.key = key @property def __acl__(self): return self.__parent__.__acl__ def __getitem__(self, key): return None
[docs] def get_target_class(self): return None
[docs]class NsDictCollection(AbstractCollectionDefinition): def __init__(self, cls): from assembl.models.user_key_values import NamespacedKVCollection super(NsDictCollection, self).__init__( cls, 'ns_kv', NamespacedKVCollection) def make_context(self, parent_context): return NamespacedDictContext(parent_context, self) def decorate_query( self, query, owner_alias, last_alias, parent_instance, ctx): # No clue what to do here; KVCollection is not a sqla object return query.outerjoin( owner_alias, owner_alias.id != None) def contains(self, parent_instance, namespace): # all namespaces exist return True def as_collection(self, parent_instance): from assembl.models.user_key_values import NsDict return NsDict(parent_instance) def get_instance(self, namespace, parent_instance): c = self.as_collection(parent_instance) return c[namespace]
[docs]class PreferenceContext(TraversalContext): """Represents a set of preference values (eg for a discussion) Sub-contexts are :py:class:`PreferenceValueContext`""" def __init__(self, parent_context, preferences): # Do not call super, because it will set the acl. self.preferences = preferences self.__parent__ = parent_context @property def __acl__(self): return ACL_RESTRICTIVE def __getitem__(self, key): """returns the :py:class:`PreferenceValueContext` for that preference""" return PreferenceValueContext(self.preferences, self, key)
[docs] def get_target_class(self): from assembl.models.preferences import Preferences return Preferences
[docs]class DiscussionPreferenceContext(PreferenceContext): """Represents a set of preference values for a discussion Backed by a :py:class:`DiscussionPreferenceCollection`, sub-contexts are :py:class:`PreferenceValueContext`""" def __init__(self, parent_context, collection): self.collection = collection self.parent_instance = parent_context._instance preferences = collection.as_collection(self.parent_instance) super(DiscussionPreferenceContext, self).__init__( parent_context, preferences) @property def __acl__(self): # collection acl? return self.__parent__.__acl__
[docs]class PreferenceValueContext(TraversalContext): """Represents a specific discussion preference""" def __init__(self, preferences, parent, key): # Do not call super, because it will set the acl. self.collection = preferences self.__parent__ = parent self.key = key @property def __acl__(self): return self.__parent__.__acl__ def __getitem__(self, key): return None
[docs] def get_target_class(self): return None
[docs]class DiscussionPreferenceCollection(AbstractCollectionDefinition): """Represents the collection of preferences for a given discussion's :py:class:`DiscussionPreferenceContext`.""" def __init__(self, cls): from assembl.models.preferences import Preferences super(DiscussionPreferenceCollection, self).__init__( cls, 'settings', Preferences) def make_context(self, parent_context): return DiscussionPreferenceContext(parent_context, self) def decorate_query( self, query, owner_alias, last_alias, parent_instance, ctx): # No clue what to do here; UserKVCollection is not a sqla object return query.outerjoin( owner_alias, owner_alias.id != None) def contains(self, parent_instance, key): from assembl.models.preferences import Preferences return key in Preferences.property_defaults def as_collection(self, parent_instance): return parent_instance.preferences def get_instance(self, key, parent_instance): c = self.as_collection(parent_instance) return c[key]
def app_root_factory(request, user_id=None): return AppRoot(request, user_id)
[docs]def root_factory(request, user_id=None): """The factory function for the root context""" # OK, this is the old code... I need to do better, but fix first. from ..models import Discussion if request.matchdict and 'discussion_id' in request.matchdict: discussion_id = int(request.matchdict['discussion_id']) discussion = Discussion.default_db.query(Discussion).get(discussion_id) if not discussion: raise HTTPNotFound("No discussion ID %d" % (discussion_id,)) return discussion elif request.matchdict and 'discussion_slug' in request.matchdict: discussion_slug = request.matchdict['discussion_slug'] discussion = Discussion.default_db.query(Discussion).filter_by( slug=discussion_slug).first() if not discussion: raise HTTPNotFound("No discussion named %s" % (discussion_slug,)) return discussion return app_root_factory(request, user_id)
def includeme(config): config.add_view_predicate('json_ld', JsonLdPredicate) config.add_view_predicate('ctx_class', ClassContextPredicate) config.add_view_predicate('ctx_instance_class', InstanceContextPredicate) config.add_view_predicate('ctx_instance_class_with_exceptions', InstanceContextPredicateWithExceptions) config.add_view_predicate('ctx_named_collection', NamedCollectionContextPredicate) config.add_view_predicate('ctx_named_collection_instance', NamedCollectionInstancePredicate) config.add_view_predicate('secure_connection', SecureConnectionPredicate) config.add_view_predicate('ctx_collection_class', CollectionContextClassPredicate, weighs_less_than='ctx_named_collection') config.add_view_predicate('local_permission', LocalPermissionPredicate)