Spark / SPARK-38684

Stream-stream outer join has a possible correctness issue due to weakly consistent reads on outer iterators




      We found that stream-stream join has the same issue as SPARK-38320 on the appended iterators. Since the root cause is the same as in SPARK-38320, this is only reproducible with the RocksDB state store provider. Even with the HDFS-backed state store provider, however, the behavior is not guaranteed by the interface contract, so it may depend on the JVM vendor, version, etc.
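To make the iterator contract concrete, the following is a toy Python model of a store whose iterator reflects the contents at the moment it was created. It is hypothetical and is not Spark's StateStore API; `SnapshotIteratorStore`, `put`, and `iterator` are illustrative names. A weakly consistent iterator is only permitted, not required, to reflect writes made after its creation (the Java `ConcurrentHashMap` documentation uses the term this way), so code must not rely on such an iterator observing rows appended afterwards.

```python
class SnapshotIteratorStore:
    """Toy key-value store (hypothetical) whose iterator only sees the
    contents that existed when iterator() was called."""

    def __init__(self):
        self._data = {}

    def put(self, key, value):
        self._data[key] = value

    def iterator(self):
        # Copy the items now; later puts are not visible to this iterator.
        return iter(list(self._data.items()))


store = SnapshotIteratorStore()
store.put("a", 1)
it = store.iterator()   # iterator created before the next write
store.put("b", 2)       # write made after iterator creation
print(list(it))         # [('a', 1)]: 'b' is not visible
```

Whether a given implementation behaves like this snapshot model or like a live view is exactly what the contract leaves open.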

      I can easily construct a “data loss” scenario in the state store.

      The conditions are as follows:

      • Use a stream-stream time-interval outer join
        • a left outer join has the issue on the left side, a right outer join on the right side, and a full outer join on both sides
      • In batch N, produce non-late row(s) on the problematic side
      • In the same batch (batch N), some row(s) on the problematic side are evicted by the watermark condition

      When these conditions are fulfilled, keyToNumValues goes out of sync between the state and the iterator in the eviction phase. If a row is evicted for the grouping key (which updates keyToNumValues), the eviction phase “overwrites” keyToNumValues in the state with the value it calculated.

      Because the eviction phase does “not know” about the new rows (keyToNumValues is out of sync), this effectively discards from the state all rows added in batch N for that grouping key.
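The following Python sketch is a simplified, hypothetical model of one join side's state, not Spark's implementation. `key_to_num_values` mirrors keyToNumValues from the description; `key_with_index_to_value`, `JoinSideState`, `evict`, and `run_batch_n` are illustrative names, and the compaction step is simplified. `evict` trusts the per-key counts its iterator observed, so passing counts captured before batch N's append reproduces the out-of-sync case: the count written back is derived from the stale view, and the row added in batch N becomes unreachable.

```python
class JoinSideState:
    """Simplified model of one join side's state (hypothetical, not Spark's API)."""

    def __init__(self):
        self.key_to_num_values = {}        # grouping key -> number of stored rows
        self.key_with_index_to_value = {}  # (key, index) -> (value, event_time)

    def append(self, key, value, event_time):
        n = self.key_to_num_values.get(key, 0)
        self.key_with_index_to_value[(key, n)] = (value, event_time)
        self.key_to_num_values[key] = n + 1

    def values(self, key):
        n = self.key_to_num_values.get(key, 0)
        return [self.key_with_index_to_value[(key, i)][0] for i in range(n)]

    def evict(self, observed_counts, watermark):
        """Evict rows with event_time <= watermark, trusting observed_counts
        (what the eviction iterator saw) as the number of rows per key."""
        evicted = []
        for key, n in observed_counts.items():
            rows = [self.key_with_index_to_value[(key, i)] for i in range(n)]
            kept = [r for r in rows if r[1] > watermark]
            evicted += [r[0] for r in rows if r[1] <= watermark]
            if len(kept) != n:
                # Compact only the rows the iterator knows about...
                for i in range(n):
                    del self.key_with_index_to_value[(key, i)]
                for i, row in enumerate(kept):
                    self.key_with_index_to_value[(key, i)] = row
                # ...and overwrite the count with the value computed here.
                self.key_to_num_values[key] = len(kept)
        return evicted


def run_batch_n(iterator_sees_new_rows):
    state = JoinSideState()
    state.append("k", "old", event_time=5)   # stored in an earlier batch
    stale_counts = dict(state.key_to_num_values)
    state.append("k", "new", event_time=20)  # batch N: non-late row
    if iterator_sees_new_rows:
        counts = dict(state.key_to_num_values)
    else:
        counts = stale_counts                # out-of-sync view
    evicted = state.evict(counts, watermark=10)  # batch N: "old" is evicted
    return evicted, state.values("k")


print(run_batch_n(iterator_sees_new_rows=True))   # (['old'], ['new'])
print(run_batch_n(iterator_sees_new_rows=False))  # (['old'], []): 'new' is lost
```

With a consistent view, eviction removes only the late row. With the stale view, the overwritten count hides the non-late row from batch N, even though its entry is still present in `key_with_index_to_value`.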




            kabhwan Jungtaek Lim