Tuesday, December 11, 2012

Apache Zookeeper examples for distributed configuration service, distributed registry, distributed coordination etc

Apache Zookeeper is an excellent piece of software that is really helpful for achieving the below stuffs in distributed systems:

  • coordination 
  • synchronization
  • lock service
  • configuration service
  • naming registry
etc. etc.
Zookeeper supports high availability by running multiple instances of the service. In case of one of the server that the clients are connecting to goes down, then it will switch over another server transparently. Great thing is irrespective of the server a client may connect to, it will see the updates in the same order. It is a high performance system and can be used for large distributed systems as well.
Zookeeper lets clients coordinate by a shared hierarchical namespace. It is a tree like structure as in normal file system. A client will be connected to a single zookeeper server (as long as the server is accessible).

Please go through Apache Zookeeper getting started guide for how to build, configure zookeeper first.
All these examples are available  in github.

Below is an example (zoo_create_node.c) for how to create Znodes (nodes in the zookeeper database).
In this example we demonstrate a simple way to create nodes in the zookeeper
server. Twenty nodes will be created with path /testpath0, /testpath1,     
/testpath2, /testpath3, ....., /testpath19.                                
All these nodes will be initialized to the same value "myvalue1".          
We will use zookeeper synchronus API to create the nodes. As soon as our   
client enters connected state, we start creating the nodes.                

All the examples used latest stable version of zookeeper that is version
3.3.6                                                                  
We may use the zookeeper client program to get the nodes and examine their
contents.                                                                
Suppose you have downloaded and extracted zookeeper to a directory       
/home/yourdir/packages/zookeeper-3.3.6 . Then after you build the C libraries
,they will be available at /home/yourdir/packages/zookeeper-3.3.6/src/c/.libs/
and the c command line tool will available at                                
/home/yourdir/packages/zookeeper-3.3.6/src/c. The command line tools are cli_st
and cli_mt and cli. They are convenient tool to examine zookeeper data.       

Compile the below code as shown below:
$gcc -o testzk1 zoo_create_node.c -I \
/home/yourdir/packages/zookeeper-3.3.6/src/c/include -I \
/home/yourdir/packages/zookeeper-3.3.6/src/c/generated -L \
/home/yourdir/packages/zookeeper-3.3.6/src/c/.libs/ -lzookeeper_mt
 *                                                               
Make sure that your LD_LIBRARY_PATH includes the zookeeper C libraries to run
the example. Before you run the example, you have to configure and run the  
zookeeper server. Please go through the zookeeper wiki how to do that.      
Now you run the example as shown below:                                     
./testzk1 127.0.0.1:22181  # Assuming zookeeper server is listening on port 
22181 and IP 127.0.0.1                                                      
Now use one of the cli tools to examine the znodes created and also their   
values.

#include <unistd.h>
#include <sys/select.h>
#include <sys/time.h>  
#include <time.h>      
#include <stdlib.h>    
#include <stdio.h>     
#include <string.h>    
#include <errno.h>     

#include <zookeeper.h>
static const char *hostPort;
static zhandle_t *zh;       
static clientid_t myid;     
static int connected;       

void watcher(zhandle_t *zzh, int type, int state, const char *path,
             void* context)                                        
{                                                                  
    if (type == ZOO_SESSION_EVENT) {                               
        if (state == ZOO_CONNECTED_STATE) {                        
            connected = 1;                                         
        } else if (state == ZOO_AUTH_FAILED_STATE) {               
            zookeeper_close(zzh);                                  
            exit(1);                                               
        } else if (state == ZOO_EXPIRED_SESSION_STATE) {           
            zookeeper_close(zzh);                                  
            exit(1);                                               
        }                                                          
    }                                                              
}                                                                  


