  Flume / FLUME-3040

Flume client cannot append event batches; deflate compression is failing


    Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 1.6.0
    • Fix Version/s: None
    • Component/s: File Channel
    • Labels:
      None
    • Environment:

      Description

      I'm getting the following "Failed to send batch" error from NettyAvroRpcClient#appendBatch. If I'm not mistaken,
      it happens because the client fails to compress the events.

      https://github.com/apache/flume/blob/d9c9a7dd9a6889ecf6b9dc88fb8e02ccc1cd5167/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java#L314

      The compression failure starts in Deflate.deflate at https://github.com/netty/netty/blob/3.10/src/main/java/org/jboss/netty/util/internal/jzlib/Deflate.java#L1601 and ends in Tree.d_code at
      https://github.com/netty/netty/blob/3.10/src/main/java/org/jboss/netty/util/internal/jzlib/Tree.java#L127:
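One way to narrow this down is to check whether the same batch bytes compress cleanly with a different zlib implementation. The sketch below uses the JDK's java.util.zip.Deflater and Inflater, not the org.jboss.netty.util.internal.jzlib port that appears in the stack trace, so a successful round trip only suggests the payload itself is not the problem; it does not prove jzlib is at fault. The class name DeflateRoundTrip and the sample payload are hypothetical, and level 6 (zlib's usual default) is an assumption, not a value confirmed from the sink.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

public class DeflateRoundTrip {

    // Compresses with the JDK's zlib binding (NOT Netty 3's pure-Java jzlib port).
    static byte[] deflate(byte[] input, int level) {
        Deflater deflater = new Deflater(level);
        try {
            deflater.setInput(input);
            deflater.finish();
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buf = new byte[4096];
            while (!deflater.finished()) {
                int n = deflater.deflate(buf);
                out.write(buf, 0, n);
            }
            return out.toByteArray();
        } finally {
            deflater.end(); // frees native zlib memory
        }
    }

    static byte[] inflate(byte[] compressed) throws DataFormatException {
        Inflater inflater = new Inflater();
        try {
            inflater.setInput(compressed);
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buf = new byte[4096];
            while (!inflater.finished()) {
                int n = inflater.inflate(buf);
                out.write(buf, 0, n);
                // No progress and not finished: truncated stream or missing dictionary.
                if (n == 0 && !inflater.finished()
                        && (inflater.needsInput() || inflater.needsDictionary())) {
                    throw new DataFormatException("incomplete deflate stream");
                }
            }
            return out.toByteArray();
        } finally {
            inflater.end();
        }
    }

    // True if the payload survives deflate followed by inflate unchanged.
    static boolean roundTrips(byte[] payload, int level) throws DataFormatException {
        return Arrays.equals(payload, inflate(deflate(payload, level)));
    }

    public static void main(String[] args) throws DataFormatException {
        // Hypothetical stand-in for a batch of events; substitute real event bodies.
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 100; i++) {
            sb.append("applicationId=MicroService1 event ").append(i).append('\n');
        }
        byte[] batch = sb.toString().getBytes(StandardCharsets.UTF_8);
        System.out.println("round trip ok: " + roundTrips(batch, 6));
    }
}
```

If real event bodies round-trip here at the same level, that points the investigation toward the jzlib encoder state rather than the data.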

      org.apache.flume.EventDeliveryException: Failed to send events
      	at org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:389)
      	at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
      	at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
      	at java.lang.Thread.run(Thread.java:785)
      Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient { logagg.prod.aws.cloud.divinen.net, port: 443 }: Failed to send batch
      	at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:320)
      	at org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:373)
      	... 3 more
      Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient { host: logagg.prod.aws.cloud.divinen.net, port: 443 }: Exception thrown from remote handler
      	at org.apache.flume.api.NettyAvroRpcClient.waitForStatusOK(NettyAvroRpcClient.java:402)
      	at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:379)
      	at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:308)
      	... 4 more
      Caused by: java.util.concurrent.ExecutionException: java.lang.ArrayIndexOutOfBoundsException
      	at org.apache.avro.ipc.CallFuture.get(CallFuture.java:128)
      	at org.apache.flume.api.NettyAvroRpcClient.waitForStatusOK(NettyAvroRpcClient.java:394)
      	... 6 more
      Caused by: java.lang.ArrayIndexOutOfBoundsException
      	at org.jboss.netty.util.internal.jzlib.Tree.d_code(Tree.java:127)
      	at org.jboss.netty.util.internal.jzlib.Deflate.compress_block(Deflate.java:637)
      	at org.jboss.netty.util.internal.jzlib.Deflate._tr_flush_block(Deflate.java:860)
      	at org.jboss.netty.util.internal.jzlib.Deflate.flush_block_only(Deflate.java:724)
      	at org.jboss.netty.util.internal.jzlib.Deflate.deflate_slow(Deflate.java:1168)
      	at org.jboss.netty.util.internal.jzlib.Deflate.deflate(Deflate.java:1601)
      	at org.jboss.netty.util.internal.jzlib.ZStream.deflate(ZStream.java:140)
      	at org.jboss.netty.handler.codec.compression.ZlibEncoder.encode(ZlibEncoder.java:282)
      	at org.jboss.netty.handler.codec.oneone.OneToOneEncoder.doEncode(OneToOneEncoder.java:66)
      	at org.jboss.netty.handler.codec.oneone.OneToOneStrictEncoder.doEncode(OneToOneStrictEncoder.java:35)
      	at org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:59)
      	at org.jboss.netty.handler.codec.compression.ZlibEncoder.handleDownstream(ZlibEncoder.java:322)
      	at org.jboss.netty.channel.Channels.write(Channels.java:725)
      	at org.jboss.netty.handler.codec.oneone.OneToOneEncoder.doEncode(OneToOneEncoder.java:71)
      	at org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:59)
      	at org.jboss.netty.channel.Channels.write(Channels.java:704)
      	at org.jboss.netty.channel.Channels.write(Channels.java:671)
      	at org.jboss.netty.channel.AbstractChannel.write(AbstractChannel.java:248)
      	at org.apache.avro.ipc.NettyTransceiver.writeDataPack(NettyTransceiver.java:515)
      	at org.apache.avro.ipc.NettyTransceiver.transceive(NettyTransceiver.java:476)
      	at org.apache.avro.ipc.Requestor.request(Requestor.java:147)
      	at org.apache.avro.ipc.Requestor.request(Requestor.java:129)
      	at org.apache.avro.ipc.specific.SpecificRequestor.invoke(SpecificRequestor.java:84)
      	at com.sun.proxy.$Proxy29.appendBatch(Unknown Source)
      	at org.apache.flume.api.NettyAvroRpcClient$2.call(NettyAvroRpcClient.java:353)
      	at org.apache.flume.api.NettyAvroRpcClient$2.call(NettyAvroRpcClient.java:1)
      	at java.util.concurrent.FutureTask.run(FutureTask.java:277)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1153)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
      	... 1 more
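The trace nests four exceptions; the actual failure is the innermost ArrayIndexOutOfBoundsException thrown in jzlib's Tree.d_code, and the outer EventDeliveryExceptions only wrap it. A small helper that walks getCause() to the innermost exception can make such failures easier to log or alert on. This is a generic sketch: the name RootCause is hypothetical, and RuntimeException stands in for Flume's EventDeliveryException so the example needs only the JDK.

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;
import java.util.concurrent.ExecutionException;

public class RootCause {

    // Walks getCause() links to the innermost Throwable. The identity set stops
    // the walk if a malformed cause chain loops back on itself.
    static Throwable rootCause(Throwable t) {
        Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<Throwable, Boolean>());
        Throwable current = t;
        while (current.getCause() != null && seen.add(current)) {
            current = current.getCause();
        }
        return current;
    }

    public static void main(String[] args) {
        // Mirrors the shape of the trace above; RuntimeException replaces EventDeliveryException.
        Throwable trace = new RuntimeException("Failed to send events",
            new RuntimeException("Failed to send batch",
                new RuntimeException("Exception thrown from remote handler",
                    new ExecutionException(new ArrayIndexOutOfBoundsException()))));
        // prints java.lang.ArrayIndexOutOfBoundsException
        System.out.println(rootCause(trace).getClass().getName());
    }
}
```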
      

      My Flume config is:

      client.sources=source_1 source_2 src-flume-log src-flume-metrics 
      client.channels = file-channel-1 file-metrics-1 file-flumelog-1
      client.sinks = avro-sink-1 avro-metrics-sink-1 avro-flumelog-sink-1
      
      #source definitions
      client.sources.source_1.type=org.apache.flume.source.taildir.TaildirSource
      client.sources.source_1.filegroups = f1
      client.sources.source_1.fileHeader = true
      client.sources.source_1.filegroups.f1 = /u01/app/Logs/microservice1/logging12.log
      client.sources.source_1.idleTimeout=3000
      client.sources.source_1.batchSize=100
      client.sources.source_1.channels=file-channel-1
      client.sources.source_1.multiline=false
      client.sources.source_1.bufferSize=1048576
      client.sources.source_1.positionFile=/u01/app/Flume/offsets/offset_microservice1.json
      client.sources.source_1.interceptors = filterLogsWithAppId cleanupEnricher logEnricher removeEmptyElems addFields
      client.sources.source_1.interceptors.filterLogsWithAppId.type = regex_filter
      client.sources.source_1.interceptors.filterLogsWithAppId.regex = .*applicationId=MicroService1.*
      client.sources.source_1.interceptors.filterLogsWithAppId.excludeEvents = false
      client.sources.source_1.interceptors.cleanupEnricher.type=com.divinen.enricher.api.CleanupLogEventEnrichBuilder
      client.sources.source_1.interceptors.logEnricher.type=com.divinen.enricher.api.StandardSchemaLogEnrichBuilder
      client.sources.source_1.interceptors.removeEmptyElems.type=com.divinen.enricher.api.RemoveEmptyElementsLogEnrichBuilder
      client.sources.source_1.interceptors.addFields.type=com.divinen.ClientStaticInterceptor$Builder
      client.sources.source_1.interceptors.addFields.keyList=appCategory|appName|logEnv|logType
      client.sources.source_1.interceptors.addFields.valueList=x|y|prod|microservice1
      
      client.sources.source_2.type=org.apache.flume.source.taildir.TaildirSource
      client.sources.source_2.filegroups = f1
      client.sources.source_2.fileHeader = true
      client.sources.source_2.filegroups.f1 = /u01/app/Logs/microservice2/logging12.log
      client.sources.source_2.idleTimeout=3000
      client.sources.source_2.batchSize=100
      client.sources.source_2.channels=file-channel-1
      client.sources.source_2.multiline=false
      client.sources.source_2.bufferSize=1048576
      client.sources.source_2.positionFile=/u01/app/Flume/offsets/offset_microservice2.json
      client.sources.source_2.interceptors = filterLogsWithAppId cleanupEnricher logEnricher removeEmptyElems addFields
      client.sources.source_2.interceptors.filterLogsWithAppId.type = regex_filter
      client.sources.source_2.interceptors.filterLogsWithAppId.regex = .*applicationId=MicroService2.*
      client.sources.source_2.interceptors.filterLogsWithAppId.excludeEvents = false
      client.sources.source_2.interceptors.cleanupEnricher.type=com.divinen.enricher.api.CleanupLogEventEnrichBuilder
      client.sources.source_2.interceptors.logEnricher.type=com.divinen.enricher.api.StandardSchemaLogEnrichBuilder
      client.sources.source_2.interceptors.removeEmptyElems.type=com.divinen.enricher.api.RemoveEmptyElementsLogEnrichBuilder
      client.sources.source_2.interceptors.addFields.type=com.divinen.ClientStaticInterceptor$Builder
      client.sources.source_2.interceptors.addFields.keyList=appCategory|appName|logEnv|logType
      client.sources.source_2.interceptors.addFields.valueList=x|y|prod|microservice2
      
      client.sources.src-flume-log.type=org.apache.flume.source.taildir.TaildirSource
      client.sources.src-flume-log.filegroups = f1
      client.sources.src-flume-log.fileHeader = true
      client.sources.src-flume-log.filegroups.f1 =/u01/app/Flume/logs/flume.log
      client.sources.src-flume-log.idleTimeout=3000
      client.sources.src-flume-log.batchSize=100
      client.sources.src-flume-log.positionFile=/u01/app/Flume/offsets/offset_flume.json
      client.sources.src-flume-log.channels=file-flumelog-1
      client.sources.src-flume-log.interceptors = i1 
      client.sources.src-flume-log.interceptors.i1.type=com.divinen.ClientStaticInterceptor$Builder
      client.sources.src-flume-log.interceptors.i1.keyList=appCategory|appName|logEnv|logType
      client.sources.src-flume-log.interceptors.i1.valueList=x|y|prod|flume
      
      client.sources.src-flume-metrics.type = exec
      client.sources.src-flume-metrics.command = while true ; do echo  ; sleep 5; done
      client.sources.src-flume-metrics.shell = /bin/sh -c
      client.sources.src-flume-metrics.channels = file-metrics-1
      client.sources.src-flume-metrics.positionFile=/u01/app/Flume/offsets/offset_metrics.json
      client.sources.src-flume-metrics.interceptors = i1 i2
      client.sources.src-flume-metrics.interceptors.i1.type=com.divinen.ClientStaticInterceptor$Builder
      client.sources.src-flume-metrics.interceptors.i1.keyList = appCategory|appName|logEnv|logType
      client.sources.src-flume-metrics.interceptors.i1.valueList=x|y|prod|metrics
      client.sources.src-flume-metrics.interceptors.i2.type=com.divinen.FormatFlumeMetricsInterceptor$Builder
      
      #channel definitions
      
      client.channels.file-channel-1.type=file
      client.channels.file-channel-1.checkpointDir=/u01/app/Flume/offsets/checkPointDir/filechannel1
      client.channels.file-channel-1.dataDirs=/u01/app/Flume/offsets/dataDir/filechannel1
      client.channels.file-channel-1.checkpointOnClose=true
      
      client.channels.file-metrics-1.type=file
      client.channels.file-metrics-1.checkpointDir=/u01/app/Flume/offsets/checkPointDir/filemetrics1
      client.channels.file-metrics-1.dataDirs=/u01/app/Flume/offsets/dataDir/filemetrics1
      client.channels.file-metrics-1.checkpointOnClose=true
      
      client.channels.file-flumelog-1.type=file
      client.channels.file-flumelog-1.checkpointDir=/u01/app/Flume/offsets/checkPointDir/fileflumelog1
      client.channels.file-flumelog-1.dataDirs=/u01/app/Flume/offsets/dataDir/fileflumelog1
      client.channels.file-flumelog-1.checkpointOnClose=true
      
      #sink definitions
      
      client.sinks.avro-sink-1.type=avro
      client.sinks.avro-sink-1.hostname=logagg-prod.prod.divinen.net
      client.sinks.avro-sink-1.port=443
      client.sinks.avro-sink-1.compression-type=deflate
      client.sinks.avro-sink-1.channel=file-channel-1
      client.sinks.avro-sink-1.ssl=false
      client.sinks.avro-sink-1.trust-all-certs=false
      
      client.sinks.avro-metrics-sink-1.type=avro
      client.sinks.avro-metrics-sink-1.hostname=logagg-prod.prod.divinen.net
      client.sinks.avro-metrics-sink-1.port=443
      client.sinks.avro-metrics-sink-1.compression-type=deflate
      client.sinks.avro-metrics-sink-1.channel=file-metrics-1
      client.sinks.avro-metrics-sink-1.ssl=false
      client.sinks.avro-metrics-sink-1.trust-all-certs=false
      
      client.sinks.avro-flumelog-sink-1.type=avro
      client.sinks.avro-flumelog-sink-1.hostname=logagg-prod.prod.divinen.net
      client.sinks.avro-flumelog-sink-1.port=443
      client.sinks.avro-flumelog-sink-1.compression-type=deflate
      client.sinks.avro-flumelog-sink-1.channel=file-flumelog-1
      client.sinks.avro-flumelog-sink-1.ssl=false
      client.sinks.avro-flumelog-sink-1.trust-all-certs=false
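All three Avro sinks send to the same host and port with compression-type=deflate, so presumably each of them compresses through the same Netty deflate encoder. Per the Flume user guide, an Avro sink's compression-type must match that of the Avro source it talks to, so switching a sink to none while testing also means changing the receiving source. The sketch below reads agent properties with java.util.Properties and summarizes each sink's endpoint, compression and channel; the class name SinkAudit is hypothetical, and the embedded config is an excerpt of the one above.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

public class SinkAudit {

    // Maps each sink listed in "<agent>.sinks" to "host:port compression=... channel=...".
    // A missing compression-type is reported as "none", the Avro sink's default.
    static Map<String, String> describeSinks(Properties props, String agent) {
        Map<String, String> result = new LinkedHashMap<>();
        String names = props.getProperty(agent + ".sinks", "").trim();
        if (names.isEmpty()) {
            return result;
        }
        for (String sink : names.split("\\s+")) {
            String prefix = agent + ".sinks." + sink + ".";
            result.put(sink, props.getProperty(prefix + "hostname") + ":"
                + props.getProperty(prefix + "port")
                + " compression=" + props.getProperty(prefix + "compression-type", "none")
                + " channel=" + props.getProperty(prefix + "channel"));
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        // Excerpt of the agent config from this report.
        String conf = String.join("\n",
            "client.sinks = avro-sink-1 avro-metrics-sink-1",
            "client.sinks.avro-sink-1.hostname=logagg-prod.prod.divinen.net",
            "client.sinks.avro-sink-1.port=443",
            "client.sinks.avro-sink-1.compression-type=deflate",
            "client.sinks.avro-sink-1.channel=file-channel-1",
            "client.sinks.avro-metrics-sink-1.hostname=logagg-prod.prod.divinen.net",
            "client.sinks.avro-metrics-sink-1.port=443",
            "client.sinks.avro-metrics-sink-1.compression-type=deflate",
            "client.sinks.avro-metrics-sink-1.channel=file-metrics-1");
        Properties props = new Properties();
        props.load(new StringReader(conf));
        describeSinks(props, "client").forEach((name, desc) -> System.out.println(name + " -> " + desc));
    }
}
```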
      

      It actually works for a few hours or days, but then the same appendBatch failure comes back, which seems to be a compression issue.

      When this happens, the queueset directory is empty:

      ll /u01/app/Flume/offsets/checkPointDir/filechannel1/queueset/
      total 0
      
      cat /u01/app/Flume/offsets/checkPointDir/filechannel1/checkpoint.meta 
      0
      4??Y?%?Z*
      *
      
      cat /u01/app/Flume/offsets/checkPointDir/filechannel1/inflighttakes 
      J?6?K??y?u.#H?
      
      cat /u01/app/Flume/offsets/checkPointDir/filechannel1/inflightputs 
      J?6?K??y?u.#H?
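cat prints these binary checkpoint files as unreadable text, which hides the actual bytes. A hex dump makes them inspectable; xxd or hexdump -C do this from the shell, and the sketch below does the same with only the JDK. HexDump is a hypothetical helper name, and it reads the whole file, so it is meant for small files such as checkpoint.meta, inflighttakes and inflightputs.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;

public class HexDump {

    // Renders up to maxBytes as rows of "offset: 16 hex bytes | ASCII",
    // printing non-printable bytes as '.', in the spirit of `hexdump -C`.
    static String hexDump(byte[] data, int maxBytes) {
        StringBuilder sb = new StringBuilder();
        int limit = Math.min(data.length, maxBytes);
        for (int row = 0; row < limit; row += 16) {
            StringBuilder ascii = new StringBuilder();
            sb.append(String.format("%08x: ", row));
            for (int i = row; i < row + 16; i++) {
                if (i < limit) {
                    int b = data[i] & 0xff;
                    sb.append(String.format("%02x ", b));
                    ascii.append(b >= 0x20 && b < 0x7f ? (char) b : '.');
                } else {
                    sb.append("   "); // pad a short final row so columns line up
                }
            }
            sb.append("| ").append(ascii).append('\n');
        }
        return sb.toString();
    }

    public static void main(String[] args) throws IOException {
        if (args.length == 0) {
            System.err.println("usage: java HexDump <file>  (e.g. filechannel1/checkpoint.meta)");
            return;
        }
        // Loads the whole file into memory; not intended for multi-GB data logs.
        System.out.print(hexDump(Files.readAllBytes(Paths.get(args[0])), 512));
    }
}
```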
      

      The ends of the log-2 and log-3 files in the filechannel1 dataDir are filled entirely with ?????? characters:

      ????????????????????????????????????????????????????????????????????????????????... (the run of '?' characters continues; output truncated)
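A terminal prints any non-printable byte as ?, so the run above does not show which byte value actually fills the end of log-2 and log-3. The sketch below scans a file backward from its end and reports the repeated byte value and the length of the run. It reads in 64 KiB chunks rather than loading the whole file, since file channel data logs can be large. TrailingFill is a hypothetical name, and the sketch deliberately does not interpret what the value means.

```java
import java.io.IOException;
import java.io.RandomAccessFile;

public class TrailingFill {

    // The byte value that ends the file and how many times it repeats.
    static final class Fill {
        final int value;   // 0..255, or -1 for an empty file
        final long length; // length of the trailing run in bytes

        Fill(int value, long length) {
            this.value = value;
            this.length = length;
        }
    }

    // Scans backward from EOF in 64 KiB chunks so large files are not loaded at once.
    static Fill trailingRun(RandomAccessFile file) throws IOException {
        long end = file.length();
        if (end == 0) {
            return new Fill(-1, 0);
        }
        file.seek(end - 1);
        int value = file.read();
        long count = 0;
        byte[] buf = new byte[64 * 1024];
        while (end > 0) {
            int chunk = (int) Math.min(buf.length, end);
            long start = end - chunk;
            file.seek(start);
            file.readFully(buf, 0, chunk);
            for (int i = chunk - 1; i >= 0; i--) {
                if ((buf[i] & 0xff) != value) {
                    return new Fill(value, count);
                }
                count++;
            }
            end = start;
        }
        return new Fill(value, count); // the whole file is one repeated byte
    }

    public static void main(String[] args) throws IOException {
        if (args.length == 0) {
            System.err.println("usage: java TrailingFill <file>  (e.g. a file channel log-2)");
            return;
        }
        try (RandomAccessFile file = new RandomAccessFile(args[0], "r")) {
            Fill fill = trailingRun(file);
            if (fill.value < 0) {
                System.out.println("empty file");
            } else {
                System.out.printf("last %d bytes are all 0x%02x (file size %d)%n",
                    fill.length, fill.value, file.length());
            }
        }
    }
}
```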
      
      


            People

            • Assignee: Unassigned
            • Reporter: prayagupd
            • Votes: 0
            • Watchers: 2
