Similar to how idle sources block watermark progression in downstream operators (see FLINK-5017), the per-partition watermark mechanism in FlinkKafkaConsumer is also blocked from advancing watermarks when a partition is idle. The watermark of an idle partition always stays at Long.MIN_VALUE, so the overall minimum watermark across all partitions of a consumer subtask never advances.
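
To illustrate the problem, here is a minimal standalone sketch of taking the minimum over per-partition watermarks. It is not the actual FlinkKafkaConsumer code; the class and method names (PartitionWatermarkMin, minWatermark) are made up for this example.

```java
public class PartitionWatermarkMin {

    /**
     * Returns the minimum watermark across all partitions of one subtask.
     * Hypothetical helper for illustration; an empty array yields Long.MAX_VALUE.
     */
    static long minWatermark(long[] partitionWatermarks) {
        long min = Long.MAX_VALUE;
        for (long watermark : partitionWatermarks) {
            min = Math.min(min, watermark);
        }
        return min;
    }

    public static void main(String[] args) {
        // Two partitions have received data; the third is idle and has
        // never advanced past its initial value of Long.MIN_VALUE.
        long[] watermarks = {1_000L, 2_000L, Long.MIN_VALUE};

        // The idle partition pins the overall minimum at Long.MIN_VALUE.
        System.out.println(minWatermark(watermarks) == Long.MIN_VALUE);
    }
}
```

As long as one entry stays at Long.MIN_VALUE, the result cannot move, no matter how far the other partitions advance.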
It's uncommon for a Kafka partition to produce no data at all, but it would still be good to handle this case. I think we should have a localized solution, similar to FLINK-5017, for the per-partition watermarks in AbstractFetcher.
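
One possible shape for such a localized solution, as a rough sketch only: skip partitions that have been idle for longer than some timeout when computing the minimum. Everything here is an assumption for illustration (the PartitionState class, the idle timeout, and the choice to return Long.MIN_VALUE when all partitions are idle); it is neither the FLINK-5017 design nor existing AbstractFetcher code.

```java
public class IdleAwareWatermarkMin {

    /** Hypothetical per-partition state; not a Flink class. */
    static final class PartitionState {
        long watermark = Long.MIN_VALUE;
        long lastRecordTimeMillis;

        PartitionState(long createdAtMillis) {
            this.lastRecordTimeMillis = createdAtMillis;
        }

        /** Records activity and keeps the watermark monotonically increasing. */
        void onRecord(long newWatermark, long nowMillis) {
            this.watermark = Math.max(this.watermark, newWatermark);
            this.lastRecordTimeMillis = nowMillis;
        }

        /** A partition counts as idle once no record arrived for idleTimeoutMillis. */
        boolean isIdle(long nowMillis, long idleTimeoutMillis) {
            return nowMillis - lastRecordTimeMillis >= idleTimeoutMillis;
        }
    }

    /**
     * Minimum watermark over the partitions that are not idle.
     * Assumption: if every partition is idle, return Long.MIN_VALUE so that
     * the caller does not advance the watermark.
     */
    static long minActiveWatermark(
            PartitionState[] partitions, long nowMillis, long idleTimeoutMillis) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (PartitionState partition : partitions) {
            if (partition.isIdle(nowMillis, idleTimeoutMillis)) {
                continue; // idle partitions no longer hold back the minimum
            }
            anyActive = true;
            min = Math.min(min, partition.watermark);
        }
        return anyActive ? min : Long.MIN_VALUE;
    }
}
```

A real fix would also need to decide how a partition leaves the idle state once it starts producing data again, and how this interacts with the source-level idleness handling from FLINK-5017.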