Description
DataFrame.except doesn't work for UDT columns. This is because ExtractEquiJoinKeys runs Literal.default against the UDT, but Literal.default doesn't handle UDTs, so an exception like the following is thrown:
java.lang.RuntimeException: no default for type org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7
at org.apache.spark.sql.catalyst.expressions.Literal$.default(literals.scala:179)
at org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys$$anonfun$4.apply(patterns.scala:117)
at org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys$$anonfun$4.apply(patterns.scala:110)
A simpler fix is to let Literal.default handle a UDT through its underlying SQL type, so that more efficient join types can still be used on UDT columns.
Besides except, this also fixes other similar scenarios. In summary, it fixes:
- except on two Datasets with UDT
- intersect on two Datasets with UDT
- join with join conditions using <=> on UDT columns
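
The idea behind the fix, resolving a UDT's default through its underlying SQL type, can be sketched in a self-contained way. This is only an illustration under stated assumptions: the types below (DataType, ArrayType, UserDefinedType) are simplified stand-ins defined locally, not Spark's actual Catalyst classes, and DefaultLiteral, Demo, and the "VectorLike" UDT are hypothetical names. The real Literal.default returns a Literal expression and covers many more types.

```scala
// Simplified stand-ins for Catalyst data types (hypothetical, not Spark's classes).
sealed trait DataType
case object IntegerType extends DataType
case object DoubleType extends DataType
case object StringType extends DataType
final case class ArrayType(elementType: DataType) extends DataType

// A user-defined type is stored using an underlying SQL type.
final case class UserDefinedType(name: String, sqlType: DataType) extends DataType

object DefaultLiteral {
  // Returns a placeholder default value for the given type.
  def default(dataType: DataType): Any = dataType match {
    case IntegerType => 0
    case DoubleType => 0.0
    case StringType => ""
    case ArrayType(_) => Seq.empty[Any]
    // The idea of the fix: delegate to the underlying SQL type
    // instead of failing with "no default for type".
    case udt: UserDefinedType => default(udt.sqlType)
  }
}

object Demo {
  def main(args: Array[String]): Unit = {
    // Hypothetical UDT backed by an array of doubles.
    val vectorLike = UserDefinedType("VectorLike", ArrayType(DoubleType))
    println(DefaultLiteral.default(vectorLike))
  }
}
```

Because the UDT case recurses on sqlType, any UDT whose underlying type already has a default is covered without per-UDT handling.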