FLINK-4280

New Flink-specific option to set starting position of Kafka consumer without respecting external offsets in ZK / Broker

    Details

    • Type: New Feature
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.3.0
    • Component/s: Kafka Connector
    • Labels: None

      Description

      Currently, to start reading from the "earliest" or "latest" position of a topic, users of the Flink Kafka consumer set the Kafka config auto.offset.reset in the provided properties configuration.

      However, the way this config actually works can be misleading if users are looking for a way to "read topics from a starting position". The way auto.offset.reset works in the Flink Kafka consumer follows Kafka's original intent for the setting: existing external offsets committed to ZK / the brokers are checked first, and only if none exist is auto.offset.reset respected.
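
      For reference, a minimal sketch of how the starting position is effectively configured today (topic, group and server addresses are placeholders; the schema is just an example):

      Properties props = new Properties();
      props.setProperty("bootstrap.servers", "localhost:9092");
      props.setProperty("group.id", "my-flink-job");
      props.setProperty("auto.offset.reset", "earliest"); // only consulted if the group has no committed offsets in ZK / the brokers

      FlinkKafkaConsumer09<String> consumer =
          new FlinkKafkaConsumer09<>("topic", new SimpleStringSchema(), props);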

      I propose adding Flink-specific ways to define the starting position that do not take the external offsets into account. The original behaviour (consult external offsets first) can become an explicit user option, so it can be retained for Kafka users who need to interoperate with existing non-Flink Kafka consumer applications.

      With a newly introduced flink.starting-position config, users would interact with the Flink Kafka consumer like this:

      Properties props = new Properties();
      props.setProperty("flink.starting-position", "earliest/latest");
      props.setProperty("auto.offset.reset", "..."); // this will be ignored (log a warning)
      props.setProperty("group.id", "...") // this won't have effect on the starting position anymore (may still be used in external offset committing)
      ...
      

      Or, to reference external offsets in ZK / the brokers:

      Properties props = new Properties();
      props.setProperty("flink.starting-position", "external-offsets");
      props.setProperty("auto.offset.reset", "earliest/latest"); // default will be latest
      props.setProperty("group.id", "..."); // will be used to lookup external offsets in ZK / broker on startup
      ...
      

      One thing we would need to decide on is the default value for flink.starting-position.

      Two merits I see in adding this:

      1. This matches the way users generally interpret "read from a starting position". Since the Flink Kafka connector is essentially a "high-level" Kafka consumer for Flink users, I think it is reasonable to add Flink-specific functionality that users will find useful, even though it wasn't part of Kafka's original consumer design.

      2. By adding this, the idea that "the Kafka offset store (ZK / brokers) is used only to expose progress to the outside world, and not to manipulate how Kafka topics are read in Flink (unless users opt to do so)" becomes even more definite and solid. There was some discussion on this aspect in this PR (https://github.com/apache/flink/pull/1690, FLINK-3398). I think adding this further decouples Flink's internal offset checkpointing from Kafka's external offset store.

        Issue Links

          Activity

          StephanEwen Stephan Ewen added a comment -

          A more general thought on the configuration of Flink-specific properties in the FlinkKafkaConsumer:

          Do we want to make all configurations go through the Properties? I personally find setter methods quite nice, and they are type- and compile-time safe.
          One possible approach could be that all Kafka settings go through the Properties object (as they do in Kafka), and that we use getters/setters for Flink properties.

          That would apply also to other settings like

          • start offsets
          • commit to kafka/zookeeper or not
          • activate metric forwarding
          tzulitai Tzu-Li (Gordon) Tai added a comment -

          I like the idea of typed configuration classes too. For the Kinesis connector, we also have the same idea in mind (JIRA: FLINK-4195).
          This also gives a better separation between Flink-specific configs and Kafka configs.
          So I think it'd be something like this:

          FlinkKafkaConsumerConfig config = new FlinkKafkaConsumerConfig();
          config.setStartPosition(...);
          config.setCommitOffsets(boolean);
          config.setForwardMetrics(boolean);
          config.setKafkaProperties(Properties);
          ...
          
          tzulitai Tzu-Li (Gordon) Tai added a comment - - edited

          Or perhaps we can seed the config with the Kafka properties by taking it in the constructor. Will need a migration plan for this.
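
          A minimal sketch of that idea, reusing the FlinkKafkaConsumerConfig class proposed above (still hypothetical, not an existing API):

          Properties kafkaProps = new Properties();
          kafkaProps.setProperty("bootstrap.servers", "localhost:9092");

          // hypothetical: the typed config is seeded with the Kafka Properties in its constructor
          FlinkKafkaConsumerConfig config = new FlinkKafkaConsumerConfig(kafkaProps);
          config.setStartPosition(...); // Flink-specific settings still go through the typed setters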

          tzulitai Tzu-Li (Gordon) Tai added a comment - - edited

          Migration plan I have in mind:

          1) Keep the current FlinkKafkaConsumer and FlinkKafkaProducer connectors taking a Properties config. Support the new flink.* keys for Flink-specific settings through the Properties.
          2) Mark the original constructors as deprecated, and add a new constructor that accepts the proposed typed configuration class (a rough sketch is below). The new config class can take a Properties to build up the settings, both Flink-specific and Kafka ones (or simply use the setter methods).
          3) At some point, perhaps 2.0, remove the original constructors that accept Properties.
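
          A rough sketch of what 2) could look like (the constructor shape and the fromProperties helper are illustrative, nothing here is decided):

          // existing constructor kept for compatibility, but deprecated
          @Deprecated
          public FlinkKafkaConsumer09(String topic, DeserializationSchema<T> schema, Properties props) {
              this(topic, schema, FlinkKafkaConsumerConfig.fromProperties(props)); // fromProperties is hypothetical
          }

          // new constructor accepting the proposed typed configuration class
          public FlinkKafkaConsumer09(String topic, DeserializationSchema<T> schema, FlinkKafkaConsumerConfig config) {
              ...
          }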

          tzulitai Tzu-Li (Gordon) Tai added a comment -

          Stephan Ewen On second thought, I think I misunderstood what you meant in the first place.

          What you're proposing is this (I think this is a clearer design than what I mentioned above):

          Properties props = new Properties();
          ...
          
          FlinkKafkaConsumer kafka = new FlinkKafkaConsumer("topic", schema, props);
          kafka.setStartFromEarliest();
          kafka.setStartFromLatest();
          kafka.setEnableCommitOffsets(boolean); // if true, commits on checkpoint if checkpointing is enabled, otherwise, periodically.
          kafka.setForwardMetrics(boolean);
          ...
          
          env.addSource(kafka) ...
          

          Echoing your statement if we are going with this approach:

          now that we are trying to separate Flink-specific configs from Kafka configs, I think we should clearly state (and change the implementation so) that the Properties provided in the constructor are only used to configure the internal Kafka consumers the connector uses, by simply passing the Properties through. So the only configs in the Properties that take effect are the ones the Kafka API supports: in FlinkKafkaConsumer08, only the configs the Kafka SimpleConsumer API supports take effect; in FlinkKafkaConsumer09, only the configs the new KafkaConsumer API supports take effect. Any additional function or Flink-specific behaviour on top of the internal Kafka consumers should go through setter methods.

          The problem to solve, in general, with the current configuration is that we are trying to "mimic" high-level consumer functions with the original config keys. Take FlinkKafkaConsumer08 for example: the SimpleConsumer API doesn't actually use the group.id or auto.offset.reset configs. We're re-implementing the behaviour of these configs ourselves, and exposing them through the original config keys in the Properties. When it comes to adding functionality on top of the internally used SimpleConsumer, we tend to stretch the original definition of these keys and try to make them work with our re-implementations of configs such as group.id and auto.offset.reset. An example of the confusion users can run into when we re-implement configs that the internal API doesn't actually use is this user ML thread: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-kafka-group-question-td8185.html.

          This also supports the idea you mentioned in FLINK-3398 that we should drop Kafka's group.id and perhaps have Flink's own groupId. Since Kafka's group.id was never actually used by the internal SimpleConsumer of FlinkKafkaConsumer08 in the first place, we should have setter methods for functions like "start with offset" or "offset committing", to which the user supplies a groupId. For FlinkKafkaConsumer09, we won't need a setter method for "periodic offset committing" because the internal KafkaConsumer supports that through group.id and enable.auto.commit; instead, we have a setter method to opt to switch to "commit offsets on checkpoint".

          Summarized in code:

          // for FlinkKafkaConsumer08
          Properties props = new Properties();
          ...
          FlinkKafkaConsumer08 kafka = new FlinkKafkaConsumer08("topic", schema, props);
          kafka.setStartFromEarliest();
          kafka.setStartFromLatest();
          kafka.setStartFromExternalOffsets("groupId")
          kafka.setEnableCommitOffsets("groupId"); // periodic if checkpointing is not enabled, otherwise on notifyCheckpointComplete()
          kafka.setForwardMetrics(boolean);
          ...
          
          // for FlinkKafkaConsumer09
          Properties props = new Properties();
          ...
          FlinkKafkaConsumer09 kafka = new FlinkKafkaConsumer09("topic", schema, props);
          kafka.setStartFromEarliest();
          kafka.setStartFromLatest();
          kafka.setStartFromExternalOffsets(); // doesn't take a "group.id", because in FlinkKafkaConsumer09, "group.id" is a config recognized by the new KafkaConsumer API
          kafka.setCommitOffsetsOnCheckpoint(boolean); // if true (checkpointing must be enabled), overrides periodic committing if "enable.auto.commit" is set in props
          kafka.setForwardMetrics(boolean);
          ...
          

          So, the general rule is:

          • Supplied configuration is used only to configure the internally used client APIs of the external system.
          • All Flink-specific configuration, or functions that the internal API does not support, goes through connector-specific setter methods.

          This might be a general rule we would like all Flink-supported connectors to follow in the long run. Users would have a clear understanding and full control of the behaviour of the internal APIs the connectors use, and we'd also have a clear line on how new functionality should be added on top of them.

          tzulitai Tzu-Li (Gordon) Tai added a comment - - edited

          It seems like this JIRA and FLINK-3398 together have snowballed quite a few thoughts and changes on the Kafka connector configuration, and possibly on other supported connectors in general (I personally think we have quite a bit of divergence in configuration and naming across our supported connectors; it'd be good if we could unify the differences).

          To make sure the change is good and that we can stick to it going forward, would we want a formally structured FLIP (perhaps only for changes to the Kafka connector at first), with a migration plan, so that we can gather more consensus and thoughts from the community? I'm not sure whether discussions like this, which change public interfaces of our connectors, are considered major changes for Flink and require FLIPs.

          StephanEwen Stephan Ewen added a comment -

          I like the latest suggestion for configuration, with a few possible changes:

          • Can we keep the Kafka08 and Kafka09 configuration style similar?
          • How about calling offsets in ZK/Kafka the "Group Offsets" rather than "External Offsets"? Not everyone may get the distinction between internal offsets (Flink) and external ones (Kafka/ZK).

          One question I still have is whether it ever makes sense to commit to a different group than to start from?

          tzulitai Tzu-Li (Gordon) Tai added a comment - - edited

          Stephan Ewen

          • Can we keep the Kafka08 and Kafka09 configuration style similar?
            The latest configuration suggestion aimed to use the Properties only to configure the internal consumers (the properties are simply passed on), so any behaviour / configuration that the internal Kafka consumer doesn't recognise would be supplied through the setter methods. Since the internal SimpleConsumer in 0.8 and KafkaConsumer in 0.9 behave differently with the given Properties (e.g. SimpleConsumer doesn't recognise group.id while KafkaConsumer does), the setter methods would also need to be designed a bit differently to provide the same overall level of functionality.
            After some reconsideration, it probably isn't necessary to rework the configuration to this extent. FlinkKafkaConsumer08 has already implemented behaviour for group.id and auto.commit.enable on top of the SimpleConsumers, so in the current state of the Kafka connectors, FlinkKafkaConsumer08 and FlinkKafkaConsumer09 have equal functionality. By building on what we already have, it is certainly possible to keep the reworked configuration similar for the two versions. This way we also won't be breaking current behaviour.
          • How about calling offsets in ZK/Kafka the "Group Offsets" ...
            I agree, "Group Offsets" seems to be a better naming.
          • Does it ever make sense to commit to a different group than to start from?
            If we're keeping the group.id setting in the properties both in 0.8 and 0.9, then this won't be a case to consider.

          Taking into account the above and the comments so far, how about this for the reworked Kafka configuration:

          Properties props = new Properties();
          // for 0.8 consumer
          props.setProperty("group.id", "...");
          props.setProperty("auto.commit.enable", "true/false"); // config for periodic committing in 0.8
          props.setProperty("auto.commit.interval.ms", "...")
          ...
          
          // or for 0.9 consumer
          props.setProperty("group.id", "...");
          props.setProperty("enable.auto.commit", "true/false"); // config for periodic committing in 0.9
          props.setProperty("auto.commit.interval.ms", "...")
          ...
          
          FlinkKafkaConsumer08 kafka = new FlinkKafkaConsumer08("topic", schema, props);
          // or FlinkKafkaConsumer09 kafka = new FlinkKafkaConsumer09("topic", schema, props);
          kafka.setStartFromEarliest();
          kafka.setStartFromLatest();
          kafka.setStartFromGroupOffsets(); // uses the "group.id" in props
          kafka.setCommitGroupOffsetsOnCheckpoint(boolean); // commits to the "group.id" (Flink checkpointing must be enabled to take effect).
                                                            // Also, this overrides periodic committing if it is enabled in the
                                                            // props by "auto.commit.enable" / "enable.auto.commit".
                                                            // If this is false and periodic committing is also not set in props, no offsets are committed.
          kafka.setForwardMetrics(boolean);
          ...
          

          To avoid breaking current behaviour, default starting position will be "from group offsets", and "commit on checkpoint" will be true.

          Let me know what you think!

          tzulitai Tzu-Li (Gordon) Tai added a comment - - edited

          Hi Stephan Ewen,
          I'd like to start working on this and FLINK-3398 over the next week, because they're sort of blockers for other Kafka connector tasks I'm working on.
          Do you have any other suggestions for the latest configuration settings in the above comment? Thanks!

          githubbot ASF GitHub Bot added a comment -

          GitHub user tzulitai opened a pull request:

          https://github.com/apache/flink/pull/2509

          FLINK-4280 [kafka-connector] Explicit start position configuration for Kafka Consumer

          This PR adds the following new explicit setter methods to configure the starting position for the Kafka Consumer connector:

          ```
          FlinkKafkaConsumer08 kafka = new FlinkKafkaConsumer08(...) // or 09
          kafka.setStartFromEarliest(); // start from earliest without respecting any committed offsets
          kafka.setStartFromLatest(); // start from latest without respecting any committed offsets
          kafka.setStartFromGroupOffsets(); // respects committed offsets in ZK / Kafka as starting points
          ```

          The default is to start from group offsets, so we won't be breaking existing user code.

          One thing to note is that this PR also includes some refactoring to consolidate all start offset assignment logic for partitions within the fetcher. For example, in the 0.8 version, with this change the `SimpleConsumerThread` no longer decides where a partition needs to start from; all partitions will already have been assigned starting offsets by the fetcher, and it simply needs to start consuming them. This is preparation for transparent partition discovery for the Kafka consumers in FLINK-4022 (https://issues.apache.org/jira/browse/FLINK-4022).

          I suggest reviewing this PR after #2369 to reduce the effort of getting the 0.10 Kafka consumer in first. Tests for the new functions will be added in follow-up commits.
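
          For context, a rough end-to-end sketch of how the new setters would be used in a job (topic, group and schema here are placeholders, not part of this PR):

          ```
          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

          Properties props = new Properties();
          props.setProperty("bootstrap.servers", "localhost:9092");
          props.setProperty("group.id", "my-group");

          FlinkKafkaConsumer09<String> kafka =
              new FlinkKafkaConsumer09<>("topic", new SimpleStringSchema(), props);
          kafka.setStartFromEarliest(); // ignore committed offsets and read the topic from the beginning

          env.addSource(kafka).print();
          env.execute("kafka-start-position-example");
          ```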

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/tzulitai/flink FLINK-4280

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/2509.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #2509


          commit f1d24806d902a45f66fc9b42a19a303a031b81b1
          Author: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
          Date: 2016-09-17T13:41:50Z

          FLINK-4280 [kafka-connector] Explicit start position configuration for Kafka Consumer


          githubbot ASF GitHub Bot added a comment -

          Github user gyfora commented on the issue:

          https://github.com/apache/flink/pull/2509

          Hi,

          I like the proposed changes. Do you think it would make sense to add the possibility to set specific offsets on a per-partition basis?

          ```
          kafka.setStartOffsets(Map<Integer, Long> partitionOffsets)
          ```

          I think this is extremely useful in production use.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/2509

          Hi @gyfora,
          Yes, it is absolutely possible to add that. There's actually a JIRA for that feature too (FLINK-3123, https://issues.apache.org/jira/browse/FLINK-3123), so I'd say we can add it on top of the proposed changes here, as a separate follow-up PR after this one?

          One note though: the API for that feature would need to be able to specify offsets for partitions of different topics, since the Kafka consumers can subscribe to multiple topics. So `Map<Integer, Long>` wouldn't fit this case; we'd probably be better off having a new user-facing class as the argument to define the offsets.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/2509

          I'll rebase this PR soon, probably will also wait for Kafka 0.10 connector to be merged.

          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on the issue:

          https://github.com/apache/flink/pull/2509

          Thank you for working on this. I gave #2369 some love today to speed things up.

          githubbot ASF GitHub Bot added a comment -

          Github user gyfora commented on the issue:

          https://github.com/apache/flink/pull/2509

          @tzulitai makes sense! As for the Map<Int, Long>, you are right, the multiple-topic case slipped my mind.

          githubbot ASF GitHub Bot added a comment -

          Github user koeninger commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2509#discussion_r82923872

          — Diff: flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java —
          @@ -345,16 +343,19 @@ protected static void validateZooKeeperConfig(Properties props) {
          }
          }

          - private static long getInvalidOffsetBehavior(Properties config) {
          + /**
          +  * Check for invalid "auto.offset.reset" values. Should be called in constructor for eager checking before submitting
          +  * the job. Note that 'none' is also considered invalid, as we don't want to deliberately throw an exception

          — End diff –

          Look out for https://issues.apache.org/jira/browse/KAFKA-3370 if you aren't already aware. Right now allowing none and catching the exception only on startup is the best workaround I've seen.
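
          A rough sketch of that workaround, assuming the 0.9-era KafkaConsumer API (not code from this PR): allow "none" to pass validation and only handle the failure when consumption actually starts.

          try {
              ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
              // ... hand the records to the fetcher as usual
          } catch (NoOffsetForPartitionException e) {
              // no committed offset exists for a subscribed partition and no reset policy is set;
              // decide the starting position explicitly (e.g. seek to earliest / latest) or fail the job
          }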

          cody@koeninger.org Cody Koeninger added a comment -

          FLINK-3123 would be pretty important to me, happy to help if needed.

          You already have a public KafkaTopicPartition class, is there a reason not to use Map<KafkaTopicPartition, Long> ?
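
          Just to illustrate, a sketch of what that could look like (setStartFromSpecificOffsets is a hypothetical setter name, along the lines of FLINK-3123):

          Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
          specificStartOffsets.put(new KafkaTopicPartition("topic-a", 0), 23L);
          specificStartOffsets.put(new KafkaTopicPartition("topic-a", 1), 31L);
          specificStartOffsets.put(new KafkaTopicPartition("topic-b", 0), 43L);

          kafka.setStartFromSpecificOffsets(specificStartOffsets);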

          rmetzger Robert Metzger added a comment -
          This issue is related: https://issues.apache.org/jira/browse/FLINK-3037
          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2509#discussion_r83003348

          — Diff: flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java —
          @@ -345,16 +343,19 @@ protected static void validateZooKeeperConfig(Properties props) {
          }
          }

          - private static long getInvalidOffsetBehavior(Properties config) {
          + /**
          +  * Check for invalid "auto.offset.reset" values. Should be called in constructor for eager checking before submitting
          +  * the job. Note that 'none' is also considered invalid, as we don't want to deliberately throw an exception

          — End diff –

          Thank you for the pointer.
          We are discussing this issue here https://issues.apache.org/jira/browse/FLINK-4280 and here https://issues.apache.org/jira/browse/FLINK-3037

          tzulitai Tzu-Li (Gordon) Tai added a comment -

          I think using the KafkaTopicPartition class is a good idea.
          I actually have a branch for FLINK-3123 that's almost ready; I'll try to open a PR based on https://github.com/apache/flink/pull/2509 before the end of the week.
          It would be great if you could help with the review over there too.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2509#discussion_r83141867

          — Diff: flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java —
          @@ -345,16 +343,19 @@ protected static void validateZooKeeperConfig(Properties props) {
          }
          }

          - private static long getInvalidOffsetBehavior(Properties config) {
          + /**
          +  * Check for invalid "auto.offset.reset" values. Should be called in constructor for eager checking before submitting
          +  * the job. Note that 'none' is also considered invalid, as we don't want to deliberately throw an exception

          — End diff –

          Thanks for pointing this out. To keep things simple for now, I propose to fix https://issues.apache.org/jira/browse/FLINK-3037 as a separate PR.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/2509

          Rebasing + adding tests for the new functions now.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/2509

          @rmetzger @gyfora @koeninger Rebased this on the Kafka 0.10 connector and some other recent changes. This is ready for review now. I'd like to add tests for this after #2580, because #2580 adds an `OffsetHandler` to the Kafka test environment in the IT tests, which will come in handy when writing tests for this PR.

          I'll also open a separate PR based on this one for FLINK-3123 (set specific offsets for startup).

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2509#discussion_r83165323

          — Diff: flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java —
          @@ -202,13 +204,53 @@ public void run() {
          }
          }

          -   // seek the consumer to the initial offsets
          +   List<KafkaTopicPartition> partitionsWithNoOffset = new ArrayList<>();
              for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) {
          -       if (partition.isOffsetDefined()) {
          +       if (!partition.isOffsetDefined()) {
          +           partitionsWithNoOffset.add(partition.getKafkaTopicPartition());
          +       } else {
                      consumer.seek(partition.getKafkaPartitionHandle(), partition.getOffset() + 1);
                  }
              }

          +   if (partitionsWithNoOffset.size() == subscribedPartitions().length) {
          +       // if all partitions have no initial offsets, that means we're starting fresh
          +       switch (startupMode) {
          +           case EARLIEST:
          +               LOG.info("Setting starting point as earliest offset for partitions {}", partitionsWithNoOffset);
          +               for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) {
          +                   consumer.seekToBeginning(partition.getKafkaPartitionHandle());
          +               }
          +               break;
          +           case LATEST:
          +               LOG.info("Setting starting point as latest offset for partitions {}", partitionsWithNoOffset);
          +               for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) {
          +                   consumer.seekToEnd(partition.getKafkaPartitionHandle());
          +               }
          +               break;
          +           default:
          +           case GROUP_OFFSETS:
          +               LOG.info("Using group offsets in Kafka of group.id {} as starting point for partitions {}",
          +                   kafkaProperties.getProperty("group.id"), partitionsWithNoOffset);
          +               // don't need to do anything; the KafkaConsumer by default finds group offsets from Kafka brokers
          +       }
          +   } else if (partitionsWithNoOffset.size() > 0 && partitionsWithNoOffset.size() < subscribedPartitions().length) {
          +       // we are restoring from a checkpoint/savepoint, but there are some new partitions that weren't
          +       // subscribed by the consumer on the previous execution; in this case, we set the starting offset
          +       // of all new partitions to the earliest offset
          +       LOG.info("Setting starting point as earliest offset for newly created partitions after startup: {}", partitionsWithNoOffset);
          +       for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) {
          +           if (partitionsWithNoOffset.contains(partition.getKafkaTopicPartition())) {
          +               consumer.seekToBeginning(partition.getKafkaPartitionHandle());
          +           }
          +       }
          +   } else {
          +       // restored from a checkpoint/savepoint, and all partitions have starting offsets; don't need to do anything
          +   }

          — End diff –

          @rmetzger after looking at https://issues.apache.org/jira/browse/FLINK-3037, do you think the proposed changes here actually fix that issue?

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2509#discussion_r83165620

          — Diff: flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java —
          @@ -202,13 +204,53 @@ public void run() {
          }
          }

          -   // seek the consumer to the initial offsets
          +   List<KafkaTopicPartition> partitionsWithNoOffset = new ArrayList<>();
              for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) {
          -       if (partition.isOffsetDefined()) {
          +       if (!partition.isOffsetDefined()) {
          +           partitionsWithNoOffset.add(partition.getKafkaTopicPartition());
          +       } else {
                      consumer.seek(partition.getKafkaPartitionHandle(), partition.getOffset() + 1);
                  }
              }

          +   if (partitionsWithNoOffset.size() == subscribedPartitions().length) {
          +       // if all partitions have no initial offsets, that means we're starting fresh
          +       switch (startupMode) {
          +           case EARLIEST:
          +               LOG.info("Setting starting point as earliest offset for partitions {}", partitionsWithNoOffset);
          +               for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) {
          +                   consumer.seekToBeginning(partition.getKafkaPartitionHandle());
          +               }
          +               break;
          +           case LATEST:
          +               LOG.info("Setting starting point as latest offset for partitions {}", partitionsWithNoOffset);
          +               for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) {
          +                   consumer.seekToEnd(partition.getKafkaPartitionHandle());
          +               }
          +               break;
          +           default:
          +           case GROUP_OFFSETS:
          +               LOG.info("Using group offsets in Kafka of group.id {} as starting point for partitions {}",
          +                   kafkaProperties.getProperty("group.id"), partitionsWithNoOffset);
          +               // don't need to do anything; the KafkaConsumer by default finds group offsets from Kafka brokers
          +       }
          +   } else if (partitionsWithNoOffset.size() > 0 && partitionsWithNoOffset.size() < subscribedPartitions().length) {

          — End diff –

          @rmetzger after looking at https://issues.apache.org/jira/browse/FLINK-3037, do you think the proposed changes here (in the `Kafka09Fetcher`, overall) actually fix that issue?

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/2509

          Rebased on recent Kafka consumer changes, fixed failing Kafka 0.10 exactly-once tests, and added integration tests (`testStartFromEarliestOffsets`, `testStartFromLatestOffsets`, and `testStartFromGroupOffsets`) for the new explicit startup modes.

          However, I'm running into Kafka consumer config errors when running `testStartFromEarliestOffsets` in versions 0.9 and 0.10. Still investigating the issue; for now, `testStartFromEarliestOffsets` is deliberately commented out in the 0.9 and 0.10 IT tests to allow some early reviews.

          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2509#discussion_r89274682

          — Diff: flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java —
          @@ -45,10 +45,7 @@

          import java.io.File;
          import java.net.BindException;
          -import java.util.ArrayList;
          -import java.util.List;
          -import java.util.Properties;
          -import java.util.UUID;
          +import java.util.*;
          — End diff –

          Star imports are something we try to avoid in Flink.

          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2509#discussion_r89274555

          — Diff: flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java —
          @@ -131,6 +131,25 @@ public void testMetricsAndEndOfStream() throws Exception

          { runEndOfStreamTest(); }

          + // — startup mode —
          +
          + // TODO not passing due to Kafka Consumer config error
          — End diff –

          This is easy to fix, right? You just have to put serializer classes into the `standardProps`.
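
          For illustration, roughly what that suggestion amounts to (a sketch only; `standardProps` is the shared test `Properties` object, and the (de)serializer class names below are the standard ones shipped with the Kafka client, not something introduced by this PR):

          Properties standardProps = new Properties();
          // hypothetical sketch: declare (de)serializers so that consumers/producers built from
          // these properties pass the Kafka client's configuration validation
          standardProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
          standardProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
          standardProps.setProperty("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
          standardProps.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");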

          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2509#discussion_r89284524

          — Diff: flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java —
          @@ -218,26 +221,57 @@ public void run() {
          }
          }

          -   // seek the consumer to the initial offsets
          +   List<KafkaTopicPartition> partitionsWithNoOffset = new ArrayList<>();
              for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) {
                  if (partition.isOffsetDefined()) {
                      LOG.info("Partition {} has restored initial offsets {} from checkpoint / savepoint; seeking the consumer " +
                          "to position {}", partition.getKafkaPartitionHandle(), partition.getOffset(), partition.getOffset() + 1);

                      consumer.seek(partition.getKafkaPartitionHandle(), partition.getOffset() + 1);
                  } else {
          -           // for partitions that do not have offsets restored from a checkpoint/savepoint,
          -           // we need to define our internal offset state for them using the initial offsets retrieved from Kafka
          -           // by the KafkaConsumer, so that they are correctly checkpointed and committed on the next checkpoint
          +           partitionsWithNoOffset.add(partition.getKafkaTopicPartition());
          +       }
          +   }

          -           long fetchedOffset = consumer.position(partition.getKafkaPartitionHandle());
          +   if (partitionsWithNoOffset.size() == subscribedPartitions().length) {
          +       // if all partitions have no initial offsets, that means we're starting fresh
          +       switch (startupMode) {
          +           case EARLIEST:
          +               LOG.info("Setting starting point as earliest offset for partitions {}", partitionsWithNoOffset);
          +
          +               seekPartitionsToBeginning(consumer, convertKafkaPartitions(subscribedPartitions()));

          — End diff –

          It's a bit inefficient to convert the `subscribedPartitions()` into an ArrayList, only for `seekPartitionsToBeginning` to convert the List back into an array. I think we can skip the `ArrayList` step and create the array immediately.
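
          To illustrate the suggestion (a sketch only, assuming `seekPartitionsToBeginning` / `seekPartitionsToEnd` were changed to accept a `TopicPartition[]` instead of a `List`; `subscribedPartitions()` and `getKafkaPartitionHandle()` are the existing names from the quoted code):

          // build the array in a single pass, without the intermediate ArrayList
          KafkaTopicPartitionState<TopicPartition>[] subscribed = subscribedPartitions();
          TopicPartition[] partitionHandles = new TopicPartition[subscribed.length];
          for (int i = 0; i < subscribed.length; i++) {
              partitionHandles[i] = subscribed[i].getKafkaPartitionHandle();
          }
          seekPartitionsToBeginning(consumer, partitionHandles);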

          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2509#discussion_r89284541

          — Diff: flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java —
          @@ -218,26 +221,57 @@ public void run() {
          }
          }

          -   // seek the consumer to the initial offsets
          +   List<KafkaTopicPartition> partitionsWithNoOffset = new ArrayList<>();
              for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) {
                  if (partition.isOffsetDefined()) {
                      LOG.info("Partition {} has restored initial offsets {} from checkpoint / savepoint; seeking the consumer " +
                          "to position {}", partition.getKafkaPartitionHandle(), partition.getOffset(), partition.getOffset() + 1);

                      consumer.seek(partition.getKafkaPartitionHandle(), partition.getOffset() + 1);
                  } else {
          -           // for partitions that do not have offsets restored from a checkpoint/savepoint,
          -           // we need to define our internal offset state for them using the initial offsets retrieved from Kafka
          -           // by the KafkaConsumer, so that they are correctly checkpointed and committed on the next checkpoint
          +           partitionsWithNoOffset.add(partition.getKafkaTopicPartition());
          +       }
          +   }

          -           long fetchedOffset = consumer.position(partition.getKafkaPartitionHandle());
          +   if (partitionsWithNoOffset.size() == subscribedPartitions().length) {
          +       // if all partitions have no initial offsets, that means we're starting fresh
          +       switch (startupMode) {
          +           case EARLIEST:
          +               LOG.info("Setting starting point as earliest offset for partitions {}", partitionsWithNoOffset);
          +
          +               seekPartitionsToBeginning(consumer, convertKafkaPartitions(subscribedPartitions()));
          +               break;
          +           case LATEST:
          +               LOG.info("Setting starting point as latest offset for partitions {}", partitionsWithNoOffset);
          +
          +               seekPartitionsToEnd(consumer, convertKafkaPartitions(subscribedPartitions()));

          — End diff –

          Same here

          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2509#discussion_r89275211

          — Diff: flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java —
          @@ -501,4 +514,14 @@ private static void getLastOffsetFromKafka(
          part.setOffset(offset - 1);
          }
          }
          -}
          \ No newline at end of file
          +
          + private static void checkAllPartitionsHaveDefinedStartingOffsets(
          + List<KafkaTopicPartitionState<TopicAndPartition>> partitions)
          + {
          + for (KafkaTopicPartitionState<TopicAndPartition> part : partitions) {
          + if (!part.isOffsetDefined()) {
          + throw new RuntimeException("SimpleConsumerThread received a partition with undefined starting offset");
          — End diff –

          Since we use this in an argument check, we should throw an IllegalArgumentException here.
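
          Applied to the quoted helper, the check would then roughly look like this (same logic, only the exception type changes):

          private static void checkAllPartitionsHaveDefinedStartingOffsets(
                  List<KafkaTopicPartitionState<TopicAndPartition>> partitions) {
              for (KafkaTopicPartitionState<TopicAndPartition> part : partitions) {
                  if (!part.isOffsetDefined()) {
                      // an undefined offset violates the method's precondition, it is not an unexpected runtime failure
                      throw new IllegalArgumentException(
                          "SimpleConsumerThread received a partition with undefined starting offset");
                  }
              }
          }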

          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2509#discussion_r89150327

          — Diff: flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java —
          @@ -263,6 +265,7 @@ public Void answer(InvocationOnMock invocation) {
          schema,
          new Properties(),
          0L,
          + StartupMode.GROUP_OFFSETS,
          — End diff –

          Looks like the indentation of the added lines is correct, but the indentation of the file is wrong. Could you fix that with the PR?

          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2509#discussion_r89276062

          — Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java —
          @@ -444,6 +445,134 @@ public void run()

          { kafkaOffsetHandler.close(); deleteTestTopic(topicName); }

          +
          + /**
          + * This test ensures that when explicitly set to start from earliest record, the consumer
          + * ignores the "auto.offset.reset" behaviour as well as any committed group offsets in Kafka.
          + */
          + public void runStartFromEarliestOffsets() throws Exception {
          +     // 3 partitions with 50 records each (0-49, so the expected commit offset of each partition should be 50)
          +     final int parallelism = 3;
          +     final int recordsInEachPartition = 50;
          +
          +     final String topicName = writeSequence("testStartFromEarliestOffsetsTopic", recordsInEachPartition, parallelism, 1);
          +
          +     final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
          +     env.getConfig().disableSysoutLogging();
          +     env.setParallelism(parallelism);
          +
          +     Properties readProps = new Properties();
          +     readProps.putAll(standardProps);
          +     readProps.setProperty("auto.offset.reset", "latest"); // this should be ignored
          +
          +     // the committed offsets should be ignored
          +     KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(standardProps);
          +     kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23);
          +     kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31);
          +     kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43);
          +
          +     readSequence(env, StartupMode.EARLIEST, readProps, parallelism, topicName, recordsInEachPartition, 0);
          +
          +     kafkaOffsetHandler.close();
          +     deleteTestTopic(topicName);
          + }

          +
          + /**
          + * This test ensures that when explicitly set to start from latest record, the consumer
          + * ignores the "auto.offset.reset" behaviour as well as any committed group offsets in Kafka.
          + */
          + public void runStartFromLatestOffsets() throws Exception {
          + // 3 partitions with 50 records each (0-49, so the expected commit offset of each partition should be 50)
          + final int parallelism = 3;
          + final int recordsInEachPartition = 50;
          +
          + final String topicName = writeSequence("testStartFromLatestOffsetsTopic", recordsInEachPartition, parallelism, 1);
          +
          + final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
          + env.getConfig().disableSysoutLogging();
          + env.setParallelism(parallelism);
          +
          + final Properties readProps = new Properties();
          + readProps.putAll(standardProps);
          + readProps.setProperty("auto.offset.reset", "earliest"); // this should be ignored
          +
          + // the committed offsets should be ignored
          + KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(standardProps);
          + kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23);
          + kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31);
          + kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43);
          +
          + Thread consumeThread = new Thread(new Runnable() {
          + @Override
          + public void run() {
          +     try {
          +         readSequence(env, StartupMode.LATEST, readProps, parallelism, topicName, 30, 50);
          +     } catch (Exception e) {
          +         throw new RuntimeException(e);
          +     }

          + }
          + });
          + consumeThread.start();
          +
          + Thread.sleep(5000);
          — End diff –

          This is dangerous because there is no guarantee that the `readSequence` call finishes within 5 seconds (Travis sometimes has pretty slow test execution).

          You can probably avoid the sleep by writing a defined number of elements into the topic after `readSequence()` has started. Then you check that the number of read elements is lower than or equal to that defined number.

          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2509#discussion_r89282863

          — Diff: flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java —
          @@ -139,24 +143,65 @@ public void runFetchLoop() throws Exception {

          PeriodicOffsetCommitter periodicCommitter = null;
          try {

          -   // read offsets from ZooKeeper for partitions that did not restore offsets
          -   {
          -       List<KafkaTopicPartition> partitionsWithNoOffset = new ArrayList<>();
          -       for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) {
          -           if (!partition.isOffsetDefined()) {
          -               partitionsWithNoOffset.add(partition.getKafkaTopicPartition());
          -           }
          +   List<KafkaTopicPartition> partitionsWithNoOffset = new ArrayList<>();
          +   for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) {
          +       if (!partition.isOffsetDefined()) {
          +           partitionsWithNoOffset.add(partition.getKafkaTopicPartition());
                  }
          +   }
          +
          +   if (partitionsWithNoOffset.size() == subscribedPartitions().length) {
          +       // if all partitions have no initial offsets, that means we're starting fresh without any restored state
          +       switch (startupMode) {
          +           case EARLIEST:
          +               LOG.info("Setting starting point as earliest offset for partitions {}", partitionsWithNoOffset);
          +
          +               for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) {
          +                   partition.setOffset(OffsetRequest.EarliestTime());
          +               }
          +               break;
          +           case LATEST:
          +               LOG.info("Setting starting point as latest offset for partitions {}", partitionsWithNoOffset);
          +
          +               for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) {
          +                   partition.setOffset(OffsetRequest.LatestTime());
          +               }
          +               break;
          +           default:
          +           case GROUP_OFFSETS:
          +               LOG.info("Using group offsets in Zookeeper of group.id {} as starting point for partitions {}",
          +                   kafkaConfig.getProperty("group.id"), partitionsWithNoOffset);
          +
          +               Map<KafkaTopicPartition, Long> zkOffsets = zookeeperOffsetHandler.getCommittedOffsets(partitionsWithNoOffset);
          +               for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) {
          +                   Long offset = zkOffsets.get(partition.getKafkaTopicPartition());
          +                   if (offset != null) {
          +                       // the committed offset in ZK represents the next record to process,
          +                       // so we subtract it by 1 to correctly represent internal state
          +                       partition.setOffset(offset - 1);
          +                   } else {
          +                       // if we can't find an offset for a partition in ZK when using GROUP_OFFSETS,
          +                       // we default to "auto.offset.reset" like the Kafka high-level consumer
          +                       LOG.warn("No group offset can be found for partition {} in Zookeeper;" +
          +                           " resetting starting offset to 'auto.offset.reset'", partition);
          +
          +                       partition.setOffset(invalidOffsetBehavior);
          +                   }
          +               }
          +       }
          +   } else if (partitionsWithNoOffset.size() > 0 && partitionsWithNoOffset.size() < subscribedPartitions().length) {

          — End diff –

          I think this case can currently never happen, because on restore we only add the partitions that were part of the restored state.

          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2509#discussion_r89283859

          — Diff: flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java —
          @@ -306,13 +340,23 @@ public void run() {
          }
          }

          -   // Kafka09Fetcher ignores the timestamp, Kafka010Fetcher is extracting the timestamp and passing it to the emitRecord() method.
          +   // ------------------------------------------------------------------------
          +   //  Protected methods that allow pluggable Kafka version-specific implementations
          +   // ------------------------------------------------------------------------
          +
              protected void emitRecord(T record, KafkaTopicPartitionState<TopicPartition> partition, long offset, ConsumerRecord consumerRecord) throws Exception {
          +       // Kafka09Fetcher ignores the timestamp, Kafka010Fetcher is extracting the timestamp and passing it to the emitRecord() method.
                  emitRecord(record, partition, offset, Long.MIN_VALUE);

          — End diff –

          This will probably break as well when rebasing

          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2509#discussion_r89283319

          — Diff: flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java —
          @@ -408,23 +412,32 @@ else if (partitionsRemoved) {
          }
          }

          -   private void getMissingOffsetsFromKafka(
          +   private void replaceEarliestOrLatestOffsetsWithActualValuesFromKafka(
                      List<KafkaTopicPartitionState<TopicAndPartition>> partitions) throws IOException
              {
                  // collect which partitions we should fetch offsets for
          -       List<KafkaTopicPartitionState<TopicAndPartition>> partitionsToGetOffsetsFor = new ArrayList<>();
          +       List<KafkaTopicPartitionState<TopicAndPartition>> partitionsWithEarliestOffsetSetting = new ArrayList<>();

          — End diff –

          I don't see why you need to copy the partitions into these lists.
          I think you can just go over the list and call getLastOffsetFromKafka with `part.getOffset()` as the last argument.

          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2509#discussion_r89275431

          — Diff: flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java —
          @@ -110,6 +110,25 @@ public void testMetrics() throws Throwable

          { runMetricsTest(); }

          + // — startup mode —
          +
          + // TODO not passing due to Kafka Consumer config error
          +// @Test(timeout = 60000)
          +// public void testStartFromEarliestOffsets() throws Exception {
          +//     runStartFromEarliestOffsets();
          +// }

          — End diff –

          We should fix that as well.

          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2509#discussion_r89275697

          — Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java —
          @@ -444,6 +445,134 @@ public void run()

          { kafkaOffsetHandler.close(); deleteTestTopic(topicName); }

          +
          + /**
          + * This test ensures that when explicitly set to start from earliest record, the consumer
          + * ignores the "auto.offset.reset" behaviour as well as any committed group offsets in Kafka.
          + */
          + public void runStartFromEarliestOffsets() throws Exception {
          +     // 3 partitions with 50 records each (0-49, so the expected commit offset of each partition should be 50)
          +     final int parallelism = 3;
          +     final int recordsInEachPartition = 50;
          +
          +     final String topicName = writeSequence("testStartFromEarliestOffsetsTopic", recordsInEachPartition, parallelism, 1);
          +
          +     final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
          +     env.getConfig().disableSysoutLogging();
          +     env.setParallelism(parallelism);
          +
          +     Properties readProps = new Properties();
          +     readProps.putAll(standardProps);
          +     readProps.setProperty("auto.offset.reset", "latest"); // this should be ignored
          +
          +     // the committed offsets should be ignored
          +     KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(standardProps);
          +     kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23);
          +     kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31);
          +     kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43);
          +
          +     readSequence(env, StartupMode.EARLIEST, readProps, parallelism, topicName, recordsInEachPartition, 0);
          +
          +     kafkaOffsetHandler.close();
          +     deleteTestTopic(topicName);
          + }

          +
          + /**
          + * This test ensures that when explicitly set to start from latest record, the consumer
          + * ignores the "auto.offset.reset" behaviour as well as any committed group offsets in Kafka.
          + */
          + public void runStartFromLatestOffsets() throws Exception {
          + // 3 partitions with 50 records each (0-49, so the expected commit offset of each partition should be 50)
          + final int parallelism = 3;
          + final int recordsInEachPartition = 50;
          +
          + final String topicName = writeSequence("testStartFromLatestOffsetsTopic", recordsInEachPartition, parallelism, 1);
          +
          + final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
          + env.getConfig().disableSysoutLogging();
          + env.setParallelism(parallelism);
          +
          + final Properties readProps = new Properties();
          + readProps.putAll(standardProps);
          + readProps.setProperty("auto.offset.reset", "earliest"); // this should be ignored
          +
          + // the committed offsets should be ignored
          + KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(standardProps);
          + kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23);
          + kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31);
          + kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43);
          +
          + Thread consumeThread = new Thread(new Runnable() {
          + @Override
          + public void run() {
          +     try {
          +         readSequence(env, StartupMode.LATEST, readProps, parallelism, topicName, 30, 50);
          +     } catch (Exception e) {
          +         throw new RuntimeException(e);
          — End diff –

          This exception will not fail the test.
          You need to define a Throwable field, set it in the thread and check it once the thread has finished.
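
          As a rough sketch of that pattern (an `AtomicReference<Throwable>` plays the role of the field and avoids visibility issues between the two threads; the surrounding names are taken from the quoted test code):

          final AtomicReference<Throwable> consumerError = new AtomicReference<>();

          Thread consumeThread = new Thread(new Runnable() {
              @Override
              public void run() {
                  try {
                      readSequence(env, StartupMode.LATEST, readProps, parallelism, topicName, 30, 50);
                  } catch (Throwable t) {
                      // remember the failure instead of letting it die silently with the thread
                      consumerError.set(t);
                  }
              }
          });
          consumeThread.start();

          // ... produce the records the consumer is expected to see ...

          consumeThread.join();
          if (consumerError.get() != null) {
              throw new AssertionError("Consuming thread failed", consumerError.get());
          }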

          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2509#discussion_r89276937

          — Diff: flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java —
          @@ -83,30 +90,17 @@ protected void assignPartitionsToConsumer(KafkaConsumer<byte[], byte[]> consumer
          @Override
              protected void emitRecord(T record, KafkaTopicPartitionState<TopicPartition> partition, long offset, ConsumerRecord consumerRecord) throws Exception {
                  // get timestamp from provided ConsumerRecord (only possible with kafka 0.10.x)
          -       super.emitRecord(record, partition, offset, consumerRecord.timestamp());
          +       emitRecord(record, partition, offset, consumerRecord.timestamp());
              }

          -   /**
          -    * Emit record Kafka-timestamp aware.
          -    */
              @Override
          -   protected void emitRecord(T record, KafkaTopicPartitionState<TopicPartition> partitionState, long offset, long timestamp) throws Exception {
          -       if (timestampWatermarkMode == NO_TIMESTAMPS_WATERMARKS) {
          -           // fast path logic, in case there are no watermarks
          +   protected void seekPartitionsToBeginning(KafkaConsumer consumer, List<TopicPartition> partitions) {
          +       consumer.seekToBeginning(partitions);
          +   }

          -           // emit the record, using the checkpoint lock to guarantee
          -           // atomicity of record emission and offset state update
          -           synchronized (checkpointLock) {
          -               sourceContext.collectWithTimestamp(record, timestamp);
          -               partitionState.setOffset(offset);
          -           }

          — End diff –

          This has been refactored recently. I think you need to rebase the pull request and update the code here.
          Sorry for that.

          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2509#discussion_r89275454

          — Diff: flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java —
          @@ -41,16 +41,15 @@
          import org.apache.kafka.clients.consumer.OffsetAndMetadata;
          import org.apache.kafka.common.TopicPartition;
          import org.apache.kafka.common.protocol.SecurityProtocol;
          +import org.apache.kafka.common.serialization.ByteArraySerializer;
          +import org.apache.kafka.common.serialization.StringSerializer;
          import org.slf4j.Logger;
          import org.slf4j.LoggerFactory;
          import scala.collection.Seq;

          import java.io.File;
          import java.net.BindException;
          -import java.util.ArrayList;
          -import java.util.List;
          -import java.util.Properties;
          -import java.util.UUID;
          +import java.util.*;
          — End diff –

          Star import

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2509#discussion_r89361344

          — Diff: flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java —
          @@ -263,6 +265,7 @@ public Void answer(InvocationOnMock invocation) {
          schema,
          new Properties(),
          0L,
          + StartupMode.GROUP_OFFSETS,
          — End diff –

          Will do!

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2509#discussion_r89361688

          — Diff: flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java —
          @@ -131,6 +131,25 @@ public void testMetricsAndEndOfStream() throws Exception

          { runEndOfStreamTest(); }

          + // — startup mode —
          +
          + // TODO not passing due to Kafka Consumer config error
          — End diff –

          Hmm, if I recall correctly, that's what I did in the first place, but that caused some other issues. I'll definitely give this another look and make sure the test is runnable.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2509#discussion_r89361839

          — Diff: flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java —
          @@ -45,10 +45,7 @@

          import java.io.File;
          import java.net.BindException;
          -import java.util.ArrayList;
          -import java.util.List;
          -import java.util.Properties;
          -import java.util.UUID;
          +import java.util.*;
          — End diff –

          Ah, this was an IDE auto-complete. The style checks don't cover the test code, right? I'll revert this.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2509#discussion_r89362689

          — Diff: flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java —
          @@ -408,23 +412,32 @@ else if (partitionsRemoved) {
          }
          }

          -   private void getMissingOffsetsFromKafka(
          +   private void replaceEarliestOrLatestOffsetsWithActualValuesFromKafka(
                      List<KafkaTopicPartitionState<TopicAndPartition>> partitions) throws IOException
              {
                  // collect which partitions we should fetch offsets for
          -       List<KafkaTopicPartitionState<TopicAndPartition>> partitionsToGetOffsetsFor = new ArrayList<>();
          +       List<KafkaTopicPartitionState<TopicAndPartition>> partitionsWithEarliestOffsetSetting = new ArrayList<>();

          — End diff –

          Good point, didn't think of that. I'll call `getLastOffsetFromKafka` if getOffset() returns `OffsetRequest.EarliestTime()` or `OffsetRequest.LatestTime()`.
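
          Roughly what that simplification could look like (a sketch only; the argument list of `getLastOffsetFromKafka` apart from the last "which time" parameter is an assumption, the remaining names come from the quoted code):

          for (KafkaTopicPartitionState<TopicAndPartition> part : partitions) {
              long sentinel = part.getOffset();
              if (sentinel == OffsetRequest.EarliestTime() || sentinel == OffsetRequest.LatestTime()) {
                  // the sentinel stored in the partition state doubles as the "which time" argument,
                  // so it can be passed straight through instead of building separate lists first
                  getLastOffsetFromKafka(consumer, Collections.singletonList(part), sentinel);
              }
          }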

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2509#discussion_r89363213

          — Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java —
          @@ -444,6 +445,134 @@ public void run()

          { kafkaOffsetHandler.close(); deleteTestTopic(topicName); }

          +
          + /**
          + * This test ensures that when explicitly set to start from earliest record, the consumer
          + * ignores the "auto.offset.reset" behaviour as well as any committed group offsets in Kafka.
          + */
          + public void runStartFromEarliestOffsets() throws Exception {
          +     // 3 partitions with 50 records each (0-49, so the expected commit offset of each partition should be 50)
          +     final int parallelism = 3;
          +     final int recordsInEachPartition = 50;
          +
          +     final String topicName = writeSequence("testStartFromEarliestOffsetsTopic", recordsInEachPartition, parallelism, 1);
          +
          +     final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
          +     env.getConfig().disableSysoutLogging();
          +     env.setParallelism(parallelism);
          +
          +     Properties readProps = new Properties();
          +     readProps.putAll(standardProps);
          +     readProps.setProperty("auto.offset.reset", "latest"); // this should be ignored
          +
          +     // the committed offsets should be ignored
          +     KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(standardProps);
          +     kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23);
          +     kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31);
          +     kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43);
          +
          +     readSequence(env, StartupMode.EARLIEST, readProps, parallelism, topicName, recordsInEachPartition, 0);
          +
          +     kafkaOffsetHandler.close();
          +     deleteTestTopic(topicName);
          + }

          +
          + /**
          + * This test ensures that when explicitly set to start from latest record, the consumer
          + * ignores the "auto.offset.reset" behaviour as well as any committed group offsets in Kafka.
          + */
          + public void runStartFromLatestOffsets() throws Exception {
          + // 3 partitions with 50 records each (0-49, so the expected commit offset of each partition should be 50)
          + final int parallelism = 3;
          + final int recordsInEachPartition = 50;
          +
          + final String topicName = writeSequence("testStartFromLatestOffsetsTopic", recordsInEachPartition, parallelism, 1);
          +
          + final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
          + env.getConfig().disableSysoutLogging();
          + env.setParallelism(parallelism);
          +
          + final Properties readProps = new Properties();
          + readProps.putAll(standardProps);
          + readProps.setProperty("auto.offset.reset", "earliest"); // this should be ignored
          +
          + // the committed offsets should be ignored
          + KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(standardProps);
          + kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23);
          + kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31);
          + kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43);
          +
          + Thread consumeThread = new Thread(new Runnable() {
          + @Override
          + public void run() {
          + try {
          + readSequence(env, StartupMode.LATEST, readProps, parallelism, topicName, 30, 50);
          + } catch (Exception e) {
          + throw new RuntimeException(e);
          — End diff –

          Ah, right! Will fix.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2509#discussion_r89364029

          — Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java —
          @@ -444,6 +445,134 @@ public void run()

          { kafkaOffsetHandler.close(); deleteTestTopic(topicName); }

          +
          + /**
          + * This test ensures that when explicitly set to start from earliest record, the consumer
          + * ignores the "auto.offset.reset" behaviour as well as any committed group offsets in Kafka.
          + */
          + public void runStartFromEarliestOffsets() throws Exception

          { + // 3 partitions with 50 records each (0-49, so the expected commit offset of each partition should be 50) + final int parallelism = 3; + final int recordsInEachPartition = 50; + + final String topicName = writeSequence("testStartFromEarliestOffsetsTopic", recordsInEachPartition, parallelism, 1); + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); + env.getConfig().disableSysoutLogging(); + env.setParallelism(parallelism); + + Properties readProps = new Properties(); + readProps.putAll(standardProps); + readProps.setProperty("auto.offset.reset", "latest"); // this should be ignored + + // the committed offsets should be ignored + KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(standardProps); + kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23); + kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31); + kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43); + + readSequence(env, StartupMode.EARLIEST, readProps, parallelism, topicName, recordsInEachPartition, 0); + + kafkaOffsetHandler.close(); + deleteTestTopic(topicName); + }

          +
          + /**
          + * This test ensures that when explicitly set to start from latest record, the consumer
          + * ignores the "auto.offset.reset" behaviour as well as any committed group offsets in Kafka.
          + */
          + public void runStartFromLatestOffsets() throws Exception {
          + // 3 partitions with 50 records each (0-49, so the expected commit offset of each partition should be 50)
          + final int parallelism = 3;
          + final int recordsInEachPartition = 50;
          +
          + final String topicName = writeSequence("testStartFromLatestOffsetsTopic", recordsInEachPartition, parallelism, 1);
          +
          + final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
          + env.getConfig().disableSysoutLogging();
          + env.setParallelism(parallelism);
          +
          + final Properties readProps = new Properties();
          + readProps.putAll(standardProps);
          + readProps.setProperty("auto.offset.reset", "earliest"); // this should be ignored
          +
          + // the committed offsets should be ignored
          + KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(standardProps);
          + kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23);
          + kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31);
          + kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43);
          +
          + Thread consumeThread = new Thread(new Runnable() {
          + @Override
          + public void run() {
          + try {
          + readSequence(env, StartupMode.LATEST, readProps, parallelism, topicName, 30, 50);
          + } catch (Exception e) {
          + throw new RuntimeException(e);
          + }

          + }
          + });
          + consumeThread.start();
          +
          + Thread.sleep(5000);
          — End diff –

          Will probably need to write a different / custom read method or topology for this then.
          The problem is that I wanted to reuse `readSequence()` for the test, but it expects an exact number of read elements for the test to succeed.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/2509

          Thanks for the review @rmetzger
          I'll aim to address your comments and rebase by the end of this week (will tag you once it's ready).

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2509#discussion_r89365136

          — Diff: flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java —
          @@ -139,24 +143,65 @@ public void runFetchLoop() throws Exception {

          PeriodicOffsetCommitter periodicCommitter = null;
          try {

          - // read offsets from ZooKeeper for partitions that did not restore offsets
          - {
          - List<KafkaTopicPartition> partitionsWithNoOffset = new ArrayList<>();
          - for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) {
          - if (!partition.isOffsetDefined()) {
          - partitionsWithNoOffset.add(partition.getKafkaTopicPartition());
          - }

            + List<KafkaTopicPartition> partitionsWithNoOffset = new ArrayList<>();
            + for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) {
            + if (!partition.isOffsetDefined()) {
            + partitionsWithNoOffset.add(partition.getKafkaTopicPartition());
            + }
            + }

            +
            + if (partitionsWithNoOffset.size() == subscribedPartitions().length) {
            + // if all partitions have no initial offsets, that means we're starting fresh without any restored state
            + switch (startupMode) {
            + case EARLIEST:
            + LOG.info("Setting starting point as earliest offset for partitions {}", partitionsWithNoOffset);
            +
            + for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) {
            + partition.setOffset(OffsetRequest.EarliestTime());
            + }
            + break;
            + case LATEST:
            + LOG.info("Setting starting point as latest offset for partitions {}", partitionsWithNoOffset);
            +
            + for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) {
            + partition.setOffset(OffsetRequest.LatestTime());
            + }

            + break;
            + default:
            + case GROUP_OFFSETS:
            + LOG.info("Using group offsets in Zookeeper of group.id {} as starting point for partitions {}",
            + kafkaConfig.getProperty("group.id"), partitionsWithNoOffset);
            +
            + Map<KafkaTopicPartition, Long> zkOffsets = zookeeperOffsetHandler.getCommittedOffsets(partitionsWithNoOffset);
            + for (KafkaTopicPartitionState<TopicAndPartition> partition : subscribedPartitions()) {
            + Long offset = zkOffsets.get(partition.getKafkaTopicPartition());
            + if (offset != null) {
            + // the committed offset in ZK represents the next record to process,
            + // so we subtract it by 1 to correctly represent internal state
            + partition.setOffset(offset - 1);
            + } else {
            + // if we can't find an offset for a partition in ZK when using GROUP_OFFSETS,
            + // we default to "auto.offset.reset" like the Kafka high-level consumer
            + LOG.warn("No group offset can be found for partition {} in Zookeeper;" +
            + " resetting starting offset to 'auto.offset.reset'", partition);
            +
            + partition.setOffset(invalidOffsetBehavior);
            + }
            + }
            + }
            + } else if (partitionsWithNoOffset.size() > 0 && partitionsWithNoOffset.size() < subscribedPartitions().length) {

          — End diff –

          I was adding this as a preparation for the kafka partition discovery task.
          But it'd probably make sense to remove it for this PR to avoid confusion.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2509#discussion_r89664876

          — Diff: flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java —
          @@ -218,26 +221,57 @@ public void run() {
          }
          }

          - // seek the consumer to the initial offsets
            + List<KafkaTopicPartition> partitionsWithNoOffset = new ArrayList<>();
            for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) {
            if (partition.isOffsetDefined()) {
            LOG.info("Partition {} has restored initial offsets {} from checkpoint / savepoint; seeking the consumer " +
            "to position {}", partition.getKafkaPartitionHandle(), partition.getOffset(), partition.getOffset() + 1);

          consumer.seek(partition.getKafkaPartitionHandle(), partition.getOffset() + 1);
          } else {
          - // for partitions that do not have offsets restored from a checkpoint/savepoint,
          - // we need to define our internal offset state for them using the initial offsets retrieved from Kafka
          - // by the KafkaConsumer, so that they are correctly checkpointed and committed on the next checkpoint
          + partitionsWithNoOffset.add(partition.getKafkaTopicPartition());
          + }

          + }

          - long fetchedOffset = consumer.position(partition.getKafkaPartitionHandle());
            + if (partitionsWithNoOffset.size() == subscribedPartitions().length) {
            + // if all partitions have no initial offsets, that means we're starting fresh
            + switch (startupMode) {
            + case EARLIEST:
            + LOG.info("Setting starting point as earliest offset for partitions {}", partitionsWithNoOffset);
            +
            + seekPartitionsToBeginning(consumer, convertKafkaPartitions(subscribedPartitions()));
            — End diff –

          The problem with this one is that the `seekToBeginning` method broke compatibility from 0.8 to 0.9+.
          In 0.8, it's `seekToBeginning(TopicPartition...)` while in 0.9+ it's `seekToBeginning(Collection<TopicPartition>)`.

          I'll integrate these seek methods into the `KafkaConsumerCallBridge` introduced in a recent PR. It'll be inevitable that we redundantly do the Array -> List conversion, because our `subscribedPartitions` is an array while the 0.9+ methods take a Collection. For the 0.8 methods, instead of converting the list back to an array, I'll just iterate over the list and call `seekPartitionsToBeginning` for each one.
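
          A rough sketch of what such a bridge could look like for the collection-based signature (assumed shape only; the actual `KafkaConsumerCallBridge` in Flink may differ, and a variant for the varargs signature would loop over the partitions and seek them one at a time):

          ```
          import java.util.List;
          import org.apache.kafka.clients.consumer.KafkaConsumer;
          import org.apache.kafka.common.TopicPartition;

          // Hypothetical bridge: callers always pass a List, and each connector version
          // maps that onto whichever seekToBeginning/seekToEnd signature its client exposes.
          class ConsumerCallBridgeSketch {
              void seekPartitionsToBeginning(KafkaConsumer<?, ?> consumer, List<TopicPartition> partitions) {
                  consumer.seekToBeginning(partitions); // Collection<TopicPartition> overload
              }

              void seekPartitionsToEnd(KafkaConsumer<?, ?> consumer, List<TopicPartition> partitions) {
                  consumer.seekToEnd(partitions);
              }
          }
          ```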

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2509#discussion_r89680317

          — Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java —
          @@ -444,6 +445,134 @@ public void run()

          { kafkaOffsetHandler.close(); deleteTestTopic(topicName); }

          +
          + /**
          + * This test ensures that when explicitly set to start from earliest record, the consumer
          + * ignores the "auto.offset.reset" behaviour as well as any committed group offsets in Kafka.
          + */
          + public void runStartFromEarliestOffsets() throws Exception

          { + // 3 partitions with 50 records each (0-49, so the expected commit offset of each partition should be 50) + final int parallelism = 3; + final int recordsInEachPartition = 50; + + final String topicName = writeSequence("testStartFromEarliestOffsetsTopic", recordsInEachPartition, parallelism, 1); + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); + env.getConfig().disableSysoutLogging(); + env.setParallelism(parallelism); + + Properties readProps = new Properties(); + readProps.putAll(standardProps); + readProps.setProperty("auto.offset.reset", "latest"); // this should be ignored + + // the committed offsets should be ignored + KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(standardProps); + kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23); + kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31); + kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43); + + readSequence(env, StartupMode.EARLIEST, readProps, parallelism, topicName, recordsInEachPartition, 0); + + kafkaOffsetHandler.close(); + deleteTestTopic(topicName); + }

          +
          + /**
          + * This test ensures that when explicitly set to start from latest record, the consumer
          + * ignores the "auto.offset.reset" behaviour as well as any committed group offsets in Kafka.
          + */
          + public void runStartFromLatestOffsets() throws Exception {
          + // 3 partitions with 50 records each (0-49, so the expected commit offset of each partition should be 50)
          + final int parallelism = 3;
          + final int recordsInEachPartition = 50;
          +
          + final String topicName = writeSequence("testStartFromLatestOffsetsTopic", recordsInEachPartition, parallelism, 1);
          +
          + final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
          + env.getConfig().disableSysoutLogging();
          + env.setParallelism(parallelism);
          +
          + final Properties readProps = new Properties();
          + readProps.putAll(standardProps);
          + readProps.setProperty("auto.offset.reset", "earliest"); // this should be ignored
          +
          + // the committed offsets should be ignored
          + KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(standardProps);
          + kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23);
          + kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31);
          + kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43);
          +
          + Thread consumeThread = new Thread(new Runnable() {
          + @Override
          + public void run() {
          + try {
          + readSequence(env, StartupMode.LATEST, readProps, parallelism, topicName, 30, 50);
          + } catch (Exception e) {
          + throw new RuntimeException(e);
          + }

          + }
          + });
          + consumeThread.start();
          +
          + Thread.sleep(5000);
          — End diff –

          Actually, the sleep here isn't waiting for the readSequence call to finish. I'm waiting a bit to make sure that the consume job has fully started. It won't be able to read anything until new latest data is generated afterwards, which is done below by `DataGenerators.generateRandomizedIntegerSequence`.

          So, what the test is doing is:
          1. Write 50 records to each partition.
          2. Commit some random offsets.
          3. Start a job to read from latest in a separate thread. (should not read any of the previous data, offsets also ignored). The `readSequence` is expected to read 30 more records from each partition
          4. Make sure the job has started by waiting 5 seconds.
          5. Generate 30 records to each partition.
          6. The consume job should return from `readSequence` before the test expires.

          Is there a better way to do step 4 instead of sleeping?

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/2509

          Hi @rmetzger, I've addressed all comments. I'll leave inline comments in the code on the parts that address your bigger comments, to help with the second-pass review.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2509#discussion_r89755327

          — Diff: flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java —
          @@ -128,6 +129,7 @@ public FlinkKafkaConsumer010(List<String> topics, KeyedDeserializationSchema<T>
          protected AbstractFetcher<T, ?> createFetcher(
          SourceContext<T> sourceContext,
          List<KafkaTopicPartition> thisSubtaskPartitions,
          + HashMap<KafkaTopicPartition, Long> restoredSnapshotState,
          — End diff –

          To make the startup mode logic cleaner, I've changed the `AbstractFetcher` life cycle a bit.
          Now, restored state is provided when constructing the `AbstractFetcher`, instead of explicitly calling `AbstractFetcher#restoreOffsets()` as a separate call.

          This allows the AbstractFetcher to have a final `isRestored` flag that version-specific implementations can use.

          The startup offset configuring logic is much simpler now with this flag:
          ```
          if (isRestored) {
              // all subscribed partition states should have defined offset
              // setup the KafkaConsumer client we're using to respect these restored offsets
          } else {
              // all subscribed partition states have no defined offset
              // (1) set offsets depending on whether startup mode is EARLIEST, LATEST, or GROUP_OFFSET
              // (2) use the fetched offsets from Kafka to set the initial partition states we use in Flink.
          }
          ```
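
          A minimal sketch of the constructor-side part of this change, under the assumption described above (class and parameter names besides `restoredSnapshotState` are hypothetical; the real `AbstractFetcher` constructor takes more arguments):

          ```
          import java.util.HashMap;
          import java.util.List;

          // Hypothetical sketch only (not the actual Flink class): restored offsets are
          // handed over at construction time, so the fetcher can keep a final isRestored
          // flag instead of relying on a separate restoreOffsets() call later on.
          abstract class FetcherSketch<P> {

              protected final boolean isRestored;

              protected FetcherSketch(List<P> assignedPartitions, HashMap<P, Long> restoredSnapshotState) {
                  this.isRestored = restoredSnapshotState != null;
                  // when restored, each partition's state would be seeded from restoredSnapshotState;
                  // otherwise, offsets are decided later according to the configured startup mode
              }
          }
          ```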

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2509#discussion_r89755566

          — Diff: flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java —
          @@ -133,11 +132,10 @@ public void testMetricsAndEndOfStream() throws Exception {

          // — startup mode —

          - // TODO not passing due to Kafka Consumer config error
          -// @Test(timeout = 60000)
          -// public void testStartFromEarliestOffsets() throws Exception {
          -// runStartFromEarliestOffsets();
          -// }

            + @Test(timeout = 60000)
            + public void testStartFromEarliestOffsets() throws Exception {

          — End diff –

          These tests pass now with no problem. You're right, setting the key/value deserializer keys did the trick.
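
          A hedged sketch of that kind of fix (the exact values used in the test environment may differ; `ByteArrayDeserializer` is just one plausible choice):

          ```
          // Kafka's new consumer (0.9+/0.10) rejects a config without explicit
          // deserializer classes, so the test properties need these two keys set.
          Properties props = new Properties();
          props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
          props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
          ```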

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2509#discussion_r89755818

          — Diff: flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java —
          @@ -460,15 +433,13 @@ public void cancel() {
          // ------------------------------------------------------------------------

          /**

          - * Request latest offsets for a set of partitions, via a Kafka consumer.
          - *
          - * <p>This method retries three times if the response has an error.
            + * Request offsets before a specific time for a set of partitions, via a Kafka consumer.
            *
          * @param consumer The consumer connected to lead broker
          * @param partitions The list of partitions we need offsets for
          * @param whichTime The type of time we are requesting. -1 and -2 are special constants (See OffsetRequest)
            */
          - private static void getLastOffsetFromKafka(
            + private static void requestAndSetSpecificTimeOffsetsFromKafka(
          — End diff –

          Refactored the utility Kafka request methods in this class to avoid creating redundant lists.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2509#discussion_r89756558

          — Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java —
          @@ -482,12 +476,39 @@ public void runStartFromEarliestOffsets() throws Exception {

          * ignores the "auto.offset.reset" behaviour as well as any committed group offsets in Kafka.
            */
            public void runStartFromLatestOffsets() throws Exception {
          — End diff –

          To make this test easier without having to sleep, the test now does this:

          1. First write 50 records to each partition (these shouldn't be read)
          2. Set some offsets in Kafka (should be ignored)
          3. Start a latest-reading consuming job. This job throws an exception if it reads any of the first 50 records
          4. Wait until the consume job has fully started (added a util method to `JobManagerCommunicationUtils` for this)
          5. Write 200 extra records to each partition.
          6. Once the writing finishes, cancel the consume job.
          7. Check if the consume job threw any test errors.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/2509

          Rebased on the "flink-connectors" change.

          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on the issue:

          https://github.com/apache/flink/pull/2509

          Thanks a lot! I have your Kafka pull requests on my todo list. I hope I get to it soon. I'm really sorry.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/2509

          Note about 3rd commit: fixed failing `FlinkKafkaConsumerBaseMigrationTest`s after the rebase.

          The tests were failing due to the removal of the `AbstractFetcher#restoreOffsets(...)` method as part of this PR's refactoring of offset restoration. On the other hand, the previous tests in `FlinkKafkaConsumerBaseMigrationTest` were too tightly coupled to how the connector was implemented, i.e. they tested how the `AbstractFetcher` methods are called, whether a `MAX_VALUE` watermark was emitted (which will likely change as features are added to the connector), etc., even though the actual purpose of the tests was simply to verify that state was restored correctly.

          The 3rd commit therefore attempts to simplify `FlinkKafkaConsumerBaseMigrationTest` to only test legacy state restore behaviour. The deleted parts, IMHO, are already covered in other tests.

          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2509#discussion_r100988931

          — Diff: flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java —
          @@ -45,10 +45,7 @@

          import java.io.File;
          import java.net.BindException;
          -import java.util.ArrayList;
          -import java.util.List;
          -import java.util.Properties;
          -import java.util.UUID;
          +import java.util.*;
          — End diff –

          No, we are currently not checking the tests

          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2509#discussion_r100989660

          — Diff: docs/dev/connectors/kafka.md —
          @@ -161,6 +161,46 @@ For convenience, Flink provides the following schemas:
          The KeyValue objectNode contains a "key" and "value" field which contain all fields, as well as
          an optional "metadata" field that exposes the offset/partition/topic for this message.

          +#### Kafka Consumers Start Position Configuration
          +
          +By default, the Flink Kafka Consumer starts reading partitions from the consumer group's (`group.id` setting in the
          +consumer properties) committed offsets in Kafka brokers (or Zookeeper for Kafka 0.8).
          +
          +This behaviour can be explicitly overridden, as demonstrated below:
          +
          +<div class="codetabs" markdown="1">
          +<div data-lang="java" markdown="1">
          +

          {% highlight java %}

          +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          +
          +FlinkKafkaConsumer08<String> myConsumer = new FlinkKafkaConsumer08<>(...);
          +myConsumer.setStartFromEarliest(); // start from the earliest record possible
          +myConsumer.setStartFromLatest(); // start from the latest record
          +myConsumer.setStartFromGroupOffsets(); // the default behaviour
          +
          +DataStream<String> stream = env.addSource(myConsumer);
          +...
          +

          {% endhighlight %}
          +</div>
          +<div data-lang="scala" markdown="1">
          +{% highlight scala %}
          +val env = StreamExecutionEnvironment.getExecutionEnvironment()
          +
          +val myConsumer = new FlinkKafkaConsumer08[String](...)
          +myConsumer.setStartFromEarliest() // start from the earliest record possible
          +myConsumer.setStartFromLatest() // start from the latest record
          +myConsumer.setStartFromGroupOffsets() // the default behaviour
          +
          +val stream = env.addSource(myConsumer)
          +...
          +{% endhighlight %}

          +</div>
          +</div>
          +
          +All versions of the Flink Kafka Consumer have the above explicit configuration methods for start position. When
          +configured to start from the earliest or latest record by calling either `setStartFromEarliest()` or `setStartFromLatest()`,
          +the consumer will ignore any committed group offsets in Kafka when determining the start position for partitions.
          — End diff –

          Maybe we should add a note that this setting does NOT affect the start position when restoring from a savepoint or checkpoint.

          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2509#discussion_r101045081

          — Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java —
          @@ -438,6 +439,215 @@ public void run()

          { kafkaOffsetHandler.close(); deleteTestTopic(topicName); }

          +
          + /**
          + * This test ensures that when explicitly set to start from earliest record, the consumer
          + * ignores the "auto.offset.reset" behaviour as well as any committed group offsets in Kafka.
          + */
          + public void runStartFromEarliestOffsets() throws Exception

          { + // 3 partitions with 50 records each (0-49, so the expected commit offset of each partition should be 50) + final int parallelism = 3; + final int recordsInEachPartition = 50; + + final String topicName = writeSequence("testStartFromEarliestOffsetsTopic", recordsInEachPartition, parallelism, 1); + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); + env.getConfig().disableSysoutLogging(); + env.setParallelism(parallelism); + + Properties readProps = new Properties(); + readProps.putAll(standardProps); + readProps.setProperty("auto.offset.reset", "latest"); // this should be ignored + + // the committed offsets should be ignored + KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(standardProps); + kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23); + kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31); + kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43); + + readSequence(env, StartupMode.EARLIEST, readProps, parallelism, topicName, recordsInEachPartition, 0); + + kafkaOffsetHandler.close(); + deleteTestTopic(topicName); + }

          +
          + /**
          + * This test ensures that when explicitly set to start from latest record, the consumer
          + * ignores the "auto.offset.reset" behaviour as well as any committed group offsets in Kafka.
          + */
          + public void runStartFromLatestOffsets() throws Exception {
          + // 50 records written to each of 3 partitions before launching a latest-starting consuming job
          + final int parallelism = 3;
          + final int recordsInEachPartition = 50;
          +
          + // each partition will be written an extra 200 records
          + final int extraRecordsInEachPartition = 200;
          +
          + // all already existing data in the topic, before the consuming topology has started, should be ignored
          + final String topicName = writeSequence("testStartFromLatestOffsetsTopic", recordsInEachPartition, parallelism, 1);
          +
          + // the committed offsets should be ignored
          + KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(standardProps);
          + kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23);
          + kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31);
          + kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43);
          +
          + // job names for the topologies for writing and consuming the extra records
          + final String consumeExtraRecordsJobName = "Consume Extra Records Job";
          + final String writeExtraRecordsJobName = "Write Extra Records Job";
          +
          + // seriliazation / deserialization schemas for writing and consuming the extra records
          + final TypeInformation<Tuple2<Integer, Integer>> resultType =
          + TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>() {});
          +
          + final KeyedSerializationSchema<Tuple2<Integer, Integer>> serSchema =
          + new KeyedSerializationSchemaWrapper<>(
          + new TypeInformationSerializationSchema<>(resultType, new ExecutionConfig()));
          +
          + final KeyedDeserializationSchema<Tuple2<Integer, Integer>> deserSchema =
          + new KeyedDeserializationSchemaWrapper<>(
          + new TypeInformationSerializationSchema<>(resultType, new ExecutionConfig()));
          +
          + // setup and run the latest-consuming job
          + final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
          + env.getConfig().disableSysoutLogging();
          + env.setParallelism(parallelism);
          +
          + final Properties readProps = new Properties();
          + readProps.putAll(standardProps);
          + readProps.setProperty("auto.offset.reset", "earliest"); // this should be ignored
          +
          + FlinkKafkaConsumerBase<Tuple2<Integer, Integer>> latestReadingConsumer =
          + kafkaServer.getConsumer(topicName, deserSchema, readProps);
          + latestReadingConsumer.setStartFromLatest();
          +
          + env
          + .addSource(latestReadingConsumer).setParallelism(parallelism)
          + .flatMap(new FlatMapFunction<Tuple2<Integer,Integer>, Object>() {
          + @Override
          + public void flatMap(Tuple2<Integer, Integer> value, Collector<Object> out) throws Exception {
+ if (value.f1 - recordsInEachPartition < 0) {
+ throw new RuntimeException("test failed; consumed a record that was previously written: " + value);
+ }
+ }
+ }).setParallelism(1)
+ .addSink(new DiscardingSink<>());
+
+ final AtomicReference<Throwable> error = new AtomicReference<>();
+ Thread consumeThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ env.execute(consumeExtraRecordsJobName);
+ } catch (Throwable t) {
+ if (!(t.getCause() instanceof JobCancellationException))
+ error.set(t);
          — End diff –

As per the unwritten Flink style guide, we always use {} after an if().
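
For illustration, a minimal, self-contained sketch of the braced form being asked for here; the class name and the locally defined stand-in for JobCancellationException are only for this example, while the variables mirror the diff above:

import java.util.concurrent.atomic.AtomicReference;

public class BraceStyleExample {

    // stand-in for Flink's JobCancellationException, defined locally so the sketch compiles on its own
    private static class JobCancellationException extends RuntimeException {}

    public static void main(String[] args) {
        final AtomicReference<Throwable> error = new AtomicReference<>();
        try {
            throw new RuntimeException(new IllegalStateException("boom"));
        } catch (Throwable t) {
            // always wrap the if() body in braces, as the style guide asks
            if (!(t.getCause() instanceof JobCancellationException)) {
                error.set(t);
            }
        }
        System.out.println("captured error: " + error.get());
    }
}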

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2509#discussion_r101000141

          — Diff: flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java —
          @@ -229,6 +231,8 @@ public void prepare(int numKafkaServers, Properties additionalServerProperties,
          standardProps.setProperty("zookeeper.connect", zookeeperConnectionString);
          standardProps.setProperty("bootstrap.servers", brokerConnectionString);
          standardProps.setProperty("group.id", "flink-tests");
          + standardProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
          + standardProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
          — End diff –

Why is this needed now? If we have this in the default props, we cannot ensure that users don't need to set it manually.

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2509#discussion_r101047374

          — Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java —
          @@ -299,91 +131,60 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
          testHarness.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);

          testHarness.setup();
          + // restore state from binary snapshot file using legacy method
          testHarness.initializeStateFromLegacyCheckpoint(
          getResourceFilename("kafka-consumer-migration-test-flink1.1-snapshot"));
          testHarness.open();

- final Throwable[] error = new Throwable[1];
+ // assert that there are partitions and is identical to expected list
+ Assert.assertTrue(consumerFunction.getSubscribedPartitions() != null);
+ Assert.assertTrue(!consumerFunction.getSubscribedPartitions().isEmpty());
+ Assert.assertEquals(partitions, consumerFunction.getSubscribedPartitions());
- // run the source asynchronously
- Thread runner = new Thread() {
- @Override
- public void run() {
- try {
- consumerFunction.run(new DummySourceContext() {
- @Override
- public void collect(String element) {
- //latch.trigger();
- }
- });
- }
- catch (Throwable t) {
- t.printStackTrace();
- error[0] = t;
- }
- }
- };
- runner.start();
+ // the expected state in "kafka-consumer-migration-test-flink1.1-snapshot"
+ final HashMap<KafkaTopicPartition, Long> expectedState = new HashMap<>();
+ expectedState.put(new KafkaTopicPartition("abc", 13), 16768L);
+ expectedState.put(new KafkaTopicPartition("def", 7), 987654321L);
- if (!latch.isTriggered()) {
- latch.await();
- }
+ // assert that state is correctly restored from legacy checkpoint
+ Assert.assertTrue(consumerFunction.getRestoredState() != null);
+ Assert.assertEquals(expectedState, consumerFunction.getRestoredState());
 consumerOperator.close();
-
- runner.join();
-
- Assert.assertNull(error[0]);
 }
-
- private abstract static class DummySourceContext
- implements SourceFunction.SourceContext<String> {
-
- private final Object lock = new Object();
-
- @Override
- public void collectWithTimestamp(String element, long timestamp) {
- }
-
- @Override
- public void emitWatermark(Watermark mark) {
- }
-
- @Override
- public Object getCheckpointLock() {
- return lock;
- }
-
- @Override
- public void close() {
- }
— End diff –

Looks like you've removed a lot of code from this test. I assume the `DummyFlinkKafkaConsumer` covers everything the deleted code did?

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2509#discussion_r101000341

          — Diff: docs/dev/connectors/kafka.md —
          @@ -161,6 +161,46 @@ For convenience, Flink provides the following schemas:
          The KeyValue objectNode contains a "key" and "value" field which contain all fields, as well as
          an optional "metadata" field that exposes the offset/partition/topic for this message.

          +#### Kafka Consumers Start Position Configuration
          +
          +By default, the Flink Kafka Consumer starts reading partitions from the consumer group's (`group.id` setting in the
          +consumer properties) committed offsets in Kafka brokers (or Zookeeper for Kafka 0.8).
          +
          +This behaviour can be explicitly overriden, as demonstrated below:
          +
          +<div class="codetabs" markdown="1">
          +<div data-lang="java" markdown="1">
+{% highlight java %}
          +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          +
          +FlinkKafkaConsumer08<String> myConsumer = new FlinkKafkaConsumer08<>(...);
          +myConsumer.setStartFromEarliest(); // start from the earliest record possible
          +myConsumer.setStartFromLatest(); // start from the latest record
          +myConsumer.setStartFromGroupOffsets(); // the default behaviour
          — End diff –

          Does the "the default behaviour" also mean that we only respect the "auto.offset.reset" configs in that case?

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2509#discussion_r101048002

          — Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java —
          @@ -299,91 +131,60 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
          testHarness.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);

          testHarness.setup();
          + // restore state from binary snapshot file using legacy method
          testHarness.initializeStateFromLegacyCheckpoint(
          getResourceFilename("kafka-consumer-migration-test-flink1.1-snapshot"));
          testHarness.open();

- final Throwable[] error = new Throwable[1];
+ // assert that there are partitions and is identical to expected list
+ Assert.assertTrue(consumerFunction.getSubscribedPartitions() != null);
+ Assert.assertTrue(!consumerFunction.getSubscribedPartitions().isEmpty());
+ Assert.assertEquals(partitions, consumerFunction.getSubscribedPartitions());
- // run the source asynchronously
- Thread runner = new Thread() {
- @Override
- public void run() {
- try {
- consumerFunction.run(new DummySourceContext() {
- @Override
- public void collect(String element) {
- //latch.trigger();
- }
- });
- }
- catch (Throwable t) {
- t.printStackTrace();
- error[0] = t;
- }
- }
- };
- runner.start();
+ // the expected state in "kafka-consumer-migration-test-flink1.1-snapshot"
+ final HashMap<KafkaTopicPartition, Long> expectedState = new HashMap<>();
+ expectedState.put(new KafkaTopicPartition("abc", 13), 16768L);
+ expectedState.put(new KafkaTopicPartition("def", 7), 987654321L);
- if (!latch.isTriggered()) {
- latch.await();
- }
+ // assert that state is correctly restored from legacy checkpoint
+ Assert.assertTrue(consumerFunction.getRestoredState() != null);
+ Assert.assertEquals(expectedState, consumerFunction.getRestoredState());
 consumerOperator.close();
-
- runner.join();
-
- Assert.assertNull(error[0]);
 }
-
- private abstract static class DummySourceContext
- implements SourceFunction.SourceContext<String> {
-
- private final Object lock = new Object();
-
- @Override
- public void collectWithTimestamp(String element, long timestamp) {
- }
-
- @Override
- public void emitWatermark(Watermark mark) {
- }
-
- @Override
- public Object getCheckpointLock() {
- return lock;
- }
-
- @Override
- public void close() {
- }
— End diff –

          I see. That's what this comment is for: https://github.com/apache/flink/pull/2509#issuecomment-277438812

          @kl0u I think you've implemented most of the migration tests. Can you take a look at the changes @tzulitai is proposing?

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2509#discussion_r101050554

          — Diff: docs/dev/connectors/kafka.md —
          @@ -161,6 +161,46 @@ For convenience, Flink provides the following schemas:
          The KeyValue objectNode contains a "key" and "value" field which contain all fields, as well as
          an optional "metadata" field that exposes the offset/partition/topic for this message.

          +#### Kafka Consumers Start Position Configuration
          +
          +By default, the Flink Kafka Consumer starts reading partitions from the consumer group's (`group.id` setting in the
          +consumer properties) committed offsets in Kafka brokers (or Zookeeper for Kafka 0.8).
          +
          +This behaviour can be explicitly overriden, as demonstrated below:
          +
          +<div class="codetabs" markdown="1">
          +<div data-lang="java" markdown="1">
+{% highlight java %}
          +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          +
          +FlinkKafkaConsumer08<String> myConsumer = new FlinkKafkaConsumer08<>(...);
          +myConsumer.setStartFromEarliest(); // start from the earliest record possible
          +myConsumer.setStartFromLatest(); // start from the latest record
          +myConsumer.setStartFromGroupOffsets(); // the default behaviour
          — End diff –

Yes. If the consumer group does not contain offsets for a partition, the "auto.offset.reset" property is used for that partition. I think this is the behaviour of Kafka's high-level consumer.
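
To make that interaction concrete, here is a rough sketch of the default mode, assuming the 0.8 consumer with placeholder topic, group and broker names; only setStartFromGroupOffsets() and the property names come from this thread, the rest is illustrative:

import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class GroupOffsetsStartPositionExample {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("zookeeper.connect", "localhost:2181"); // required by the 0.8 consumer
        props.setProperty("group.id", "my-flink-group");          // whose committed offsets are the starting points
        props.setProperty("auto.offset.reset", "earliest");       // only consulted for partitions without a committed offset

        FlinkKafkaConsumer08<String> consumer =
                new FlinkKafkaConsumer08<>("my-topic", new SimpleStringSchema(), props);
        consumer.setStartFromGroupOffsets(); // the default behaviour, stated explicitly

        DataStream<String> stream = env.addSource(consumer);
        stream.print();

        env.execute("start-from-group-offsets example");
    }
}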

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2509#discussion_r101050769

          — Diff: docs/dev/connectors/kafka.md —
          @@ -161,6 +161,46 @@ For convenience, Flink provides the following schemas:
          The KeyValue objectNode contains a "key" and "value" field which contain all fields, as well as
          an optional "metadata" field that exposes the offset/partition/topic for this message.

          +#### Kafka Consumers Start Position Configuration
          +
          +By default, the Flink Kafka Consumer starts reading partitions from the consumer group's (`group.id` setting in the
          +consumer properties) committed offsets in Kafka brokers (or Zookeeper for Kafka 0.8).
          +
          +This behaviour can be explicitly overriden, as demonstrated below:
          +
          +<div class="codetabs" markdown="1">
          +<div data-lang="java" markdown="1">
+{% highlight java %}
          +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          +
          +FlinkKafkaConsumer08<String> myConsumer = new FlinkKafkaConsumer08<>(...);
          +myConsumer.setStartFromEarliest(); // start from the earliest record possible
          +myConsumer.setStartFromLatest(); // start from the latest record
          +myConsumer.setStartFromGroupOffsets(); // the default behaviour
          +
          +DataStream<String> stream = env.addSource(myConsumer);
          +...
+{% endhighlight %}
          +</div>
          +<div data-lang="scala" markdown="1">
          +{% highlight scala %}
          +val env = StreamExecutionEnvironment.getExecutionEnvironment()
          +
          +val myConsumer = new FlinkKafkaConsumer08[String](...)
          +myConsumer.setStartFromEarliest() // start from the earliest record possible
          +myConsumer.setStartFromLatest() // start from the latest record
          +myConsumer.setStartFromGroupOffsets() // the default behaviour
          +
          +val stream = env.addSource(myConsumer)
          +...
          +{% endhighlight %}

          +</div>
          +</div>
          +
          +All versions of the Flink Kafka Consumer have the above explicit configuration methods for start position. When
          +configured to start from the earliest or latest record by calling either `setStartFromEarliest()` or `setStartFromLatest()`,
          +the consumer will ignore any committed group offsets in Kafka when determining the start position for partitions.
          — End diff –

          Good point, will add.
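
For reference, one possible wording for such a note in kafka.md (a sketch only, not necessarily the exact text that was merged):

Note: these start position settings do not affect jobs that are restored from a savepoint or checkpoint.
On restore, the consumer starts from the offsets stored in the savepoint / checkpoint, regardless of which
`setStartFrom...()` method was called.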

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2509#discussion_r101051346

          — Diff: flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java —
          @@ -229,6 +231,8 @@ public void prepare(int numKafkaServers, Properties additionalServerProperties,
          standardProps.setProperty("zookeeper.connect", zookeeperConnectionString);
          standardProps.setProperty("bootstrap.servers", brokerConnectionString);
          standardProps.setProperty("group.id", "flink-tests");
          + standardProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
          + standardProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
          — End diff –

Somehow, without these settings, the new tests for the `setStartFromXXXX` methods fail, complaining that the property config does not specify settings for the deserializers.

I guess it is because in those tests we have Kafka clients that are used only for offset committing and fetching, in which case the client cannot infer the types to use for the deserializers?
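
A rough, self-contained sketch of the situation described here, assuming a plain KafkaConsumer (0.9+ API) that is used only for committing and fetching offsets; the broker address, group id and topic are placeholders:

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class OffsetOnlyClientExample {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "flink-tests");
        // without these two settings the KafkaConsumer constructor fails,
        // even though this client never deserializes any records
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> offsetClient = new KafkaConsumer<>(props)) {
            TopicPartition partition = new TopicPartition("my-topic", 0);
            // commit an offset for the group, then read it back
            offsetClient.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(23L)));
            OffsetAndMetadata committed = offsetClient.committed(partition);
            System.out.println("committed offset: " + committed.offset());
        }
    }
}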

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2509#discussion_r101051496

          — Diff: flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java —
          @@ -229,6 +231,8 @@ public void prepare(int numKafkaServers, Properties additionalServerProperties,
          standardProps.setProperty("zookeeper.connect", zookeeperConnectionString);
          standardProps.setProperty("bootstrap.servers", brokerConnectionString);
          standardProps.setProperty("group.id", "flink-tests");
          + standardProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
          + standardProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
          — End diff –

Perhaps I should move these out of the `standardProps` and set them only in those tests.

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2509#discussion_r101051598

          — Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java —
          @@ -438,6 +439,215 @@ public void run()

          { kafkaOffsetHandler.close(); deleteTestTopic(topicName); }

          +
          + /**
          + * This test ensures that when explicitly set to start from earliest record, the consumer
          + * ignores the "auto.offset.reset" behaviour as well as any committed group offsets in Kafka.
          + */
          + public void runStartFromEarliestOffsets() throws Exception

          { + // 3 partitions with 50 records each (0-49, so the expected commit offset of each partition should be 50) + final int parallelism = 3; + final int recordsInEachPartition = 50; + + final String topicName = writeSequence("testStartFromEarliestOffsetsTopic", recordsInEachPartition, parallelism, 1); + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); + env.getConfig().disableSysoutLogging(); + env.setParallelism(parallelism); + + Properties readProps = new Properties(); + readProps.putAll(standardProps); + readProps.setProperty("auto.offset.reset", "latest"); // this should be ignored + + // the committed offsets should be ignored + KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(standardProps); + kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23); + kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31); + kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43); + + readSequence(env, StartupMode.EARLIEST, readProps, parallelism, topicName, recordsInEachPartition, 0); + + kafkaOffsetHandler.close(); + deleteTestTopic(topicName); + }

          +
          + /**
          + * This test ensures that when explicitly set to start from latest record, the consumer
          + * ignores the "auto.offset.reset" behaviour as well as any committed group offsets in Kafka.
          + */
          + public void runStartFromLatestOffsets() throws Exception {
          + // 50 records written to each of 3 partitions before launching a latest-starting consuming job
          + final int parallelism = 3;
          + final int recordsInEachPartition = 50;
          +
          + // each partition will be written an extra 200 records
          + final int extraRecordsInEachPartition = 200;
          +
          + // all already existing data in the topic, before the consuming topology has started, should be ignored
          + final String topicName = writeSequence("testStartFromLatestOffsetsTopic", recordsInEachPartition, parallelism, 1);
          +
          + // the committed offsets should be ignored
          + KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(standardProps);
          + kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23);
          + kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31);
          + kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43);
          +
          + // job names for the topologies for writing and consuming the extra records
          + final String consumeExtraRecordsJobName = "Consume Extra Records Job";
          + final String writeExtraRecordsJobName = "Write Extra Records Job";
          +
          + // seriliazation / deserialization schemas for writing and consuming the extra records
          + final TypeInformation<Tuple2<Integer, Integer>> resultType =
          + TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>() {});
          +
          + final KeyedSerializationSchema<Tuple2<Integer, Integer>> serSchema =
          + new KeyedSerializationSchemaWrapper<>(
          + new TypeInformationSerializationSchema<>(resultType, new ExecutionConfig()));
          +
          + final KeyedDeserializationSchema<Tuple2<Integer, Integer>> deserSchema =
          + new KeyedDeserializationSchemaWrapper<>(
          + new TypeInformationSerializationSchema<>(resultType, new ExecutionConfig()));
          +
          + // setup and run the latest-consuming job
          + final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
          + env.getConfig().disableSysoutLogging();
          + env.setParallelism(parallelism);
          +
          + final Properties readProps = new Properties();
          + readProps.putAll(standardProps);
          + readProps.setProperty("auto.offset.reset", "earliest"); // this should be ignored
          +
          + FlinkKafkaConsumerBase<Tuple2<Integer, Integer>> latestReadingConsumer =
          + kafkaServer.getConsumer(topicName, deserSchema, readProps);
          + latestReadingConsumer.setStartFromLatest();
          +
          + env
          + .addSource(latestReadingConsumer).setParallelism(parallelism)
          + .flatMap(new FlatMapFunction<Tuple2<Integer,Integer>, Object>() {
          + @Override
          + public void flatMap(Tuple2<Integer, Integer> value, Collector<Object> out) throws Exception {
          + if (value.f1 - recordsInEachPartition < 0)

          { + throw new RuntimeException("test failed; consumed a record that was previously written: " + value); + }

          + }
          + }).setParallelism(1)
          + .addSink(new DiscardingSink<>());
          +
          + final AtomicReference<Throwable> error = new AtomicReference<>();
          + Thread consumeThread = new Thread(new Runnable() {
          + @Override
          + public void run() {
          + try

          { + env.execute(consumeExtraRecordsJobName); + }

          catch (Throwable t) {
          + if (!(t.getCause() instanceof JobCancellationException))
          + error.set(t);
          — End diff –

          Nice catch .. sloppy styling :/

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2509#discussion_r101074257

          — Diff: flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java —
          @@ -229,6 +231,8 @@ public void prepare(int numKafkaServers, Properties additionalServerProperties,
          standardProps.setProperty("zookeeper.connect", zookeeperConnectionString);
          standardProps.setProperty("bootstrap.servers", brokerConnectionString);
          standardProps.setProperty("group.id", "flink-tests");
          + standardProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
          + standardProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
          — End diff –

          Yes, move these settings out of the standard properties

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2509#discussion_r101075446

          — Diff: flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java —
          @@ -229,6 +231,8 @@ public void prepare(int numKafkaServers, Properties additionalServerProperties,
          standardProps.setProperty("zookeeper.connect", zookeeperConnectionString);
          standardProps.setProperty("bootstrap.servers", brokerConnectionString);
          standardProps.setProperty("group.id", "flink-tests");
          + standardProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
          + standardProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
          — End diff –

Probably it's the `KafkaOffsetHandlerImpl`. In that case, yes, just set the additional properties when creating the instance.

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2509#discussion_r101075570

          — Diff: docs/dev/connectors/kafka.md —
          @@ -161,6 +161,46 @@ For convenience, Flink provides the following schemas:
          The KeyValue objectNode contains a "key" and "value" field which contain all fields, as well as
          an optional "metadata" field that exposes the offset/partition/topic for this message.

          +#### Kafka Consumers Start Position Configuration
          +
          +By default, the Flink Kafka Consumer starts reading partitions from the consumer group's (`group.id` setting in the
          +consumer properties) committed offsets in Kafka brokers (or Zookeeper for Kafka 0.8).
          +
          +This behaviour can be explicitly overriden, as demonstrated below:
          +
          +<div class="codetabs" markdown="1">
          +<div data-lang="java" markdown="1">
+{% highlight java %}
          +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          +
          +FlinkKafkaConsumer08<String> myConsumer = new FlinkKafkaConsumer08<>(...);
          +myConsumer.setStartFromEarliest(); // start from the earliest record possible
          +myConsumer.setStartFromLatest(); // start from the latest record
          +myConsumer.setStartFromGroupOffsets(); // the default behaviour
          — End diff –

          Okay, cool. Can you add that to the docs as well?

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on the issue:

          https://github.com/apache/flink/pull/2509

          I've tested the change locally with great success. Once my comments are addressed, the change is good to be merged.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/2509

          Thanks for the review @rmetzger! The final 2 commits have addressed all your comments.

          I'll also wait for @kl0u to have a look at the changes in `FlinkKafkaConsumerBaseMigration` before merging this to `master`.

          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2509#discussion_r101231413

          — Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java —
          @@ -633,7 +654,12 @@ public void runStartFromGroupOffsets() throws Exception {
          readProps.setProperty("auto.offset.reset", "earliest");

          // the committed group offsets should be used as starting points

          KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(standardProps);
          + Properties offsetHandlerProps = new Properties();
          + offsetHandlerProps.putAll(standardProps);
          + offsetHandlerProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
          + offsetHandlerProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
          — End diff –

          Doesn't it make sense to set these properties once in the `KafkaOffsetHandlerImpl` instead of all locations where the offset handler is being created?

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2509#discussion_r101233499

          — Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java —
          @@ -633,7 +654,12 @@ public void runStartFromGroupOffsets() throws Exception {
          readProps.setProperty("auto.offset.reset", "earliest");

          // the committed group offsets should be used as starting points

          KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(standardProps);
          + Properties offsetHandlerProps = new Properties();
          + offsetHandlerProps.putAll(standardProps);
          + offsetHandlerProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
          + offsetHandlerProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
          — End diff –

          I didn't do that because the constructor of `KafkaOffsetHandlerImpl` has no knowledge of whether the provided `Properties` should be manipulated or not (users of `KafkaOffsetHandlerImpl` provide the properties).

          However, I think it would make sense to do what you suggested in the `KafkaOffsetHandlerImpl` if it always just uses `standardProps` instead of provided properties. In our case that would be completely fine. I'll change it as proposed.
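
          A rough sketch of what that consolidation could look like, assuming the 0.9+-style handler always builds its internal `KafkaConsumer` from the test environment's `standardProps` (field and variable names here are illustrative, not taken from the PR):

          private class KafkaOffsetHandlerImpl implements KafkaTestEnvironment.KafkaOffsetHandler {

              private final KafkaConsumer<byte[], byte[]> offsetClient;

              public KafkaOffsetHandlerImpl() {
                  // copy the standard test properties and add the deserializers in one place,
                  // instead of at every site that creates an offset handler
                  Properties props = new Properties();
                  props.putAll(standardProps);
                  props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
                  props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

                  offsetClient = new KafkaConsumer<>(props);
              }

              // ... offset commit / fetch methods unchanged ...
          }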

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/2509

          The last commit consolidates the deserializer settings in `KafkaOffsetHandlerImpl`.

          githubbot ASF GitHub Bot added a comment -

          Github user kl0u commented on the issue:

          https://github.com/apache/flink/pull/2509

          Hi @tzulitai and @rmetzger. I have not had time so far to look into it. I hope I will be able to get to it by the end of the week. Is this OK?

          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on the issue:

          https://github.com/apache/flink/pull/2509

          @tzulitai how does this fit your timeline? Are there PRs depending on this, or is this PR blocking you in any way?
          If so, I would propose that we merge it right away.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/2509

          Thanks for letting us know @kl0u!

          Yes, there are other pending PRs based on this.
          I just double-checked the changes in `FlinkKafkaConsumerBaseMigrationTest` myself, and I think they are reasonable.

          githubbot ASF GitHub Bot added a comment -

          Github user kl0u commented on the issue:

          https://github.com/apache/flink/pull/2509

          Thanks @tzulitai and @rmetzger! Of course, feel free to proceed with this.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/2509

          Thanks! If you do happen to find any inappropriate changes in `FlinkKafkaConsumerBaseMigrationTest`, please let me know; I will be happy to discuss and fix them.

          Merging this to `master` now.

          githubbot ASF GitHub Bot added a comment -

          Github user kl0u commented on the issue:

          https://github.com/apache/flink/pull/2509

          Perfect!

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/2509

          tzulitai Tzu-Li (Gordon) Tai added a comment -

          Resolved in `master` via http://git-wip-us.apache.org/repos/asf/flink/commit/7477c5b .

            People

            • Assignee: tzulitai Tzu-Li (Gordon) Tai
            • Reporter: tzulitai Tzu-Li (Gordon) Tai
            • Votes: 1
            • Watchers: 10
