KAFKA-13714: Flaky test IQv2StoreIntegrationTest


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 3.2.0
    • Fix Version/s: 3.2.0
    • Component/s: streams
    • Labels: None

    Description

      I have observed multiple consistency violations in the IQv2StoreIntegrationTest. Since this is the first release of IQv2, and this is apparently a major flaw in the feature, we should not release with this bug outstanding. Depending on the timetable, we may want to block the release or pull the feature until the next release.

       

      The first observation I have is from 23 Feb 2022. So far all observations point to the range query in particular, and all observations have been for RocksDB stores, including RocksDBStore, TimestampedRocksDBStore, and the windowed store built on RocksDB segments.

      For reference, range queries were implemented on 16 Feb 2022: https://github.com/apache/kafka/commit/b38f6ba5cc989702180f5d5f8e55ba20444ea884

      The window-specific range query test has also failed once, as far as I have seen. That feature was implemented on 2 Jan 2022: https://github.com/apache/kafka/commit/b8f1cf14c396ab04b8968a8fa04d8cf67dd3254c
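
      For reference, the failing assertions come from IQv2 queries issued roughly as in the sketch below. The store name and the Integer key/value types are placeholders rather than the ones the test uses, and the windowed variant goes through WindowRangeQuery instead of RangeQuery.

      import java.util.Map;

      import org.apache.kafka.streams.KafkaStreams;
      import org.apache.kafka.streams.query.QueryResult;
      import org.apache.kafka.streams.query.RangeQuery;
      import org.apache.kafka.streams.query.StateQueryRequest;
      import org.apache.kafka.streams.query.StateQueryResult;
      import org.apache.kafka.streams.state.KeyValueIterator;

      public class RangeQuerySketch {

          // Issue an IQv2 range query for keys 1..3 against a running KafkaStreams
          // instance and drain every partition's iterator. "kv-store" is a
          // placeholder store name, not the one used by the integration test.
          public static void queryRange(final KafkaStreams kafkaStreams) {
              final StateQueryRequest<KeyValueIterator<Integer, Integer>> request =
                  StateQueryRequest
                      .inStore("kv-store")
                      .withQuery(RangeQuery.<Integer, Integer>withRange(1, 3));

              final StateQueryResult<KeyValueIterator<Integer, Integer>> result =
                  kafkaStreams.query(request);

              // Each partition answers independently; the test asserts on the union
              // of the keys returned across all partitions.
              for (final Map.Entry<Integer, QueryResult<KeyValueIterator<Integer, Integer>>> entry
                       : result.getPartitionResults().entrySet()) {
                  try (KeyValueIterator<Integer, Integer> iterator = entry.getValue().getResult()) {
                      while (iterator.hasNext()) {
                          System.out.println("partition " + entry.getKey() + ": " + iterator.next());
                      }
                  }
              }
          }
      }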

       

      Here are some stack traces I have seen:

      verifyStore[cache=true, log=true, supplier=ROCKS_KV, kind=PAPI]
      
      java.lang.AssertionError: 
      Expected: is <[1, 2, 3]>
           but: was <[1, 2]>
      	at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
      	at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
      	at org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQuery(IQv2StoreIntegrationTest.java:1125)
      	at org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQueries(IQv2StoreIntegrationTest.java:803)
      	at org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.verifyStore(IQv2StoreIntegrationTest.java:776)
       
      verifyStore[cache=true, log=true, supplier=TIME_ROCKS_KV, kind=PAPI]
      
      java.lang.AssertionError: 
      Expected: is <[1, 2, 3]>
           but: was <[1, 3]>
      	at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
      	at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
      	at org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQuery(IQv2StoreIntegrationTest.java:1131)
      	at org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQueries(IQv2StoreIntegrationTest.java:809)
      	at org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.verifyStore(IQv2StoreIntegrationTest.java:778)
      	 
      verifyStore[cache=true, log=true, supplier=ROCKS_KV, kind=PAPI]
      
      java.lang.AssertionError: Result:StateQueryResult{partitionResults={0=SucceededQueryResult{result=org.apache.kafka.streams.state.internals.MeteredKeyValueStore$MeteredKeyValueTimestampedIterator@35025a0a, executionInfo=[], position=Position{position={input-topic={0=1}}}}, 1=SucceededQueryResult{result=org.apache.kafka.streams.state.internals.MeteredKeyValueStore$MeteredKeyValueTimestampedIterator@38732364, executionInfo=[], position=Position{position={input-topic={1=1}}}}}, globalResult=null}
      Expected: is <[1, 2, 3]>
           but: was <[1, 2]>
      	at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
      	at org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQuery(IQv2StoreIntegrationTest.java:1129)
      	at org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQueries(IQv2StoreIntegrationTest.java:807)
      	at org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.verifyStore(IQv2StoreIntegrationTest.java:780)
       
      verifyStore[cache=true, log=false, supplier=ROCKS_WINDOW, kind=DSL] 
      
          java.lang.AssertionError: Result:StateQueryResult{partitionResults={0=SucceededQueryResult{result=org.apache.kafka.streams.state.internals.MeteredWindowedKeyValueIterator@2a32fb6, executionInfo=[], position=Position{position={input-topic={0=1}}}}, 1=SucceededQueryResult{result=org.apache.kafka.streams.state.internals.MeteredWindowedKeyValueIterator@6107165, executionInfo=[], position=Position{position={input-topic={1=1}}}}}, globalResult=null}
          Expected: is <[0, 1, 2, 3]> 
               but: was <[0, 2, 3]>
              at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
              at org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleWindowRangeQuery(IQv2StoreIntegrationTest.java:1234)
              at org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleWindowRangeQueries(IQv2StoreIntegrationTest.java:880)
              at org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.verifyStore(IQv2StoreIntegrationTest.java:793)
       

       

      Some observations:

      • After I added the whole query result to the failure message, we can see that the returned positions are always at or past the desired position, even though the results don't include all the data that should have been visible at that position (see the position-bound sketch after this list).
      • All the observed failures have happened with caching=true, but that is probably a red herring, since range queries skip the cache (cf. fe72187cb15bf7dcc16e8630ed379e979c101151).
      • For a while, I thought it might be a thread-visibility problem with the iterators, since the missing record was always at the end of the range for some partition, but the window range failure is missing record 1, which is at the beginning of the range in partition 1.
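
      The consistency property at stake is the one a position-bounded query is supposed to guarantee. A minimal sketch, with a made-up topic name, store name, and offsets that only mirror the shape of the failure messages above:

      import org.apache.kafka.streams.KafkaStreams;
      import org.apache.kafka.streams.query.Position;
      import org.apache.kafka.streams.query.PositionBound;
      import org.apache.kafka.streams.query.RangeQuery;
      import org.apache.kafka.streams.query.StateQueryRequest;
      import org.apache.kafka.streams.query.StateQueryResult;
      import org.apache.kafka.streams.state.KeyValueIterator;

      public class PositionBoundSketch {

          // Bound the query at the position of the records we already produced.
          // Streams must not answer from a partition until the store has caught up
          // to this bound, so a succeeded result whose position satisfies the bound
          // should also contain every record written up to that point.
          public static void queryAtWritePosition(final KafkaStreams kafkaStreams) {
              final Position writePosition = Position.emptyPosition()
                  .withComponent("input-topic", 0, 1L)
                  .withComponent("input-topic", 1, 1L);

              final StateQueryRequest<KeyValueIterator<Integer, Integer>> request =
                  StateQueryRequest
                      .inStore("kv-store")
                      .withQuery(RangeQuery.<Integer, Integer>withRange(1, 3))
                      .withPositionBound(PositionBound.at(writePosition));

              final StateQueryResult<KeyValueIterator<Integer, Integer>> result =
                  kafkaStreams.query(request);

              // In the failing runs the per-partition positions do satisfy the bound,
              // which is what makes the missing records a consistency violation.
              result.getPartitionResults().forEach((partition, queryResult) ->
                  System.out.println(partition + " answered at " + queryResult.getPosition()));
          }
      }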

      I have been able to reproduce the failure locally, but only occasionally. I made some hacks to narrow down the space of possibilities: https://github.com/vvcephei/kafka/commit/2a0776e52e378f1c59e98f352e3fa4f79c55842d

      I didn't have any success running that one test until failure in IDEA; it has never failed for me there, even after thousands of attempts. In my testing branch, I added a loop to repeat one test configuration a thousand times in Gradle (the sketch below shows the shape of that idea), but it still didn't fail reliably.
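
      One generic way to hammer a single JUnit 4 configuration in a loop is a repeat rule like the following. This is not the actual hack from the testing branch linked above; it only illustrates the idea.

      import org.junit.Rule;
      import org.junit.Test;
      import org.junit.rules.TestRule;
      import org.junit.runner.Description;
      import org.junit.runners.model.Statement;

      public class RepeatedScenarioSketch {

          // A generic JUnit 4 rule that re-runs the annotated test body N times and
          // stops at the first failure.
          public static final class RepeatRule implements TestRule {
              private final int times;

              public RepeatRule(final int times) {
                  this.times = times;
              }

              @Override
              public Statement apply(final Statement base, final Description description) {
                  return new Statement() {
                      @Override
                      public void evaluate() throws Throwable {
                          for (int i = 0; i < times; i++) {
                              base.evaluate();
                          }
                      }
                  };
              }
          }

          @Rule
          public final RepeatRule repeat = new RepeatRule(1000);

          @Test
          public void flakyScenario() {
              // The single cache/log/supplier/kind configuration under investigation
              // would run here.
          }
      }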

      I also added a test to specifically check that RocksDB is giving the desired serialization both in one thread and across threads, and that test passes for me. My next thought is to expand that Tmp test to do the same with the RocksDBIterator class, or maybe just with a standalone RocksDBStore to see if we can reproduce it.
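
      A minimal standalone version of the cross-thread check, against raw rocksdbjni rather than the Streams RocksDBStore wrapper, could look roughly like this (the directory and the string encoding of the keys are made up for the sketch; it only rules the lowest layer in or out):

      import java.nio.charset.StandardCharsets;
      import java.util.ArrayList;
      import java.util.Arrays;
      import java.util.List;
      import java.util.concurrent.ExecutorService;
      import java.util.concurrent.Executors;

      import org.rocksdb.Options;
      import org.rocksdb.RocksDB;
      import org.rocksdb.RocksIterator;

      public class RocksDbCrossThreadVisibilitySketch {

          // Write a few records on the main thread, then open an iterator on a
          // second thread and verify that every record written before the iterator
          // was created is visible.
          public static void main(final String[] args) throws Exception {
              RocksDB.loadLibrary();
              final ExecutorService executor = Executors.newSingleThreadExecutor();
              try (Options options = new Options().setCreateIfMissing(true);
                   RocksDB db = RocksDB.open(options, "/tmp/rocksdb-visibility-check")) {

                  // Writer: main thread.
                  for (int i = 1; i <= 3; i++) {
                      final byte[] bytes = String.valueOf(i).getBytes(StandardCharsets.UTF_8);
                      db.put(bytes, bytes);
                  }

                  // Reader: a different thread, after the writes have completed.
                  final List<String> seen = executor.submit(() -> {
                      final List<String> keys = new ArrayList<>();
                      try (RocksIterator iterator = db.newIterator()) {
                          for (iterator.seekToFirst(); iterator.isValid(); iterator.next()) {
                              keys.add(new String(iterator.key(), StandardCharsets.UTF_8));
                          }
                      }
                      return keys;
                  }).get();

                  if (!seen.equals(Arrays.asList("1", "2", "3"))) {
                      throw new AssertionError("Expected [1, 2, 3] but was " + seen);
                  }
              } finally {
                  executor.shutdown();
              }
          }
      }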

People

    Assignee: John Roesler (vvcephei)
    Reporter: John Roesler (vvcephei)
