Flume / FLUME-2871

Avro sink reset-connection-interval causes EventDeliveryException

Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 1.6.0
    • Fix Version/s: 1.8.1
    • Component/s: Sinks+Sources
    • Labels: None

    Description

      When the Avro sink is configured with reset-connection-interval, it throws the following exception:

      29 Jan 2016 14:01:45,257 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.SinkRunner$PollingRunner.run:160) - Unable to deliver event. Exception follows.
      org.apache.flume.EventDeliveryException: Failed to send events
      at org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:392)
      at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
      at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
      at java.lang.Thread.run(Thread.java:724)
      Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient { host: localhost, port: 58989 }: Failed to send batch
      at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:315)
      at org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:376)
      ... 3 more
      Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient { host: localhost, port: 58989 }: Handshake timed out after 30000ms
      at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:359)
      at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:303)
      ... 4 more
      Caused by: java.util.concurrent.TimeoutException
      at java.util.concurrent.FutureTask.get(FutureTask.java:201)
      at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:357)
      ... 5 more

      After I replaced netty-3.5.12.Final.jar with netty-3.10.5.Final.jar, the Avro sink worked correctly.
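
      The innermost cause in the trace above is a plain java.util.concurrent.TimeoutException raised by a timed wait on a future, which is then wrapped with the "Handshake timed out after 30000ms" message. The following is a minimal sketch of that wrapping pattern using only the JDK. It is not Flume's code: the class HandshakeTimeoutSketch, the method awaitWithTimeout, and the DeliveryException type (a stand-in for EventDeliveryException) are hypothetical names chosen for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class HandshakeTimeoutSketch {

    /** Hypothetical stand-in for org.apache.flume.EventDeliveryException. */
    public static class DeliveryException extends Exception {
        public DeliveryException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /**
     * Runs the task on a worker thread and waits at most timeoutMs for it.
     * A timeout is wrapped so the TimeoutException stays visible as the cause.
     */
    public static <T> T awaitWithTimeout(Callable<T> task, long timeoutMs)
            throws DeliveryException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<T> future = executor.submit(task);
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the stalled task
            throw new DeliveryException(
                "Handshake timed out after " + timeoutMs + "ms", e);
        } catch (ExecutionException e) {
            throw new DeliveryException("Handshake failed", e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve interrupt status
            throw new DeliveryException("Interrupted during handshake", e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        try {
            // Simulated peer that does not answer within the timeout.
            awaitWithTimeout(() -> { Thread.sleep(5_000); return "ok"; }, 100);
        } catch (DeliveryException e) {
            System.out.println(e.getMessage()
                + " / cause: " + e.getCause().getClass().getName());
        }
    }
}
```

      In the sketch the stalled task is simulated with a sleep. In the report, the wait that timed out is the one at NettyAvroRpcClient.java:357, and the sketch says nothing about why the peer did not respond.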

      #client.conf
      agent1.channels = c1
      agent1.sources = s1
      agent1.sinks = k1

      agent1.channels = c1
      agent1.channels.c1.type = memory
      agent1.channels.c1.capacity = 10000
      agent1.channels.c1.transactionCapacity = 100

      agent1.sources.s1.type = exec
      agent1.sources.s1.command = tail -F /var/log/secure
      agent1.sources.s1.restart = true
      agent1.sources.s1.channels = c1

      agent1.sinks.k1.channel = c1
      agent1.sinks.k1.type = avro
      agent1.sinks.k1.hostname = 127.0.0.1
      agent1.sinks.k1.port = 58989
      agent1.sinks.k1.batch-size = 100
      agent1.sinks.k1.reset-connection-interval = 120
      agent1.sinks.k1.compression-type = deflate
      agent1.sinks.k1.compression-level = 6
      agent1.sinks.k1.connect-timeout = 30000
      agent1.sinks.k1.request-timeout = 30000

      #center.conf
      agent1.channels = c1
      agent1.sources = s1
      agent1.sinks = k1

      agent1.channels = c1
      agent1.channels.c1.type = memory
      agent1.channels.c1.capacity = 10000
      agent1.channels.c1.transactionCapacity = 100

      agent1.sources.s1.type = avro
      agent1.sources.s1.bind = 0.0.0.0
      agent1.sources.s1.port = 58989
      # single worker thread, so the failure shows up quickly
      agent1.sources.s1.threads = 1
      agent1.sources.s1.channels = c1
      agent1.sources.s1.compression-type = deflate

      agent1.sinks.k1.type = file_roll
      agent1.sinks.k1.sink.directory = /tmp/center1
      agent1.sinks.k1.channel = c1
      agent1.sinks.k1.sink.rollInterval = 86400
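
      One detail to watch when copying these files: Flume agent configurations are Java properties files, and in that format a # starts a comment only at the beginning of a line. A note placed after a value on the same line becomes part of the value. The sketch below shows this with java.util.Properties directly. The class name InlineCommentSketch and the method rawValue are hypothetical, and the sketch does not run Flume itself.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

public class InlineCommentSketch {

    /** Parses properties-format text and returns the raw value stored for key. */
    public static String rawValue(String text, String key) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(text));
        return props.getProperty(key);
    }

    public static void main(String[] args) throws IOException {
        String key = "agent1.sources.s1.threads";

        // Note written after the value on the same line.
        String inline = key + " = 1 #fast failed\n";
        // Same note moved to its own comment line.
        String separate = "# fast failed\n" + key + " = 1\n";

        System.out.println("[" + rawValue(inline, key) + "]");   // prints [1 #fast failed]
        System.out.println("[" + rawValue(separate, key) + "]"); // prints [1]
    }
}
```

      Keeping notes on their own lines avoids a numeric setting being read as a non-numeric string.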

          People

            Assignee: Unassigned
            Reporter: chenchunbin