Saturday, February 28, 2015

A Ganglia plugin for collecting stats from Storm topologies

Ganglia is a highly scalable monitoring system for distributed systems. It lets users examine different stats of the monitored machines in a single pane. It also keeps historical stats, which makes it very good for checking how CPU usage, memory usage, disk usage, JVM GC pauses, etc. vary over time across the distributed machines.
Ganglia basically has three components: the Ganglia monitoring daemon (gmond), the Ganglia meta daemon (gmetad), and the Ganglia web front end.

Gmond monitors changes in the host state, announces them, listens for state changes from other nodes running gmond, and returns an XML description of the cluster state when queried. Gmond depends on plug-ins to collect host stats. The plug-ins may be written in C, C++, Python, Perl or PHP. In fact, the code that actually collects the stats can be implemented in any language with some tricks. It also should not be difficult to extend gmond to support plug-ins written in other languages.
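To make the "XML description of the cluster state" a bit more concrete, here is a minimal sketch of reading and parsing it from Python. It assumes gmond is listening on its default TCP accept channel (port 8649 on localhost); adjust this if your gmond.conf says otherwise. The sample XML is hand-written in the general shape of gmond's output with made-up values, and the function names are my own, not part of Ganglia.

```python
import socket
import xml.etree.ElementTree as ET

# Hand-written sample in the general shape of gmond's XML dump; values are made up.
SAMPLE_XML = """<GANGLIA_XML VERSION="3.6.0" SOURCE="gmond">
  <CLUSTER NAME="demo" LOCALTIME="0" OWNER="unspecified" LATLONG="unspecified" URL="unspecified">
    <HOST NAME="node1" IP="10.0.0.1" REPORTED="0" TN="1" TMAX="20" DMAX="0">
      <METRIC NAME="load_one" VAL="0.42" TYPE="float" UNITS=" " TN="5" TMAX="70" DMAX="0" SLOPE="both"/>
      <METRIC NAME="cpu_num" VAL="4" TYPE="uint16" UNITS="CPUs" TN="5" TMAX="1200" DMAX="0" SLOPE="zero"/>
    </HOST>
  </CLUSTER>
</GANGLIA_XML>"""


def fetch_gmond_xml(host="127.0.0.1", port=8649, timeout=5.0):
    """Read the XML dump gmond sends on connect (host and port are assumptions)."""
    chunks = []
    sock = socket.create_connection((host, port), timeout=timeout)
    try:
        while True:
            data = sock.recv(8192)
            if not data:  # gmond closes the connection after the dump
                break
            chunks.append(data)
    finally:
        sock.close()
    return b"".join(chunks)


def parse_metrics(xml_data):
    """Return {host_name: {metric_name: value_string}} from a gmond XML dump."""
    result = {}
    root = ET.fromstring(xml_data)
    for host in root.iter("HOST"):
        metrics = {}
        for metric in host.iter("METRIC"):
            metrics[metric.get("NAME")] = metric.get("VAL")
        result[host.get("NAME")] = metrics
    return result


if __name__ == "__main__":
    # Parse the canned sample; swap in fetch_gmond_xml() on a machine running gmond.
    print(parse_metrics(SAMPLE_XML))
```

On a machine that runs gmond, parse_metrics(fetch_gmond_xml()) should give the same kind of dictionary for the live cluster.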


Gmetad polls the clusters it is monitoring, collects the reported metrics, and writes them to individual round-robin databases.


The Ganglia web front end provides the metric visualization UI.

There is a very good guide to installing and configuring Ganglia on Ubuntu 14.04 here (from DigitalOcean). Please follow its steps to set up Ganglia monitoring on your machines. Similar steps should work on other Linux systems.

Now that you have installed and configured Ganglia, it is time to write and test a new Ganglia plug-in. From Ganglia 3.1 onwards, gmond has a modular interface that allows us to extend its capabilities through plug-ins. We can write modules, or plug-ins, to collect new kinds of metrics. Gmond uses the Mod_Python module (modpython.so) to interface with all the Python plug-ins and collect the metrics they publish.

First we need to create an entry for modpython.so in the gmond configuration file. Edit gmond.conf (by default it is installed at /etc/ganglia/gmond.conf) and add the lines below to the "modules" section:

  module {
    name = "python_module"
    path = "/usr/lib/ganglia/modpython.so"
    params = "/usr/lib/ganglia/python"
  }

So, the modules section in gmond.conf will look somewhat like this:

modules {
  module {
    name = "core_metrics"
  }
  module {
    name = "cpu_module"
    path = "/usr/lib/ganglia/modcpu.so"
  }
  module {
    name = "disk_module"
    path = "/usr/lib/ganglia/moddisk.so"
  }
  module {
    name = "load_module"
    path = "/usr/lib/ganglia/modload.so"
  }
  module {
    name = "mem_module"
    path = "/usr/lib/ganglia/modmem.so"
  }
  module {
    name = "net_module"
    path = "/usr/lib/ganglia/modnet.so"
  }
  module {
    name = "proc_module"
    path = "/usr/lib/ganglia/modproc.so"
  }
  module {
    name = "sys_module"
    path = "/usr/lib/ganglia/modsys.so"
  }
  module {
    name = "python_module"
    path = "/usr/lib/ganglia/modpython.so"
    params = "/usr/lib/ganglia/python"
  }
}




Also, create a directory /etc/ganglia/pyconfs to store the configuration files for the Python modules. Let us use .pyconf as the extension for these configuration files. Add the line below to /etc/ganglia/gmond.conf:
include ('/etc/ganglia/pyconfs/*.pyconf')

We will add a configuration file for the sample plugin (/etc/ganglia/pyconfs/ganglia_plugin_sample.pyconf). The content of this file is pasted below:
modules {
    module {
        name = "ganglia_plugin_sample"
        language = "python"
        param firstparam {
            value = 100
        }
        param secondparam {
            value = 500
        }
    }
}

collection_group {
  collect_every = 20
  time_threshold = 90
  metric {
    name_match = ".*" 
    value_threshold = "1"
  }
}
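To show how the param blocks above relate to the plug-in code, here is a small sketch that pulls them out of the pyconf text into the kind of dictionary metric_init receives. It is only a regex-based illustration, not gmond's real configuration parser, and extract_params is a name I made up. To my understanding gmond hands the values over as strings, which is why the plug-in below converts them with int().

```python
import re

PYCONF_TEXT = """
modules {
    module {
        name = "ganglia_plugin_sample"
        language = "python"
        param firstparam {
            value = 100
        }
        param secondparam {
            value = 500
        }
    }
}
"""

# Matches blocks of the form: param <name> { value = <something> }
PARAM_RE = re.compile(r'param\s+(\w+)\s*\{\s*value\s*=\s*"?([^"\s}]+)"?\s*\}')


def extract_params(pyconf_text):
    """Collect param blocks into a {name: value} dict; values stay strings."""
    return dict(PARAM_RE.findall(pyconf_text))


if __name__ == "__main__":
    # Shows the param names mapped to their (string) values.
    print(extract_params(PYCONF_TEXT))
```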



Now let us add a very simple Ganglia Python plug-in. The module should implement at least two functions: metric_init and metric_cleanup. metric_init generally initializes the different metrics that the plug-in will publish and returns their descriptors.

Below is the script:

import sys
import os
import random

descriptors = list()
firstparam_max = 1000
secondparam_max = 1000

def callback_fun1(name):
    '''
    Returns a random number between 1 and firstparam_max
    '''
    random.seed()
    return random.randint(1, firstparam_max)

def callback_fun2(name):
    '''
    Returns a random number between 5 and secondparam_max
    '''
    random.seed()
    return random.randint(5,secondparam_max)

def metric_init(params):
    global descriptors, firstparam_max, secondparam_max
    if 'firstparam' in params:
        firstparam_max = int(params['firstparam'])
        d = {'name': 'firstparam',
                'call_back': callback_fun1,
                'time_max': 90,
                'value_type': 'uint',
                'units': 'Count',
                'slope': 'both',
                'format': '%u',
                'description': 'Sample metric',
                'groups': 'Sample'}
        descriptors.append(d)
    
    if 'secondparam' in params:
        secondparam_max = int(params['secondparam'])
        d = {'name': 'secondparam',
                'call_back': callback_fun2,
                'time_max': 90,
                'value_type': 'uint',
                'units': 'Count',
                'slope': 'both',
                'format': '%u',
                'description': 'Sample metric',
                'groups': 'Sample'}
        descriptors.append(d)
    return descriptors

def metric_cleanup():
    '''
    We don't need any cleanup :) :)
    '''
    pass


# This routine is for debugging purpose only and not used by gmond
# To debug the output, run as below:
# $ python ganglia_plugin_sample.py
if __name__ == '__main__':
    params = {'firstparam': 100, 'secondparam' : 500}
    metric_init(params)
    for d in descriptors:
        v = d['call_back'](d['name'])
        print('%s --> %u' % (d['name'], v))








Here, the plug-in collects two metrics (firstparam and secondparam). The value for firstparam is computed by the function callback_fun1 and the value for secondparam by callback_fun2. The "slope" is set to "both" for both metrics. This is a hint to gmetad that the value may increase or decrease.
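When a metric does not show up, a malformed descriptor is a likely cause, so a quick sanity check can help. The sketch below is a hypothetical helper of mine (check_descriptor is not part of Ganglia). The expected keys are simply the ones used in the sample plug-in, and the slope names are the ones I recall from the Ganglia documentation, so treat both lists as assumptions.

```python
# Keys used by the sample plug-in's descriptors (assumed to be what gmond expects).
EXPECTED_KEYS = ('name', 'call_back', 'time_max', 'value_type', 'units',
                 'slope', 'format', 'description', 'groups')
# Slope names as I recall them from the Ganglia docs; the list may be incomplete.
KNOWN_SLOPES = ('zero', 'positive', 'negative', 'both')


def check_descriptor(d):
    """Return a list of problems found in a descriptor; an empty list means it looks fine."""
    problems = []
    for key in EXPECTED_KEYS:
        if key not in d:
            problems.append('missing key: ' + key)
    if 'call_back' in d and not callable(d['call_back']):
        problems.append('call_back is not callable')
    if 'slope' in d and d['slope'] not in KNOWN_SLOPES:
        problems.append('unknown slope: ' + str(d['slope']))
    return problems


if __name__ == '__main__':
    sample = {'name': 'firstparam', 'call_back': lambda name: 1, 'time_max': 90,
              'value_type': 'uint', 'units': 'Count', 'slope': 'both',
              'format': '%u', 'description': 'Sample metric', 'groups': 'Sample'}
    print(check_descriptor(sample))
```

It can be called on every entry returned by metric_init from the debugging routine at the bottom of the plug-in.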

We should copy this script to /usr/lib/ganglia/python/ganglia_plugin_sample.py.

After you copy the plug-in script and make the configuration changes, restart the ganglia-monitor (gmond) service.

$ sudo service ganglia-monitor restart

After some time you should see the "Sample" group and the graphs for the metrics firstparam and secondparam under it.


I have created a Python module for collecting Storm stats. The module is available here. For instructions, please refer to this readme.

I have pasted the module below for your easy reference:
import logging
import md5
import pickle
import re
import sys
from time import time
from thrift.transport import TTransport, TSocket
from thrift.protocol import TBinaryProtocol

clusterinfo = None
topology_found = True
descriptors = list()
topologies = []
serialfile_dir = '/tmp'
topology_summary_cols_map = {'status': 'Status', 'num_workers': 'Worker Count',
                             'num_executors': 'ExecutorCount', 'uptime_secs': 'Uptime',
                             'num_tasks': 'TaskCount'}

spout_stats = {'Executors': ['Count', '%u', 'uint'], 'Tasks': ['Count', '%u', 'uint'],
               'Emitted': ['Count/sec', '%f', 'double'],
               'Transferred': ['Count/sec', '%f', 'double'],
               'CompleteLatency': ['ms', '%f', 'double'], 'Acked': ['Count/sec', '%f', 'double'],
               'Failed': ['Count/sec', '%f', 'double']}

bolt_stats = {'Executors': ['Count', '%u', 'uint'], 'Tasks': ['Count', '%u', 'uint'],
              'Emitted': ['Count/sec', '%f', 'double'],
              'Executed': ['Count/sec', '%f', 'double'],
              'Transferred': ['Count/sec', '%f', 'double'],
              'ExecuteLatency': ['ms', '%f', 'double'],
              'ProcessLatency': ['ms', '%f', 'double'],
              'Acked': ['Count/sec', '%f', 'double'],
              'Failed': ['Count/sec', '%f', 'double']}

diff_cols = ['Acked', 'Failed', 'Executed', 'Transferred', 'Emitted']
overall = {'ExecutorCount': ['Count', '%u', 'uint'],
           'WorkerCount': ['Count', '%u', 'uint'],
           'TaskCount': ['Count', '%u', 'uint'],
           'UptimeSecs': ['Count', '%u', 'uint']}

toplogy_mods = {}
lastchecktime = 0
lastinfotime = 0
maxinterval = 6
all_topology_stats = {}
bolt_array = {}
spout_array = {}
nimbus_host = '127.0.0.1'
nimbus_port = 6627
logging.basicConfig(filename='/tmp/storm_topology.log', level=logging.INFO,
                    format='%(asctime)s  %(levelname)s line:%(lineno)d %(message)s', filemode='w')
logging_levels = {'INFO': logging.INFO, 'DEBUG': logging.DEBUG, 'WARNING': logging.WARNING,
                  'ERROR': logging.ERROR, 'CRITICAL': logging.CRITICAL}


def get_avg(arr):
    if len(arr) < 1:
        return 0
    return sum(arr) / len(arr)


def normalize_stats(stats, duration):
    for k in stats:
        statsk = stats[k]
        if 'Emitted' in statsk and duration > 0:
            if statsk['Emitted'] > 0:
                statsk['Emitted'] = statsk['Emitted'] / duration
        if 'Acked' in statsk and duration > 0:
            if statsk['Acked'] > 0:
                statsk['Acked'] = statsk['Acked'] / duration
        if 'Executed' in statsk and duration > 0:
            if statsk['Executed'] > 0:
                statsk['Executed'] = statsk['Executed'] / duration


def freshen_topology(topology):
    tmpsavestats = None
    inf = None
    savedlastchecktime = 0
    tmp = md5.new()
    tmp.update(topology)
    filename = serialfile_dir + '/save_stats_for_' + tmp.hexdigest() + '.pk'
    try:
        inf = open(filename, 'rb')
    except IOError as e:
        logging.warn(e)
    if inf is not None:
        try:
            tmpsavestats = pickle.load(inf)
            savedlastchecktime = pickle.load(inf)
        except EOFError as e:
            logging.warn(e)
        inf.close()
    if not all_topology_stats[topology]['topology_stats_got']:
        logging.warn('Info not got for topology ' + topology)
        return
    overallstats = all_topology_stats[topology]['overallstats']
    boltspoutstats = all_topology_stats[topology]['boltspoutstats']
    of = open(filename, 'wb')
    if of is not None:
        pickle.dump(boltspoutstats, of)
        pickle.dump(time(), of)
        of.close()
    if overallstats['UptimeSecs'] > (lastchecktime - savedlastchecktime):
        if tmpsavestats is not None:
            for bolt in bolt_array[topology]:
                if bolt in tmpsavestats and bolt in boltspoutstats:
                    stats_new = boltspoutstats[bolt]
                    stats_old = tmpsavestats[bolt]
                    for key in bolt_stats:
                        if key == 'ExecuteLatency' or key == 'ProcessLatency':
                            continue
                        if key not in stats_new:
                            continue
                        if key not in stats_old:
                            continue
                        if key in diff_cols:
                            stats_new[key] -= stats_old[key]
            for spout in spout_array[topology]:
                if spout in tmpsavestats and spout in boltspoutstats:
                    stats_new = boltspoutstats[spout]
                    stats_old = tmpsavestats[spout]
                    for key in spout_stats:
                        if key == 'CompleteLatency':
                            continue
                        if key not in stats_new:
                            continue
                        if key not in stats_old:
                            continue
                        if key in diff_cols:
                            stats_new[key] -= stats_old[key]
            normalize_stats(boltspoutstats, lastchecktime - savedlastchecktime)
        else:
            normalize_stats(boltspoutstats, overallstats['UptimeSecs'])
    else:
        normalize_stats(boltspoutstats, overallstats['UptimeSecs'])


def freshen():
    global lastchecktime
    if time() > (lastchecktime + maxinterval):
        lastchecktime = time()
        get_topology_stats_for(topologies)
        for topology in topologies:
            freshen_topology(topology)


def callback_boltspout(name):
    freshen()
    first_pos = name.find('_')
    last_pos = name.rfind('_')
    topology_mod = name[0:first_pos]
    bolt = name[first_pos + 1: last_pos]
    statname = name[last_pos + 1:]
    topology = toplogy_mods[topology_mod]
    if not all_topology_stats[topology]['topology_stats_got']:
        logging.debug('Returning 0 for ' + name)
        return 0
    logging.debug('Got stats for ' + name + " " +
                  str(all_topology_stats[topology]['boltspoutstats'][bolt][statname]))
    return all_topology_stats[topology]['boltspoutstats'][bolt][statname]


def callback_overall(name):
    freshen()
    topology_mod, name = name.split('_')
    topology = toplogy_mods[topology_mod]
    if not all_topology_stats[topology]['topology_stats_got']:
        logging.debug('Returning 0 for ' + name)
        return 0
    logging.debug(topology + ' ' + name + ' ' +
                  str(all_topology_stats[topology]['overallstats'][name]))
    return all_topology_stats[topology]['overallstats'][name]


def update_task_count(component_task_count, component_name, count):
    if component_name not in component_task_count:
        component_task_count[component_name] = 0
    component_task_count[component_name] += count


def update_exec_count(component_exec_count, component_name, count):
    if component_name not in component_exec_count:
        component_exec_count[component_name] = 0
    component_exec_count[component_name] += count


def update_whole_num_stat_special(stats, store, boltname, statname):
    if boltname not in store:
        store[boltname] = {}
    if statname not in store[boltname]:
        store[boltname][statname] = 0
    for k in stats:
        if k == '__metrics' or k == '__ack_init' or k == '__ack_ack' or k == '__system':
            continue
        store[boltname][statname] += stats[k]


def update_whole_num_stat(stats, store, boltname, statname):
    if boltname not in store:
        store[boltname] = {}
    if statname not in store[boltname]:
        store[boltname][statname] = 0
    for k in stats:
        store[boltname][statname] += stats[k]


def update_avg_stats(stats, store, boltname, statname):
    if boltname not in store:
        store[boltname] = {}
    if statname not in store[boltname]:
        store[boltname][statname] = []
    for k in stats:
        store[boltname][statname].append(stats[k])


def get_topology_stats_for(topologies):
    all_topology_stats.clear()
    for topology in topologies:
        all_topology_stats[topology] = get_topology_stats(topology)


def refresh_topology_stats():
    logging.debug('Refreshing topology stats')
    for t in topologies:
        all_topology_stats[t] = {'topology_stats_got': False}
    global clusterinfo
    try:
        transport = TSocket.TSocket(nimbus_host, nimbus_port)
        transport.setTimeout(1000)
        framedtrasp = TTransport.TFramedTransport(transport)
        protocol = TBinaryProtocol.TBinaryProtocol(framedtrasp)
        client = Nimbus.Client(protocol)
        framedtrasp.open()
        boltspoutstats = None
        component_task_count = None
        component_exec_count = None
        clusterinfo = client.getClusterInfo()
        for tsummary in clusterinfo.topologies:
            if tsummary.name not in topologies:
                continue
            toplogyname = tsummary.name
            overallstats = {}
            overallstats['ExecutorCount'] = tsummary.num_executors
            overallstats['TaskCount'] = tsummary.num_tasks
            overallstats['WorkerCount'] = tsummary.num_workers
            overallstats['UptimeSecs'] = tsummary.uptime_secs
            all_topology_stats[toplogyname]['overallstats'] = overallstats
            boltspoutstats = {}
            component_task_count = {}
            component_exec_count = {}
            all_topology_stats[toplogyname]['boltspoutstats'] = boltspoutstats
            all_topology_stats[toplogyname]['component_task_count'] = component_task_count
            all_topology_stats[toplogyname]['component_exec_count'] = component_exec_count
            tinfo = client.getTopologyInfo(tsummary.id)
            all_topology_stats[toplogyname]['topology_stats_got'] = True
            for exstat in tinfo.executors:
                stats = exstat.stats
                update_whole_num_stat_special(stats.emitted[":all-time"], boltspoutstats,
                                              exstat.component_id, 'Emitted')
                update_whole_num_stat_special(stats.transferred[":all-time"], boltspoutstats,
                                              exstat.component_id, 'Transferred')

                numtask = exstat.executor_info.task_end - exstat.executor_info.task_start + 1
                update_task_count(component_task_count, exstat.component_id, numtask)
                update_exec_count(component_exec_count, exstat.component_id, 1)
                if stats.specific.bolt is not None:
                    update_whole_num_stat(stats.specific.bolt.acked[":all-time"], boltspoutstats,
                                          exstat.component_id, 'Acked')
                    update_whole_num_stat(stats.specific.bolt.failed[":all-time"], boltspoutstats,
                                          exstat.component_id, 'Failed')
                    update_whole_num_stat(stats.specific.bolt.executed[":all-time"], boltspoutstats,
                                          exstat.component_id, 'Executed')
                    update_avg_stats(stats.specific.bolt.process_ms_avg["600"], boltspoutstats,
                                     exstat.component_id, 'process_ms_avg')
                    update_avg_stats(stats.specific.bolt.execute_ms_avg["600"], boltspoutstats,
                                     exstat.component_id, 'execute_ms_avg')
                if stats.specific.spout is not None:
                    update_whole_num_stat(stats.specific.spout.acked[":all-time"], boltspoutstats,
                                          exstat.component_id, 'Acked')
                    update_whole_num_stat(stats.specific.spout.failed[":all-time"], boltspoutstats,
                                          exstat.component_id, 'Failed')
                    update_avg_stats(stats.specific.spout.complete_ms_avg[":all-time"], boltspoutstats,
                                     exstat.component_id, 'complete_ms_avg')
            if '__acker' in boltspoutstats:
                del boltspoutstats['__acker']
            for key in boltspoutstats:
                if 'complete_ms_avg' in boltspoutstats[key]:
                    avg = get_avg(boltspoutstats[key]['complete_ms_avg'])
                    boltspoutstats[key]['CompleteLatency'] = avg
                    del boltspoutstats[key]['complete_ms_avg']
                if 'process_ms_avg' in boltspoutstats[key]:
                    avg = get_avg(boltspoutstats[key]['process_ms_avg'])
                    boltspoutstats[key]['ProcessLatency'] = avg
                    del boltspoutstats[key]['process_ms_avg']
                if 'execute_ms_avg' in boltspoutstats[key]:
                    avg = get_avg(boltspoutstats[key]['execute_ms_avg'])
                    boltspoutstats[key]['ExecuteLatency'] = avg
                    del boltspoutstats[key]['execute_ms_avg']

            for key in component_task_count:
                if key in boltspoutstats:
                    boltspoutstats[key]['Tasks'] = component_task_count[key]
            for key in component_exec_count:
                if key in boltspoutstats:
                    boltspoutstats[key]['Executors'] = component_exec_count[key]
        framedtrasp.close()

    except Exception as e:
        clusterinfo = None
        logging.warn(e)


def get_topology_stats(toplogyname):
    global lastinfotime
    if (lastinfotime + 4) < time():
        for t in all_topology_stats:
            all_topology_stats[t] = None
        lastinfotime = time()
        refresh_topology_stats()
    return all_topology_stats[toplogyname]


def metric_init_topology(params):
    global descriptors
    groupname = ''
    if 'topology' in params and len(params['topology']):
        groupname = params['topology']
    else:
        return
    topology = groupname
    topology_mod = re.sub(r"\s+", "", topology)
    topology_mod = re.sub(r"[_]+", "", topology_mod)
    toplogy_mods[topology_mod] = topology
    if 'spouts' in params:
        spout_array[topology] = re.split('[,]+', params['spouts'])
        for spout in spout_array[topology]:
            for statname in spout_stats:
                d = {'name': topology_mod + '_' + spout + '_' + statname, 'call_back': callback_boltspout,
                     'time_max': 90,
                     'value_type': spout_stats[statname][2],
                     'units': spout_stats[statname][0],
                     'slope': 'both',
                     'format': spout_stats[statname][1],
                     'description': '',
                     'groups': groupname}
                descriptors.append(d)

    if 'bolts' in params:
        bolt_array[topology] = re.split('[,]+', params['bolts'])
        for bolt in bolt_array[topology]:
            for statname in bolt_stats:
                d = {'name': topology_mod + '_' + bolt + '_' + statname, 'call_back': callback_boltspout,
                     'time_max': 90,
                     'value_type': bolt_stats[statname][2],
                     'units': bolt_stats[statname][0],
                     'slope': 'both',
                     'format': bolt_stats[statname][1],
                     'description': '',
                     'groups': groupname}
                descriptors.append(d)

    for key in overall:
        d = {'name': topology_mod + '_' + key, 'call_back': callback_overall,
             'time_max': 90,
             'value_type': overall[key][2],
             'units': overall[key][0],
             'slope': 'both',
             'format': overall[key][1],
             'description': '',
             'groups': groupname}
        descriptors.append(d)
    logging.info('Inited metric for ' + groupname)


def metric_init(params):
    global topologies, nimbus_host, nimbus_port
    if 'nimbus_host' in params:
        nimbus_host = params['nimbus_host']
    if 'nimbus_port' in params:
        nimbus_port = int(params['nimbus_port'])
    if 'topologies' not in params:
        return
    if 'storm_thrift_gen' in params:
        sys.path.append(params['storm_thrift_gen'])
    else:
        sys.path.append('/usr/lib/ganglia')
    if 'loglevel' in params:
        loglevel = params['loglevel'].strip().upper()
        if loglevel in logging_levels:
            logging.getLogger().setLevel(logging_levels[loglevel])

    global Nimbus, ttypes
    from stormpy.storm import Nimbus, ttypes

    tss = re.split('[,]+', params['topologies'])
    topologies = tss
    alltops = {}
    for t in tss:
        alltops[t] = {'topology': t}
        alltops[t]['tlen'] = len(t)
        t_bolts = t + '_bolts'
        if t_bolts in params:
            alltops[t]['bolts'] = params[t_bolts]
        t_spouts = t + '_spouts'
        if t_spouts in params:
            alltops[t]['spouts'] = params[t_spouts]
    for t in alltops:
        logging.info('Initing metric for ' + t)
        metric_init_topology(alltops[t])
    return descriptors

if __name__ == '__main__':
    params = {'topologies': 'SampleTopology,AnotherTopology',
              'SampleTopology_spouts': 'SampleSpoutTwo', 'SampleTopology_bolts': 'boltc',
              'AnotherTopology_spouts': 'Spout', 'AnotherTopology_bolts': 'bolta,boltb,boltd',
              'loglevel': 'ERROR'}
    metric_init(params)
    for d in descriptors:
        v = d['call_back'](d['name'])
        formt = "%s " + d['format']
        print formt % (d['name'], v)
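
The rate handling in the module above (freshen_topology plus normalize_stats) boils down to one idea: Storm reports all-time counters, so a per-second rate is the difference between two snapshots divided by the time between them, with a fallback to "total divided by uptime" when there is no usable previous snapshot or the topology was restarted in between. The function below is a simplified standalone sketch of that idea, not the module's exact code; per_second_rate and its parameter names are mine.

```python
def per_second_rate(current_total, previous_total, elapsed_secs, uptime_secs):
    """Turn an all-time counter into a per-second rate.

    current_total  -- counter value read now
    previous_total -- counter value from the saved snapshot, or None if there is none
    elapsed_secs   -- seconds between the saved snapshot and now
    uptime_secs    -- topology uptime as reported by Nimbus
    """
    # Use the delta only if the topology has been up longer than the gap between
    # snapshots; otherwise it was restarted and its counters started again from zero.
    if previous_total is not None and elapsed_secs > 0 and uptime_secs > elapsed_secs:
        return (current_total - previous_total) / float(elapsed_secs)
    # Fallback: average rate over the whole uptime.
    if uptime_secs > 0:
        return current_total / float(uptime_secs)
    return 0.0


if __name__ == '__main__':
    print(per_second_rate(1500, 1000, 10, 3600))  # delta over 10 seconds
    print(per_second_rate(1500, None, 10, 300))   # no snapshot, average over uptime
```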

Saturday, February 14, 2015

Splitting Elasticsearch Index


Elasticsearch doesn't provide a facility for splitting an index. The main reason may be that the Elasticsearch nodes may not be able to hold the intermediate data created while splitting an index. So, if we need to split an index, we need to do something like this: (a) create two new indices, and (b) reindex the data from the original index into the new indices, adding alternate documents to each of the two.
The problem with this approach is that, most of the time, we disable storing the source documents in the index. For example, we may index 5 petabytes of data, but we may not want to store the documents themselves in the index, as that would result in a very large index. So, for re-indexing we need to have all the original documents somewhere else. We cannot just get all the documents back from the original index itself.

But sometimes we may want to split an existing index when it grows very large, because an index that is too big takes a performance hit.

So, I came up with the approach below, which worked fine. Hopefully it will be useful for you as well.

Let us assume we have an index "original-index" and we want to split it into "original-index-firsthalf" and "original-index-secondhalf".

Basically, we need to follow the steps below.

  • Create an index original-index-firsthalf with the same settings as original-index, and put the same mappings on the new index.
  • Stop adding new docs to original-index-firsthalf and original-index until the split is over.
  • Flush original-index.
  • Shut down the Elasticsearch nodes.
  • Copy (scp or something like that) the Lucene indices in the shards from original-index to original-index-firsthalf. We need to copy the shard 0 index directory of the source to the shard 0 index directory of the destination (e.g. original-index/0/index/* is copied to original-index-firsthalf/0/index/*). The same needs to be repeated for all other shards (and for the replicas as well).
  • Restart the Elasticsearch cluster.
  • Now original-index and original-index-firsthalf contain the same indexed documents and will produce similar search results.
  • Let us assume there were two mappings, mapping1 and mapping2, in the indices for two types, type1 and type2. Let us assume there are fields mapping1.date1 and mapping2.date2 in the two mappings and that they are of type "date". (We may choose to split on the basis of some other mapping field as well; I am choosing date fields just for this example.)
  • Let us assume docs in type1 include values for mapping1.date1 in the range start_date to end_date and, for simplicity, that docs in type2 also include dates in the same range. Let us assume middle_date is the date that lies almost halfway between start_date and end_date.
  • Delete all the documents in type1 and type2 that match the queries "type1.date1 >= middle_date" and "type2.date2 >= middle_date" respectively from original-index-firsthalf.
  • Delete all the documents in type1 and type2 that match the queries "type1.date1 < middle_date" and "type2.date2 < middle_date" respectively from original-index.
  • Optimize original-index and original-index-firsthalf.
  • Now original-index-firsthalf and original-index each contain roughly half the documents from the original index, and they don’t share any documents.
  • Optionally, create an alias original-index-secondhalf for original-index, or simply create an original-index-secondhalf index, replace its data with that of original-index, and then delete original-index.
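
The two delete steps above can be sketched as follows. This only computes middle_date and builds the range-query bodies as Python dictionaries; how they are submitted (the delete-by-query endpoint and its exact form) depends on your Elasticsearch version, so that part is deliberately left out. The helper names and the sample dates are illustrative assumptions of mine.

```python
from datetime import datetime


def middle_date(start_date, end_date):
    """Return the instant halfway between two datetimes."""
    return start_date + (end_date - start_date) / 2


def range_query(field, op, value):
    """Build a query body with one range clause; op is e.g. 'gte' or 'lt'."""
    return {"query": {"range": {field: {op: value.isoformat()}}}}


def split_queries(date_field, start_date, end_date):
    """Return the two delete queries for the split on date_field."""
    mid = middle_date(start_date, end_date)
    return {
        # Run against original-index-firsthalf: drop the newer half.
        "delete_from_firsthalf": range_query(date_field, "gte", mid),
        # Run against original-index: drop the older half.
        "delete_from_original": range_query(date_field, "lt", mid),
    }


if __name__ == "__main__":
    queries = split_queries("date1", datetime(2015, 1, 1), datetime(2015, 1, 3))
    print(queries["delete_from_firsthalf"])
    print(queries["delete_from_original"])
```

Because one query uses "gte" and the other "lt" with the same middle_date, every document ends up in exactly one of the two indices.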


This may be useful when we want to split big indices into smaller indices (with the same number of shards), as we don't have to re-index all the documents again. I could have written a shell script to demonstrate the operation, but I don't have time today. I will shortly post a shell script for your benefit :):)
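
In the meantime, here is a rough Python sketch of the shard copy step for a single node, assuming both index directories are reachable on the local filesystem and Elasticsearch is shut down. The function name is hypothetical, and you need to locate the two index directories under your own Elasticsearch data path. Note one assumption of mine: the sketch replaces each destination shard's index directory instead of copying files into it, so that no segment files from the freshly created empty index are left behind.

```python
import os
import shutil


def copy_shard_indices(src_index_dir, dst_index_dir):
    """Copy <shard>/index from src to dst for every numeric shard directory.

    Returns the list of shard names that were copied.
    """
    copied = []
    for shard in sorted(os.listdir(src_index_dir)):
        src = os.path.join(src_index_dir, shard, "index")
        # Skip anything that is not a shard directory (e.g. _state).
        if not (shard.isdigit() and os.path.isdir(src)):
            continue
        dst = os.path.join(dst_index_dir, shard, "index")
        if os.path.isdir(dst):
            shutil.rmtree(dst)  # assumption: replace rather than merge into the empty index
        shutil.copytree(src, dst)
        copied.append(shard)
    return copied
```

For example, copy_shard_indices('<data path>/original-index', '<data path>/original-index-firsthalf') would be run on each node, once for the directory that holds its shards (primaries and replicas alike).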