Sunday, November 30, 2014

Using Zookeeper to coordinate distributed job watchers

Zookeeper is a tool that can be used for distributed system synchronization, coordination, service registry, locking and similar tasks. There are not many open source alternatives to Zookeeper, and it is good at what it does. That is why many open source projects, such as Storm and Kafka, use it for coordination, service discovery and service registry.

In this post I will explain how we can use Zookeeper to coordinate distributed job watchers.

Let us assume there are processes running on different machines that collect work and submit it in small batches to some queues. Job executors poll for the submitted jobs, execute them and submit the results back to a queue. Job watchers keep monitoring the job statuses; once a job is complete, they collect its results and do something with them.
There may be multiple job submitters, watchers and executors, and instances of each may come up and go down at any time. The watchers share the jobs to watch fairly among themselves, and the executors do the same with the jobs to execute. The appearance of a new executor or the disappearance of an existing one triggers re-sharing of the jobs among the executors. Similarly, the appearance or disappearance of a watcher triggers re-sharing of the watches.

All of the above requires reliable coordination among tasks that may be running on different machines. Doing the coordination correctly is hard because there are many subtle cases to handle, such as a process dying or losing its network connection in the middle of an operation. Fortunately Zookeeper handles these cases for us, and it is proven software that has been in use for years now. Let me explain how we can do the coordination through Zookeeper. (Please refer to the Zookeeper Getting Started Guide for an overview if you haven't used Zookeeper before.)

First, a job watcher has to be notified about the appearance and disappearance of other job watchers. While starting up, each watcher registers itself under a well-known znode, /watchers, by creating an ephemeral child node named with its unique id. So /watchers has one child per running watcher process. A child node is added when a new watcher starts, and because it is ephemeral, it disappears when the corresponding watcher's Zookeeper session ends, for example because the process died or lost its network connection for longer than the session timeout. Each job watcher sets a watch on /watchers, and when a watcher process appears or disappears it gets a notification and re-reads the list of currently registered watcher ids.
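To make the two Zookeeper properties this relies on concrete without a running server, here is a toy in-memory model: ephemeral children vanish when their session ends, and a watch fires at most once, so it has to be re-armed on every read. ToyMembership and ToyWatcher are hypothetical classes for illustration only; they are not part of kazoo or of the script below.

```python
class ToyMembership:
    """Hypothetical in-memory stand-in for the /watchers znode (not kazoo)."""

    def __init__(self):
        self.children = {}  # child id -> id of the session that created it
        self.watches = []   # one-shot callbacks armed by get_children

    def get_children(self, watch=None):
        # Reading the children may arm a single one-shot watch
        if watch is not None:
            self.watches.append(watch)
        return sorted(self.children)

    def _fire(self):
        # Each armed watch fires once and is then forgotten, as in Zookeeper
        # (simplification: this toy never drops a closed session's watches)
        pending, self.watches = self.watches, []
        for callback in pending:
            callback()

    def create_ephemeral(self, child, session):
        self.children[child] = session
        self._fire()

    def close_session(self, session):
        # Ephemeral children disappear when their session ends
        gone = [c for c, s in self.children.items() if s == session]
        for child in gone:
            del self.children[child]
        if gone:
            self._fire()


class ToyWatcher:
    """Registers itself, then keeps its view of the members up to date."""

    def __init__(self, registry, myid):
        self.registry = registry
        self.view = []
        registry.create_ephemeral(myid, session=myid)
        self.refresh()

    def refresh(self):
        # Re-arm the watch on every read; otherwise only the first
        # membership change would ever be seen
        self.view = self.registry.get_children(watch=self.refresh)


registry = ToyMembership()
a = ToyWatcher(registry, "a")
b = ToyWatcher(registry, "b")
print(a.view, b.view)       # ['a', 'b'] ['a', 'b']
registry.close_session("a")
print(b.view)               # ['b']
```

The re-arming in refresh corresponds to the script passing watch=self again every time it calls get_children.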




Also, when a watcher chooses to watch a job, it locks that job to signal to the others that they shouldn't spend cycles watching a job it is already watching. The lock has the same name as the job and lives under another znode, /watchlocks.
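The script takes this lock with kazoo's zk.Lock recipe. As a rough sketch of the idea behind the standard Zookeeper lock recipe (kazoo's actual node names and details differ), every contender creates an ephemeral sequential child under the lock znode, the child with the lowest sequence number owns the lock, and each other contender watches only its immediate predecessor. The helper names and the "lock-" prefix below are hypothetical.

```python
def sequence_of(node):
    # Zookeeper appends a 10-digit, zero-padded counter to sequential nodes
    return int(node[-10:])


def lock_decision(children, mine):
    """Return (holds_lock, node_to_watch) for the contender named `mine`.

    `children` are the ephemeral sequential nodes under one lock znode,
    e.g. /watchlocks/<job>; the 'lock-' prefix used below is made up.
    """
    ordered = sorted(children, key=sequence_of)
    pos = ordered.index(mine)
    if pos == 0:
        return True, None  # lowest sequence number owns the lock
    # Watch only the immediate predecessor, so a release (or a crash that
    # deletes the ephemeral node) wakes one contender, not all of them
    return False, ordered[pos - 1]


kids = ["lock-0000000007", "lock-0000000003", "lock-0000000005"]
print(lock_decision(kids, "lock-0000000003"))  # (True, None)
print(lock_decision(kids, "lock-0000000007"))  # (False, 'lock-0000000005')
```

Because the lock nodes are ephemeral, a watcher that dies while holding a lock releases it automatically when its session ends.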

The job executors share the jobs to execute, so whenever a new executor comes up, re-sharing of the jobs is triggered. Similarly, whenever an executor disappears, re-sharing is triggered again. Each job executor registers itself under the znode /executors and puts a watch on /executors, so that it gets notifications about changes in the list of executors currently working. It also watches /jobs to learn about newly submitted jobs.
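In the script, the executor handles re-sharing with a change counter (some_change) and a Condition: the watch callback bumps the counter and wakes the worker thread, and the worker abandons a pass that started under an older counter value. A minimal Python 3 sketch of that pattern; ReshareSignal and process are hypothetical names, not part of the script.

```python
import threading


class ReshareSignal:
    """Generation counter bumped whenever group membership changes."""

    def __init__(self):
        self.generation = 0
        self.cond = threading.Condition()

    def membership_changed(self):
        # Called from the Zookeeper watch callback thread
        with self.cond:
            self.generation += 1
            self.cond.notify_all()

    def wait_for_change(self, seen, timeout):
        # Sleep until the generation differs from `seen` or timeout expires
        with self.cond:
            self.cond.wait_for(lambda: self.generation != seen, timeout)
            return self.generation


def process(tasks, signal, on_task):
    """Run tasks in order; stop early if membership changed meanwhile."""
    started = signal.generation  # plain int read; fine for a sketch
    done = []
    for task in tasks:
        if signal.generation != started:
            return done, False  # assignment is stale: go re-share
        on_task(task)
        done.append(task)
    return done, True


sig = ReshareSignal()

def on_task(task):
    # Simulate another executor joining while task 2 is being processed
    if task == 2:
        sig.membership_changed()

print(process([1, 2, 3], sig, on_task))  # ([1, 2], False)
```

Checking the counter between tasks keeps an executor from continuing to work on jobs that the new share may have handed to someone else.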

The executors don't necessarily know when a job completes, but the watchers do. This is because when the job submitter submits a job, it includes enough information (the number of tasks in the job) for a watcher to tell when the job is complete. The executors could also work this out, but in this example I give the executors just one responsibility: they execute the tasks and submit the results to a results store.

Whenever a watcher detects the completion of a job, it collects the results of the completed job, removes the job details from the /jobs znode and does something with the result. Since a child of /jobs has been deleted, the watchers are notified and re-share the jobs to watch.


This approach also lets us monitor complex workflows, because nothing stops a watcher from submitting a completed job to another queue, where the executors pick it up again. Here I am just giving a very basic example to explain the overall working.

We will use Python (2.x, as the script below uses Python 2 print statements) for our examples.

We will use the Kazoo Zookeeper client. Install it:
$ sudo pip install kazoo

We will use Redis as the jobs queue and the job results store, so we need to install Redis.
Download the Redis source tarball (the commands below use version 2.8.17), extract it and build it:
$ tar zxf redis-2.8.17.tar.gz
$ cd redis-2.8.17
$ make
$ sudo make install

Now install the Python Redis client:
$ sudo pip install redis
$ sudo pip install hiredis  # Optional: faster reply parsing for the Python redis client

Let us start the Zookeeper server and the Redis server. We will run all of them on the same machine, as this post is for demonstration purposes only.

$ (zookeeper-install-directory)/bin/zkServer.sh start-foreground
$ redis-server



Now let us look at the Python code. The complete example is available on GitHub.

I have also pasted the code below:

import sys
from atexit import register
from time import sleep
from random import randint
import logging
import uuid
from redis import ConnectionPool, Redis
from kazoo.client import KazooClient
from math import sqrt
from threading import Thread, Lock, Condition

SUBMITTED = 'subm'
PROCESSED = 'prcd'
ZIPPED = 'zpd'
UPLOADED = 'uploaded'

inited = False

ALLOWED_COMMANDS = ['watcher', 'jobsubmitter', 'jobexecutor']


def state_listener(state):
    print state


def create_path_if_not_exists(zk, path):
    '''
    Create the znode path if it is not existing already
    '''
    if not zk.exists(path):
        try:
            zk.ensure_path(path)
        except Exception as e:
            print e
            return False
    return True


def stop_zk(zkwrapper):
    zkwrapper.stop()


def init_redis():
    '''
    Connect to redis server. For this example, we are running
    Redis on the same machine
    '''
    pool = ConnectionPool(host='localhost', port=6379, db=0)
    r = Redis(connection_pool=pool)
    return r


class zk_wrapper:
    '''
    Callable class wrapping a zookeeper kazooclient object
    '''
    def __init__(self, zk):
        self.zk = zk
        self.state = ''
        register(stop_zk, self)

    def stop(self):
        self.zk.stop()

    def __call__(self, state):
        self.state = state


def init():

    global inited
    zk = None
    try:
        zk = KazooClient(hosts='127.0.0.1:2181')
        zk.add_listener(state_listener)
        zk.start()
        register(stop_zk, zk)
        create_path_if_not_exists(zk, '/jobs')
        create_path_if_not_exists(zk, '/watchers')
        create_path_if_not_exists(zk, '/watchlocks')
        create_path_if_not_exists(zk, '/executors')
    except Exception as e:
        print 'Zk problem ', e
        if zk is not None:
            zk.stop()
        sys.exit(1)

    inited = True
    return zk


class job_watcher:
    def register_myself(self):
        self.zk.create('/watchers/' + self.myid, ephemeral=True)

    def __init__(self):
        self.lock = Lock()
        self.zk = init()
        self.redis = init_redis()
        self.myid = uuid.uuid4().hex
        self.register_myself()
        self.my_jobs = {}
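        children = self.zk.get_children('/jobs', watch=self)
        # Zookeeper returns children unsorted; sort so that every
        # watcher sees the jobs in the same order
        self.alljobs = sorted(children)
        children = self.zk.get_children('/watchers', watch=self)
        # Sort so that every watcher computes positions the same way
        self.watchers = sorted(children)
        self.myindex = self.watchers.index(self.myid)
        self.num_watchers = len(self.watchers)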
        self.lock_my_job_watches()
        self.job_watcher_thread = Thread(target=self, args=[None])
        self.job_watcher_thread.start()

    def unlock_my_jobs(self):
        self.lock.acquire()
        for job, lock in self.my_jobs.items():
            try:
                lock.release()
            except Exception as e:
                print 'Unlocking issue', e
        self.my_jobs.clear()
        self.lock.release()

    def stop_monitoring(self):
        self.stall_monitor = True

    def start_monitoring(self):
        self.stall_monitor = False

    def watch_for_completion(self):
        jobcount = {}
        self.lock.acquire()
        for job in self.my_jobs:
            try:
                vals = self.zk.get('/jobs/' + job)
                stat, count = vals[0].split('=')
                jobcount[job] = {'count': int(count), 'stat': stat}
            except Exception as e:
                print 'Job watch error ', e
                self.lock.release()
                return
        self.lock.release()
        times = 0
        while (not self.stall_monitor) and (times < 4):
            times += 1
            temp = ''
            self.lock.acquire()
            for job in self.my_jobs:
                try:
                    if (job not in jobcount) or jobcount[job]['stat'] != PROCESSED:
                        continue
                    processedcount = self.redis.hlen(job + '_completed')
                    if processedcount == jobcount[job]['count'] or processedcount == 0:
                        self.my_jobs[job].release()
                        self.zk.delete('/watchlocks/' + job)
                        self.redis.delete(job + '_completed')
                        self.zk.delete('/jobs/' + job)
                        print 'Job finished ' + job
                        temp = job
                        break
                except Exception as e:
                    print 'Monitor error ', e
            if temp != '':
                del self.my_jobs[temp]
                sleep(0.4)
            self.lock.release()

    def run(self):
        while True:
            if self.stall_monitor:
                sleep(1)
                continue
            self.watch_for_completion()

    def lock_my_job_watches(self):
        self.stop_monitoring()
        self.unlock_my_jobs()
        self.lock.acquire()
        for i, child in enumerate(self.alljobs):
            # The watcher at position n takes jobs n, N + n, 2N + n, ...
            if i % self.num_watchers != self.myindex:
                continue
            lock = self.zk.Lock('/watchlocks/' + child)
            try:
                if lock.acquire(blocking=True, timeout=1):
                    self.my_jobs[child] = lock
            except Exception as e:
                print 'Lock problem ', e
        self.lock.release()
        if len(self.my_jobs) > 0:
            self.start_monitoring()

    def __call__(self, event):
        if event is None:
            # Invoked from our own thread (see __init__), not as a
            # Zookeeper watch callback: run the monitor loop
            self.run()
            return
        # Zookeeper watches fire only once, so re-register the watch
        # while re-reading the children
        if event.path == '/jobs':
            self.alljobs = sorted(self.zk.get_children('/jobs', watch=self))
        else:
            self.watchers = sorted(self.zk.get_children('/watchers', watch=self))
            self.num_watchers = len(self.watchers)
            self.myindex = self.watchers.index(self.myid)
        self.lock_my_job_watches()


class job_executor:

    def register_myself(self):
        self.zk.create('/executors/' + self.myid, ephemeral=True)

    def __init__(self):
        zk = init()
        self.zk = zk
        self.lock = Lock()
        self.condition = Condition(self.lock)
        self.some_change = 0
        self.redis = init_redis()
        self.myid = uuid.uuid4().hex
        self.register_myself()
        self.my_jobs = {}
        children = zk.get_children('/jobs', watch=self)
        # Sorted, so that all executors agree on job and executor positions
        self.alljobs = sorted(children)
        children = zk.get_children('/executors', watch=self)
        self.executors = sorted(children)
        self.myindex = self.executors.index(self.myid)
        self.num_executors = len(self.executors)
        self.keep_running = True
        self.executor_thread = Thread(target=self, args=[None])
        self.executor_thread.start()

    def execute(self):
        # The executor at position n takes jobs n, N + n, 2N + n, ...
        self.my_jobs = [job for i, job in enumerate(self.alljobs)
                        if i % self.num_executors == self.myindex]
        self.execute_jobs()

    def execute_jobs(self):
        some_change = self.some_change

        def isprime(number):
            number = abs(number)
            if number <= 1:
                return False
            if number <= 3:
                return True
            if number & 1 == 0:
                return False
            end = int(sqrt(number))
            i = 3
            while i <= end:
                if number % i == 0:
                    return False
                i += 2
            return True

        if some_change != self.some_change:
            return
        # Map each of my SUBMITTED jobs to its own task count
        jobs = {}

        for job in self.my_jobs:
            if some_change != self.some_change:
                return
            try:
                jobval = self.zk.get('/jobs/' + job)
                stat, counts = jobval[0].split('=')
                if stat == SUBMITTED:
                    jobs[job] = counts
            except Exception as e:
                print 'Problem happened ', e

        while len(jobs) > 0:
            # Iterate over a copy because finished jobs are removed
            for job in list(jobs):
                if some_change != self.some_change:
                    return
                try:
                    val = self.redis.lrange(job, 0, 0)
                    if val is None or len(val) == 0:
                        # Queue drained: mark the job processed, keeping
                        # this job's task count (not another job's)
                        self.zk.set('/jobs/' + job, PROCESSED + '=' + jobs[job])
                        del jobs[job]
                        continue
                    ival = int(val[0])
                    # Store 1 for prime, 0 for not prime, then pop the task
                    if self.redis.hmset(job + '_completed', {ival: int(isprime(ival))}):
                        self.redis.lpop(job)
                except Exception as e:
                    print 'Some problem ', e
                    sys.exit(1)

    def run(self):
        while self.keep_running:
            self.execute()
            self.condition.acquire()
            self.condition.wait(1.0)
            self.condition.release()

    def __call__(self, event):
        if event is None:
            # Invoked from our own thread: run the executor loop
            self.run()
            return
        # Watches fire only once, so re-register while re-reading children
        if event.path == '/jobs':
            self.alljobs = sorted(self.zk.get_children('/jobs', watch=self))
        else:
            self.executors = sorted(self.zk.get_children('/executors', watch=self))
            self.num_executors = len(self.executors)
            self.myindex = self.executors.index(self.myid)
        self.some_change += 1
        self.condition.acquire()
        self.condition.notify()
        self.condition.release()


def job_submitter_main():
    try:
        zk = init()
        cpool = ConnectionPool(host='localhost', port=6379, db=0)
        r = Redis(connection_pool=cpool)
        added = 0
        tried = 0
        max_add_try = 5000
        jobname = uuid.uuid4().hex
        added_nums = set()

        while tried < max_add_try:
            value = randint(5000, 90000000)
            tried += 1
            if value not in added_nums:
                added_nums.add(value)
            else:
                continue

            while True:
                try:
                    r.lpush(jobname, value)
                    added += 1
                    break
                except Exception as e:
                    sleep(1)
                    print "Lpush ", jobname, e

        # Reuse the client started by init() (it is stopped at exit)
        # to publish the job description znode
        value = SUBMITTED + "=" + str(added)
        zk.create('/jobs/' + jobname, value=value)

    except Exception as e:
        print 'Big problem in submitting job ', e
        sys.exit(1)
    print 'Job submitted ' + jobname


def watcher_main():
    job_watcher()


def job_executor_main():
    job_executor()


if __name__ == '__main__':
    if len(sys.argv) < 2:
        print 'Usage: ' + sys.argv[0] + ' command'
        print 'Valid commands are : ' + ', '.join(ALLOWED_COMMANDS)
        sys.exit(1)
    logging.basicConfig()
    if sys.argv[1] not in ALLOWED_COMMANDS:
        print sys.argv[1] + ' not a valid command'
        sys.exit(1)
    if sys.argv[1] == 'watcher':
        watcher_main()
        sleep(86400)
    elif sys.argv[1] == 'jobsubmitter':
        job_submitter_main()
        sleep(2)
    elif sys.argv[1] == 'jobexecutor':
        job_executor_main()
        sleep(86400)

The tasks here are numbers, and the executors check whether each number is prime. As I said before, this is just for demonstration, so the tasks are the simplest possible; in real life we don't need a distributed environment to check whether numbers are prime. The tasks (i.e. the numbers) are submitted to a queue in Redis in small batches. For the queue we simply use Redis lists, so a job is submitted as a list of numbers pushed to a Redis list.
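The submitter pushes with LPUSH, and the executor reads with LRANGE key 0 0 and removes with LPOP, so all three commands work on the head of the list. A hypothetical in-memory model (ToyRedisList, built on collections.deque, not redis-py) makes the resulting order visible:

```python
from collections import deque


class ToyRedisList:
    """Hypothetical in-memory model of the Redis list commands the script
    uses on a job list: LPUSH, LRANGE key 0 0 and LPOP."""

    def __init__(self):
        self.items = deque()

    def lpush(self, value):
        self.items.appendleft(value)  # LPUSH inserts at the head

    def lrange_head(self):
        # LRANGE key 0 0 returns a list holding just the head, or []
        return [self.items[0]] if self.items else []

    def lpop(self):
        # LPOP removes and returns the head (None when the list is empty)
        return self.items.popleft() if self.items else None


job = ToyRedisList()
for number in (11, 12, 13):   # the submitter pushes the numbers
    job.lpush(number)
print(job.lrange_head())      # [13]: the newest number is at the head
print(job.lpop(), job.lpop(), job.lpop(), job.lpop())  # 13 12 11 None
```

Tasks are therefore taken newest first; RPUSH with LPOP would give first-in, first-out order instead. The order does not matter here, because every number must be checked before the job counts as done.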

job_submitter_main is the function that submits a job. It pushes the list of numbers to Redis and also creates a job description znode under /jobs in Zookeeper. The znode and the Redis list have the same name. The znode's data is "subm=count", where count is the number of tasks in the job (i.e. the length of the corresponding list in Redis; let us call it the job size).
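Because the job size travels in the znode data, all the roles only have to agree on the "status=count" format. A small Python 3 sketch of encoding and decoding it; the helper names are hypothetical, and the bytes handling assumes Python 3, where kazoo's create() expects the value as bytes and get() returns bytes.

```python
SUBMITTED = 'subm'
PROCESSED = 'prcd'


def encode_job(stat, count):
    # Data stored in /jobs/<jobname>, e.g. b"subm=4982"
    return ('%s=%d' % (stat, count)).encode('utf-8')


def decode_job(data):
    # Accept bytes (as returned by kazoo under Python 3) or str
    if isinstance(data, bytes):
        data = data.decode('utf-8')
    stat, count = data.split('=')
    return stat, int(count)


def mark_processed(data):
    # The executor's update once the job's Redis list is empty
    _, count = decode_job(data)
    return encode_job(PROCESSED, count)


print(decode_job(encode_job(SUBMITTED, 4982)))  # ('subm', 4982)
print(mark_processed(b'subm=4982'))            # b'prcd=4982'
```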

job_watcher is a callable class which watches the /watchers znode and also the /jobs znode. All the job watchers get a notification when a new job description is created under /jobs. The watchers share the jobs to watch using the following algorithm:
Let us say there are N watchers and J is the sorted list of incomplete jobs. Every watcher has the list of currently registered watchers; each watcher sorts that list and finds its own position n within it. The watcher at position n watches the jobs at indices n, N + n, 2N + n, 3N + n, ... in J; in other words, the job at index i goes to the watcher at position i mod N. So the watcher at position 0 watches jobs 0, N, 2N, 3N, ..., the watcher at position 1 watches jobs 1, N + 1, 2N + 1, 3N + 1, ..., and so on.
The watcher processes create a job_watcher object and start watching their jobs for completion.
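The sharing rule can be written as one slice over the sorted job list. A Python 3 sketch with a hypothetical helper name, my_share; the sorting matters because Zookeeper does not guarantee any order for the children it returns.

```python
def my_share(all_jobs, all_members, myid):
    """Return the jobs that member `myid` handles under the i mod N rule."""
    jobs = sorted(all_jobs)        # every member must agree on J's order
    members = sorted(all_members)  # ...and on the order of the members
    n = len(members)
    position = members.index(myid)
    # Indices position, N + position, 2N + position, ...
    return jobs[position::n]


jobs = ['job%d' % i for i in range(7)]
watchers = ['w-c', 'w-a', 'w-b']   # arbitrary order, as from get_children
for w in sorted(watchers):
    print(w, my_share(jobs, watchers, w))
# w-a ['job0', 'job3', 'job6']
# w-b ['job1', 'job4']
# w-c ['job2', 'job5']
```

Each member computes its share from the two sorted lists alone, so the watchers agree on the split without talking to each other; when N changes, each one simply recomputes.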

job_executor is the callable class which executes the jobs. The executor processes create instances of job_executor and start executing jobs. The executors share the jobs for execution using the same algorithm as the watchers. An executor executes a task and writes the result to a hash in Redis named <jobname>_completed. Each key in the hash is a number and the value is 0 or 1: 0 indicates the number is not prime, 1 indicates the number is prime. Once a task has been executed (i.e. the number has been checked), the executor puts an entry for the result in the hash and then removes the number from the job queue (list). When the list is empty, the executor changes the job's znode data from "subm=count" to "prcd=count".
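The order of the two Redis writes matters. Here is one executor step sketched on plain Python 3 stand-ins: a list whose index 0 plays the head of the job list, and a dict playing the <jobname>_completed hash. execute_one and is_prime are hypothetical helpers, not the script's functions.

```python
from math import isqrt


def is_prime(n):
    n = abs(n)
    if n < 2:
        return False
    if n < 4:
        return True
    if n % 2 == 0:
        return False
    for i in range(3, isqrt(n) + 1, 2):
        if n % i == 0:
            return False
    return True


def execute_one(queue, completed):
    """Process the task at the head of `queue`; False once it is empty."""
    if not queue:
        return False                  # job can now be marked "prcd"
    n = queue[0]                      # like LRANGE job 0 0
    completed[n] = int(is_prime(n))   # like HSET: 1 = prime, 0 = not prime
    queue.pop(0)                      # like LPOP, only after recording
    return True


queue, completed = [97, 91, 2], {}
while execute_one(queue, completed):
    pass
print(completed)  # {97: 1, 91: 0, 2: 1}
```

Recording the result before popping means a crash between the two steps only leaves the number in the queue to be checked again, and writing the same hash field twice is harmless.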

The job watchers keep checking the size of the completed hashes. Once a job is marked processed and the size of its hash equals the job size, the watcher considers the job complete. It then simply deletes the job node from Zookeeper and the <jobname>_completed hash from Redis.
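The watcher's side can be sketched the same way. In this Python 3 sketch, dicts stand in for the /jobs children (name to znode data) and for the completed hashes; job_is_complete and sweep are hypothetical names. (The script above additionally treats an empty hash for a processed job as finished.)

```python
PROCESSED = 'prcd'


def job_is_complete(job_data, completed_count):
    # Done when the znode says "prcd=<count>" and <count> results exist
    stat, count = job_data.split('=')
    return stat == PROCESSED and completed_count == int(count)


def sweep(jobs, results):
    """Remove every complete job from the toy stand-ins for /jobs
    (`jobs`: name -> data) and Redis (`results`: name -> result dict)."""
    finished = [name for name, data in jobs.items()
                if job_is_complete(data, len(results.get(name, {})))]
    for name in finished:
        del jobs[name]            # like zk.delete('/jobs/' + name)
        results.pop(name, None)   # like redis.delete(name + '_completed')
    return sorted(finished)


jobs = {'a': 'prcd=2', 'b': 'prcd=2', 'c': 'subm=1'}
results = {'a': {3: 1, 4: 0}, 'b': {5: 1}, 'c': {7: 1}}
print(sweep(jobs, results))  # ['a']
```

Job b is still waiting for one result, and job c has not been marked processed yet, so both stay.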

We can run the different components as follows (save the above script in a file named distjob.py):
To submit jobs:
$ python distjob.py jobsubmitter
We can submit any number of jobs, and it doesn't matter whether any job executors or watchers are running.

To run the watchers:
$ python distjob.py watcher
We can run any number of watchers, regardless of how many job executors or job submitters are running.

To run the job executors:
$ python distjob.py jobexecutor
We can run any number of job executors, regardless of how many job submitters or watchers are running.

In this example, to demonstrate distributed job coordination using Zookeeper, we run all the watchers, submitters and executors, as well as the Redis and Zookeeper servers, on the same machine. But that need not be so; we can easily make it a truly distributed system. We just need to replace the "127.0.0.1:2181" hosts string passed to KazooClient with the list of Zookeeper servers in the ensemble, and replace "localhost" with the remote Redis server's host name or IP address.
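One way to avoid hard-coding those addresses is to build them from configuration. A Python 3 sketch: KazooClient's hosts argument accepts a comma-separated list of host:port pairs, while the helper names and the DISTJOB_REDIS_HOST / DISTJOB_REDIS_PORT environment variables are hypothetical.

```python
import os


def zk_hosts(servers):
    # KazooClient(hosts=...) takes a comma-separated "host:port" list
    return ','.join('%s:%d' % (host, port) for host, port in servers)


def redis_settings(env=os.environ):
    # Hypothetical environment variables; defaults match the demo script
    return {'host': env.get('DISTJOB_REDIS_HOST', 'localhost'),
            'port': int(env.get('DISTJOB_REDIS_PORT', '6379'))}


print(zk_hosts([('zk1', 2181), ('zk2', 2181), ('zk3', 2181)]))
# zk1:2181,zk2:2181,zk3:2181
```

In the script, the results would then be passed as KazooClient(hosts=zk_hosts(...)) and ConnectionPool(db=0, **redis_settings()).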