Flink / FLINK-4727

Kafka 0.9 Consumer should also checkpoint auto retrieved offsets even when no data is read

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Blocker
    • Resolution: Resolved
    • Affects Version/s: None
    • Fix Version/s: 1.2.0, 1.1.4
    • Component/s: Kafka Connector
    • Labels:
      None

      Description

      This is basically the 0.9 version counterpart for FLINK-3440.

      When the 0.9 consumer fetches its initial offsets from Kafka on startup but has no data to read, it should still checkpoint and commit these initial offsets.
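      The gap can be made concrete with a minimal Java model. These are not Flink's actual classes. In this model, per-partition state holds the offset of the last record processed, and a hypothetical sentinel OFFSET_NOT_SET marks partitions with no known offset. The sketch assumes such partitions are simply left out of a snapshot. A partition that has read nothing therefore contributes nothing to the checkpoint until its state is seeded from the consumer's start position. Storing that position as position - 1 is a further assumption of this sketch.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical model of per-partition offset state (not Flink's classes).
 * Each value is the offset of the last record processed for that partition.
 */
class OffsetStateSketch {

    // Hypothetical sentinel meaning "no offset defined for this partition yet".
    static final long OFFSET_NOT_SET = Long.MIN_VALUE;

    static Map<String, Long> newState(String... partitions) {
        Map<String, Long> state = new LinkedHashMap<>();
        for (String p : partitions) {
            state.put(p, OFFSET_NOT_SET);
        }
        return state;
    }

    // Assumption of this sketch: partitions without a defined offset are skipped,
    // so nothing is checkpointed (or later committed) for them.
    static Map<String, Long> snapshot(Map<String, Long> state) {
        Map<String, Long> checkpoint = new LinkedHashMap<>();
        for (Map.Entry<String, Long> e : state.entrySet()) {
            if (e.getValue() != OFFSET_NOT_SET) {
                checkpoint.put(e.getKey(), e.getValue());
            }
        }
        return checkpoint;
    }

    // The fix in spirit: seed undefined states from the consumer's start position.
    // The position is the next offset to read, so "last processed" is position - 1.
    static void seedFromPositions(Map<String, Long> state, Map<String, Long> positions) {
        for (Map.Entry<String, Long> e : state.entrySet()) {
            if (e.getValue() == OFFSET_NOT_SET) {
                e.setValue(positions.get(e.getKey()) - 1);
            }
        }
    }

    public static void main(String[] args) {
        Map<String, Long> state = newState("topic-0", "topic-1");
        // No records read yet: the checkpoint is empty.
        System.out.println("before: " + snapshot(state)); // before: {}

        Map<String, Long> positions = new LinkedHashMap<>();
        positions.put("topic-0", 42L);
        positions.put("topic-1", 7L);
        seedFromPositions(state, positions);
        // Still no records read, but the start positions are now checkpointed.
        System.out.println("after:  " + snapshot(state)); // after:  {topic-0=41, topic-1=6}
    }
}
```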

          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user tzulitai opened a pull request:

          https://github.com/apache/flink/pull/2585

          FLINK-4727 [kafka-connector] Set missing initial offset states with starting KafkaConsumer position

          This was fixed for the 0.8 consumer before. This PR adds the behaviour to the 0.9 consumer.

          In #2580 there is a new IT test that verifies this behaviour for both 0.8 and 0.9, but the 0.9 case is commented out there because it won't pass until the changes in this PR are also included.

          I want to keep the review / discussion of the two issues apart, hence separate PRs. I suggest to merge #2580 first, and then rebase this afterwards to include the test.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/tzulitai/flink FLINK-4727

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/2585.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #2585


          commit d030c3e4230d821d53ddf3377e0910404284300e
          Author: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
          Date: 2016-10-04T08:53:38Z

          FLINK-4727 [kafka-connector] Set missing initial offset states with starting KafkaConsumer position

          With this change, on a clean startup of FlinkKafkaConsumer09, the auto retrieved offsets (either earliest, latest, or an actual
          committed offset) from Kafka will also be checkpointed and committed, even if no records are read after the startup.
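          The startup branch this PR touches can be sketched against a hypothetical FakeConsumer. The FakeConsumer stands in for KafkaConsumer's seek() and position(). The review diff later in this thread shows restored offsets leading to a seek to offset + 1. This means the state stores the last processed offset. That excerpt ends right after position() is called. So this sketch assumes, without confirmation, that the fetched position is stored as position - 1. Under that convention, a clean start at position 100 checkpoints 99, and restoring that checkpoint seeks back to 100.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Sketch of the "restored vs. freshly started" branch, using a hypothetical
 * FakeConsumer instead of KafkaConsumer. Not the actual Kafka09Fetcher code.
 */
class StartupBranchSketch {

    static final long OFFSET_NOT_SET = Long.MIN_VALUE; // hypothetical sentinel

    // Hypothetical stand-in for KafkaConsumer.seek()/position(), keyed by partition name.
    static class FakeConsumer {
        final Map<String, Long> positions = new HashMap<>();

        void seek(String partition, long offset) {
            positions.put(partition, offset);
        }

        long position(String partition) {
            return positions.get(partition);
        }
    }

    /** Applies the branch to every partition; may update both the state and the consumer. */
    static void initialize(Map<String, Long> state, FakeConsumer consumer) {
        for (Map.Entry<String, Long> e : state.entrySet()) {
            if (e.getValue() != OFFSET_NOT_SET) {
                // Restored from a checkpoint/savepoint: resume after the last processed record.
                consumer.seek(e.getKey(), e.getValue() + 1);
            } else {
                // Nothing restored: adopt the position the consumer resolved (assumed "- 1" convention).
                e.setValue(consumer.position(e.getKey()) - 1);
            }
        }
    }

    public static void main(String[] args) {
        // Clean start: the consumer resolved position 100 for "t-0".
        FakeConsumer first = new FakeConsumer();
        first.positions.put("t-0", 100L);
        Map<String, Long> state = new HashMap<>();
        state.put("t-0", OFFSET_NOT_SET);
        initialize(state, first);
        System.out.println("checkpointed: " + state.get("t-0")); // checkpointed: 99

        // Restore that checkpoint into a new consumer with an unrelated position.
        FakeConsumer second = new FakeConsumer();
        second.positions.put("t-0", 0L);
        initialize(new HashMap<>(state), second);
        System.out.println("resumes at: " + second.position("t-0")); // resumes at: 100
    }
}
```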


          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2585#discussion_r83138654

          --- Diff: flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java ---
          @@ -204,7 +204,22 @@ public void run() {
                   // seek the consumer to the initial offsets
                   for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) {
                       if (partition.isOffsetDefined()) {
          +                LOG.info("Partition {} has restored initial offsets {} from checkpoint / savepoint; seeking the consumer " +
          +                        "to position {}", partition.getKafkaPartitionHandle(), partition.getOffset(), partition.getOffset() + 1);
          +
                           consumer.seek(partition.getKafkaPartitionHandle(), partition.getOffset() + 1);
          +            } else {
          +                // for partitions that do not have offsets restored from a checkpoint/savepoint,
          +                // we need to define our internal offset state for them using the initial offsets retrieved from Kafka
          +                // by the KafkaConsumer, so that they are correctly checkpointed and committed on the next checkpoint
          +
          +                long fetchedOffset = consumer.position(partition.getKafkaPartitionHandle());
          --- End diff ---

          I just confirmed this with a simple test.
          Yes, when no committed offset is available for the group id, the "auto.offset.reset" behavior is used.
          The "position" is the offset the KafkaConsumer will start reading from. The consumer determines it automatically once it is assigned partitions, using the committed offset if one exists and "auto.offset.reset" otherwise.
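          The rule confirmed above can be written as a small pure-Java function. This is a simplified model, not the Kafka client's code. A committed offset for the group wins. Otherwise, "auto.offset.reset" picks the log start ("earliest") or the log end ("latest"). The value "none" fails: the real 0.9 client throws NoOffsetForPartitionException, while this sketch throws IllegalStateException to stay dependency-free. It also ignores edge cases such as a committed offset that is no longer in the log.

```java
import java.util.Optional;

/**
 * Simplified model of how a KafkaConsumer picks the starting position of an
 * assigned partition. Not the real client implementation.
 */
class StartPositionSketch {

    static long resolve(Optional<Long> committed, String autoOffsetReset,
                        long logStartOffset, long logEndOffset) {
        // A committed offset for the group id takes precedence.
        if (committed.isPresent()) {
            return committed.get();
        }
        // Otherwise fall back to the auto.offset.reset policy.
        switch (autoOffsetReset) {
            case "earliest":
                return logStartOffset;
            case "latest":
                return logEndOffset;
            case "none":
                // The real client raises NoOffsetForPartitionException here.
                throw new IllegalStateException("no committed offset and auto.offset.reset=none");
            default:
                throw new IllegalArgumentException("unknown auto.offset.reset: " + autoOffsetReset);
        }
    }

    public static void main(String[] args) {
        System.out.println(resolve(Optional.of(57L), "latest", 10, 80));   // 57
        System.out.println(resolve(Optional.empty(), "earliest", 10, 80)); // 10
        System.out.println(resolve(Optional.empty(), "latest", 10, 80));   // 80
    }
}
```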

          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on the issue:

          https://github.com/apache/flink/pull/2585

          +1 to merge

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/2585

          Rebasing to include f46ca39 with the IT tests uncommented. Will merge when Travis turns green.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/2585

          The failing tests are unrelated, and related tests are covered and have passed. Merging this ...

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/2585

          tzulitai Tzu-Li (Gordon) Tai added a comment - Resolved for master via http://git-wip-us.apache.org/repos/asf/flink/commit/e4343ba

            People

            • Assignee: Tzu-Li (Gordon) Tai (tzulitai)
            • Reporter: Tzu-Li (Gordon) Tai (tzulitai)
            • Votes: 0
            • Watchers: 2
