# -*- coding: utf-8 -*-
#
# This file is part of Invenio.
# Copyright (C) 2015-2019 CERN.
#
# Invenio is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.
"""Index syncing API."""
from __future__ import absolute_import, print_function
import six
from datetime import datetime
from invenio_search.proxies import current_search, current_search_client
from invenio_search.utils import build_alias_name, build_index_name, \
prefix_index
from ..indexer import SYNC_INDEXER_MQ_QUEUE, MigrationIndexer
from ..utils import State, extract_doctype_from_mapping, \
get_queue_size
class Job(object):
    """Index migration job.

    Base class for migration jobs: concrete subclasses implement ``run()``
    and ``cancel()``.  Progress is persisted through a ``State`` object so
    that separate processes can read and update it.
    """

    def __init__(self, name, migration, config):
        """Initialize a migration job.

        :param name: job's name.
        :param migration: an invenio_index_migrator.api.migration.Migration
            object.
        :param config: job's configuration.
        """
        self.name = name
        self.migration = migration
        self.config = config
        # State is stored per-job, keyed by a document id derived from the
        # migration name and the job name.
        self.state = State(
            self.migration.index, document_id=self.document_name)

    @property
    def document_name(self):
        """Get the document name for the job."""
        return '{}-{}'.format(self.migration.name, self.name)

    @property
    def src_es_client(self):
        """Get the source ES client."""
        return self.migration.src_es_client

    def run(self):
        """Run the job."""
        raise NotImplementedError()

    def cancel(self):
        """Cancel the job."""
        raise NotImplementedError()

    # TODO: Define the attributes and values of the returned dict
    def status(self):
        """Return the status of the job.

        :returns: dict with progress information (``completed`` flag, queue
            size, and totals/percentages derived from the ES reindex task
            when one is running).
        """
        job_state = self.state.read()
        current = {}
        current['completed'] = False
        current['last_updated'] = job_state['last_record_update']
        current['queue_size'] = get_queue_size(SYNC_INDEXER_MQ_QUEUE)
        if job_state['reindex_task_id']:
            task = current_search_client.tasks.get(
                task_id=job_state['reindex_task_id'])
            current['total'] = task['task']['status']['total']
            current['_es_task_response'] = task
            current['completed'] = task['completed']
            if task['completed']:
                current['status'] = 'Finished reindex'
                # ES reports "took" in milliseconds.
                current['seconds'] = task['response']['took'] / 1000.0
            else:
                current['status'] = 'Reindexing...'
                current['duration'] = '{:.1f} second(s)'.format(
                    task['task']['running_time_in_nanos'] / 1000000000.0)
                current['current'] = current_search_client.count(
                    index=job_state['dst']['index'])['count']
                if current['total'] > 0:
                    current['percent'] = \
                        100.0 * current['current'] / current['total']
                current['created'] = task['task']['status']['created']
        else:
            current['status'] = 'Finished'
        current['threshold_reached'] = job_state['threshold_reached']
        return current

    # TODO:
    def create_index(self, index):
        """Create indexes needed for the job.

        :param index: name of the registered index to create as the
            migration destination.  The created concrete index name is
            committed to the job state under ``dst.index``.
        """
        index_result, _ = current_search.create_index(
            index,
            create_write_alias=False
        )
        index_name = index_result[0]
        # Slow down index refreshes during the migration to speed up bulk
        # indexing; the setting is reset in ``rollover_actions``.
        refresh_interval = self.config.get('refresh_interval', '300s')
        current_search_client.indices.put_settings(
            index=index_name,
            body=dict(index=dict(refresh_interval=refresh_interval))
        )
        print('[*] created index: {}'.format(index_name))
        state = self.state.read()
        state['dst']['index'] = index_name
        self.state.commit(state)

    def rollover_actions(self):
        """Build the alias actions needed to roll over to the new index.

        :returns: list of actions suitable for the ES "update aliases" API.
        """
        actions = []
        state = self.state.read()
        src_index = state["src"]["index"]
        dst_index = state["dst"]["index"]
        # Reset the "refresh_interval" setting for the destination index
        current_search_client.indices.put_settings(
            index=dst_index,
            body=dict(index=dict(refresh_interval=None))
        )
        # Perform a "flush + refresh" before rolling over the aliases
        current_search_client.indices.flush(
            wait_if_ongoing=True, index=dst_index)
        current_search_client.indices.refresh(index=dst_index)
        for alias in state["dst"]["aliases"]:
            if self.migration.strategy == self.migration.IN_CLUSTER_STRATEGY:
                actions.append(
                    {"remove": {"index": src_index, "alias": alias}})
            actions.append({"add": {"index": dst_index, "alias": alias}})
        return actions
