Details
Type: Improvement
Status: Open
Priority: Major
Resolution: Unresolved
Affects Version/s: 2.1.0, 2.0.0, 2.0.1, 2.1.1, 2.0.2, 2.0.3, 2.1.2, 2.0.4, 2.1.3
Description
The NettyRpcServer constructor currently contains the following:
ServerBootstrap bootstrap = new ServerBootstrap().group(eventLoopGroup).channel(channelClass)
    .childOption(ChannelOption.TCP_NODELAY, tcpNoDelay)
    .childOption(ChannelOption.SO_KEEPALIVE, tcpKeepAlive)
    .childHandler(new ChannelInitializer<Channel>() {
      @Override
      protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        FixedLengthFrameDecoder preambleDecoder = new FixedLengthFrameDecoder(6);
        preambleDecoder.setSingleDecode(true);
        pipeline.addLast("preambleDecoder", preambleDecoder);
        pipeline.addLast("preambleHandler", createNettyRpcServerPreambleHandler());
        pipeline.addLast("frameDecoder", new NettyRpcFrameDecoder(maxRequestSize));
        pipeline.addLast("decoder", new NettyRpcServerRequestDecoder(allChannels, metrics));
        pipeline.addLast("encoder", new NettyRpcServerResponseEncoder(metrics));
      }
    });
try {
  serverChannel = bootstrap.bind(this.bindAddress).sync().channel();
  LOG.info("Bind to {}", serverChannel.localAddress());
} catch (InterruptedException e) {
  throw new InterruptedIOException(e.getMessage());
}
When building the ServerBootstrap, we could also configure ServerSocketChannel options and SocketChannel child options to improve RPC performance. The options and child options would be as follows:
.option(ChannelOption.SO_BACKLOG, transportConfig.getBacklog())
.option(ChannelOption.SO_REUSEADDR, transportConfig.isReuseAddr())
.option(ChannelOption.RCVBUF_ALLOCATOR, NettyHelper.getRecvByteBufAllocator())
.option(ChannelOption.ALLOCATOR, NettyHelper.getByteBufAllocator())
.childOption(ChannelOption.SO_KEEPALIVE, transportConfig.isKeepAlive())
.childOption(ChannelOption.TCP_NODELAY, transportConfig.isTcpNoDelay())
.childOption(ChannelOption.SO_RCVBUF, 8192 * 128)
.childOption(ChannelOption.SO_SNDBUF, 8192 * 128)
.childOption(ChannelOption.ALLOCATOR, NettyHelper.getByteBufAllocator())
.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(
transportConfig.getBufferMin(), transportConfig.getBufferMax()))
In addition, the ChannelPipeline includes NettyRpcFrameDecoder, a decoder that extends ByteToMessageDecoder. The ChannelPipeline is set up as follows:
.childHandler(new ChannelInitializer<Channel>() {
  @Override
  protected void initChannel(Channel ch) throws Exception {
    ChannelPipeline pipeline = ch.pipeline();
    FixedLengthFrameDecoder preambleDecoder = new FixedLengthFrameDecoder(6);
    preambleDecoder.setSingleDecode(true);
    pipeline.addLast("preambleDecoder", preambleDecoder);
    pipeline.addLast("preambleHandler", createNettyRpcServerPreambleHandler());
    pipeline.addLast("frameDecoder", new NettyRpcFrameDecoder(maxRequestSize));
    pipeline.addLast("decoder", new NettyRpcServerRequestDecoder(allChannels, metrics));
    pipeline.addLast("encoder", new NettyRpcServerResponseEncoder(metrics));
  }
});
Netty provides a convenient decoder base class, ByteToMessageDecoder. It accumulates incoming bytes and unpacks them in bulk: it reads as many bytes as possible from the socket, then synchronously calls the decode method to turn them into business objects, which are collected in a List. Finally, it iterates over the List and submits each element to the ChannelPipeline for processing.
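The accumulate-then-decode behaviour described above can be illustrated without Netty. The following is a minimal plain-Java sketch, not HBase or Netty code: the class FrameAccumulator, its methods, and the 4-byte length-prefixed frame layout are all assumptions made for illustration. It shows how one socket read can yield several complete messages while a trailing partial message is held back for the next read.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Toy model of cumulative decoding (hypothetical class, not part of Netty). */
class FrameAccumulator {
    // Bytes received so far that do not yet form a complete frame.
    private byte[] pending = new byte[0];

    /** Appends one socket read and returns every frame that is now complete. */
    List<byte[]> feed(byte[] chunk) {
        // Merge leftover bytes from earlier reads with the new chunk.
        byte[] merged = new byte[pending.length + chunk.length];
        System.arraycopy(pending, 0, merged, 0, pending.length);
        System.arraycopy(chunk, 0, merged, pending.length, chunk.length);

        ByteBuffer buf = ByteBuffer.wrap(merged);
        List<byte[]> frames = new ArrayList<>();
        // Assumed wire format: 4-byte big-endian length, then the payload.
        while (buf.remaining() >= 4) {
            buf.mark();
            int length = buf.getInt();
            if (length < 0) {
                throw new IllegalArgumentException("negative frame length: " + length);
            }
            if (buf.remaining() < length) {
                buf.reset(); // incomplete frame: rewind and wait for more bytes
                break;
            }
            byte[] frame = new byte[length];
            buf.get(frame);
            frames.add(frame);
        }
        // Keep whatever is left for the next call.
        pending = new byte[buf.remaining()];
        buf.get(pending);
        return frames;
    }

    /** Encodes a payload in the assumed length-prefixed format. */
    static byte[] encode(String payload) {
        byte[] body = payload.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + body.length).putInt(body.length).put(body).array();
    }

    public static void main(String[] args) {
        FrameAccumulator acc = new FrameAccumulator();
        byte[] a = encode("get");
        byte[] b = encode("put");
        byte[] both = new byte[a.length + b.length];
        System.arraycopy(a, 0, both, 0, a.length);
        System.arraycopy(b, 0, both, a.length, b.length);
        System.out.println("frames decoded from one read: " + acc.feed(both).size());
    }
}
```

The list returned by feed corresponds to the List of decoded objects that the decoder then walks element by element.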
Here we can make a small change: instead of submitting each decoded command to the pipeline individually, submit the entire List at once. This reduces the number of pipeline executions and increases throughput. The mode has no advantage in low-concurrency scenarios, but gives a significant throughput boost in high-concurrency scenarios.
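To make the difference between the two hand-off modes concrete, here is a small plain-Java sketch. It is not the proposed patch and does not use Netty: BatchDispatchSketch, CountingPipeline, dispatchEach and dispatchBatch are hypothetical names, and the "pipeline" is just a counter standing in for the handlers that follow the decoder. It only counts how often the downstream side is entered for the same set of decoded messages.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Compares per-message and per-batch hand-off (hypothetical model, not Netty). */
class BatchDispatchSketch {

    /** Stand-in for the downstream handlers; counts how often it is entered. */
    static final class CountingPipeline implements Consumer<Object> {
        int invocations;
        int messagesSeen;

        @Override
        public void accept(Object msg) {
            invocations++;
            if (msg instanceof List) {
                // Batched mode: one invocation carries many messages.
                messagesSeen += ((List<?>) msg).size();
            } else {
                messagesSeen++;
            }
        }
    }

    /** Current behaviour as described: one pipeline pass per decoded message. */
    static void dispatchEach(List<Object> decoded, Consumer<Object> pipeline) {
        for (Object msg : decoded) {
            pipeline.accept(msg);
        }
    }

    /** Proposed behaviour: hand over the whole decoded List in one pass. */
    static void dispatchBatch(List<Object> decoded, Consumer<Object> pipeline) {
        if (!decoded.isEmpty()) {
            // Copy so the receiver does not share the decoder's working list.
            pipeline.accept(new ArrayList<>(decoded));
        }
    }

    public static void main(String[] args) {
        List<Object> decoded = new ArrayList<>();
        for (int i = 0; i < 64; i++) {
            decoded.add("call-" + i);
        }
        CountingPipeline perMessage = new CountingPipeline();
        CountingPipeline perBatch = new CountingPipeline();
        dispatchEach(decoded, perMessage);
        dispatchBatch(decoded, perBatch);
        System.out.println("per-message invocations: " + perMessage.invocations);
        System.out.println("per-batch invocations: " + perBatch.invocations);
    }
}
```

Both modes deliver the same messages; only the number of passes differs. One consequence of the batched mode is that the handlers after the decoder must accept a List rather than a single request, so they would need to change along with the decoder. With a single decoded message per read the two modes do the same work, which matches the observation that the gain appears only under high concurrency.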
I will provide a patch and some performance comparisons for this.