Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-5849

Kafka Consumer checkpointed state may contain undefined offsets

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Critical
    • Resolution: Fixed
    • None
    • 1.3.0
    • Connectors / Kafka
    • None

    Description

      This is a regression due to FLINK-4280.

      In FLINK-4280, all initial offset determination was refactored to be consolidated at the start of AbstractFetcher#runFetchLoop. However, this caused checkpoints that were triggered before the method was ever reached to contain undefined partition offsets.

      Ref:

      org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
          at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
          at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:101)
          at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:400)
          at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:392)
          at org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.executeRemotely(RemoteStreamEnvironment.java:209)
          at org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.execute(RemoteStreamEnvironment.java:173)
          at org.apache.flink.test.util.TestUtils.tryExecute(TestUtils.java:32)
          at org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase.runMultipleSourcesOnePartitionExactlyOnceTest(KafkaConsumerTestBase.java:942)
          at org.apache.flink.streaming.connectors.kafka.Kafka09ITCase.testMultipleSourcesOnePartition(Kafka09ITCase.java:76)
          at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
          at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
          at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
          at java.lang.reflect.Method.invoke(Method.java:606)
          at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
          at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
          at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
          at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
          at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
          at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
          at java.util.concurrent.FutureTask.run(FutureTask.java:262)
          at java.lang.Thread.run(Thread.java:745)
      Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
          at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:915)
          at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:858)
          at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:858)
          at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
          at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
          at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
          at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
          at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
          at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
          at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
          at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
      Caused by: java.lang.IllegalArgumentException: Restoring from a checkpoint / savepoint, but found a partition state Partition: KafkaTopicPartition{topic='manyToOneTopic', partition=2}, KafkaPartitionHandle=manyToOneTopic-2, offset=(not set) that does not have a defined offset.
          at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.<init>(KafkaConsumerThread.java:133)
          at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.<init>(Kafka09Fetcher.java:113)
          at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.createFetcher(FlinkKafkaConsumer09.java:182)
          at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:275)
          at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
          at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
          at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
          at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:265)
          at org.apache.flink.runtime.taskmanager.Task.run(Task.java:668)
          at java.lang.Thread.run(Thread.java:745)
      

      Attachments

        Issue Links

          Activity

            People

              tzulitai Tzu-Li (Gordon) Tai
              tzulitai Tzu-Li (Gordon) Tai
              Votes:
              0 Vote for this issue
              Watchers:
              3 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: