Towards a Timely, Well-Balanced, Event-Driven Architecture

September 11, 2007


In most Java applications, system performance and throughput
suffer when a receiver consumes events more slowly than they
can be dispatched.

For instance, in a financial system for monitoring trading
orders, diverse event streams (customer orders, order executions,
market data updates, etc.) might all contribute to the creation of
near-realtime views. When a particular event occurs, it might take
x milliseconds for the analytics to recompute the view. But
if a sequence of events arrives at intervals shorter than x,
the analytics won't be able to keep up with the load. Somehow, we
need to make the application well-conditioned to load,
preventing resources from being overcommitted when demand exceeds
capacity.

If we could assume that any type of event is consumed as fast as
it can be produced, there would be no need to separate "event
submission" from "event handling." In fact, it is the failure to
recognize this need that leads to bottlenecks that become
visible only when the system is tested under a heavy load.

On the other hand, it is often possible to design receivers in
which dealing with one event or 100 events takes roughly
the same time or, to be more accurate, in which the running time
does not grow linearly with the number of events. This is because,
whether it is one event or 100 events, the same number of calls to
the database, the filesystem, or a CPU-intensive mathematical
library might still be required. And it is in these operations
that most of the time is spent.

In this article, we discuss a simple framework that forms the
basis for a powerful and flexible solution to decouple
event production and consumption while supporting a
pluggable "event-dispatching" policy.

Dispatchers, Receivers, and Events

Let's start by defining our main players: the dispatcher, the
receiver, and the events.

The Dispatcher interface provides the means for any user
to send events.


/** 
 * An object that dispatches submitted events. This interface 
 * provides a way of decoupling event submission from the 
 * mechanics of how and when each event will be handled 
**/
public interface Dispatcher<T>
{
    public void push(T event) throws NullPointerException; 
}

The Receiver interface provides the means to handle message
notifications.

/**
 * Defines an object which listens for generic events.
**/
public interface Receiver<T>
{
   /**
    * Invoked by the target dispatcher when flushing events. 
    *  
    * @param events An array of events (size at least one) 
    *    currently pushed into the dispatcher. The elements of 
    *    the array are guaranteed to be in the order in which 
    *    they were pushed into the dispatcher (insertion-order).  
   **/
   public void received(T[] events);
}

Within the context of this article, events can be any object: a
String, an Integer, a user-defined class, etc.
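
To make the earlier point about receivers concrete, here is a minimal sketch of a receiver whose cost depends on the number of calls rather than on the number of events. ViewRefresher and recomputeView are hypothetical names introduced for illustration, and the Receiver interface is repeated only so that the example compiles on its own.

```java
import java.util.ArrayList;
import java.util.List;

class BatchCostDemo
{
    /** Same shape as the article's Receiver; repeated so the sketch stands alone. */
    interface Receiver<T>
    {
        void received(T[] events);
    }

    /**
     * Hypothetical receiver: however many events arrive in one call,
     * the expensive step (recomputeView) runs exactly once.
     */
    static class ViewRefresher<T> implements Receiver<T>
    {
        private final List<T> seen = new ArrayList<T>();
        private int recomputations;

        public void received(T[] events)
        {
            for (T event : events)
            {
                seen.add(event);   // cheap, per-event bookkeeping
            }
            recomputeView();       // expensive, once per batch
        }

        private void recomputeView()
        {
            // Stand-in for a database query or a CPU-heavy calculation.
            recomputations++;
        }

        int recomputations() { return recomputations; }
        int eventsSeen()     { return seen.size(); }
    }

    public static void main(String[] args)
    {
        ViewRefresher<String> refresher = new ViewRefresher<String>();
        refresher.received(new String[] { "order-1" });
        refresher.received(new String[] { "order-2", "order-3", "order-4" });
        // prints "4 events, 2 recomputations"
        System.out.println(refresher.eventsSeen() + " events, "
            + refresher.recomputations() + " recomputations");
    }
}
```

Four events cost two recomputations here; had each event been delivered separately, the expensive step would have run four times.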

Attentive readers may argue that these interfaces do not capture
the problem in its entirety. In particular, the Dispatcher
interface could provide a few other signatures of the push
method to allow atomic "push multiple" functionality as well as
lifecycle functionality. For instance, termination could be
managed via a few shutdown methods as in the
java.util.concurrent.ExecutorService interface. In the
interest of simplicity, we omitted this aspect.
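
For readers curious about what such an extension might look like, the following is a rough sketch only. ManagedDispatcher, pushAll, shutdown, isShutdown, and RecordingDispatcher are all hypothetical names, and rejecting pushes with an IllegalStateException after shutdown is an assumption rather than anything the article's code defines.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

class LifecycleSketch
{
    interface Dispatcher<T>
    {
        void push(T event);
    }

    /** Hypothetical extension adding "push multiple" and lifecycle methods. */
    interface ManagedDispatcher<T> extends Dispatcher<T>
    {
        void pushAll(Collection<? extends T> events);
        void shutdown();
        boolean isShutdown();
    }

    /** Toy implementation that only records events; every method holds the same lock. */
    static class RecordingDispatcher<T> implements ManagedDispatcher<T>
    {
        private final List<T> accepted = new ArrayList<T>();
        private boolean shutdown;

        public synchronized void push(T event)
        {
            if (event == null) throw new NullPointerException();
            if (shutdown) throw new IllegalStateException("dispatcher is shut down");
            accepted.add(event);
        }

        public synchronized void pushAll(Collection<? extends T> events)
        {
            if (shutdown) throw new IllegalStateException("dispatcher is shut down");
            for (T event : events)
            {
                if (event == null) throw new NullPointerException();
            }
            // All-or-nothing: no other push can interleave while the lock is held.
            accepted.addAll(events);
        }

        public synchronized void shutdown()      { shutdown = true; }
        public synchronized boolean isShutdown() { return shutdown; }
        synchronized int acceptedCount()         { return accepted.size(); }
    }

    public static void main(String[] args)
    {
        RecordingDispatcher<String> dispatcher = new RecordingDispatcher<String>();
        dispatcher.push("a");
        dispatcher.pushAll(Arrays.asList("b", "c"));
        dispatcher.shutdown();
        System.out.println(dispatcher.acceptedCount() + " accepted, shut down: "
            + dispatcher.isShutdown());
    }
}
```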

Direct Dispatching

Dispatcher implementations are responsible for transforming
push calls into well-regulated calls to the receiver. Their
sole role is to act as an intermediary between the source (or
sources) of events and the event handler.

In the simplest case, a dispatcher can send the submitted
message immediately in the caller's thread.


/**
 * A Dispatcher implementation which sends any submitted message 
 * immediately to the Receiver within the same thread. 
 * @param <T>
 */
public class DirectDispatcher<T> implements Dispatcher<T>
{
   private final Receiver<T> receiver;
   
   public DirectDispatcher(Receiver<T> receiver) throws 
                                             NullPointerException
   {
      if (receiver == null) throw new NullPointerException();
      this.receiver = receiver;
   }
   
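   public void push(T object) throws NullPointerException
   {
      if (object == null) throw new NullPointerException(); 
      // Unchecked cast: after erasure this is really an Object[], so a
      // receiver declared for a concrete array type (e.g. String[])
      // fails with a ClassCastException when it is called.
      @SuppressWarnings("unchecked")
      T[] array = (T[]) new Object[1];
      array[0] = object;
      this.receiver.received(array);
   }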
}

DirectDispatcher effectively is just an adapter for
the interfaces Dispatcher and Receiver.
It wires up the dispatcher/receiver abstraction we just decided to
disjoin. Direct dispatching relies upon receivers handling the
events as fast as they are produced.
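
One caveat concerns the cast (T[]) new Object[1]. Because of type erasure the array really is an Object[], so a receiver declared for a concrete type, such as a Receiver<String> implementing received(String[] events), fails with a ClassCastException when it is called. A possible workaround, sketched here under the assumption that callers can supply a Class<T> token, is to build the array reflectively. TypedDirectDispatcher is a hypothetical name and is not part of the original code.

```java
import java.lang.reflect.Array;

class TypedDispatchDemo
{
    interface Dispatcher<T> { void push(T event); }
    interface Receiver<T>   { void received(T[] events); }

    /** Hypothetical variant of DirectDispatcher that creates a genuine T[]. */
    static class TypedDirectDispatcher<T> implements Dispatcher<T>
    {
        private final Receiver<T> receiver;
        private final Class<T> eventType;

        TypedDirectDispatcher(Receiver<T> receiver, Class<T> eventType)
        {
            if (receiver == null || eventType == null)
                throw new NullPointerException();
            this.receiver = receiver;
            this.eventType = eventType;
        }

        public void push(T event)
        {
            if (event == null) throw new NullPointerException();
            // Array.newInstance returns an array whose runtime component
            // type is eventType, so the cast below cannot fail later.
            @SuppressWarnings("unchecked")
            T[] array = (T[]) Array.newInstance(eventType, 1);
            array[0] = event;
            receiver.received(array);
        }
    }

    /** Pushes one event to a receiver declared for String[]; returns what it saw. */
    static String roundTrip(String event)
    {
        final StringBuilder seen = new StringBuilder();
        Receiver<String> receiver = new Receiver<String>()
        {
            public void received(String[] events)
            {
                seen.append(events[0]);
            }
        };
        new TypedDirectDispatcher<String>(receiver, String.class).push(event);
        return seen.toString();
    }

    public static void main(String[] args)
    {
        System.out.println(roundTrip("order-42")); // prints "order-42"
    }
}
```

The price of this design is an extra constructor argument; the benefit is that receivers can be written against their natural element type.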

A more sophisticated implementation could batch events and wait
until n calls have been made before dispatching them to the
receiver:


import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class DirectBatchDispatcher<T> implements Dispatcher<T>
{
   private final Receiver<T> receiver;
   private final AtomicInteger counter = new AtomicInteger();
   private final ConcurrentLinkedQueue<T> queue = 
                                  new ConcurrentLinkedQueue<T>();
   private final int batchSize;
   
   public DirectBatchDispatcher(Receiver<T> receiver, int batchSize) 
                                 throws IllegalArgumentException,
                                        NullPointerException
   {
      if (receiver == null) throw new NullPointerException();
      if (batchSize < 1)    throw new IllegalArgumentException();
      this.receiver = receiver;
      this.batchSize = batchSize; 
   }
   
   public void push(T event) throws NullPointerException
   {
      if (event == null) throw new NullPointerException();
      queue.offer(event);
      int i = counter.incrementAndGet();
      if (i>0 && i % batchSize == 0)
      {
         T[] array = (T[]) new Object[batchSize];
         for (int j=0;j<batchSize;j++)
         {
            array[j] = queue.poll();
         }
         receiver.received(array);   
      }
   }
}

There are several limitations with the
DirectBatchDispatcher:

  • Event dispatching still occurs in the caller's thread.
  • The choice of n (size of the batches) implies some a priori
    knowledge of the number of incoming events and their
    distribution over time.

As it stands, it is hard to think of many use cases that
would benefit from the DirectBatchDispatcher. The next
implementation addresses these limitations.
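
The second limitation has a practical consequence: if the number of pushed events is not a multiple of n, the remainder stays queued until more events arrive. The sketch below illustrates this with five events and n = 2. It is a condensed, single-threaded stand-in for DirectBatchDispatcher (it uses a plain ArrayList instead of the concurrent queue), and BatchDispatcher, BatchSizeRecorder, and pendingCount are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

class StrandedEventsDemo
{
    interface Receiver<T> { void received(T[] events); }

    /** Condensed, NOT thread-safe stand-in for DirectBatchDispatcher. */
    static class BatchDispatcher<T>
    {
        private final Receiver<T> receiver;
        private final int batchSize;
        private final List<T> pending = new ArrayList<T>();

        BatchDispatcher(Receiver<T> receiver, int batchSize)
        {
            if (receiver == null) throw new NullPointerException();
            if (batchSize < 1)    throw new IllegalArgumentException();
            this.receiver = receiver;
            this.batchSize = batchSize;
        }

        @SuppressWarnings("unchecked")
        void push(T event)
        {
            if (event == null) throw new NullPointerException();
            pending.add(event);
            if (pending.size() == batchSize)
            {
                // Really an Object[]; acceptable here only because the
                // receiver used in this sketch is itself generic.
                T[] batch = (T[]) pending.toArray();
                pending.clear();
                receiver.received(batch);
            }
        }

        int pendingCount() { return pending.size(); }
    }

    /** Generic receiver that records the size of every batch it is given. */
    static class BatchSizeRecorder<T> implements Receiver<T>
    {
        final List<Integer> sizes = new ArrayList<Integer>();

        public void received(T[] events)
        {
            sizes.add(events.length);
        }
    }

    public static void main(String[] args)
    {
        BatchSizeRecorder<String> recorder = new BatchSizeRecorder<String>();
        BatchDispatcher<String> dispatcher =
            new BatchDispatcher<String>(recorder, 2);
        for (int i = 1; i <= 5; i++)
        {
            dispatcher.push("event-" + i);
        }
        // prints "batches=[2, 2], stranded=1"
        System.out.println("batches=" + recorder.sizes
            + ", stranded=" + dispatcher.pendingCount());
    }
}
```

The fifth event is never delivered unless a sixth one arrives, which is one reason a time-based policy is attractive.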

Asynchronous Dispatching

The implementation we want to propose needs to translate calls
to the dispatcher into timely, well-regulated calls to the receiver,
so that if the dispatcher's push method is called
several times in quick succession, the result is just one call
to the receiver. We call this implementation the
TimelyDispatcher.


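import java.util.ArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class TimelyDispatcher<T> implements Dispatcher<T>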
{
  private final LinkedBlockingQueue<T> lbq = 
                                new LinkedBlockingQueue<T>();
  private final Runnable runnable;
  private final AtomicLong timeLastAction = new AtomicLong();

  private TimelyDispatcher(final Receiver<T> receiver,
                             final long checkPeriod, 
                             final long maxPeriod, 
                             final TimeUnit timeUnit)
  {
    final long checkPeriodNanos = timeUnit.toNanos(checkPeriod);
    final long maxPeriodNanos = timeUnit.toNanos(maxPeriod);
    runnable = new Runnable()
    {
      private long timeFirstTake;

      public void run()
      {
        timeLastAction.set(System.nanoTime());
        timeFirstTake = System.nanoTime();
        ArrayList<T> list = new ArrayList<T>();
        while (true)
        {
          try
          {
            T t = (list.isEmpty()) ? lbq.take() : 
                     lbq.poll(checkPeriod, timeUnit);
            long now = System.nanoTime();
            if (list.isEmpty()) 
            {
              timeFirstTake = now;
            }
            if (t!=null)
            {
              list.add(t);
            }
            if ((now - timeLastAction.get()) > checkPeriodNanos
              || (now - timeFirstTake) > maxPeriodNanos)
            {
              timeLastAction.set(now);
              T[] array = list.toArray((T[]) new Object[0]);
              receiver.received(array);
              list.clear();
            }
          } 
          catch (InterruptedException e)
          {
            // For sake of simplicity the code 
            // does not deal with interruptions.
            throw new RuntimeException(e);
          }
        }
      }
    };
  }

  // the safe way to start a thread is outside a constructor. 
  private void startThread(ThreadFactory threadFactory)
  {
    Thread t = threadFactory.newThread(runnable);
    t.start();
  }

  public void push(T t) throws NullPointerException
  {
    if (t == null) throw new NullPointerException();
    timeLastAction.set(System.nanoTime());
    lbq.offer(t);
  }

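  /**
   * Constructs a new TimelyDispatcher and starts its dispatching thread. 
   *  
   * @param <T> the type of messages/events to push.
   * @param receiver the receiver implementation which will handle the events.
   * @param checkPeriod the quiet interval after which pending events are dispatched.
   * @param maxPeriod the maximum time for which, in the presence of events,
   *                  calls to the receiver can be delayed.
   * @param timeUnit the unit in which checkPeriod and maxPeriod are expressed (e.g.: milliseconds)
   * @param threadFactory the ThreadFactory to create the thread responsible for wiring up dispatcher 
   *                      and receiver.
   * @return a TimelyDispatcher whose dispatching thread has been started.
  **/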
  public static <T> TimelyDispatcher<T> newInstance(Receiver<T> receiver,
        long checkPeriod, long maxPeriod, TimeUnit timeUnit,
        ThreadFactory threadFactory) throws NullPointerException,
        IllegalArgumentException
  {
    if (receiver == null || timeUnit == null || threadFactory == null)
       throw new NullPointerException();
    if (checkPeriod <= 0 || maxPeriod <= 0 || checkPeriod >= maxPeriod)
       throw new IllegalArgumentException();
    TimelyDispatcher<T> c = new TimelyDispatcher<T>(receiver,
          checkPeriod, maxPeriod, timeUnit);
    c.startThread(threadFactory);
    return c;
  }
}

TimelyDispatcher

TimelyDispatcher cannot be reconfigured after creation: all of
its fields are final. At creation time, the user must specify a
non-null Receiver implementation to wire up, a
checkPeriod, and a maxPeriod. The checkPeriod
defines the quiet interval: once no new event has been pushed for
longer than checkPeriod, the pending events are dispatched to the
receiver. The maxPeriod defines the maximum
time interval for which, in the presence of events, calls to the
receiver can be delayed. Both checkPeriod and
maxPeriod are expressed in the same TimeUnit argument
(e.g., milliseconds), and checkPeriod must be shorter than
maxPeriod. Finally, since our implementation must create
a new thread to perform asynchronous calls, it is given a
ThreadFactory.

How the Algorithm Works

An embedded thread is responsible for monitoring the activity of
the push method. When the method is invoked, we
register the time of the call and update a thread-safe queue. We
use a LinkedBlockingQueue so that as items are added
to it, the embedded thread can take them off and store them into a
private (thread-confined) list. This list's lifecycle is
add-add-add-... , snapshot (toArray), and empty.

The embedded thread encapsulates the logic that determines when
to invoke the receiver's received method by comparing
the current nanosecond time against the values stored in the atomic
timeLastAction and thread-private
timeFirstTake variables.
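
The flush decision can be isolated as a pure function, which makes the two conditions easier to see. The sketch below mirrors the comparison made in the run method. FlushRule and shouldFlush are hypothetical names introduced for illustration, and the sample figures (a 50 ms checkPeriod and a 200 ms maxPeriod) are assumptions.

```java
import java.util.concurrent.TimeUnit;

class FlushRule
{
    /**
     * Returns true when the accumulated events should be handed to the
     * receiver: either no push has been seen for longer than checkPeriod
     * (the stream went quiet), or the first pending event has waited
     * longer than maxPeriod.
     */
    static boolean shouldFlush(long nowNanos,
                               long timeLastActionNanos,
                               long timeFirstTakeNanos,
                               long checkPeriodNanos,
                               long maxPeriodNanos)
    {
        boolean quiet   = (nowNanos - timeLastActionNanos) > checkPeriodNanos;
        boolean overdue = (nowNanos - timeFirstTakeNanos)  > maxPeriodNanos;
        return quiet || overdue;
    }

    public static void main(String[] args)
    {
        long ms    = TimeUnit.MILLISECONDS.toNanos(1);
        long check = 50 * ms;
        long max   = 200 * ms;

        // Burst in progress: last push 10 ms ago, first take 100 ms ago.
        System.out.println(shouldFlush(100 * ms, 90 * ms, 0, check, max));  // false
        // Stream went quiet: last push 60 ms ago.
        System.out.println(shouldFlush(100 * ms, 40 * ms, 0, check, max));  // true
        // Pushes keep coming, but the first event has waited 210 ms.
        System.out.println(shouldFlush(210 * ms, 205 * ms, 0, check, max)); // true
    }
}
```

The third case is what maxPeriod is for: without it, a steady stream of pushes spaced less than checkPeriod apart would postpone delivery indefinitely.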

In Action

Figures 1 and 2 show how calls to the Dispatcher
are coalesced into a smaller number of calls to the
Receiver. Elapsed time is plotted on the X axis,
and each call is represented by a small multicolored dot.


Figure 1. Asynchronous event dispatching with one source

Figure 2 extends the scenario by adding a second source of
events.

Figure 2. Asynchronous event dispatching with two
sources

Considerations and Improvements

The TimelyDispatcher is not necessarily the best
Dispatcher implementation; it is just one of many.
It should be possible to write many other interesting
implementations. For instance, we could write another that uses
two queues, so that when we write on one queue the other queue can
be drained in parallel. A volatile boolean flag could be toggled by
the embedded thread, determining which queue is for writing and
which queue is for reading and emptying. If it were safe to assume
that the push method is always invoked by the same
thread, we would also be able to replace the two thread-safe queues
with any non-thread-safe ordered collection (e.g., ArrayList) and
verify the performance gain (if any). Furthermore, if a common
use case in our application is to handle bursts of events from
multiple threads at the same time, it could be possible to use
ThreadLocal to direct each thread to its own queue in
an attempt to reduce contention to the minimum.

While it is possible to enhance or modify the
TimelyDispatcher for specific needs, we should always
remind ourselves that writing (and testing) concurrent software is
still very hard and that it is very easy to get carried away with
potentially more sophisticated yet unnecessary implementations.
Therefore, before attempting to write a better concurrent
dispatcher, we should be absolutely sure that the available one does
not meet the needed criteria for performance and scalability.

Asynchronous Dispatching Versus the Producer/Consumer Pattern

It is important not to confuse the applicability of the
asynchronous dispatching technique with the Producer/Consumer
pattern. In the Producer/Consumer pattern, events represent
self-contained units of work (tasks) that can be tackled by any
consumer. In its simplest form, scalability is achieved by adding
more consumers. The developer's task is therefore to identify
highly homogeneous, parallelizable activities. On the other hand, in
the dispatcher/receiver framework, events represent the trigger for
generally expensive activities to be performed. The time to perform
those activities cannot be reduced and the separation between
event production and consumption is there to prevent requests from
executing more activities than the JVM would be able to sustain. In a
way, the two approaches are orthogonal to each other, and in some
circumstances it is also possible to combine them: for example, the
same class could act as receiver and producer at the same time.
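
As a rough illustration of combining the two approaches, the sketch below shows a receiver that turns each batch it is given into one unit of work on a BlockingQueue, from which any number of consumer threads could take. BatchForwarder is a hypothetical name, and the queue-based hand-off is one possible design rather than something the original code prescribes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class ReceiverAsProducerDemo
{
    interface Receiver<T> { void received(T[] events); }

    /** Hypothetical receiver that re-publishes every batch as a single task. */
    static class BatchForwarder<T> implements Receiver<T>
    {
        private final BlockingQueue<List<T>> work;

        BatchForwarder(BlockingQueue<List<T>> work)
        {
            if (work == null) throw new NullPointerException();
            this.work = work;
        }

        public void received(T[] events)
        {
            // Copy the array so consumers never share state with the dispatcher.
            List<T> batch = Collections.unmodifiableList(
                new ArrayList<T>(Arrays.asList(events)));
            // offer returns false if a bounded queue is full; this sketch
            // ignores the result and is meant for an unbounded queue.
            work.offer(batch);
        }
    }

    /** Feeds two batches through the forwarder; returns the number of queued tasks. */
    static int demo()
    {
        BlockingQueue<List<String>> work =
            new LinkedBlockingQueue<List<String>>();
        BatchForwarder<String> forwarder = new BatchForwarder<String>(work);
        forwarder.received(new String[] { "a", "b" });
        forwarder.received(new String[] { "c" });
        return work.size();
    }

    public static void main(String[] args)
    {
        System.out.println(demo() + " tasks queued"); // prints "2 tasks queued"
    }
}
```

Here the dispatcher regulates how often work is created, while the consumers that drain the queue determine how quickly it is carried out.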

Conclusions

With Java SE 5 and 6, developers are given a powerful API that
significantly simplifies the task of writing concurrent
applications. At the same time, multi-core machines are becoming
the norm; consequently, leveraging their full power requires a deep
understanding of concurrent programming. In this article, we
presented the problem of balancing the flow of events between
dispatchers and receivers, a problem that is intrinsically linked
with concurrency. We proposed a solution and used the
java.util.concurrent API
(AtomicLong, LinkedBlockingQueue, etc.) to implement
it.

Acknowledgments

The author would like to thank Brian Goetz for his suggestions
and assistance during the writing of this article. His guidance was
particularly beneficial in reducing unnecessary complexity in the
sample code.


Lorenzo Puccetti is a software developer/architect living and working in London, England.