# Source code for invenio_index_migrator.api.migration

# -*- coding: utf-8 -*-
#
# This file is part of Invenio.
# Copyright (C) 2015-2019 CERN.
#
# Invenio is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.

"""Index syncing API."""

from __future__ import absolute_import, print_function

import json
import warnings
from datetime import datetime

from elasticsearch import VERSION as ES_VERSION
from elasticsearch.exceptions import NotFoundError
from flask import current_app
from invenio_search.api import RecordsSearch
from invenio_search.proxies import current_search, current_search_client
from invenio_search.utils import build_alias_name, build_index_name, \
    prefix_index
from six import string_types
from werkzeug.utils import cached_property

from ..indexer import SYNC_INDEXER_MQ_QUEUE, MigrationIndexer
from ..proxies import current_index_migrator
from ..tasks import run_sync_job
from ..utils import ESClient, State, extract_doctype_from_mapping, \
    get_queue_size, obj_or_import_string


def ensure_valid_config(f):
    """Decorate to ensure that all config parameters are valid.

    The decorated callable must have the signature
    ``f(self, name, **config)`` and ``self`` must expose the attributes
    ``REQUIRED_PARAMS`` and ``STRATEGIES``.

    :param f: Callable to wrap.
    :returns: A wrapper that validates ``config`` and then calls ``f``.
    :raises Exception: (from the wrapper) if a name listed in
        ``self.REQUIRED_PARAMS`` is absent from ``config`` or if
        ``config["strategy"]`` is not one of ``self.STRATEGIES``.
    """
    # Function-scope import keeps this block independent of the
    # module-level import list.
    from functools import wraps

    @wraps(f)
    def inner(self, name, **config):
        missing = [p for p in self.REQUIRED_PARAMS if p not in config]
        if missing:
            msg = "Required input parameters are missing {}" \
                .format(missing)
            raise Exception(msg)

        strategy = config["strategy"]
        if strategy not in self.STRATEGIES:
            msg = "Invalid strategy {}. You should pass one of the {}." \
                .format(strategy, self.STRATEGIES)
            # ``Exception`` takes no keyword arguments: the message has to
            # be positional, otherwise a ``TypeError`` hides the real error.
            raise Exception(msg)

        return f(self, name, **config)
    return inner


