Flink / FLINK-3398

Flink Kafka consumer should support auto-commit opt-outs

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.3.0
    • Component/s: Kafka Connector
    • Labels:
      None

      Description

Currently the Kafka source commits consumer offsets to Zookeeper: upon a checkpoint if checkpointing is enabled, or otherwise periodically based on auto.commit.interval.ms.

It should be possible to opt out of committing consumer offsets to Zookeeper. Kafka exposes this config as auto.commit.enable (0.8) and enable.auto.commit (0.9).
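As an illustration of the requested opt-out, a user would set the version-appropriate Kafka property to false. The helper below is a hypothetical sketch (the class and method names are invented for illustration); only the two config keys come from the description above.

```java
import java.util.Properties;

public class AutoCommitOptOut {

    // Builds consumer properties that opt out of offset committing.
    // The config key differs between Kafka client versions.
    public static Properties consumerProps(String kafkaVersion) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "my-flink-job");
        String key = kafkaVersion.startsWith("0.8")
                ? "auto.commit.enable"   // Kafka 0.8
                : "enable.auto.commit";  // Kafka 0.9+
        props.setProperty(key, "false"); // opt out of committing offsets to ZK / the broker
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps("0.8").getProperty("auto.commit.enable"));
        System.out.println(consumerProps("0.9").getProperty("enable.auto.commit"));
    }
}
```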


          Activity

          shikhar Shikhar Bhushan added a comment -

          Can this be addressed for 1.0 – currently there is no way to prevent the consumer from updating offsets in ZK. I plan to submit a patch in a day or two.

          githubbot ASF GitHub Bot added a comment -

          GitHub user shikhar opened a pull request:

          https://github.com/apache/flink/pull/1690

          FLINK-3398: Allow for opting-out from Kafka offset auto-commit

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/shikhar/flink kafka-autocommit-optout

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/1690.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #1690



          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on the pull request:

          https://github.com/apache/flink/pull/1690#issuecomment-187710646

          The change is breaking some tests.

          githubbot ASF GitHub Bot added a comment -

          Github user shikhar commented on the pull request:

          https://github.com/apache/flink/pull/1690#issuecomment-188002588

          @rmetzger the test failures were because we had `auto.commit.enable=false` in the standard `KafkaTestEnvironmentImpl` standard props and it wasn't respecting that previously, updated those defaults

          githubbot ASF GitHub Bot added a comment -

          Github user shikhar commented on the pull request:

          https://github.com/apache/flink/pull/1690#issuecomment-188818665

          Just fixed the `Kafka09ITCase.testCheckpointing()` fail as well, hopefully CI is green with that.

          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on the pull request:

          https://github.com/apache/flink/pull/1690#issuecomment-195374583

Thank you for working on this. I've been very busy with the 1.0 release, vacation, and some talks, but now I'm back to my regular work.
I'll try to review the PR today or on Monday.

          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on a diff in the pull request:

          https://github.com/apache/flink/pull/1690#discussion_r56009318

          — Diff: flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java —
          @@ -295,7 +296,7 @@ public void open(Configuration parameters) throws Exception {

          @Override
          public void run(SourceContext<T> sourceContext) throws Exception {

- if (fetcher != null) {
+ if (fetcher != null && isAutoCommitEnabled()) {

— End diff —

This is not correct!
If the fetcher != null and auto commit is disabled, `fetcher.run()` (line 316) is never called. This means that the fetcher isn't doing anything at all!

          I'm not sure how the integration tests were passing with that change! (maybe there is no test testing auto commit enable = false.)

          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on the pull request:

          https://github.com/apache/flink/pull/1690#issuecomment-196338135

          Can you add a test case ensuring the functionality? If I'm not mistaken, the change would have introduced a pretty bad bug.

          githubbot ASF GitHub Bot added a comment -

          Github user shikhar commented on a diff in the pull request:

          https://github.com/apache/flink/pull/1690#discussion_r56450314

          — Diff: flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java —
          @@ -295,7 +296,7 @@ public void open(Configuration parameters) throws Exception {

          @Override
          public void run(SourceContext<T> sourceContext) throws Exception {

- if (fetcher != null) {
+ if (fetcher != null && isAutoCommitEnabled()) {

— End diff —

Yikes! Added that on the wrong conditional; it was supposed to be on the `!streamingRuntimeContext.isCheckpointingEnabled()` clause where `PeriodicOffsetCommitter` is initialized.

          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on the pull request:

          https://github.com/apache/flink/pull/1690#issuecomment-199316872

          Let me summarize the change:

          ```
auto.commit.enable=true    checkpointing=true    offsets go to ZK/broker on each completed checkpoint
auto.commit.enable=false   checkpointing=true    offsets are only available for recovery with Flink, not in ZK/broker
auto.commit.enable=true    checkpointing=false   offsets are written to ZK periodically
auto.commit.enable=false   checkpointing=false   offsets are not in ZK or in Flink
          ```

          I don't like this variant:
          ```
          auto.commit.enable=false checkpointing=true offsets are only available for recovery with Flink, not in ZK / broker
          ```
          because it stretches the original definition of "auto.commit.enable" a bit. In my understanding, "auto.commit.enable" in Kafka starts a periodic committer, for sending the offsets to ZK / broker.

          In Flink, the semantic of the setting is more or less "enable offset committing".
          How about we limit the "auto.commit.enable" setting only to the periodic offset committing thread?
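The behavior matrix summarized above can be condensed into a small decision function. The sketch below is purely illustrative (the enum and method names are invented, not Flink API); "disabled" here means no external commit, although Flink may still track offsets in its own checkpoints.

```java
public class OffsetCommitMatrix {

    enum CommitMode { ON_CHECKPOINTS, PERIODIC, DISABLED }

    // Resolves the external offset-commit behavior from the two flags in the matrix.
    static CommitMode resolve(boolean autoCommitEnable, boolean checkpointingEnabled) {
        if (checkpointingEnabled) {
            // offsets go to ZK/broker on each completed checkpoint, or stay Flink-internal only
            return autoCommitEnable ? CommitMode.ON_CHECKPOINTS : CommitMode.DISABLED;
        }
        // without checkpointing, offsets are written to ZK periodically or not at all
        return autoCommitEnable ? CommitMode.PERIODIC : CommitMode.DISABLED;
    }
}
```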

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the pull request:

          https://github.com/apache/flink/pull/1690#issuecomment-205819299

          I think the functionality is very desirable. In many cases, you really don't want Flink to write something to Zookeeper for some random GroupId. This floods ZooKeeper with garbage that helps no one.

I think that this is actually a kind of nice solution. The `auto.commit.enable` in Flink means not necessarily periodic committing, but "on checkpoint or periodically".

          githubbot ASF GitHub Bot added a comment -

          Github user wanderingbort commented on the pull request:

          https://github.com/apache/flink/pull/1690#issuecomment-207065222

          I just wanted to echo @StephanEwen 's sentiment.

Flink's management of offsets seems to supersede the Kafka concept of committed offsets (whether to ZK or a 0.9 broker), leaving me to think there are only two scenarios where offset committing is useful in the context of Flink:

• coordinating delivery with consumers outside the Flink ecosystem (which seems conceptually dubious)
• reporting to monitoring software that understands 0.8's ZooKeeper-stored offsets or 0.9+'s broker-stored offsets

          I would consider both of these to be optional features and not required features.

          For my particular use case, we are on Kafka 0.8 and requiring write access to the zookeeper cluster that is coordinating kafka in order to access read-only message data is problematic.

          githubbot ASF GitHub Bot added a comment -

          Github user shikhar commented on the pull request:

          https://github.com/apache/flink/pull/1690#issuecomment-207563659

Another thing is that the presence of offsets in ZK (0.8) / Kafka (0.9) also affects what happens when a job starts without a checkpoint / savepoint. If they are present, the reset mode won't be respected, and there is potential for dropping messages (e.g. if the recovery strategy in the absence of a valid checkpoint or savepoint is to process from the beginning of available data with offset reset mode 'earliest').

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/1690

          What's the status of this PR? I think it is a good functionality to add. @shikhar if you'd like to continue working on this, I'm happy to help shepherd this PR for merging.

          Some of my input from the previous discussions on the interpretation of "auto.commit.enable":
          I agree with @rmetzger that stretching the definition of this config to "on checkpoint or periodically" isn't a good idea. Committing the offsets back to Kafka on 'notifyCheckpointComplete()' is a Flink-specific behaviour. I think mixing Flink-specific behaviour within Kafka original configs can be confusing for frequent Kafka users and difficult to maintain in the long run.

          Perhaps we can have something like "flink.commit-on-checkpoint". What I have in mind:
          > // commit to ZK / broker only on Flink's checkpoint.
          > flink.commit-on-checkpoint=true auto.commit.enable=*
          >
          > // periodically commits to ZK / broker
          > flink.commit-on-checkpoint=false auto.commit.enable=true
          >
          > // no commit to ZK / broker at all
          > flink.commit-on-checkpoint=false auto.commit.enable=false

The first option is valid only when checkpointing is also enabled. Otherwise, whether or not checkpointing is enabled does not affect the above settings.

          I think this better matches the idea that "the Kafka offset store is only used for exposure of progress to the outside world, and not for recovery / manipulation of how Flink reads from Kafka".
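A sketch of how this proposal might resolve the two settings. The property names come from the comment above; the resolver itself, its defaults, and the enum are hypothetical, not actual Flink code.

```java
import java.util.Properties;

public class CommitOnCheckpointProposal {

    enum CommitMode { ON_CHECKPOINTS, PERIODIC, DISABLED }

    // Resolves commit behavior from the proposed "flink.commit-on-checkpoint"
    // plus Kafka's "auto.commit.enable". Defaults ("false") are assumptions.
    static CommitMode resolve(Properties props, boolean checkpointingEnabled) {
        boolean commitOnCheckpoint =
                Boolean.parseBoolean(props.getProperty("flink.commit-on-checkpoint", "false"));
        boolean autoCommit =
                Boolean.parseBoolean(props.getProperty("auto.commit.enable", "false"));
        if (commitOnCheckpoint && checkpointingEnabled) {
            // commit to ZK / broker only on Flink's checkpoint; auto.commit.enable=* is ignored
            return CommitMode.ON_CHECKPOINTS;
        }
        // otherwise fall back to Kafka's own setting: periodic commits or none at all
        return autoCommit ? CommitMode.PERIODIC : CommitMode.DISABLED;
    }
}
```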

          githubbot ASF GitHub Bot added a comment -

          Github user shikhar commented on the issue:

          https://github.com/apache/flink/pull/1690

          Thanks @tzulitai! I like your proposal, I'm afraid I won't be able to get around to shaping up this PR though so might be best to start a new one (please feel free to close).

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/1690

          @shikhar Sure, please feel free to tag me for a review once you are done. I think you'll need to manually close this PR yourself, you can close it when the new PR is opened. Another option is to rebase this PR on the current master, and after you finish shaping up again, force push to this same PR branch.

Also, since this will affect how users interact with the Kafka consumer, let's wait until we get consensus from @StephanEwen and @rmetzger on the proposed change before you get on to working on it.

          githubbot ASF GitHub Bot added a comment -

          Github user shikhar closed the pull request at:

          https://github.com/apache/flink/pull/1690

          githubbot ASF GitHub Bot added a comment -

          Github user shikhar commented on the issue:

          https://github.com/apache/flink/pull/1690

          @tzulitai sorry I meant I won't be able to get to this change at all, so anyone please feel free to take this up!

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/1690

          @shikhar Ok, sure no problem

          StephanEwen Stephan Ewen added a comment -

          Let's reboot the discussion and work on this issue. Some observations:

1. Many cases do not need committing of offsets externally. Users often specify some random "group.id" if no meaningful group can be specified.
2. In some cases, committing is even harmful, as the picked-up start offsets override reset strategies like "earliest offset". To avoid that, people use a random "group.id".
3. If one wants to use external offsets, one typically also wants to update them, and vice versa. Both are tied to a "group.id".

          With these observations, it seems we can tie this to the existence of a group:

          • If one sets a "group.id", we use it for start offset and commit offsets.
          • If no "group.id" exists, no start offsets are obtained and no offsets are committed.

          Does that make sense?
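Under this proposal, both reading start offsets and committing offsets would hinge on a single check for the presence of a group. A minimal sketch (the helper class and method names are invented for illustration):

```java
import java.util.Properties;

public class GroupIdGate {

    // Sketch of the proposal: use external offsets (for start position and
    // committing) if and only if a "group.id" was explicitly configured.
    static boolean useExternalOffsets(Properties props) {
        String groupId = props.getProperty("group.id");
        return groupId != null && !groupId.isEmpty();
    }
}
```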

          tzulitai Tzu-Li (Gordon) Tai added a comment -

          Stephan Ewen That's actually a very good summary! From the usage pattern, I think it makes sense. Our Kafka consumers are using "group.id" for start offset & offset committing only, so the change will also fit into the current implementation.

However, this again stretches Kafka's original intent of the "group.id" config. I think we need to decide whether these Flink-specific behaviours should be separated from Kafka's original configs.

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/1690

          I left a few thoughts in the corresponding JIRA issue: https://issues.apache.org/jira/browse/FLINK-3398

          StephanEwen Stephan Ewen added a comment -

          Tzu-Li (Gordon) Tai If we use a separate flink group-id setting for this, should we simply ignore the kafka group.id property?

          StephanEwen Stephan Ewen added a comment -

We could do that; we then need to think about whether we simply want to break behavior, or have a migration plan.

          tzulitai Tzu-Li (Gordon) Tai added a comment - - edited

          Would this be better?:
Have settings along the lines of flink.start-from-offsets (or flink.starting-position=external-offsets as proposed in FLINK-4280) and flink.commit-offsets. If these settings are present, the Kafka settings supplied must contain the group.id.
A specific "group" setting for Flink might not make sense and might be confusing, because the Kafka consumer doesn't really use any consumer group management functionality. The setting implies that the user wants to trigger a Flink-specific behaviour that depends on a Kafka setting.

          Does this make sense?

          tzulitai Tzu-Li (Gordon) Tai added a comment - - edited

          I've left some thoughts on the migration plan in the comments of FLINK-4280.
          Sorry for having to jump around the JIRAs, seems like the issues are somewhat related.

          For migration, the default values for flink.start-from-offsets and flink.commit-offsets, if not set, will need to be true (or flink.starting-position default to external-offsets), so that we don't break behaviors of existing user configs which won't have these new settings.

          tzulitai Tzu-Li (Gordon) Tai added a comment - - edited

I've given your idea of "a separate flink group-id setting, simply ignore the kafka group.id" a second thought and commented about it in FLINK-4280.

          rmetzger Robert Metzger added a comment -

          Maybe we should close this JIRA and have the discussion only in FLINK-4280.

          rmetzger Robert Metzger added a comment -

          Aljoscha Krettek You've changed the priority of this issue to blocker a while back.
          Is there a particular reason for that, or do you "just" consider this important?

          I'm asking because I'd like to get the 1.2 release out, and I wonder whether we should block the release on this one.

          aljoscha Aljoscha Krettek added a comment -

          I just saw a lot of discussion about this on the ML so I bumped it. We can also reduce the importance again.

          tzulitai Tzu-Li (Gordon) Tai added a comment - - edited

          I think we should keep this JIRA instead of merging it into FLINK-4280.
          They try to address different new features: FLINK-4280 is for more flexible start position configuration independent from Kafka offsets.
          This JIRA is for opt-out from offset committing on Flink's checkpoints.

          I'll move the respective proposed API for commit opt-out here:

          Properties props = new Properties();
          ...
          FlinkKafkaConsumer09 kafka = new FlinkKafkaConsumer09("topic", schema, props);
          kafka.setEnableOffsetCommittingOnCheckpoints(boolean); // if true (requires checkpointing to be enabled), commits only on checkpoints, overriding periodic committing even if "enable.auto.commit" is set in props.
          

          So the scenarios break down into:

          1. props.put("auto.commit.enable", "true") & setEnableOffsetCommittingOnCheckpoints(false):
          Perform periodic auto-committing with the internal client (or, in the 0.8 case, our own periodic committer), regardless of whether Flink checkpointing is on. The usage documentation should state clearly that with Kafka's periodic auto-committing, the offsets in Kafka will not be in sync with the checkpoint offsets.

          2. props.put("auto.commit.enable", "true") & setEnableOffsetCommittingOnCheckpoints(true):
          Overrides whatever value is set in props for "auto.commit.enable"; offsets are committed only on checkpoints. If checkpointing isn't enabled, setEnableOffsetCommittingOnCheckpoints has no effect, and the value set for auto.commit.enable is used instead.

          3. props.put("auto.commit.enable", "false") & setEnableOffsetCommittingOnCheckpoints(false): No offset committing at all.
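
          The decision table above can be sketched as a small standalone function. This is only an illustration of the proposed semantics; the names `OffsetCommitModeSketch`, `OffsetCommitMode`, and `fromConfiguration` are made up here and are not Flink's actual API:

          ```java
          public class OffsetCommitModeSketch {

              enum OffsetCommitMode { ON_CHECKPOINTS, KAFKA_PERIODIC, DISABLED }

              // autoCommitEnabled:    the "auto.commit.enable" value from the consumer props
              // commitOnCheckpoints:  the proposed setEnableOffsetCommittingOnCheckpoints(...) flag
              // checkpointingEnabled: whether Flink checkpointing is on for the job
              static OffsetCommitMode fromConfiguration(boolean autoCommitEnabled,
                                                        boolean commitOnCheckpoints,
                                                        boolean checkpointingEnabled) {
                  if (checkpointingEnabled && commitOnCheckpoints) {
                      // Scenario 2: commit only on checkpoints, overriding auto.commit.enable
                      return OffsetCommitMode.ON_CHECKPOINTS;
                  }
                  if (autoCommitEnabled) {
                      // Scenario 1 (and scenario 2 without checkpointing): periodic auto-commit
                      return OffsetCommitMode.KAFKA_PERIODIC;
                  }
                  // Scenario 3: no offset committing at all
                  return OffsetCommitMode.DISABLED;
              }

              public static void main(String[] args) {
                  System.out.println(fromConfiguration(true, false, true));  // KAFKA_PERIODIC
                  System.out.println(fromConfiguration(true, true, true));   // ON_CHECKPOINTS
                  System.out.println(fromConfiguration(true, true, false));  // KAFKA_PERIODIC (fallback)
                  System.out.println(fromConfiguration(false, false, true)); // DISABLED
              }
          }
          ```

          Note the third call: with checkpointing disabled, the checkpoint-commit flag falls through to whatever `auto.commit.enable` says, matching scenario 2's fallback.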

          githubbot ASF GitHub Bot added a comment -

          GitHub user tzulitai opened a pull request:

          https://github.com/apache/flink/pull/3527

          FLINK-3398 [kafka] Allow disabling offset committing for FlinkKafkaConsumer

          This PR allows users to completely disable offset committing back to Kafka.

          The way configuration works depends on whether checkpointing was enabled for the job:

          • Checkpointing disabled: the consumer relies on the auto-commit functionality of the internal Kafka clients. To disable / enable offset committing, users simply set appropriate values for `auto.commit.enable` / `auto.commit.interval.ms`.
          • Checkpointing enabled: in this scenario, `auto.commit.enable` / `auto.commit.interval.ms` are completely ignored. To disable / enable offset committing, users use a new `setCommitOffsetsOnCheckpoints(boolean)` method. By default this is `true`, so that without any additional configuration the committing behaviour matches the original consumer prior to this change. No user code / behaviour is broken.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/tzulitai/flink FLINK-3398

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3527.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3527


          commit 1163d82c908423c941f6b8d71c9019c7c33ba6ab
          Author: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
          Date: 2017-03-13T05:49:08Z

          FLINK-3398 [kafka] Allow disabling offset committing for FlinkKafkaConsumer


          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on the issue:

          https://github.com/apache/flink/pull/3527

          Thank you for opening a PR for this issue!

          I know I've +1ed a lot of your PRs recently, and I think I'm going to +1 this one without any further comments .. because I did not find anything

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/3527

          Did a final test run on a Kafka installation, and things worked as expected.
          One minor improvement would be to log which commit mode is used, at the point where it is determined in `open()`.

          I think it's a safe call to add the log and then merge this. Will proceed to merge for `master`.
          Thanks for all the recent reviews @rmetzger :-D

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3527

          tzulitai Tzu-Li (Gordon) Tai added a comment -

          Fixed for 1.3.0 with http://git-wip-us.apache.org/repos/asf/flink/commit/90c7415 .

            People

            • Assignee:
              tzulitai Tzu-Li (Gordon) Tai
            • Reporter:
              shikhar Shikhar Bhushan
            • Votes:
              3
            • Watchers:
              9

              Dates

              • Created:
                Updated:
                Resolved:

                Development