Kafka / KAFKA-9512

Flaky Test LagFetchIntegrationTest.shouldFetchLagsDuringRestoration

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: 2.5.0
    • Fix Version/s: 2.5.0
    • Component/s: streams, unit tests

    Description

      https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/497/testReport/junit/org.apache.kafka.streams.integration/LagFetchIntegrationTest/shouldFetchLagsDuringRestoration/

      java.lang.NullPointerException at org.apache.kafka.streams.integration.LagFetchIntegrationTest.shouldFetchLagsDuringRestoration(LagFetchIntegrationTest.java:306)


        Activity

          vinoth Vinoth Chandar added a comment -
          restartedStreams.setGlobalStateRestoreListener(new StateRestoreListener() {
              @Override
              public void onRestoreStart(final TopicPartition topicPartition, final String storeName, final long startingOffset, final long endingOffset) {
                  restoreStartLagInfo.putAll(restartedStreams.allLocalStorePartitionLags());
              }
          
              @Override
              public void onBatchRestored(final TopicPartition topicPartition, final String storeName, final long batchEndOffset, final long numRestored) {
              }
          
              @Override
              public void onRestoreEnd(final TopicPartition topicPartition, final String storeName, final long totalRestored) {
                  restoreEndLagInfo.putAll(restartedStreams.allLocalStorePartitionLags());
              }
          });
          
          restartedStreams.start();
          TestUtils.waitForCondition(() -> restartedStreams.allLocalStorePartitionLags().get(stateStoreName).get(0).offsetLag() == 0,
              "Standby should eventually catchup and have zero lag.");
          final LagInfo fullLagInfo = restoreStartLagInfo.get(stateStoreName).get(0);
          assertThat(fullLagInfo.currentOffsetPosition(), equalTo(0L));
          assertThat(fullLagInfo.endOffsetPosition(), equalTo(5L));
          assertThat(fullLagInfo.offsetLag(), equalTo(5L));
          
          assertThat(restoreEndLagInfo.get(stateStoreName).get(0), equalTo(zeroLagInfo)); // <-- NPE line (LagFetchIntegrationTest.java:306)

           

          The NPE can only happen if an empty lag map is captured in `onRestoreEnd`, i.e. when restoration finishes there is no entry for the changelog topic. That should not be possible, especially for a standby, since a standby task should already exist. The test clearly waits until we reach zero lag, using the same stateStoreName, and that seems to be working.
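
          For illustration, a minimal sketch of that failure mode, assuming only the Map<String, Map<Integer, LagInfo>> return type of allLocalStorePartitionLags(); the class and store name below are hypothetical:

          import java.util.HashMap;
          import java.util.Map;

          import org.apache.kafka.streams.LagInfo;

          public class LagNpeIllustration {
              public static void main(final String[] args) {
                  final String stateStoreName = "some-store"; // hypothetical store name
                  // What onRestoreEnd() would have captured if the lag snapshot were empty:
                  final Map<String, Map<Integer, LagInfo>> restoreEndLagInfo = new HashMap<>();
                  // The outer get() returns null, so the chained .get(0) throws the NPE
                  // observed at LagFetchIntegrationTest.java:306.
                  restoreEndLagInfo.get(stateStoreName).get(0);
              }
          }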

           

          So I wonder if there is some race between restoration ending and standby task creation. In any case, the problematic line seems redundant anyway, since it just checks for the same thing as the waitForCondition().

           

           

           

          vinoth Vinoth Chandar added a comment -

          I will try to reproduce this locally and investigate.

          If I cannot reproduce it, I will send a PR to remove the redundant assert at the end.

          vinoth Vinoth Chandar added a comment -

          I have run this locally over 15 times, without luck.

          guozhang Quick confirmation from you: for a standby changelog partition, would the StandbyTask already be created during restoration, or is it created in parallel while the restoration goes on? That is the only cause of such a race I can think of. mjsax, please chime in as well if you know.

          guozhang Guozhang Wang added a comment -

          Did you see it on Scala 2.13 or other Scala versions? I have also found that some test failures are quite consistent on the Scala 2.13 Jenkins jobs, and locally I cannot reproduce them even with parallel runs enabled (I'm on 2.11 locally).

          vinoth Vinoth Chandar added a comment (edited) -

          IIUC mjsax saw this a couple of times, and since this one is on 2.12, it's probably not the same as what you are facing.

           

          But can you confirm this understanding for me? It will help me formulate a fix here.

          > For a standby changelog partition, would the StandbyTask already be created during restoration, or is it created in parallel while the restoration goes on?

          vinoth Vinoth Chandar added a comment (edited) -

          Reproduced with some tips from guozhang. It appears that the fetch of end offsets from the broker fails, leaving us with an empty map (the test currently has no retries).

          Looking into the fix.
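
          As a rough sketch of the retry direction (an assumption on my part, not necessarily the exact change that will land; it reuses restartedStreams, stateStoreName and the assertion style from the test quoted above), the test could poll until a lag entry for the store actually exists before dereferencing it:

          TestUtils.waitForCondition(
              () -> {
                  final Map<String, Map<Integer, LagInfo>> lags = restartedStreams.allLocalStorePartitionLags();
                  // Keep polling until the store and partition 0 show up in the lag snapshot.
                  return lags.containsKey(stateStoreName) && lags.get(stateStoreName).containsKey(0);
              },
              "Lag information for the store was never reported.");
          final LagInfo observed = restartedStreams.allLocalStorePartitionLags().get(stateStoreName).get(0);
          assertThat(observed.offsetLag(), equalTo(0L));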

          githubbot ASF GitHub Bot added a comment -

          vinothchandar commented on pull request #8076: KAFKA-9512: Flaky Test LagFetchIntegrationTest.shouldFetchLagsDuringRestoration
          URL: https://github.com/apache/kafka/pull/8076


          • Added additional synchronization and increased timeouts to handle flakiness (see the sketch after this list)
          • Added some pre-cautionary retries when trying to obtain the lag map
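
          A minimal sketch of what that extra synchronization could look like in the test, assuming a hypothetical restoreEndLatch plus the variables from the test quoted earlier (an illustration of the idea, not necessarily the code in this PR):

          // Have the restore listener signal completion, and block on that signal
          // before reading the end-of-restore lag snapshot.
          final CountDownLatch restoreEndLatch = new CountDownLatch(1);

          // Inside onRestoreEnd(...) of the listener shown in the earlier comment:
          //     restoreEndLagInfo.putAll(restartedStreams.allLocalStorePartitionLags());
          //     restoreEndLatch.countDown();

          restartedStreams.start();
          // Generous timeout to absorb slow CI machines; only assert once the
          // end-of-restore snapshot has actually been captured.
          assertTrue("Restoration did not complete in time",
                     restoreEndLatch.await(120, TimeUnit.SECONDS));
          assertThat(restoreEndLagInfo.get(stateStoreName).get(0), equalTo(zeroLagInfo));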


          Ran the entire suite locally 100+ times (twice as many runs as it took to reproduce the original issue).
          ![Screen Shot 2020-02-09 at 5 57 31 PM](https://user-images.githubusercontent.com/1179324/74115829-c4937680-4b65-11ea-9a97-6c657e40f4a9.png)

          P.S.: Also ran the two tests I added in `QueryableStateIntegrationTest`. They seem solid (the standby test ran 1900 times without flaking out).



          githubbot ASF GitHub Bot added a comment -

          guozhangwang commented on pull request #8076: KAFKA-9512: Flaky Test LagFetchIntegrationTest.shouldFetchLagsDuringRestoration
          URL: https://github.com/apache/kafka/pull/8076


          cadonna Bruno Cadonna added a comment -

          vinoth guozhang Can we close this ticket?

          vinoth Vinoth Chandar added a comment -

          Closing since the PR has now landed.


          People

            Assignee: Vinoth Chandar
            Reporter: Matthias J. Sax
            Votes: 0
            Watchers: 5
