Source code for assembl.views.api.extract

"""Cornice API for extracts"""

from cornice import Service
from pyramid.security import authenticated_userid, Everyone
from pyramid.httpexceptions import (
    HTTPNotFound, HTTPBadRequest, HTTPForbidden, HTTPServerError,
    HTTPNoContent, HTTPClientError)
from sqlalchemy import Unicode
from sqlalchemy.sql.expression import cast
from sqlalchemy.orm import joinedload_all
import simplejson as json

from assembl.views.api import API_DISCUSSION_PREFIX
from assembl.auth import (
    P_READ, P_ADD_EXTRACT, P_EDIT_EXTRACT, P_ASSOCIATE_EXTRACT)
from assembl.auth.util import get_permissions
from assembl.models import (
    AgentProfile, Extract, TextFragmentIdentifier, IdeaExtractLink,
    AnnotatorSource, Post, Webpage, Idea, AnnotationSelector)
from assembl.auth.util import user_has_permission
from assembl.lib.web_token import decode_token
from assembl.lib import sqla

cors_policy = dict(
    enabled=True,
    headers=('Location', 'Content-Type', 'Content-Length'),
    origins=('*',),
    credentials=True,
    max_age=86400)


extracts = Service(
    name='extracts',
    path=API_DISCUSSION_PREFIX + '/extracts',
    description="An extract from Content that is an expression of an Idea",
    renderer='json',
    cors_policy=cors_policy
)

extract = Service(
    name='extract',
    path=API_DISCUSSION_PREFIX + '/extracts/{id:.+}',
    description="Manipulate a single extract",
    renderer='json',
    cors_policy=cors_policy
)

search_extracts = Service(
    name='search_extracts',
    path=API_DISCUSSION_PREFIX + '/search_extracts',
    description="search for extracts matching a URL",
    renderer='json', cors_policy=cors_policy
)


@extract.get(permission=P_READ)
def get_extract(request):
    extract_id = request.matchdict['id']
    extract = Extract.get_instance(extract_id)
    view_def = request.GET.get('view') or 'default'
    user_id = authenticated_userid(request) or Everyone
    permissions = request.permissions

    if extract is None:
        raise HTTPNotFound(
            "Extract with id '%s' not found." % extract_id)

    return extract.generic_json(view_def, user_id, permissions)


def _get_extracts_real(request, view_def='default', ids=None, user_id=None):
    discussion = request.discussion
    user_id = user_id or Everyone
    all_extracts = discussion.db.query(Extract).filter(
        Extract.discussion_id == discussion.id
    )
    if ids:
        ids = [Extract.get_database_id(id) for id in ids]
        all_extracts = all_extracts.filter(Extract.id.in_(ids))

    all_extracts = all_extracts.options(joinedload_all(
        Extract.idea_content_links))
    all_extracts = all_extracts.options(
        joinedload_all(Extract.selectors).joinedload(
            AnnotationSelector.extract, innerjoin=True))
    permissions = request.permissions

    return [extract.generic_json(view_def, user_id, permissions)
            for extract in all_extracts]


@extracts.get(permission=P_READ)
def get_extracts(request):
    view_def = request.GET.get('view', 'default')
    ids = request.GET.getall('ids')

    return _get_extracts_real(
        request, view_def, ids, authenticated_userid(request))


