Source code for assembl.lib.migration

"""Basic infrastructure for alembic migration"""
from __future__ import absolute_import

import sys
from contextlib import contextmanager

from alembic.config import Config
from alembic.migration import MigrationContext
from alembic.environment import EnvironmentContext
from alembic.script import ScriptDirectory
import transaction

from ..lib.sqla import (
    get_metadata, get_session_maker, mark_changed)
from ..lib.text_search import update_indices


def has_tables(db):
    (num_tables,) = db.query(
        """COUNT(table_name) FROM information_schema.tables
        WHERE table_schema='public'""").first()
    # don't count alembic
    return num_tables > 1


@contextmanager
def locked_transaction(db, num):
    # use a pg_advisory_lock to make sure that the transaction is locked.
    # Do it on another connection, so errors will not leave the lock dangling.
    cnx = db.session_factory.kw['bind'].connect()
    cnx.execute("select pg_advisory_lock(%d)" % num).first()
    try:
        session = db()
        with transaction.manager:
            session = db()
            yield session
            mark_changed(session)
    finally:
        cnx.execute("select pg_advisory_unlock(%d)" % num)
        cnx.close()


[docs]def bootstrap_db(config_uri, with_migration=True): """Bring a blank database to a functional state.""" config = Config(config_uri) script_dir = ScriptDirectory.from_config(config) heads = script_dir.get_heads() if len(heads) > 1: sys.stderr.write('Error: migration scripts have more than one ' 'head.\nPlease resolve the situation before ' 'attempting to bootstrap the database.\n') sys.exit(2) elif len(heads) == 0: sys.stderr.write('Error: migration scripts have no head.\n') sys.exit(2) head = heads[0] db = get_session_maker() db.flush() if not has_tables(db()): with locked_transaction(db, 1234) as session: context = MigrationContext.configure(session.connection()) if not has_tables(session): import assembl.models get_metadata().create_all(session.connection()) assert has_tables(session) context._ensure_version_table() context.stamp(script_dir, head) elif with_migration: context = MigrationContext.configure(db().connection()) db_version = context.get_current_revision() # artefact: in tests, db_version may be none. if db_version and db_version != head: with locked_transaction(db, 1235) as session: context = MigrationContext.configure(session.connection()) db_version = context.get_current_revision() if db_version != head: with EnvironmentContext( config, script_dir, as_sql=False, fn=lambda heads, context: script_dir._upgrade_revs(head, db_version), destination_rev=head ): script_dir.run_env() context = MigrationContext.configure(db().connection()) db_version = context.get_current_revision() assert db_version == head return db
def bootstrap_db_data(db, mark=True): from .config import get if get('in_alembic'): return # import after session to delay loading of BaseOps from assembl.models import ( Permission, Role, IdentityProvider, LocaleLabel, URIRefDb) from .generic_pointer import init_dbtype from .database_functions import ensure_functions with locked_transaction(db, 1236) as session: for cls in (Permission, Role, IdentityProvider, LocaleLabel, URIRefDb): cls.populate_db(session) ensure_functions(session) update_indices(session) init_dbtype(session) mark_changed(session)
[docs]def ensure_db_version(config_uri, session_maker): """Exit if database is not up-to-date.""" config = Config(config_uri) script_dir = ScriptDirectory.from_config(config) heads = script_dir.get_heads() if len(heads) > 1: sys.stderr.write('Error: migration scripts have more than one head.\n' 'Please resolve the situation before attempting to ' 'start the application.\n') sys.exit(2) else: repo_version = heads[0] if heads else None context = MigrationContext.configure(session_maker()().connect()) db_version = context.get_current_revision() if not db_version: sys.stderr.write('Database not initialized.\n' 'Try this: "idealoom-db-manage %s bootstrap".\n' % config_uri) sys.exit(2) if db_version != repo_version: sys.stderr.write('Stopping: DB version (%s) not up-to-date (%s).\n' % (db_version, repo_version)) sys.stderr.write('Try this: "idealoom-db-manage %s upgrade head".\n' % config_uri) sys.exit(2)
[docs]def is_migration_script(): """Determine weather the current process is a migration script.""" return 'alembic' in sys.argv[0] or 'idealoom-db-manage' in sys.argv[0]
[docs]def includeme(config): """Initialize Alembic-related stuff at app start-up time.""" skip_migration = config.registry.settings.get('app.skip_migration') if not skip_migration and not is_migration_script(): ensure_db_version( config.registry.settings['config_uri'], get_session_maker())