class ReindexJob(Job):
    """Reindex job that uses Elasticsearch's reindex API."""

    def run(self):
        """Fetch source index using ES Reindex API.

        :returns: the committed job state dict.
        """
        pid_type = self.config['pid_type']
        print('[*] running reindex for pid type: {}'.format(pid_type))
        reindex_params = self.config.get('reindex_params', {})
        source_params = reindex_params.pop('source', {})
        dest_params = reindex_params.pop('dest', {})
        # "external_gte" keeps destination documents that carry a newer
        # version than the reindexed source documents.
        dest_params.setdefault('version_type', 'external_gte')

        state = self.state.read()
        payload = dict(
            source=dict(index=state['src']['index'], **source_params),
            dest=dict(index=state['dst']['index'], **dest_params),
            **reindex_params
        )
        if self.migration.strategy == self.migration.CROSS_CLUSTER_STRATEGY:
            payload['source']['remote'] = \
                self.migration.src_es_client.reindex_remote
        # Reindex using ES Reindex API synchronously
        # Keep track of the time we issued the reindex command
        start_date = datetime.utcnow()
        wait_for_completion = self.config.get('wait_for_completion', False)
        response = current_search_client.reindex(
            wait_for_completion=wait_for_completion,
            body=payload
        )
        # The ES count API returns a dict - store only the document count.
        state['stats']['total'] = self.migration.src_es_client.client.count(
            index=state['src']['index'])['count']
        state['last_record_update'] = str(start_date)
        task_id = response.get('task', None)
        state['reindex_task_id'] = task_id
        if wait_for_completion:
            if response.get('timed_out') or len(response['failures']) > 0:
                state['status'] = 'FAILED'
            else:
                state['threshold_reached'] = True
                state['status'] = 'COMPLETED'
        self.state.commit(state)
        print('reindex task started: {}'.format(task_id))
        return state

    def cancel(self):
        """Cancel reindexing job."""
        state = self.state.read()
        task_id = state['reindex_task_id']
        cancel_response = current_search_client.tasks.cancel(task_id)
        if cancel_response.get('timed_out') or \
                len(cancel_response.get('failures', [])) > 0:
            state['status'] = 'FAILED'
        else:
            state['status'] = 'CANCELLED'
        self.state.commit(state)
        if 'node_failures' in cancel_response:
            print('failed to cancel task', cancel_response)
        else:
            print('- successfully cancelled task: {}'.format(task_id))

    def initial_state(self, dry_run=False):
        """Build job's initial state.

        :param dry_run: unused; kept for interface compatibility.
        :returns: dict describing the job's initial state.
        :raises Exception: if the source alias/index does not exist or the
            alias resolves to more than one index.
        """
        old_client = self.migration.src_es_client.client
        index = self.config['index']
        src_prefix = self.migration.src_es_client.config.get('prefix')
        src_alias_name = build_alias_name(index, prefix=src_prefix)
        if old_client.index_exists(src_alias_name) and old_client.alias_exists(
                alias=src_alias_name):
            indexes = list(old_client.get_indexes_from_alias(
                alias=src_alias_name).keys())
            if len(indexes) > 1:
                raise Exception(
                    'Multiple indexes found for alias {}.'.format(
                        src_alias_name))
        else:
            raise Exception(
                "alias or index ({}) doesn't exist".format(src_alias_name)
            )
        src_state = dict(index=src_alias_name)

        def find_aliases_for_index(index_name, aliases):
            """Find all aliases for a given index."""
            # Leaf nodes of the alias tree are strings; use six.string_types
            # for Python 2/3 compatibility (consistent with the rest of the
            # file).
            if isinstance(aliases, six.string_types):
                return None
            for key, values in aliases.items():
                if key == index_name:
                    return [build_alias_name(key)]
                else:
                    # TODO: refactoring
                    found_aliases = find_aliases_for_index(index_name, values)
                    if isinstance(found_aliases, list):
                        found_aliases.append(build_alias_name(key))
                        return found_aliases

        mapping_fp = current_search.mappings[index]
        dst_index_aliases = find_aliases_for_index(
            index, current_search.aliases) or []
        dst_state = dict(
            index=index,
            aliases=dst_index_aliases,
            mapping=mapping_fp,
            doc_type=extract_doctype_from_mapping(mapping_fp),
        )
        initial_state = dict(
            type="job",
            name=self.name,
            status='INITIAL',
            migration_id=self.name,
            config=self.config,
            pid_type=self.config['pid_type'],
            src=src_state,
            dst=dst_state,
            last_record_update=None,
            reindex_task_id=None,
            threshold_reached=False,
            rollover_threshold=self.config['rollover_threshold'],
            rollover_ready=False,
            rollover_finished=False,
            stats={},
            reindex_params=self.config.get('reindex_params', {})
        )
        return initial_state
