Ganglia is a highly scalable monitoring system for distributed systems. It lets users examine various statistics of the monitored machines in a single pane. Because it maintains historical data, it is very useful for seeing how CPU usage, memory usage, disk usage, JVM GC pauses, and so on vary over time across the distributed machines.
Ganglia has three main components: the Ganglia monitoring daemon (gmond), the Ganglia meta daemon (gmetad), and the Ganglia web front end.
Gmond monitors changes in the host state, announces them, listens for state changes from other nodes running gmond, and returns an XML description of the cluster state when queried. Gmond relies on plug-ins to collect host stats. Plug-ins may be written in C, C++, Python, Perl, or PHP. In fact, with a little work, the code that actually collects the stats can be written in almost any language, and it would not be difficult to extend gmond to support plug-ins from other languages as well.
Gmetad polls the clusters it is monitoring, collects the reported metrics, and writes them to individual round-robin databases.
The Ganglia web front end provides the metric visualization UI.
There is a very good guide to installing and configuring Ganglia on Ubuntu 14.04
here (from DigitalOcean). Please follow its steps to set up Ganglia monitoring on your machines; similar steps apply to other Linux systems.
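Once gmond is running, a quick way to sanity-check the setup is to connect to its TCP accept channel (port 8649 by default, unless you changed it in gmond.conf) and read the cluster-state XML it returns. Below is a minimal Python sketch, assuming the default port:
import socket

# Connect to gmond's default tcp_accept_channel and read the cluster-state XML
sock = socket.create_connection(('localhost', 8649), timeout=5)
chunks = []
while True:
    data = sock.recv(4096)
    if not data:  # gmond closes the connection after sending the XML
        break
    chunks.append(data)
sock.close()
print ''.join(chunks)[:1000]  # show the beginning of the XML dump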
Now that you have installed and configured Ganglia, it is time to write and test a new Ganglia plug-in. From Ganglia 3.1 onwards, gmond has a modular interface that allows us to extend its capabilities through plug-ins. We can write modules or plug-ins to collect new kinds of metrics. Gmond uses the mod_python module (modpython.so) to interface with all the Python plug-ins and collect the metrics they publish.
First, we need to create an entry for modpython.so in the gmond configuration file. Edit gmond.conf (by default installed at /etc/ganglia/gmond.conf) and add the following lines to the "modules" section:
module {
name = "python_module"
path = "/usr/lib/ganglia/modpython.so"
params = "/usr/lib/ganglia/python"
}
So the modules section in gmond.conf will look somewhat like this:
modules {
module {
name = "core_metrics"
}
module {
name = "cpu_module"
path = "/usr/lib/ganglia/modcpu.so"
}
module {
name = "disk_module"
path = "/usr/lib/ganglia/moddisk.so"
}
module {
name = "load_module"
path = "/usr/lib/ganglia/modload.so"
}
module {
name = "mem_module"
path = "/usr/lib/ganglia/modmem.so"
}
module {
name = "net_module"
path = "/usr/lib/ganglia/modnet.so"
}
module {
name = "proc_module"
path = "/usr/lib/ganglia/modproc.so"
}
module {
name = "sys_module"
path = "/usr/lib/ganglia/modsys.so"
}
module {
name = "python_module"
path = "/usr/lib/ganglia/modpython.so"
params = "/usr/lib/ganglia/python"
}
}
Also, create a directory /etc/ganglia/pyconfs to store the configuration files for the Python modules. We will use .pyconf as the extension for these configuration files. Add the following line to /etc/ganglia/gmond.conf:
include ('/etc/ganglia/pyconfs/*.pyconf')
Next, we add a configuration file for the sample plug-in (/etc/ganglia/pyconfs/ganglia_plugin_sample.pyconf). The content of this file is shown below:
modules {
module {
name = "ganglia_plugin_sample"
language = "python"
param firstparam {
value = 100
}
param secondparam {
value = 500
}
}
}
collection_group {
collect_every = 20
time_threshold = 90
metric {
name_match = ".*"
value_threshold = "1"
}
}
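When gmond loads this configuration, mod_python passes the param values to the module's metric_init function in a dictionary, and the values arrive as strings, which is why the sample plug-in below converts them with int(). Roughly, the module receives something like this (a sketch for illustration, not part of the plug-in):
# What metric_init receives for the pyconf above; values arrive as strings
params = {'firstparam': '100', 'secondparam': '500'}
firstparam_max = int(params['firstparam'])  # convert before use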
Now let us write a very simple Ganglia Python plug-in. The module must implement at least two functions: metric_init and metric_cleanup. metric_init initializes and returns the descriptors for the metrics that the plug-in will publish, and metric_cleanup is called once when gmond shuts the module down.
Below is the script:
import sys
import os
import random

descriptors = list()
firstparam_max = 1000
secondparam_max = 1000

def callback_fun1(name):
    '''
    Returns a random number between 1 and firstparam_max
    '''
    random.seed()
    return random.randint(1, firstparam_max)

def callback_fun2(name):
    '''
    Returns a random number between 5 and secondparam_max
    '''
    random.seed()
    return random.randint(5, secondparam_max)

def metric_init(params):
    global descriptors, firstparam_max, secondparam_max
    if 'firstparam' in params:
        firstparam_max = int(params['firstparam'])
    d = {'name': 'firstparam',
         'call_back': callback_fun1,
         'time_max': 90,
         'value_type': 'uint',
         'units': 'Count',
         'slope': 'both',
         'format': '%u',
         'description': 'Sample metric',
         'groups': 'Sample'}
    descriptors.append(d)
    if 'secondparam' in params:
        secondparam_max = int(params['secondparam'])
    d = {'name': 'secondparam',
         'call_back': callback_fun2,
         'time_max': 90,
         'value_type': 'uint',
         'units': 'Count',
         'slope': 'both',
         'format': '%u',
         'description': 'Sample metric',
         'groups': 'Sample'}
    descriptors.append(d)
    return descriptors

def metric_cleanup():
    '''
    We don't need any cleanup :) :)
    '''
    pass

# This routine is for debugging purpose only and not used by gmond
# To debug the output, run as below:
# $ python ganglia_plugin_sample.py
if __name__ == '__main__':
    params = {'firstparam': 100, 'secondparam': 500}
    metric_init(params)
    for d in descriptors:
        v = d['call_back'](d['name'])
        print '%s --> %u' % (d['name'], v)
Here, the plug-in collects two metrics (firstparam and secondparam). The value for firstparam is computed by callback_fun1 and the value for secondparam by callback_fun2. The "slope" is set to "both" for both metrics; this is a hint to gmetad that the value may increase or decrease.
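For comparison, a metric that only ever grows, such as a running total of processed events, would typically set "slope" to "positive" so the back end knows the value never decreases. A hypothetical descriptor sketch (the metric name and callback_total_events are made up for illustration):
d = {'name': 'total_events',             # hypothetical, ever-increasing metric
     'call_back': callback_total_events,
     'time_max': 90,
     'value_type': 'uint',
     'units': 'Count',
     'slope': 'positive',                # hint: the value never decreases
     'format': '%u',
     'description': 'Total events processed since start',
     'groups': 'Sample'}
descriptors.append(d)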
We should copy this script to /usr/lib/ganglia/python/ganglia_plugin_sample.py.
After you have copied the plug-in script and made the configuration changes, restart the ganglia-monitor (gmond) service:
$ sudo service ganglia-monitor restart
After some time you should see the "Sample" metric group and, under it, the graphs for the metrics firstparam and secondparam.
I have created a Python module for collecting Storm stats. The module is available
here. For setup instructions, please refer to this
readme.
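Before the module listing, here is a sketch of what its gmond configuration (.pyconf) could look like. The parameter names follow what the module's metric_init reads; the module name assumes the file is installed as /usr/lib/ganglia/python/storm_topology.py, and the topology, spout, and bolt names are placeholders that you should replace with your own:
modules {
  module {
    name = "storm_topology"
    language = "python"
    param nimbus_host {
      value = "127.0.0.1"
    }
    param nimbus_port {
      value = 6627
    }
    param topologies {
      value = "SampleTopology"
    }
    param SampleTopology_spouts {
      value = "SampleSpoutTwo"
    }
    param SampleTopology_bolts {
      value = "boltc"
    }
    param loglevel {
      value = "ERROR"
    }
  }
}
collection_group {
  collect_every = 20
  time_threshold = 90
  metric {
    name_match = ".*"
    value_threshold = "1"
  }
}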
I have pasted the module below for your easy reference:
import logging
import md5
import pickle
import re
import sys
from time import time
from thrift.transport import TTransport, TSocket
from thrift.protocol import TBinaryProtocol
clusterinfo = None
topology_found = True
descriptors = list()
topologies = []
serialfile_dir = '/tmp'
topology_summary_cols_map = {'status': 'Status', 'num_workers': 'Worker Count',
'num_executors': 'ExecutorCount', 'uptime_secs': 'Uptime',
'num_tasks': 'TaskCount'}
spout_stats = {'Executors': ['Count', '%u', 'uint'], 'Tasks': ['Count', '%u', 'uint'],
'Emitted': ['Count', '%f', 'double'],
'Transferred': ['Count/sec', '%f', 'double'],
'CompleteLatency': ['ms', '%f', 'double'], 'Acked': ['Count/Sec', '%f', 'double'],
'Failed': ['Count/sec', '%f', 'double']}
bolt_stats = {'Executors': ['Count', '%u', 'uint'], 'Tasks': ['Count', '%u', 'uint'],
'Emitted': ['Count/sec', '%f', 'double'],
'Executed': ['Count/sec', '%f', 'double'],
'Transferred': ['Count/sec', '%f', 'double'],
'ExecuteLatency': ['ms', '%f', 'double'],
'ProcessLatency': ['ms', '%f', 'double'],
'Acked': ['Count/sec', '%f', 'double'],
'Failed': ['Count/sec', '%f', 'double']}
diff_cols = ['Acked', 'Failed', 'Executed', 'Transferred', 'Emitted']
overall = {'ExecutorCount': ['Count', '%u', 'uint'],
'WorkerCount': ['Count', '%u', 'uint'],
'TaskCount': ['Count', '%u', 'uint'],
'UptimeSecs': ['Count', '%u', 'uint']}
toplogy_mods = {}
lastchecktime = 0
lastinfotime = 0
maxinterval = 6
all_topology_stats = {}
bolt_array = {}
spout_array = {}
nimbus_host = '127.0.0.1'
nimbus_port = 6627
logging.basicConfig(filename='/tmp/storm_topology.log', level=logging.INFO,
format='%(asctime)s %(levelname)s line:%(lineno)d %(message)s', filemode='w')
logging_levels = {'INFO': logging.INFO, 'DEBUG': logging.DEBUG, 'WARNING': logging.WARNING,
'ERROR': logging.ERROR, 'CRITICAL': logging.CRITICAL}
def get_avg(arr):
if len(arr) < 1:
return 0
return sum(arr) / len(arr)
def normalize_stats(stats, duration):
for k in stats:
statsk = stats[k]
if 'Emitted' in statsk and duration > 0:
if statsk['Emitted'] > 0:
statsk['Emitted'] = statsk['Emitted'] / duration
if 'Acked' in statsk and duration > 0:
if statsk['Acked'] > 0:
statsk['Acked'] = statsk['Acked'] / duration
if 'Executed' in statsk and duration > 0:
if statsk['Executed'] > 0:
statsk['Executed'] = statsk['Executed'] / duration
def freshen_topology(topology):
tmpsavestats = None
inf = None
savedlastchecktime = 0
tmp = md5.new()
tmp.update(topology)
filename = '/tmp/save_stats_for_' + tmp.hexdigest() + '.pk'
try:
inf = open(filename, 'rb')
except IOError as e:
logging.warn(e)
if inf is not None:
try:
tmpsavestats = pickle.load(inf)
savedlastchecktime = pickle.load(inf)
except EOFError as e:
            logging.warn(e)
inf.close()
if not all_topology_stats[topology]['topology_stats_got']:
logging.warn('Info not got for topology ' + topology)
return
overallstats = all_topology_stats[topology]['overallstats']
boltspoutstats = all_topology_stats[topology]['boltspoutstats']
of = open(filename, 'wb')
if of is not None:
pickle.dump(boltspoutstats, of)
pickle.dump(time(), of)
of.close()
if overallstats['UptimeSecs'] > (lastchecktime - savedlastchecktime):
if tmpsavestats is not None:
for bolt in bolt_array[topology]:
if bolt in tmpsavestats and bolt in boltspoutstats:
stats_new = boltspoutstats[bolt]
stats_old = tmpsavestats[bolt]
for key in bolt_stats:
if key == 'ExecuteLatency' or key == 'ProcessLatency':
continue
if key not in stats_new:
continue
if key not in stats_old:
continue
if key in diff_cols:
stats_new[key] -= stats_old[key]
for spout in spout_array[topology]:
if spout in tmpsavestats and spout in boltspoutstats:
stats_new = boltspoutstats[spout]
stats_old = tmpsavestats[spout]
for key in spout_stats:
if key == 'CompleteLatency':
continue
if key not in stats_new:
continue
if key not in stats_old:
continue
if key in diff_cols:
stats_new[key] -= stats_old[key]
normalize_stats(boltspoutstats, lastchecktime - savedlastchecktime)
else:
normalize_stats(boltspoutstats, overallstats['UptimeSecs'])
else:
normalize_stats(boltspoutstats, overallstats['UptimeSecs'])
def freshen():
global lastchecktime
if time() > (lastchecktime + maxinterval):
lastchecktime = time()
get_topology_stats_for(topologies)
for topology in topologies:
freshen_topology(topology)
def callback_boltspout(name):
freshen()
first_pos = name.find('_')
last_pos = name.rfind('_')
topology_mod = name[0:first_pos]
bolt = name[first_pos + 1: last_pos]
statname = name[last_pos + 1:]
topology = toplogy_mods[topology_mod]
if not all_topology_stats[topology]['topology_stats_got']:
        logging.debug('Returning 0 for ' + name)
return 0
logging.debug('Got stats for ' + name + " " +
str(all_topology_stats[topology]['boltspoutstats'][bolt][statname]))
return all_topology_stats[topology]['boltspoutstats'][bolt][statname]
def callback_overall(name):
freshen()
topology_mod, name = name.split('_')
topology = toplogy_mods[topology_mod]
if not all_topology_stats[topology]['topology_stats_got']:
        logging.debug('Returning 0 for ' + name)
return 0
logging.debug(topology + ' ' + name + ' ' +
str(all_topology_stats[topology]['overallstats'][name]))
return all_topology_stats[topology]['overallstats'][name]
def update_task_count(component_task_count, component_name, count):
if component_name not in component_task_count:
component_task_count[component_name] = 0
component_task_count[component_name] += count
def update_exec_count(component_exec_count, component_name, count):
if component_name not in component_exec_count:
component_exec_count[component_name] = 0
component_exec_count[component_name] += count
def update_whole_num_stat_special(stats, store, boltname, statname):
if boltname not in store:
store[boltname] = {}
if statname not in store[boltname]:
store[boltname][statname] = 0
for k in stats:
if k == '__metrics' or k == '__ack_init' or k == '__ack_ack' or k == '__system':
continue
store[boltname][statname] += stats[k]
def update_whole_num_stat(stats, store, boltname, statname):
if boltname not in store:
store[boltname] = {}
if statname not in store[boltname]:
store[boltname][statname] = 0
for k in stats:
store[boltname][statname] += stats[k]
def update_avg_stats(stats, store, boltname, statname):
if boltname not in store:
store[boltname] = {}
if statname not in store[boltname]:
store[boltname][statname] = []
for k in stats:
store[boltname][statname].append(stats[k])
def get_topology_stats_for(topologies):
all_topology_stats.clear()
for topology in topologies:
all_topology_stats[topology] = get_topology_stats(topology)
def refresh_topology_stats():
logging.debug('Refreshing topology stats')
for t in topologies:
all_topology_stats[t] = {'topology_stats_got': False}
global clusterinfo
try:
transport = TSocket.TSocket(nimbus_host, nimbus_port)
transport.setTimeout(1000)
framedtrasp = TTransport.TFramedTransport(transport)
protocol = TBinaryProtocol.TBinaryProtocol(framedtrasp)
client = Nimbus.Client(protocol)
framedtrasp.open()
boltspoutstats = None
component_task_count = None
component_exec_count = None
clusterinfo = client.getClusterInfo()
for tsummary in clusterinfo.topologies:
if tsummary.name not in topologies:
continue
toplogyname = tsummary.name
overallstats = {}
overallstats['ExecutorCount'] = tsummary.num_executors
overallstats['TaskCount'] = tsummary.num_tasks
overallstats['WorkerCount'] = tsummary.num_workers
overallstats['UptimeSecs'] = tsummary.uptime_secs
all_topology_stats[toplogyname]['overallstats'] = overallstats
boltspoutstats = {}
component_task_count = {}
component_exec_count = {}
all_topology_stats[toplogyname]['boltspoutstats'] = boltspoutstats
all_topology_stats[toplogyname]['component_task_count'] = component_task_count
all_topology_stats[toplogyname]['component_exec_count'] = component_exec_count
tinfo = client.getTopologyInfo(tsummary.id)
all_topology_stats[toplogyname]['topology_stats_got'] = True
for exstat in tinfo.executors:
stats = exstat.stats
update_whole_num_stat_special(stats.emitted[":all-time"], boltspoutstats,
exstat.component_id, 'Emitted')
update_whole_num_stat_special(stats.transferred[":all-time"], boltspoutstats,
exstat.component_id, 'Transferred')
                numtask = exstat.executor_info.task_end - exstat.executor_info.task_start + 1
update_task_count(component_task_count, exstat.component_id, numtask)
update_exec_count(component_exec_count, exstat.component_id, 1)
if stats.specific.bolt is not None:
update_whole_num_stat(stats.specific.bolt.acked[":all-time"], boltspoutstats,
exstat.component_id, 'Acked')
update_whole_num_stat(stats.specific.bolt.failed[":all-time"], boltspoutstats,
exstat.component_id, 'Failed')
update_whole_num_stat(stats.specific.bolt.executed[":all-time"], boltspoutstats,
exstat.component_id, 'Executed')
update_avg_stats(stats.specific.bolt.process_ms_avg["600"], boltspoutstats,
exstat.component_id, 'process_ms_avg')
update_avg_stats(stats.specific.bolt.execute_ms_avg["600"], boltspoutstats,
exstat.component_id, 'execute_ms_avg')
if stats.specific.spout is not None:
update_whole_num_stat(stats.specific.spout.acked[":all-time"], boltspoutstats,
exstat.component_id, 'Acked')
update_whole_num_stat(stats.specific.spout.failed[":all-time"], boltspoutstats,
exstat.component_id, 'Failed')
update_avg_stats(stats.specific.spout.complete_ms_avg[":all-time"], boltspoutstats,
exstat.component_id, 'complete_ms_avg')
if '__acker' in boltspoutstats:
del boltspoutstats['__acker']
for key in boltspoutstats:
if 'complete_ms_avg' in boltspoutstats[key]:
avg = get_avg(boltspoutstats[key]['complete_ms_avg'])
boltspoutstats[key]['CompleteLatency'] = avg
del boltspoutstats[key]['complete_ms_avg']
if 'process_ms_avg' in boltspoutstats[key]:
avg = get_avg(boltspoutstats[key]['process_ms_avg'])
boltspoutstats[key]['ProcessLatency'] = avg
del boltspoutstats[key]['process_ms_avg']
if 'execute_ms_avg' in boltspoutstats[key]:
avg = get_avg(boltspoutstats[key]['execute_ms_avg'])
boltspoutstats[key]['ExecuteLatency'] = avg
del boltspoutstats[key]['execute_ms_avg']
for key in component_task_count:
if key in boltspoutstats:
boltspoutstats[key]['Tasks'] = component_task_count[key]
for key in component_exec_count:
if key in boltspoutstats:
boltspoutstats[key]['Executors'] = component_exec_count[key]
framedtrasp.close()
except Exception as e:
clusterinfo = None
logging.warn(e)
def get_topology_stats(toplogyname):
global lastinfotime
if (lastinfotime + 4) < time():
for t in all_topology_stats:
all_topology_stats[t] = None
lastinfotime = time()
refresh_topology_stats()
return all_topology_stats[toplogyname]
def metric_init_topology(params):
global descriptors
groupname = ''
if 'topology' in params and len(params['topology']):
groupname = params['topology']
else:
return
topology = groupname
topology_mod = re.sub("\s+", "", topology)
topology_mod = re.sub("[_]+", "", topology_mod)
toplogy_mods[topology_mod] = topology
if 'spouts' in params:
spout_array[topology] = re.split('[,]+', params['spouts'])
for spout in spout_array[topology]:
for statname in spout_stats:
d = {'name': topology_mod + '_' + spout + '_' + statname, 'call_back': callback_boltspout,
'time_max': 90,
'value_type': spout_stats[statname][2],
'units': spout_stats[statname][0],
'slope': 'both',
'format': spout_stats[statname][1],
'description': '',
'groups': groupname}
descriptors.append(d)
if 'bolts' in params:
bolt_array[topology] = re.split('[,]+', params['bolts'])
for bolt in bolt_array[topology]:
for statname in bolt_stats:
d = {'name': topology_mod + '_' + bolt + '_' + statname, 'call_back': callback_boltspout,
'time_max': 90,
'value_type': bolt_stats[statname][2],
'units': bolt_stats[statname][0],
'slope': 'both',
'format': bolt_stats[statname][1],
'description': '',
'groups': groupname}
descriptors.append(d)
for key in overall:
d = {'name': topology_mod + '_' + key, 'call_back': callback_overall,
'time_max': 90,
'value_type': overall[key][2],
'units': overall[key][0],
'slope': 'both',
'format': overall[key][1],
'description': '',
'groups': groupname}
descriptors.append(d)
logging.info('Inited metric for ' + groupname)
def metric_init(params):
global topologies, nimbus_host, nimbus_port
if 'nimbus_host' in params:
nimbus_host = params['nimbus_host']
if 'nimbus_port' in params:
nimbus_port = params['nimbus_port']
if 'topologies' not in params:
return
if 'storm_thrift_gen' in params:
sys.path.append(params['storm_thrift_gen'])
else:
sys.path.append('/usr/lib/ganglia')
if 'loglevel' in params:
loglevel = params['loglevel'].strip().upper()
if loglevel in logging_levels:
logging.getLogger().setLevel(logging_levels[loglevel])
global Nimbus, ttypes
from stormpy.storm import Nimbus, ttypes
tss = re.split('[,]+', params['topologies'])
topologies = tss
alltops = {}
for t in tss:
alltops[t] = {'topology': t}
alltops[t]['tlen'] = len(t)
t_bolts = t + '_bolts'
if t_bolts in params:
alltops[t]['bolts'] = params[t_bolts]
t_spouts = t + '_spouts'
if t_spouts in params:
alltops[t]['spouts'] = params[t_spouts]
for t in alltops:
logging.info('Initing metric for ' + t)
metric_init_topology(alltops[t])
return descriptors
if __name__ == '__main__':
params = {'topologies': 'SampleTopology,AnotherTopology',
'SampleTopology_spouts': 'SampleSpoutTwo', 'SampleTopology_bolts': 'boltc',
'AnotherTopology_spouts': 'Spout', 'AnotherTopology_bolts': 'bolta,boltb,boltd',
'loglevel': 'ERROR'}
metric_init(params)
for d in descriptors:
v = d['call_back'](d['name'])
formt = "%s " + d['format']
print formt % (d['name'], v)