Details
Bug
Status: Resolved
Critical
Resolution: Fixed
2.4.4
None
Description
Currently, Spark uses the `TaskPartitionId` to determine the StateStore path.
TaskPartitionId   \
StateStoreVersion  --> StoreProviderId -> StateStore
StateStoreName    /
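To make the dependency concrete, here is a minimal toy model of a store location keyed by the task partition ID. It is not Spark's implementation: `StoreKey`, `StatePathSketch`, the directory layout, and the store name `joinStore` are all hypothetical names chosen for illustration.

```scala
// Toy model only: names and path layout are assumptions, not Spark's code.
final case class StoreKey(
    checkpointRoot: String, // root directory of the state checkpoint
    operatorId: Long,       // stateful operator within the query
    taskPartitionId: Int,   // partition ID of the task that opens the store
    storeName: String)      // logical name of the store

object StatePathSketch {
  /** Builds the directory a task would read its state from. */
  def storePath(key: StoreKey): String =
    s"${key.checkpointRoot}/${key.operatorId}/${key.taskPartitionId}/${key.storeName}"
}
```

With this model, `StatePathSketch.storePath(StoreKey("/ckpt/state", 0L, 200, "joinStore"))` yields `/ckpt/state/0/200/joinStore`, while the same logical partition opened by a task with ID 0 yields `/ckpt/state/0/0/joinStore`. If the task partition ID changes between batches, the task looks in a different directory.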
In a Spark stage, the task partition ID is determined by the number of tasks in the stage. As described above, the StateStore file path depends on the task partition ID. So if the task partition IDs of a stream-stream join differ from those of the previous batch, the join will read the wrong StateStore data or fail because the StateStore data does not exist. This happened in some corner cases. The following pseudocode is a sample:
val df3 = streamDf1.join(streamDf2)
val df5 = streamDf3.join(batchDf4)
val df = df3.union(df5)
df.writeStream...start()
A simplified DAG looks like this:
DataSourceV2Scan   Scan Relation     DataSourceV2Scan   DataSourceV2Scan
  (streamDf3)            |             (streamDf1)        (streamDf2)
       |                 |                  |                  |
 Exchange(200)     Exchange(200)      Exchange(200)      Exchange(200)
       |                 |                  |                  |
     Sort              Sort                 |                  |
        \               /                    \                /
         \             /                      \              /
          SortMergeJoin                StreamingSymmetricHashJoin
                       \                 /
                        \               /
                         \             /
                              Union
The stream-stream join task IDs range from 200 to 399, because the join is in the same stage as the `SortMergeJoin`. But when `streamDf3` has no new incoming data in some batch, Spark generates an empty LocalRelation, and the SortMergeJoin is replaced with a BroadcastHashJoin. In this case, the stream-stream join task IDs range from 0 to 199. As a result, the join resolves the wrong StateStore path through the TaskPartitionId and fails with an error reading the state store delta file. The simplified DAG then becomes:
 LocalTableScan    Scan Relation     DataSourceV2Scan   DataSourceV2Scan
       |                 |                  |                  |
BroadcastExchange        |            Exchange(200)      Exchange(200)
       |                 |                  |                  |
       |                 |                  |                  |
        \               /                    \                /
         \             /                      \              /
        BroadcastHashJoin              StreamingSymmetricHashJoin
                       \                 /
                        \               /
                         \             /
                              Union
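The shift in task IDs can be illustrated with a toy model in which a union numbers its output partitions by appending each child's partitions after those of the children before it. This is a sketch under that assumption, not Spark's implementation; `UnionOffsetSketch` and `idsFor` are hypothetical names, and the partition counts passed in are illustrative.

```scala
// Toy model only: shows how a child's global partition IDs depend on
// the partition counts of the children that precede it in a union.
object UnionOffsetSketch {
  /** Global partition IDs assigned to the child at `childIndex`. */
  def idsFor(childPartitionCounts: Seq[Int], childIndex: Int): Range = {
    // Running totals: offsets(i) is the first global ID of child i.
    val offsets = childPartitionCounts.scanLeft(0)(_ + _)
    offsets(childIndex) until offsets(childIndex + 1)
  }
}
```

For example, `UnionOffsetSketch.idsFor(Seq(200, 200), 1)` covers IDs 200 to 399. If the preceding child contributes a different number of partitions in a later batch, the second child's range moves: with an assumed count of 0, `UnionOffsetSketch.idsFor(Seq(0, 200), 1)` covers 0 to 199. The join's own 200 partitions are unchanged, but every one of them is now addressed by a different task ID.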
In my job, I disabled the automatic broadcast join feature (set spark.sql.autoBroadcastJoinThreshold=-1) to work around this bug. We should make the StateStore path deterministic rather than dependent on the TaskPartitionId.
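For reference, one way to apply this workaround is to set the option when building the session. This is a minimal sketch that assumes a Spark dependency on the classpath; the object name and application name are hypothetical.

```scala
import org.apache.spark.sql.SparkSession

object DisableAutoBroadcastJoin {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("stream-stream-join-job") // hypothetical application name
      // -1 disables automatic conversion of joins to broadcast joins
      .config("spark.sql.autoBroadcastJoinThreshold", "-1")
      .getOrCreate()

    // ... define and start the streaming query here ...

    spark.stop()
  }
}
```

The same option can also be passed at submit time with `--conf spark.sql.autoBroadcastJoinThreshold=-1`. This only avoids the plan change described above; it does not make the StateStore path independent of the task partition ID.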