Avro / AVRO-1001

Adding thread pool to NettyServerAvroHandler

    Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Duplicate
    • Affects Version/s: 1.6.1
    • Fix Version/s: None
    • Component/s: java
    • Labels:

      Description

      Request code review.

      The current NettyServer implementation processes each request/response sequentially on a single channel. If a second request arrives from the same client while an earlier request is still being processed, the behavior is undefined.

      This patch updates NettyServerAvroHandler.messageReceived() to process each request in a separate thread using an ExecutorService.
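      The idea of handing each request to an ExecutorService can be sketched with the JDK alone. This is only a rough model, not the patch itself: PooledHandlerSketch and requestsOverlap are hypothetical names, the Runnable stands in for an Avro request, and a single-thread executor is used to imitate strictly sequential handling for comparison.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a handler; it does not use Netty or Avro.
public class PooledHandlerSketch {
    private final ExecutorService executor;

    public PooledHandlerSketch(ExecutorService executor) {
        this.executor = executor;
    }

    // Models messageReceived(): returns at once, the request runs on the executor.
    public void messageReceived(Runnable request) {
        executor.execute(request);
    }

    // Submits two requests and reports whether both were in flight at the same time.
    // Note: this shuts the given executor down before returning.
    public static boolean requestsOverlap(ExecutorService executor) throws InterruptedException {
        PooledHandlerSketch handler = new PooledHandlerSketch(executor);
        CountDownLatch bothStarted = new CountDownLatch(2);
        CountDownLatch overlapped = new CountDownLatch(2);
        Runnable request = () -> {
            bothStarted.countDown();
            try {
                // Each request waits briefly for the other one to start.
                if (bothStarted.await(500, TimeUnit.MILLISECONDS)) {
                    overlapped.countDown();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        handler.messageReceived(request);
        handler.messageReceived(request);
        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
        // Only true if both requests saw the other one running.
        return overlapped.getCount() == 0;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("single thread overlaps: "
            + requestsOverlap(Executors.newSingleThreadExecutor()));
        System.out.println("cached pool overlaps: "
            + requestsOverlap(Executors.newCachedThreadPool()));
    }
}
```

      With a single-thread executor the first request has to finish before the second starts, so the two never overlap; with a cached thread pool they can run side by side.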

      1. AVRO-1001.patch
        2 kB
        Shaun Williams
      2. AVRO-1001-test.patch
        7 kB
        James Baldassari

          Activity

          Shaun Williams added a comment -

          patch for non-blocking request handling in NettyServer.

          Shaun Williams added a comment -

          changed patch name to follow convention.

          James Baldassari added a comment -

          Thanks for the patch. I was surprised to see this issue because when the NettyServer is initialized it actually passes two different thread pools to the NioServerSocketChannelFactory:

          public NettyServer(Responder responder, InetSocketAddress addr) {
            // First pool: Netty "boss" threads; second pool: Netty "worker" threads.
            this(responder, addr, new NioServerSocketChannelFactory(
                Executors.newCachedThreadPool(), Executors.newCachedThreadPool()));
          }
          

          The first thread pool is for the Netty "boss" threads, and the second is for the Netty "worker" threads. When I dug into the Netty code I found that Netty only starts a single pair of boss and worker threads for each Netty channel. The worker thread sits in a for (;;) loop selecting on the channel and then processing each message it receives in order. So you're correct that only one RPC can execute at a time on any given channel. I was also able to reproduce this issue in a unit test.

          Given how Netty works, one work-around is to create a new client for each concurrent RPC that you want to execute. This is not an ideal long-term solution, and I think your idea to add a new thread pool in NettyServer is a good fix. However, the cached thread pool might not be the best implementation for all use cases. For example, cached thread pools can be problematic in high-throughput environments because they are unbounded by definition. So let's allow the ExecutorService to be passed into the NettyServer constructor if the user wants to specify a different/custom implementation. As the default implementation, the cached thread pool should be fine, as long as it can be changed if necessary.
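          To make the suggestion concrete, here is a rough sketch of a constructor that accepts an ExecutorService, with a cached thread pool as the default and a bounded pool as one possible alternative. It uses only the JDK; ConfigurableServerSketch, getRequestExecutor and boundedPool are hypothetical names and not part of NettyServer, and the pool sizes are arbitrary.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a server class; it does not use Netty or Avro.
public class ConfigurableServerSketch {
    private final ExecutorService requestExecutor;

    // Default: unbounded cached thread pool.
    public ConfigurableServerSketch() {
        this(Executors.newCachedThreadPool());
    }

    // Callers with different needs can supply their own executor.
    public ConfigurableServerSketch(ExecutorService requestExecutor) {
        this.requestExecutor = requestExecutor;
    }

    public ExecutorService getRequestExecutor() {
        return requestExecutor;
    }

    // One bounded alternative: a fixed number of threads and a fixed-size queue.
    // When the queue is full, CallerRunsPolicy runs the task on the submitting
    // thread, which slows submission down instead of growing without limit.
    public static ThreadPoolExecutor boundedPool(int maxThreads, int queueCapacity) {
        return new ThreadPoolExecutor(
            maxThreads, maxThreads,
            60L, TimeUnit.SECONDS,
            new ArrayBlockingQueue<Runnable>(queueCapacity),
            new ThreadPoolExecutor.CallerRunsPolicy());
    }

    public static void main(String[] args) throws InterruptedException {
        ConfigurableServerSketch server = new ConfigurableServerSketch(boundedPool(4, 16));
        for (int i = 0; i < 8; i++) {
            final int id = i;
            server.getRequestExecutor().execute(
                () -> System.out.println("handled request " + id));
        }
        server.getRequestExecutor().shutdown();
        server.getRequestExecutor().awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

          Whether caller-runs is the right rejection policy depends on the deployment; rejecting or queueing elsewhere are equally valid choices.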

          I'll add my unit test to your patch to verify that this change fixes the issue.

          James Baldassari added a comment -

          Actually, I wonder if AVRO-976 fixes this?

          James Baldassari added a comment -

          I've verified that AVRO-976 does in fact address this issue. Just create the NettyServer like this:

          Server server = new NettyServer(
            new SpecificResponder(ProtocolInterface.class, new ProtocolImplementation()), 
            new InetSocketAddress(0), 
            new NioServerSocketChannelFactory(
              Executors.newCachedThreadPool(), Executors.newCachedThreadPool()), 
            new ExecutionHandler(Executors.newCachedThreadPool()));
          

          So I don't believe any changes are required in NettyServer to make RPCs execute concurrently, now that AVRO-976 has been committed. However, I think we may want to specify an ExecutionHandler by default in the NettyServer, or at least make the documentation very clear that if an ExecutionHandler is not specified then each connection will be able to execute only one RPC at a time. I'll clean up my unit test and submit it anyway.

          James Baldassari added a comment -

          One other thing to keep in mind is that the first RPC has to complete before any subsequent RPCs can be executed. That is, any subsequent RPCs will block until the first RPC has returned. The reason is that the Avro handshake must be completed before any other communication can proceed.

          James Baldassari added a comment -

          Here's a patch with a unit test showing that AVRO-976 is sufficient to allow concurrent execution of RPCs in NettyServer.

          James Baldassari added a comment -

          Shaun, when you get a chance please let us know whether passing in an ExecutionHandler works for you. If so, then maybe we could resolve this issue as a duplicate of AVRO-976, although we should still commit the unit test.

          Shaun Williams added a comment -

          Yes this does indeed resolve my issue. Thanks!

          James Baldassari added a comment -

          Resolving as duplicate of AVRO-976

          James Baldassari added a comment -

          I submitted the patch that adds a unit test for concurrent execution as AVRO-1019.


            People

            • Assignee: Unassigned
            • Reporter: Shaun Williams
            • Votes: 0
            • Watchers: 0