Source code for assembl.views.admin.views

from os import urandom
from base64 import b64encode

import simplejson as json
from pyramid.i18n import TranslationStringFactory
from pyramid.view import view_config
from pyramid.renderers import render_to_response
from pyramid.httpexceptions import (
    HTTPFound, HTTPNotFound, HTTPBadRequest, HTTPUnauthorized)
from pyramid.settings import asbool
import transaction
from pyisemail import is_email

from assembl.lib.locale import strip_country
from assembl.models import (
    Discussion, DiscussionPermission, Role, Permission, UserRole,
    LocalUserRole)
from .. import get_default_context
from assembl.models.mail import IMAPMailbox, MailingList
from assembl.auth import (
    R_PARTICIPANT, R_SYSADMIN, R_ADMINISTRATOR, SYSTEM_ROLES,
    P_SYSADMIN, P_ADMIN_DISC, Everyone)
from assembl.auth.util import (
    add_multiple_users_csv, user_has_permission, get_non_expired_user_id)
from assembl.models.auth import (
    User, AgentProfile, LanguagePreferenceOrder)
from assembl.models.permissions import create_default_permissions
from assembl.models import Preferences
from assembl.lib.utils import get_global_base_url
from assembl.nlp.translation_service import DummyGoogleTranslationService
from ..discussion.views import process_locale


_ = TranslationStringFactory('assembl')


class PseudoDiscussion(object):
    id = 0
    topic = "Administration"
    slug = "admin"
    homepage_url = None
    logo = None
    preferences = Preferences()
    def translation_service(self):
        return None
    def get_base_url(self, *args):
        return get_global_base_url(True)
    def get_url(self, *args):
        return get_global_base_url(True)
    def get_all_agents_preload(self, user):
        return []


