Details
Type: Bug
Status: Resolved
Priority: Major
Resolution: Fixed
3.1.2, 3.2.1, 3.3.0, 3.4.0
None
Description
When dropDuplicates() is used in a streaming query with the columns specified explicitly, deduplication is performed against those columns only, not against all columns.
In the state for streaming deduplication, the key is built from the specified columns and the value is an empty row, because the value is never used. Therefore, once the query specifies columns in dropDuplicates(), the remaining columns should not affect the operation.
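A minimal Python sketch of the state layout described above, assuming a plain dict as a stand-in for the state store. DedupStateStore and dedup_batch are hypothetical names for illustration, not Spark APIs. The key is built only from the specified columns and the stored value is an empty tuple, so a later row carrying an extra column is still deduplicated by its key alone.

```python
from typing import Any, Dict, Iterable, List, Tuple

Row = Dict[str, Any]
Key = Tuple[Any, ...]


class DedupStateStore:
    """Hypothetical stand-in for a state store: key tuple -> empty value."""

    def __init__(self) -> None:
        self._state: Dict[Key, tuple] = {}

    def contains(self, key: Key) -> bool:
        return key in self._state

    def put(self, key: Key) -> None:
        # The value is an empty tuple: deduplication never reads it.
        self._state[key] = ()


def dedup_batch(rows: Iterable[Row], key_columns: List[str], store: DedupStateStore) -> List[Row]:
    """Emit each row whose key (built only from key_columns) was not seen before."""
    out: List[Row] = []
    for row in rows:
        key = tuple(row[c] for c in key_columns)
        if not store.contains(key):
            store.put(key)
            out.append(row)
    return out


store = DedupStateStore()
# First micro-batch: the second id=1 row is dropped even though "v" differs.
batch1 = dedup_batch([{"id": 1, "v": "a"}, {"id": 1, "v": "b"}, {"id": 2, "v": "c"}], ["id"], store)
# Second micro-batch: an extra column does not change the key, so id=2 is still a duplicate.
batch2 = dedup_batch([{"id": 2, "v": "z", "extra": 0}, {"id": 3, "v": "d"}], ["id"], store)
print(batch1)  # [{'id': 1, 'v': 'a'}, {'id': 2, 'v': 'c'}]
print(batch2)  # [{'id': 3, 'v': 'd'}]
```

Because nothing outside key_columns is ever stored or compared, the non-key columns are irrelevant to the deduplication result.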
Unfortunately, even though an empty row is used as the value in the state store, "all columns" are registered as the value schema of the state store, which leads to incorrect behavior when the state store schema is checked. (This turned out to be a long-standing issue dating back to the initial implementation of StreamingDeduplicateExec.)
Specifically, all columns of the DataFrame to which streaming deduplication is applied are required to stay the same across the lifetime of the query, whereas only the specified columns actually need to stay the same.
Ideally the value schema would be changed to empty, but that change alone may not be sufficient, since schema files have already been written for existing streaming queries. The state schema compatibility checker may need to be allowed to ignore the value schema when required (via either a config or, if feasible, a method parameter).
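The failure mode and the proposed relaxation can be modeled in a few lines of Python. This is a simplified sketch, not Spark's actual compatibility checker: schemas are assumed to be ordered (name, type) pairs compared by exact equality, and check_state_schema, is_compatible and the ignore_value_schema parameter are hypothetical names illustrating the "parameter of method" option described above.

```python
from typing import List, Tuple

# Hypothetical schema representation: ordered (column name, type name) pairs.
Schema = List[Tuple[str, str]]


class SchemaIncompatibleError(Exception):
    """Raised when a new state schema does not match the stored one."""


def check_state_schema(
    stored_key: Schema,
    stored_value: Schema,
    new_key: Schema,
    new_value: Schema,
    ignore_value_schema: bool = False,
) -> None:
    """Simplified check: schemas must be exactly equal.

    With ignore_value_schema=True only the key schema is compared, which is
    sufficient for deduplication because the state value is never read.
    """
    if stored_key != new_key:
        raise SchemaIncompatibleError(f"key schema changed: {stored_key} -> {new_key}")
    if not ignore_value_schema and stored_value != new_value:
        raise SchemaIncompatibleError(f"value schema changed: {stored_value} -> {new_value}")


def is_compatible(*args, **kwargs) -> bool:
    try:
        check_state_schema(*args, **kwargs)
        return True
    except SchemaIncompatibleError:
        return False


# Schema file written by an older query: key = the dropDuplicates() column,
# value = all input columns (the behavior this issue describes).
old_key: Schema = [("id", "long")]
old_value: Schema = [("id", "long"), ("v", "string")]

# Restarted query adds a non-key column; the key column is unchanged.
new_key: Schema = [("id", "long")]
new_value_all_columns: Schema = [("id", "long"), ("v", "string"), ("extra", "int")]
new_value_empty: Schema = []

# Strict check rejects the restart although the dedup key did not change.
strict_ok = is_compatible(old_key, old_value, new_key, new_value_all_columns)
# Ignoring the value schema accepts it, even against the old schema file.
relaxed_ok = is_compatible(old_key, old_value, new_key, new_value_empty, ignore_value_schema=True)
# Changing the key columns is still rejected.
key_change_ok = is_compatible(
    old_key, old_value, [("id", "long"), ("v", "string")], new_value_empty, ignore_value_schema=True
)
print(strict_ok, relaxed_ok, key_change_ok)  # False True False
```

In this model the flag only relaxes the value side; the key schema still guards the invariant that matters for deduplication, namely that the columns passed to dropDuplicates() do not change.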