class Migration(object):
    """Index migration base class.

    A migration is identified by ``name`` and configured through keyword
    arguments that must include ``src_es_client``, ``jobs`` and
    ``strategy`` (see ``REQUIRED_PARAMS``). Its state object is bound to
    the index ``current_index_migrator.config_index`` and to the document
    id ``name``.
    """

    IN_CLUSTER_STRATEGY = "in_cluster_strategy"
    CROSS_CLUSTER_STRATEGY = "cross_cluster_strategy"

    STRATEGIES = (
        IN_CLUSTER_STRATEGY,
        CROSS_CLUSTER_STRATEGY
    )

    REQUIRED_PARAMS = ('src_es_client', 'jobs', 'strategy')

    @ensure_valid_config
    def __init__(self, name, **config):
        """Initialize the job configuration.

        :param name: Migration (recipe) name; also used as the id of the
            state document.
        :param config: Migration configuration. ``config['src_es_client']``
            is passed to ``ESClient``; ``config['jobs']`` maps job names
            to job configurations.
        """
        self.name = name
        # Filled by ``init()`` with ``(job, initial_state)`` tuples and by
        # the other operations with plain job objects.
        self.jobs = {}
        self.index = current_index_migrator.config_index
        self.config = config
        self.src_es_client = ESClient(config['src_es_client'])
        self.state = State(
            index=self.index,
            document_id=name
        )

    @cached_property
    def strategy(self):
        """Return migration strategy."""
        return self.config["strategy"]

    @classmethod
    def create_from_config(cls, recipe_name, **recipe_config):
        """Create `Migration` instance from config."""
        return cls(recipe_name, **recipe_config)

    @classmethod
    def create_from_state(cls, recipe_name, **recipe_config):
        """Create `Migration` instance from ES state.

        The configuration is read from the ``config`` field of the document
        ``recipe_name`` in the migrator config index; ``recipe_config`` is
        accepted but not used.
        """
        document = current_search_client.get(
            index=current_index_migrator.config_index, id=recipe_name)
        return cls(recipe_name, **document["_source"]["config"])

    def _build_job(self, job_name, job_config):
        """Instantiate the job class referenced by ``job_config['cls']``."""
        job_cls = obj_or_import_string(job_config['cls'])
        return job_cls(job_name, self, config=job_config)

    def load_jobs_from_config(self):
        """Load jobs from config.

        :returns: Dictionary mapping each job name in
            ``self.config['jobs']`` to a newly created job object.
        """
        return {
            job_name: self._build_job(job_name, job_config)
            for job_name, job_config in self.config['jobs'].items()
        }

    def create_index(self):
        """Create Elasticsearch index for the migration."""
        current_search_client.indices.create(index=self.index)
        print('[*] created index: {}'.format(self.index))

    def init(self, dry_run=False):
        """Initialize the index with recipe and jobs documents.

        :param dry_run: When true, job initial states are still computed
            (each job receives ``dry_run=True``) but no index is created
            and no state is committed.
        :raises Exception: if a document with id ``self.name`` already
            exists in the migration index (only checked when not a dry
            run).
        """
        if not dry_run:
            if not current_search_client.indices.exists(index=self.index):
                self.create_index()
            try:
                current_search_client.get(index=self.index, id=self.name)
            except NotFoundError:
                # No state document yet: the migration can be initialized.
                pass
            else:
                # Report the document id that was looked up, not the index.
                raise Exception(
                    ('The document {} already exists, a job is already '
                     'active.').format(self.name))

        # Build every job and compute its initial state.
        jobs = {}
        for job_name, job_config in self.config['jobs'].items():
            job = self._build_job(job_name, job_config)
            initial_state = job.initial_state(dry_run=dry_run)
            jobs[job_name] = (job, initial_state)
        self.jobs = jobs

        if not dry_run:
            migration_initial_state = {
                "type": "migration",
                "config": self.config,
                "status": "INITIAL",
                "job_ids": [job.document_name for job, _ in self.jobs.values()]
            }
            self.state.commit(migration_initial_state)
            for job, initial_state in self.jobs.values():
                job.state.commit(initial_state)
                job.create_index(initial_state["dst"]["index"])

    def rollover(self, force=False):
        """Perform a rollover action.

        The alias actions of all jobs are collected and sent in a single
        ``update_aliases`` call.

        :param force: Roll over even if the migration status is not
            ``COMPLETED``.
        """
        payload = dict(actions=[])
        self.jobs = self.load_jobs_from_config()
        if force or self.state.read()['status'] == 'COMPLETED':
            for job in self.jobs.values():
                payload['actions'] += job.rollover_actions()
            current_search_client.indices.update_aliases(body=payload)
        else:
            print('Not all jobs are completed - rollover not possible.')

    def notify(self):
        """Notify when rollover is possible.

        Override this to notify the user whenever the threshold is reached
        and a rollover is possible.
        """
        pass

    def run(self):
        """Run the index sync job.

        Every configured job is run. If all of the returned job states
        report ``threshold_reached``, the migration status is set to
        ``COMPLETED`` and ``notify()`` is called.
        """
        job_states = {}
        self.jobs = self.load_jobs_from_config()
        for name, job in self.jobs.items():
            print('[~] running job: {}'.format(name))
            job_states[name] = job.run()

        if all(state['threshold_reached'] for state in job_states.values()):
            state = self.state.read()
            state['status'] = 'COMPLETED'
            self.state.commit(state)
            self.notify()

    def status(self):
        """Get status for index sync job.

        :returns: Dictionary with ``migration_status`` (the stored status)
            and ``jobs`` (job name mapped to that job's ``status()``).
        """
        self.jobs = self.load_jobs_from_config()
        state = self.state.read()
        resp = dict(
            migration_status=state['status'],
            jobs={
                job_name: job.status()
                for job_name, job in self.jobs.items()
            }
        )
        if state['status'] == 'COMPLETED':
            print(
                'Threshold reach - rollover is possible. Please see the '
                'documentation for the steps needed to perform the rollover.'
            )
        return resp

    def cancel(self):
        """Cancel migration and all its jobs.

        Each job is cancelled first; afterwards the migration status is
        stored as ``CANCELLED``.
        """
        self.jobs = self.load_jobs_from_config()
        for job in self.jobs.values():
            job.cancel()
        state = self.state.read()
        state['status'] = 'CANCELLED'
        self.state.commit(state)