Wednesday, March 9, 2016

Design of a Scalable and Reliable Pipeline with Storm and Kafka

Apache Storm is a real-time event processing system. We may even call it real-time Hadoop. Unlike Hadoop MapReduce jobs, Storm topologies keep running continuously, looking for events to process. A Storm topology defines how events move through the execution pipeline, which is expressed as a directed acyclic graph. A topology has hooks to work on the data. The hooks are called spouts and bolts. Generally, the spout is the event "generator" and the entry point to the topology. A spout may get its events from anywhere, but Kafka, Kestrel, MongoDB etc. are common event sources. The hooks are the entry points for our code, while the routing of tuples (data) is controlled by the Storm framework. When a spout or bolt emits a tuple, the tuple is sent to the next bolt in the pipeline. Each spout or bolt runs a set of tasks that all do exactly the same work. Each task may run in its own thread, or a set of threads may be multiplexed among the tasks.


A typical topology looks like the one shown below:

The spout is the entry point to the topology. There may be multiple spouts in a topology, and there may be multiple data sources. In the above example, a spout emits data to bolt 1 and bolt 2, and bolt 2 in turn emits data to bolt 3. Each spout and bolt may run multiple tasks (possibly in separate threads), and the tasks may run in the same JVM, in different JVMs, or across machines. The Storm framework takes care of routing the data (tuples) among the various components.
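
The shape of that example topology can be sketched in a few lines of Python. This is only an illustration of the graph, not of Storm itself: the names TOPOLOGY and route are made up for this sketch, every bolt is assumed to pass its input through unchanged, and task-level routing is ignored.

```python
from collections import defaultdict

# Edges of the example topology: each component lists its downstream bolts.
TOPOLOGY = {
    "spout": ["bolt1", "bolt2"],
    "bolt1": [],
    "bolt2": ["bolt3"],
    "bolt3": [],
}


def route(topology, source, tuples):
    """Deliver every tuple emitted by `source` to all downstream components.

    Returns a dict mapping component name -> list of tuples it received.
    Each bolt here simply re-emits what it receives (a pass-through).
    """
    received = defaultdict(list)
    pending = [(source, t) for t in tuples]
    while pending:
        component, tup = pending.pop(0)
        for downstream in topology[component]:
            received[downstream].append(tup)
            pending.append((downstream, tup))
    return dict(received)


deliveries = route(TOPOLOGY, "spout", ["event-1", "event-2"])
for name in sorted(deliveries):
    print(name, deliveries[name])
```

In a real topology each bolt decides what to emit, so bolt 3 would see only what bolt 2 chooses to pass on.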

A Storm cluster has three different components: Nimbus, Supervisors and the UI. A Storm cluster has just one Nimbus node, and as of this writing it is a single point of failure. When a topology is submitted, the topology jar, along with its configs, is sent to Nimbus (using the Thrift protocol). Nimbus decides which Supervisors will run the topology, and the topology jar is submitted to those Supervisor machines as well. (For testing purposes we may run Nimbus, Supervisors and the UI all on the same machine.)
Each Supervisor exposes a few slots. Each slot can be occupied by a topology worker, where a worker is a JVM process. When we create the topology, we can specify the number of workers it should run with, and Storm tries its best to provide that many workers, provided that many slots are available in the cluster.

Another essential component in the cluster is Zookeeper. Storm uses Zookeeper for coordination among the Nimbus, Supervisor and UI components.


Enough theory. Let us build a topology and run it in a Storm cluster.

Our data source is Kafka, which is a reliable and highly available message queue. It is simple and performs really well.

Now let us assume a simple scenario where we collect details of people for some analysis. As this is a demo, I will include only very basic information about each person: name, age and id. The information is a JSON document, and a sample is given below:

{
    "id" : "udhdgcbcdg",
    "name" : "Geet",
    "age" : 8
}
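
Anything that consumes these documents will usually want to check their shape first. Below is a minimal sketch, assuming all three fields are required and typed as in the sample. The names parse_person and REQUIRED_FIELDS are hypothetical and not part of the demo code.

```python
import json

# Expected fields and their types, taken from the sample document.
REQUIRED_FIELDS = {"id": str, "name": str, "age": int}


def parse_person(raw):
    """Parse a JSON string and check that it has the three expected fields."""
    person = json.loads(raw)
    for field, expected_type in REQUIRED_FIELDS.items():
        if field not in person:
            raise ValueError("missing field: " + field)
        if not isinstance(person[field], expected_type):
            raise ValueError("wrong type for field: " + field)
    return person


sample = '{"id": "udhdgcbcdg", "name": "Geet", "age": 8}'
print(parse_person(sample))
```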


The document is added to a MongoDB collection named person.
Now let us create a Kafka topic "demo". I assume you have set up a Zookeeper and Kafka cluster. A Kafka cluster with just one node is enough for the demo. Similarly, a single Zookeeper node is enough. We don't need high availability to run a demo :)

Change to the Kafka install directory and issue the command below:
$ bin/kafka-topics.sh --create --topic demo --partitions 16 --replication-factor 1 --zookeeper <zookeeper_host>:<port>

If you are running Zookeeper and Kafka locally, use 127.0.0.1:2181 for the host and port:
$ bin/kafka-topics.sh --create --topic demo --partitions 16 --replication-factor 1 --zookeeper 127.0.0.1:2181

Now a topic "demo" has been created with 16 partitions and no replication. So, no high availability!

Let us download Storm 0.9.6 and extract the package. I modified the Storm config file storm.yaml as shown below:
# Storm.yaml ###########
storm.zookeeper.servers:
    - "127.0.0.1"
nimbus.host: "127.0.0.1"
supervisor.slots.ports:
    - 6700
    - 6701
    - 6702
    - 6703
###################

Note that I am running Nimbus and Zookeeper on my localhost.


Change to the Storm installation directory. (Delete the jar log4j-over-slf4j-1.6.6.jar from the Storm lib directory first, otherwise we may face conflicts when running our topology.) Then issue the commands below:
$ bin/storm nimbus  #Starts nimbus
$ bin/storm supervisor #Starts Supervisor
$ bin/storm ui #Starts the UI

Now we may check the Storm UI by pointing our browser to http://127.0.0.1:8080

The sample code for the topology prototype is here. Please download the code so that we may run the topology.
After you have downloaded the code, build the topology jar by issuing the commands below:

$ cd datainfralytics/pipeline/proto
$ mvn package

This creates a jar for our topology (proto-1.0-SNAPSHOT-jar-with-dependencies.jar in the directory datainfralytics/pipeline/proto/target).
We submit the jar by issuing the command below:
$ $STORM_HOME/bin/storm jar proto-1.0-SNAPSHOT-jar-with-dependencies.jar geet.org.topolgy.StartTopology topology.properties

The STORM_HOME environment variable should point to the Storm installation directory.
The topology.properties file contains configuration details such as the Elasticsearch cluster name, the mongos host IP etc. A sample is given below:

#### topology.properties####
es.cluster=demo
es.host=127.0.0.1
es.port=9300
mongos.host=127.0.0.1
mongos.port=27017
###################

Now our topology will be running and we can inspect the details from the Storm UI.

The topology is consuming events from Kafka. These events are nothing but JSON documents containing information about persons.  A producer process writes the person details to Kafka and our topology consumes the events from Kafka. A very simple Python producer script is given below:

from time import time
from random import randint
from json import dumps
from uuid import uuid1
from kafka import KafkaClient, KeyedProducer

kafka_brokers = '127.0.0.1:9092'  # Comma-separated list of brokers
topic = 'demo'
kclient = KafkaClient(kafka_brokers)
producer = KeyedProducer(kclient)  # Each message is sent with a key

try:
    # Prefix the names with the start time so that repeated runs produce distinct names
    namein = 'name_{}'.format(int(time()))
    for i in range(1, 10001):
        name = '{}_{}'.format(namein, i)
        person = {'id': uuid1().hex, 'name': name, 'age': randint(1, 100)}
        # The name is the message key; the JSON document is the message body
        producer.send(topic, name, dumps(person))
        print(i)
except Exception as e:
    print(e)
    exit(1)


Run the script and it will put 10000 person documents into the Kafka topic demo. The topology will almost instantly consume the person JSON documents, store them in MongoDB and index them in Elasticsearch.

Now let us look at the topology and understand how it can be scalable and highly available. The topology is defined in the following Java class:

package geet.org.topolgy;

import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;
import java.util.Set;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import geet.org.dbbolt.MongoBolt;
import geet.org.indexbolt.EsBolt;
import geet.org.spout.DataSpout;

public class StartTopology {
 private static final String TOPOLOGY = "DemoTopolgy";

 public static void main(String[] args) {
  if (args.length < 1) {
   System.err.println("Please provide the property file name");
   System.exit(1);
  }
  TopologyBuilder tbuilder = new TopologyBuilder();
  tbuilder.setSpout("dataspout", new DataSpout(), 16);
  Fields groupingField = new Fields("person");
  tbuilder.setBolt("boltdb", new MongoBolt(), 16).fieldsGrouping("dataspout", groupingField);
  tbuilder.setBolt("boltindex", new EsBolt(), 32).fieldsGrouping("boltdb", groupingField);
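  Config conf = new Config();
  conf.setMaxTaskParallelism(128); // upper bound on the parallelism of any single component
  conf.setNumWorkers(2); // worker JVM processes requested for this topology
  conf.setNumAckers(2); // acker executors that track tuple acknowledgements
  conf.setDebug(false); // same setting as Config.TOPOLOGY_DEBUG
  conf.setMaxSpoutPending(64); // max un-acked tuples in flight per spout task
  conf.setMessageTimeoutSecs(300); // a tuple not fully processed within 300 s is failed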

  Properties props = new Properties();
  // Copy every entry of the properties file into the topology config;
  // try-with-resources closes the file even if loading fails
  try (FileInputStream inf = new FileInputStream(args[0])) {
   props.load(inf);
   Set<Object> propnames = props.keySet();
   for (Object x : propnames) {
    conf.put((String) x, props.get(x));
   }
   props.list(System.out);
  } catch (IOException e) {
   e.printStackTrace();
   System.exit(1);
  }
  try {
   StormSubmitter.submitTopology(TOPOLOGY, conf, tbuilder.createTopology());
  } catch (AlreadyAliveException | InvalidTopologyException e) {
   e.printStackTrace();
   System.exit(1);
  }
 }
}


Line 28 (the setSpout call): we specify a spout (labelled "dataspout") which will run with 16 tasks. It is implemented by the DataSpout class.
Line 30 (the first setBolt call): specifies the DB updater bolt (labelled "boltdb"), which will run with 16 tasks. It takes its input from "dataspout", i.e. the tuples emitted by "dataspout" are directed to this bolt. It is implemented by the MongoBolt class.
Line 31 (the second setBolt call): specifies the index updater bolt (labelled "boltindex"), which runs with 32 tasks. It takes its input from "boltdb" and updates the index. It is implemented by the EsBolt class.

So, data flows from Kafka to the spout (dataspout), from the spout to the DB updater bolt (boltdb), and from the DB updater bolt to the index updater bolt (boltindex).
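
Both bolts subscribe with a fields grouping on the "person" field, which means tuples carrying equal values of that field always go to the same task of the receiving bolt. The idea can be sketched as below. This is an illustration only: choose_task is a hypothetical helper, zlib.crc32 is a stand-in for whatever hash Storm uses internally, and the second sample person is made up.

```python
import zlib


def choose_task(field_value, num_tasks):
    """Pick the receiving task for a tuple from the value of its grouping field."""
    return zlib.crc32(field_value.encode("utf-8")) % num_tasks


person_a = '{"id": "a1", "name": "Geet", "age": 8}'
person_b = '{"id": "b2", "name": "Asha", "age": 31}'  # made-up sample

# "boltdb" runs 16 tasks and "boltindex" runs 32, as in StartTopology
for label, num_tasks in (("boltdb", 16), ("boltindex", 32)):
    print(label, choose_task(person_a, num_tasks), choose_task(person_b, num_tasks))
```

The point is that the choice depends only on the field value, so a repeated value never bounces between tasks of the same bolt.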

Now, how is it scalable?
It is scalable because we are using scalable technologies such as Kafka, MongoDB and Elasticsearch :)
But what about the pipeline we have just built? Is it also scalable and reliable?
The pipeline itself is scalable, as we may increase the number of tasks for each stage. We may also increase the number of workers executing the topology, and the number of machines running the workers. So, scaling is not a problem here.
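
To get a rough feel for what adding workers does, here is a back-of-the-envelope sketch using the parallelism numbers from StartTopology. It assumes one executor per unit of parallelism and an even spread across workers, and it ignores acker and system executors; executors_per_worker is a hypothetical helper, not a Storm API.

```python
# Parallelism hints taken from StartTopology
PARALLELISM = {"dataspout": 16, "boltdb": 16, "boltindex": 32}


def executors_per_worker(parallelism, num_workers):
    """Spread all executors across workers as evenly as possible.

    Returns a list with the executor count for each worker.
    """
    total = sum(parallelism.values())
    base, extra = divmod(total, num_workers)
    return [base + 1 if w < extra else base for w in range(num_workers)]


for workers in (2, 4, 8):
    print(workers, "workers ->", executors_per_worker(PARALLELISM, workers))
```

With the 2 workers configured in the demo, each worker would carry about half of the 64 executors; doubling the workers halves that load again.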
 

Order of updates:
Sometimes the order in which updates are applied across storage systems is very important. For example, there may be an event which specifies an update of a record, and a subsequent event which specifies the deletion of the same record. If the events are applied out of sequence, the deleted record may actually reappear in our storage systems. In such a case our customers will scold us and may even switch to our competitor's product. That is why we should make sure that events are applied in order. This is achievable by adding the events to the queue (Kafka) in order: each event may be associated with a timestamp, and we ensure that events are added to the queue in the order of that timestamp.
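
The update-then-delete case is easy to reproduce with a toy in-memory store. The sketch below is only an illustration: the event layout (ts, op, id, doc) and the apply_events helper are assumptions made for this example, not part of the demo topology.

```python
def apply_events(events):
    """Apply update/delete events to an in-memory store, in the order given."""
    store = {}
    for event in events:
        if event["op"] == "update":
            store[event["id"]] = event["doc"]
        elif event["op"] == "delete":
            store.pop(event["id"], None)
    return store


# An update at t=1 followed by a delete of the same record at t=2
events = [
    {"ts": 1, "op": "update", "id": "p1", "doc": {"name": "Geet", "age": 8}},
    {"ts": 2, "op": "delete", "id": "p1"},
]

out_of_order = apply_events(list(reversed(events)))
in_order = apply_events(sorted(events, key=lambda e: e["ts"]))

print("out of order:", out_of_order)  # the deleted record reappears
print("in order:", in_order)          # the record stays deleted
```

Sorting by timestamp before enqueueing is what keeps the second result correct.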

How to keep the storage systems in Sync:
Maintaining the order of processing:
Error handling: