Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-1741

Consumer always gets old messages

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Major
    • Resolution: Won't Fix
    • 0.8.1.1, 0.8.2.0
    • None
    • consumer
    • None

    Description

      Every time a consumer receives a message, I get the error shown below, and when I restart the consumer I receive old messages again, even though my consumer config specifies that old messages should not be fetched.

      My Node.js consumer code:

      // Reproduction script: builds a kafka-node HighLevelConsumer for a single
      // topic, logs every message and error, and resets the offset when an
      // 'offsetOutOfRange' event is emitted.
      var kafka = require('kafka-node');
      var HighLevelConsumer = kafka.HighLevelConsumer;
      var Offset = kafka.Offset;
      var Client = kafka.Client;
      var argv = require('optimist').argv;
      // Topic comes from the `topic` command-line argument; falls back to a
      // hard-coded topic name when the argument is absent or falsy.
      var topic = argv.topic || 'sLNzXYHLJA';
      // Host is redacted in the report. Port 2181 is presumably the ZooKeeper
      // endpoint and the second argument presumably a per-process client id —
      // confirm against the kafka-node Client documentation.
      var client = new Client('XXX.XXX.XXX:2181','consumer'+process.pid);
      // Only the topic name is given; no partition or offset is specified here.
      var payloads = [

      {topic:topic}

      ];
      var options = {
      groupId: 'kafka-node-group',
      // Auto commit config
      autoCommit: true,
      autoCommitMsgCount: 100,
      autoCommitIntervalMs: 5000,
      // Fetch message config
      fetchMaxWaitMs: 100,
      fetchMinBytes: 1,
      fetchMaxBytes: 1024 * 10,
      // Both flags are false; per the report, the intent is to NOT receive
      // old messages after a restart.
      fromOffset: false,
      fromBeginning: false
      };
      var consumer = new HighLevelConsumer(client, payloads, options);
      // Offset helper sharing the same client; used only in the
      // 'offsetOutOfRange' handler below.
      var offset = new Offset(client);

      // Log each received message together with `this.id` (whatever `this` is
      // bound to when the emitter invokes the handler).
      consumer.on('message', function (message) {
      console.log(this.id, message);
      });
      // Log consumer errors; no recovery is attempted.
      consumer.on('error', function (err) {
      console.log('error', err);
      });
      // On 'offsetOutOfRange': mutate the event payload (maxNum = 2), fetch
      // offsets for it, then move the consumer to the smallest offset returned
      // for that topic/partition.
      // NOTE(review): `err` from offset.fetch is never checked, so a failed
      // fetch would throw when indexing `offsets`.
      // NOTE(review): choosing the minimum returned offset may rewind the
      // consumer to older messages — confirm whether this is intended, given
      // the symptom described in this report.
      consumer.on('offsetOutOfRange', function (topic) {
      console.log("------------- offsetOutOfRange ------------");
      topic.maxNum = 2;
      offset.fetch([topic], function (err, offsets)

      { var min = Math.min.apply(null, offsets[topic.topic][topic.partition]); consumer.setOffset(topic.topic, topic.partition, min); }

      );
      });

      Kafka broker error log:

      [2014-10-31 17:13:32,173] ERROR Closing socket for /212.XXX.XXX.XXX because of error (kafka.network.Processor)
      java.nio.BufferUnderflowException
      at java.nio.Buffer.nextGetIndex(Buffer.java:498)
      at java.nio.HeapByteBuffer.getLong(HeapByteBuffer.java:406)
      at kafka.api.OffsetCommitRequest$$anonfun$1$$anonfun$apply$1.apply(OffsetCommitRequest.scala:62)
      at kafka.api.OffsetCommitRequest$$anonfun$1$$anonfun$apply$1.apply(OffsetCommitRequest.scala:58)
      at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
      at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
      at scala.collection.immutable.Range.foreach(Range.scala:141)
      at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
      at scala.collection.AbstractTraversable.map(Traversable.scala:105)
      at kafka.api.OffsetCommitRequest$$anonfun$1.apply(OffsetCommitRequest.scala:58)
      at kafka.api.OffsetCommitRequest$$anonfun$1.apply(OffsetCommitRequest.scala:55)
      at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
      at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
      at scala.collection.immutable.Range.foreach(Range.scala:141)
      at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
      at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
      at kafka.api.OffsetCommitRequest$.readFrom(OffsetCommitRequest.scala:55)
      at kafka.api.RequestKeys$$anonfun$9.apply(RequestKeys.scala:47)
      at kafka.api.RequestKeys$$anonfun$9.apply(RequestKeys.scala:47)
      at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:50)
      at kafka.network.Processor.read(SocketServer.scala:450)
      at kafka.network.Processor.run(SocketServer.scala:340)
      at java.lang.Thread.run(Thread.java:745)

      Attachments

        Activity

          People

            nehanarkhede Neha Narkhede
            hamzaezzi hamza ezzi
            Votes:
            0 Vote for this issue
            Watchers:
            2 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: