FLUME-3073

KafkaSource EXCEPTION java.lang.IllegalStateException: Correlation id for response (1077833) does not match request (1077776)

Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 1.7.0
    • Fix Version/s: None
    • Component/s: Sinks+Sources
    • Labels: None

    Description

      My agent: kafkaSource -> file channel -> hdfsSink

      my.conf:

      agent.sources = kafkaSource

      agent.channels = kafka2HdfsConnectionMon

      agent.sinks = hdfsSink

      agent.sources.kafkaSource.channels = kafka2HdfsConnectionMon
      agent.sinks.hdfsSink.channel = kafka2HdfsConnectionMon
      #---------kafkasource ------------------
      agent.sources.kafkaSource.type = org.apache.flume.source.kafka.KafkaSource
      agent.sources.kafkaSource.kafka.bootstrap.servers = 10.2.1.23:9092

      agent.sources.kafkaSource.topic = userMon

      agent.sources.kafkaSource.groupId = flumeConsumer
      agent.sources.kafkaSource.kafka.consumer.timeout.ms = 300000

      #---------filechannel ------------------

      agent.channels.kafka2HdfsConnectionMon.type = file
      agent.channels.kafka2HdfsConnectionMon.checkpointDir = /data/filechannle_data/kafka2HdfsConnectionMon/checkpoint
      agent.channels.kafka2HdfsConnectionMon.dataDirs = /data/filechannle_data/kafka2HdfsConnectionMon/data

      #---------hdfsSink ------------------
      agent.sinks.hdfsSink.type = hdfs

      agent.sinks.hdfsSink.hdfs.path = hdfs://mycluster/connectionMon/%Y%m%d
      agent.sinks.hdfsSink.hdfs.writeFormat = Text
      agent.sinks.hdfsSink.hdfs.fileType = DataStream

      agent.sinks.hdfsSink.hdfs.rollSize = 134217720
      agent.sinks.hdfsSink.hdfs.rollCount = 100000
      agent.sinks.hdfsSink.hdfs.rollInterval = 600

      agent.sinks.hdfsSink.hdfs.filePrefix=run
      agent.sinks.hdfsSink.hdfs.fileSuffix=.data

      agent.sinks.hdfsSink.hdfs.inUserPrefix=_
      agent.sinks.hdfsSink.hdfs.inUserSuffix=
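
A side note on property names, offered as a sketch and not as a fix for the exception below: the Flume 1.7 user guide lists `kafka.topics` and `kafka.consumer.group.id` for the Kafka source and treats `topic` and `groupId` as deprecated names, and it documents the HDFS sink's in-use markers as `hdfs.inUsePrefix` and `hdfs.inUseSuffix` (the `inUser...` spellings are not documented names). Assuming the values above are the intended ones, the equivalent lines would look like this:

```properties
# Kafka source: current names for the topic and consumer group settings
agent.sources.kafkaSource.kafka.topics = userMon
agent.sources.kafkaSource.kafka.consumer.group.id = flumeConsumer

# HDFS sink: documented spelling of the in-use file markers
agent.sinks.hdfsSink.hdfs.inUsePrefix = _
agent.sinks.hdfsSink.hdfs.inUseSuffix =
```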

      flume-env.sh:
      JAVA_OPTS="-Xms2048m -Xmx2048m -Xss256k -Xmn1g -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:-UseGCOverheadLimit"

      After Flume has been running for a while (a few hours), it starts throwing these exceptions:
      -----------------------------------------------------------------------
      15 Mar 2017 03:56:45,687 ERROR [PollableSourceRunner-KafkaSource-kafkaSource] (org.apache.flume.source.kafka.KafkaSource.doProcess:314) - KafkaSource EXCEPTION, {}
      org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'responses': java.nio.BufferUnderflowException
      at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:71)
      at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:439)
      at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265)
      at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
      at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
      at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
      at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
      at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:358)
      at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:968)
      at org.apache.flume.source.kafka.KafkaSource.doProcess(KafkaSource.java:304)
      at org.apache.flume.source.AbstractPollableSource.process(AbstractPollableSource.java:60)
      at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:133)
      at java.lang.Thread.run(Thread.java:745)
      15 Mar 2017 03:56:47,697 ERROR [PollableSourceRunner-KafkaSource-kafkaSource] (org.apache.flume.source.kafka.KafkaSource.doProcess:314) - KafkaSource EXCEPTION, {}
      java.lang.IllegalStateException: Correlation id for response (1077833) does not match request (1077776)
      at org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:477)
      at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:440)
      at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265)
      at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
      at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
      at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
      at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
      at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:358)
      at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:968)
      at org.apache.flume.source.kafka.KafkaSource.doProcess(KafkaSource.java:304)
      at org.apache.flume.source.AbstractPollableSource.process(AbstractPollableSource.java:60)
      at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:133)
      at java.lang.Thread.run(Thread.java:745)
      15 Mar 2017 03:56:50,880 ERROR [PollableSourceRunner-KafkaSource-kafkaSource] (org.apache.flume.source.kafka.KafkaSource.doProcess:314) - KafkaSource EXCEPTION, {}
      java.lang.IllegalStateException: Correlation id for response (1077886) does not match request (1077833)
      at org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:477)
      at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:440)
      at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265)
      at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
      at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
      at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
      at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
      at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:968)
      at org.apache.flume.source.kafka.KafkaSource.doProcess(KafkaSource.java:304)
      at org.apache.flume.source.AbstractPollableSource.process(AbstractPollableSource.java:60)
      at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:133)
      at java.lang.Thread.run(Thread.java:745)
      ...
      -----------------------------------------------------------------------
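
To make the second and third messages easier to read, here is a minimal sketch of the kind of check that produces them: each response is paired with the oldest unanswered request on the connection, and the two correlation ids must agree. The class `CorrelationSketch` and its methods are hypothetical and are not Kafka's `NetworkClient`. Only the message format and the sample ids come from the log above, and the order of sends and receives in `main` is an assumption chosen for illustration, not a diagnosis.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Simplified, hypothetical model of per-connection response matching.
// It is NOT Kafka's NetworkClient; only the message format is taken from the log.
final class CorrelationSketch {
    // Correlation ids of requests that were sent but not yet answered, oldest first.
    private final Deque<Integer> inFlight = new ArrayDeque<>();

    void send(int correlationId) {
        inFlight.addLast(correlationId);
    }

    // Pairs a response with the oldest in-flight request and checks that the ids agree.
    void receive(int responseId) {
        Integer requestId = inFlight.pollFirst();
        if (requestId == null) {
            throw new IllegalStateException(
                    "Response (" + responseId + ") arrived with no request in flight");
        }
        if (requestId != responseId) {
            throw new IllegalStateException("Correlation id for response (" + responseId
                    + ") does not match request (" + requestId + ")");
        }
    }

    public static void main(String[] args) {
        CorrelationSketch conn = new CorrelationSketch();
        // Sample ids copied from the log; the send/receive ordering is an assumption.
        conn.send(1077776);
        conn.send(1077833);
        for (int responseId : new int[] {1077833, 1077886}) {
            try {
                conn.receive(responseId);
            } catch (IllegalStateException e) {
                System.out.println(e.getMessage());
            }
        }
    }
}
```

In this model, once the queue of pending requests is one step out of line with the incoming responses, every later response fails the same check, which would be consistent with the exception repeating indefinitely.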

      From then on Flume keeps throwing this exception. Data is still written to HDFS normally, but the data is duplicated.
      I suspect the offsets are not being committed correctly.
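
The traces show the exception surfacing from `KafkaConsumer.commitSync`. As a rough illustration of why a failed commit can lead to repeated data, the sketch below models a loop that delivers records first and commits the offset afterwards. `OffsetReplaySketch`, its fields, and the sample records are all hypothetical and this is not Flume's `KafkaSource` code. It also assumes that a later session resumes from the last committed offset (for example after a restart or rebalance), which this report does not confirm.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of "deliver first, commit afterwards"; not Flume's KafkaSource code.
final class OffsetReplaySketch {
    private final List<String> log;              // stand-in for one topic partition
    private int committed = 0;                   // last offset whose commit succeeded
    final List<String> sink = new ArrayList<>(); // stand-in for the channel / HDFS output

    OffsetReplaySketch(List<String> log) {
        this.log = log;
    }

    // One consumer session: resume at the committed offset, deliver, then try to commit.
    void runSession(boolean commitSucceeds) {
        int position = committed;
        while (position < log.size()) {
            sink.add(log.get(position));
            position++;
        }
        if (commitSucceeds) {
            committed = position;
        }
    }

    public static void main(String[] args) {
        OffsetReplaySketch s = new OffsetReplaySketch(Arrays.asList("e1", "e2", "e3"));
        s.runSession(false); // the commit fails, so the committed offset stays at 0
        s.runSession(true);  // a later session starts from offset 0 again
        System.out.println(s.sink);
    }
}
```

Under these assumptions every record delivered before the failed commit is delivered a second time, which matches the symptom of data arriving in HDFS but repeated.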

          People

            Assignee: Unassigned
            Reporter: xuwei
            Votes: 0
            Watchers: 2
