Building an RPC layer in a distributed system using Netty – An introductory tutorial


In this post, I will talk about how we can build a minimal RPC layer of a distributed system using Netty. By the end of this post, readers will have some familiarity with Netty concepts and protocol buffers, and with how these can be put together to build an initial (somewhat rudimentary) version of the messaging component of a distributed system.

Introduction to Netty

Netty is an open source framework for building a high performance networking (messaging or RPC or transport) component of a distributed system. It is built on the design principles of asynchronous programming and helps developers by abstracting away significant design, implementation and maintenance effort required for having a high performance networking layer in a system. Non-blocking, asynchronous and event driven I/O are the basic building blocks of Netty.

Why use Netty when we have Java NIO?

Let’s refresh our memory of the I/O capabilities we get with the JDK. The following code is a simple server written using the original blocking Java I/O libraries (java.net and java.io):

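import java.net.ServerSocket;
import java.net.Socket;

// RequestHandler is a Runnable defined in the full code linked below
public class SimpleBlockingServer {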

  private final int port;

  private SimpleBlockingServer(final int port) {
    this.port = port;
  }

  public static void main(String[] args) throws Exception {
    int port = Integer.parseInt(args[0]);
    new SimpleBlockingServer(port).start();
  }

  private void start() throws Exception {
    // bind the server to specified port
    // ServerSocket will listen for new connections
    // and accept them
    final ServerSocket serverSocket = new ServerSocket(port);
    // infinite loop to handle incoming connections
    while (true) {
      // listen on the server socket for new connections and accept
      // will block until connection is available
      // socket represents a newly accepted connection
      final Socket socket = serverSocket.accept();
      System.out.println("Accepted connection from socket: " + socket);
      // create a separate thread for each incoming connection
      final RequestHandler requestHandler = new RequestHandler(socket);
      new Thread(requestHandler).start();
    }
  }
}

The full code can be seen here. The main problem with the above code is that the I/O methods provided by java.net (sometimes referred to as old Java I/O) use blocking system calls underneath.

  • The accept() call blocks until a new connection arrives, so the accepting thread cannot service connections it has already accepted (e.g., when data becomes available to read). To multiplex between connections, the server creates a separate thread for each one; otherwise it could not handle more than one connection at a time.
  • Creating a separate thread for each connection does not scale once the server has to handle hundreds of thousands of concurrent connections without noticeable degradation in performance. With the thread-per-connection approach, the OS is likely to spend more time context switching than doing actual work.
  • Similarly, thread-per-connection is not a good way to utilize system resources at peak workloads. What we need is a mechanism to handle many concurrent connections with fewer threads.

Java NIO (New I/O, which added non-blocking I/O to the JDK) solved this problem by introducing Selector – the holy grail of high performance network I/O in Java. A set of non-blocking sockets/descriptors can be registered with a Selector, which then monitors them for activity/events. This allows a single thread to handle multiple concurrent connections. A sample implementation can be seen here where a single thread is multiplexing between multiple registered non-blocking sockets.
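For readers who have not seen Selector in action, here is a minimal sketch of the idea (it is not the sample implementation linked above). A single thread runs an echo server and multiplexes between all of its connections. The class name SelectorEchoServer, the loopback address and the roundTrip() demo helper are my own assumptions for illustration, and a real server would handle partial writes by registering for OP_WRITE instead of spinning.

```java
import java.io.DataInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

/** Echo server sketch: one thread, one Selector, many connections. */
class SelectorEchoServer implements Runnable {
  private final Selector selector;
  private final ServerSocketChannel serverChannel;
  private volatile boolean running = true;

  SelectorEchoServer(int port) throws IOException {
    selector = Selector.open();
    serverChannel = ServerSocketChannel.open();
    serverChannel.configureBlocking(false);
    // port 0 asks the OS for any free port
    serverChannel.bind(new InetSocketAddress("127.0.0.1", port));
    // be notified when a new connection is ready to be accepted
    serverChannel.register(selector, SelectionKey.OP_ACCEPT);
  }

  int port() {
    return serverChannel.socket().getLocalPort();
  }

  void stop() {
    running = false;
    selector.wakeup(); // unblock select() so the loop can exit
  }

  @Override
  public void run() {
    try (Selector sel = selector; ServerSocketChannel server = serverChannel) {
      while (running) {
        sel.select(); // blocks until a registered channel has an event
        Iterator<SelectionKey> keys = sel.selectedKeys().iterator();
        while (keys.hasNext()) {
          SelectionKey key = keys.next();
          keys.remove();
          if (key.isAcceptable()) {
            SocketChannel client = server.accept();
            if (client != null) {
              client.configureBlocking(false);
              client.register(sel, SelectionKey.OP_READ);
            }
          } else if (key.isReadable()) {
            echo((SocketChannel) key.channel());
          }
        }
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  private static void echo(SocketChannel client) throws IOException {
    ByteBuffer buffer = ByteBuffer.allocate(1024);
    if (client.read(buffer) < 0) { // peer closed the connection
      client.close();
      return;
    }
    buffer.flip();
    while (buffer.hasRemaining()) {
      client.write(buffer); // sketch only: spins if the socket buffer is full
    }
  }

  /** Demo helper: start the server, send one message, return what came back. */
  static String roundTrip(String message) {
    try {
      SelectorEchoServer server = new SelectorEchoServer(0);
      Thread loop = new Thread(server, "selector-loop");
      loop.setDaemon(true);
      loop.start();
      byte[] payload = message.getBytes(StandardCharsets.UTF_8);
      try (Socket socket = new Socket("127.0.0.1", server.port())) {
        socket.getOutputStream().write(payload);
        byte[] reply = new byte[payload.length];
        new DataInputStream(socket.getInputStream()).readFully(reply);
        return new String(reply, StandardCharsets.UTF_8);
      } finally {
        server.stop();
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

Calling SelectorEchoServer.roundTrip("ping") starts the loop on a background thread, sends the bytes over an ordinary blocking client socket and returns the echoed text. Note that the server thread never blocks on any single connection; it only blocks in select().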

Now a sophisticated implementation using Selector requires some sort of custom dispatch code (in accordance with the reactor pattern), where events on the descriptors registered with the Selector are handed off to an appropriate handler thread, which in turn uses non-blocking read/write socket operations. Writing and maintaining this specialized multi-threaded networking code can be quite cumbersome, especially for large scale distributed systems.

This is where Netty comes into the picture. Java NIO is the foundation of the non-blocking I/O in Netty. Netty goes one step further and uses callbacks, futures, promises and an event-based programming paradigm to deliver a fully asynchronous network I/O library. Developers no longer have to work directly with Selector, as Netty abstracts all of that away, including the multi-threading.

In fact, Selector has its own performance disadvantages and is not the most efficient option available today. More scalable alternatives to the select()/poll() system calls are available in the form of epoll (Linux only) and kqueue (BSD and macOS) for highly scalable multiplexing. The good news is that Netty provides transports built on each of them.

Distributed In-Memory Partitioned Store

As part of my weekend project, I am implementing a distributed system (to run some experiments for learning purposes). The following diagram depicts the architecture as a 3-node cluster. Each node will host some data. Since the focus of this post is RPC, I have tried to show how the nodes will talk to each other (for sending/receiving messages, heartbeat pings etc.) via the RPC client and service running on each endpoint. To keep the diagram clean, not all of the M*N arrows between endpoints are shown.

Now, to complement this post with actual deployable code, I developed a version of the above architecture with a simple demo service to demonstrate RPC request-response between endpoints. I call this the V0 design:

  • The in-memory store is on a single node. Let’s call it the data node.
  • The demo service runs on all other nodes and sends/receives messages to/from the data node.
    • It is fine to run the demo service on the data node as well. In that case, the request/response will happen via the local RPC endpoint.

V0 Design

  • In-Memory Store – A simple hash table storing Key-Value mappings on data node
  • Async thread pool – The requests received by the RPC server are processed asynchronously (explained in detail later in the post)
  • Demo service – Multi-threaded application that issues get/put requests via the local RPC client service
  • Netty Channel – This represents a connection between two endpoints.

Protocol

The messaging format is specified using Google Protocol buffers. The following code shows the format of RPCRequest and how it is composed of a get or put request.

message GetRequest {
  required string key = 1;
}

message PutRequest {
  required string key = 1;
  required string value = 2;
}

message RPCRequest {
  required int64 sequenceNum = 1;
  oneof KVStoreServerRequest {
    GetRequest getRequest = 2;
    PutRequest putRequest = 3;
  }
}

RPC Server

Channel and Event Loop

A Channel represents a connection (at least for the current discussion) and supports asynchronous, non-blocking I/O operations. A channel is assigned to a single event loop in Netty. An event loop is a single-threaded event executor responsible for multiplexing amongst the channels it manages. An event loop group is a collection of such single-threaded event executors. As mentioned earlier in the post, the event loop (and group) comes in multiple flavors depending on what it uses internally: Selector, epoll or kqueue.

I have used NioEventLoopGroup, which uses the Java NIO Selector internally. We need two event loop groups for the RPC server.

  • One event loop group is responsible for accepting connections at the server. For each accepted connection, a child channel (SocketChannel) is created and assigned to the second event loop group.
  • The second event loop group assigns the accepted connection (represented by SocketChannel) to one of its event loops for the life of the connection. All the event activity on the connection is managed by that particular event loop.
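The assignment rule above can be modelled with plain JDK executors. This is only a toy model under my own assumptions (the ToyEventLoopGroup class, the string channel ids and the round-robin assignment are all hypothetical), not Netty's implementation. Each "event loop" is a single-threaded executor, and a channel stays pinned to the loop it was first given.

```java
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

/** Toy model of an event loop group: N single-threaded executors. */
class ToyEventLoopGroup {
  private final ExecutorService[] loops;
  private final AtomicInteger next = new AtomicInteger();
  private final Map<String, ExecutorService> assignment = new ConcurrentHashMap<>();

  ToyEventLoopGroup(int size) {
    loops = new ExecutorService[size];
    for (int i = 0; i < size; i++) {
      final String name = "toy-loop-" + i;
      loops[i] = Executors.newSingleThreadExecutor(runnable -> {
        Thread thread = new Thread(runnable, name);
        thread.setDaemon(true);
        return thread;
      });
    }
  }

  /** Pins a channel to one loop (round-robin) for the rest of its life. */
  void register(String channelId) {
    assignment.computeIfAbsent(channelId,
        id -> loops[Math.floorMod(next.getAndIncrement(), loops.length)]);
  }

  /** Runs an event on the channel's loop and returns the name of the thread that handled it. */
  String fireEvent(String channelId) {
    ExecutorService loop = assignment.get(channelId);
    if (loop == null) {
      throw new IllegalStateException("channel not registered: " + channelId);
    }
    Callable<String> event = () -> Thread.currentThread().getName();
    try {
      return loop.submit(event).get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } catch (ExecutionException e) {
      throw new IllegalStateException(e.getCause());
    }
  }

  void shutdown() {
    for (ExecutorService loop : loops) {
      loop.shutdown();
    }
  }
}
```

With a group of size 2 and channels "a", "b" and "c" registered in that order, every event for "a" runs on the same thread, "b" lands on the other thread, and "c" shares a thread with "a". Because one thread owns all events of a channel, handler code for a single connection never runs concurrently with itself.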

Channel Initializer, Channel Handler and Channel Pipeline

The channel handler is a powerful concept in Netty that allows us to write custom code for handling inbound and outbound events as they happen and propagate across the channel. Each channel has its own pipeline, where our code installs one or more channel handlers.

The initializer is custom code we write to tell Netty how we would like to initialize each accepted connection (SocketChannel). This is where we specify the inbound and outbound handlers; whenever a connection is accepted, Netty adds the handlers to the pipeline of the newly created channel.

Netty’s abstraction over the bytes traveling on the wire is ByteBuf (Netty’s own byte container, an alternative to NIO’s ByteBuffer). We need appropriate encoder/decoder handlers in the pipeline to handle the conversion between ByteBuf and protobuf. Message encoders and decoders are also installed as handlers in the pipeline of a channel, and Netty does a very good job of providing base classes and adapters that can easily be extended.
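Protobuf messages do not delimit themselves on the wire, and TCP delivers a byte stream rather than whole messages, so a decoder usually needs a framing step before it can parse anything. The sketch below shows the idea with a fixed 4-byte length prefix on a plain java.nio.ByteBuffer. This framing and the LengthPrefixedFramer class are assumptions for illustration and not necessarily what the code in this post uses (Netty ships ready-made frame decoders, including varint-based ones for protobuf).

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/** Sketch of length-prefixed framing: [4-byte length][payload] repeated. */
class LengthPrefixedFramer {
  // bytes received so far that do not yet form a complete frame (kept in read mode)
  private ByteBuffer leftover = ByteBuffer.allocate(0);

  /** Encoder side: prepend the payload length. */
  static byte[] encode(byte[] payload) {
    return ByteBuffer.allocate(4 + payload.length)
        .putInt(payload.length)
        .put(payload)
        .array();
  }

  /** Decoder side: feed whatever bytes arrived, get back every complete frame. */
  List<byte[]> decode(byte[] chunk) {
    ByteBuffer merged = ByteBuffer.allocate(leftover.remaining() + chunk.length);
    merged.put(leftover).put(chunk);
    merged.flip();

    List<byte[]> frames = new ArrayList<>();
    while (merged.remaining() >= 4) {
      merged.mark();
      int length = merged.getInt();
      if (merged.remaining() < length) {
        merged.reset(); // payload incomplete: wait for more bytes
        break;
      }
      byte[] frame = new byte[length];
      merged.get(frame);
      frames.add(frame);
    }
    leftover = merged; // position now points at the first unconsumed byte
    return frames;
  }
}
```

Feeding the decoder a chunk that ends in the middle of a frame yields no frames; the next chunk completes it and may yield several at once. A production decoder would also reject negative or oversized lengths, which this sketch does not do.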

Inbound events flow through the inbound handlers in the order in which they were added to the pipeline, while outbound events flow through the outbound handlers in the reverse order. Netty takes care of differentiating inbound vs. outbound event handlers.
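To make the ordering concrete: in Netty, inbound events travel from the first handler added to the last, while outbound operations travel from the last outbound handler back toward the first. The toy model below mimics just that rule with plain Java. ToyPipeline and the handler names used in the usage note are hypothetical and are not Netty classes or the handlers from this project.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of handler ordering in a pipeline (not Netty's ChannelPipeline). */
class ToyPipeline {
  enum Direction { INBOUND, OUTBOUND }

  private static final class Entry {
    final String name;
    final Direction direction;

    Entry(String name, Direction direction) {
      this.name = name;
      this.direction = direction;
    }
  }

  private final List<Entry> handlers = new ArrayList<>();

  /** Appends a handler at the tail of the pipeline. */
  ToyPipeline addLast(String name, Direction direction) {
    handlers.add(new Entry(name, direction));
    return this;
  }

  /** Inbound events (e.g. bytes read) visit inbound handlers from head to tail. */
  List<String> inboundOrder() {
    List<String> visited = new ArrayList<>();
    for (Entry entry : handlers) {
      if (entry.direction == Direction.INBOUND) {
        visited.add(entry.name);
      }
    }
    return visited;
  }

  /** Outbound operations (e.g. write) visit outbound handlers from tail to head. */
  List<String> outboundOrder() {
    List<String> visited = new ArrayList<>();
    for (int i = handlers.size() - 1; i >= 0; i--) {
      if (handlers.get(i).direction == Direction.OUTBOUND) {
        visited.add(handlers.get(i).name);
      }
    }
    return visited;
  }
}
```

If we add frameDecoder (inbound), protobufDecoder (inbound), frameEncoder (outbound), protobufEncoder (outbound) and requestHandler (inbound) in that order, an incoming message visits frameDecoder, protobufDecoder and then requestHandler, while an outgoing message visits protobufEncoder first and frameEncoder second. This is why an encoder that turns a message into bytes is typically added after the handler that wraps those bytes in a frame.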

Bootstrap

Bootstrap is the abstraction Netty exposes to configure and start the server (ServerBootstrap on the server side). We specify the two event loop groups, the socket channel initializer, handlers etc., along with the port that the server will bind to in order to accept connections.

Please read the code for a working implementation of the RPC server using the above concepts. The complete RPC protocol specification can be seen here.

Async worker pool

As you may notice in the code, the RPCRequestHandler uses CompletableFuture to run an async computation inside an executor pool, along with a callback that sends the RPC response when the future completes.

By now we should know that our channel handler code is invoked by Netty’s event loop thread (the Netty documentation refers to it as the I/O or network thread). The handler code should be fairly quick (it should not block) so that the I/O thread can continue processing events on the connections it manages. Netty recommends this as a best practice for scalability and performance.

This is where CompletableFuture comes into the picture. It lets us attach a callback that is invoked when the future completes, rather than calling future.get() and blocking again, which would defeat the purpose of using a future. The worker pool (currently 2 threads) executes the task of talking to the in-memory store and running the get/put request. Once the task completes (meaning the future completes and we have the result of the get/put), the callback is invoked by the worker pool to send the RPC response.
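The hand-off described above can be sketched with nothing but the JDK. This is a simplified stand-in for RPCRequestHandler, not the code from the repository: AsyncStoreHandler, its method names and the Consumer used in place of a channel write are assumptions made for illustration.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

/** Sketch: run store operations on a worker pool, send the response from a callback. */
class AsyncStoreHandler {
  private final Map<String, String> store = new ConcurrentHashMap<>();

  // 2 worker threads, mirroring the pool size mentioned in the post
  private final ExecutorService workers = Executors.newFixedThreadPool(2, runnable -> {
    Thread thread = new Thread(runnable, "store-worker");
    thread.setDaemon(true);
    return thread;
  });

  /**
   * Meant to be called from the I/O thread. It only schedules work and returns
   * immediately; nothing here blocks the caller.
   */
  CompletableFuture<Void> handleGet(String key, Consumer<String> sendResponse) {
    return CompletableFuture
        .supplyAsync(() -> store.get(key), workers) // store lookup on a worker thread
        .thenAcceptAsync(sendResponse, workers);    // callback also runs on the pool
  }

  CompletableFuture<Void> handlePut(String key, String value, Consumer<String> sendResponse) {
    return CompletableFuture
        .supplyAsync(() -> {
          store.put(key, value);
          return "OK";
        }, workers)
        .thenAcceptAsync(sendResponse, workers);
  }

  void shutdown() {
    workers.shutdown();
  }
}
```

The returned future is only there so that a caller (a test, for example) can wait for completion; the I/O thread itself would ignore it and go back to its event loop. I used thenAcceptAsync with the same executor so that the callback is guaranteed to run on the worker pool, since the plain thenAccept variant may run on the calling thread if the future has already completed.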

RPC Client

Event Loop and Channel Initializer

Unlike the RPC server, the RPC client needs only a single event loop group to manage connections. The channel initializer is more or less the same, except that the protobuf decoder now converts the bytes into RPCResponse. The full working implementation can be seen here.

Bootstrap

The concept of Bootstrap also applies to the RPC client. We configure the client by providing the event loop group and the channel initializer. Then, as and when needed, the bootstrap is used to connect to an RPC server endpoint at a particular address and port. All operations are asynchronous, so the remote connect returns a future:

/**
 * Establish connection with an endpoint
 * @param endpoint {@link Endpoint} representing a peer with host
 *                 address/port
 * @return {@link ChannelFuture}
 */
ChannelFuture connectToPeer(Endpoint endpoint) {
  return bootstrap.connect(endpoint.getAddress(), endpoint.getPort());
}

RPC Response Handler, Response Listeners and Sequence Numbers

The final inbound handler that handles RPCResponse extends the basic inbound event handler implementation provided by Netty. As you may notice, the memory associated with incoming response message is not released by channelRead0() method. This is something that is taken care of by base class implementation which releases the resource after the call returns from channelRead0().

In the current implementation, as soon as the response is received, we simply log a message (the response data, which also includes the original request and a sequence number). This is done within the event loop thread itself.

Every time a demo service thread makes a request via the local RPC client, it gets a new sequence number, and the RPC client code maintains a mapping between the sequence number and the corresponding listener (a sort of future, but the current implementation is dumb). Once the response arrives, we remove the corresponding listener from the table and invoke its done() operation. I plan to improve the response listener semantics.

class RPCResponseHandler extends SimpleChannelInboundHandler<KVStoreRPC.RPCResponse> {
  @Override
  public void channelRead0(ChannelHandlerContext context, KVStoreRPC.RPCResponse rpcResponse) {
    final long sequenceNum = rpcResponse.getSequenceNum();
    // remove() returns null if no listener is registered for this sequence number
    final RPCResponseListener listener = responseListeners.remove(sequenceNum);
    if (listener == null) {
      System.out.println("No listener registered for response# " + sequenceNum);
      return;
    }
    // right now just complete it in the network thread (the event loop thread)
    listener.done(rpcResponse);
  }
}
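One possible direction for better listener semantics is to let CompletableFuture play the role of the listener, so that callers can chain callbacks or wait on the result. The sketch below is hypothetical: PendingResponses and its methods are not part of the current code, and a String stands in for RPCResponse to keep the example free of generated protobuf classes.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/** Sketch: table of in-flight requests keyed by sequence number. */
class PendingResponses {
  private final AtomicLong nextSequenceNum = new AtomicLong();
  private final Map<Long, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

  /** Caller side: allocate a sequence number and register the future before sending. */
  long register(CompletableFuture<String> listener) {
    long sequenceNum = nextSequenceNum.incrementAndGet();
    pending.put(sequenceNum, listener);
    return sequenceNum;
  }

  /**
   * I/O thread side: complete and forget the matching future.
   * Returns false for an unknown or already answered sequence number.
   */
  boolean complete(long sequenceNum, String response) {
    CompletableFuture<String> listener = pending.remove(sequenceNum);
    return listener != null && listener.complete(response);
  }

  /** Number of requests still waiting for a response. */
  int outstanding() {
    return pending.size();
  }
}
```

A demo service thread would call register() with a fresh future, put the returned sequence number into the RPCRequest and hold on to the future. The response handler would then call complete() with the sequence number from the response. A duplicate or unexpected response simply returns false instead of failing with a NullPointerException.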

Connection Proxy

A simple wrapper over the RPC client for talking to a particular endpoint. It establishes a connection and sends requests to an RPC server endpoint in the cluster. Demo service threads on a node use a single connection proxy (the same physical connection to the remote endpoint) to send requests.

Adding listeners to Netty’s ChannelFuture

As mentioned earlier, I/O operations on the channel are asynchronous. Operations such as write, connect and close return a ChannelFuture, Netty’s extension of the Java Future. We can add custom listener code to this future as a callback to be invoked by the event loop thread when the async operation completes. For example, the following listener is invoked on the client side once the RPC request send operation completes. Similar listeners have been used in a couple of other places.

final ChannelFuture sendFuture = channel.writeAndFlush(rpcRequest);
sendFuture.addListener(new ChannelFutureListener() {
  @Override
  public void operationComplete(ChannelFuture future) throws Exception {
    if (future.isSuccess()) {
      System.out.println("Request# " + sequenceNum + " sent successfully to " + future.channel().remoteAddress());
    } else {
      System.out.println("Unable to send request# " + sequenceNum);
    }
  }
});

Modules

  • Daemon is the main KVStore process that runs on each node and starts/creates every other service (currently RPC, demo and in-memory store).

Deployment and Testing

KVStore is a Maven project and is distributed as a tarball. I have tested it both locally and on a 3-node EC2 cluster. The 3-node deployment architecture is the same as shown earlier in the V0 architecture diagram. Please see the detailed instructions here.

Future Work

There is a lot more I want to do here. Some immediate items are:

  • Improvements to the response listener.
  • Service discovery support using ZooKeeper or Consul.
  • Exception handling (currently there is only basic resource cleanup).

More on Netty

This post is by no means a Netty cookbook. The purpose is to offer something useful that helps people get started with Netty, both in theory and through a small, tested distributed system that implements the concepts. I would encourage people to read Netty in Action for detailed information on Netty internals.