int main(int argc, char *argv[])
{                               
    int rc;                     
    int fd;                     
    int interest;               
    int events;                 
    struct timeval tv;          
    fd_set rfds, wfds, efds;    

    if (argc != 2) {
        fprintf(stderr, "USAGE: %s host:port\n", argv[0]);
        exit(1);                                          
    }                                                     

    FD_ZERO(&rfds);
    FD_ZERO(&wfds);
    FD_ZERO(&efds);

    zoo_set_debug_level(ZOO_LOG_LEVEL_INFO);
    zoo_deterministic_conn_order(1);        
    hostPort = argv[1];                     
    int x = 0;                              
    zh = zookeeper_init(hostPort, watcher, 30000, &myid, 0, 0);
    if (!zh) {                                                 
        return errno;                                          
    }                                                          
    while (1) {                                                
        char mypath[255];                                      
        zookeeper_interest(zh, &fd, &interest, &tv);           
        usleep(10);                                            
        memset(mypath, 0, 255);                                
        if (connected) {                                       
            while (x < 20) {                                   
                sprintf(mypath, "/testpath%d", x);             
                usleep(10);                                    
                rc = zoo_create(zh, mypath, "myvalue1", 9, &ZOO_OPEN_ACL_UNSAFE, 0, 0, 0);
                if (rc){                                                                  
                    printf("Problems %s %d\n", mypath, rc);                               
                }                                                                         
                x++;                                                                      
            }                                                                             
            connected++;
        }
        if (fd != -1) {
            if (interest&ZOOKEEPER_READ) {
                FD_SET(fd, &rfds);
            } else {
                FD_CLR(fd, &rfds);
            }
            if (interest&ZOOKEEPER_WRITE) {
                FD_SET(fd, &wfds);
            } else {
                FD_CLR(fd, &wfds);
            }
        } else {
            fd = 0;
        }
        FD_SET(0, &rfds);
        rc = select(fd+1, &rfds, &wfds, &efds, &tv);
        events = 0;
        if (rc > 0) {
            if (FD_ISSET(fd, &rfds)) {
                    events |= ZOOKEEPER_READ;
            }
            if (FD_ISSET(fd, &wfds)) {
                events |= ZOOKEEPER_WRITE;
            }
        }
        zookeeper_process(zh, events);
        if (2 == connected ) {
            // We created the nodes, so we will exit now
            zookeeper_close(zh);
            break;
        }
    }
    return 0;
}


In the below example (zoo_get_node.c) we demonstrate a simple way to get nodes in the zookeeper
server. Twenty nodes with path /testpath0, /testpath1,                       
/testpath2, /testpath3, ....., /testpath19 will be examined and value        
associated with them will be printed.                                        
We will use zookeeper synchronus API to get the value of the nodes. As soon  
as our client enters connected state, we start getting the node values.      

Compile the code as shown below:
$gcc -o testzk1 zoo_get_node.c -I \
/home/yourdir/packages/zookeeper-3.3.6/src/c/include -I \
/home/yourdir/packages/zookeeper-3.3.6/src/c/generated -L \
/home/yourdir/packages/zookeeper-3.3.6/src/c/.libs/ -lzookeeper_mt
 *                                                               
Now you run the example as shown below:                          
./testzk1 127.0.0.1:22181  # Assuming zookeeper server is listening on port
22181 and IP 127.0.0.1                                                    

#include <unistd.h>
#include <sys/select.h>
#include <sys/time.h>  
#include <time.h>      
#include <stdlib.h>    
#include <stdio.h>     
#include <string.h>    
#include <errno.h>     

#include <zookeeper.h>
static const char *hostPort;
static zhandle_t *zh;       
static clientid_t myid;     
static int connected;       

void watcher(zhandle_t *zzh, int type, int state, const char *path,
             void* context)                                        
{                                                                  
    if (type == ZOO_SESSION_EVENT) {                               
        if (state == ZOO_CONNECTED_STATE) {                        
            connected = 1;                                         
        } else if (state == ZOO_AUTH_FAILED_STATE) {               
            zookeeper_close(zzh);                                  
            exit(1);                                               
        } else if (state == ZOO_EXPIRED_SESSION_STATE) {           
            zookeeper_close(zzh);                                  
            exit(1);                                               
        }                                                          
    }                                                              
}                                                                  

