FLINK-5849: Kafka Consumer checkpointed state may contain undefined offsets

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.3.0
    • Component/s: Kafka Connector
    • Labels: None

      Description

      This is a regression due to FLINK-4280.

      In FLINK-4280, all initial offset determination was refactored to be consolidated at the start of AbstractFetcher#runFetchLoop. However, this caused checkpoints that were triggered before the method was ever reached to contain undefined partition offsets.
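
      To make the failure concrete, the following is a minimal sketch of the race, with illustrative names and an illustrative sentinel value rather than the actual Flink internals. Partition states begin with an "offset not set" placeholder; a checkpoint triggered before runFetchLoop() captures that placeholder, and restore-time validation (see the stack trace below) then rejects the state:

          import java.util.HashMap;
          import java.util.Map;

          public class UndefinedOffsetRaceSketch {

              // Illustrative sentinel for "no offset determined yet".
              static final long OFFSET_NOT_SET = Long.MIN_VALUE;

              private final Map<String, Long> partitionOffsets = new HashMap<>();

              void simulate() {
                  // Partition states exist before runFetchLoop() resolves start offsets.
                  partitionOffsets.put("manyToOneTopic-2", OFFSET_NOT_SET);

                  // A checkpoint triggered at this point captures the placeholder.
                  Map<String, Long> checkpoint = new HashMap<>(partitionOffsets);

                  // On restore, validation such as KafkaConsumerThread's rejects it.
                  for (Map.Entry<String, Long> entry : checkpoint.entrySet()) {
                      if (entry.getValue() == OFFSET_NOT_SET) {
                          throw new IllegalArgumentException("Restoring from a checkpoint, but partition "
                                  + entry.getKey() + " does not have a defined offset.");
                      }
                  }
              }
          }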

      Ref:

      org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
          at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
          at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:101)
          at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:400)
          at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:392)
          at org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.executeRemotely(RemoteStreamEnvironment.java:209)
          at org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.execute(RemoteStreamEnvironment.java:173)
          at org.apache.flink.test.util.TestUtils.tryExecute(TestUtils.java:32)
          at org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase.runMultipleSourcesOnePartitionExactlyOnceTest(KafkaConsumerTestBase.java:942)
          at org.apache.flink.streaming.connectors.kafka.Kafka09ITCase.testMultipleSourcesOnePartition(Kafka09ITCase.java:76)
          at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
          at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
          at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
          at java.lang.reflect.Method.invoke(Method.java:606)
          at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
          at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
          at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
          at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
          at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
          at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
          at java.util.concurrent.FutureTask.run(FutureTask.java:262)
          at java.lang.Thread.run(Thread.java:745)
      Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
          at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:915)
          at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:858)
          at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:858)
          at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
          at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
          at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
          at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
          at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
          at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
          at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
          at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
      Caused by: java.lang.IllegalArgumentException: Restoring from a checkpoint / savepoint, but found a partition state Partition: KafkaTopicPartition{topic='manyToOneTopic', partition=2}, KafkaPartitionHandle=manyToOneTopic-2, offset=(not set) that does not have a defined offset.
          at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.<init>(KafkaConsumerThread.java:133)
          at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.<init>(Kafka09Fetcher.java:113)
          at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.createFetcher(FlinkKafkaConsumer09.java:182)
          at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:275)
          at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
          at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
          at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
          at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:265)
          at org.apache.flink.runtime.taskmanager.Task.run(Task.java:668)
          at java.lang.Thread.run(Thread.java:745)
      

          Activity

          Tzu-Li (Gordon) Tai (tzulitai) added a comment - edited

          Some background context on the problem:
          In FLINK-4280, the logic that fetchers run at the start of runFetchLoop() was changed to:

          if (isRestored) {
              // just use the offsets in the restored state as starting position
          } else {
              // find out the starting offsets based on StartupMode (either EARLIEST, LATEST, or GROUP_OFFSETS),
              // and set the partition states with the discovered start offsets so that the state has defined offsets
          }
          

          So, the change also assumed that on restore, the state would never contain undefined offsets. As pointed out in the description of this JIRA, this does not hold if a checkpoint was taken before runFetchLoop() was reached.

          The approaches I see in fixing this:
          1. The faster fix - also let the if (isRestored) branch handle states that don't have defined offsets.
          2. Rework the life cycle of AbstractFetcher. We should instantiate AbstractFetcher in the open() method of the UDF, and let the startup offset determination happen in the constructor of AbstractFetcher. This ensures that there will always be defined offsets when checkpointing happens.

          Option (2) will be more work, but I prefer it over (1) because it seems to be the more proper fix.
          Option (1) would lead to more complicated start position determination logic and make it less self-contained: for partition states with undefined offsets, we would need to "fall back" to the StartupMode for that partition. This is a problem for the LATEST startup mode - we would resolve the undefined offset to the latest record at the time the job was restored, rather than at the time of the job's actual first execution.
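
          A minimal sketch of option (2), assuming simplified types - String partition ids and a plain map instead of the real AbstractFetcher state, with illustrative method names. The point is the life cycle: start offsets are fully determined in open(), before any checkpoint can fire, so snapshotState() never observes an undefined offset:

          import java.util.Map;
          import java.util.concurrent.ConcurrentHashMap;

          abstract class FetcherLifecycleSketch {

              enum StartupMode { EARLIEST, LATEST, GROUP_OFFSETS }

              private final Map<String, Long> subscribedPartitionOffsets = new ConcurrentHashMap<>();

              // Called once before the source runs, and before any checkpoint can fire.
              public void open(StartupMode startupMode, Map<String, Long> restoredState) {
                  if (restoredState != null) {
                      // On restore, the restored offsets are the starting positions.
                      subscribedPartitionOffsets.putAll(restoredState);
                  } else {
                      // Otherwise, eagerly resolve startup offsets so state is always defined.
                      for (String partition : discoverPartitions()) {
                          subscribedPartitionOffsets.put(partition, resolveStartOffset(partition, startupMode));
                      }
                  }
              }

              // Any checkpoint taken after open() sees only defined offsets.
              public Map<String, Long> snapshotState() {
                  return new ConcurrentHashMap<>(subscribedPartitionOffsets);
              }

              abstract Iterable<String> discoverPartitions();

              abstract long resolveStartOffset(String partition, StartupMode mode);
          }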

          ASF GitHub Bot (githubbot) added a comment -

          GitHub user tzulitai opened a pull request:

          https://github.com/apache/flink/pull/3378

          FLINK-5849 [kafka] Move FlinkKafkaConsumer start offset determination to open()

          This PR fixes a regression due to the recently merged #2509 (FLINK-4280).
          The new start position feature added in #2509 relies on the assumption that on restore, all offsets are defined. This does not hold if the restored checkpoint was taken before the fetcher was ever initialized or run.

          This PR fixes this by changing the following:
          1. Move the start position determination logic to `open()`. This ensures that when `snapshotState()` is called, we will always have defined offsets.
          2. Introduce special "magic offset values" to represent that a partition is to be started from either `EARLIEST`, `LATEST`, or `GROUP_OFFSETS`. These values are set as placeholders in `open()`. The consumer follows a lazy evaluation approach to only replace these magic values with actual offsets when the fetcher actually starts running.

          Therefore, with this PR, if a checkpoint happens before a fetcher fully starts consuming all of its subscribed partitions, it will at least contain the "magic offset value" in the state, instead of an undefined offset like before.
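
          The sentinel idea, sketched with illustrative constants and a hypothetical KafkaOffsetLookup interface (neither is the actual Flink API): open() stores one sentinel per partition, and the fetcher lazily swaps it for a concrete offset once it starts running.

          public final class SentinelOffsetsSketch {

              // Illustrative sentinels, far outside the range of valid Kafka offsets;
              // the actual constants in Flink may differ.
              public static final long EARLIEST_OFFSET = Long.MIN_VALUE + 1;
              public static final long LATEST_OFFSET = Long.MIN_VALUE + 2;
              public static final long GROUP_OFFSET = Long.MIN_VALUE + 3;

              public static boolean isSentinel(long offset) {
                  return offset == EARLIEST_OFFSET || offset == LATEST_OFFSET || offset == GROUP_OFFSET;
              }

              // Lazily resolve a sentinel into a concrete offset once the fetcher runs.
              public static long resolve(long offset, KafkaOffsetLookup lookup, String partition) {
                  if (offset == EARLIEST_OFFSET) {
                      return lookup.earliestOffset(partition);
                  } else if (offset == LATEST_OFFSET) {
                      return lookup.latestOffset(partition);
                  } else if (offset == GROUP_OFFSET) {
                      return lookup.committedGroupOffset(partition);
                  } else {
                      return offset; // already a concrete offset
                  }
              }

              // Hypothetical lookup interface standing in for the actual Kafka client calls.
              public interface KafkaOffsetLookup {
                  long earliestOffset(String partition);
                  long latestOffset(String partition);
                  long committedGroupOffset(String partition);
              }
          }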

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/tzulitai/flink FLINK-5849

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3378.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3378


          commit 7e7bf1d106d4dc0d24fa6746e94ccdadbc06088e
          Author: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
          Date: 2017-02-21T15:05:32Z

          FLINK-5849 [kafka] Move FlinkKafkaConsumer start offset determination to open()


          ASF GitHub Bot (githubbot) added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/3378

          Rebased on `master`.

          Note about changes to the partition assignment logic in deleted lines 538-553 and added lines 563-565 of `FlinkKafkaConsumerBase`:
          The change is unrelated to this issue, but something I stumbled across when touching that part of the code. Problems:

          1. The `KafkaConsumerPartitionAssignmentTest` was testing a no-longer-used `assignPartitions` method, so the tests never covered the actual behaviour.

          2. Previously, the partition assignment was changed from the "modulo on KafkaTopicPartition hashes" approach to "pre-sorting the partition list and round-robin assigning". This change should actually break the tests in `KafkaConsumerPartitionAssignmentTest`, but didn't, because, as mentioned above, the tests were exercising an unused method. The current approach will also be problematic for dynamically growing subscribed partition lists, because the sort order will change as the list grows with newly discovered partitions. A sketch of the two strategies follows.
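
          As referenced above, a sketch of the two assignment strategies under simplified assumptions - String partition ids instead of KafkaTopicPartition, illustrative method names. With sorted round-robin, a newly discovered partition can shift the sort order and silently move existing partitions between subtasks; the modulo-on-hash approach assigns each partition independently of the rest of the list:

          import java.util.ArrayList;
          import java.util.Collections;
          import java.util.List;

          public class PartitionAssignmentSketch {

              // Stable: the subtask for a partition depends only on its own hash.
              static boolean assignedByHash(String partition, int subtaskIndex, int parallelism) {
                  return ((partition.hashCode() % parallelism) + parallelism) % parallelism == subtaskIndex;
              }

              // Order-dependent: pre-sort the full list, then round-robin by position.
              // Growing the list can change positions and thus reassign partitions.
              static List<String> assignedBySortedRoundRobin(
                      List<String> allPartitions, int subtaskIndex, int parallelism) {
                  List<String> sorted = new ArrayList<>(allPartitions);
                  Collections.sort(sorted);
                  List<String> mine = new ArrayList<>();
                  for (int i = subtaskIndex; i < sorted.size(); i += parallelism) {
                      mine.add(sorted.get(i));
                  }
                  return mine;
              }
          }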

          ASF GitHub Bot (githubbot) added a comment -

          Github user rmetzger commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3378#discussion_r103196367

          — Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java —
          @@ -330,8 +315,49 @@ public void cancel() {
          public void open(Configuration configuration) {
          — End diff –

          I wonder if it makes sense to move the method above the run() method.
          Then it's more logical going through the source code.

          ASF GitHub Bot (githubbot) added a comment -

          Github user rmetzger commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3378#discussion_r103197252

          — Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java —
          @@ -42,13 +43,7 @@

          import java.io.Serializable;
          import java.lang.reflect.Field;
          -import java.util.ArrayList;
          -import java.util.Arrays;
          -import java.util.Collections;
          -import java.util.HashMap;
          -import java.util.HashSet;
          -import java.util.List;
          -import java.util.Set;
          +import java.util.*;
          — End diff –

          Star imports are not wanted in Flink

          ASF GitHub Bot (githubbot) added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3378#discussion_r103199655

          — Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java —
          @@ -42,13 +43,7 @@

          import java.io.Serializable;
          import java.lang.reflect.Field;
          -import java.util.ArrayList;
          -import java.util.Arrays;
          -import java.util.Collections;
          -import java.util.HashMap;
          -import java.util.HashSet;
          -import java.util.List;
          -import java.util.Set;
          +import java.util.*;
          — End diff –

          Yikes, second time :/
          I think there's a setting to disable star imports in IntelliJ; will try to use it!

          ASF GitHub Bot (githubbot) added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3378#discussion_r103199733

          — Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java —
          @@ -330,8 +315,49 @@ public void cancel() {
          public void open(Configuration configuration) {
          — End diff –

          Makes sense, will change this.

          ASF GitHub Bot (githubbot) added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/3378

          Thanks for the review @rmetzger!
          For moving `open()` to before `run()`, I've included that in the commit tagged with FLINK-5849.
          For the star import fix, I'm going to include it in a follow-up hotfix that cleans up star and unused imports across all Flink Kafka connector tests.

          Doing one final Travis run locally and merging this to `master` once it turns green.

          ASF GitHub Bot (githubbot) added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/3378

          Tests pass, merging ..

          ASF GitHub Bot (githubbot) added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3378

          Tzu-Li (Gordon) Tai (tzulitai) added a comment - Resolved for master via http://git-wip-us.apache.org/repos/asf/flink/commit/ed68fed

            People

            • Assignee: Tzu-Li (Gordon) Tai (tzulitai)
            • Reporter: Tzu-Li (Gordon) Tai (tzulitai)
            • Votes: 0
            • Watchers: 3
