Kafka / KAFKA-3972

kafka java consumer poll returns 0 records after seekToBeginning


Details

    • Type: Task
    • Status: Resolved
    • Priority: Major
    • Resolution: Invalid
    • Affects Version/s: 0.10.0.0
    • Fix Version/s: None
    • Component/s: consumer
    • Environment: docker image elasticsearch:latest, kafka scala version 2.11, kafka version 0.10.0.0

    Description

      kafkacat successfully returns rows for the topic, but the following Java source reliably fails to produce any rows. I suspect I am missing some simple thing in my setup, but I have been unable to find a way out. I am using the current Docker images, with docker network commands connecting the processes in my cluster. The properties are:
      bootstrap.servers: kafka01:9092,kafka02:9092,kafka03:9092
      group.id: dhcp1
      topic: dhcp
      enable.auto.commit: false
      auto.commit.interval.ms: 1000
      session.timeout.ms: 30000
      key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value.deserializer: org.apache.kafka.common.serialization.StringDeserializer
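      Note: the code below also reads a polln property (the poll timeout in milliseconds) that is not in the list above; a line like the following, with a hypothetical value, is assumed to be present in the property file:
      polln: 1000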

      The Kafka consumer code follows. One thing I find curious: although the call to seekToBeginning() appears to succeed, when I print offsets on failure I get large offsets for all partitions, although I had expected them to be 0 or at least some small number. (A likely explanation is sketched in the note after the listing.)
      Here is the code:

      import org.apache.kafka.clients.consumer.ConsumerConfig;
      import org.apache.kafka.clients.consumer.ConsumerRecord;
      import org.apache.kafka.clients.consumer.ConsumerRecords;
      import org.apache.kafka.clients.consumer.KafkaConsumer;

      import org.apache.kafka.common.errors.TimeoutException;
      import org.apache.kafka.common.protocol.types.SchemaException;
      import org.apache.kafka.common.KafkaException;
      import org.apache.kafka.common.Node;
      import org.apache.kafka.common.PartitionInfo;
      import org.apache.kafka.common.TopicPartition;

      import java.io.FileInputStream;
      import java.io.FileNotFoundException;
      import java.io.IOException;
      import java.util.Arrays;
      import java.util.Collections;
      import java.util.List;
      import java.util.Map;
      import java.util.Properties;
      import java.util.Set;

      public class KConsumer {
          private Properties prop;
          private String topic;
          private Integer polln;
          private KafkaConsumer<String, String> consumer;

          // Names of the consumer properties copied from the property file
          // into the client configuration.
          private String[] pna = {
              ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
              ConsumerConfig.GROUP_ID_CONFIG,
              ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
              ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,
              ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,
              ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
              ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG
          };

          public KConsumer(String pf) throws FileNotFoundException, IOException {
              this.setProperties(pf);
              this.newClient();
          }

          public void setProperties(String p) throws FileNotFoundException, IOException {
              this.prop = new Properties();
              this.prop.load(new FileInputStream(p));
              this.topic = this.prop.getProperty("topic");
              this.polln = Integer.valueOf(this.prop.getProperty("polln"));
          }

          public void setTopic(String t) {
              this.topic = t;
          }

          public String getTopic() {
              return this.topic;
          }

          public void newClient() {
              System.err.println("creating consumer");
              // Copy only the recognized consumer properties into the client
              // config, skipping extra keys such as topic and polln.
              Properties kp = new Properties();
              for (String p : pna) {
                  String v = this.prop.getProperty(p);
                  if (v != null) {
                      kp.put(p, v);
                  }
              }
              //this.consumer = new KafkaConsumer<>(this.prop);
              this.consumer = new KafkaConsumer<>(kp);
              //this.consumer.subscribe(Collections.singletonList(this.topic));
              System.err.println("subscribing to " + this.topic);
              this.consumer.subscribe(Arrays.asList(this.topic));
              //this.seekToBeginning();
          }

          public void close() {
              this.consumer.close();
              this.consumer = null;
          }

          public void seekToBeginning() {
              if (this.topic == null) {
                  System.err.println("KConsumer: topic not set");
                  System.exit(1);
              }
              System.err.println("setting partition offset to beginning");
              // NB: with subscribe(), assignment() stays empty until the first
              // poll(), so seeking here before any poll() has no effect.
              Set<TopicPartition> tps = this.consumer.assignment();
              this.consumer.seekToBeginning(tps);
          }

          public ConsumerRecords<String, String> nextBatch() throws KafkaException {
              while (true) {
                  try {
                      System.err.printf("polling...");
                      ConsumerRecords<String, String> records = this.consumer.poll(this.polln);
                      System.err.println("returned");
                      System.err.printf("record count %d\n", records.count());
                      return records;
                  } catch (SchemaException e) {
                      System.err.println("nextBatch: " + e);
                  } catch (KafkaException e) {
                      System.err.println("nextBatch: " + e);
                      throw e;
                  } catch (Exception e) {
                      System.err.println("nextBatch: " + e);
                      this.consumer.close();
                      System.exit(1);
                  }
                  try {
                      System.err.println("sleeping");
                      Thread.sleep(2000);
                  } catch (InterruptedException e) {
                      System.err.println(e);
                      System.exit(0);
                  }
              }
          }

          public void printBatch(ConsumerRecords<String, String> records) {
              System.err.println("printing...");
              Iterable<ConsumerRecord<String, String>> ri = records.records(this.topic);
              for (ConsumerRecord<String, String> record : ri) {
                  System.out.printf("offset = %d, key = %s, value = %s%n",
                          record.offset(), record.key(), record.value());
              }
          }

          public void doProcess() {
              Integer n = 0;     // batches that contained records
              Integer f = 0;     // batches that came back empty
              long total = 0;
              try {
                  while (true) {
                      ConsumerRecords<String, String> r = this.nextBatch();
                      long count = r.count();
                      if (r.count() > 0) {
                          total += count;
                          this.printBatch(r);
                          n = n + 1;
                      } else {
                          f = f + 1;
                      }
                      // Give up once more than ten batches have come back empty
                      // and dump the partition state for debugging.
                      if (f > 10) {
                          System.err.printf("total %d\n", total);
                          this.printMisc();
                          break;
                      }
                  }
              } finally {
                  this.consumer.close();
              }
          }

          public void printPosition(int pid) {
              try {
                  TopicPartition tp = new TopicPartition(this.topic, pid);
                  long pos = this.consumer.position(tp);
                  System.err.printf(" offset: %d\n", pos);
              } catch (IllegalArgumentException e) {
                  System.err.printf("printPosition: %d %s\n", pid, e);
              }
          }
          public void printMisc() {
              Map<String, List<PartitionInfo>> topicMap;

              System.err.println("in printMisc");
              try {
                  topicMap = this.consumer.listTopics();
                  for (String key : topicMap.keySet()) {
                      if (key.compareTo(this.topic) != 0) continue;
                      System.err.printf("topic: %s\n", key);
                      List<PartitionInfo> pl = topicMap.get(key);
                      for (PartitionInfo pinf : pl) {
                          System.err.printf("partition %d\n", pinf.partition());
                          System.err.printf(" leader %s\n", pinf.leader().host());
                          this.printPosition(pinf.partition());
                          System.err.printf(" replicas:\n");
                          for (Node r : pinf.replicas()) {
                              System.err.printf(" %s %s\n", r.id(), r.host());
                          }
                          System.err.printf(" inSyncReplicas:\n");
                          for (Node r : pinf.inSyncReplicas()) {
                              System.err.printf(" %s %s\n", r.id(), r.host());
                          }
                      }
                  }
              } catch (TimeoutException e) {
                  System.err.printf("printMisc: %s\n", e);
                  //System.exit(1);
              }
          }

          public static void main(String[] args)
                  throws FileNotFoundException, IOException, InterruptedException {
              if (args.length == 1) {
                  Thread.sleep(2000); // allow time for docker network connect
                  KConsumer kc = new KConsumer(args[0]);
                  //kc.printMisc();
                  kc.doProcess();
              } else {
                  System.err.println("Usage KConsumer propfile");
                  System.exit(1);
              }
          }
      }
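      A note on the behavior described above, offered as a likely explanation rather than a confirmed diagnosis: with subscribe(), the consumer is assigned partitions lazily, during poll(). A call to assignment() made before the first poll() returns an empty set, so seekToBeginning(assignment()) at that point is a no-op, and the consumer resumes from the group's committed offsets instead, which would account for the large positions printed on failure. Below is a minimal sketch of the usual pattern, seeking from the rebalance callback so the seek runs only after partitions have actually been assigned (broker list, group id, and topic are taken from the properties above; the class name is illustrative):

      import java.util.Arrays;
      import java.util.Collection;
      import java.util.Properties;

      import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
      import org.apache.kafka.clients.consumer.ConsumerRecords;
      import org.apache.kafka.clients.consumer.KafkaConsumer;
      import org.apache.kafka.common.TopicPartition;

      public class SeekingConsumer {
          public static void main(String[] args) {
              Properties kp = new Properties();
              kp.put("bootstrap.servers", "kafka01:9092,kafka02:9092,kafka03:9092");
              kp.put("group.id", "dhcp1");
              kp.put("key.deserializer",
                      "org.apache.kafka.common.serialization.StringDeserializer");
              kp.put("value.deserializer",
                      "org.apache.kafka.common.serialization.StringDeserializer");

              final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kp);
              consumer.subscribe(Arrays.asList("dhcp"), new ConsumerRebalanceListener() {
                  public void onPartitionsRevoked(Collection<TopicPartition> parts) {
                  }
                  public void onPartitionsAssigned(Collection<TopicPartition> parts) {
                      // Partitions exist only at this point, so the seek takes effect.
                      consumer.seekToBeginning(parts);
                  }
              });
              ConsumerRecords<String, String> records = consumer.poll(10000);
              System.out.printf("record count %d%n", records.count());
              consumer.close();
          }
      }

      Equivalently, calling poll() once before seekToBeginning(consumer.assignment()) should also work, since the first poll triggers the partition assignment.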

      Attachments

      Activity

      People

        Assignee: Ewen Cheslack-Postava (ewencp)
        Reporter: don caldwell (caldwe21@purdue.edu)
        Votes: 0
        Watchers: 2

      Dates

        Created:
        Updated:
        Resolved: