Flink / FLINK-7195

FlinkKafkaConsumer should not respect fetched partitions to filter restored partition states

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 1.3.1
    • Fix Version/s: 1.3.2
    • Component/s: Kafka Connector
    • Labels:
      None

      Description

      This issue is a reappearance of FLINK-6006. On restore, we should neither rely on the partition list fetched from Kafka nor use it to filter the restored partition states. In corner cases, such as Kafka broker downtime, some partitions may be missing from the fetched partition list, and filtering against it would drop their restored state. More precisely, restoring should not require fetching partitions at all.
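
      To make the failure mode concrete, here is a minimal sketch of the two restore strategies. It is not the actual FlinkKafkaConsumerBase code: the class name, the method names, and the use of plain strings as partition identifiers are all assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of the restore logic; not the real connector implementation. */
public class RestoreSketch {

    /** Problematic variant: restored offsets are filtered by whatever the broker query returned. */
    public static Map<String, Long> restoreWithFiltering(
            Map<String, Long> restoredOffsets, List<String> fetchedPartitions) {
        Map<String, Long> subscribed = new HashMap<>();
        for (Map.Entry<String, Long> entry : restoredOffsets.entrySet()) {
            // A partition that is temporarily missing from the fetched list loses its offset here.
            if (fetchedPartitions.contains(entry.getKey())) {
                subscribed.put(entry.getKey(), entry.getValue());
            }
        }
        return subscribed;
    }

    /** Variant described in this issue: the restored state is used as-is, no partition fetch involved. */
    public static Map<String, Long> restoreWithoutFiltering(Map<String, Long> restoredOffsets) {
        return new HashMap<>(restoredOffsets);
    }

    public static void main(String[] args) {
        Map<String, Long> restored = new HashMap<>();
        restored.put("test-topic-0", 100L);
        restored.put("test-topic-1", 200L);
        restored.put("test-topic-2", 300L);

        // Assumed scenario: a broker is down, so one partition is missing from the query result.
        List<String> fetchedDuringOutage = Arrays.asList("test-topic-0", "test-topic-1");

        System.out.println(restoreWithFiltering(restored, fetchedDuringOutage).size());
        System.out.println(restoreWithoutFiltering(restored).size());
    }
}
```

      In this sketch the filtering variant silently loses the offset of the third partition, while the non-filtering variant keeps all restored offsets regardless of what the broker reports.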

      We have shot ourselves in the foot again and reintroduced this bug in https://github.com/apache/flink/pull/3378/commits/ed68fedbe90db03823d75a020510ad3c344fa73e. The previous test for this behavior was too implementation-specific, which is why it failed to catch the regression once the internal implementation changed.
      We should have a proper unit test for this that does not rely on internal implementation details and tests only against the public abstractions of FlinkKafkaConsumerBase.
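
      As a rough illustration of what testing only against public abstractions could look like, the sketch below checks restore behavior purely through a small public interface. The interface, the toy implementation, and the helper name are hypothetical and are not the Flink test code; the only thing taken from the ticket is the idea of varying the fetched partition list between startup and restore.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical black-box test sketch; the names below are not Flink APIs. */
public class RestoreBlackBoxTestSketch {

    /** Assumed public surface of a checkpointed source. */
    public interface CheckpointedSource {
        void restoreState(Map<String, Long> state);
        void open(List<String> fetchedPartitions);
        Map<String, Long> snapshotState();
    }

    /** Toy implementation standing in for the consumer under test. */
    public static class ToySource implements CheckpointedSource {
        private Map<String, Long> restored;
        private final Map<String, Long> offsets = new HashMap<>();

        @Override
        public void restoreState(Map<String, Long> state) {
            restored = new HashMap<>(state);
        }

        @Override
        public void open(List<String> fetchedPartitions) {
            if (restored != null) {
                // Restored state wins; the fetched list is ignored entirely.
                offsets.putAll(restored);
            } else {
                for (String partition : fetchedPartitions) {
                    offsets.put(partition, 0L);
                }
            }
        }

        @Override
        public Map<String, Long> snapshotState() {
            return new HashMap<>(offsets);
        }
    }

    /** Returns true if the state after restore equals the snapshot, whatever the broker reports. */
    public static boolean restoredStateInsensitiveTo(List<String> onStartup, List<String> onRestore) {
        CheckpointedSource first = new ToySource();
        first.open(onStartup);
        Map<String, Long> snapshot = first.snapshotState();

        CheckpointedSource second = new ToySource();
        second.restoreState(snapshot);
        second.open(onRestore);
        return second.snapshotState().equals(snapshot);
    }

    public static void main(String[] args) {
        List<String> startup = Arrays.asList("test-topic-0", "test-topic-1", "test-topic-2");
        // Fewer partitions visible on restore (e.g. broker downtime).
        System.out.println(restoredStateInsensitiveTo(startup, startup.subList(0, 2)));
        // More partitions visible on restore.
        System.out.println(restoredStateInsensitiveTo(startup,
                Arrays.asList("test-topic-0", "test-topic-1", "test-topic-2", "test-topic-3")));
    }
}
```

      The helper never inspects private fields of the source, so it would keep working if the internals were rewritten, which is the property the previous test lacked.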

          Activity

          tzulitai Tzu-Li (Gordon) Tai added a comment -

          Note that the master branch, due to its partition discovery behaviour, does not have this bug re-introduced.
          Nevertheless, the new tests for this should also be forward-ported there.

          githubbot ASF GitHub Bot added a comment -

          GitHub user tzulitai opened a pull request:

          https://github.com/apache/flink/pull/4344

          (release-1.3) FLINK-7195 [kafka] Remove partition list querying when restoring state in FlinkKafkaConsumer

          This issue is a reappearance of FLINK-6006. On restore, we should neither rely on the partition list fetched from Kafka nor use it to filter the restored partition states. In corner cases, such as Kafka broker downtime, some partitions may be missing from the fetched partition list. Manipulating the restored state based on that list at restore time may therefore lead to broken state. More precisely, restoring should not require fetching partitions at all.

          We have shot ourselves in the foot again and reintroduced this bug in ed68fedbe90db03823d75a020510ad3c344fa73e. This PR adds proper unit tests that do not rely on internal implementation details and test only against the public abstractions of `FlinkKafkaConsumerBase`.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/tzulitai/flink FLINK-7195

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/4344.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #4344


          commit 12af5d8b0e43b62935dc619258fb8f957b11d0bc
          Author: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
          Date: 2017-07-14T11:51:03Z

          FLINK-7195 [kafka] Remove partition list querying when restoring state in FlinkKafkaConsumer


          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/4344

          For this change, I think we also need to verify how the consumers behave when some restored partition is no longer reachable (previously, partitions that were no longer reachable were filtered out on restore, but that had the bad side effect of dropping state).

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/4344

          So partitions can be removed, in Kafka?

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/4344

          @aljoscha the issue is that there may be missing partitions when querying partitions from Kafka (e.g., if some brokers are down).

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/4344

          Ah, I thought you meant something else. Because this is pretty much the bug this PR is trying to solve, right? 😅

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4344#discussion_r127481937

          --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java ---
          @@ -532,6 +533,107 @@ public void testSnapshotStateWithCommitOnCheckpointsDisabled() throws Exception
               verify(fetcher, never()).commitInternalOffsetsToKafka(anyMap()); // not offsets should be committed
             }

          +  @Test
          +  public void testRestoredStateInsensitiveToMissingPartitions() throws Exception {
          +    List<KafkaTopicPartition> mockFetchedPartitionsOnStartup = Arrays.asList(
          +      new KafkaTopicPartition("test-topic", 0),
          +      new KafkaTopicPartition("test-topic", 1),
          +      new KafkaTopicPartition("test-topic", 2));
          +
          +    // missing fetched partitions on restore
          +    List<KafkaTopicPartition> mockFetchedPartitionsOnRestore = mockFetchedPartitionsOnStartup.subList(0, 2);
          +
          +    testRestoredStateInsensitiveToFetchedPartitions(mockFetchedPartitionsOnStartup, mockFetchedPartitionsOnRestore);
          +  }
          +
          +  @Test
          +  public void testRestoredStateInsensitiveToNewPartitions() throws Exception {
          +    List<KafkaTopicPartition> mockFetchedPartitionsOnStartup = Arrays.asList(
          +      new KafkaTopicPartition("test-topic", 0),
          +      new KafkaTopicPartition("test-topic", 1),
          +      new KafkaTopicPartition("test-topic", 2));
          +
          +    // new partitions on restore
          +    List<KafkaTopicPartition> mockFetchedPartitionsOnRestore = new ArrayList<>(mockFetchedPartitionsOnStartup);
          --- End diff --

          Why is this one not also using `Arrays.asList()`?

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4344#discussion_r127935505


          will change 👌

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4344#discussion_r127951191


          I've addressed this and re-opened as #4357 which subsumes this PR.

          tzulitai Tzu-Li (Gordon) Tai added a comment -

          Fixed for release-1.3 in e9d493db4a29f3c781644cec99fbd37a2f51806c.
          Will close this ticket after tests are forward ported to master.

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on the issue:

          https://github.com/apache/flink/pull/4344

          @tzulitai Could you close this PR since it's subsumed by #4357?

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai closed the pull request at:

          https://github.com/apache/flink/pull/4344

          aljoscha Aljoscha Krettek added a comment -

          Implemented on release-1.3 in
          e9d493db4a29f3c781644cec99fbd37a2f51806c

          tzulitai Tzu-Li (Gordon) Tai added a comment -

          There actually aren't any tests that need to be ported to master for this issue.
          The partition fetching behaviour on restart is different in master, so the tests are irrelevant there.
          Closing this now.


            People

            • Assignee: Tzu-Li (Gordon) Tai (tzulitai)
            • Reporter: Tzu-Li (Gordon) Tai (tzulitai)