Uploaded image for project: 'Flume'
  1. Flume
  2. FLUME-3077

Unexpected exception from downstream with Avro sink+source compression (deflate)

    XMLWordPrintableJSON

Details

    • Question
    • Status: Open
    • Major
    • Resolution: Unresolved
    • 1.7.0
    • None
    • Sinks+Sources
    • None

    Description

      I'm trying to setup a basic 2-tier Flume using the Avro source/sink to communicate between tiers. I set the same compression-type on both side as the following config:

      on the sink side:
      agent.sinks.k1.type = avro
      agent.sinks.k1.channel = c1
      agent.sinks.k1.batch-size = 100
      agent.sinks.k1.connect-timeout = 60000
      agent.sinks.k1.request-timeout = 60000
      agent.sinks.k1.reset-connection-interval = 3
      agent.sinks.k1.hostname = <ip_source_host>
      agent.sinks.k1.port = 4145
      agent.sinks.k1.compression-type=deflate
      agent.sinks.k1.compression-level = 6
      agent.sinks.k1.maxIoWorkers = 8

      And the same compression on the source side:
      collector.sources=r1
      collector.sources.r1.bind=0.0.0.0
      collector.sources.r1.channels=c1
      collector.sources.r1.compression-level=6
      collector.sources.r1.compression-type=deflate
      collector.sources.r1.port=4145
      collector.sources.r1.type=avro

      1. But I was getting this error:
        id: 0x09405401, /10.199.6.225:32780 => /10.199.6.232:4145] BOUND: /10.199.6.232:4145
        24 Mar 2017 10:39:46,891 INFO New I/O worker #6 (org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream:171) - [id: 0x09405401, /10.199.6.225:32780 => /10.199.6.232:4145] CONNECTED: /10.199.6.225:32780
        24 Mar 2017 10:39:46,913 INFO New I/O worker #5 (org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream:171) - [id: 0xebef3dda, /10.199.6.225:60953 :> /10.199.6.232:4145] DISCONNECTED
        24 Mar 2017 10:39:46,913 INFO New I/O worker #5 (org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream:171) - [id: 0xebef3dda, /10.199.6.225:60953 :> /10.199.6.232:4145] UNBOUND
        24 Mar 2017 10:39:46,913 INFO New I/O worker #5 (org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream:171) - [id: 0xebef3dda, /10.199.6.225:60953 :> /10.199.6.232:4145] CLOSED
        24 Mar 2017 10:39:46,913 INFO New I/O worker #5 (org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.channelClosed:209) - Connection to /10.199.6.225:60953 disconnected.
        24 Mar 2017 10:39:46,913 WARN New I/O worker #5 (org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.exceptionCaught:201) - Unexpected exception from downstream.
        java.nio.channels.ClosedChannelException
        at org.jboss.netty.channel.socket.nio.AbstractNioWorker.cleanUpWriteBuffer(AbstractNioWorker.java:433)
        at org.jboss.netty.channel.socket.nio.AbstractNioWorker.writeFromUserCode(AbstractNioWorker.java:128)
        at org.jboss.netty.channel.socket.nio.NioServerSocketPipelineSink.handleAcceptedSocket(NioServerSocketPipelineSink.java:99)
        at org.jboss.netty.channel.socket.nio.NioServerSocketPipelineSink.eventSunk(NioServerSocketPipelineSink.java:36)
        at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendDownstream(DefaultChannelPipeline.java:779)
        at org.jboss.netty.channel.Channels.write(Channels.java:725)
        at org.jboss.netty.channel.Channels.write(Channels.java:686)
        at org.jboss.netty.handler.codec.compression.ZlibEncoder.finishEncode(ZlibEncoder.java:380)
        at org.jboss.netty.handler.codec.compression.ZlibEncoder.handleDownstream(ZlibEncoder.java:316)
        at org.jboss.netty.channel.DefaultChannelPipeline.sendDownstream(DefaultChannelPipeline.java:591)
        at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendDownstream(DefaultChannelPipeline.java:784)
        at org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:54)
        at org.jboss.netty.channel.DefaultChannelPipeline.sendDownstream(DefaultChannelPipeline.java:591)
        at org.jboss.netty.channel.DefaultChannelPipeline.sendDownstream(DefaultChannelPipeline.java:582)
        at org.jboss.netty.channel.Channels.close(Channels.java:812)
        at org.jboss.netty.channel.AbstractChannel.close(AbstractChannel.java:197)
        at org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.channelClosed(NettyServer.java:212)
        at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:88)
        at org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:173)
        at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
        at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
        at org.jboss.netty.handler.codec.frame.FrameDecoder.cleanup(FrameDecoder.java:493)
        at org.jboss.netty.handler.codec.frame.FrameDecoder.channelClosed(FrameDecoder.java:371)
        at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:88)
        at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
        at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
        at org.jboss.netty.handler.codec.oneone.OneToOneDecoder.handleUpstream(OneToOneDecoder.java:60)
        at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
        at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
        at org.jboss.netty.channel.Channels.fireChannelClosed(Channels.java:468)
        at org.jboss.netty.channel.socket.nio.AbstractNioWorker.close(AbstractNioWorker.java:375)
        at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:93)
        at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108)
        at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:318)
        at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
        at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
        at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
        at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

      This warning appear after the channel closed the connection and occur continuously. It only appear when config compression with "deflate"
      Despite there are many continuously warnings, the data can decompress ok. Does this warning effect to my flow ? I'm using Flume v1.7

      Attachments

        Activity

          People

            Unassigned Unassigned
            dulh DuLe
            Votes:
            0 Vote for this issue
            Watchers:
            2 Start watching this issue

            Dates

              Created:
              Updated: