Flink / FLINK-3123

Allow setting custom start-offsets for the Kafka consumer

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.0.0, 1.3.0
    • Component/s: Kafka Connector
    • Labels:
      None

      Description

      Currently, the Kafka consumer can only start reading from the earliest available offset or from the current offset.
      Sometimes users want to set a specific start offset themselves.


          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user tzulitai opened a pull request:

          https://github.com/apache/flink/pull/2687

          FLINK-3123 [kafka] Allow custom specific start offsets for Kafka consumers

          This PR is based on #2509, so only the last commit is relevant.

          With this change, users can now specify specific start offsets for Kafka consumers like this:
          ```
          Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
          specificStartOffsets.put(new KafkaTopicPartition("topic", 0), 23L);
          specificStartOffsets.put(new KafkaTopicPartition("topic", 1), 31L);
          specificStartOffsets.put(new KafkaTopicPartition("topic", 2), 43L);

          FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(...);
          consumer.setStartFromSpecificOffsets(specificStartOffsets);
          ...
          ```

          If no specific offset is defined for a subscribed partition (i.e., it has no corresponding entry in the `specificStartOffsets` map), the startup behaviour for that partition falls back to the default group offset behaviour (look up the partition's offset in ZK / Kafka, or use `auto.offset.reset` if none can be found).
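The per-partition fallback rule above can be modelled in a few lines. This is a minimal sketch, not the connector's code: `TopicPartition` is a hypothetical stand-in for `KafkaTopicPartition`, and `GROUP_OFFSET` is a hypothetical marker meaning "use the committed group offset, or `auto.offset.reset` if there is none". Records require Java 16 or later.

```java
import java.util.*;

/** Sketch: partitions listed in the map start there; all others fall back to group offsets. */
public class SpecificOffsetFallback {

    /** Hypothetical stand-in for KafkaTopicPartition: a (topic, partition) pair. */
    record TopicPartition(String topic, int partition) {}

    /** Hypothetical marker: "use committed group offset, else auto.offset.reset". */
    static final long GROUP_OFFSET = Long.MIN_VALUE;

    static Map<TopicPartition, Long> resolve(List<TopicPartition> subscribed,
                                             Map<TopicPartition, Long> specificStartOffsets) {
        Map<TopicPartition, Long> result = new LinkedHashMap<>();
        for (TopicPartition tp : subscribed) {
            // getOrDefault yields the fallback marker when the partition has no entry
            result.put(tp, specificStartOffsets.getOrDefault(tp, GROUP_OFFSET));
        }
        return result;
    }

    public static void main(String[] args) {
        List<TopicPartition> subscribed = List.of(
            new TopicPartition("topic", 0), new TopicPartition("topic", 1),
            new TopicPartition("topic", 2), new TopicPartition("topic", 3));

        Map<TopicPartition, Long> specific = new HashMap<>();
        specific.put(new TopicPartition("topic", 0), 23L);
        specific.put(new TopicPartition("topic", 1), 31L);
        specific.put(new TopicPartition("topic", 2), 43L);

        // partition 3 has no entry, so it falls back to the group offset behaviour
        resolve(subscribed, specific).forEach((tp, off) ->
            System.out.println(tp + " -> " + (off == GROUP_OFFSET ? "group offset" : off)));
    }
}
```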

          An IT test, `runStartFromSpecificOffsets()`, is added for this functionality; however, it is currently only enabled for the Kafka 0.8 consumer, because the 0.9 and 0.10 tests have the same Kafka config problem mentioned in #2509. So for now, I have only tested the new functionality manually on versions 0.9 and 0.10, where it works correctly as described above.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/tzulitai/flink FLINK-3123

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/2687.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #2687


          commit eca9043ebd63ea201b14b129ce08a9f3ee78c49c
          Author: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
          Date: 2016-09-17T13:41:50Z

          FLINK-4280 [kafka] Explicit start position configuration for Kafka Consumer

          commit 0703469e1880daa63bc5e92bae9920573659806d
          Author: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
          Date: 2016-10-23T08:55:58Z

          FLINK-4280 Allow Kafka 0.10 to override 0.10-specific API calls

          Method calls to `seekToBeginning` and `seekToEnd` have breaking API changes
          between 0.9 and 0.10, causing the 0.10 IT tests to fail.

          commit bdf6b76eb86c3d6a4a0bb84ab26beb13e84526b1
          Author: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
          Date: 2016-10-24T05:24:10Z

          FLINK-4280 Add IT tests for explicit start position configuration

          commit d8f5f976ef2e2e1d994a45468d7e9ef3b8bf0015
          Author: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
          Date: 2016-10-24T08:57:05Z

          FLINK-4280 Add documentation for the new explicit start position methods

          commit 098360fc797b78f15917aaf1b22d09c06a4d0a6c
          Author: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
          Date: 2016-10-24T08:08:18Z

          FLINK-3123 [kafka] Allow custom specific start offsets for Kafka consumers
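Commit `0703469` ("Allow Kafka 0.10 to override 0.10-specific API calls") works around a source-level break in the Kafka client: as far as we know, `KafkaConsumer.seekToBeginning`/`seekToEnd` take varargs `TopicPartition...` in 0.9 but a `Collection<TopicPartition>` in 0.10. One common way to handle this is to route each version-specific call through an overridable method. The sketch below only illustrates that pattern; `Consumer09`, `Consumer010`, `Fetcher09` and `Fetcher010` are hypothetical stubs, not Flink or Kafka classes, and plain strings stand in for partitions.

```java
import java.util.*;

/** Sketch of isolating a version-specific client call behind an overridable hook. */
public class CallBridgeSketch {

    /** Hypothetical stub of the 0.9-style client: varargs signature. */
    static class Consumer09 {
        final List<String> seeks = new ArrayList<>();
        void seekToBeginning(String... partitions) { seeks.addAll(Arrays.asList(partitions)); }
    }

    /** Hypothetical stub of the 0.10-style client: Collection signature. */
    static class Consumer010 {
        final List<String> seeks = new ArrayList<>();
        void seekToBeginning(Collection<String> partitions) { seeks.addAll(partitions); }
    }

    /** Shared fetcher logic is written once; only the client call is version-specific. */
    abstract static class Fetcher<C> {
        final C consumer;
        Fetcher(C consumer) { this.consumer = consumer; }

        /** Hook that each client version overrides. */
        abstract void seekToBeginning(List<String> partitions);

        /** Version-independent startup logic. */
        void startFromEarliest(List<String> partitions) {
            seekToBeginning(partitions);
        }
    }

    static class Fetcher09 extends Fetcher<Consumer09> {
        Fetcher09(Consumer09 c) { super(c); }
        @Override
        void seekToBeginning(List<String> partitions) {
            // 0.9 style: pass the partitions as a varargs array
            consumer.seekToBeginning(partitions.toArray(new String[0]));
        }
    }

    static class Fetcher010 extends Fetcher<Consumer010> {
        Fetcher010(Consumer010 c) { super(c); }
        @Override
        void seekToBeginning(List<String> partitions) {
            // 0.10 style: pass the partitions as a Collection
            consumer.seekToBeginning(partitions);
        }
    }

    public static void main(String[] args) {
        Consumer09 c09 = new Consumer09();
        Consumer010 c010 = new Consumer010();
        new Fetcher09(c09).startFromEarliest(List.of("topic-0", "topic-1"));
        new Fetcher010(c010).startFromEarliest(List.of("topic-0", "topic-1"));
        System.out.println(c09.seeks + " " + c010.seeks);
    }
}
```

Only the hook differs between versions, so the startup logic in the base class compiles against neither client signature directly.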


          juho.autio.r Juho Autio added a comment -

          Nice one, looking forward to this!

          > If no specific offset is defined for a subscribed partition (i.e., it has no corresponding entry in the specificStartOffsets map), the startup behaviour for that partition falls back to the default group offset behaviour (look up the partition's offset in ZK / Kafka, or use "auto.offset.reset" if none can be found).

          My use case is skipping to the latest offsets while keeping the same consumer group id. What happens if I set the offset of every topic and partition to -1? That's the special value for fetching the latest offset (and -2 is the earliest), so I wonder whether some code translates -1 to 0 or -2, which would be a problem.

          Even better in my case would be if I could just call consumer.setStartFromOffset(-1), which would automatically discover all partitions of all topics and initialize their offsets by fetching them from Kafka with "current offset" -1. It would be nice if this reset the offsets even when Flink snapshot state with some offset positions exists (I suppose that's how your setStartFromSpecificOffsets works anyway).

          tzulitai Tzu-Li (Gordon) Tai added a comment - (edited)

          Hi Juho Autio,

          > My use case is skipping to the latest offsets while keeping the same consumer group id.

          That's actually also addressed in FLINK-4280, which also has a PR pending (the PR for this issue is based on that one). With FLINK-4280, you'd call consumer.setStartFromEarliest() or consumer.setStartFromLatest(), which completely ignore any committed offsets in Kafka on the initial startup; the consumer group id is still used if you have configured the consumer to commit offsets back to Kafka. The proposed consumer.setStartFromSpecificOffsets(...) is basically a third startup option for maximum flexibility.

          > It would be nice if this reset the offsets even when a Flink snapshot state exists with some offset positions.

          Flink state snapshots are only used when a job is automatically restored from a checkpoint after a failure, or when it is manually started from a savepoint. So on a fresh job startup, the start position can be freely configured using the newly proposed methods.
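The precedence described in this reply (restored state first, then the configured startup option) can be sketched as a single decision function. This is a simplified model under stated assumptions: only `SPECIFIC_OFFSETS` appears in the PR diff, while `GROUP_OFFSETS`, `EARLIEST` and `LATEST` are assumed names for the other options, and the returned strings are illustrative descriptions rather than real offsets.

```java
/** Sketch: restored state wins; otherwise the configured startup mode decides. */
public class StartPositionSketch {

    /** Assumed mode names; only SPECIFIC_OFFSETS is taken from the PR diff. */
    enum StartupMode { GROUP_OFFSETS, EARLIEST, LATEST, SPECIFIC_OFFSETS }

    static String startPosition(Long restoredOffset, StartupMode mode, Long specificOffset) {
        // 1. Restoring from a checkpoint or savepoint: state overrides any startup setting.
        if (restoredOffset != null) {
            return "restored " + restoredOffset;
        }
        // 2. Fresh start: the configured startup mode decides.
        switch (mode) {
            case EARLIEST:
                return "earliest";
            case LATEST:
                return "latest";
            case SPECIFIC_OFFSETS:
                if (specificOffset != null) {
                    return "specific " + specificOffset;
                }
                // no entry for this partition: behave like GROUP_OFFSETS
                return "group offset";
            default:
                return "group offset";
        }
    }

    public static void main(String[] args) {
        System.out.println(startPosition(42L, StartupMode.LATEST, null));               // restored 42
        System.out.println(startPosition(null, StartupMode.LATEST, null));              // latest
        System.out.println(startPosition(null, StartupMode.SPECIFIC_OFFSETS, 23L));     // specific 23
        System.out.println(startPosition(null, StartupMode.SPECIFIC_OFFSETS, null));    // group offset
    }
}
```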

          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on the issue:

          https://github.com/apache/flink/pull/2687

          I'll review this PR once https://github.com/apache/flink/pull/2509 has been merged.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/2687

          Rebasing this PR now...

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/2687

          I recommend reviewing this after #3378 is merged, as it touches a lot of code related to the new start position features.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/2687

          Rebased on the latest Kafka consumer changes in `master`.

          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2687#discussion_r105621324

          -- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java --
          @@ -558,11 +629,31 @@ protected static void initializeSubscribedPartitionsToStartOffsets(
                   List<KafkaTopicPartition> kafkaTopicPartitions,
                   int indexOfThisSubtask,
                   int numParallelSubtasks,
          -        StartupMode startupMode) {
          +        StartupMode startupMode,
          +        Map<KafkaTopicPartition, Long> specificStartupOffsets) {

               for (int i = 0; i < kafkaTopicPartitions.size(); i++) {
                   if (i % numParallelSubtasks == indexOfThisSubtask) {
          -            subscribedPartitionsToStartOffsets.put(kafkaTopicPartitions.get(i), startupMode.getStateSentinel());
          +            if (startupMode != StartupMode.SPECIFIC_OFFSETS) {
          +                subscribedPartitionsToStartOffsets.put(kafkaTopicPartitions.get(i), startupMode.getStateSentinel());
          +            } else {
          +                if (specificStartupOffsets == null) {
          +                    throw new IllegalArgumentException(
          +                        "Startup mode for the consumer set to " + StartupMode.SPECIFIC_OFFSETS +
          +                            ", but no specific offsets were specified");
          +                }
          +
          +                KafkaTopicPartition partition = kafkaTopicPartitions.get(i);
          +
          +                Long specificOffset = specificStartupOffsets.get(partition);

          -- End diff --

          What happens when the offset is negative or any other invalid number?
          Is that handled by the individual fetchers?
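The hunk above distributes partitions across parallel subtasks with `i % numParallelSubtasks == indexOfThisSubtask`. A standalone sketch of just that loop, with plain strings standing in for `KafkaTopicPartition` (the method name `partitionsForSubtask` is hypothetical):

```java
import java.util.*;

/** Sketch of the round-robin partition-to-subtask assignment used in the diff. */
public class PartitionAssignmentSketch {

    /** Partition i belongs to subtask (i % numParallelSubtasks). */
    static List<String> partitionsForSubtask(List<String> partitions,
                                             int indexOfThisSubtask,
                                             int numParallelSubtasks) {
        List<String> mine = new ArrayList<>();
        for (int i = 0; i < partitions.size(); i++) {
            if (i % numParallelSubtasks == indexOfThisSubtask) {
                mine.add(partitions.get(i));
            }
        }
        return mine;
    }

    public static void main(String[] args) {
        List<String> all = List.of("t-0", "t-1", "t-2", "t-3", "t-4");
        for (int s = 0; s < 2; s++) {
            System.out.println("subtask " + s + ": " + partitionsForSubtask(all, s, 2));
        }
        // subtask 0: [t-0, t-2, t-4]
        // subtask 1: [t-1, t-3]
    }
}
```

Every partition lands on exactly one subtask, so each subtask only needs to resolve start offsets for its own share.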

          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2687#discussion_r105621449

          -- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java --
          @@ -558,11 +629,31 @@ protected static void initializeSubscribedPartitionsToStartOffsets(
                   List<KafkaTopicPartition> kafkaTopicPartitions,
                   int indexOfThisSubtask,
                   int numParallelSubtasks,
          -        StartupMode startupMode) {
          +        StartupMode startupMode,
          +        Map<KafkaTopicPartition, Long> specificStartupOffsets) {

               for (int i = 0; i < kafkaTopicPartitions.size(); i++) {
                   if (i % numParallelSubtasks == indexOfThisSubtask) {
          -            subscribedPartitionsToStartOffsets.put(kafkaTopicPartitions.get(i), startupMode.getStateSentinel());
          +            if (startupMode != StartupMode.SPECIFIC_OFFSETS) {
          +                subscribedPartitionsToStartOffsets.put(kafkaTopicPartitions.get(i), startupMode.getStateSentinel());
          +            } else {
          +                if (specificStartupOffsets == null) {
          +                    throw new IllegalArgumentException(
          +                        "Startup mode for the consumer set to " + StartupMode.SPECIFIC_OFFSETS +
          +                            ", but no specific offsets were specified");
          +                }
          +
          +                KafkaTopicPartition partition = kafkaTopicPartitions.get(i);
          +
          +                Long specificOffset = specificStartupOffsets.get(partition);
          +                if (specificOffset != null) {
          +                    // since the specified offsets represent the next record to read, we subtract
          +                    // it by one so that the initial state of the consumer will be correct
          +                    subscribedPartitionsToStartOffsets.put(partition, specificOffset - 1);

          -- End diff --

          What happens if the user sets the offset to 0 because they assume that's the start? Will it fail, or fallback to auto offset reset?

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2687#discussion_r105687880

          -- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java --
          @@ -558,11 +629,31 @@ protected static void initializeSubscribedPartitionsToStartOffsets(
                   List<KafkaTopicPartition> kafkaTopicPartitions,
                   int indexOfThisSubtask,
                   int numParallelSubtasks,
          -        StartupMode startupMode) {
          +        StartupMode startupMode,
          +        Map<KafkaTopicPartition, Long> specificStartupOffsets) {

               for (int i = 0; i < kafkaTopicPartitions.size(); i++) {
                   if (i % numParallelSubtasks == indexOfThisSubtask) {
          -            subscribedPartitionsToStartOffsets.put(kafkaTopicPartitions.get(i), startupMode.getStateSentinel());
          +            if (startupMode != StartupMode.SPECIFIC_OFFSETS) {
          +                subscribedPartitionsToStartOffsets.put(kafkaTopicPartitions.get(i), startupMode.getStateSentinel());
          +            } else {
          +                if (specificStartupOffsets == null) {
          +                    throw new IllegalArgumentException(
          +                        "Startup mode for the consumer set to " + StartupMode.SPECIFIC_OFFSETS +
          +                            ", but no specific offsets were specified");
          +                }
          +
          +                KafkaTopicPartition partition = kafkaTopicPartitions.get(i);
          +
          +                Long specificOffset = specificStartupOffsets.get(partition);

          -- End diff --

          Yes, that is handled automatically by the Kafka clients according to the `auto.offset.reset` setting (except for 0.8, where we need to do the dirty work ourselves, but that logic was already implemented before).
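The reply above says that an invalid (for example negative or out-of-range) start offset is resolved according to `auto.offset.reset`. A simplified model of that rule, not the Kafka client's actual code: the policy names follow the newer consumer's `earliest` / `latest` / `none` values (0.8 used `smallest` / `largest`), and the `earliest` / `logEnd` bounds are hypothetical inputs that a real client would obtain from the broker.

```java
/** Simplified model of resolving an out-of-range start offset via a reset policy. */
public class OffsetResetSketch {

    enum ResetPolicy { EARLIEST, LATEST, NONE }

    /**
     * Valid offsets lie in [earliest, logEnd]; starting at logEnd means
     * "wait for the next record". Anything outside is reset by the policy.
     */
    static long effectiveStartOffset(long requested, long earliest, long logEnd,
                                     ResetPolicy policy) {
        if (requested >= earliest && requested <= logEnd) {
            return requested; // valid offset: start exactly there
        }
        switch (policy) {
            case EARLIEST:
                return earliest;
            case LATEST:
                return logEnd;
            default:
                throw new IllegalStateException("Offset " + requested + " out of range ["
                    + earliest + ", " + logEnd + "] and no reset policy configured");
        }
    }

    public static void main(String[] args) {
        System.out.println(effectiveStartOffset(-5, 100, 200, ResetPolicy.EARLIEST)); // 100
        System.out.println(effectiveStartOffset(150, 100, 200, ResetPolicy.LATEST));  // 150
        System.out.println(effectiveStartOffset(500, 100, 200, ResetPolicy.LATEST));  // 200
    }
}
```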

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2687#discussion_r105688259

          -- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java --
          @@ -558,11 +629,31 @@ protected static void initializeSubscribedPartitionsToStartOffsets(
                   List<KafkaTopicPartition> kafkaTopicPartitions,
                   int indexOfThisSubtask,
                   int numParallelSubtasks,
          -        StartupMode startupMode) {
          +        StartupMode startupMode,
          +        Map<KafkaTopicPartition, Long> specificStartupOffsets) {

               for (int i = 0; i < kafkaTopicPartitions.size(); i++) {
                   if (i % numParallelSubtasks == indexOfThisSubtask) {
          -            subscribedPartitionsToStartOffsets.put(kafkaTopicPartitions.get(i), startupMode.getStateSentinel());
          +            if (startupMode != StartupMode.SPECIFIC_OFFSETS) {
          +                subscribedPartitionsToStartOffsets.put(kafkaTopicPartitions.get(i), startupMode.getStateSentinel());
          +            } else {
          +                if (specificStartupOffsets == null) {
          +                    throw new IllegalArgumentException(
          +                        "Startup mode for the consumer set to " + StartupMode.SPECIFIC_OFFSETS +
          +                            ", but no specific offsets were specified");
          +                }
          +
          +                KafkaTopicPartition partition = kafkaTopicPartitions.get(i);
          +
          +                Long specificOffset = specificStartupOffsets.get(partition);
          +                if (specificOffset != null) {
          +                    // since the specified offsets represent the next record to read, we subtract
          +                    // it by one so that the initial state of the consumer will be correct
          +                    subscribedPartitionsToStartOffsets.put(partition, specificOffset - 1);

          -- End diff --

          That will simply be treated as an invalid offset (if the record with offset 0 doesn't exist; if it actually exists, I don't think there's much we can do about it). So, again, the `auto.offset.reset` setting is automatically used by the clients.
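The diff comment explains that a user-supplied offset means "the next record to read", so it is stored as that value minus one. A minimal sketch of the round trip, assuming (as that comment implies) that the internal state holds the last-read offset and the fetcher resumes one past it; `toInternalState` and `fetchPosition` are hypothetical names.

```java
/** Sketch of the "next record to read" vs. "last record read" bookkeeping. */
public class NextOffsetBookkeeping {

    /** User API value (next record to read) -> internal state (last record read). */
    static long toInternalState(long nextRecordToRead) {
        return nextRecordToRead - 1;
    }

    /** The fetcher resumes one past the last-read offset. */
    static long fetchPosition(long internalState) {
        return internalState + 1;
    }

    public static void main(String[] args) {
        for (long userOffset : new long[] {23L, 0L}) {
            long state = toInternalState(userOffset);
            System.out.println(userOffset + " -> state " + state + " -> fetch from " + fetchPosition(state));
        }
        // 23 -> state 22 -> fetch from 23
        // 0 -> state -1 -> fetch from 0
    }
}
```

A user offset of 0 therefore still results in a fetch request for offset 0; whether that offset exists is then up to the broker, as described in the reply.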

          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on the issue:

          https://github.com/apache/flink/pull/2687

          +1 to merge

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/2687

          Thanks for the fast reviews! Merging...

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/2687

          tzulitai Tzu-Li (Gordon) Tai added a comment -

          Resolved for master with http://git-wip-us.apache.org/repos/asf/flink/commit/5f08e53

            People

            • Assignee:
              tzulitai Tzu-Li (Gordon) Tai
              Reporter:
              rmetzger Robert Metzger
            • Votes:
              4
              Watchers:
              11