void watcherforwget(zhandle_t *zzh, int type, int state, const char *path,
             void* context)                                               
{                                                                         
    char *p = (char *)context;                                            
    if (type == ZOO_SESSION_EVENT) {                                      
        if (state == ZOO_CONNECTED_STATE) {                               
            connected = 1;                                                
        } else if (state == ZOO_AUTH_FAILED_STATE) {                      
            zookeeper_close(zzh);                                         
            exit(1);                                                      
        } else if (state == ZOO_EXPIRED_SESSION_STATE) {                  
            zookeeper_close(zzh);                                         
            exit(1);                                                      
        }                                                                 
    }                                                                     
    printf("Watcher context %s\n", p);                                    
}                                                                         

int main(int argc, char *argv[])
{                               
    int rc;                     
    int fd;                     
    int interest;               
    int events;                 
    struct timeval tv;          
    fd_set rfds, wfds, efds;    

    if (argc != 2) {
        fprintf(stderr, "USAGE: %s host:port\n", argv[0]);
        exit(1);                                          
    }                                                     

    FD_ZERO(&rfds);
    FD_ZERO(&wfds);
    FD_ZERO(&efds);

    zoo_set_debug_level(ZOO_LOG_LEVEL_INFO);
    zoo_deterministic_conn_order(1);        
    hostPort = argv[1];                     
    int x = 0;                              
    zh = zookeeper_init(hostPort, watcher, 30000, &myid, 0, 0);
    if (!zh) {                                                 
        return errno;                                          
    }                                                          
    while (1) {                                                
        char mypath[255];                                      
        char buffer[255];                                      
        struct Stat st;                                        
        zookeeper_interest(zh, &fd, &interest, &tv);           
        usleep(10);                                            
        memset(mypath, 0, 255);                                
        memset(buffer, 0, 255);                                
        if (connected) {                                       
            char mycontext[] = "This is context data for test";
            int len = 254;                                     
            while (x < 20) {                                   
                sprintf(mypath, "/testpath%d", x);             
                usleep(10);                                    
                rc = zoo_wget(zh, mypath, watcherforwget , mycontext, buffer, &len, &st);
                if (ZOK != rc){                                                          
                    printf("Problems %s %d\n", mypath, rc);                              
                } else if (len >= 0) {                                                   
                   buffer[len] = 0;                                                      
                   printf("Path: %s Data: %s\n", mypath, buffer);                        
                }                                                                        
                x++;                                                                     
                len = 254;                                                               
            }                                                                            
            connected++;
        }
        if (fd != -1) {
            if (interest&ZOOKEEPER_READ) {
                FD_SET(fd, &rfds);
            } else {
                FD_CLR(fd, &rfds);
            }
            if (interest&ZOOKEEPER_WRITE) {
                FD_SET(fd, &wfds);
            } else {
                FD_CLR(fd, &wfds);
            }
        } else {
            fd = 0;
        }
        FD_SET(0, &rfds);
        rc = select(fd+1, &rfds, &wfds, &efds, &tv);
        events = 0;
        if (rc > 0) {
            if (FD_ISSET(fd, &rfds)) {
                    events |= ZOOKEEPER_READ;
            }
            if (FD_ISSET(fd, &wfds)) {
                events |= ZOOKEEPER_WRITE;
            }
        }
        zookeeper_process(zh, events);
        if (2 == connected ) {
            // We created the nodes, so we will exit now
            zookeeper_close(zh);
            break;
        }
    }
    return 0;
}

In this example below (zoo_data_watches.c) we demonstrate a simple way to watch nodes in the zookeeper                                          
server. Twenty nodes with path /testpath0, /testpath1, /testpath2, /testpath3, .....,                                      
/testpath19 will be watches for any changes in their values.                                                               
We will use zookeeper synchronus API to watch the nodes. As soon                                                           
as our client enters connected state, we start putting watches for the nodes.                                              

