Details
Type: Bug
Status: Resolved
Priority: Blocker
Resolution: Fixed
Description
It seems that the Kafka Consumer (0.8) fails to restart a job after the job has failed due to a Kafka broker shutdown.
java.lang.Exception: Unable to get last offset for partitions [FetchPartition {topic=a, partition=13, offset=-915623761776}, FetchPartition {topic=b, partition=13, offset=-915623761776}, FetchPartition {topic=c, partition=13, offset=-915623761776}, FetchPartition {topic=d, partition=13, offset=-915623761776}, FetchPartition {topic=e, partition=13, offset=-915623761776}, FetchPartition {topic=f, partition=13, offset=-915623761776}, FetchPartition {topic=g, partition=13, offset=-915623761776}].
Exception for partition 13: kafka.common.NotLeaderForPartitionException
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
    at java.lang.Class.newInstance(Class.java:442)
    at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:86)
    at kafka.common.ErrorMapping.exceptionFor(ErrorMapping.scala)
    at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.getLastOffset(LegacyFetcher.java:551)
    at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:379)
I haven't understood the cause of this issue, but I'll investigate it.
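For context, NotLeaderForPartitionException generally means that the broker being queried is no longer the leader for that partition, which is expected right after a broker shutdown. One possible direction, offered only as a sketch and not as the actual fix: re-resolve the partition leader and retry the offset lookup instead of failing on the first attempt. Everything in the snippet below (LeaderRetrySketch, NotLeaderException, OffsetLookup, lastOffsetWithRetry) is a hypothetical stand-in for illustration; it does not use the Kafka or Flink APIs and does not reflect the code in LegacyFetcher.

```java
import java.util.function.Supplier;

// Hypothetical sketch: retry an offset lookup after re-resolving the leader.
// None of these names exist in Kafka or Flink; they only illustrate the idea.
class LeaderRetrySketch {

    // Stand-in for kafka.common.NotLeaderForPartitionException.
    static class NotLeaderException extends RuntimeException {}

    // Stand-in for "ask the given broker for the last offset of a partition".
    interface OffsetLookup {
        long lastOffset(String leader);
    }

    // Resolves the current leader before every attempt, so that a stale
    // leader is replaced once the cluster has elected a new one.
    static long lastOffsetWithRetry(Supplier<String> leaderResolver,
                                    OffsetLookup lookup,
                                    int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        NotLeaderException last = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            String leader = leaderResolver.get(); // refresh leader metadata
            try {
                return lookup.lastOffset(leader);
            } catch (NotLeaderException e) {
                last = e; // leader moved; resolve again on the next attempt
            }
        }
        throw new IllegalStateException(
                "Unable to get last offset after " + maxAttempts + " attempts", last);
    }
}
```

A real implementation would probably also wait between attempts, since leader election after a broker shutdown is not instantaneous.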