[docs]@view_config(route_name='base_admin', request_method='GET', http_cache=60, permission=P_SYSADMIN) def base_admin_view(request): """The Base admin view, for frontend urls""" user_id = get_non_expired_user_id(request) or Everyone if user_id == Everyone: raise HTTPUnauthorized() context = get_default_context(request) session = Discussion.default_db preferences = Preferences.get_default_preferences(session) user = User.get(user_id) if '_LOCALE_' in request.cookies: locale = request.cookies['_LOCALE_'] process_locale(locale, user, session, LanguagePreferenceOrder.Cookie) elif '_LOCALE_' in request.params: locale = request.params['_LOCALE_'] process_locale(locale, user, session, LanguagePreferenceOrder.Parameter) else: locale = request.locale_name process_locale(locale, user, session, LanguagePreferenceOrder.OS_Default) target_locale = strip_country(locale) locale_labels = json.dumps( DummyGoogleTranslationService.target_locale_labels_cls(target_locale)) context['translation_locale_names_json'] = locale_labels context['locale_names_json'] = locale_labels role_names = [x for (x,) in session.query(Role.name).all()] permission_names = [x for (x,) in session.query(Permission.name).all()] context['role_names'] = json.dumps(role_names) context['permission_names'] = json.dumps(permission_names) context['discussion'] = PseudoDiscussion() response = render_to_response('../../templates/adminIndex.jinja2', context, request=request) # Prevent caching the home, especially for proper login/logout response.cache_control.max_age = 0 response.cache_control.prevent_auto = True return response
@view_config(route_name='test_simultaneous_ajax_calls', permission=P_SYSADMIN, request_method="GET") def test_simultaneous_ajax_calls(request): g = lambda x: request.GET.get(x, None) session = User.default_db discussion_id = g('discussion_id') widget_id = g('widget_id') if not discussion_id: raise HTTPBadRequest( explanation="Please provide a discussion_id parameter") if not widget_id: raise HTTPBadRequest( explanation="Please provide a widget_id parameter") widget_id = int(widget_id) discussion_id = int(discussion_id) discussion = Discussion.get_instance(discussion_id) if not discussion: raise HTTPNotFound("Discussion with id '%d' not found." % ( discussion_id,)) user_id = get_non_expired_user_id(request) assert user_id context = dict( get_default_context(request), discussion=discussion, discussion_id=discussion_id, widget_id=widget_id) return render_to_response( 'admin/test_simultaneous_ajax_calls.jinja2', context, request=request) @view_config(route_name='discussion_admin', permission=P_SYSADMIN, request_method=("GET", "POST")) def discussion_admin(request): user_id = get_non_expired_user_id(request) if not user_id: return HTTPFound(location='/login?next=/admin/discussions/') db = User.default_db config = request.registry.settings can_create_mailbox = asbool(config.get('can_create_mailbox', False)) create_mailbox = False slug = request.POST.get('slug', '') domain = config.get('imap_domain', None) or config.get('public_hostname') if slug: email = "@".join((slug, domain)) else: email = '' context = dict( get_default_context(request), slug=slug, imap_mailbox="auto" if can_create_mailbox else "setup", topic='', source_name="IMAP", host=config.get("mail.host"), username=email, admin_sender=email, folder='inbox', port=143, use_ssl=False, public_discussion=True, homepage='', mailing_list_address='', can_create_mailbox=can_create_mailbox, discussions=db.query(Discussion)) if request.method == 'POST': context['errors'] = errors = [] context.update(request.POST) ml_address = context['mailing_list_address'] if not slug: errors.append("Please specify a slug") if context['imap_mailbox'] == "setup": if not context['admin_sender']: errors.append(_("Please specify the admin_sender")) elif not is_email(context['admin_sender']): errors.append(_("Please specify a valid admin sender email")) if ml_address and not is_email(ml_address): errors.append(_("Please specify a valid mailing list address")) if not context['username']: errors.append(_("Please specify a user name")) if not context['password']: errors.append(_("Please specify a password")) create_mailbox = True password = context['password'] elif context['imap_mailbox'] == "auto": assert can_create_mailbox password = b64encode(urandom(12)) create_mailbox = True if not errors: discussion = Discussion( creator_id=user_id, slug=slug, topic=context['topic'], homepage=context['homepage'], ) # Could raise an exception if there is no/incorrect scheme passed db.add(discussion) discussion.apply_side_effects_without_json(request=request) create_default_permissions(discussion) if create_mailbox: mailbox_class = ( MailingList if ml_address else IMAPMailbox) mailbox = mailbox_class( discussion=discussion, name=context['source_name'], host=context['host'], port=int(context['port']), username=context['username'], use_ssl=context['use_ssl'] == 'on', folder=context['folder'], admin_sender=context['admin_sender'], password=password, ) if ml_address: mailbox.post_email_address = ml_address db.add(mailbox) discussion.invoke_callbacks_after_creation() return render_to_response( 'admin/discussions.jinja2', context, request=request) @view_config(route_name='discussion_edit', permission=P_ADMIN_DISC, request_method=("GET", "POST")) def discussion_edit(request): discussion_id = int(request.matchdict['discussion_id']) discussion = Discussion.get_instance(discussion_id) user_id = get_non_expired_user_id(request) assert user_id permissions = request.permissions partners = json.dumps([p.generic_json( user_id=user_id, permissions=permissions ) for p in discussion.partner_organizations]) if not discussion: raise HTTPNotFound("Discussion with id '%d' not found." % ( discussion_id,)) context = dict( get_default_context(request), discussion=discussion, admin_discussion_permissions_url=request.route_url( 'discussion_permissions', discussion_id=discussion.id), partners=partners) if request.method == 'POST': g = lambda x: request.POST.get(x, None) (topic, slug, objectives) = ( g('topic'), g('slug'), g('objectives'), ) discussion.topic = topic discussion.slug = slug discussion.objectives = objectives return render_to_response( 'admin/discussion_edit.jinja2', context, request=request) def order_by_domain_and_name(user): email = user.get_preferred_email() domain = email.split('@')[1] if email else '' return (domain, user.name or '') @view_config(route_name='discussion_permissions', permission=P_ADMIN_DISC, request_method=("GET", "POST")) def discussion_permissions(request): user_id = get_non_expired_user_id(request) assert user_id db = Discussion.default_db discussion_id = int(request.matchdict['discussion_id']) discussion = Discussion.get_instance(discussion_id) error = '' if not discussion: raise HTTPNotFound("Discussion with id '%d' not found." % ( discussion_id,)) roles = db.query(Role).all() roles_by_name = {r.name: r for r in roles} role_names = [r.name for r in roles] role_names.sort() permissions = db.query(Permission).all() perms_by_name = {p.name: p for p in permissions} permission_names = [p.name for p in permissions] permission_names.sort() disc_perms = db.query(DiscussionPermission).filter_by( discussion_id=discussion_id).join(Role, Permission).all() disc_perms_as_set = set((dp.role.name, dp.permission.name) for dp in disc_perms) disc_perms_dict = {(dp.role.name, dp.permission.name): dp for dp in disc_perms} local_roles = db.query(LocalUserRole).filter_by( discussion_id=discussion_id).join(Role, User).all() local_roles_as_set = set((lur.user.id, lur.role.name) for lur in local_roles) local_roles_dict = {(lur.user.id, lur.role.name): lur for lur in local_roles} users = set(lur.user for lur in local_roles) num_users = '' if request.POST: if 'submit_role_permissions' in request.POST: for role in role_names: if role == R_SYSADMIN: continue for permission in permission_names: allowed_text = 'allowed_%s_%s' % (role, permission) if (role, permission) not in disc_perms_as_set and \ allowed_text in request.POST: dp = DiscussionPermission( role=roles_by_name[role], permission=perms_by_name[permission], discussion_id=discussion_id) disc_perms_dict[(role, permission)] = dp disc_perms_as_set.add((role, permission)) db.add(dp) elif (role, permission) in disc_perms_as_set and \ allowed_text not in request.POST: dp = disc_perms_dict[(role, permission)] del disc_perms_dict[(role, permission)] disc_perms_as_set.remove((role, permission)) db.delete(dp) if not role in SYSTEM_ROLES and\ 'delete_'+role in request.POST: db.delete(roles_by_name[role]) del roles_by_name[role] role_names.remove(role) elif 'submit_add_role' in request.POST: #TODO: Sanitize role role = Role(name='r:'+request.POST['new_role']) roles_by_name[role.name] = role role_names.append(role.name) db.add(role) elif 'submit_user_roles' in request.POST: user_ids = {u.id for u in users} for role in role_names: if role == R_SYSADMIN: continue prefix = 'has_'+role+'_' for name in request.POST: if name.startswith(prefix): a_user_id = int(name[len(prefix):]) if a_user_id not in user_ids: users.add(User.get_instance(a_user_id)) user_ids.add(a_user_id) for user in users: has_role_text = 'has_%s_%d' % (role, user.id) if (user.id, role) not in local_roles_as_set and \ has_role_text in request.POST: lur = LocalUserRole( role=roles_by_name[role], user=user, discussion_id=discussion_id) local_roles.append(lur) # TODO revisit this if Roles and Subscription are # de-coupled if role == 'r:participant': user.update_agent_status_subscribe(discussion) local_roles_dict[(user.id, role)] = lur local_roles_as_set.add((user.id, role)) db.add(lur) elif (user.id, role) in local_roles_as_set and \ has_role_text not in request.POST: lur = local_roles_dict[(user.id, role)] del local_roles_dict[(user.id, role)] local_roles_as_set.remove((user.id, role)) local_roles.remove(lur) # TODO revisit this if Roles and Subscription are # de-coupled if role == 'r:participant': user.update_agent_status_unsubscribe(discussion) db.delete(lur) elif 'submit_look_for_user' in request.POST: search_string = '%' + request.POST['user_search'] + '%' other_users = db.query(User).filter( AgentProfile.name.ilike(search_string) | User.username.ilike(search_string) | User.preferred_email.ilike(search_string)).all() users.update(other_users) elif 'submit_user_file' in request.POST: role = request.POST['add_with_role'] or R_PARTICIPANT if role == R_SYSADMIN and not user_has_permission( discussion_id, user_id, P_SYSADMIN): role = R_ADMINISTRATOR if 'user_csvfile' in request.POST: try: num_users = add_multiple_users_csv( request, request.POST['user_csvfile'].file, discussion_id, role, request.POST.get('send_invite', False), request.POST['email_subject'], request.POST['text_email_message'], request.POST['html_email_message'], request.POST['sender_name'], request.POST.get('resend_notloggedin', False)) except Exception as e: error = repr(e) transaction.doom() else: error = request.localizer.translate(_('No file given.')) def allowed(role, permission): if role == R_SYSADMIN: return True return (role, permission) in disc_perms_as_set def has_local_role(user_id, role): return (user_id, role) in local_roles_as_set users = list(users) users.sort(key=order_by_domain_and_name) context = dict( get_default_context(request), discussion=discussion, allowed=allowed, roles=role_names, permissions=permission_names, users=users, error=error, num_users=num_users, has_local_role=has_local_role, is_system_role=lambda r: r in SYSTEM_ROLES ) return render_to_response( 'admin/discussion_permissions.jinja2', context, request=request) @view_config(route_name='general_permissions', permission=P_SYSADMIN, request_method=("GET", "POST")) def general_permissions(request): user_id = get_non_expired_user_id(request) assert user_id db = Discussion.default_db roles = db.query(Role).all() roles_by_name = {r.name: r for r in roles} role_names = [r.name for r in roles] role_names.sort() permissions = db.query(Permission).all() permission_names = [p.name for p in permissions] permission_names.sort() user_roles = db.query(UserRole).join(Role, User).all() user_roles_as_set = set( (lur.user.id, lur.role.name) for lur in user_roles) user_roles_dict = { (lur.user.id, lur.role.name): lur for lur in user_roles} users = set(lur.user for lur in user_roles) if request.POST: if 'submit_user_roles' in request.POST: user_ids = {u.id for u in users} for role in role_names: if role == Everyone: continue prefix = 'has_'+role+'_' for name in request.POST: if name.startswith(prefix): a_user_id = int(name[len(prefix):]) if a_user_id not in user_ids: users.add(User.get_instance(a_user_id)) user_ids.add(a_user_id) for user in users: has_role_text = 'has_%s_%d' % (role, user.id) if (user.id, role) not in user_roles_as_set and \ has_role_text in request.POST: ur = UserRole( role=roles_by_name[role], user=user) user_roles.append(ur) user_roles_dict[(user.id, role)] = ur user_roles_as_set.add((user.id, role)) db.add(ur) elif (user.id, role) in user_roles_as_set and \ has_role_text not in request.POST: ur = user_roles_dict[(user.id, role)] del user_roles_dict[(user.id, role)] user_roles_as_set.remove((user.id, role)) user_roles.remove(ur) db.delete(ur) elif 'submit_look_for_user' in request.POST: search_string = '%' + request.POST['user_search'] + '%' other_users = db.query(User).filter( AgentProfile.name.ilike(search_string) | User.username.ilike(search_string) | User.preferred_email.ilike(search_string)).all() users.update(other_users) def has_role(user_id, role): return (user_id, role) in user_roles_as_set users = list(users) users.sort(key=order_by_domain_and_name) context = dict( get_default_context(request), roles=role_names, permissions=permission_names, users=users, has_role=has_role, is_system_role=lambda r: r in SYSTEM_ROLES ) return render_to_response( 'admin/global_permissions.jinja2', context, request=request)