Compile the below code as shown below:
$gcc -o testzk1 zoo_data_watches.c -I \
/home/yourdir/packages/zookeeper-3.3.6/src/c/include -I \
/home/yourdir/packages/zookeeper-3.3.6/src/c/generated -L \
/home/yourdir/packages/zookeeper-3.3.6/src/c/.libs/ -lzookeeper_mt
 *                                                               
Now you run the example as shown below:                          
./testzk1 127.0.0.1:22181  # Assuming zookeeper server is listening on port
22181 and IP 127.0.0.1                                    

#include <unistd.h>
#include <sys/select.h>
#include <sys/time.h>  
#include <time.h>      
#include <stdlib.h>    
#include <stdio.h>     
#include <string.h>    
#include <errno.h>     

#include <zookeeper.h>
static const char *hostPort;
static zhandle_t *zh;       
static clientid_t myid;     
static int connected;       
static char mycontext[] = "This is context data for test";

void watcher(zhandle_t *zzh, int type, int state, const char *path,
             void* context)                                        
{                                                                  
    if (type == ZOO_SESSION_EVENT) {                               
        if (state == ZOO_CONNECTED_STATE) {                        
            connected = 1;                                         
        } else if (state == ZOO_AUTH_FAILED_STATE) {               
            zookeeper_close(zzh);                                  
            exit(1);                                               
        } else if (state == ZOO_EXPIRED_SESSION_STATE) {           
            zookeeper_close(zzh);                                  
            exit(1);                                               
        }                                                          
    }                                                              
}                                                                  

void watcherforwget(zhandle_t *zzh, int type, int state, const char *path,
             void* context)                                               
{                                                                         
    char buffer[255];                                                     
    int len, rc;                                                          
    struct Stat st;                                                       
    char *p = (char *)context;                                            
    if (type == ZOO_SESSION_EVENT) {                                      
        if (state == ZOO_CONNECTED_STATE) {                               
            return;                                                       
        } else if (state == ZOO_AUTH_FAILED_STATE) {                      
            zookeeper_close(zzh);                                         
            exit(1);                                                      
        } else if (state == ZOO_EXPIRED_SESSION_STATE) {                  
            zookeeper_close(zzh);                                         
            exit(1);                                                      
        }                                                                 
    } else if (type == ZOO_CHANGED_EVENT) {                               
        printf("Data changed for %s \n", path);                           
        len = 254;                                                        
        //get the changed data and set an watch again                     
        rc = zoo_wget(zh, path, watcherforwget , mycontext, buffer, &len, &st);
        if (ZOK != rc){                                                        
            printf("Problems %s %d\n", path, rc);                              
        } else if (len >= 0) {                                                 
           buffer[len] = 0;                                                    
           printf("Path: %s changed data: %s\n", path, buffer);                
        }                                                                      
    }                                                                          

    printf("Watcher context %s\n", p);
}                                     

int main(int argc, char *argv[])
{                               
    int rc;                     
    int fd;                     
    int interest;               
    int events;                 
    struct timeval tv;          
    fd_set rfds, wfds, efds;    

    if (argc != 2) {
        fprintf(stderr, "USAGE: %s host:port\n", argv[0]);
        exit(1);                                          
    }                                                     

    FD_ZERO(&rfds);
    FD_ZERO(&wfds);
    FD_ZERO(&efds);

    zoo_set_debug_level(ZOO_LOG_LEVEL_INFO);
    zoo_deterministic_conn_order(1);        
    hostPort = argv[1];                     
    int x = 0;                              
    zh = zookeeper_init(hostPort, watcher, 30000, &myid, 0, 0);
    if (!zh) {                                                 
        return errno;                                          
    }                                                          
    while (1) {                                                
        char mypath[255];                                      
        char buffer[255];                                      
        struct Stat st;                                        
        zookeeper_interest(zh, &fd, &interest, &tv);           
        usleep(10);                                            
        memset(mypath, 0, 255);                                
        memset(buffer, 0, 255);                                
        if (connected) {                                       
            //Put the watches for the nodes                    
            int len = 254;                                     
            while (x < 20) {                                   
                sprintf(mypath, "/testpath%d", x);             
                usleep(10);                                    
                rc = zoo_wget(zh, mypath, watcherforwget , mycontext, buffer, &len, &st);
                if (ZOK != rc){                                                          
                    printf("Problems %s %d\n", mypath, rc);                              
                } else if (len >= 0) {                                                   
                   buffer[len] = 0;                                                      
                   printf("Path: %s Data: %s\n", mypath, buffer);
                }
                x++;
                len = 254;
            }
            connected++;
        }
        if (fd != -1) {
            if (interest&ZOOKEEPER_READ) {
                FD_SET(fd, &rfds);
            } else {
                FD_CLR(fd, &rfds);
            }
            if (interest&ZOOKEEPER_WRITE) {
                FD_SET(fd, &wfds);
            } else {
                FD_CLR(fd, &wfds);
            }
        } else {
            fd = 0;
        }
        FD_SET(0, &rfds);
        rc = select(fd+1, &rfds, &wfds, &efds, &tv);
        events = 0;
        if (rc > 0) {
            if (FD_ISSET(fd, &rfds)) {
                    events |= ZOOKEEPER_READ;
            }
            if (FD_ISSET(fd, &wfds)) {
                events |= ZOOKEEPER_WRITE;
            }
        }
        zookeeper_process(zh, events);
    }
    return 0;
}