class ReindexAndSyncJob(ReindexJob):
    """Job that both reindexes with ES reindex API and syncs with the DB.

    The first run will use the reindex API and the subsequent runs will fetch
    from the database and sync the data.
    """

    def iter_indexer_ops(self, start_date=None, end_date=None):
        """Iterate over documents that need to be reindexed.

        :param start_date: only records updated on or after this date are
            yielded.
        :param end_date: currently unused.
        :returns: generator of indexer payload dicts with ``op`` set to
            ``'delete'`` for deleted PIDs and ``'create'`` otherwise.
        """
        from invenio_db import db
        from invenio_pidstore.models import PersistentIdentifier, PIDStatus
        from invenio_records.models import RecordMetadata

        q = db.session.query(
            RecordMetadata.id.distinct(),
            PersistentIdentifier.status,
            PersistentIdentifier.pid_type
        ).join(
            PersistentIdentifier,
            RecordMetadata.id == PersistentIdentifier.object_uuid
        ).filter(
            PersistentIdentifier.pid_type == self.config['pid_type'],
            PersistentIdentifier.object_type == 'rec',
            RecordMetadata.updated >= start_date
        ).yield_per(500)  # TODO: parameterize

        # Read the destination index/doc_type once - reading the job state
        # per yielded record would issue one ES request per row.
        _dst = self.state.read()['dst']
        _index = _dst['index']
        _doc_type = _dst['doc_type']
        for record_id, pid_status, pid_type in q:
            payload = {'id': record_id, 'index': _index,
                       'doc_type': _doc_type}
            if pid_status == PIDStatus.DELETED:
                payload['op'] = 'delete'
            else:
                payload['op'] = 'create'
            yield payload

    def run_delta_job(self):
        """Calculate delta from DB changes since the last update.

        :returns: the committed job state dict.
        :raises RuntimeError: if a reindex task is still running or no
            start time is recorded in the state.
        """
        state = self.state.read()
        # Check if reindex task is running - abort
        task = current_search_client.tasks.get(
            task_id=state['reindex_task_id'])
        if not task['completed']:
            raise RuntimeError(
                'Reindex is currently running - aborting delta.')

        # determine bounds
        start_time = state['last_record_update']
        if not start_time:
            raise RuntimeError(
                'no reindex task running nor start time - aborting')
        else:
            start_time = datetime.strptime(start_time, '%Y-%m-%d %H:%M:%S.%f')

        # Fetch data from start_time from db
        indexer = MigrationIndexer()
        # Send indexer actions to special reindex queue
        start_date = datetime.utcnow()
        indexer._bulk_op(self.iter_indexer_ops(start_time), None)
        last_record_update = str(start_date)
        # Run synchronous bulk index processing
        # TODO: make this asynchronous by default
        succeeded, failed = indexer.process_bulk_queue(
            es_bulk_kwargs=dict(raise_on_error=False)
        )
        total_actions = succeeded + failed
        print('[*] indexed {} record(s)'.format(total_actions))
        # Small enough delta means the migration is ready to roll over.
        threshold_reached = False
        if total_actions <= state['rollover_threshold']:
            threshold_reached = True
        state['last_record_update'] = last_record_update
        state['threshold_reached'] = threshold_reached
        self.state.commit(state)
        return state

    def run(self):
        """Run reindexing and syncing job.

        The first run performs a full ES reindex; subsequent runs only
        apply the delta of DB changes since the last update.
        """
        if self.state.read()['reindex_task_id']:
            return self.run_delta_job()
        else:
            if self.state.read()['status'] != 'COMPLETED':
                return super(ReindexAndSyncJob, self).run()

    def cancel(self):
        """Cancel reindexing and syncing job."""
        super(ReindexAndSyncJob, self).cancel()
