Details
Type: Bug
Status: Open
Priority: Major
Resolution: Unresolved
Affects Version/s: 1.7.4, 1.8.0
Fix Version/s: None
Component/s: None
Environment
Flume Agent: 1.6.0
OS: Mac OS X 10.7.5 and CentOS release 6.6 (kernel 2.6.32-504.1.3.el6.x86_64)
avro: 1.7.4
avro-ipc: 1.7.4
JDK: 1.6.0_65 and 1.7.0_45

Flume Client: NettyAvroRpcClient
flume-ng-sdk: 1.6.0
OS: Mac OS X 10.7.5
avro: 1.7.4
avro-ipc: 1.7.4
JDK: 1.6.0_65

The agent config:
# Define a memory channel called ch1 on agent1
agent1.channels.ch1.type = memory
agent1.channels.ch1.capacity = 10000

# Define an Avro source called avro-source1 on agent1 and tell it
# to bind to 0.0.0.0:41414. Connect it to channel ch1.
agent1.sources.avro-source1.channels = ch1
agent1.sources.avro-source1.type = avro
agent1.sources.avro-source1.bind = 0.0.0.0
agent1.sources.avro-source1.port = 41414

# Define a logger sink that simply logs all events it receives
# and connect it to the other end of the same channel.
agent1.sinks.log-sink1.channel = ch1
#agent1.sinks.log-sink1.type = logger
agent1.sinks.log-sink1.type = null
agent1.sinks.log-sink1.batchSize = 10

# Finally, now that we've defined all of our components, tell
# agent1 which ones we want to activate.
agent1.channels = ch1
agent1.sources = avro-source1
agent1.sinks = log-sink1
The client config:
batch-size = 1
connect-timeout = 10s
request-timeout = 10s
Description
A handshake request is not read because connection.isConnected() == true here:
This should be an invalid state, but we see it happening in production, where it causes the agent to fail with OutOfMemoryErrors. It seems to happen when clients rapidly disconnect and reconnect, or when there are "connection reset by peer" and "broken pipe" errors.
Here is the Base64-encoded form of buffers.get(0).array() when this occurs:
hqra4sRUdMD+k//Q8jUKZQCGqtrixFR0wP6T/9DyNQplAgAADGFwcGVuZA==
This appears to be the serialized handshake request, followed by the start of the append call request.
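The claim that this buffer holds a serialized handshake can be checked by walking the bytes by hand. The sketch below is a hypothetical helper (HandshakeLayout, parse and Layout are not part of Avro) and assumes the standard Avro IPC HandshakeRequest schema: a 16-byte MD5 clientHash, a clientProtocol union of null and string, a 16-byte MD5 serverHash, and a meta union of null and map<bytes>. It then reads the call request's metadata map and message name. It uses java.util.Base64, so it needs Java 8 or later rather than the JDK 6/7 listed above.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper, not part of Avro: walks the captured buffer by hand,
// assuming the Avro IPC HandshakeRequest schema followed by a call request.
public class HandshakeLayout {
    static final String CAPTURED =
        "hqra4sRUdMD+k//Q8jUKZQCGqtrixFR0wP6T/9DyNQplAgAADGFwcGVuZA==";

    // Values recovered from the buffer.
    static final class Layout {
        long clientProtocolBranch; // union index: 0 = null, 1 = string
        long metaBranch;           // union index: 0 = null, 1 = map<bytes>
        int handshakeEnd;          // offset of the first byte after the HandshakeRequest
        int callMetaEntries;       // entries in the call request's metadata map
        String messageName;        // message name that follows the metadata
    }

    private final byte[] buf;
    private int pos = 0;

    private HandshakeLayout(byte[] buf) {
        this.buf = buf;
    }

    // Avro long: base-128 varint (low 7-bit group first), then zig-zag decoded.
    private long readLong() {
        long raw = 0;
        int shift = 0;
        int b;
        do {
            b = buf[pos++] & 0xFF;
            raw |= (long) (b & 0x7F) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        return (raw >>> 1) ^ -(raw & 1);
    }

    // Avro string: length as a long, then UTF-8 bytes.
    private String readString() {
        int len = (int) readLong();
        String s = new String(buf, pos, len, StandardCharsets.UTF_8);
        pos += len;
        return s;
    }

    // Avro map<bytes>: blocks of (string key, bytes value), ended by a 0 count.
    private int skipBytesMap() {
        int entries = 0;
        long count;
        while ((count = readLong()) != 0) {
            if (count < 0) {          // negative count: a block size in bytes follows
                count = -count;
                readLong();
            }
            for (long i = 0; i < count; i++) {
                readString();                      // key
                int valueLen = (int) readLong();   // value length
                pos += valueLen;                   // skip value bytes
                entries++;
            }
        }
        return entries;
    }

    static Layout parse(String base64) {
        HandshakeLayout r = new HandshakeLayout(Base64.getDecoder().decode(base64));
        Layout out = new Layout();
        r.pos += 16;                                   // clientHash: fixed MD5, 16 bytes
        out.clientProtocolBranch = r.readLong();
        if (out.clientProtocolBranch == 1) r.readString();
        r.pos += 16;                                   // serverHash: fixed MD5, 16 bytes
        out.metaBranch = r.readLong();
        if (out.metaBranch == 1) r.skipBytesMap();
        out.handshakeEnd = r.pos;
        out.callMetaEntries = r.skipBytesMap();        // call request metadata
        out.messageName = r.readString();              // call request message name
        return out;
    }

    public static void main(String[] args) {
        Layout l = parse(CAPTURED);
        System.out.println("clientProtocol branch: " + l.clientProtocolBranch);
        System.out.println("meta branch: " + l.metaBranch);
        System.out.println("handshake ends at offset: " + l.handshakeEnd);
        System.out.println("call metadata entries: " + l.callMetaEntries);
        System.out.println("message name: " + l.messageName);
    }
}
```

Under those schema assumptions, the sketch finds a null clientProtocol, an empty meta map, a handshake request ending at byte offset 35, an empty call metadata map, and the message name append.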
The handshake is not read because this code is skipped:
When not read by the handshakeReader, the BinaryDecoder's position in the internal byte array is not advanced (to 19).
This causes the following code:
to read the varint starting at index 0 of the handshake bytes as the size of the request metadata map it expects to deserialize.
At this point:
The size of the hash map is determined to be 1452339317379, or 640371331 when cast to an int here:
and allocating the HashMap fails with an OutOfMemoryError:
2015-06-29 15:30:24,590 (New I/O worker #4) [WARN - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.exceptionCaught(NettyServer.java:201)] Unexpected exception from downstream.
java.lang.OutOfMemoryError: Java heap space
	at java.util.HashMap.<init>(HashMap.java:187)
	at java.util.HashMap.<init>(HashMap.java:199)
	at org.apache.avro.generic.GenericDatumReader.newMap(GenericDatumReader.java:330)
	at org.apache.avro.generic.GenericDatumReader.readMap(GenericDatumReader.java:239)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:139)
	at org.apache.avro.ipc.Responder.respond(Responder.java:124)
	at org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.messageReceived(NettyServer.java:188)
	at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
	at org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:173)
	at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
	at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
	at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
	at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462)
	at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443)
	at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303)
	at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
	at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
	at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
	at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
	at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
	at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
	at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108)
	at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:337)
	at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
	at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
	at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
	at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
	at java.lang.Thread.run(Thread.java:695)
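The two map sizes quoted above can be reproduced outside Avro. The hypothetical MisreadMapSize class below (not part of Avro) decodes one zig-zag varint, the encoding the Avro specification uses for longs and map block counts, starting at offset 0 of the captured buffer. This is what a reader sees when the handshake bytes are not consumed first: the first six bytes of clientHash are taken as the map's block count. It then applies the narrowing cast to int; the trace shows the resulting value reaching HashMap.<init> through GenericDatumReader.newMap. It assumes Java 8 or later for java.util.Base64.

```java
import java.util.Base64;

// Hypothetical helper, not part of Avro: reads the captured buffer as if the
// handshake had never been consumed, i.e. starting at offset 0.
public class MisreadMapSize {
    static final String CAPTURED =
        "hqra4sRUdMD+k//Q8jUKZQCGqtrixFR0wP6T/9DyNQplAgAADGFwcGVuZA==";

    // Avro long (also used for map block counts): base-128 varint, then zig-zag decoded.
    static long readLongAt(byte[] buf, int offset) {
        long raw = 0;
        int shift = 0;
        int pos = offset;
        int b;
        do {
            b = buf[pos++] & 0xFF;
            raw |= (long) (b & 0x7F) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        return (raw >>> 1) ^ -(raw & 1);
    }

    public static void main(String[] args) {
        byte[] buf = Base64.getDecoder().decode(CAPTURED);
        // The first six bytes of clientHash (0x86 0xAA 0xDA 0xE2 0xC4 0x54) form one varint.
        long mapSize = readLongAt(buf, 0);
        // Narrowing to int keeps only the low 32 bits.
        int capacity = (int) mapSize;
        System.out.println(mapSize);   // 1452339317379
        System.out.println(capacity);  // 640371331
    }
}
```

Both printed values match the ones in the report, which supports the reading that the metadata map is decoded from offset 0 of the handshake bytes.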
Issue Links
blocks: FLUME-2731 Flume Agent throws OutOfMemoryError during load tests. (Open)