In this example below (zoo_data_watches.c) we demonstrate a simple way to watch the appearance of a node
in the server. It will also watch if the node is deleted after it was created.

In this program we will watch for the appearance and deletion of a node
"/testforappearance". Once we create the node from another program, the watch
event will be sent to the client and the watcher routine woll be called.    
 *                                                                          

Compile the below code as shown below:
$gcc -o testzk1 zoo_exist_watch.c -I \
/home/yourdir/packages/zookeeper-3.3.6/src/c/include -I \
/home/yourdir/packages/zookeeper-3.3.6/src/c/generated -L \
/home/yourdir/packages/zookeeper-3.3.6/src/c/.libs/ -lzookeeper_mt
 *                                                               
./testzk1 127.0.0.1:22181  # Assuming zookeeper server is listening on port 22181 and
IP 127.0.0.1                                                                         

#include <unistd.h>
#include <sys/select.h>
#include <sys/time.h> 
#include <time.h>     
#include <stdlib.h>   
#include <stdio.h>    
#include <string.h>   
#include <errno.h>    

#include <zookeeper.h>
static const char *hostPort;
static zhandle_t *zh;      
static clientid_t myid;    
static int connected;      
static char mycontext[] = "This is context data for test";

void watcher(zhandle_t *zzh, int type, int state, const char *path,
             void* context)                                       
{                                                                 
    if (type == ZOO_SESSION_EVENT) {                              
        if (state == ZOO_CONNECTED_STATE) {                       
            connected = 1;                                        
        } else if (state == ZOO_AUTH_FAILED_STATE) {              
            zookeeper_close(zzh);                                 
            exit(1);                                              
        } else if (state == ZOO_EXPIRED_SESSION_STATE) {          
            zookeeper_close(zzh);                                 
            exit(1);                                              
        }                                                         
    }                                                             
}                                                                 

void watchexistence(zhandle_t *zzh, int type, int state, const char *path,
             void* context)                                              
{                                                                        
    static struct Stat st;                                               
    int rc;                                                              

    if (type == ZOO_SESSION_EVENT) {
        if (state == ZOO_CONNECTED_STATE) {
            return;                       
        } else if (state == ZOO_AUTH_FAILED_STATE) {
            zookeeper_close(zzh);                  
            exit(1);                               
        } else if (state == ZOO_EXPIRED_SESSION_STATE) {
            zookeeper_close(zzh);                      
            exit(1);                                   
        }                                              
    } else if (type == ZOO_CREATED_EVENT) {            
        printf("Node appeared %s, now Let us watch for its delete \n", path);
        rc = zoo_wexists(zh, path,                                          
                watchexistence , mycontext, &st);                           
        if (ZOK != rc){                                                     
            printf("Problems  %d\n", rc);                                   
        }                                                                   
    } else if (type == ZOO_DELETED_EVENT) {                                 
        printf("Node deleted %s, now Let us watch for its creation \n", path);
        rc = zoo_wexists(zh, path,                                           
                watchexistence , mycontext, &st);                            
        if (ZOK != rc){                                                      
            printf("Problems  %d\n", rc);                                    
        }                                                                    
    }                                                                        
}                                                                            

