Details
Type: Bug
Status: Open
Priority: Major
Resolution: Unresolved
Affects Version/s: 3.1.2
None
None
Description
I am getting an error when I try to persist the results of a join operation. Note that both tables being joined and the output table are Iceberg tables.
SQL code to reproduce:
String sqlJoin = String.format(
    "SELECT * from "
        + "((select %s from %s.%s where %s ) lllll "
        + "join (select %s from %s.%s where %s ) rrrrr "
        + "using (%s))",
    ........);
spark.sql(sqlJoin).writeTo("ciptest.ttt").option("write-format", "parquet").createOrReplace();
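For readers who want to see what the template expands to, here is a minimal sketch in plain Java with no Spark dependency. The report elides the actual format arguments, so every value below (column lists, table names, filters, join key) and the order in which they are passed is a hypothetical placeholder, not what was actually run.

```java
public class JoinQueryDemo {
    // Fills the nine %s placeholders of the template from the report.
    // The parameter order is an assumption; the report elides the real arguments.
    static String buildJoinSql(String leftCols, String leftDb, String leftTable, String leftFilter,
                               String rightCols, String rightDb, String rightTable, String rightFilter,
                               String joinKey) {
        return String.format(
            "SELECT * from "
                + "((select %s from %s.%s where %s ) lllll "
                + "join (select %s from %s.%s where %s ) rrrrr "
                + "using (%s))",
            leftCols, leftDb, leftTable, leftFilter,
            rightCols, rightDb, rightTable, rightFilter,
            joinKey);
    }

    public static void main(String[] args) {
        // Hypothetical values, chosen only to make the expanded statement readable.
        String sql = buildJoinSql(
            "id, a", "ciptest", "left_tbl", "a > 0",
            "id, b", "ciptest", "right_tbl", "b > 0",
            "id");
        System.out.println(sql);
    }
}
```

The resulting string is what gets passed to spark.sql(...) in the repro; both subqueries are joined on the single key named in the using clause.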
My exception stack is:
Caused by: java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericInternalRow cannot be cast to org.apache.spark.sql.catalyst.expressions.UnsafeRow
at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$1.writeValue(UnsafeRowSerializer.scala:64)
at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:249)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:158)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
at ….
Running EXPLAIN on the SQL statement gives the following plan:
== Physical Plan ==
Project [ ... ]
+- SortMergeJoin […], Inner
   :- Sort […], false, 0
   :  +- Exchange hashpartitioning(…, 10), ENSURE_REQUIREMENTS, [id=#38]
   :     +- Filter (…)
   :        +- BatchScan[... ] left [filters=…]
   +- *(2) Sort […], false, 0
      +- Exchange hashpartitioning(…, 10), ENSURE_REQUIREMENTS, [id=#47]
         +- *(1) Filter (…)
            +- BatchScan[…] right [filters=…]
Note that several variations of this also fail. Besides the repro code listed above, I have tried a CTAS, and I have tried writing the result to Parquet files without creating a table from it.
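The report does not give the exact statements used for these variations. As a rough sketch of what the CTAS variation might look like, the helper below wraps a query in a CREATE TABLE ... USING iceberg AS statement. The helper name, the target table name, and the sample query are all hypothetical.

```java
public class CtasVariationDemo {
    // Wraps an arbitrary SELECT in a CTAS statement targeting an Iceberg table.
    // This is an assumed shape for the variation; the report does not show it.
    static String asCtas(String targetTable, String query) {
        return "CREATE TABLE " + targetTable + " USING iceberg AS " + query;
    }

    public static void main(String[] args) {
        // Hypothetical stand-in for the join query from the repro.
        String sqlJoin = "SELECT * from (select 1 as id) lllll join (select 1 as id) rrrrr using (id)";
        System.out.println(asCtas("ciptest.ttt_ctas", sqlJoin));
    }
}
```

With a SparkSession named spark, the CTAS string would be run through spark.sql(...), and the file-only variation would presumably be along the lines of spark.sql(sqlJoin).write().parquet(outputPath), where outputPath is a placeholder.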