Details
-
Question
-
Status: Open
-
Major
-
Resolution: Unresolved
-
1.7.0
-
None
-
None
Description
I'm trying to setup a basic 2-tier Flume using the Avro source/sink to communicate between tiers. I set the same compression-type on both side as the following config:
on the sink side:
agent.sinks.k1.type = avro
agent.sinks.k1.channel = c1
agent.sinks.k1.batch-size = 100
agent.sinks.k1.connect-timeout = 60000
agent.sinks.k1.request-timeout = 60000
agent.sinks.k1.reset-connection-interval = 3
agent.sinks.k1.hostname = <ip_source_host>
agent.sinks.k1.port = 4145
agent.sinks.k1.compression-type=deflate
agent.sinks.k1.compression-level = 6
agent.sinks.k1.maxIoWorkers = 8
And the same compression on the source side:
collector.sources=r1
collector.sources.r1.bind=0.0.0.0
collector.sources.r1.channels=c1
collector.sources.r1.compression-level=6
collector.sources.r1.compression-type=deflate
collector.sources.r1.port=4145
collector.sources.r1.type=avro
- But I was getting this error:
id: 0x09405401, /10.199.6.225:32780 => /10.199.6.232:4145] BOUND: /10.199.6.232:4145
24 Mar 2017 10:39:46,891 INFO New I/O worker #6 (org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream:171) - [id: 0x09405401, /10.199.6.225:32780 => /10.199.6.232:4145] CONNECTED: /10.199.6.225:32780
24 Mar 2017 10:39:46,913 INFO New I/O worker #5 (org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream:171) - [id: 0xebef3dda, /10.199.6.225:60953 :> /10.199.6.232:4145] DISCONNECTED
24 Mar 2017 10:39:46,913 INFO New I/O worker #5 (org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream:171) - [id: 0xebef3dda, /10.199.6.225:60953 :> /10.199.6.232:4145] UNBOUND
24 Mar 2017 10:39:46,913 INFO New I/O worker #5 (org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream:171) - [id: 0xebef3dda, /10.199.6.225:60953 :> /10.199.6.232:4145] CLOSED
24 Mar 2017 10:39:46,913 INFO New I/O worker #5 (org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.channelClosed:209) - Connection to /10.199.6.225:60953 disconnected.
24 Mar 2017 10:39:46,913 WARN New I/O worker #5 (org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.exceptionCaught:201) - Unexpected exception from downstream.
java.nio.channels.ClosedChannelException
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.cleanUpWriteBuffer(AbstractNioWorker.java:433)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.writeFromUserCode(AbstractNioWorker.java:128)
at org.jboss.netty.channel.socket.nio.NioServerSocketPipelineSink.handleAcceptedSocket(NioServerSocketPipelineSink.java:99)
at org.jboss.netty.channel.socket.nio.NioServerSocketPipelineSink.eventSunk(NioServerSocketPipelineSink.java:36)
at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendDownstream(DefaultChannelPipeline.java:779)
at org.jboss.netty.channel.Channels.write(Channels.java:725)
at org.jboss.netty.channel.Channels.write(Channels.java:686)
at org.jboss.netty.handler.codec.compression.ZlibEncoder.finishEncode(ZlibEncoder.java:380)
at org.jboss.netty.handler.codec.compression.ZlibEncoder.handleDownstream(ZlibEncoder.java:316)
at org.jboss.netty.channel.DefaultChannelPipeline.sendDownstream(DefaultChannelPipeline.java:591)
at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendDownstream(DefaultChannelPipeline.java:784)
at org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:54)
at org.jboss.netty.channel.DefaultChannelPipeline.sendDownstream(DefaultChannelPipeline.java:591)
at org.jboss.netty.channel.DefaultChannelPipeline.sendDownstream(DefaultChannelPipeline.java:582)
at org.jboss.netty.channel.Channels.close(Channels.java:812)
at org.jboss.netty.channel.AbstractChannel.close(AbstractChannel.java:197)
at org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.channelClosed(NettyServer.java:212)
at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:88)
at org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:173)
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
at org.jboss.netty.handler.codec.frame.FrameDecoder.cleanup(FrameDecoder.java:493)
at org.jboss.netty.handler.codec.frame.FrameDecoder.channelClosed(FrameDecoder.java:371)
at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:88)
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
at org.jboss.netty.handler.codec.oneone.OneToOneDecoder.handleUpstream(OneToOneDecoder.java:60)
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
at org.jboss.netty.channel.Channels.fireChannelClosed(Channels.java:468)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.close(AbstractNioWorker.java:375)
at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:93)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108)
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:318)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
This warning appear after the channel closed the connection and occur continuously. It only appear when config compression with "deflate"
Despite there are many continuously warnings, the data can decompress ok. Does this warning effect to my flow ? I'm using Flume v1.7