int main(int argc, char *argv[])
{                              
    int rc;                    
    int fd;                    
    int interest;              
    int events;                
    struct timeval tv;         
    fd_set rfds, wfds, efds;   

    if (argc != 2) {
        fprintf(stderr, "USAGE: %s host:port\n", argv[0]);
        exit(1);                                         
    }                                                    

    FD_ZERO(&rfds);
    FD_ZERO(&wfds);
    FD_ZERO(&efds);

    zoo_set_debug_level(ZOO_LOG_LEVEL_INFO);
    zoo_deterministic_conn_order(1);       
    hostPort = argv[1];                    

    zh = zookeeper_init(hostPort, watcher, 30000, &myid, 0, 0);
    if (!zh) {                                                
        return errno;                                         
    }                                                         

    while (1) {
        static struct Stat st;

        zookeeper_interest(zh, &fd, &interest, &tv);
        usleep(10);                                
        if (connected == 1) {                      
            // watch existence of the node         
            usleep(10);                            
            rc = zoo_wexists(zh, "/testforappearance",
                    watchexistence , mycontext, &st);
            if (ZOK != rc){
                printf("Problems  %d\n", rc);
            }
            connected++;
        }
        if (fd != -1) {
            if (interest & ZOOKEEPER_READ) {
                FD_SET(fd, &rfds);
            } else {
                FD_CLR(fd, &rfds);
            }
            if (interest & ZOOKEEPER_WRITE) {
                FD_SET(fd, &wfds);
            } else {
                FD_CLR(fd, &wfds);
            }
        } else {
            fd = 0;
        }
        FD_SET(0, &rfds);
        rc = select(fd+1, &rfds, &wfds, &efds, &tv);
        events = 0;
        if (rc > 0) {
            if (FD_ISSET(fd, &rfds)) {
                    events |= ZOOKEEPER_READ;
            }
            if (FD_ISSET(fd, &wfds)) {
                events |= ZOOKEEPER_WRITE;
            }
        }
        zookeeper_process(zh, events);
    }
    return 0;
}


In this example below(zoo_children_watch.c) we demonstrate a simple way to monitot the appearance of children znode in a path
in the server. We will check znode /testpath1 for its children and will examine if any children is
added or deleted under this path.

Compile the below code as shown below:
$gcc -o testzk1 zoo_childten_watch.c -I \
/home/yourdir/packages/zookeeper-3.3.6/src/c/include -I \
/home/yourdir/packages/zookeeper-3.3.6/src/c/generated -L \
/home/yourdir/packages/zookeeper-3.3.6/src/c/.libs/ -lzookeeper_mt
*
Make sure that your LD_LIBRARY_PATH includes the zookeeper C libraries to run the example. Before
you run the example, you have to configure and run the zookeeper server. Please go through the
zookeeper wiki how to do that. Now you run the example as shown below:
./testzk1 127.0.0.1:22181 # Assuming zookeeper server is listening on port 22181 and IP 127.0.0.1

#include <unistd.h>
#include <sys/select.h>
#include <sys/time.h>  
#include <time.h>      
#include <stdlib.h>    
#include <stdio.h>     
#include <string.h>    
#include <errno.h>     

#include <zookeeper.h>
static const char *hostPort;
static zhandle_t *zh;       
static clientid_t myid;     
static int connected;       
static char mycontext[] = "This is context data for test";

