
FLINK-7011: Unstable Kafka testStartFromKafkaCommitOffsets failures on Travis

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.3.1, 1.4.0
    • Fix Version/s: 1.4.0, 1.3.2
    • Component/s: Kafka Connector, Tests
    • Labels:
      None

      Description

      Example:
      https://s3.amazonaws.com/archive.travis-ci.org/jobs/246703474/log.txt?X-Amz-Expires=30&X-Amz-Date=20170627T065647Z&X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAJRYRXRSVGNKPKO5A/20170627/us-east-1/s3/aws4_request&X-Amz-SignedHeaders=host&X-Amz-Signature=dbfc90cfc386fef0990325b54ff74ee4d441944687e7fdaa73ce7b0c2b2ec0ea

      In general, the testStartFromKafkaCommitOffsets implementation is overkill. Before getting to the actual test, it processes some records just for the sake of committing offsets to Kafka and then waits for some offsets to be committed (which is what leads to the instability), whereas we could commit the offsets directly using the test base's OffsetHandler.
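
      A minimal sketch of that simplification, assuming the OffsetHandler exposes a setCommittedOffset counterpart to the getCommittedOffset used elsewhere in the test base (the offset values are illustrative):

          // Hypothetical sketch: seed committed offsets directly via the test
          // base's OffsetHandler instead of running a throwaway consuming job.
          // Assumes a setCommittedOffset(topic, partition, offset) method; only
          // getCommittedOffset appears in the diffs below.
          KafkaTestEnvironment.KafkaOffsetHandler offsetHandler = kafkaServer.createOffsetHandler();
          offsetHandler.setCommittedOffset(topicName, 0, 23L);
          offsetHandler.setCommittedOffset(topicName, 1, 31L);
          offsetHandler.setCommittedOffset(topicName, 2, 12L);
          // ...then run the actual job and verify it starts from these offsets.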


          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user tzulitai opened a pull request:

          https://github.com/apache/flink/pull/4190

          FLINK-7011 [kafka] Harden Kafka testStartFromKafkaCommitOffsets ITCases

          Hardens `testStartFromKafkaCommitOffsets` in Kafka ITCases.

          *Description of what the test does:*
          The case verifies that whatever offset was committed to Kafka, Flink reads it correctly and uses it as the starting point for exactly-once processing. The check is done end-to-end, verifying that the commit logic and the offset-read logic are coherent.

          *Problem:*
          The previous implementation was too strict: it tried 3 times to fetch some committed offsets, and if none were fetched the test failed. This PR changes the test to retry indefinitely until some offsets are committed. If no offsets are ever committed because the offset-commit logic is broken, the test timeout guards against that.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/tzulitai/flink FLINK-7011

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/4190.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #4190


          commit 264172d3b788ff73aade75116e8342c132df24c0
          Author: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
          Date: 2017-06-27T07:53:06Z

          FLINK-7011 [kafka] Harden Kafka testStartFromKafkaCommitOffsets ITCases


          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4190#discussion_r124527723

          — Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java —

@@ -279,49 +279,43 @@ public void runStartFromKafkaCommitOffsets() throws Exception {

 	final String topicName = writeSequence("testStartFromKafkaCommitOffsetsTopic", recordsInEachPartition, parallelism, 1);

-	KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler();
+	// read some records so that some offsets are committed to Kafka
+	final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+	env.getConfig().disableSysoutLogging();
+	env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+	env.setParallelism(parallelism);
+	env.enableCheckpointing(20); // fast checkpoints to make sure we commit some offsets

-	Long o1;
-	Long o2;
-	Long o3;
-	int attempt = 0;
-	// make sure that o1, o2, o3 are not all null before proceeding
-	do {
-		attempt++;
-		LOG.info("Attempt " + attempt + " to read records and commit some offsets to Kafka");
-
-		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.getConfig().disableSysoutLogging();
-		env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
-		env.setParallelism(parallelism);
-		env.enableCheckpointing(20); // fast checkpoints to make sure we commit some offsets
-
-		env
-			.addSource(kafkaServer.getConsumer(topicName, new SimpleStringSchema(), standardProps))
-			.map(new ThrottledMapper<String>(consumePause))
-			.map(new MapFunction<String, Object>() {
-				int count = 0;
-				@Override
-				public Object map(String value) throws Exception {
-					count++;
-					if (count == recordsToConsume) {
-						throw new SuccessException();
-					}
-					return null;
-				}
-			})
-			.addSink(new DiscardingSink<>());
+	env
+		.addSource(kafkaServer.getConsumer(topicName, new SimpleStringSchema(), standardProps))
+		.map(new ThrottledMapper<String>(consumePause))
+		.map(new MapFunction<String, Object>() {
+			int count = 0;
+			@Override
+			public Object map(String value) throws Exception {
+				count++;
+				if (count == recordsToConsume) {
+					throw new SuccessException();
+				}
+				return null;
+			}
+		})
+		.addSink(new DiscardingSink<>());

-		tryExecute(env, "Read some records to commit offsets to Kafka");
+	tryExecute(env, "Read some records to commit offsets to Kafka");

+	// make sure that we indeed have some offsets committed to Kafka
+	Long o1 = null;
+	Long o2 = null;
+	Long o3 = null;
+	KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler();
+	while (o1 == null && o2 == null && o3 == null) {
+		Thread.sleep(100);
 		o1 = kafkaOffsetHandler.getCommittedOffset(topicName, 0);
 		o2 = kafkaOffsetHandler.getCommittedOffset(topicName, 1);
 		o3 = kafkaOffsetHandler.getCommittedOffset(topicName, 2);
-	}
-	while (o1 == null && o2 == null && o3 == null && attempt < 3);
+	}

          — End diff –

          I would still add a hard limit on attempts (20?); there's no benefit in a test that succeeds eventually: if it doesn't finish relatively quickly, the entire build will time out anyway.

          Also, when running tests locally, I suppose there isn't even a time limit that would kill the test...
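
          A sketch of the suggested bound, layered on the new polling loop from the diff above (the limit of 20 is the illustrative value floated in this comment):

              // Sketch: bound the polling loop from the diff above with a hard
              // attempt limit so a broken commit path fails fast instead of
              // spinning forever. (uses org.junit.Assert.fail; 20 is illustrative)
              final int maxAttempts = 20;
              int attempt = 0;
              while (o1 == null && o2 == null && o3 == null) {
                  if (++attempt > maxAttempts) {
                      fail("No offsets committed to Kafka after " + maxAttempts + " attempts");
                  }
                  Thread.sleep(100);
                  o1 = kafkaOffsetHandler.getCommittedOffset(topicName, 0);
                  o2 = kafkaOffsetHandler.getCommittedOffset(topicName, 1);
                  o3 = kafkaOffsetHandler.getCommittedOffset(topicName, 2);
              }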

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/4190

          @zentol the time limit is set on each individual version-specific `KafkaConsumerXXITCase`
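
          For reference, a minimal sketch of such a per-test limit, assuming JUnit 4's `@Test(timeout = ...)` attribute (the class name is the placeholder pattern from the comment; the actual value lives in each version-specific ITCase):

              // Illustrative sketch of a version-specific ITCase with a per-test
              // time limit; this also bounds the test when run locally.
              public class KafkaConsumerXXITCase extends KafkaConsumerTestBase {
                  @Test(timeout = 60000) // kills the test after 60s
                  public void testStartFromKafkaCommitOffsets() throws Exception {
                      runStartFromKafkaCommitOffsets();
                  }
              }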

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4190#discussion_r124961658

          — Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java —

@@ -279,49 +279,43 @@ public void runStartFromKafkaCommitOffsets() throws Exception {

 	final String topicName = writeSequence("testStartFromKafkaCommitOffsetsTopic", recordsInEachPartition, parallelism, 1);

-	KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler();
+	// read some records so that some offsets are committed to Kafka
+	final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+	env.getConfig().disableSysoutLogging();
+	env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+	env.setParallelism(parallelism);
+	env.enableCheckpointing(20); // fast checkpoints to make sure we commit some offsets

-	Long o1;
-	Long o2;
-	Long o3;
-	int attempt = 0;
-	// make sure that o1, o2, o3 are not all null before proceeding
-	do {
-		attempt++;
-		LOG.info("Attempt " + attempt + " to read records and commit some offsets to Kafka");
-
-		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.getConfig().disableSysoutLogging();
-		env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
-		env.setParallelism(parallelism);
-		env.enableCheckpointing(20); // fast checkpoints to make sure we commit some offsets
-
-		env
-			.addSource(kafkaServer.getConsumer(topicName, new SimpleStringSchema(), standardProps))
-			.map(new ThrottledMapper<String>(consumePause))
-			.map(new MapFunction<String, Object>() {
-				int count = 0;
-				@Override
-				public Object map(String value) throws Exception {
-					count++;
-					if (count == recordsToConsume) {
-						throw new SuccessException();
-					}
-					return null;
-				}
-			})
-			.addSink(new DiscardingSink<>());
+	env
+		.addSource(kafkaServer.getConsumer(topicName, new SimpleStringSchema(), standardProps))
+		.map(new ThrottledMapper<String>(consumePause))
+		.map(new MapFunction<String, Object>() {
+			int count = 0;
+			@Override
+			public Object map(String value) throws Exception {
+				count++;
+				if (count == recordsToConsume) {
+					throw new SuccessException();
+				}
+				return null;
+			}
+		})
+		.addSink(new DiscardingSink<>());

-		tryExecute(env, "Read some records to commit offsets to Kafka");
+	tryExecute(env, "Read some records to commit offsets to Kafka");

+	// make sure that we indeed have some offsets committed to Kafka
+	Long o1 = null;
+	Long o2 = null;
+	Long o3 = null;
+	KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler();
+	while (o1 == null && o2 == null && o3 == null) {
+		Thread.sleep(100);
 		o1 = kafkaOffsetHandler.getCommittedOffset(topicName, 0);
 		o2 = kafkaOffsetHandler.getCommittedOffset(topicName, 1);
 		o3 = kafkaOffsetHandler.getCommittedOffset(topicName, 2);
-	}
-	while (o1 == null && o2 == null && o3 == null && attempt < 3);
+	}

          — End diff –

          I understand the argument. Perhaps the underlying issue is that this test crams too much into a single test, hence the awkwardness of making it stable.
          I think two separate tests are sufficient replacements:
          (a) a test that offsets committed to Kafka are correct (an ITCase for this already exists);
          (b) a test that committed offsets are correctly picked up and used (a test for this also already exists).

          Hence, I would conclude that this test can perhaps be removed; a rough skeleton of the split follows.
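
          Hypothetical skeletons for the two-test split described above (method names are illustrative; the comment notes that equivalent tests already exist):

              @Test(timeout = 60000)
              public void testOffsetsCommittedToKafkaAreCorrect() throws Exception {
                  // (a) run a checkpointed consuming job, then use the OffsetHandler
                  // to assert the offsets committed to Kafka match what was consumed
              }

              @Test(timeout = 60000)
              public void testStartFromCommittedOffsets() throws Exception {
                  // (b) seed committed offsets via the OffsetHandler, start a job
                  // that resumes from group offsets, and assert it reads from them
              }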

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/4190

          tzulitai Tzu-Li (Gordon) Tai added a comment -

          Merged for master via ba75bdef78dd3ea6d23666d63c94e96b668a8a94.
          Merged for 1.3 via 87ff2890ccec895f950bd9d20e4394cae75e9d5c.


            People

            • Assignee: tzulitai Tzu-Li (Gordon) Tai
            • Reporter: tzulitai Tzu-Li (Gordon) Tai
            • Votes: 0
            • Watchers: 3