[docs]@extracts.post() def post_extract(request): """ Create a new extract. """ extract_data = json.loads(request.body) discussion = request.context db = discussion.db user_id = authenticated_userid(request) if not user_id: # Straight from annotator token = request.headers.get('X-Annotator-Auth-Token') if token: token = decode_token( token, request.registry.settings['session.secret']) if token: user_id = token['userId'] user_id = user_id or Everyone permissions = get_permissions(user_id, discussion.id) else: permissions = request.permissions if P_ADD_EXTRACT not in permissions: #TODO: maparent: restore this code once it works: #raise HTTPForbidden(result=ACLDenied(permission=P_ADD_EXTRACT)) raise HTTPForbidden() if not user_id or user_id == Everyone: # TODO: Create an anonymous user. raise HTTPServerError("Anonymous extracts are not implemeted yet.") content = None uri = extract_data.get('uri') important = extract_data.get('important', False) annotation_text = extract_data.get('text') target = extract_data.get('target') if not uri: # Extract from an internal post if not target: raise HTTPBadRequest("No target") target_class = sqla.get_named_class(target.get('@type')) if issubclass(target_class, Post): post_id = target.get('@id') post = Post.get_instance(post_id) if not post: raise HTTPNotFound( "Post with id '%s' not found." % post_id) content = post elif issubclass(target_class, Webpage): uri = target.get('url') if uri and not content: content = Webpage.get_instance(uri) if not content: # TODO: maparent: This is actually a singleton pattern, should be # handled by the AnnotatorSource now that it exists... source = db.query(AnnotatorSource).filter_by( discussion=discussion).filter( cast(AnnotatorSource.name, Unicode) == 'Annotator').first() if not source: source = AnnotatorSource( name='Annotator', discussion=discussion) db.add(source) content = Webpage(url=uri, discussion=discussion) db.add(content) extract_body = extract_data.get('quote', None) new_extract = Extract( creator_id=user_id, discussion=discussion, important=important, annotation_text=annotation_text, content=content ) db.add(new_extract) icls = extract_data.get('ideaLinks', []) if icls and P_ASSOCIATE_EXTRACT not in permissions: raise HTTPForbidden() for icl in icls: # TODO: Check idCreator matches if present idea_id = icl.get("idIdea", None) if not idea_id: raise HTTPBadRequest("idea_content_link without idIdea") idea = Idea.get_instance(idea_id) if(idea.discussion.id != discussion.id): raise HTTPBadRequest( "Extract from discussion %s cannot be associated with an idea from a different discussion." % extract.get_discussion_id()) link = IdeaExtractLink( creator_id=user_id, content=content, idea=idea, extract=new_extract ) db.add(link) for range_data in extract_data.get('ranges', []): range = TextFragmentIdentifier( extract=new_extract, body=extract_body, xpath_start=range_data['start'], offset_start=range_data['startOffset'], xpath_end=range_data['end'], offset_end=range_data['endOffset']) db.add(range) db.flush() return {'ok': True, '@id': new_extract.uri()}
[docs]@extract.put() def put_extract(request): """ Updating an Extract """ extract_id = request.matchdict['id'] user_id = authenticated_userid(request) discussion = request.context if not user_id: # Straight from annotator token = request.headers.get('X-Annotator-Auth-Token') if token: token = decode_token( token, request.registry.settings['session.secret']) if token: user_id = token['userId'] user_id = user_id or Everyone extract = Extract.get_instance(extract_id) if not extract: raise HTTPNotFound("Extract with id '%s' not found." % extract_id) permissions = get_permissions(user_id, discussion.id, extract) if P_EDIT_EXTRACT not in permissions: raise HTTPForbidden() updated_extract_data = json.loads(request.body) extract.creator_id = user_id or AgentProfile.get_database_id(extract.creator_id) # extract.order = updated_extract_data.get('order', extract.order) extract.important = updated_extract_data.get('important', extract.important) icls = updated_extract_data.get('ideaLinks', None) change_icls = False if icls: existing = set(extract.idea_content_links) for icl in icls: # TODO: Check idCreator matches if present idea_id = icl.get("idIdea", None) if not idea_id: raise HTTPBadRequest("idea_content_link without idIdea") idea = Idea.get_instance(idea_id) if(idea.discussion.id != discussion.id): raise HTTPBadRequest( "Extract from discussion %s cannot be associated with an idea from a different discussion." % extract.get_discussion_id()) if '@id' in icl: icl = IdeaExtractLink.get_instance(icl['@id']) if icl.idea_id != idea.id: icl.idea_id = idea.id change_icls = True existing.remove(icl) else: link = IdeaExtractLink( creator_id=user_id, idea=idea, content_id=extract.content_id, extract=extract ) extract.db.add(link) change_icls = True for icl in existing: icl.delete() change_icls = True if change_icls and P_ASSOCIATE_EXTRACT not in permissions: raise HTTPForbidden() Extract.default_db.add(extract) #TODO: Merge ranges. Sigh. return {'ok': True}
@extract.delete(permission=P_READ) def delete_extract(request): user_id = authenticated_userid(request) discussion = request.context if not user_id: # Straight from annotator token = request.headers.get('X-Annotator-Auth-Token') if token: token = decode_token( token, request.registry.settings['session.secret']) if token: user_id = token['userId'] user_id = user_id or Everyone extract_id = request.matchdict['id'] extract = Extract.get_instance(extract_id) permissions = get_permissions(user_id, discussion.id, extract) if P_EDIT_EXTRACT not in permissions: raise HTTPForbidden() if not extract: return HTTPNoContent() # TODO: Tombstonable extracts??? extract.delete() return HTTPNoContent() @search_extracts.get() def do_search_extracts(request): uri = request.GET.get('uri', None) if not uri: raise HTTPClientError("Please specify a URI") view_def = request.GET.get('view') or 'default' discussion = request.context user_id = authenticated_userid(request) if not user_id: # Straight from annotator token = request.headers.get('X-Annotator-Auth-Token') if token: token = decode_token( token, request.registry.settings['session.secret']) if token: user_id = token['userId'] user_id = user_id or Everyone if not user_has_permission(discussion.id, user_id, P_READ): raise HTTPForbidden() permissions = [P_READ] if not uri: raise HTTPBadRequest("Please specify a search uri") content = Webpage.get_by(url=uri) if content: extracts = Extract.default_db.query(Extract).filter_by(content=content).all() rows = [ extract.generic_json(view_def, user_id, permissions) for extract in extracts] return {"total": len(extracts), "rows": rows} return {"total": 0, "rows": []}