Ganglia is a highly scalable monitoring system for distributed systems. It lets users examine different stats of the monitored machines in a single pane. It also maintains historical stats, which makes it very good for checking how CPU usage, memory usage, disk usage, JVM GC pauses and so on vary over time across the distributed machines.
Ganglia basically has three components: the Ganglia monitoring daemon (gmond), the Ganglia meta daemon (gmetad) and the Ganglia web front end.
Gmond monitors changes in the host state, announces them, listens for state changes from other nodes running gmond, and returns an XML description of the cluster state when queried. Gmond depends on plug-ins to collect host stats. The plug-ins may be written in C, C++, Python, Perl or PHP. In fact, with some tricks, the code that actually collects the stats can be implemented in any language. It also would not be difficult to extend gmond to support plug-ins written in other languages.
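To make the "returns an XML description when queried" part concrete, here is a minimal sketch of a client that reads that XML dump and lists the metrics per host. It assumes gmond is listening on its default TCP accept channel (port 8649) on localhost, and that the dump nests HOST elements (with a NAME attribute) containing METRIC elements (with NAME, VAL and UNITS attributes). The function names are made up for illustration.

```python
import socket
import xml.etree.ElementTree as ET


def read_gmond_xml(host='127.0.0.1', port=8649, timeout=5.0):
    """Connect to gmond's TCP channel and return the XML dump as bytes."""
    chunks = []
    sock = socket.create_connection((host, port), timeout)
    try:
        while True:
            data = sock.recv(65536)
            if not data:  # the peer closed the connection: dump is complete
                break
            chunks.append(data)
    finally:
        sock.close()
    return b''.join(chunks)


def metrics_by_host(xml_bytes):
    """Map host name -> {metric name: (value, units)} from a gmond dump."""
    root = ET.fromstring(xml_bytes)
    result = {}
    for host in root.iter('HOST'):
        metrics = {}
        for metric in host.iter('METRIC'):
            metrics[metric.get('NAME')] = (metric.get('VAL'),
                                           metric.get('UNITS'))
        result[host.get('NAME')] = metrics
    return result
```

On a machine running gmond, metrics_by_host(read_gmond_xml()) should give a quick view of what the daemon currently knows, which is handy later for checking whether a new plug-in's metrics are being reported.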
Gmetad polls the clusters it is monitoring, collects the reported metrics and writes them to individual round-robin databases.
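The "round robin" idea is that each database has a fixed number of slots and new samples overwrite the oldest ones, so the storage never grows. The toy class below only illustrates that behaviour. It is not RRDtool's file format or API, and the class name and slot count are invented for the example.

```python
from collections import deque


class RoundRobinSeries(object):
    """Fixed-size series: once full, each new sample evicts the oldest."""

    def __init__(self, slots):
        self._samples = deque(maxlen=slots)

    def update(self, timestamp, value):
        self._samples.append((timestamp, value))

    def fetch(self):
        """Return the retained samples, oldest first."""
        return list(self._samples)


series = RoundRobinSeries(slots=3)
for ts, value in [(0, 10), (20, 12), (40, 11), (60, 15)]:
    series.update(ts, value)
print(series.fetch())  # the sample taken at t=0 has been overwritten
```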
The Ganglia web front end provides the metric visualization UI.
There is a very good guide to installing and configuring Ganglia on Ubuntu 14.04 here (from DigitalOcean). Please follow its steps to set up Ganglia monitoring on your machines. You may follow similar steps to install it on other Linux systems.
Now that you have installed and configured Ganglia, it is time to write and test a new Ganglia plug-in. From Ganglia 3.1 onwards, gmond has a modular interface that allows us to extend its capability through plug-ins. We can write modules or plug-ins to collect new kinds of metrics. Gmond uses the mod_python module (modpython.so) to interface with all the Python plug-ins and collect the metrics published by them.
First we need to create an entry for modpython.so in the gmond configuration file. Edit gmond.conf (by default it is installed at /etc/ganglia/gmond.conf) and add the lines below to the "modules" section:
  module {
    name = "python_module"
    path = "/usr/lib/ganglia/modpython.so"
    params = "/usr/lib/ganglia/python"
  }
So, the modules section in gmond.conf will look somewhat like the one shown below:
modules {
  module {
    name = "core_metrics"
  }
  module {
    name = "cpu_module"
    path = "/usr/lib/ganglia/modcpu.so"
  }
  module {
    name = "disk_module"
    path = "/usr/lib/ganglia/moddisk.so"
  }
  module {
    name = "load_module"
    path = "/usr/lib/ganglia/modload.so"
  }
  module {
    name = "mem_module"
    path = "/usr/lib/ganglia/modmem.so"
  }
  module {
    name = "net_module"
    path = "/usr/lib/ganglia/modnet.so"
  }
  module {
    name = "proc_module"
    path = "/usr/lib/ganglia/modproc.so"
  }
  module {
    name = "sys_module"
    path = "/usr/lib/ganglia/modsys.so"
  }
  module {
    name = "python_module"
    path = "/usr/lib/ganglia/modpython.so"
    params = "/usr/lib/ganglia/python"
  }
}
Also, create a directory /etc/ganglia/pyconfs to store the configuration files for the Python modules. Let us use .pyconf as the extension for these configuration files. Add the line below to /etc/ganglia/gmond.conf:
include ('/etc/ganglia/pyconfs/*.pyconf')
We will add a configuration file for the sample plug-in (/etc/ganglia/pyconfs/ganglia_plugin_sample.pyconf). The content of this file is pasted below:
modules {
  module {
    name = "ganglia_plugin_sample"
    language = "python"
    param firstparam {
      value = 100
    }
    param secondparam {
      value = 500
    }
  }
}
collection_group {
  collect_every = 20
  time_threshold = 90
  metric {
    name_match = ".*"
    value_threshold = "1"
  }
}
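Gmond hands the param blocks of a module to metric_init as a dictionary. When testing a plug-in outside gmond, it can be handy to build that dictionary from the .pyconf file itself. The helper below is a rough sketch with a hypothetical name (read_pyconf_params). It uses a simple regular expression rather than gmond's real configuration parser, so it only understands param blocks laid out like the ones in the file above, and it keeps every value as a string on the assumption that this is how gmond delivers them.

```python
import re

# Matches:  param <name> { value = <bare token or "quoted string"> }
PARAM_RE = re.compile(
    r'param\s+(\w+)\s*\{\s*value\s*=\s*(?:"([^"]*)"|([^\s}]+))\s*\}')


def read_pyconf_params(text):
    """Return {param name: value string} for every param block in text."""
    params = {}
    for name, quoted, bare in PARAM_RE.findall(text):
        # Exactly one of the two value groups is non-empty for a match.
        params[name] = quoted if quoted else bare
    return params


SAMPLE = '''
modules {
  module {
    name = "ganglia_plugin_sample"
    language = "python"
    param firstparam {
      value = 100
    }
    param secondparam {
      value = 500
    }
  }
}
'''

print(read_pyconf_params(SAMPLE))
```

Because the values stay strings, a plug-in that needs numbers has to convert them itself, for example with int().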
Now let us add a very simple Ganglia Python plug-in. The module should implement at least two functions: metric_init and metric_cleanup. metric_init generally initializes the values for the different metrics that the plug-in will publish and returns their descriptors.
Below is the script:
import sys
import os
import random

descriptors = list()
firstparam_max = 1000
secondparam_max = 1000


def callback_fun1(name):
    ''' Returns a random number between 1 and firstparam_max '''
    random.seed()
    return random.randint(1, firstparam_max)


def callback_fun2(name):
    ''' Returns a random number between 5 and secondparam_max '''
    random.seed()
    return random.randint(5, secondparam_max)


def metric_init(params):
    global descriptors, firstparam_max, secondparam_max

    # Values from the .pyconf file override the default upper bounds.
    if 'firstparam' in params:
        firstparam_max = int(params['firstparam'])
    d = {'name': 'firstparam',
         'call_back': callback_fun1,
         'time_max': 90,
         'value_type': 'uint',
         'units': 'Count',
         'slope': 'both',
         'format': '%u',
         'description': 'Sample metric',
         'groups': 'Sample'}
    descriptors.append(d)

    if 'secondparam' in params:
        secondparam_max = int(params['secondparam'])
    d = {'name': 'secondparam',
         'call_back': callback_fun2,
         'time_max': 90,
         'value_type': 'uint',
         'units': 'Count',
         'slope': 'both',
         'format': '%u',
         'description': 'Sample metric',
         'groups': 'Sample'}
    descriptors.append(d)
    return descriptors


def metric_cleanup():
    ''' We don't need any cleanup :) :) '''
    pass


# This routine is for debugging purpose only and not used by gmond
# To debug the output, run as below:
# $ python ganglia_plugin_sample.py
if __name__ == '__main__':
    params = {'firstparam': 100, 'secondparam': 500}
    metric_init(params)
    for d in descriptors:
        v = d['call_back'](d['name'])
        print('%s --> %u' % (d['name'], v))
Here, the plug-in collects two metrics (firstparam and secondparam). The value for firstparam is computed by the function callback_fun1 and the value for secondparam by callback_fun2. The "slope" is set to "both" for both metrics. This is a hint to gmetad that the value may increase or decrease.
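A typo in a descriptor tends to show up only as a missing graph, so a quick sanity check before deploying can help. The sketch below checks each descriptor for the keys the sample uses. The key list and the set of accepted slope values ('zero', 'positive', 'negative', 'both') are my assumptions about what gmond expects, and check_descriptors is a made-up helper, not part of Ganglia.

```python
REQUIRED_KEYS = ('name', 'call_back', 'time_max', 'value_type',
                 'units', 'slope', 'format', 'description', 'groups')
KNOWN_SLOPES = ('zero', 'positive', 'negative', 'both')


def check_descriptors(descriptors):
    """Return a list of human-readable problems; empty means none found."""
    problems = []
    for d in descriptors:
        label = d.get('name', '<unnamed>')
        for key in REQUIRED_KEYS:
            if key not in d:
                problems.append('%s: missing key %r' % (label, key))
        if 'call_back' in d and not callable(d['call_back']):
            problems.append('%s: call_back is not callable' % label)
        if 'slope' in d and d['slope'] not in KNOWN_SLOPES:
            problems.append('%s: unexpected slope %r' % (label, d['slope']))
    return problems


good = {'name': 'firstparam', 'call_back': lambda name: 1, 'time_max': 90,
        'value_type': 'uint', 'units': 'Count', 'slope': 'both',
        'format': '%u', 'description': 'Sample metric', 'groups': 'Sample'}
bad = {'name': 'broken', 'call_back': 42, 'slope': 'sideways'}

print(check_descriptors([good]))
for problem in check_descriptors([bad]):
    print(problem)
```

In practice one would call check_descriptors(metric_init(params)) from the plug-in's debugging block and refuse to deploy if the list is not empty.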
We should copy this script to /usr/lib/ganglia/python/ganglia_plugin_sample.py.
After you copy the plug-in script and make the configuration changes, restart the ganglia-monitor (gmond) service.
$ sudo service ganglia-monitor restart
After some time you should see the "Sample metrics" group and the graphs for the metrics firstparam and secondparam under it.
I have created a Python module for collecting Storm stats. The module is available here. For instructions, please refer to this readme.
I have pasted the module below for easy reference:
import hashlib
import logging
import pickle
import re
import sys
from time import time

from thrift.transport import TTransport, TSocket
from thrift.protocol import TBinaryProtocol

clusterinfo = None
topology_found = True
descriptors = list()
topologies = []
serialfile_dir = '/tmp'
topology_summary_cols_map = {'status': 'Status',
                             'num_workers': 'Worker Count',
                             'num_executors': 'ExecutorCount',
                             'uptime_secs': 'Uptime',
                             'num_tasks': 'TaskCount'}

# For each stat: [units, format, value_type]
spout_stats = {'Executors': ['Count', '%u', 'uint'],
               'Tasks': ['Count', '%u', 'uint'],
               'Emitted': ['Count', '%f', 'double'],
               'Transferred': ['Count/sec', '%f', 'double'],
               'CompleteLatency': ['ms', '%f', 'double'],
               'Acked': ['Count/Sec', '%f', 'double'],
               'Failed': ['Count/sec', '%f', 'double']}

bolt_stats = {'Executors': ['Count', '%u', 'uint'],
              'Tasks': ['Count', '%u', 'uint'],
              'Emitted': ['Count/sec', '%f', 'double'],
              'Executed': ['Count/sec', '%f', 'double'],
              'Transferred': ['Count/sec', '%f', 'double'],
              'ExecuteLatency': ['ms', '%f', 'double'],
              'ProcessLatency': ['ms', '%f', 'double'],
              'Acked': ['Count/sec', '%f', 'double'],
              'Failed': ['Count/sec', '%f', 'double']}

# Cumulative counters that are diffed against the previous snapshot
diff_cols = ['Acked', 'Failed', 'Executed', 'Transferred', 'Emitted']

overall = {'ExecutorCount': ['Count', '%u', 'uint'],
           'WorkerCount': ['Count', '%u', 'uint'],
           'TaskCount': ['Count', '%u', 'uint'],
           'UptimeSecs': ['Count', '%u', 'uint']}

toplogy_mods = {}
lastchecktime = 0
lastinfotime = 0
maxinterval = 6
all_topology_stats = {}
bolt_array = {}
spout_array = {}
nimbus_host = '127.0.0.1'
nimbus_port = 6627

logging.basicConfig(filename='/tmp/storm_topology.log', level=logging.INFO,
                    format='%(asctime)s %(levelname)s line:%(lineno)d %(message)s',
                    filemode='w')
logging_levels = {'INFO': logging.INFO,
                  'DEBUG': logging.DEBUG,
                  'WARNING': logging.WARNING,
                  'ERROR': logging.ERROR,
                  'CRITICAL': logging.CRITICAL}


def get_avg(arr):
    if len(arr) < 1:
        return 0
    return sum(arr) / len(arr)


def normalize_stats(stats, duration):
    # Turn counts into per-second rates over the given duration
    for k in stats:
        statsk = stats[k]
        if 'Emitted' in statsk and duration > 0:
            if statsk['Emitted'] > 0:
                statsk['Emitted'] = statsk['Emitted'] / duration
        if 'Acked' in statsk and duration > 0:
            if statsk['Acked'] > 0:
                statsk['Acked'] = statsk['Acked'] / duration
        if 'Executed' in statsk and duration > 0:
            if statsk['Executed'] > 0:
                statsk['Executed'] = statsk['Executed'] / duration


def freshen_topology(topology):
    tmpsavestats = None
    inf = None
    savedlastchecktime = 0
    # hashlib.md5 replaces the deprecated Python 2 md5 module
    tmp = hashlib.md5()
    tmp.update(topology)
    filename = '/tmp/save_stats_for_' + tmp.hexdigest() + '.pk'
    # Load the snapshot saved by the previous run, if there is one
    try:
        inf = open(filename, 'rb')
    except IOError as e:
        logging.warn(e)
    if inf is not None:
        try:
            tmpsavestats = pickle.load(inf)
            savedlastchecktime = pickle.load(inf)
        except EOFError as e:
            # was e.message(): message is an attribute, not a method
            logging.warn(e)
        inf.close()
    if not all_topology_stats[topology]['topology_stats_got']:
        logging.warn('Info not got for topology ' + topology)
        return
    overallstats = all_topology_stats[topology]['overallstats']
    boltspoutstats = all_topology_stats[topology]['boltspoutstats']
    # Save the current cumulative values for the next run
    of = open(filename, 'wb')
    if of is not None:
        pickle.dump(boltspoutstats, of)
        pickle.dump(time(), of)
        of.close()
    if overallstats['UptimeSecs'] > (lastchecktime - savedlastchecktime):
        if tmpsavestats is not None:
            for bolt in bolt_array[topology]:
                if bolt in tmpsavestats and bolt in boltspoutstats:
                    stats_new = boltspoutstats[bolt]
                    stats_old = tmpsavestats[bolt]
                    for key in bolt_stats:
                        if key == 'ExecuteLatency' or key == 'ProcessLatency':
                            continue
                        if key not in stats_new:
                            continue
                        if key not in stats_old:
                            continue
                        if key in diff_cols:
                            stats_new[key] -= stats_old[key]
            for spout in spout_array[topology]:
                if spout in tmpsavestats and spout in boltspoutstats:
                    stats_new = boltspoutstats[spout]
                    stats_old = tmpsavestats[spout]
                    for key in spout_stats:
                        if key == 'CompleteLatency':
                            continue
                        if key not in stats_new:
                            continue
                        if key not in stats_old:
                            continue
                        if key in diff_cols:
                            stats_new[key] -= stats_old[key]
            normalize_stats(boltspoutstats, lastchecktime - savedlastchecktime)
        else:
            normalize_stats(boltspoutstats, overallstats['UptimeSecs'])
    else:
        normalize_stats(boltspoutstats, overallstats['UptimeSecs'])


def freshen():
    global lastchecktime
    if time() > (lastchecktime + maxinterval):
        lastchecktime = time()
        get_topology_stats_for(topologies)
        for topology in topologies:
            freshen_topology(topology)


def callback_boltspout(name):
    # Metric names look like <topology>_<bolt or spout>_<stat>
    freshen()
    first_pos = name.find('_')
    last_pos = name.rfind('_')
    topology_mod = name[0:first_pos]
    bolt = name[first_pos + 1: last_pos]
    statname = name[last_pos + 1:]
    topology = toplogy_mods[topology_mod]
    if not all_topology_stats[topology]['topology_stats_got']:
        logging.debug('Returing 0 for ' + name)
        return 0
    logging.debug('Got stats for ' + name + " " +
                  str(all_topology_stats[topology]['boltspoutstats'][bolt][statname]))
    return all_topology_stats[topology]['boltspoutstats'][bolt][statname]


def callback_overall(name):
    # Metric names look like <topology>_<stat>
    freshen()
    topology_mod, name = name.split('_')
    topology = toplogy_mods[topology_mod]
    if not all_topology_stats[topology]['topology_stats_got']:
        logging.debug('Returing 0 for ' + name)
        return 0
    logging.debug(topology + ' ' + name + ' ' +
                  str(all_topology_stats[topology]['overallstats'][name]))
    return all_topology_stats[topology]['overallstats'][name]


def update_task_count(component_task_count, component_name, count):
    if component_name not in component_task_count:
        component_task_count[component_name] = 0
    component_task_count[component_name] += count


def update_exec_count(component_exec_count, component_name, count):
    if component_name not in component_exec_count:
        component_exec_count[component_name] = 0
    component_exec_count[component_name] += count


def update_whole_num_stat_special(stats, store, boltname, statname):
    if boltname not in store:
        store[boltname] = {}
    if statname not in store[boltname]:
        store[boltname][statname] = 0
    for k in stats:
        # Skip Storm's internal streams
        if k == '__metrics' or k == '__ack_init' or k == '__ack_ack' or k == '__system':
            continue
        store[boltname][statname] += stats[k]


def update_whole_num_stat(stats, store, boltname, statname):
    if boltname not in store:
        store[boltname] = {}
    if statname not in store[boltname]:
        store[boltname][statname] = 0
    for k in stats:
        store[boltname][statname] += stats[k]


def update_avg_stats(stats, store, boltname, statname):
    if boltname not in store:
        store[boltname] = {}
    if statname not in store[boltname]:
        store[boltname][statname] = []
    for k in stats:
        store[boltname][statname].append(stats[k])


def get_topology_stats_for(topologies):
    all_topology_stats.clear()
    for topology in topologies:
        all_topology_stats[topology] = get_topology_stats(topology)


def refresh_topology_stats():
    global clusterinfo
    logging.debug('Refreshing topology stats')
    for t in topologies:
        all_topology_stats[t] = {'topology_stats_got': False}
    try:
        transport = TSocket.TSocket(nimbus_host, nimbus_port)
        transport.setTimeout(1000)
        framedtrasp = TTransport.TFramedTransport(transport)
        protocol = TBinaryProtocol.TBinaryProtocol(framedtrasp)
        client = Nimbus.Client(protocol)
        framedtrasp.open()
        boltspoutstats = None
        component_task_count = None
        component_exec_count = None
        clusterinfo = client.getClusterInfo()
        for tsummary in clusterinfo.topologies:
            if tsummary.name not in topologies:
                continue
            toplogyname = tsummary.name
            overallstats = {}
            overallstats['ExecutorCount'] = tsummary.num_executors
            overallstats['TaskCount'] = tsummary.num_tasks
            overallstats['WorkerCount'] = tsummary.num_workers
            overallstats['UptimeSecs'] = tsummary.uptime_secs
            all_topology_stats[toplogyname]['overallstats'] = overallstats
            boltspoutstats = {}
            component_task_count = {}
            component_exec_count = {}
            all_topology_stats[toplogyname]['boltspoutstats'] = boltspoutstats
            all_topology_stats[toplogyname]['component_task_count'] = component_task_count
            all_topology_stats[toplogyname]['component_exec_count'] = component_exec_count
            tinfo = client.getTopologyInfo(tsummary.id)
            all_topology_stats[toplogyname]['topology_stats_got'] = True
            for exstat in tinfo.executors:
                stats = exstat.stats
                update_whole_num_stat_special(stats.emitted[":all-time"], boltspoutstats,
                                              exstat.component_id, 'Emitted')
                update_whole_num_stat_special(stats.transferred[":all-time"], boltspoutstats,
                                              exstat.component_id, 'Transferred')
                # was task_end - task_end + 1, which always evaluates to 1
                numtask = exstat.executor_info.task_end - exstat.executor_info.task_start + 1
                update_task_count(component_task_count, exstat.component_id, numtask)
                update_exec_count(component_exec_count, exstat.component_id, 1)
                if stats.specific.bolt is not None:
                    update_whole_num_stat(stats.specific.bolt.acked[":all-time"], boltspoutstats,
                                          exstat.component_id, 'Acked')
                    update_whole_num_stat(stats.specific.bolt.failed[":all-time"], boltspoutstats,
                                          exstat.component_id, 'Failed')
                    update_whole_num_stat(stats.specific.bolt.executed[":all-time"], boltspoutstats,
                                          exstat.component_id, 'Executed')
                    update_avg_stats(stats.specific.bolt.process_ms_avg["600"], boltspoutstats,
                                     exstat.component_id, 'process_ms_avg')
                    update_avg_stats(stats.specific.bolt.execute_ms_avg["600"], boltspoutstats,
                                     exstat.component_id, 'execute_ms_avg')
                if stats.specific.spout is not None:
                    update_whole_num_stat(stats.specific.spout.acked[":all-time"], boltspoutstats,
                                          exstat.component_id, 'Acked')
                    update_whole_num_stat(stats.specific.spout.failed[":all-time"], boltspoutstats,
                                          exstat.component_id, 'Failed')
                    update_avg_stats(stats.specific.spout.complete_ms_avg[":all-time"], boltspoutstats,
                                     exstat.component_id, 'complete_ms_avg')
            if '__acker' in boltspoutstats:
                del boltspoutstats['__acker']
            # Collapse the per-executor latency lists into one average each
            for key in boltspoutstats:
                if 'complete_ms_avg' in boltspoutstats[key]:
                    avg = get_avg(boltspoutstats[key]['complete_ms_avg'])
                    boltspoutstats[key]['CompleteLatency'] = avg
                    del boltspoutstats[key]['complete_ms_avg']
                if 'process_ms_avg' in boltspoutstats[key]:
                    avg = get_avg(boltspoutstats[key]['process_ms_avg'])
                    boltspoutstats[key]['ProcessLatency'] = avg
                    del boltspoutstats[key]['process_ms_avg']
                if 'execute_ms_avg' in boltspoutstats[key]:
                    avg = get_avg(boltspoutstats[key]['execute_ms_avg'])
                    boltspoutstats[key]['ExecuteLatency'] = avg
                    del boltspoutstats[key]['execute_ms_avg']
            for key in component_task_count:
                if key in boltspoutstats:
                    boltspoutstats[key]['Tasks'] = component_task_count[key]
            for key in component_exec_count:
                if key in boltspoutstats:
                    boltspoutstats[key]['Executors'] = component_exec_count[key]
        framedtrasp.close()
    except Exception as e:
        clusterinfo = None
        logging.warn(e)


def get_topology_stats(toplogyname):
    global lastinfotime
    if (lastinfotime + 4) < time():
        for t in all_topology_stats:
            all_topology_stats[t] = None
        lastinfotime = time()
        refresh_topology_stats()
    return all_topology_stats[toplogyname]


def metric_init_topology(params):
    global descriptors
    groupname = ''
    if 'topology' in params and len(params['topology']):
        groupname = params['topology']
    else:
        return
    topology = groupname
    # Strip whitespace and underscores so '_' can separate metric name parts
    topology_mod = re.sub(r"\s+", "", topology)
    topology_mod = re.sub(r"[_]+", "", topology_mod)
    toplogy_mods[topology_mod] = topology
    if 'spouts' in params:
        spout_array[topology] = re.split('[,]+', params['spouts'])
        for spout in spout_array[topology]:
            for statname in spout_stats:
                d = {'name': topology_mod + '_' + spout + '_' + statname,
                     'call_back': callback_boltspout,
                     'time_max': 90,
                     'value_type': spout_stats[statname][2],
                     'units': spout_stats[statname][0],
                     'slope': 'both',
                     'format': spout_stats[statname][1],
                     'description': '',
                     'groups': groupname}
                descriptors.append(d)
    if 'bolts' in params:
        bolt_array[topology] = re.split('[,]+', params['bolts'])
        for bolt in bolt_array[topology]:
            for statname in bolt_stats:
                d = {'name': topology_mod + '_' + bolt + '_' + statname,
                     'call_back': callback_boltspout,
                     'time_max': 90,
                     'value_type': bolt_stats[statname][2],
                     'units': bolt_stats[statname][0],
                     'slope': 'both',
                     'format': bolt_stats[statname][1],
                     'description': '',
                     'groups': groupname}
                descriptors.append(d)
    for key in overall:
        d = {'name': topology_mod + '_' + key,
             'call_back': callback_overall,
             'time_max': 90,
             'value_type': overall[key][2],
             'units': overall[key][0],
             'slope': 'both',
             'format': overall[key][1],
             'description': '',
             'groups': groupname}
        descriptors.append(d)
    logging.info('Inited metric for ' + groupname)


def metric_init(params):
    global topologies, nimbus_host, nimbus_port
    if 'nimbus_host' in params:
        nimbus_host = params['nimbus_host']
    if 'nimbus_port' in params:
        nimbus_port = params['nimbus_port']
    if 'topologies' not in params:
        return
    # Location of the Thrift-generated Storm Python bindings
    if 'storm_thrift_gen' in params:
        sys.path.append(params['storm_thrift_gen'])
    else:
        sys.path.append('/usr/lib/ganglia')
    if 'loglevel' in params:
        loglevel = params['loglevel'].strip().upper()
        if loglevel in logging_levels:
            logging.getLogger().setLevel(logging_levels[loglevel])
    global Nimbus, ttypes
    from stormpy.storm import Nimbus, ttypes
    tss = re.split('[,]+', params['topologies'])
    topologies = tss
    alltops = {}
    for t in tss:
        alltops[t] = {'topology': t}
        alltops[t]['tlen'] = len(t)
        t_bolts = t + '_bolts'
        if t_bolts in params:
            alltops[t]['bolts'] = params[t_bolts]
        t_spouts = t + '_spouts'
        if t_spouts in params:
            alltops[t]['spouts'] = params[t_spouts]
    for t in alltops:
        logging.info('Initing metric for ' + t)
        metric_init_topology(alltops[t])
    return descriptors


if __name__ == '__main__':
    params = {'topologies': 'SampleTopology,AnotherTopology',
              'SampleTopology_spouts': 'SampleSpoutTwo',
              'SampleTopology_bolts': 'boltc',
              'AnotherTopology_spouts': 'Spout',
              'AnotherTopology_bolts': 'bolta,boltb,boltd',
              'loglevel': 'ERROR'}
    metric_init(params)
    for d in descriptors:
        v = d['call_back'](d['name'])
        formt = "%s " + d['format']
        print(formt % (d['name'], v))
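The core trick in freshen_topology is turning Storm's ever-growing counters (Emitted, Acked and so on) into per-second rates: subtract the previously saved snapshot from the current one and divide by the seconds elapsed between them, falling back to the topology uptime when there is no usable snapshot. The standalone sketch below restates that idea with plain dictionaries. It is a simplification, not a copy of the module's logic, and counters_to_rates and its arguments are illustrative names.

```python
def counters_to_rates(current, previous, elapsed_secs, uptime_secs):
    """Convert cumulative counters to per-second rates.

    current and previous map counter name -> cumulative value. When there
    is no previous snapshot, or the topology has been up for less time
    than the gap between snapshots (it was restarted), average over the
    uptime instead of diffing.
    """
    rates = {}
    use_diff = previous is not None and 0 < elapsed_secs < uptime_secs
    for name, value in current.items():
        if use_diff and name in previous:
            rates[name] = (value - previous[name]) / float(elapsed_secs)
        elif uptime_secs > 0:
            rates[name] = value / float(uptime_secs)
        else:
            rates[name] = 0.0
    return rates


now = {'Emitted': 1200, 'Acked': 1100}
before = {'Emitted': 1000, 'Acked': 950}
print(counters_to_rates(now, before, elapsed_secs=20, uptime_secs=600))
```

With the numbers above, 200 tuples emitted over 20 seconds gives a rate of 10 per second, whereas averaging over the full 600 seconds of uptime would have reported only 2 per second and hidden any recent change in load.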