Flink / FLINK-7143

Partition assignment for Kafka consumer is not stable

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 1.3.1
    • Fix Version/s: 1.4.0, 1.3.2
    • Component/s: Kafka Connector
    • Labels: None

      Description

      Important Notice:

      Upgrading jobs from 1.2.x exhibits no known problems. Jobs from 1.3.0 and 1.3.1 with incorrect partition assignments cannot be automatically fixed by upgrading to Flink 1.3.2 via a savepoint, because the upgraded version would resume the wrong partition assignment from the savepoint. A workaround is to assign a different uid to the Kafka source (so that its offsets are not resumed from the savepoint) and let it start from the latest offsets committed to Kafka instead. Note that this may violate exactly-once semantics and introduce some duplicates, because Kafka's committed offsets are not guaranteed to be 100% up to date with Flink's internal offset tracking. To maximize the alignment between the offsets in Kafka and those tracked by Flink, we suggest stopping the 1.3.x job via the "cancel with savepoint" command (https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/savepoints.html#cancel-job-with-savepoint) during the upgrade process.
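      A minimal sketch of this workaround (the topic, group id, uid, and properties here are illustrative; it assumes the 1.3.x FlinkKafkaConsumer010 and a StreamExecutionEnvironment env):

      	Properties props = new Properties();
      	props.setProperty("bootstrap.servers", "broker:9092");
      	props.setProperty("group.id", "routing-job");  // the group the offsets were committed under

      	FlinkKafkaConsumer010<String> consumer =
      			new FlinkKafkaConsumer010<>("events", new SimpleStringSchema(), props);
      	consumer.setStartFromGroupOffsets();  // resume from the offsets committed to Kafka

      	env.addSource(consumer)
      			.uid("kafka-source-v2")  // new uid: the savepoint's partition state is not matched
      			...

      Because the savepoint still contains state for the old source uid, the job has to be resumed with the --allowNonRestoredState flag so that the unmatched state is skipped.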

      Original Issue Description

      While deploying the Flink 1.3 release to hundreds of routing jobs, we found issues with partition assignment for the Kafka consumer: some partitions weren't assigned, and some partitions were assigned more than once.

      Here is the bug introduced in Flink 1.3:

      	protected static void initializeSubscribedPartitionsToStartOffsets(...) {
      		...
      		for (int i = 0; i < kafkaTopicPartitions.size(); i++) {
      			if (i % numParallelSubtasks == indexOfThisSubtask) {
      				if (startupMode != StartupMode.SPECIFIC_OFFSETS) {
      					subscribedPartitionsToStartOffsets.put(
      							kafkaTopicPartitions.get(i), startupMode.getStateSentinel());
      				}
      				...
      			}
      		}
      		...
      	}


      The bug is that the array index i is used in the modulo against numParallelSubtasks. If kafkaTopicPartitions is ordered differently across subtasks, the assignment is not stable across subtasks, which creates the assignment issues mentioned earlier. For example, with two subtasks, if subtask 0 sees the list [p1, p0] and subtask 1 sees [p0, p1], then subtask 0 takes index 0 (p1) and subtask 1 takes index 1 (also p1): p1 is consumed twice and p0 is consumed by nobody.

      The fix is also very simple: use the partition id in the modulo instead, i.e. if (kafkaTopicPartitions.get(i).getPartition() % numParallelSubtasks == indexOfThisSubtask). That results in a stable assignment across subtasks that is independent of the ordering of the array.
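      A sketch of the fixed loop, using the variable names from the snippet above (only the modulo operand changes):

      	for (int i = 0; i < kafkaTopicPartitions.size(); i++) {
      		KafkaTopicPartition partition = kafkaTopicPartitions.get(i);
      		// mod on the partition id instead of the array index, so the result is
      		// the same on every subtask regardless of how the list is ordered
      		if (partition.getPartition() % numParallelSubtasks == indexOfThisSubtask) {
      			if (startupMode != StartupMode.SPECIFIC_OFFSETS) {
      				subscribedPartitionsToStartOffsets.put(partition, startupMode.getStateSentinel());
      			}
      			...
      		}
      	}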

      Marking this as a blocker because of its impact.


          Activity

          aljoscha Aljoscha Krettek added a comment -

          Re-opening to adapt the issue's tests.

          tzulitai Tzu-Li (Gordon) Tai added a comment -

          Tests are now also ported to master, via e111d7730ec6032dc14579bd274e7822f7176e39.
          Closing this!

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/4387

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/4387

          Merging this now, as the changes were already reviewed when applying them onto `release-1.3`.

          aljoscha Aljoscha Krettek added a comment -

          Fixed on release-1.3 in
          6e0d90ccfd9c457fd99add5833cd7bf6c976d6b8
          b0564322c61168c3a7bb23bdca3db0648454a691

          githubbot ASF GitHub Bot added a comment -

          GitHub user tzulitai opened a pull request:

          https://github.com/apache/flink/pull/4387

          FLINK-7143 [kafka] Forward ports of new Kafka tests to master

          This PR forward-ports all new tests added in #4357 to `master`, so that the behavior is correctly guarded there as well.

          Changes:

            1. Introduce the `KafkaTopicPartitionAssigner` class to the master branch, which strictly defines the partition assignment contract (discussed in #4301).
            2. Port the rescaling unit test. Note that some tested behaviors needed to be changed due to the differences between 1.3 and 1.4 for the Kafka consumer.
            3. Make the checkpoint methods final.
            4. (new change, not a port) Remove the invalid `checkRestoredNullCheckpointWhenFetcherNotReady` test, which was testing a legacy behavior of the consumer.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/tzulitai/flink kafka-forward-ports

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/4387.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #4387


          commit 7295777984084fc470edaee44e1bc32881409665
          Author: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
          Date: 2017-07-24T07:00:16Z

          FLINK-7143 [kafka] Introduce KafkaTopicPartitionAssigner with stricter assignment contracts

          This commit refactors the local partition assignment logic into a
          strict contract-defining method, making the expected partition-to-subtask
          assignment explicit instead of relying solely on the hashCode of Kafka
          partitions.
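          For reference, the contract the new class defines is a round-robin assignment with a topic-dependent start index; a simplified sketch along the lines of the merged KafkaTopicPartitionAssigner:

          	public class KafkaTopicPartitionAssigner {

          		/**
          		 * Deterministically maps a partition to a subtask index, independent of
          		 * the order in which the partitions were fetched from the brokers.
          		 */
          		public static int assign(KafkaTopicPartition partition, int numParallelSubtasks) {
          			// topic-dependent start index, kept non-negative
          			int startIndex = ((partition.getTopic().hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
          			// partitions of the same topic are then distributed round-robin
          			return (startIndex + partition.getPartition()) % numParallelSubtasks;
          		}
          	}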

          commit 72a8c42505ba791f51001129a08744b104a171d7
          Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
          Date: 2017-07-18T09:57:46Z

          FLINK-7143 [kafka] Add test for Kafka Consumer rescaling

          This verifies that the consumer always correctly knows whether it is
          restored or not and is not affected by changes in the partitions as
          reported by Kafka.

          Previously, operator state reshuffling could lead to partitions being
          subscribed to multiple times.

          commit 100936c7ab0b7bca4dab10143aa184dc31e2fd46
          Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
          Date: 2017-07-18T08:35:54Z

          [hotfix] [kafka] Make checkpoint methods final in KafkaConsumerBase

          This prevents concrete Kafka source implementations from accidentally
          overriding the checkpointing methods. This would be problematic when no
          tests are provided: we test the checkpoint methods of the ConsumerBase,
          but overriding methods would not be tested.

          commit b0cf8779b76b5fe94beb4ffb6ba9adad16280be6
          Author: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
          Date: 2017-07-24T08:34:37Z

          FLINK-7248 [kafka, tests] Remove invalid checkRestoredNullCheckpointWhenFetcherNotReady test

          This test is an invalid remnant from recent major Kafka consumer
          refactorings. The actual behaviour is covered by
          checkRestoredCheckpointWhenFetcherNotReady. When the fetcher is not yet
          ready and exposed and a checkpoint happens, we fall back to using any
          restored state as the checkpoint.


          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/4302

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai closed the pull request at:

          https://github.com/apache/flink/pull/4301

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/4301

          This was merged via #4357. Closing ..

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/4302

          Merging

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai closed the pull request at:

          https://github.com/apache/flink/pull/4357

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on the issue:

          https://github.com/apache/flink/pull/4302

          @tzulitai You can merge this.

          tzulitai Tzu-Li (Gordon) Tai added a comment -

          Fixed for release-1.3 in 3369cfe200bf2cb7ed04caf19d8599075e4cfe21.
          This ticket should only be closed after related tests are merged to master.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/4357

          @aljoscha sure, merging this now

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/4357

          I reviewed this one last time with Stephan, offline, and we think we should go with this version. Overriding the snapshot methods is most likely not necessary anymore, but if it is, we will "un-finalize" them again before the release.

          @tzulitai Do you want to have the honours of merging? 😃

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4357#discussion_r128755399

          — Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java —

          	@@ -517,16 +519,13 @@ public void initializeState(FunctionInitializationContext context) throws Except
          	 				LOG.debug("Using the following offsets: {}", restoredState);
          	 			}
          	 		}
          	-
          	-		if (restoredState != null && restoredState.isEmpty()) {
          	-			restoredState = null;
          	-		}
          	 	} else {
          	 		LOG.info("No restore state for FlinkKafkaConsumer.");
          	 	}
          	 }
          	
          	 	@Override
          	-	public void snapshotState(FunctionSnapshotContext context) throws Exception {
          	+	public final void snapshotState(FunctionSnapshotContext context) throws Exception {

          — End diff —

          @aljoscha I think that's a good approach to avoid making the methods final for now. It would also be a good opportunity to clean up the `FlinkKafkaConsumerBaseTest` a bit.

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4357#discussion_r128754182

          — Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java —

          	@@ -517,16 +519,13 @@ public void initializeState(FunctionInitializationContext context) throws Except
          	 				LOG.debug("Using the following offsets: {}", restoredState);
          	 			}
          	 		}
          	-
          	-		if (restoredState != null && restoredState.isEmpty()) {
          	-			restoredState = null;
          	-		}
          	 	} else {
          	 		LOG.info("No restore state for FlinkKafkaConsumer.");
          	 	}
          	 }
          	
          	 	@Override
          	-	public void snapshotState(FunctionSnapshotContext context) throws Exception {
          	+	public final void snapshotState(FunctionSnapshotContext context) throws Exception {

          — End diff —

          The concern here is that not making the methods final makes it easy for contributors to accidentally override them. We don't have specific unit tests for the 0.9 `FlinkKafkaConsumer` or the 0.10 `FlinkKafkaConsumer` and only test the base `FlinkKafkaConsumerBase`. This is OK, as long as specific implementations don't override important methods. If the `FlinkKafkaConsumer090` did override the `snapshot()`/`restore()` methods, for example, no tests would catch this.

          @tzulitai I don't want to discuss these methods too much here since we want to get the fixes in for release 1.3.2. A way around the problem is to turn the `FlinkKafkaConsumerBaseTest` into an abstract `FlinkKafkaConsumerBaseTestBase` that has an abstract method `createTestingConsumer(List<KafkaTopicPartition> mockFetchedPartitions)` that creates a "dummy" consumer for a specific Kafka version. Then we would have individual `FlinkKafkaConsumer09Test`, `FlinkKafkaConsumer010Test` and so on that derive from the abstract test base and just implement the method for creating the testing consumer.

          What do you think?
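          A sketch of that proposal (class and method names as proposed above; the test body is illustrative):

          	public abstract class FlinkKafkaConsumerBaseTestBase {

          		// each version-specific subclass supplies a "dummy" consumer for its Kafka version
          		protected abstract FlinkKafkaConsumerBase<String> createTestingConsumer(
          				List<KafkaTopicPartition> mockFetchedPartitions);

          		@Test
          		public void testSnapshotAndRestore() throws Exception {
          			FlinkKafkaConsumerBase<String> consumer = createTestingConsumer(
          					Collections.singletonList(new KafkaTopicPartition("test-topic", 0)));
          			// ... exercise snapshotState()/initializeState() against the testing consumer ...
          		}
          	}

          A FlinkKafkaConsumer09Test would then only implement createTestingConsumer for the 0.9 consumer, and likewise for the other versions.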

          githubbot ASF GitHub Bot added a comment -

          Github user stevenzwu commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4357#discussion_r128642917

          — Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java —

          	@@ -517,16 +519,13 @@ public void initializeState(FunctionInitializationContext context) throws Except
          	 				LOG.debug("Using the following offsets: {}", restoredState);
          	 			}
          	 		}
          	-
          	-		if (restoredState != null && restoredState.isEmpty()) {
          	-			restoredState = null;
          	-		}
          	 	} else {
          	 		LOG.info("No restore state for FlinkKafkaConsumer.");
          	 	}
          	 }
          	
          	 	@Override
          	-	public void snapshotState(FunctionSnapshotContext context) throws Exception {
          	+	public final void snapshotState(FunctionSnapshotContext context) throws Exception {

          — End diff —

          > the version-specific implementations of FlinkKafkaConsumerBase may override that and have incorrect implementations, whereas our tests would never realize it.

          @tzulitai why would this be a concern for FlinkKafkaConsumerBase? If version-specific implementations have bugs, they should have tests to catch and prevent them. We do need the capability to override the snapshot method to a no-op. What would be your suggested alternative?

          githubbot ASF GitHub Bot added a comment -

          Github user stevenzwu commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4357#discussion_r128642186

          — Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java —

          	@@ -517,16 +519,13 @@ public void initializeState(FunctionInitializationContext context) throws Except
          	 				LOG.debug("Using the following offsets: {}", restoredState);
          	 			}
          	 		}
          	-
          	-		if (restoredState != null && restoredState.isEmpty()) {
          	-			restoredState = null;
          	-		}
          	 	} else {
          	 		LOG.info("No restore state for FlinkKafkaConsumer.");
          	 	}
          	 }
          	
          	 	@Override
          	-	public void snapshotState(FunctionSnapshotContext context) throws Exception {
          	+	public final void snapshotState(FunctionSnapshotContext context) throws Exception {

          — End diff —

          @tzulitai looks like the behavior was changed/fixed in 1.3. Here is the Kafka09Fetcher.java code from 1.2 that was causing the behavior I described earlier:

          	// if checkpointing is enabled, we are not automatically committing to Kafka.
          	kafkaProperties.setProperty(
          			ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
          			Boolean.toString(!enableCheckpointing));

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/4357

          I don't, but I would like to give @StephanEwen a chance to comment on the changes that make the checkpoint-related methods on the consumer base final.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/4357

          @aljoscha do you have any last comments on these changes?

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4357#discussion_r128147542

          — Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java —

          	@@ -517,16 +519,13 @@ public void initializeState(FunctionInitializationContext context) throws Except
          	 				LOG.debug("Using the following offsets: {}", restoredState);
          	 			}
          	 		}
          	-
          	-		if (restoredState != null && restoredState.isEmpty()) {
          	-			restoredState = null;
          	-		}
          	 	} else {
          	 		LOG.info("No restore state for FlinkKafkaConsumer.");
          	 	}
          	 }
          	
          	 	@Override
          	-	public void snapshotState(FunctionSnapshotContext context) throws Exception {
          	+	public final void snapshotState(FunctionSnapshotContext context) throws Exception {

          — End diff —

          See https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/kafka.html#kafka-consumers-offset-committing-behaviour-configuration.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4357#discussion_r128147490

          — Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java —

          	@@ -517,16 +519,13 @@ public void initializeState(FunctionInitializationContext context) throws Except
          	 				LOG.debug("Using the following offsets: {}", restoredState);
          	 			}
          	 		}
          	-
          	-		if (restoredState != null && restoredState.isEmpty()) {
          	-			restoredState = null;
          	-		}
          	 	} else {
          	 		LOG.info("No restore state for FlinkKafkaConsumer.");
          	 	}
          	 }
          	
          	 	@Override
          	-	public void snapshotState(FunctionSnapshotContext context) throws Exception {
          	+	public final void snapshotState(FunctionSnapshotContext context) throws Exception {

          — End diff —

          On the other hand, I think that this part of your description is strange:
          >we can't set Flink checkpoint to false, because otherwise Kafka consumer auto.commit will be hard-coded to true.

          This should not be the case (at least starting from Flink 1.3.x). The "auto.commit" setting is independent of checkpointing. If you don't enable checkpointing, "auto.commit" decides whether or not periodic offset committing is used. Otherwise, you can still disable offset committing with checkpointing on by using `FlinkKafkaConsumer#disableOffsetCommittingOnCheckpoints`.
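          For completeness, the toggle mentioned above is exposed on the 1.3.x consumer as a setter (see the documentation page linked in the previous comment); a sketch:

          	FlinkKafkaConsumer010<String> consumer =
          			new FlinkKafkaConsumer010<>("events", new SimpleStringSchema(), props);
          	// checkpointing stays enabled for the job, but offsets are no longer
          	// committed back to Kafka when checkpoints complete
          	consumer.setCommitOffsetsOnCheckpoints(false);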

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4357#discussion_r128147294

          — Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java —

          	@@ -517,16 +519,13 @@ public void initializeState(FunctionInitializationContext context) throws Except
          	 				LOG.debug("Using the following offsets: {}", restoredState);
          	 			}
          	 		}
          	-
          	-		if (restoredState != null && restoredState.isEmpty()) {
          	-			restoredState = null;
          	-		}
          	 	} else {
          	 		LOG.info("No restore state for FlinkKafkaConsumer.");
          	 	}
          	 }
          	
          	 	@Override
          	-	public void snapshotState(FunctionSnapshotContext context) throws Exception {
          	+	public final void snapshotState(FunctionSnapshotContext context) throws Exception {

          — End diff —

          @stevenzwu the snapshotState method was actually never intended to be overridden, hence making it final here to state that clearly. For example, the version-specific implementations of `FlinkKafkaConsumerBase` may override it and have incorrect implementations, whereas our tests would never realize it.

          githubbot ASF GitHub Bot added a comment -

          Github user stevenzwu commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4357#discussion_r128113797

          — Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java —

          	@@ -517,16 +519,13 @@ public void initializeState(FunctionInitializationContext context) throws Except
          	 				LOG.debug("Using the following offsets: {}", restoredState);
          	 			}
          	 		}
          	-
          	-		if (restoredState != null && restoredState.isEmpty()) {
          	-			restoredState = null;
          	-		}
          	 	} else {
          	 		LOG.info("No restore state for FlinkKafkaConsumer.");
          	 	}
          	 }
          	
          	 	@Override
          	-	public void snapshotState(FunctionSnapshotContext context) throws Exception {
          	+	public final void snapshotState(FunctionSnapshotContext context) throws Exception {

          — End diff —

          @tzulitai what's the reason to make this final? In our router use case, we override the snapshotState method to a no-op. We disabled Flink checkpointing by setting the checkpoint interval to Long.MAX_VALUE. We can't disable checkpointing outright, because otherwise the Kafka consumer's auto.commit will be hard-coded to true.

          @zhenzhongxu ^
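          The setup being described is roughly the following (a sketch with illustrative names; note that the change under discussion would make this override impossible):

          	env.enableCheckpointing(Long.MAX_VALUE);  // effectively disables periodic checkpoints

          	FlinkKafkaConsumer010<String> consumer =
          			new FlinkKafkaConsumer010<String>("events", new SimpleStringSchema(), props) {
          				@Override
          				public void snapshotState(FunctionSnapshotContext context) {
          					// intentionally a no-op in this router use case
          				}
          			};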

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/4357

          R: @aljoscha

          githubbot ASF GitHub Bot added a comment -

          GitHub user tzulitai opened a pull request:

          https://github.com/apache/flink/pull/4357

          (release-1.3) [FLINK-7143, FLINK-7195] Collection of Kafka fixes for release-1.3

          This PR subsumes #4344 and #4301, with the changes of both PRs merged and conflicts resolved.
          As it turns out, some new tests added in one of the PRs also rely on the fix from the other PR, so I'm opening this one to give a better overall view of the status of the fixes.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/tzulitai/flink kafka-13-fixes

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/4357.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #4357


          commit 919b23a6e1c650a3d08f5418a53e712e86d51506
          Author: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
          Date: 2017-07-11T17:03:01Z

          FLINK-7143 [kafka] Fix indeterminate partition assignment in FlinkKafkaConsumer

          Apart from fixing the previously incorrect, non-deterministic assignment
          logic, this commit also adds an explicitly defined method that states a
          strict contract for the assignment, instead of relying on a hashCode
          implementation that conveys neither this contract nor the importance of
          the assignment being deterministic.

          commit 00bcdbf24c177276f203063f905886becfe23db5
          Author: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
          Date: 2017-07-14T11:51:03Z

          FLINK-7195 [kafka] Remove partition list querying when restoring state in FlinkKafkaConsumer

          Previously, querying the partition list and using it to filter out
          restored partition states was problematic, since the queried partition
          list may be missing partitions due to temporary downtime of Kafka
          brokers. Effectively, this could cause state to be dropped on restores.

          This commit fixes this by completely removing partition querying if
          we're restoring state (as notified by
          FunctionInitializationContext.isRestored()). The subscribed partitions
          will always be exactly what the restored state contains.
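          The control flow this commit message describes is roughly (a sketch with illustrative variable names):

          	if (context.isRestored()) {
          		// subscribe to exactly the partitions in the restored state; no partition querying
          		for (Map.Entry<KafkaTopicPartition, Long> entry : restoredState.entrySet()) {
          			subscribedPartitionsToStartOffsets.put(entry.getKey(), entry.getValue());
          		}
          	} else {
          		// fresh start: query the partition list and apply the assignment logic
          		...
          	}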

          commit a4ca2f559b1d530e68ce3516035964f569ff7c7f
          Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
          Date: 2017-07-17T17:06:09Z

          FLINK-7143 [kafka] Fix detection of restored bit in Kafka Consumer

          Before, the problem was that empty state was associated with the source
          not being restored. However, a source can have empty restored state in
          one of two cases:

          1. The source was not restored.
          2. The overall job was restored but the source simply didn't get any
          operator state assigned.

          commit faf957209220d2779062321d7ab58c9356906ad8
          Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
          Date: 2017-07-18T08:35:54Z

          [hotfix] [kafka] Make checkpoint methods final in KafkaConsumerBase

          This prevents concrete Kafka source implementations from accidentally
          overriding the checkpointing methods. This would be problematic when no
          tests are provided: we test the checkpoint methods of the ConsumerBase,
          but overriding methods would not be tested.

          commit 5180f898c48ce2e416547dcdf76caef72c5a8dee
          Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
          Date: 2017-07-18T09:57:46Z

          FLINK-7143 [kafka] Add test for Kafka Consumer rescaling

          This verifies that the consumer always correctly knows whether it is
          restored or not and is not affected by changes in the partitions as
          reported by Kafka.

          Previously, operator state reshuffling could lead to partitions being
          subscribed to multiple times.


          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/4301

          Oye, this is more complicated than I thought. On `release-1.3` the assignment actually works if the Kafka brokers always return the partitions in the same order. The reason is that the assignment of partitions and the assignment of operator state (in `RoundRobinOperatorStateRepartitioner`) is aligned. This meant that it's not a problem when sources think that they are "fresh" (not restored from state) because they didn't get any state. If they tried to assign a partition to themselves this would also mean that they have the state for that (again, because partition assignment and operator state assignment are aligned).

          This PR breaks the alignment because the `startIndex` is not necessarily `0`. However, this is not caught by any tests because the `StateAssignmentOperation` has an optimisation where it doesn't repartition operator state if the parallelism doesn't change. If we deactivate that optimisation by turning this line into `if (true)`: https://github.com/apache/flink/blob/b1f762127234e323b947aa4a363935f87be1994f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java#L561-L561 the test in Kafka09ITCase will fail.

          The fix is to properly forward the information of whether we're restored in `initializeState()`, I did a commit for that: https://github.com/aljoscha/flink/tree/finish-pr-4301-kafka-13-fixings. The problem is that it is not easy to change the tests to catch this bug. I think an ITCase that uses Kafka and does a savepoint and rescaling would do the trick.

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/4301

          Note that this doesn't normally occur because the strategy for assigning Kafka partitions and for assigning operator state is the same (right now). However, this means that you will have active partition discovery for parallel instances that didn't previously have state: assume we have 1 partition and 1 parallel source. Now we add a new partition and restart the Flink job. Parallel instance 1 will still read from partition 0, while parallel instance 2 will think that it wasn't restored (because it didn't get state) and will discover partitions and take ownership of partition 1.

          (This is with current `release-1.3` branch code.)

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/4301

          Yes, I don't think we can get around this when restoring from "old" state.

          I also have another suspicion: I don't think that `KafkaConsumerTestBase.runMultipleSourcesOnePartitionExactlyOnceTest()` accurately catches some cases, and I think there is a problem that we cannot accurately detect whether we are restoring or whether we are starting from scratch. Consider this case: 5 partitions, 5 parallel source instances. Now we rescale to 10 parallel source instances. Some sources don't get state, so they think that we are starting from scratch and will run partition discovery. Doesn't this mean that they could possibly read from a partition that another source is already reading from, because that source got the state for it? (Note: this doesn't occur on master because all sources get all state.)

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/4301

          @aljoscha on second thought, I don't think we can easily deal with the fact that, when restoring from 1.3.1 / 1.3.0 savepoints in 1.3.2, users will not benefit from this bug fix.

          There is basically no guarantee in what the distribution would be when restoring from 1.3.1 / 1.3.0, and therefore no way to manipulate it to follow the new fixed distribution scheme we introduce here.
          It may be possible if we force a migrate to union list state in 1.3.2, but I'm not really sure that we want to do that ..

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/4301

          @StephanEwen
          Regarding no-rediscover on restore test:
          Yes, you could say that it is covered in `KafkaConsumerTestBase.runMultipleSourcesOnePartitionExactlyOnceTest()`. It's an end-to-end exactly-once test for the case where the Flink source subtask count > partition count.

          Regarding `ListState`:
          The redistribution of `ListState` doesn't conflict with discovery and assignment of partitions in the `release-1.3` case (where there is no partition discovery), because we don't respect the partition assignment logic if we're starting from savepoints. We only consider what's in the restored state. See also @aljoscha's comment above.

          For `master` where partition discovery is already merged, the `ListState` is a union list state, where all subtasks are broadcasted with all partition states. On restore, the restored union list state is filtered again with the assignment logic.
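          Illustratively, the filtering of the restored union state described above amounts to something like this (variable names hypothetical):

          	// every subtask sees the union of all partition states and keeps only
          	// the partitions that the assigner maps to its own subtask index
          	for (Map.Entry<KafkaTopicPartition, Long> entry : restoredUnionState.entrySet()) {
          		if (KafkaTopicPartitionAssigner.assign(entry.getKey(), numParallelSubtasks)
          				== indexOfThisSubtask) {
          			subscribedPartitionsToStartOffsets.put(entry.getKey(), entry.getValue());
          		}
          	}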

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/4301

Just to double-check: I see that the partition state is kept in a `ListState`. That means after recovery it can be distributed differently than before. Does that not conflict with the discovery and assignment of partitions?

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/4301

          Do we have a test for the case where there are fewer partitions than sources so that some sources do not get partitions on restore? To make sure they do not accidentally re-discover?

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/4301

          This would then mean we discourage restoring from a 1.3.x savepoint, because the state is potentially incorrect.
          I wonder if we then actually want to always fetch partitions on startup (fresh or from savepoint) to deal with this (just a fix for the `release-1.3` branch)?

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/4301

          Yes, that is true. This assignment logic is only applied on fresh starts.

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/4301

          Quick comment for my own clarification: when restoring from a 1.3.x savepoint, the new assignment logic will not be used, right? In Flink 1.3.x there is no dynamic partition discovery and so when restarting from state we have to strictly stick to what we have in the (operator) state to avoid reading partitions on multiple subtasks.

If this is true, it also means that folks who have savepoints on 1.3.x cannot use them if they want to benefit from this fix, right?

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4301#discussion_r126962857

          — Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionAssigner.java —
          @@ -0,0 +1,56 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.streaming.connectors.kafka.internals;
          +
          +/**
          + * Utility for assigning Kafka partitions to consumer subtasks.
          + */
          +public class KafkaTopicPartitionAssigner {
          +
          + /**
          + * Returns the index of the target subtask that a specific Kafka partition should be
          + * assigned to.
          + *
          + * <p>The resulting distribution of partitions of a single topic has the following contract:
          + * <ul>
          + * <li>1. Uniformly distributed across subtasks</li>
          + * <li>2. Partitions are round-robin distributed (strictly clockwise w.r.t. ascending
          + * subtask indices) by using the partition id as the offset from a starting index
          + * (i.e., the index of the subtask which partition 0 of the topic will be assigned to,
          + * determined using the topic name).</li>
          + * </ul>
          + *
          + * <p>The above contract is crucial and cannot be broken. Consumer subtasks rely on this
          + * contract to locally filter out partitions that it should not subscribe to, guaranteeing
          + * that all partitions of a single topic will always be assigned to some subtask in a
          + * uniformly distributed manner.
          + *
          + * @param partition the Kafka partition
          + * @param numParallelSubtasks total number of parallel subtasks
          + *
          + * @return index of the target subtask that the Kafka partition should be assigned to.
          + */
          + public static int assign(KafkaTopicPartition partition, int numParallelSubtasks) {
          + int startIndex = Math.abs(partition.getTopic().hashCode() * 31 % numParallelSubtasks);
          — End diff –

          Good catch!

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4301#discussion_r126952350

          — Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionAssigner.java —
          @@ -0,0 +1,56 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.streaming.connectors.kafka.internals;
          +
          +/**
          + * Utility for assigning Kafka partitions to consumer subtasks.
          + */
          +public class KafkaTopicPartitionAssigner {
          +
          + /**
          + * Returns the index of the target subtask that a specific Kafka partition should be
          + * assigned to.
          + *
          + * <p>The resulting distribution of partitions of a single topic has the following contract:
          + * <ul>
          + * <li>1. Uniformly distributed across subtasks</li>
          + * <li>2. Partitions are round-robin distributed (strictly clockwise w.r.t. ascending
          + * subtask indices) by using the partition id as the offset from a starting index
          + * (i.e., the index of the subtask which partition 0 of the topic will be assigned to,
          + * determined using the topic name).</li>
          + * </ul>
          + *
          + * <p>The above contract is crucial and cannot be broken. Consumer subtasks rely on this
          + * contract to locally filter out partitions that it should not subscribe to, guaranteeing
          + * that all partitions of a single topic will always be assigned to some subtask in a
          + * uniformly distributed manner.
          + *
          + * @param partition the Kafka partition
          + * @param numParallelSubtasks total number of parallel subtasks
          + *
          + * @return index of the target subtask that the Kafka partition should be assigned to.
          + */
          + public static int assign(KafkaTopicPartition partition, int numParallelSubtasks) {
          + int startIndex = Math.abs(partition.getTopic().hashCode() * 31 % numParallelSubtasks);
          — End diff –

Minor detail: `Math.abs` does not work for `Integer.MIN_VALUE`, so it is slightly safer to do
```java
int startIndex = ((partition.getTopic().hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
```
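For context, a tiny standalone demonstration of the edge case (not part of the PR; plain `java.lang` only):

```java
// Math.abs cannot represent |Integer.MIN_VALUE| as an int, so it
// returns Integer.MIN_VALUE itself (still negative), and a later
// modulo could then produce a negative subtask index.
System.out.println(Math.abs(Integer.MIN_VALUE));    // prints -2147483648
// Masking off the sign bit always yields a non-negative value.
System.out.println(Integer.MIN_VALUE & 0x7FFFFFFF); // prints 0
```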

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/4301

          @StephanEwen thanks for the review. Your suggestion makes a lot of sense.

I've fixed this up as follows:

• Added a new method `KafkaTopicPartitionAssigner.assign(KafkaTopicPartition partition, int numSubtasks)` that defines a strict contract, such that when it is used locally to filter out partitions, the resulting distribution of the partitions of a single topic is guaranteed to be:
  1. Uniformly distributed across subtasks
  2. Round-robin distributed (strictly CLOCKWISE w.r.t. ascending subtask indices), using the partition id as the offset from a starting index determined by the topic name. The extra directional contract makes this stricter than what we had before, where partitions might have been round-robin assigned counter-clockwise. This should make the actual assignment scheme much more predictable as perceived by the user: knowing the start index of a specific topic is enough to understand how its partitions are distributed across subtasks (see the sketch after this list). We could add a log message that states the start index of each consumed topic.
• Strengthened the tests in `KafkaConsumerPartitionAssignmentTest` to verify this contract. Uniform distribution was already covered by that suite; the change makes the tests also verify the "clockwise round-robin from some start index" contract.
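As a sketch of what this contract implies (signature taken from the diff in this PR; the sign-bit masking follows the `Math.abs` remark in the diff discussion above):

```java
public static int assign(KafkaTopicPartition partition, int numParallelSubtasks) {
    // The topic name alone fixes the subtask that receives partition 0 ...
    int startIndex = ((partition.getTopic().hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
    // ... and the topic's partitions proceed round-robin, clockwise
    // w.r.t. ascending subtask indices, from that start index.
    return (startIndex + partition.getPartition()) % numParallelSubtasks;
}
```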
          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/4301

          I think that would fix the bug. There are two things I would like to improve, though:

1. Relying on `hashCode()` makes very implicit assumptions about the behavior of the hash code implementation. It does not document or articulate well how critical this `int` value we rely on is. For example, by Java specification, hashCode may vary between processes - it only needs to be stable within a single JVM. Our hash code implementation happens to be stable currently, as long as the JDK does not change the implementation of the String hashCode method (which they could in theory do in any minor release, although they have not done so in a while).

2. It is crucial that the distribution of partitions is uniform. That is a bit harder to guarantee when all sources pick up their own set of topics. At the very least, the distribution of the partitions within a topic should be uniform. For example, the topic defines "where to start" among the parallel subtasks, and the partitions then go "round robin".
Well, as it happens, this is actually what the hash code function does, but again, it looks a bit like it "coincidentally" behaves like that, rather than us having a strict contract for that behavior. For example, changing the hashCode from `31 * topic + partition` to `31 * partition + topic` results in a non-uniform distribution, but is an equally valid hashCode.

I would suggest to have a function `int assignmentIndex()` or so, for which we define the above contract. We should also have tests that verify this distributes the partitions within a single topic uniformly.
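A small worked example of the non-uniformity point (hypothetical topic name; parallelism deliberately set to 31 to make the skew extreme):

```java
int numTasks = 31;
int topicHash = "example-topic".hashCode(); // any fixed topic name
for (int p = 0; p < 5; p++) {
    // 31 * p + topicHash is congruent to topicHash (mod 31): every
    // partition of the topic lands on the same subtask.
    int skewed = Math.floorMod(31 * p + topicHash, numTasks);
    // 31 * topicHash + p steps by one per partition: uniform round-robin.
    int uniform = Math.floorMod(31 * topicHash + p, numTasks);
    System.out.println("p" + p + " -> skewed=" + skewed + ", uniform=" + uniform);
}
```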

          tzulitai Tzu-Li (Gordon) Tai added a comment -

Steven Zhen Wu Sorting would not work, because dynamically discovered topics and partitions (a feature in Flink 1.4) would potentially break the sorting (although partition ids are always ascending, we again need to consider multiple topics). That's why it was changed in the first place. If we're considering a solution only for 1.3, where partition metadata is fetched only a single time on startup, it would make sense to continue using sorting + round-robin assignment.

          stevenz3wu Steven Zhen Wu added a comment -

Aljoscha Krettek Tzu-Li (Gordon) Tai Agreed that partitionId % parallelism is probably not a good idea for the multiple-topics case. What about the old sorting approach? Sort the list by (topic, partition) tuple first, then do a simple round-robin assignment (mod). The advantage is that it makes the assignment pattern much easier to see (compared to hashCode), which usually helps with debugging: it is easier to figure out the expected assignment. A sketch follows the example below.

Here is an example with 3 topics and a parallelism of 4:

          [subtask-id] topic, partition
          [0] t1, p0
          [1] t1, p1
          [2] t1, p2
          [3] t2, p0
          [0] t3, p0
          [1] t3, p1
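For the 1.3-only variant, the scheme described here could look roughly like the following sketch, under the assumption of a one-shot partition fetch; `fetchAllPartitions`, `subscribe`, `numParallelSubtasks` and `indexOfThisSubtask` are stand-ins for the real consumer plumbing:

```java
import java.util.Comparator;
import java.util.List;

List<KafkaTopicPartition> partitions = fetchAllPartitions(); // hypothetical one-shot fetch
// Deterministic global order, independent of what Kafka returned.
partitions.sort(Comparator
        .comparing(KafkaTopicPartition::getTopic)
        .thenComparingInt(KafkaTopicPartition::getPartition));
// Simple round-robin over the sorted list.
for (int i = 0; i < partitions.size(); i++) {
    if (i % numParallelSubtasks == indexOfThisSubtask) {
        subscribe(partitions.get(i));
    }
}
```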
          
          githubbot ASF GitHub Bot added a comment -

          GitHub user tzulitai opened a pull request:

          https://github.com/apache/flink/pull/4302

          (master) FLINK-7143 [kafka] Stricter tests for deterministic partition assignment

          This is the forward port of the fix in #4301. The `master` branch does not require a fix, because the new `AbstractPartitionDiscoverer` already correctly uses `partition.hashCode % numTasks` for the partition discovery filtering.

          This PR simply makes the tests for partition assignment more strict, to guard against breaking deterministic assignment in the future.
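The kind of strictness being added can be pictured as the following JUnit-style fragment; `fetchedPartitions` and `assignAll` (mapping each partition to its subtask index via the assigner) are hypothetical helpers, not the actual test code:

```java
// Shuffling the fetched partition list must not change the assignment.
List<KafkaTopicPartition> partitions = new ArrayList<>(fetchedPartitions);
Map<KafkaTopicPartition, Integer> before = assignAll(partitions, numSubtasks);
Collections.shuffle(partitions);
Map<KafkaTopicPartition, Integer> after = assignAll(partitions, numSubtasks);
assertEquals(before, after);
```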

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/tzulitai/flink FLINK-7143-master

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/4302.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #4302



          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/4301

          R: @aljoscha @rmetzger

          githubbot ASF GitHub Bot added a comment -

          GitHub user tzulitai opened a pull request:

          https://github.com/apache/flink/pull/4301

          FLINK-7143 [kafka] Fix indeterminate partition assignment in FlinkKafkaConsumer

          This PR changes the mod operation for partition assignment from `i % numTasks == subtaskIndex` to `partition.hashCode % numTasks == subtaskIndex`.

The bug was initially introduced by #3378, when moving away from sorting the partition list. Apparently, the tests for partition assignment were not strict enough and did not catch this. This PR additionally adds verification that partitions end up in the expected subtasks, and that different partition orderings still produce the same partition assignments.

          Note: a fix is not required for the `master` branch, since the partition discovery changes already indirectly fixed the issue. However, test coverage for deterministic assignment should likewise be improved in `master` as well. A separate PR will be opened for that.
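In code terms, the change amounts to swapping the filter condition; a simplified sketch (sign handling of hashCode elided here):

```java
// Before (1.3.0 / 1.3.1): ownership depends on the position i in the
// fetched partition list, which may be ordered differently per subtask.
boolean ownedBefore = (i % numParallelSubtasks == indexOfThisSubtask);

// After: ownership is derived from the partition itself, so every
// subtask reaches the same verdict regardless of list ordering.
boolean ownedAfter =
        (kafkaTopicPartitions.get(i).hashCode() % numParallelSubtasks
                == indexOfThisSubtask);
```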

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/tzulitai/flink FLINK-7143-1.3

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/4301.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #4301


          commit 563f605d00f5d184fce2eb505c59033f22d3d0ab
          Author: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
          Date: 2017-07-11T17:03:01Z

          FLINK-7143 [kafka] Fix indeterminate partition assignment in FlinkKafkaConsumer


          aljoscha Aljoscha Krettek added a comment -

IMHO, if we used partitionId % parallelism in the multi-topic case, we could get bad utilisation. For example, assume we have 10 parallel source instances and we read from two topics, each with 5 partitions. With partitionId % parallelism, each of the first 5 source instances would read two partitions (one from each topic), while the last 5 source instances would not read any partition. Does that make sense?
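A quick way to see the skew (plain Java, mirroring the numbers above):

```java
// 10 parallel sources, 2 topics with 5 partitions each, using the
// naive partitionId % parallelism assignment from the example above.
int parallelism = 10;
for (String topic : new String[] {"t1", "t2"}) {
    for (int p = 0; p < 5; p++) {
        System.out.println(topic + "-p" + p + " -> subtask " + (p % parallelism));
    }
}
// Subtasks 0..4 each receive two partitions; subtasks 5..9 receive none.
```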

          stevenz3wu Steven Zhen Wu added a comment -

I see, the change is for supporting multiple topics. hashCode() can work as a stable assignment. My only complaint is that it makes the assignment pattern non-obvious (especially in the single-topic case). Not critical.

I don't know if it makes sense to do the assignment using partitionId % parallelism for each topic in the multiple-topics case?

          tzulitai Tzu-Li (Gordon) Tai added a comment - - edited

Steven Zhen Wu yes. One small thing: it would need to be partition.hashCode() % parallelism, since we allow multiple topics and therefore cannot rely only on the partition id.

          stevenz3wu Steven Zhen Wu added a comment - - edited

Regarding test coverage: we need a test that verifies the assignment is partitionId % parallelism. Right now it is effectively random, and the test coverage didn't catch it.

          tzulitai Tzu-Li (Gordon) Tai added a comment - - edited

IIRC, sorting the fetched partition list was removed in favor of using the hashCode of `KafkaTopicPartition`s for the mod operation. This must have been a remnant of that change ...

Apparently, the current partition assignment tests do not have enough coverage. We also need a test that verifies assignment stability under different fetched partition orderings.


            People

• Assignee: tzulitai Tzu-Li (Gordon) Tai
• Reporter: stevenz3wu Steven Zhen Wu
• Votes: 0
• Watchers: 7
