
Architecture of a Highly Scalable NIO-Based Server

February 13, 2007




If you are asked to write a highly scalable Java-based server,
it won't take long to decide to use the Java NIO package. To get
your server running, you will probably spend a lot of time reading
blogs and tutorials to understand the thread synchronization needs
of the NIO Selector class and to deal with common
pitfalls. This article describes the basic architecture of a
connection-oriented NIO-based server. It takes a look at a
preferred threading model and discusses the basic components of
such a server.

Threading Architecture

The first and most intuitive way to implement a multi-threaded
server is to follow the thread-per-connection
approach. This is the traditional pre-Java-1.4 solution, caused by
the lack of non-blocking I/O support in older Java versions. The
thread-per-connection approach uses an exclusive worker thread for
each connection. Within the handling loop, a worker thread waits
for new incoming data, processes the request, returns the response
data, and calls the blocking socket's read method
again.


import java.io.*;
import java.net.*;
import java.util.concurrent.*;

public class Server {
  private final ExecutorService executors = Executors.newFixedThreadPool(10);
  private volatile boolean isRunning = true;

  public static void main(String... args) throws IOException {
    new Server().launch(Integer.parseInt(args[0]));
  }

  public void launch(int port) throws IOException {
    ServerSocket sso = new ServerSocket(port);
    while (isRunning) {
      Socket s = sso.accept();
      executors.execute(new Worker(s));
    }
  }

  private class Worker implements Runnable {
    private LineNumberReader in = null;
    ...

    Worker(Socket s) throws IOException {
      in = new LineNumberReader(new InputStreamReader(s.getInputStream()));
      out = ...
    }

    public void run() {
      try {
        while (isRunning) {
          // blocking read of a request (line)
          String request = in.readLine();
          if (request == null) {
            break; // end of stream: the client has closed the connection
          }

          // processing the request
          ...
          String response = ...

          // return the response
          out.write(response);
          out.flush();
        }
      } catch (IOException e) {
        ...
      } finally {
        // release the connection's resources, also after an I/O error
        try {
          in.close();
          ...
        } catch (IOException ignored) {
        }
      }
    }
  }
}

There is always a one-to-one relationship between simultaneous
client connections and the number of concurrent worker threads.
Because each connection has an associated thread waiting on the
server side, very good response times can be achieved. However,
higher loads require a higher number of running, concurrent
threads, which limits scalability. In particular, long-lived
connections such as persistent HTTP connections lead to many
concurrent worker threads, which spend most of their time waiting
for new client requests. In addition, hundreds or even thousands of
concurrent threads can waste a great deal of stack space. Note, for
example, that the default Java thread stack size for Solaris/SPARC
is 512 KB.
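To put the stack figure into numbers: if every connection owns a thread with the 512 KB default quoted above, 1,000 connections reserve about 500 MB of address space for stacks alone. The sketch below does that arithmetic and also shows the standard four-argument Thread constructor, which lets a program request a smaller stack; the JDK documents that value as a hint that some platforms ignore. The class name StackBudget is made up for this illustration, and how much of the reserved space is actually committed depends on the operating system.

```java
public class StackBudget {
    // Default stack size cited in the text for Solaris/SPARC: 512 KB per thread.
    static final long STACK_BYTES = 512L * 1024;

    // Address space reserved for stacks when each connection owns one thread.
    static long reservedStackBytes(int threads) {
        return threads * STACK_BYTES;
    }

    public static void main(String[] args) throws InterruptedException {
        for (int n : new int[] {100, 1_000, 10_000}) {
            System.out.printf("%,6d threads -> %,d MB of stack%n",
                    n, reservedStackBytes(n) / (1024 * 1024));
        }

        // Thread(ThreadGroup, Runnable, String, long stackSize): the requested
        // size is only a hint; the JVM may round it or ignore it entirely.
        Thread worker = new Thread(null, () -> { }, "small-stack-worker", 128L * 1024);
        worker.start();
        worker.join();
    }
}
```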

If the server has to handle a high number of simultaneous
clients and tolerate slow, unresponsive clients, an alternative
threading architecture is needed. The thread-on-event
approach implements such requirements in a very efficient way. The
worker threads are independent of the connections and are only
used to handle specific events. For instance, if a
data received event occurs, a worker thread is used to
process the application-specific decoding and service tasks (or at
least to start them). Once this job is complete, the worker is
returned to the thread pool. This approach requires performing the
socket I/O operations in a non-blocking manner: the socket's
read and write method calls have to be
non-blocking. Additionally, an event system is required; it signals
when new data is available, which in turn initiates the socket
read call. This removes the one-to-one relationship
between waiting reads and occupied threads. The design of such an
event-driven I/O system is described by the Reactor
pattern.

The Reactor Pattern

The Reactor pattern, illustrated in
Figure 1, separates the detection of events, such as readiness for
read or readiness for accepting, from the processing of
these events. If a readiness event occurs, an event handler is
notified to perform the appropriate processing within dedicated
worker threads.

Figure 1. A NIO-based Reactor pattern implementation

To participate in the event architecture, the connection's
Channel has to be registered on a
Selector. This is done by calling the channel's
register method. Although this method is part of
SocketChannel, the channel is registered on the
Selector, not the other way around.


...
SocketChannel channel = serverChannel.accept();
channel.configureBlocking(false);

// register the connection
SelectionKey sk = channel.register(selector, SelectionKey.OP_READ);
...
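The registration rules can be checked with a small, self-contained program. Because register is declared on SelectableChannel, the sketch below uses a java.nio.channels.Pipe source channel as a stand-in for an accepted SocketChannel, so no network is needed; the class name RegisterDemo and the helper registerForRead are illustrative. It shows that registering a channel that is still in blocking mode throws IllegalBlockingModeException, and that the returned SelectionKey belongs to the selector even though register is invoked on the channel.

```java
import java.io.IOException;
import java.nio.channels.IllegalBlockingModeException;
import java.nio.channels.Pipe;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

public class RegisterDemo {
    // Switches the channel to non-blocking mode and registers it for read events.
    static SelectionKey registerForRead(Selector selector, SelectableChannel channel)
            throws IOException {
        channel.configureBlocking(false);
        return channel.register(selector, SelectionKey.OP_READ);
    }

    public static void main(String[] args) throws IOException {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();  // pipe.source() stands in for an accepted SocketChannel

            // New channels start in blocking mode and cannot be registered yet.
            try {
                pipe.source().register(selector, SelectionKey.OP_READ);
            } catch (IllegalBlockingModeException e) {
                System.out.println("blocking channels cannot be registered");
            }

            SelectionKey key = registerForRead(selector, pipe.source());
            // register() is invoked on the channel, but the key belongs to the selector.
            System.out.println("key.selector() == selector: " + (key.selector() == selector));
            System.out.println("selector.keys() contains key: " + selector.keys().contains(key));
            System.out.println("keyFor(selector) == key: " + (pipe.source().keyFor(selector) == key));

            pipe.sink().close();
            pipe.source().close();
        }
    }
}
```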

To detect new events, the Selector provides the
capability to ask the registered channels for their readiness
events. By calling the select method, the
Selector collects the readiness events of the
registered channels. This method call blocks until at least one
event has occurred. It then returns the
number of channels that have become ready for I/O operations
since the last select call. The selected connections
can be retrieved by calling the Selector's selectedKeys
method. This method returns a set of SelectionKey
objects, each of which holds the I/O event status and a reference to the
connection's Channel.
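One detail that commonly trips people up: the Selector adds keys to its selected-key set but never removes them; that is the application's job, which is why the dispatch loops in this article call it.remove(). The sketch below, again using a Pipe in place of a socket, makes this behavior visible. SelectedKeysDemo and its run helper are names invented for the example.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

public class SelectedKeysDemo {
    // Returns {value returned by select(),
    //          selected-key set size after a second selectNow() without removal,
    //          selected-key set size after the key has been removed}.
    static int[] run() throws IOException {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            pipe.source().configureBlocking(false);
            SelectionKey key = pipe.source().register(selector, SelectionKey.OP_READ);

            pipe.sink().write(ByteBuffer.wrap(new byte[] {42}));  // source becomes readable

            int ready = selector.select();        // returns once the source is readable
            selector.selectNow();                 // the selector itself never removes the key
            int stillSelected = selector.selectedKeys().size();

            selector.selectedKeys().remove(key);  // removing handled keys is the caller's job
            int afterRemove = selector.selectedKeys().size();

            pipe.sink().close();
            pipe.source().close();
            return new int[] {ready, stillSelected, afterRemove};
        }
    }

    public static void main(String[] args) throws IOException {
        int[] r = run();
        System.out.println("select() returned " + r[0] + ", keys still selected: " + r[1]
                + ", after remove: " + r[2]);
    }
}
```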

A Selector is held by the Dispatcher.
This is a single-threaded active class that wraps the
Selector. The Dispatcher is responsible
for retrieving the events and dispatching the handling of the consumed
events to the EventHandler. Within the dispatch loop,
the Dispatcher calls the Selector's
select method to wait for new events. If at least one
event has occurred, the method call returns and the associated
channel for each event can be acquired by calling the
selectedKeys method.


...
while (isRunning) {
  // blocking call, to wait for new readiness events
  int eventCount = selector.select(); 
 
  // get the events
  Iterator<SelectionKey> it = selector.selectedKeys().iterator();
  while (it.hasNext()) {
    SelectionKey key = it.next();
    it.remove();
    
    // readable event?
    if (key.isValid() && key.isReadable()) {
      eventHandler.onReadableEvent(key.channel());
    }
   
    // writable event? 
    if (key.isValid() && key.isWritable()) {
      key.interestOps(SelectionKey.OP_READ); // reset to read only
      eventHandler.onWriteableEvent(key.channel());
    }
    ...
  }
  ...
}

Based on an event such as readiness for read or readiness
for write, the EventHandler is called by the
Dispatcher to process the event. The
EventHandler decodes the request data, processes the
required service activities, and encodes the response data. Because
worker threads are not forced to waste time waiting for new
requests on an open connection, the scalability and throughput of
this approach are conceptually limited only by system resources such as
CPU or memory. That said, the response times won't be as good
as for the thread-per-connection approach, because of the required
thread switches and synchronization. The challenge of the
event-driven approach is therefore to minimize synchronization and
optimize thread management, so that this overhead becomes
negligible.

Component Architecture

Most highly scalable Java servers are built on top of the
Reactor pattern. In such servers, the classes of the Reactor
pattern are complemented by additional classes for connection
management, buffer management, and load balancing. The
entry class of such a server is the Acceptor. This
arrangement is shown in Figure 2.

Figure 2. Major components of a connection-oriented server

Acceptor

Every new client connection of a server is accepted by the
single Acceptor, which is bound to the server port.
The Acceptor is a single-threaded active class.
Because it is only responsible for handling the very short-running
client connection requests, it is often sufficient to implement the
Acceptor using the blocking I/O model. The
Acceptor gets the handle of a new connection by
calling the ServerSocketChannel's blocking
accept method. The new connection is then registered
with a Dispatcher. After this, the connection
participates in event handling.

Because the scalability of a single Dispatcher is
limited, a small pool of Dispatchers is often
used. One reason for this limitation is the operating-system-specific implementation of the Selector. Most popular
operating systems map a SocketChannel to a file handle
in a one-to-one relationship. Depending on the concrete system, the
maximum number of file handles per Selector is limited
in different ways.


class Acceptor implements Runnable {
  private ServerSocketChannel serverChannel;
  ...

  void init() throws IOException {
    serverChannel = ServerSocketChannel.open();
    serverChannel.configureBlocking(true);
    serverChannel.socket().bind(new InetSocketAddress(serverPort));
  }

  public void run() {
    while (isRunning) {
      try {
        // blocks until a client connects
        SocketChannel channel = serverChannel.accept();

        Connection con = new Connection(channel, appHandler);
        dispatcherPool.nextDispatcher().register(con);
      } catch (IOException e) {
        ...
      }
    }
  }
}

In the example code, a Connection object holds the
SocketChannel and an application-level event handler.
These classes will be described below.
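The Acceptor above relies on a dispatcherPool.nextDispatcher() call whose implementation is not shown. A minimal round-robin version could look like the following; DispatcherPool is a hypothetical class, generic over the dispatcher type so that it runs without the rest of the server. Round-robin is just one simple assignment policy, not necessarily the one any given framework uses.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical round-robin pool; D would be the Dispatcher type in a real server.
final class DispatcherPool<D> {
    private final List<D> dispatchers;
    private final AtomicInteger counter = new AtomicInteger();

    DispatcherPool(List<D> dispatchers) {
        if (dispatchers.isEmpty()) {
            throw new IllegalArgumentException("a pool needs at least one dispatcher");
        }
        this.dispatchers = List.copyOf(dispatchers);
    }

    // Thread-safe: getAndIncrement is atomic, and floorMod keeps the index valid
    // even after the counter overflows to negative values.
    D nextDispatcher() {
        return dispatchers.get(Math.floorMod(counter.getAndIncrement(), dispatchers.size()));
    }
}

public class DispatcherPoolDemo {
    public static void main(String[] args) {
        DispatcherPool<String> pool = new DispatcherPool<>(List.of("dispatcher-0", "dispatcher-1"));
        for (int i = 0; i < 4; i++) {
            System.out.println("connection " + i + " -> " + pool.nextDispatcher());
        }
    }
}
```

A fixed, small pool keeps the number of selector threads bounded while still spreading connections across them.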

Dispatcher

By calling the Dispatcher's register
method, the SocketChannel will be registered on the
underlying Selector. Here is where the trouble comes
in. The Selector manages the registered channels
internally by using key sets. This means that by registering a
channel, an associated SelectionKey will be created
and be added to the Selector's registered key set. At the
same time, the concurrent dispatcher thread could call the
Selector's select method, which also
accesses the key set. Because the key sets are not
thread-safe, an unsynchronized registration in the context of the
Acceptor thread can lead to deadlocks and race
conditions. This can be solved by implementing the selector
guard object idiom, which allows suspending the dispatcher
thread temporarily. See "How to Build a Scalable Multiplexed Server with NIO"
(PDF, http://developers.sun.com/learning/javaoneonline/2006/coreplatform/TS-1315.pdf) for an
explanation of this approach.


class Dispatcher implements Runnable {
  private final Object guard = new Object();
  …

  void register(Connection con) throws IOException {
    // retrieve the guard lock and wake up the dispatcher thread
    // to register the connection's channel
    synchronized (guard) {
      selector.wakeup();
      con.getChannel().register(selector, SelectionKey.OP_READ, con);
    }

    // notify the application EventHandler about the new connection
    …
  }


  void announceWriteNeed(Connection con) {
    SelectionKey key = con.getChannel().keyFor(selector);
    synchronized (guard) {
      selector.wakeup();
      key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
    }
  }

  public void run() {
    while (isRunning) {
      synchronized (guard) {
        // suspend the dispatcher thread while a registration holds the guard
      }
      try {
        int eventCount = selector.select();

        Iterator<SelectionKey> it = selector.selectedKeys().iterator();
        while (it.hasNext()) {
          SelectionKey key = it.next();
          it.remove();

          // read event?
          if (key.isValid() && key.isReadable()) {
            Connection con = (Connection) key.attachment();
            dispatcherEventHandler.onReadableEvent(con);
          }

          // write event?
          …
        }
      } catch (IOException e) {
        …
      }
    }
  }
}
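The guard idiom can be exercised in isolation. The following sketch is a stripped-down, hypothetical GuardedDispatcher that registers Pipe source channels (standing in for socket connections) from a foreign thread, using the same wakeup-then-register-under-the-guard sequence as the register method above, and counts readable events with a CountDownLatch so the outcome can be checked. It is meant to illustrate the locking order, not to be a complete dispatcher.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class GuardedDispatcher implements Runnable {
    private final Object guard = new Object();
    private final Selector selector;
    private final CountDownLatch readEvents;   // demo only: signals that data was read
    private volatile boolean isRunning = true;

    GuardedDispatcher(CountDownLatch readEvents) throws IOException {
        this.selector = Selector.open();
        this.readEvents = readEvents;
    }

    // Called from a foreign thread, e.g. the Acceptor.
    void register(SelectableChannel channel) throws IOException {
        channel.configureBlocking(false);
        synchronized (guard) {
            selector.wakeup();                                 // kick select() out...
            channel.register(selector, SelectionKey.OP_READ);  // ...register while it waits on the guard
        }
    }

    void shutdown() {
        isRunning = false;
        selector.wakeup();
    }

    @Override
    public void run() {
        try (Selector sel = selector) {
            while (isRunning) {
                synchronized (guard) {
                    // blocks here while a register() call holds the guard
                }
                sel.select();
                Iterator<SelectionKey> it = sel.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    if (key.isValid() && key.isReadable()) {
                        ByteBuffer buf = ByteBuffer.allocate(64);
                        int n = ((ReadableByteChannel) key.channel()).read(buf);
                        if (n < 0) {
                            key.cancel();          // peer closed: stop watching this channel
                        } else if (n > 0) {
                            readEvents.countDown();
                        }
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    static boolean demo() throws Exception {
        CountDownLatch latch = new CountDownLatch(1);
        GuardedDispatcher dispatcher = new GuardedDispatcher(latch);
        Thread thread = new Thread(dispatcher, "dispatcher");
        thread.start();

        Pipe pipe = Pipe.open();
        dispatcher.register(pipe.source());   // registration from the main thread
        pipe.sink().write(ByteBuffer.wrap("hello".getBytes()));

        boolean seen = latch.await(5, TimeUnit.SECONDS);
        dispatcher.shutdown();
        thread.join();
        pipe.sink().close();
        pipe.source().close();
        return seen;
    }

    public static void main(String[] args) throws Exception {
        System.out.println("readable event delivered: " + demo());
    }
}
```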

After a connection has been registered, the
Selector listens for readiness events of this
connection. If an event occurs, the appropriate callback method of
the Dispatcher's event handler is called with
the associated connection.

Dispatcher-Level EventHandler

The first activity performed while processing a readiness for
read event is to call the channel's read method.
In contrast to the stream interface, the Channel
interface requires the caller to pass in a read buffer. Often
direct-allocated ByteBuffers are used. Direct
buffers reside in native memory, outside the Java heap. By
using direct buffers, socket I/O operations can be performed
without the need for internal intermediate buffers.
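A minimal sketch of reading into a reused direct buffer follows. The buffer is allocated once with ByteBuffer.allocateDirect and recycled with clear() and flip() around each read. DirectReadDemo and readAvailable are illustrative names, and a Pipe again stands in for a SocketChannel. The demo keeps the pipe in blocking mode only so that the single read deterministically sees the bytes just written; inside a dispatcher the channel would be non-blocking and read() may return 0.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;

public class DirectReadDemo {
    // Reads what the channel currently delivers into the (reused) buffer and
    // decodes it. Returns "" if nothing was read or the stream has ended (-1).
    static String readAvailable(ReadableByteChannel channel, ByteBuffer buffer) throws IOException {
        buffer.clear();                  // position = 0, limit = capacity
        int n = channel.read(buffer);
        if (n <= 0) {
            return "";
        }
        buffer.flip();                   // switch from filling to draining
        return StandardCharsets.US_ASCII.decode(buffer).toString();
    }

    public static void main(String[] args) throws IOException {
        // Allocated once, outside the Java heap, and reused for every read.
        ByteBuffer direct = ByteBuffer.allocateDirect(8 * 1024);
        System.out.println("direct: " + direct.isDirect());

        Pipe pipe = Pipe.open();
        pipe.sink().write(ByteBuffer.wrap("+OK ready".getBytes(StandardCharsets.US_ASCII)));
        System.out.println("received: " + readAvailable(pipe.source(), direct));

        pipe.sink().close();
        pipe.source().close();
    }
}
```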

Normally the read call is performed very
quickly. Depending on the operating system, the socket read
operation often just copies the received data from
kernel memory into the read buffer, which resides in user space.

The received data is appended to the connection's
thread-safe read queue for further processing. Based on the
result of the I/O operation, application-specific tasks have to be
processed. Such tasks are processed by the assigned
application-level event handler. This handler is typically called
by a worker thread.


class DispatcherEventHandler {
  ...

  void onReadableEvent(final Connection con) throws IOException {
    // get the received data
    ByteBuffer readBuffer = allocateMemory();
    int bytesRead = con.getChannel().read(readBuffer);
    if (bytesRead == -1) {
      // end of stream: the peer has closed the connection
      ...
      return;
    }
    ByteBuffer data = extractReadAndRecycleRenaming(readBuffer);

    // append it to read queue
    con.getReadQueue().add(data);
    ...

    // perform further operations (decode, process, encode)
    // by a worker thread
    if (con.getReadQueue().getSize() > 0) {
      workerPool.execute(new Runnable() {
        public void run() {
          // serialize the processing of each connection
          synchronized (con) {
            con.getAppHandler().onData(con);
          }
        }
      });
    }
  }

  void onWriteableEvent(Connection con) throws IOException {
    ByteBuffer[] data = con.getWriteQueue().drain();
    con.getChannel().write(data); // write the data (may be a partial write)
    ...

    if (con.getWriteQueue().isEmpty()) {
      if (con.isClosed()) {
        dispatcher.deregister(con);
      }

    } else {
      // there is remaining data to write
      dispatcher.announceWriteNeed(con);
    }
  }
}
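The read-queue hand-off in onReadableEvent can be simulated without any sockets. In the sketch below, which uses hypothetical names throughout, one thread plays the dispatcher by appending chunks to a ConcurrentLinkedQueue and submitting one task per chunk to a worker pool; each task drains the queue while holding the connection's lock, mirroring the synchronized (con) block above. Because only one worker drains a given connection at a time and the queue is FIFO, the chunks are delivered in arrival order even though several worker threads are involved.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ReadQueueHandOff {
    // Hypothetical connection: a thread-safe read queue plus the data handed
    // to the application so far (guarded by the connection's monitor).
    static final class Connection {
        final ConcurrentLinkedQueue<ByteBuffer> readQueue = new ConcurrentLinkedQueue<>();
        final StringBuilder delivered = new StringBuilder();
    }

    // Worker side: drain everything queued so far while holding the connection
    // lock, so two workers never process the same connection at the same time.
    static void onData(Connection con) {
        synchronized (con) {
            ByteBuffer chunk;
            while ((chunk = con.readQueue.poll()) != null) {
                con.delivered.append(StandardCharsets.US_ASCII.decode(chunk));
            }
        }
    }

    // Dispatcher side: append each received chunk, then hand off to the pool.
    static boolean processedInOrder(int chunks) throws InterruptedException {
        Connection con = new Connection();
        StringBuilder expected = new StringBuilder();
        ExecutorService workerPool = Executors.newFixedThreadPool(4);
        for (int i = 0; i < chunks; i++) {
            String text = i + ";";
            expected.append(text);
            con.readQueue.add(ByteBuffer.wrap(text.getBytes(StandardCharsets.US_ASCII)));
            workerPool.execute(() -> onData(con));
        }
        workerPool.shutdown();
        if (!workerPool.awaitTermination(10, TimeUnit.SECONDS)) {
            return false;
        }
        synchronized (con) {
            return con.delivered.toString().equals(expected.toString());
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("processed in arrival order: " + processedInOrder(1_000));
    }
}
```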

Within the application-specific tasks, data is decoded,
services are performed, and the response data is encoded and written. Writing data
appends it to the connection's write queue and calls the
Dispatcher's announceWriteNeed method.
This method causes the Selector to listen for
readiness for write events. If such an event occurs, the
Dispatcher-level event handler's method
onWriteableEvent is performed. It gets the data
from the connection's write queue and performs the required write
I/O operation. Trying to write data directly, bypassing
this event approach, can lead to deadlocks and race conditions.
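Note that write on a non-blocking SocketChannel may consume only part of a buffer when the socket's send buffer is full. A per-connection write queue therefore has to keep the unwritten remainder and leave readiness-for-write interest switched on until everything is flushed. The sketch below shows one way to do this; WriteQueue and the ThrottledChannel test double are invented for the example, with ThrottledChannel imitating a socket that accepts only a limited number of bytes per flush.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;

// Hypothetical per-connection write queue: application threads enqueue data,
// the dispatcher thread flushes it on readiness for write.
final class WriteQueue {
    private final ArrayDeque<ByteBuffer> buffers = new ArrayDeque<>();

    synchronized void enqueue(ByteBuffer data) {
        buffers.addLast(data);
    }

    // Writes as much as the channel accepts. Returns true if everything was
    // written, false if data remains and OP_WRITE interest must stay set.
    synchronized boolean flushTo(WritableByteChannel channel) throws IOException {
        while (!buffers.isEmpty()) {
            ByteBuffer head = buffers.peekFirst();
            channel.write(head);          // may consume only part of head
            if (head.hasRemaining()) {
                return false;             // send buffer full: keep the rest queued
            }
            buffers.pollFirst();
        }
        return true;
    }
}

// Test double: behaves like a non-blocking socket whose send buffer can take
// only 'budget' more bytes.
final class ThrottledChannel implements WritableByteChannel {
    final ByteArrayOutputStream sent = new ByteArrayOutputStream();
    int budget;

    @Override
    public int write(ByteBuffer src) {
        int n = Math.min(budget, src.remaining());
        for (int i = 0; i < n; i++) {
            sent.write(src.get());
        }
        budget -= n;
        return n;
    }

    @Override
    public boolean isOpen() {
        return true;
    }

    @Override
    public void close() {
    }
}

public class WriteQueueDemo {
    public static void main(String[] args) throws IOException {
        WriteQueue queue = new WriteQueue();
        queue.enqueue(ByteBuffer.wrap("hello ".getBytes(StandardCharsets.US_ASCII)));
        queue.enqueue(ByteBuffer.wrap("world".getBytes(StandardCharsets.US_ASCII)));

        ThrottledChannel channel = new ThrottledChannel();
        channel.budget = 4;
        System.out.println("first flush complete: " + queue.flushTo(channel));
        channel.budget = 100;
        System.out.println("second flush complete: " + queue.flushTo(channel));
        System.out.println("sent: " + new String(channel.sent.toByteArray(), StandardCharsets.US_ASCII));
    }
}
```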

Application-Level EventHandler

In contrast to the Dispatcher's event handler, the
application-specific event handler listens for higher-level
connection-oriented events, like connection established,
data received, or connection disconnected. The
concrete event handler design is one of the major differences
between NIO server frameworks like SEDA, MINA, or
emberIO. Such frameworks often implement a multi-staged
architecture, where chains of event handlers can be used. This
allows adding handlers like SSLHandler or
DelayedWriteHandler, which intercept the
request/response processing. The following example shows an
application-level handler based on the xSocket framework. The xSocket
framework supports different handler interfaces that define
callback methods to be implemented by application-specific
code.


class POP3ProtocolHandler implements IConnectHandler, IDataHandler, ... {
  private static final String DELIMITER = ...
  // per-connection state (assumes one handler instance per connection)
  private String user = null;
  private Mailbox mailbox = ...


  public static void main(String... args) throws ... {
    new MultithreadedServer(110, new POP3ProtocolHandler()).run();
  }

  public boolean onConnect(INonBlockingConnection con) throws ... {
    if (gatekeeper.isSuspiciousAddress(con.getRemoteAddress())) {
      con.setWriteTransferRate(5);  // throttle the transfer to 5 bytes/sec
    }

    con.write("+OK My POP3-Server" + DELIMITER);
    return true;
  }

  public boolean onData(INonBlockingConnection con) throws ... {
    String request = con.readStringByDelimiter(DELIMITER);

    if (request.startsWith("QUIT")) {
      if (mailbox != null) {
        mailbox.close();
      }
      con.write("+OK POP3 server signing off" + DELIMITER);
      con.close();

    } else if (request.startsWith("USER")) {
      this.user = request.substring(4).trim();
      con.write("+OK enter password" + DELIMITER);


    } else if (request.startsWith("PASS")) {
      String pwd = request.substring(4).trim();
      boolean isAuthenticated = authenticator.check(user, pwd);
      if (isAuthenticated) {
        mailbox = Mailbox.openAndLock(user);
        con.write("+OK mailbox locked and ready" + DELIMITER);
      } else {
        ...
      }
    } else if (...) {
      ...
    }
    return true;
  }
}
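The POP3 handler above is a single stage. In frameworks with a multi-staged architecture, several handlers are chained so that each one can transform, pass on, or swallow the data before it reaches the application, which is how interceptors such as the SSLHandler mentioned earlier fit in. The sketch below illustrates the idea with a hypothetical Stage interface operating on strings; it is not the API of xSocket or any other framework, and real frameworks work on bytes and have richer event types.

```java
import java.util.List;

public class HandlerChainDemo {
    // Hypothetical stage: returns the (possibly transformed) data for the next
    // stage, or null to swallow it.
    interface Stage {
        String onData(String data);
    }

    static final class Chain {
        private final List<Stage> stages;

        Chain(List<Stage> stages) {
            this.stages = List.copyOf(stages);
        }

        String process(String data) {
            for (Stage stage : stages) {
                if (data == null) {
                    break;               // an earlier stage swallowed the data
                }
                data = stage.onData(data);
            }
            return data;
        }
    }

    static Chain pop3Chain() {
        Stage stripDelimiter = d -> d.endsWith("\r\n") ? d.substring(0, d.length() - 2) : d;
        Stage dropEmpty = d -> d.isBlank() ? null : d;
        Stage upperCaseCommand = d -> {  // stands in for the application-level handler
            int space = d.indexOf(' ');
            return space < 0 ? d.toUpperCase() : d.substring(0, space).toUpperCase() + d.substring(space);
        };
        return new Chain(List.of(stripDelimiter, dropEmpty, upperCaseCommand));
    }

    public static void main(String[] args) {
        Chain chain = pop3Chain();
        System.out.println(chain.process("quit\r\n"));
        System.out.println(chain.process("user bob\r\n"));
        System.out.println(chain.process("\r\n"));
    }
}
```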

To ease access to the underlying read and write queues, the
Connection object provides several convenience
read and write methods for stream- and
channel-oriented operations.
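A convenience method such as readStringByDelimiter has to cope with the fact that TCP delivers a byte stream, not messages: one read may contain half a command or several commands. The following hypothetical DelimitedReader sketches the usual approach of buffering received bytes and handing out only complete, delimiter-terminated records; it is not xSocket's implementation. It decodes chunks as US-ASCII, which is adequate for POP3 commands but would need a stateful decoder for multi-byte charsets such as UTF-8.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: buffers received bytes and hands out complete records
// terminated by a delimiter.
final class DelimitedReader {
    private final StringBuilder pending = new StringBuilder();
    private final String delimiter;

    DelimitedReader(String delimiter) {
        this.delimiter = delimiter;
    }

    // Append a received chunk (US-ASCII, as used by POP3 commands).
    void append(ByteBuffer chunk) {
        pending.append(StandardCharsets.US_ASCII.decode(chunk));
    }

    // Next record without its delimiter, or null if no complete record is buffered yet.
    String readRecord() {
        int end = pending.indexOf(delimiter);
        if (end < 0) {
            return null;
        }
        String record = pending.substring(0, end);
        pending.delete(0, end + delimiter.length());
        return record;
    }
}

public class DelimitedReaderDemo {
    static ByteBuffer ascii(String s) {
        return ByteBuffer.wrap(s.getBytes(StandardCharsets.US_ASCII));
    }

    public static void main(String[] args) {
        DelimitedReader reader = new DelimitedReader("\r\n");
        reader.append(ascii("USER bob\r\nPA"));   // one full command plus a fragment
        System.out.println(reader.readRecord());   // the complete USER command
        System.out.println(reader.readRecord());   // null: "PA" is still incomplete
        reader.append(ascii("SS secret\r\n"));
        System.out.println(reader.readRecord());   // the reassembled PASS command
    }
}
```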

When the connection is closed, the underlying implementation
initiates a writeable event round-trip to flush the write queue.
The connection is terminated after the remaining data has been
written. Besides such a controlled termination, connections can be
disconnected for other reasons. For instance, hardware malfunctions
could cause the termination of a TCP-based connection. Such a
situation can only be detected by performing read or write
operations on the socket, or by idle timeouts. Most NIO frameworks
provide a built-in function to handle such uncontrolled
terminations.
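Idle-timeout detection mostly comes down to bookkeeping: record when each connection last saw traffic and periodically collect the ones that have been silent for too long. The IdleWatchdog class below is a hypothetical sketch of that bookkeeping. It takes timestamps as parameters so it can be tested deterministically; a server would instead pass values derived from a monotonic clock such as System.nanoTime() and run the check periodically, for example from a ScheduledExecutorService or after a select(timeout) call in the dispatcher.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical idle-timeout bookkeeping; C is the connection type.
final class IdleWatchdog<C> {
    private final Map<C, Long> lastActivity = new ConcurrentHashMap<>();
    private final long timeoutMillis;

    IdleWatchdog(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    // Call on every successful read or write for the connection.
    void touch(C con, long nowMillis) {
        lastActivity.put(con, nowMillis);
    }

    // Call when the connection is closed normally.
    void forget(C con) {
        lastActivity.remove(con);
    }

    // Connections that have been silent for longer than the timeout.
    List<C> idleConnections(long nowMillis) {
        List<C> idle = new ArrayList<>();
        for (Map.Entry<C, Long> e : lastActivity.entrySet()) {
            if (nowMillis - e.getValue() > timeoutMillis) {
                idle.add(e.getKey());
            }
        }
        return idle;
    }
}

public class IdleWatchdogDemo {
    public static void main(String[] args) {
        IdleWatchdog<String> watchdog = new IdleWatchdog<>(1_000);
        watchdog.touch("conn-a", 0);
        watchdog.touch("conn-b", 500);
        System.out.println("idle at t=1200ms: " + watchdog.idleConnections(1_200));
    }
}
```

Connections reported as idle would then be closed and deregistered from their Dispatcher.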

Conclusion

An event-driven non-blocking architecture is a fundamental layer
for implementing highly efficient, scalable, and reliable servers. The
challenge is to minimize the thread synchronization overhead and to
optimize the connection/buffer management. This is usually the hardest
part to program.

But there is no need to reinvent the wheel. Server frameworks
like xSocket, emberIO, SEDA, or MINA abstract the low-level event
handling and thread management to ease the creation of highly
scalable servers. Most of these server frameworks also support
features like SSL or UDP, which haven't been discussed in this
article.

Gregor Roth works as a software architect at United Internet group, a leading European Internet Service Provider to which GMX, 1&1, and Web.de belong. His areas of interest include software and system architecture, enterprise architecture management, object-oriented design, distributed computing, and development methodologies.