# FIXME: extract common code into parent class methods
# FIXME: extract common code into parent class methods
class MultiIndicesReindexJob(Job):
    """Reindex job that uses Elasticsearch's reindex API."""

    def run(self):
        """Fetch source index using ES Reindex API.

        :returns: the committed job state dict.
        :raises Exception: if the destination index is not a single string.
        """
        print('[*] running reindex for templates')
        reindex_params = self.config.get('reindex_params', {})
        source_params = reindex_params.pop('source', {})
        dest_params = reindex_params.pop('dest', {})
        # The destination index comes from the job state, not the params.
        dest_params.pop('index', '')

        state = self.state.read()
        src_index = state['src']['index']
        dst_index = state['dst']['index']
        if not isinstance(dst_index, six.string_types):
            raise Exception(u'"dest.index" has to be a string, and not {}'
                            .format(dst_index))
        # "external_gte" keeps destination documents that carry a newer
        # version than the reindexed source documents.
        dest_params.setdefault('version_type', 'external_gte')
        payload = dict(
            source=dict(index=src_index, **source_params),
            dest=dict(index=dst_index, **dest_params),
            **reindex_params
        )
        if self.migration.strategy == self.migration.CROSS_CLUSTER_STRATEGY:
            payload['source']['remote'] = \
                self.migration.src_es_client.reindex_remote
        # Keep track of the time we issued the reindex command
        start_date = datetime.utcnow()
        # Reindex using ES Reindex API synchronously
        wait_for_completion = self.config.get('wait_for_completion', False)
        response = current_search_client.reindex(
            wait_for_completion=wait_for_completion,
            body=payload
        )
        # The ES count API returns a dict - store only the document count.
        state['stats']['total'] = self.migration.src_es_client.client.count(
            index=state['src']['index'])['count']
        state['last_record_update'] = str(start_date)
        task_id = response.get('task', None)
        state['reindex_task_id'] = task_id
        if wait_for_completion:
            if response.get('timed_out') or len(response['failures']) > 0:
                state['status'] = 'FAILED'
            else:
                state['threshold_reached'] = True
                state['status'] = 'COMPLETED'
        self.state.commit(state)
        print('reindex task started: {}'.format(task_id))
        return state

    def initial_state(self, dry_run=False):
        """Build job's initial state.

        :param dry_run: unused; kept for interface compatibility.
        :returns: dict describing the job's initial state.
        """
        index = self.config['index']
        src_prefix = self.migration.config.get(
            'src_es_client', {}).get('prefix')
        src_index = index
        if src_prefix:
            # Normalize to a list so each source index gets prefixed.
            if isinstance(index, six.string_types):
                src_index = [index]
            src_index = [src_prefix + i for i in src_index]
        dst_index = build_alias_name(
            self.config['reindex_params']['dest']['index'])
        initial_state = dict(
            type="job",
            name=self.name,
            status='INITIAL',
            migration_id=self.name,
            config=self.config,
            pid_type=self.config['pid_type'],
            src=dict(index=src_index),
            dst=dict(index=dst_index),
            last_record_update=None,
            reindex_task_id=None,
            threshold_reached=False,
            rollover_threshold=self.config['rollover_threshold'],
            rollover_ready=False,
            rollover_finished=False,
            stats={},
            reindex_params=self.config.get('reindex_params', {})
        )
        return initial_state

    def create_index(self, index):
        """Create templates.

        :param index: unused; kept for interface compatibility with
            :meth:`Job.create_index`.
        """
        # Only templates need to be created
        current_search.put_templates(ignore=[400, 404])

    def rollover_actions(self):
        """Rollover actions."""
        # TODO: Investigate for in-cluster migrations what kind of rollover
        # actions are needed
        return []