Details
Description
Using a simple consumer, and reading messages using StreamIterator noticed that the consumptions suddenly stops and the lag starts building up till the consumer is restarted. Below is the code snippet
final Map<String, List<KafkaStream<byte[], byte[]>>> streamsByName = consumerConnector.createMessageStreams(topicCountMap);
ConsumerIterator<byte[], byte[]> streamIterator = streamsByName.get(topicName).get(IDX_FIRST_ITEM).iterator();
if (streamIterator.hasNext()) {
final MessageAndMetadata<byte[], byte[]> item = streamIterator.next();
...
}