Description
Preface: I'm creating a Kotlin API for Spark that takes the best parts of three worlds: Spark Scala, Spark Java, and Kotlin.
The nice part is that it works in most scenarios.
But I've hit the following corner case:
withSpark(props = mapOf("spark.sql.codegen.wholeStage" to true)) {
    dsOf(1, null, 2)
        .map { c(it) }
        .debugCodegen()
        .show()
}
Here c(it) creates an unnamed tuple.
The job fails with the following exception:
java.lang.NullPointerException: Null value appeared in non-nullable field:
top level Product or row object
If the schema is inferred from a Scala tuple/case class, or a Java bean, please try to use scala.Option[_] or other nullable types (e.g. java.lang.Integer instead of int/scala.Int).
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.serializefromobject_doConsume_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.mapelements_doConsume_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.deserializetoobject_doConsume_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    …
I know this won't work in Scala, so I could stop here. But it does work in Kotlin if I turn whole-stage codegen off!
Moreover, if we dig into the generated code (with whole-stage codegen on), we can see that whenever one of the elements in the source dataset is null, an NPE is thrown no matter what.
The flow is as follows:
private void serializefromobject_doConsume_0(org.jetbrains.spark.api.Arity1 serializefromobject_expr_0_0, boolean serializefromobject_exprIsNull_0_0) throws java.io.IOException {
serializefromobject_doConsume_0(mapelements_value_1, mapelements_isNull_1);
mapelements_isNull_1 = mapelements_resultIsNull_0;
mapelements_resultIsNull_0 = mapelements_exprIsNull_0_0;
private void mapelements_doConsume_0(java.lang.Integer mapelements_expr_0_0, boolean mapelements_exprIsNull_0_0) throws java.io.IOException {
mapelements_doConsume_0(deserializetoobject_value_0, deserializetoobject_isNull_0);
deserializetoobject_resultIsNull_0 = deserializetoobject_exprIsNull_0_0;
private void deserializetoobject_doConsume_0(InternalRow localtablescan_row_0, int deserializetoobject_expr_0_0, boolean deserializetoobject_exprIsNull_0_0) throws java.io.IOException {
deserializetoobject_doConsume_0(localtablescan_row_0, localtablescan_value_0, localtablescan_isNull_0);
boolean localtablescan_isNull_0 = localtablescan_row_0.isNullAt(0);
mapelements_isNull_1 = true;
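To make the flag propagation easier to follow, here is a minimal, self-contained sketch in plain Java. It is not Spark's generated code: the class NullFlagFlow, its method names, and the Arity1 stand-in are hypothetical, and the logic reflects only one reading of the excerpt above, namely that the null flag from the table scan is passed through unchanged, the user function is skipped when that flag is set, and the serializer rejects a null top-level object. The interpreted() method is likewise an assumption about the non-codegen path, based solely on the observation that the same query succeeds when whole-stage codegen is off.

```java
import java.util.function.Function;

// Simplified, hypothetical model of the null-flag flow; NOT Spark's generated code.
class NullFlagFlow {
    // Stand-in for the tuple type created by c(it).
    static final class Arity1 {
        final Integer value;
        Arity1(Integer value) { this.value = value; }
    }

    // Models serializefromobject_doConsume_0: a null top-level object is rejected.
    static Arity1 serialize(Arity1 obj, boolean objIsNull) {
        if (objIsNull) {
            throw new NullPointerException(
                "Null value appeared in non-nullable field: top level Product or row object");
        }
        return obj;
    }

    // Models mapelements_doConsume_0 (as read from the excerpt): the result
    // starts out flagged null, and the user function runs only if the input is not null.
    static Arity1 mapElements(Integer in, boolean inIsNull, Function<Integer, Arity1> fn) {
        boolean resultIsNull = inIsNull;
        Arity1 value = null;
        boolean isNull = true;
        if (!resultIsNull) {
            value = fn.apply(in);
            isNull = (value == null);
        }
        return serialize(value, isNull);
    }

    // Models the scan plus deserializetoobject_doConsume_0: the flag is forwarded as-is.
    static Arity1 wholeStage(Integer cell, Function<Integer, Arity1> fn) {
        boolean scanIsNull = (cell == null); // corresponds to localtablescan_row_0.isNullAt(0)
        return mapElements(cell, scanIsNull, fn);
    }

    // ASSUMPTION: with codegen off, the function is invoked even for a null input.
    static Arity1 interpreted(Integer cell, Function<Integer, Arity1> fn) {
        Arity1 value = fn.apply(cell);
        return serialize(value, value == null);
    }

    public static void main(String[] args) {
        Function<Integer, Arity1> c = Arity1::new;
        for (Integer cell : new Integer[] {1, null, 2}) {
            System.out.println("interpreted: " + interpreted(cell, c).value);
            try {
                System.out.println("whole-stage: " + wholeStage(cell, c).value);
            } catch (NullPointerException e) {
                System.out.println("whole-stage: NPE for input " + cell);
            }
        }
    }
}
```

In this model the two paths diverge only for the null element: interpreted() wraps it in a tuple, while wholeStage() never calls the function and fails in serialize().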
You can find the generated code, both in its original form and in a slightly simplified and refactored version, here
I believe Spark should not behave differently depending on whether whole-stage codegen is on or off, so this difference in behavior looks like a bug.
My Spark version is 3.0.0-preview2