Description
1. Create a new KafkaConsumer:
val groupIdString = "test1"
val props = new Properties();
props.put("bootstrap.servers", "99.12.143.240:9093");
props.put("group.id", groupIdString);
props.put("enable.auto.commit", "false");
props.put("auto.offset.reset","earliest");
props.put("auto.commit.interval.ms", "5000");
props.put("metadata.max.age.ms","300000");
props.put("session.timeout.ms", "30000");
props.setProperty("key.deserializer", classOf[ByteArrayDeserializer].getName)
props.setProperty("value.deserializer", classOf[ByteArrayDeserializer].getName)
props.setProperty("client.id", groupIdString)
new KafkaConsumer[Array[Byte], Array[Byte]](props)
2. Subscribe to topics through a Pattern:
consumer.subscribe(Pattern.compile(".*\\.sh$"), consumerRebalanceListener)
3. Call poll(1000) to fetch messages.
4. Delete the topic test1.sh on the Kafka broker.
The consumer then throws a NullPointerException:
Exception in thread "main" java.lang.NullPointerException
at java.util.ArrayList.addAll(Unknown Source)
at org.apache.kafka.clients.Metadata.getClusterForCurrentTopics(Metadata.java:271)
at org.apache.kafka.clients.Metadata.update(Metadata.java:185)
at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.handleResponse(NetworkClient.java:606)
at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeHandleCompletedReceive(NetworkClient.java:583)
at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:450)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:269)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:201)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:998)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
at TestNewConsumer$MyMirrorMakerNewConsumer.receive(TestNewConsumer.scala:146)
at TestNewConsumer$.main(TestNewConsumer.scala:38)
at TestNewConsumer.main(TestNewConsumer.scala)