FLINK-6006: Kafka Consumer can lose state if queried partition list is incomplete on restore

      Description

      In 1.1.x and 1.2.x, the FlinkKafkaConsumer queries the partition list on restore. Then, only restored state for partitions that exist in the queried list is used to initialize the fetcher's state holders.

      If the returned partition list is incomplete for any reason (i.e. missing partitions that existed before, perhaps due to temporary ZK / broker downtime), then the state of the missing partitions is dropped and cannot be recovered anymore.

      In 1.3-SNAPSHOT, this is fixed by the changes in FLINK-4280, so only 1.1 and 1.2 are affected.

      We can backport some of the behavioural changes there to 1.1 and 1.2. Generally, we should not depend on the current partition list in Kafka when restoring, but simply restore all previous state into the fetcher's state holders.

      This would therefore also require some checking of how the consumer threads / Kafka clients behave when their assigned partitions cannot be reached.
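      To make the loss concrete, here is a minimal, self-contained sketch of the two behaviours (invented names; plain Strings stand in for KafkaTopicPartition — this is not the actual connector code):

      import java.util.Arrays;
      import java.util.HashMap;
      import java.util.List;
      import java.util.Map;

      public class RestoreBehaviorSketch {

          // 1.1.x / 1.2.x behaviour: restored offsets are filtered against the
          // partition list queried from Kafka on restore, so any partition
          // missing from the query silently loses its state.
          static Map<String, Long> filteredRestore(Map<String, Long> restoredOffsets, List<String> queriedPartitions) {
              Map<String, Long> fetcherState = new HashMap<>();
              for (String partition : queriedPartitions) {
                  Long offset = restoredOffsets.get(partition);
                  if (offset != null) {
                      fetcherState.put(partition, offset);
                  }
              }
              return fetcherState;
          }

          // Proposed behaviour: restore the complete snapshot and ignore the
          // queried partition list entirely.
          static Map<String, Long> completeRestore(Map<String, Long> restoredOffsets) {
              return new HashMap<>(restoredOffsets);
          }

          public static void main(String[] args) {
              Map<String, Long> restored = new HashMap<>();
              restored.put("topic-0", 42L);
              restored.put("topic-1", 17L);

              // Simulate an incomplete query, e.g. the broker leading partition 1
              // was briefly unreachable when the job restored:
              List<String> incompleteQuery = Arrays.asList("topic-0");

              System.out.println(filteredRestore(restored, incompleteQuery)); // {topic-0=42} -- topic-1's state is gone
              System.out.println(completeRestore(restored));                  // both partitions kept
          }
      }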

          Activity

          gyfora Gyula Fora added a comment -

          Don't be sorry, there is a high probability that I did something bad, or our Kafka is screwed up in weird ways. Adding some logging for this (maybe at INFO level) would probably help if this comes up again. Thanks for all the work!

          tzulitai Tzu-Li (Gordon) Tai added a comment -

          Gyula Fora no problem. Here's how I'll proceed with this:

          I'll try to add some logs to the Kafka Consumer in 1.2.1 so that if this issue pops up again, there'll at least be good logs to serve as grounds for debugging.
          I'll perhaps do another thorough check before creating the release candidate for 1.2.1 to see where this could have gone wrong, but as I mentioned in my earlier check, right now I can't really find anything. If there is still a bug and we only manage to solve it after 1.2.1, we can always push out 1.2.2 asap.

          Really sorry you had to cope with this.

          gyfora Gyula Fora added a comment -

          Unfortunately I only have the JM log, and that doesn't seem to contain any relevant information.

          tzulitai Tzu-Li (Gordon) Tai added a comment -

          Gyula Fora that's very odd then... I did a double check on the code, but currently I can't really pinpoint what else could have gone wrong.

          Could you perhaps post a relevant part of the log? That might be helpful.
          Ideally, that would be the parts where the consumer was started for the first time and initially picked up partitions, and also after restore. You can also DM me (tzulitai@apache.org) if you prefer to share it privately.

          I'll try to make sure there are no more issues with this before 1.2.1.

          gyfora Gyula Fora added a comment -

          I tried restarting the consumer when I was sure that the brokers had completely recovered, and that still didn't help.

          tzulitai Tzu-Li (Gordon) Tai added a comment - edited

          Hi Gyula Fora,

          After the restore, could you find any logs like the following?
          `Unable to reach broker after ... retries. Returning all current partitions`
          This log should be at WARN level.

          I'm guessing that the 0.8 consumer will continue to attempt consuming from an unreachable broker even after all retries; in that case, this message should be popping up non-stop for the brokers that are down. So the complete state is restored (including the partitions which couldn't be reached due to downtime), but the 0.8 consumer simply does not fail on the unreachable brokers when it tries to consume from them. This shouldn't cause the consumer state to be broken like before, though; you'll simply see that the offsets of the unreachable partitions won't be advancing in the consumer's state.

          Also, if DEBUG level happens to be on, could you also find:

          • Setting restore state in the FlinkKafkaConsumer.
          • Using the following offsets: ...

          The partitions you see in the second message should be the complete partition list for each subtask.
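          If DEBUG is not already enabled, raising the connector's log level should surface those messages on the next run. A minimal sketch, assuming the default log4j 1.x setup shipped with Flink 1.2 (in conf/log4j.properties; the logger name is simply the connector's package, as seen in the file paths of the diffs below):

          # Make the restore-state and offsets messages above visible.
          log4j.logger.org.apache.flink.streaming.connectors.kafka=DEBUG
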
          gyfora Gyula Fora added a comment -

          I have a slight feeling that this didn't completely solve the issue.

          I saw this occur again today using the 0.8 connector from the 1.2 branch built with the fix.

          tzulitai Tzu-Li (Gordon) Tai added a comment -

          Fixed for 1.2.1 with http://git-wip-us.apache.org/repos/asf/flink/commit/24306ad and for 1.1.5 with http://git-wip-us.apache.org/repos/asf/flink/commit/e296aca .

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai closed the pull request at:

          https://github.com/apache/flink/pull/3507

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/3507

          Merging ..

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/3507

          Doing another Travis run locally before merging just to be safe:
          https://travis-ci.org/tzulitai/flink/builds/211031758

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai closed the pull request at:

          https://github.com/apache/flink/pull/3505

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/3505

          Merging ..

          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3505#discussion_r105869364

          --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java ---
          @@ -194,14 +194,29 @@ protected AbstractFetcher(

           	/**
           	 * Restores the partition offsets.
          +	 * The partitions in the provided map of restored partitions to offsets must completely match
          +	 * the fetcher's subscribed partitions.
           	 *
          -	 * @param snapshotState The offsets for the partitions
          +	 * @param restoredOffsets The restored offsets for the partitions
          +	 *
          +	 * @throws IllegalStateException if the partitions in the provided restored offsets map
          +	 *                               cannot completely match the fetcher's subscribed partitions.
           	 */
          -	public void restoreOffsets(Map<KafkaTopicPartition, Long> snapshotState) {
          -		for (KafkaTopicPartitionState<?> partition : allPartitions) {
          -			Long offset = snapshotState.get(partition.getKafkaTopicPartition());
          -			if (offset != null) {
          -				partition.setOffset(offset);
          +	public void restoreOffsets(Map<KafkaTopicPartition, Long> restoredOffsets) {
          +		if (restoredOffsets.size() != allPartitions.length) {
          +			throw new IllegalStateException(
          +				"The fetcher was restored with partition offsets that do not " +
          +				"match with the subscribed partitions: " + restoredOffsets);
          --- End diff ---

          I see. Thank you.

          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on the issue:

          https://github.com/apache/flink/pull/3505

          +1 to merge

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3505#discussion_r105860440

          --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---
          @@ -489,16 +486,15 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
           	// Utilities
           	// ------------------------------------------------------------------------

          -	private void assignTopicPartitions(List<KafkaTopicPartition> kafkaTopicPartitions) {
          -		subscribedPartitions = new ArrayList<>();
          -
          +	private void assignTopicPartitions() {
           		if (restoreToOffset != null) {
          -			for (KafkaTopicPartition kafkaTopicPartition : kafkaTopicPartitions) {
          -				if (restoreToOffset.containsKey(kafkaTopicPartition)) {
          -					subscribedPartitions.add(kafkaTopicPartition);
          -				}
          +			subscribedPartitions = new ArrayList<>(restoreToOffset.size());
          +			for (Map.Entry<KafkaTopicPartition, Long> restoredPartitionState : restoreToOffset.entrySet()) {
          +				subscribedPartitions.add(restoredPartitionState.getKey());
          --- End diff ---

          Regarding your comment: subscribed partitions will always be completely identical to the restored state, if there is any.

          I should actually just change this to `subscribedPartitions = new ArrayList<>(restoreToOffset.keySet());` to make this clearer.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3505#discussion_r105860279

          --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java ---
          @@ -194,14 +194,29 @@ protected AbstractFetcher(

           	/**
           	 * Restores the partition offsets.
          +	 * The partitions in the provided map of restored partitions to offsets must completely match
          +	 * the fetcher's subscribed partitions.
           	 *
          -	 * @param snapshotState The offsets for the partitions
          +	 * @param restoredOffsets The restored offsets for the partitions
          +	 *
          +	 * @throws IllegalStateException if the partitions in the provided restored offsets map
          +	 *                               cannot completely match the fetcher's subscribed partitions.
           	 */
          -	public void restoreOffsets(Map<KafkaTopicPartition, Long> snapshotState) {
          -		for (KafkaTopicPartitionState<?> partition : allPartitions) {
          -			Long offset = snapshotState.get(partition.getKafkaTopicPartition());
          -			if (offset != null) {
          -				partition.setOffset(offset);
          +	public void restoreOffsets(Map<KafkaTopicPartition, Long> restoredOffsets) {
          +		if (restoredOffsets.size() != allPartitions.length) {
          +			throw new IllegalStateException(
          +				"The fetcher was restored with partition offsets that do not " +
          +				"match with the subscribed partitions: " + restoredOffsets);
          --- End diff ---

          This would not happen with the changes of this PR.

          In `open()`, I've set the `subscribedPartitions` to be exactly the same as the restored partition states. There is no filtering anymore. The `allPartitions` here is basically just the same list, but in its state-holder form.

          The condition check exists simply because "setting the fetcher's subscribed partitions" and "restoring start offsets" are two separate calls (the former is passed in through the fetcher's constructor, while the latter is provided through the `restoreOffsets` method). I added these checks just to make the fetcher code more self-contained. These exceptions should never actually occur.

          I agree this might be a bit confusing for the code reader. In the recent refactorings in `master`, the setup of the fetcher's subscribed partitions and start offsets (regardless of whether it's a restore or a fresh start) is more atomic and less confusing in this respect.
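          As a standalone illustration of that two-call setup (simplified stand-in types, not the real fetcher classes), the invariant enforced by the check looks roughly like this:

          import java.util.HashMap;
          import java.util.List;
          import java.util.Map;

          // Sketch only: subscribed partitions are fixed at construction (call 1),
          // start offsets arrive later via restoreOffsets() (call 2); the size
          // check ties the two calls together so a mismatch fails loudly instead
          // of silently dropping state.
          class FetcherSketch {
              private final List<String> subscribedPartitions;
              private final Map<String, Long> currentOffsets = new HashMap<>();

              FetcherSketch(List<String> subscribedPartitions) {
                  this.subscribedPartitions = subscribedPartitions;
              }

              void restoreOffsets(Map<String, Long> restoredOffsets) {
                  if (restoredOffsets.size() != subscribedPartitions.size()) {
                      throw new IllegalStateException(
                              "Restored offsets do not match the subscribed partitions: " + restoredOffsets);
                  }
                  currentOffsets.putAll(restoredOffsets);
              }
          }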

          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3505#discussion_r105851319

          --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java ---
          @@ -194,14 +194,29 @@ protected AbstractFetcher(

           	/**
           	 * Restores the partition offsets.
          +	 * The partitions in the provided map of restored partitions to offsets must completely match
          +	 * the fetcher's subscribed partitions.
           	 *
          -	 * @param snapshotState The offsets for the partitions
          +	 * @param restoredOffsets The restored offsets for the partitions
          +	 *
          +	 * @throws IllegalStateException if the partitions in the provided restored offsets map
          +	 *                               cannot completely match the fetcher's subscribed partitions.
           	 */
          -	public void restoreOffsets(Map<KafkaTopicPartition, Long> snapshotState) {
          -		for (KafkaTopicPartitionState<?> partition : allPartitions) {
          -			Long offset = snapshotState.get(partition.getKafkaTopicPartition());
          -			if (offset != null) {
          -				partition.setOffset(offset);
          +	public void restoreOffsets(Map<KafkaTopicPartition, Long> restoredOffsets) {
          +		if (restoredOffsets.size() != allPartitions.length) {
          +			throw new IllegalStateException(
          +				"The fetcher was restored with partition offsets that do not " +
          +				"match with the subscribed partitions: " + restoredOffsets);
          --- End diff ---

          I'm not sure if this check is too strict. I think it's easily possible in Kafka to add new partitions to a topic.
          If a user does this, they can never restore their Kafka consumers.

          githubbot ASF GitHub Bot added a comment -

          GitHub user tzulitai opened a pull request:

          https://github.com/apache/flink/pull/3507

          FLINK-6006 [kafka] Always use complete restored state in FlinkKafkaConsumer

          (This PR is the fix of FLINK-6006 for Flink 1.1)

          Previously, the Kafka Consumer performed partition list querying on
          restore, and then used it to filter out restored state of partitions
          that do not exist in the list.

          If the returned partition list is incomplete for any reason (i.e. missing
          partitions that existed before, perhaps due to temporary ZK / broker
          downtime), then the state of the missing partitions is dropped and
          cannot be recovered anymore.

          This PR fixes this by always restoring the complete state, without
          any sort of filtering. We simply let the consumer fail if the partitions
          assigned to the consuming threads / Kafka clients are unreachable when
          the consumer starts running.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/tzulitai/flink FLINK-6006-1.1

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3507.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3507


          commit a57524bfb0158363be9a5bd4a6f18e053d96a030
          Author: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
          Date: 2017-03-10T06:47:57Z

          FLINK-6006 [kafka] Always use complete restored state in FlinkKafkaConsumer

          Previously, the Kafka Consumer performed partition list querying on
          restore, and then used it to filter out restored state of partitions
          that do not exist in the list.

          If the returned partition list is incomplete for any reason (i.e. missing
          partitions that existed before, perhaps due to temporary ZK / broker
          downtime), then the state of the missing partitions is dropped and
          cannot be recovered anymore.

          This commit fixes this by always restoring the complete state, without
          any sort of filtering. We simply let the consumer fail if the partitions
          assigned to the consuming threads / Kafka clients are unreachable when
          the consumer starts running.


          gyfora Gyula Fora added a comment -

          Thank you for looking into this!


            People

            • Assignee: tzulitai Tzu-Li (Gordon) Tai
            • Reporter: tzulitai Tzu-Li (Gordon) Tai
            • Votes: 0
            • Watchers: 5