void watcher(zhandle_t *zzh, int type, int state, const char *path,
             void* context)                                        
{                                                                  
    if (type == ZOO_SESSION_EVENT) {                               
        if (state == ZOO_CONNECTED_STATE) {                        
            connected = 1;                                         
        } else if (state == ZOO_AUTH_FAILED_STATE) {               
            zookeeper_close(zzh);                                  
            exit(1);                                               
        } else if (state == ZOO_EXPIRED_SESSION_STATE) {           
            zookeeper_close(zzh);                                  
            exit(1);                                               
        }                                                          
    }                                                              
}                                                                  

void watchchildren(zhandle_t *zzh, int type, int state, const char *path,
             void* context)                                              
{                                                                        
    struct String_vector str;                                            
    int rc;                                                              

    printf("The event path %s, event type %d\n", path, type);
    if (type == ZOO_SESSION_EVENT) {                         
        if (state == ZOO_CONNECTED_STATE) {                  
            return;                                          
        } else if (state == ZOO_AUTH_FAILED_STATE) {         
            zookeeper_close(zzh);                            
            exit(1);                                         
        } else if (state == ZOO_EXPIRED_SESSION_STATE) {     
            zookeeper_close(zzh);                            
            exit(1);                                         
        }                                                    
    }                                                        
    // Put the watch again                                   
    rc = zoo_wget_children(zh, "/testpath1",                 
            watchchildren , mycontext, &str);                
    if (ZOK != rc){                                          
        printf("Problems  %d\n", rc);                        
    } else {                                                 
        int i = 0;                                           
        while (i < str.count) {                              
            printf("Children %s\n", str.data[i++]);          
        }                                                    
        if (str.count) {                                     
            deallocate_String_vector(&str);                  
        }                                                    
    }                                                        
}                                                            

int main(int argc, char *argv[])
{                               
    int rc;                     
    int fd;                     
    int interest;               
    int events;                 
    struct timeval tv;          
    fd_set rfds, wfds, efds;    

    if (argc != 2) {
        fprintf(stderr, "USAGE: %s host:port\n", argv[0]);
        exit(1);                                          
    }                                                     

    FD_ZERO(&rfds);
    FD_ZERO(&wfds);
    FD_ZERO(&efds);

    zoo_set_debug_level(ZOO_LOG_LEVEL_INFO);
    zoo_deterministic_conn_order(1);        
    hostPort = argv[1];                     

    zh = zookeeper_init(hostPort, watcher, 30000, &myid, 0, 0);
    if (!zh) {                                                 
        return errno;                                          
    }                                                          

    while (1) {
        zookeeper_interest(zh, &fd, &interest, &tv);
        usleep(10);                                 
        if (connected == 1) {                       
            struct String_vector str;               

            usleep(10);
            // watch existence of the node
            rc = zoo_wget_children(zh, "/testpath1", 
                    watchchildren , mycontext, &str);
            if (ZOK != rc){                          
                printf("Problems  %d\n", rc);        
            } else {                                 
                int i = 0;                           
                while (i < str.count) {              
                    printf("Children %s\n", str.data[i++]);
                }
                if (str.count) {
                    deallocate_String_vector(&str);
                }
            }
            connected++;
        }
        if (fd != -1) {
            if (interest & ZOOKEEPER_READ) {
                FD_SET(fd, &rfds);
            } else {
                FD_CLR(fd, &rfds);
            }
            if (interest & ZOOKEEPER_WRITE) {
                FD_SET(fd, &wfds);
            } else {
                FD_CLR(fd, &wfds);
            }
        } else {
            fd = 0;
        }
        FD_SET(0, &rfds);
        rc = select(fd+1, &rfds, &wfds, &efds, &tv);
        events = 0;
        if (rc > 0) {
            if (FD_ISSET(fd, &rfds)) {
                    events |= ZOOKEEPER_READ;
            }
            if (FD_ISSET(fd, &wfds)) {
                events |= ZOOKEEPER_WRITE;
            }
        }
        zookeeper_process(zh, events);
    }
    return 0;
}


2 comments:

  1. Hi, thanks for the zookeeper_mt program. I m looking for a zookeeper_st client program. Can anyone help me?

    ReplyDelete
  2. Thank you.Well it was nice post and very helpful information on Big Data Hadoop Online Course

    ReplyDelete