Details
Description
kafkacat successfully returns rows for the topic, but the following java source reliably fails to produce rows. I have the suspicion that I am missing some simple thing in my setup, but I have been unable to find a way out. I am using the current docker and using docker network commands to connect the processes in my cluster. The properties are:
bootstrap.servers: kafka01:9092,kafka02:9092,kafka03:9092
group.id: dhcp1
topic: dhcp
enable.auto.commit: false
auto.commit.interval.ms: 1000
session.timeout.ms 30000
key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
value.deserializer: org.apache.kafka.common.serialization.StringDeserializer
the kafka consumer follows. One thing that I find curious is that, although I seem to successfully make the call to seekToBeginning(), when I print offsets on failure, I get large offsets for all partitions although I had expected them to be 0 or at least some small number.
Here is the code:
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.protocol.types.SchemaException;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.Integer;
import java.lang.System;
import java.lang.Thread;
import java.lang.InterruptedException;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
public class KConsumer {
private Properties prop;
private String topic;
private Integer polln;
private KafkaConsumer<String, String> consumer;
private String[] pna =
;
public KConsumer(String pf) throws FileNotFoundException,
IOException
public void setProperties(String p) throws FileNotFoundException,
IOException
public void setTopic(String t)
{ this.topic = t; }public String getTopic()
{ return this.topic; } public void newClient() {
System.err.println("creating consumer");
Properties kp = new Properties();
for(String p : pna) {
String v = this.prop.getProperty(p);
if(v != null)
}
//this.consumer = new KafkaConsumer<>(this.prop);
this.consumer = new KafkaConsumer<>(kp);
//this.consumer.subscribe(Collections.singletonList(this.topic));
System.err.println("subscribing to " + this.topic);
this.consumer.subscribe(Arrays.asList(this.topic));
//this.seekToBeginning();
}
public void close()
{ this.consumer.close(); this.consumer = null; } public void seekToBeginning() {
if(this.topic == null)
System.err.println("setting partition offset to beginning");
java.util.Set<TopicPartition> tps = this.consumer.assignment();
this.consumer.seekToBeginning(tps);
}
public ConsumerRecords<String,String> nextBatch()
throws KafkaException {
while(true) {
try
catch(SchemaException e)
{ System.err.println("nextBatch: " + e); }catch(KafkaException e)
{ System.err.println("nextBatch: " + e); throw e; }catch(Exception e)
{ System.err.println("nextBatch: " + e); this.consumer.close(); System.exit(1); }try
{ System.err.println("sleeping"); Thread.sleep(2000); }catch(InterruptedException e)
{ System.err.println(e); System.exit(0); } }
}
public void printBatch(ConsumerRecords<String,String> records) {
System.err.println("printing...");
Iterable<ConsumerRecord<String,String>> ri =
records.records(this.topic);
for (ConsumerRecord<String, String> record : ri)
}
public void doProcess() {
Integer n = 0;
Integer f = 0;
long total = 0;
try {
while(true) {
ConsumerRecords<String,String> r = this.nextBatch();
long count = r.count();
if(r.count() > 0)
else
{ f = f + 1; }if(f > 10)
{ System.err.printf("total %d\n", total); this.printMisc(); break; } }
} finally
}
public void printPosition(int pid) {
try
catch(IllegalArgumentException e)
{ System.err.printf("printPosition: %d %s\n", pid, e); } }
public void printMisc() {
Map<String,List<PartitionInfo>> topicMap;
List<PartitionInfo> partitionList;
System.err.println("in printMisc");
try {
topicMap = this.consumer.listTopics();
for(String key: topicMap.keySet()) {
if(key.compareTo(this.topic) != 0) continue;
System.err.printf("topic: %s\n", key);
List<PartitionInfo> pl = topicMap.get(key);
for(PartitionInfo pinf: pl) {
System.err.printf("partition %d\n", pinf.partition());
System.err.printf(" leader %s\n",
pinf.leader().host());
this.printPosition(pinf.partition());
System.err.printf(" replicas:\n");
for(Node r: pinf.replicas())
System.err.printf(" inSyncReplicas:\n");
for(Node r: pinf.inSyncReplicas()) { System.err.printf(" %s %sn", r.id(), r.host()); }
}
}
} catch (TimeoutException e)
}
public static void main(String[] args) throws FileNotFoundException,
IOException, InterruptedException {
if(args.length == 1)
else
{ System.err.println("Usage KConsumer propfile"); System.exit(1); } }
}