Wednesday, May 27, 2015

An Eventbus for Python

Sometimes we need publish-subscribe communication between components running in the same process. It is even better if the components don't need to know about each other, as that simplifies the design of complex and highly concurrent components.

We could design similar systems using queues. But then the coupling between components is not completely eliminated, and we end up spawning threads and writing a lot of complex logic ourselves. Moreover, the same pattern gets repeated across many pieces of code, which ultimately makes it a nightmare to weave the components together in a simple way.

So here is a Python eventbus library, geeteventbus, which addresses exactly these problems.

The API documentation is available here.

geeteventbus is inspired by the Java library Guava EventBus, but it is not exactly the same as Guava EventBus.


To install the library, run the command below:

$ sudo pip install geeteventbus

This is a platform-independent package, so it can be used on any OS where Python runs.


The diagram below explains the architecture of the event-bus.



There are three types of components here. The publishers post events to the event-bus. The event-bus takes care of delivering the events to the appropriate subscribers. The subscribers register themselves with the event-bus for certain topics. Each event carries a topic, data and an optional ordering field. The topic of the event decides which subscribers the event-bus will deliver it to.
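
To make the topic-based routing concrete, here is a minimal sketch of the idea using only the standard library. It is not how geeteventbus is implemented: the class TinyBus and its register and post methods are hypothetical names chosen for illustration, and delivery here is synchronous.

```python
from collections import defaultdict


class TinyBus:
    """Hypothetical synchronous topic router; not geeteventbus itself."""

    def __init__(self):
        # Maps each topic to the list of callables subscribed to it.
        self._subscribers = defaultdict(list)

    def register(self, callback, topic):
        self._subscribers[topic].append(callback)

    def post(self, topic, data):
        # Deliver to every subscriber of this topic, in registration order.
        # .get() avoids creating an empty entry for unknown topics.
        for callback in self._subscribers.get(topic, []):
            callback(topic, data)


received = []
bus = TinyBus()
bus.register(lambda topic, data: received.append((topic, data)), 'orders')
bus.post('orders', 'order #1 created')
bus.post('payments', 'nobody subscribed to this topic')
print(received)
```

Only the event posted to 'orders' reaches the subscriber. The publisher never holds a reference to the subscriber, which is the decoupling the architecture is after.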

The event-bus can be synchronous or asynchronous. 

  • A synchronous event-bus delivers the events from the same threads they were posted from.
  • An asynchronous event-bus delivers the events to the subscribers from different threads. Here the events are delivered from executor threads that the event-bus runs internally.
  • When creating the event-bus, we may declare that its future subscribers are thread-safe. In that case, the event-bus does not synchronize calls to the subscribers.
  • In some cases, we may want the events to be processed by the subscribers in the same order as they were posted. To enforce that, event objects are created with a special ordering field, e.g. event('atopic', 'somedata for this event', 'an-ordering-key'). All the events with the ordering key "an-ordering-key" will be processed in the same order they were posted to the event-bus.
  • Multiple event-buses can be created in the same process if needed.
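
One common way to get per-key ordering on top of a pool of worker threads is to route every event with the same ordering key to the same worker queue. The sketch below illustrates that technique with the standard library only. It is an assumption about how such ordering can be achieved, not a description of geeteventbus internals, and names such as post, worker and NUM_WORKERS are hypothetical.

```python
import queue
import threading
import zlib

NUM_WORKERS = 4
work_queues = [queue.Queue() for _ in range(NUM_WORKERS)]
processed = []
processed_lock = threading.Lock()


def worker(q):
    # Each worker drains exactly one FIFO queue, so events that share
    # a queue are handled in the order they were put there.
    while True:
        item = q.get()
        if item is None:  # sentinel: stop the worker
            break
        with processed_lock:
            processed.append(item)


def post(ordering_key, data):
    # A stable hash sends the same key to the same queue every time.
    index = zlib.crc32(ordering_key.encode('utf-8')) % NUM_WORKERS
    work_queues[index].put((ordering_key, data))


threads = [threading.Thread(target=worker, args=(q,)) for q in work_queues]
for t in threads:
    t.start()

for i in range(5):
    post('user-a', i)
    post('user-b', i)

for q in work_queues:
    q.put(None)
for t in threads:
    t.join()

for key in ('user-a', 'user-b'):
    print(key, [data for k, data in processed if k == key])
```

Events for different keys may interleave freely, but within one key the data always comes out as 0, 1, 2, 3, 4.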



Basic working

  1. We create an eventbus
    from geeteventbus.eventbus import eventbus
    eb = eventbus()
    This creates an eventbus with the defaults. The default eventbus has the following characteristics:
    1. the maximum number of queued events is 10000
    2. the number of executor threads is 8
    3. the subscribers are called asynchronously
    4. subscribers are treated as thread-safe, and hence the same subscriber may be invoked simultaneously on different threads
  2. Create a subclass of subscriber and override the process method. Create an object of this class and register it with the eventbus to receive messages for certain topics:
    from geeteventbus.subscriber import subscriber
    from geeteventbus.eventbus import eventbus
    from geeteventbus.event import event
    
    class mysubscriber(subscriber):
        def process(self, eventobj):
            if not isinstance(eventobj, event):
                print('Invalid object type is passed.')
                return
            topic = eventobj.get_topic()
            data = eventobj.get_data()
            print('Processing event with TOPIC: %s, DATA: %s' % (topic, data))
    
    subscr = mysubscriber()
    eb.register_consumer(subscr, 'an_important_topic')
  3. Post some events to the eventbus with the topic "an_important_topic".
    from geeteventbus.event import event
    
    eobj1 = event('an_important_topic', 'This is some data for the event 1')
    eobj2 = event('an_important_topic', 'This is some data for the event 2')
    eobj3 = event('an_important_topic', 'This is some data for the event 3')
    eobj4 = event('an_important_topic', 'This is some data for the event 4')
    
    eb.post(eobj1)
    eb.post(eobj2)
    eb.post(eobj3)
    eb.post(eobj4)
  4. We may gracefully shut down the eventbus before exiting the process:
    eb.shutdown()
The complete example is below:
from time import sleep
from geeteventbus.subscriber import subscriber
from geeteventbus.eventbus import eventbus
from geeteventbus.event import event

class mysubscriber(subscriber):
    def process(self, eventobj):
        if not isinstance(eventobj, event):
            print('Invalid object type is passed.')
            return
        topic = eventobj.get_topic()
        data = eventobj.get_data()
        print('Processing event with TOPIC: %s, DATA: %s' % (topic, data))


eb = eventbus()
subscr = mysubscriber()
eb.register_consumer(subscr, 'an_important_topic')


eobj1 = event('an_important_topic', 'This is some data for the event 1')
eobj2 = event('an_important_topic', 'This is some data for the event 2')
eobj3 = event('an_important_topic', 'This is some data for the event 3')
eobj4 = event('an_important_topic', 'This is some data for the event 4')

eb.post(eobj1)
eb.post(eobj2)
eb.post(eobj3)
eb.post(eobj4)

eb.shutdown()
sleep(2)
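
Because the default eventbus treats subscribers as thread-safe, process may run on several executor threads at once. If a subscriber keeps shared state, one option is to guard that state with a lock inside process. The sketch below shows the idea without geeteventbus: SafeCounter is a hypothetical stand-in for a subscriber, and plain threads stand in for the executor threads.

```python
import threading


class SafeCounter:
    """Hypothetical subscriber-like class whose process() may run concurrently."""

    def __init__(self):
        self._lock = threading.Lock()
        self.total = 0

    def process(self, amount):
        # The read-modify-write on self.total is guarded by the lock,
        # so concurrent calls cannot lose updates.
        with self._lock:
            self.total += amount


counter = SafeCounter()


def call_many_times():
    for _ in range(1000):
        counter.process(1)


# Eight plain threads stand in for the executor threads.
threads = [threading.Thread(target=call_many_times) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter.total)  # 8000
```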
A more detailed example is given below. A subscriber (counter_aggregator) aggregates the values for a set of counters. It registers itself with an eventbus to receive events for the counters (the topics). A set of producers update the counter values and post events describing each update to the eventbus:
from threading import Lock, Thread
from time import sleep, time
from geeteventbus.eventbus import eventbus
from geeteventbus.event import event
from geeteventbus.subscriber import subscriber
from random import randint


class counter_aggregator(subscriber, Thread):
    '''
    Aggregator for a set of counters. Multiple threads update the counts, which
    are aggregated by this class; the aggregated values are printed periodically.
    '''
    def __init__(self, counter_names):
        Thread.__init__(self)
        self.counter_names = counter_names
        self.locks = {}
        self.counts = {}
        self.keep_running = True
        self.collect_times = {}
        for counter in counter_names:
            self.locks[counter] = Lock()
            self.counts[counter] = 0
            self.collect_times[counter] = time()

    def process(self, eobj):
        '''
        Called by the eventbus with the event object eobj. eobj has the counter name
        as its topic and an int increment for that counter as its data.
        '''
        counter_name = eobj.get_topic()
        if counter_name not in self.counter_names:
            return
        count = eobj.get_data()
        with self.locks[counter_name]:
            self.counts[counter_name] += count

    def stop(self):
        self.keep_running = False

    def __call__(self):
        '''
        Keep printing the aggregated counts every 2 seconds
        '''
        while self.keep_running:
            sleep(2)
            for counter_name in self.counter_names:
                with self.locks[counter_name]:
                    print('Change for counter %s = %d, in last %f secs' % (counter_name,
                          self.counts[counter_name], time() - self.collect_times[counter_name]))
                    self.counts[counter_name] = 0
                    self.collect_times[counter_name] = time()
        print('Aggregator exited')


class count_producer:
    '''
    Producer for counters. Every 0.02 seconds post the "updated" value for a
    counter randomly
    '''
    def __init__(self, counters, ebus):
        self.counters = counters
        self.ebus = ebus
        self.keep_running = True
        self.num_counter = len(counters)

    def stop(self):
        self.keep_running = False

    def __call__(self):
        while self.keep_running:
            ev = event(self.counters[randint(0, self.num_counter - 1)], randint(1, 100))
            # Use the eventbus passed to the constructor, not the module-level global.
            self.ebus.post(ev)
            sleep(0.02)
        print('producer exited')

if __name__ == '__main__':
    ebus = eventbus()
    counters = ['c1', 'c2', 'c3', 'c4']
    subcr = counter_aggregator(counters)
    producer = count_producer(counters, ebus)
    for counter in counters:
        ebus.register_consumer(subcr, counter)
    threads = []
    i = 30
    while i > 0:
        threads.append(Thread(target=producer))
        i -= 1

    aggregator_thread = Thread(target=subcr)
    aggregator_thread.start()
    for thrd in threads:
        thrd.start()
    sleep(20)
    producer.stop()
    subcr.stop()
    sleep(2)
    ebus.shutdown()