Zookeeper is a tool for synchronization, coordination, service registry, and locking in distributed systems. It has few open source alternatives and it does its job well, which is why many open source projects such as Storm and Kafka use it for service discovery and service registry.
In this post I will explain how we can use Zookeeper as a coordinator for distributed job watchers.
Let us assume there are processes running on different machines that collect work and submit it in small batches to some queues. Job executors poll for the submitted jobs, execute them, and submit the results back to a queue. Job watchers keep monitoring the job statuses, and once a job has completed they collect its results and do something with them.
There may be multiple job submitters, watchers and executors, and instances of each may come up and go down at any time. The job watchers and executors share the jobs to watch and execute fairly among themselves. The appearance of a new executor or the disappearance of an existing one triggers re-sharing of the jobs among the executors. Similarly, the appearance or disappearance of a watcher triggers re-sharing of the watches.
All of the above requires reliable coordination among tasks that may be executing on different machines. Doing the coordination correctly is hard, as there are many nuances to address. Fortunately, Zookeeper addresses these, and it is a proven piece of software that has been in use for some years now. Let me explain how we can do the coordination through Zookeeper. (Please refer to the Zookeeper getting started guide for an overview if you haven't used it already.)
First, a job watcher has to be notified about the appearance and disappearance of other job watchers. While starting up, each watcher registers itself under a well-known znode, /watchers, by creating an ephemeral child node. So /watchers has a list of children whose names are the unique ids of the watcher processes. A child node is added when a new watcher starts, and it disappears when the corresponding watcher process dies, disconnects, or loses its network connection. Each job watcher sets a watch on the /watchers node; when a watcher process appears or disappears, it gets a notification and re-reads the list of currently registered watcher ids.
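The registration step can be sketched roughly as follows. This is a minimal illustration, not the code used later in this post: join_group and on_change are hypothetical names, zk is assumed to be an already started kazoo KazooClient, and the sketch is written for Python 3.

```python
WATCHERS_PATH = "/watchers"  # znode path taken from the post


def join_group(zk, member_id, on_change, group_path=WATCHERS_PATH):
    """Register member_id under group_path and return the current members.

    zk is assumed to behave like a started kazoo.client.KazooClient.
    on_change is a hypothetical callback; ZooKeeper calls it when the
    children of group_path change.
    """
    zk.ensure_path(group_path)
    # An ephemeral znode is removed by ZooKeeper when the session that
    # created it ends, which is how a dead watcher disappears.
    zk.create(group_path + "/" + member_id, ephemeral=True)
    # ZooKeeper watches fire only once, so on_change has to call
    # get_children(..., watch=on_change) again to stay subscribed.
    return sorted(zk.get_children(group_path, watch=on_change))
```

Returning the sorted list gives every member the same view of the ordering, which the sharing scheme described later relies on.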
Also, when a watcher chooses to watch a job, it locks that job to signal to the others that they shouldn't spend cycles watching a job it is already watching. It creates a lock with the same name as the job under another znode, /watchlocks.
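A rough sketch of claiming a job with Kazoo's lock recipe is shown below. The helper name try_claim_job and its parameters are hypothetical, and zk is again assumed to be a started KazooClient. As far as I know, Kazoo raises kazoo.exceptions.LockTimeout when the timeout expires, so the sketch treats any exception as "not claimed".

```python
def try_claim_job(zk, job_name, timeout=1.0, locks_path="/watchlocks"):
    """Try to lock job_name; return the lock object, or None if not claimed.

    zk is assumed to behave like a started kazoo.client.KazooClient.
    """
    lock = zk.Lock(locks_path + "/" + job_name)
    try:
        acquired = lock.acquire(blocking=True, timeout=timeout)
    except Exception:
        # Timeouts and connection problems both mean we do not own the job.
        acquired = False
    return lock if acquired else None
```

The caller keeps the returned lock object and calls release() on it when it stops watching the job.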
The job executors share the jobs to execute. Whenever a new executor comes up or an existing executor disappears, re-sharing of the jobs is triggered. Each job executor registers itself under the znode /executors and also puts a watch on /executors, so that it gets notified of changes in the list of executors currently working.
The executors may not know when a job completes, but the watchers do. This is because the job submitter submits each job with enough information for a watcher to know when the job is complete. The executors could also detect completion, but in this example I am assigning just one responsibility to the executors: they execute the tasks and submit the results to a results queue.
Whenever a watcher detects the completion of a job, it collects the results of the completed job, removes the job details from the /jobs znode, and does something with the results. Because a node under /jobs has been deleted, the watchers then re-share the jobs to watch.
This approach gives us the ability to monitor complex workflows, because there is no reason a watcher cannot submit the completed jobs to other queues, to be executed again by the executors. Here I am just giving a very basic example to explain the overall working.
We will use Python for our examples.
We will use the Kazoo Zookeeper client. Install it:
$ sudo pip install kazoo
We will use Redis as the job queue and the job results queue, so we need to install Redis.
Download Redis from here.
Extract the downloaded tar archive and build Redis:
$ tar zxf redis-2.8.17.tar.gz
$ cd redis-2.8.17
$ sudo make
$ sudo make install
Now we install the Python Redis clients:
$ sudo pip install redis
$ sudo pip install hiredis # Needed for better performance of Python redis client
Let us start the Zookeeper and Redis servers. We will run all of them on the same machine, as this post is for demonstration purposes only.
$ (zookeeper-install-directory)/bin/zkServer.sh start-foreground
$ redis-server
Now let us look at the Python code samples. You can get the complete example from this GitHub link.
I have also pasted the code below:
import sys
from atexit import register
from time import sleep
from random import randint
import logging
import uuid
from redis import ConnectionPool, Redis
from kazoo.client import KazooClient
from math import sqrt
from threading import Thread, Lock, Condition

SUBMITTED = 'subm'
PROCESSED = 'prcd'
ZIPPED = 'zpd'
UPLOADED = 'uploaded'

inited = False

ALLOWED_COMMANDS = ['watcher', 'jobsubmitter', 'jobexecutor']


def state_listener(state):
    print state


def create_path_if_not_exists(zk, path):
    '''
    Create the znode path if it is not existing already
    '''
    if not zk.exists(path):
        try:
            zk.ensure_path(path)
        except Exception as e:
            print e
            return False
    return True


def stop_zk(zkwrapper):
    zkwrapper.stop()


def init_redis():
    '''
    Connect to redis server. For this example, we are running Redis on the same machine
    '''
    pool = ConnectionPool(host='localhost', port=6379, db=0)
    r = Redis(connection_pool=pool)
    return r


class zk_wrapper:
    '''
    Callable class wrapping a zookeeper kazooclient object
    '''
    def __init__(self, zk):
        self.zk = zk
        self.state = ''
        register(stop_zk, self)

    def stop(self):
        self.zk.stop()

    def __call__(self, state):
        self.state = state


def init():
    global inited
    zk = None
    try:
        zk = KazooClient(hosts='127.0.0.1:2181')
        zk.add_listener(state_listener)
        zk.start()
        register(stop_zk, zk)
        create_path_if_not_exists(zk, '/jobs')
        create_path_if_not_exists(zk, '/watchers')
        create_path_if_not_exists(zk, '/watchlocks')
        create_path_if_not_exists(zk, '/executors')
    except Exception as e:
        print 'Zk problem ', e
        if zk is not None:
            zk.stop()
        sys.exit(1)
    inited = True
    return zk


class job_watcher:
    def register_myself(self):
        self.zk.create('/watchers/' + self.myid, ephemeral=True)

    def __init__(self):
        self.lock = Lock()
        self.zk = init()
        self.redis = init_redis()
        self.myid = uuid.uuid4().hex
        self.register_myself()
        self.my_jobs = {}
        children = self.zk.get_children('/jobs', watch=self)
        self.alljobs = children
        children = self.zk.get_children('/watchers', watch=self)
        self.watchers = children
        self.myindex = self.watchers.index(self.myid)
        self.num_watchers = len(self.watchers)
        self.lock_my_job_watches()
        self.job_watcher_thread = Thread(target=self, args=[None])
        self.job_watcher_thread.start()

    def unlock_my_jobs(self):
        self.lock.acquire()
        for job, lock in self.my_jobs.items():
            try:
                lock.release()
            except Exception as e:
                print 'Unlocking issue', e
        self.my_jobs.clear()
        self.lock.release()

    def stop_monitoring(self):
        self.stall_monitor = True

    def start_monitoring(self):
        self.stall_monitor = False

    def watch_for_completion(self):
        jobcount = {}
        self.lock.acquire()
        for job in self.my_jobs:
            try:
                vals = self.zk.get('/jobs/' + job)
                stat, count = vals[0].split('=')
                jobcount[job] = {'count': int(count), 'stat': stat}
            except Exception as e:
                print 'Job watch error ', e
                self.lock.release()
                return
        self.lock.release()
        times = 0
        while (not self.stall_monitor) and (times < 4):
            times += 1
            temp = ''
            self.lock.acquire()
            for job in self.my_jobs:
                try:
                    if (job not in jobcount) or jobcount[job]['stat'] != PROCESSED:
                        continue
                    processedcount = self.redis.hlen(job + '_completed')
                    if processedcount == jobcount[job]['count'] or processedcount == 0:
                        self.my_jobs[job].release()
                        self.zk.delete('/watchlocks/' + job)
                        self.redis.delete(job + '_completed')
                        self.zk.delete('/jobs/' + job)
                        print 'Job finished ' + job
                        temp = job
                        break
                except Exception as e:
                    print 'Monitor error ', e
            if temp != '':
                del self.my_jobs[temp]
            sleep(0.4)
            self.lock.release()

    def run(self):
        while True:
            if self.stall_monitor:
                sleep(1)
                continue
            self.watch_for_completion()

    def lock_my_job_watches(self):
        self.stop_monitoring()
        self.unlock_my_jobs()
        self.lock.acquire()
        for child in self.alljobs:
            slot = abs(hash(child)) % self.num_watchers
            if slot != self.myindex:
                continue
            lock = self.zk.Lock('/watchlocks/' + child)
            try:
                if lock.acquire(blocking=True, timeout=1):
                    self.my_jobs[child] = lock
            except Exception as e:
                print 'Lock problem ', e
        self.lock.release()
        if len(self.my_jobs) > 0:
            self.start_monitoring()

    def __call__(self, event):
        if event is None:
            '''
            I am not the zookeeper event callback
            '''
            self.run()
        if event.path == '/jobs':
            children = self.zk.get_children('/jobs', watch=self)
            self.alljobs = children
        else:
            self.watchers = self.zk.get_children('/watchers', watch=self)
            self.num_watchers = len(self.watchers)
            self.myindex = self.watchers.index(self.myid)
        self.lock_my_job_watches()


class job_executor:
    def register_myself(self):
        self.zk.create('/executors/' + self.myid, ephemeral=True)

    def __init__(self):
        zk = init()
        self.zk = zk
        self.lock = Lock()
        self.condition = Condition(self.lock)
        self.some_change = 0
        self.redis = init_redis()
        self.myid = uuid.uuid4().hex
        self.register_myself()
        self.my_jobs = {}
        children = zk.get_children('/jobs', watch=self)
        self.alljobs = children
        children = zk.get_children('/executors', watch=self)
        self.executors = children
        self.myindex = self.executors.index(self.myid)
        self.num_executors = len(self.executors)
        self.keep_running = True
        self.executor_thread = Thread(target=self, args=[None])
        self.executor_thread.start()

    def execute(self):
        self.my_jobs = filter(lambda x: (self.alljobs.index(x) % self.num_executors) == self.myindex, self.alljobs)
        self.execute_jobs()

    def execute_jobs(self):
        some_change = self.some_change

        def isprime(number):
            number = abs(number)
            if number <= 1:
                return False
            if number <= 3:
                return True
            if number & 1 == 0:
                return False
            end = int(sqrt(number))
            i = 3
            while i <= end:
                if number % i == 0:
                    return False
                i += 2
            return True

        if some_change != self.some_change:
            return
        jobs = set()
        for job in self.my_jobs:
            if some_change != self.some_change:
                return
            try:
                jobval = self.zk.get('/jobs/' + job)
                stat, counts = jobval[0].split('=')
                if stat == SUBMITTED:
                    jobs.add(job)
            except Exception as e:
                print 'Problem happened ', e
        while len(jobs) > 0:
            for job in jobs:
                if some_change != self.some_change:
                    return
                try:
                    val = self.redis.lrange(job, 0, 0)
                    if val is None or len(val) == 0:
                        stat = PROCESSED
                        self.zk.set('/jobs/' + job, PROCESSED + '=' + counts)
                        jobs.remove(job)
                        break
                    ival = int(val[0])
                    if self.redis.hmset(job + '_completed', {ival: isprime(ival)}):
                        self.redis.lpop(job)
                except Exception as e:
                    print 'Some problem ', e
                    sys.exit(1)

    def run(self):
        while self.keep_running:
            self.execute()
            self.condition.acquire()
            self.condition.wait(1.0)
            self.condition.release()

    def __call__(self, event):
        if event is None:
            self.run()
        if event.path == '/jobs':
            children = self.zk.get_children('/jobs', watch=self)
            self.alljobs = children
        else:
            self.executors = self.zk.get_children('/executors', watch=self)
            self.num_executors = len(self.executors)
            self.myindex = self.executors.index(self.myid)
        self.some_change += 1
        self.condition.acquire()
        self.condition.notify()
        self.condition.release()


def job_submitter_main():
    try:
        zk = init()
        cpool = ConnectionPool(host='localhost', port=6379, db=0)
        r = Redis(connection_pool=cpool)
        added = 0
        tried = 0
        max_add_try = 5000
        jobname = uuid.uuid4().hex
        added_nums = set()
        while tried < max_add_try:
            value = randint(5000, 90000000)
            tried += 1
            if value not in added_nums:
                added_nums.add(value)
            else:
                continue
            while True:
                try:
                    r.lpush(jobname, value)
                    added += 1
                    break
                except Exception as e:
                    sleep(1)
                    print "Lpush ", jobname, e
        zk = KazooClient(hosts='127.0.0.1:2181')
        zk.add_listener(state_listener)
        zk.start()
        value = SUBMITTED + "=" + str(added)
        zk.create('/jobs/' + jobname, value=value)
        zk.stop()
    except Exception as e:
        print 'Big problem in submitting job ', e
        sys.exit(1)
    print 'Job submitted ' + jobname


def watcher_main():
    job_watcher()


def job_executor_main():
    job_executor()


if __name__ == '__main__':
    if len(sys.argv) < 2:
        print 'Usage: ' + sys.argv[0] + ' command'
        print 'Valid commands are : ' + ', '.join(ALLOWED_COMMANDS)
        sys.exit(1)
    logging.basicConfig()
    if sys.argv[1] not in ALLOWED_COMMANDS:
        print sys.argv[1] + ' not a valid command'
        sys.exit(1)
    if sys.argv[1] == 'watcher':
        watcher_main()
        sleep(86400)
    elif sys.argv[1] == 'jobsubmitter':
        job_submitter_main()
        sleep(2)
    elif sys.argv[1] == 'jobexecutor':
        job_executor_main()
        sleep(86400)
The tasks here are numbers, and the executors check whether each number is prime. As I said before, this is just for demonstration, so the tasks are the simplest possible; in real life we don't need a distributed environment to check whether numbers are prime. The tasks (i.e. the numbers) are submitted to a queue in Redis in small batches. For the queue we simply use a Redis list, so a job is submitted as a list of numbers pushed to a Redis list.
job_submitter_main is the function that submits a job. It pushes the list of numbers to Redis and also creates a job description znode under /jobs in Zookeeper. The znode name and the name of the list created in Redis are the same. The znode created for a job holds the data "subm=count", where count is the number of tasks in the job (that is, the length of the corresponding list in Redis; let us call it the job size).
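The "state=count" payload can be handled with a pair of small helpers like the ones below. The helper names are hypothetical and the sketch targets Python 3, where Kazoo expects znode data as bytes; the state strings are the ones used in the listing above.

```python
SUBMITTED = "subm"  # state strings taken from the listing above
PROCESSED = "prcd"


def encode_job_state(state, count):
    """Build the znode payload, for example b"subm=5000"."""
    return "{}={}".format(state, count).encode("utf-8")


def decode_job_state(data):
    """Split a znode payload back into (state, count)."""
    state, count = data.decode("utf-8").split("=", 1)
    return state, int(count)
```

A submitter would pass encode_job_state(SUBMITTED, job_size) as the znode value, and watchers and executors would call decode_job_state on the data they read back.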
job_watcher is a callable class that watches the /watchers znode and also the /jobs znode. All the job_watchers get a notification when a new job description is created under the /jobs znode. The watchers share the jobs to watch using the following algorithm:
Let us say there are N watchers and J is the sorted list of incomplete jobs. Every watcher has the list of currently registered watchers. Each watcher sorts that list and finds its own position in it. The watcher at position 0 watches the jobs at indexes 0, N, 2N, 3N, 4N, ... in list J; the watcher at position 1 watches the jobs at indexes 1, N + 1, 2N + 1, 3N + 1, 4N + 1, ...; in general, the watcher at position n watches the jobs at indexes n, N + n, 2N + n, 3N + n, 4N + n, and so on. (The listing above is a little looser: job_watcher picks a job's slot as abs(hash(job)) % N and does not sort the lists, while job_executor uses the job's index in the list.)
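The round-robin sharing described above can be written as a small pure function. This is only a sketch of the described scheme (my_share is a hypothetical name), written for Python 3.

```python
def my_share(jobs, members, my_id):
    """Return the jobs that the member my_id is responsible for.

    Both lists are sorted first so that every member computes the same
    ordering no matter in which order ZooKeeper returned the children.
    """
    ordered_members = sorted(members)
    position = ordered_members.index(my_id)
    n = len(ordered_members)
    # The member at position p takes the jobs at indexes p, N + p, 2N + p, ...
    return [job for i, job in enumerate(sorted(jobs)) if i % n == position]
```

For example, with jobs a to g and three watchers w1, w2 and w3, w1 gets a, d and g, w2 gets b and e, and w3 gets c and f. When a watcher leaves, the remaining ones call the function again with the new member list and the shares shift accordingly.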
Each watcher process creates a job_watcher object and starts watching its jobs for completion.
job_executor is the callable class that executes the jobs. Each executor process creates an instance of the job_executor class and starts executing jobs. The executors share the jobs for execution using an algorithm similar to the one used by the watchers. An executor completes a task and writes the result to a hashmap in Redis. In the code above, the hashmap is named after the job, with the suffix _completed.
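The per-task work itself is a plain primality test. Below is a minimal Python 3 sketch using trial division, similar in spirit to the isprime helper in the listing; check_batch is a hypothetical helper that builds the number-to-result mapping an executor would then store in the Redis hash.

```python
from math import isqrt


def is_prime(n):
    """Trial division by odd numbers up to the integer square root of n."""
    if n < 2:
        return False
    if n < 4:
        return True  # 2 and 3
    if n % 2 == 0:
        return False
    for d in range(3, isqrt(n) + 1, 2):
        if n % d == 0:
            return False
    return True


def check_batch(numbers):
    """Map each number to its primality result."""
    return {n: is_prime(n) for n in numbers}
```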
The job watchers keep checking the sizes of the completed hashmaps, and once the size of a hashmap equals the job size, the watcher assumes the job is complete. In the code above, the watcher then releases its lock and simply deletes the job node from Zookeeper and the job's completed hashmap from Redis.
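The completion check can be sketched as below. job_is_complete is a hypothetical helper, and redis_client is assumed to behave like a redis-py Redis object (only its hlen method is used). Unlike the listing, this sketch does not treat an empty hash as finished.

```python
def job_is_complete(redis_client, job_name, state, job_size):
    """Return True once every task of the job has a recorded result.

    state and job_size are the two parts of the job znode's data
    ("prcd=count" after the executors have drained the list).
    """
    if state != "prcd":
        # The executors are still working through the Redis list.
        return False
    return redis_client.hlen(job_name + "_completed") == job_size
```

This works because the submitter pushes only distinct numbers, so the hash ends up with exactly one field per task.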
To submit a job:
$ python distjob.py jobsubmitter
We can submit any number of jobs, and we don't really care whether the job executors or watchers are running.
To run the watchers:
$ python distjob.py watcher
We can run any number of watchers and we don't care how many job executors or job submitters are running.
To run the job executors:
$ python distjob.py jobexecutor
We can run any number of job executors and we don't really care how many job submitters or watchers are running.
In this example, to demonstrate distributed job coordination using Zookeeper, we are running all the watchers, submitters, and executors, as well as the Redis and Zookeeper servers, on the same node. But that need not be so; we can easily make it a truly distributed system. We just need to replace "127.0.0.1:2181" with the list of servers in the distributed Zookeeper ensemble, and replace "localhost", which the example script uses as the Redis host, with the remote Redis server's host name or IP.
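One way to avoid hard-coding the addresses is to read them from the environment. The sketch below is only an illustration: the variable names ZK_HOSTS, REDIS_HOST and REDIS_PORT are my own assumptions and not something the script above understands. Kazoo accepts a comma-separated host:port list for its hosts argument.

```python
import os


def load_endpoints(env=os.environ):
    """Return (zk_hosts, redis_host, redis_port), defaulting to this post's values.

    ZK_HOSTS, REDIS_HOST and REDIS_PORT are hypothetical variable names.
    ZK_HOSTS would hold something like "zk1:2181,zk2:2181,zk3:2181".
    """
    zk_hosts = env.get("ZK_HOSTS", "127.0.0.1:2181")
    redis_host = env.get("REDIS_HOST", "localhost")
    redis_port = int(env.get("REDIS_PORT", "6379"))
    return zk_hosts, redis_host, redis_port
```

The returned values would then be passed as KazooClient(hosts=zk_hosts) and ConnectionPool(host=redis_host, port=redis_port, db=0).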