Description
Currently, KafkaConsumer's subscribe methods accept Collection<String> as topics to be subscribed, but a Collection may have null as its element. For example
String topic = null; Collection<String> topics = Arrays.asList(topic); consumer.subscribe(topics)
When this happens, consumer will throw a puzzling NullPointerException:
at org.apache.kafka.common.utils.Utils.utf8Length(Utils.java:245) at org.apache.kafka.common.protocol.types.Type$6.sizeOf(Type.java:248) at org.apache.kafka.common.protocol.types.ArrayOf.sizeOf(ArrayOf.java:85) at org.apache.kafka.common.protocol.types.Schema.sizeOf(Schema.java:89) at org.apache.kafka.common.protocol.types.Struct.sizeOf(Struct.java:244) at org.apache.kafka.common.requests.RequestSend.serialize(RequestSend.java:35) at org.apache.kafka.common.requests.RequestSend.<init>(RequestSend.java:29) at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.request(NetworkClient.java:616) at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:639) at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:552) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:258) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:179) at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:970) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:934)
Maybe it's better to remove null when doing subscription.