Description
This works:
scala> val df = spark.read.parquet("many_arrays_per_row")
df: org.apache.spark.sql.DataFrame = [k0: array<bigint>, k1: array<bigint> ... 98 more fields]

scala> df.selectExpr("arrays_zip(k0, k1, k2)").show(truncate=false)
+----------------------------------------+
|arrays_zip(k0, k1, k2)                  |
+----------------------------------------+
|[[6583, 1312, 7460], [668, 1626, 4129]] |
|[[5415, 5251, 1514], [1631, 2224, 2553]]|
+----------------------------------------+
If I add one more array to the parameter list, I get this:
scala> df.selectExpr("arrays_zip(k0, k1, k2, k3)").show(truncate=false) 18/06/22 18:06:41 ERROR CodeGenerator: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 92, Column 35: Unknown variable or type "scan_row_0" org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 92, Column 35: Unknown variable or type "scan_row_0" at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:11821) at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6521) at org.codehaus.janino.UnitCompiler.access$13100(UnitCompiler.java:212) at org.codehaus.janino.UnitCompiler$18.visitPackage(UnitCompiler.java:6133) .. much exception trace... 18/06/22 18:06:41 WARN WholeStageCodegenExec: Whole-stage codegen disabled for plan (id=1): *(1) LocalLimit 21 +- *(1) Project [cast(arrays_zip(k0#375, k1#376, k2#387, k3#398) as string) AS arrays_zip(k0, k1, k2, k3)#619] +- *(1) FileScan parquet [k0#375,k1#376,k2#387,k3#398] Batched: false, Format: Parquet, Location: InMemoryFileIndex[file:/Users/brobbins/github/spark_upstream/many_arrays_per_row], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<k0:array<bigint>,k1:array<bigint>,k2:array<bigint>,k3:array<bigint>> +----------------------------------------------------+ |arrays_zip(k0, k1, k2, k3) | +----------------------------------------------------+ |[[6583, 1312, 7460, 3712], [668, 1626, 4129, 2815]] | |[[5415, 5251, 1514, 1580], [1631, 2224, 2553, 7555]]| +----------------------------------------------------+
I still got the answer! Whole-stage codegen failed to compile, but Spark disabled it for the plan (per the WARN above) and the fallback path still produced the correct result.
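As an aside, a possible workaround sketch for the four-array case (assuming, as the output above suggests, that only the whole-stage variant fails to compile there): disable whole-stage codegen up front, so the query goes straight to the path that compiles and skips the failed compile and the ERROR noise.

// Hedged workaround sketch, four-array case only: with whole-stage codegen
// off, Spark uses the expression-codegen path directly, which still compiles here.
spark.conf.set("spark.sql.codegen.wholeStage", "false")
df.selectExpr("arrays_zip(k0, k1, k2, k3)").show(truncate=false)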
I add a 5th parameter:
scala> df.selectExpr("arrays_zip(k0, k1, k2, k3, k4)").show(truncate=false) 18/06/22 18:07:53 ERROR CodeGenerator: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 97, Column 35: Unknown variable or type "scan_row_0" org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 97, Column 35: Unknown variable or type "scan_row_0" at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:11821) at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6521) at org.codehaus.janino.UnitCompiler.access$13100(UnitCompiler.java:212) at org.codehaus.janino.UnitCompiler$18.visitPackage(UnitCompiler.java:6133) at org.codehaus.janino.UnitCompiler$18.visitPackage(UnitCompiler.java:6130) at org.codehaus.janino.Java$Package.accept(Java.java:4077) .. much exception trace... Caused by: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 73, Column 21: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 73, Column 21: Unknown variable or type "i" at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scal\ a:1361) at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1423) at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1420) at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599) at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2379) ... 31 more scala>
This time, no result. The whole-stage variant fails on scan_row_0 as before, but now the fallback fails too: per the "Caused by" above, the non-whole-stage version of the generated code has the same problem, referencing an i that is likewise out of scope.
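For reference, here is a hypothetical self-contained reproduction that doesn't depend on the parquet data above (the column names k0 through k4 and the random values are illustrative); it should hit the same compile failure:

import org.apache.spark.sql.functions._

// Build five array<bigint> columns, then zip all five; with five inputs the
// generated extraction code gets split into helper functions and should fail
// to compile as shown above.
val df2 = spark.range(2).select(
  (0 to 4).map { i =>
    array((0 to 2).map(_ => (rand() * 10000).cast("long")): _*).as(s"k$i")
  }: _*)
df2.selectExpr("arrays_zip(k0, k1, k2, k3, k4)").show(truncate=false)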
It looks like the generated code expects the input row to be in a variable (either i or scan_row_x), but that variable is not passed to the split-out helper function (see lines 069 and 073 below):
/* 069 */   private int getValuesAndCardinalities_0_1(ArrayData[] arrVals_0, int biggestCardinality_0) {
/* 070 */
/* 071 */
/* 072 */     if (biggestCardinality_0 != -1) {
/* 073 */       boolean isNull_6 = i.isNullAt(4);
/* 074 */       ArrayData value_6 = isNull_6 ?
/* 075 */       null : (i.getArray(4));
/* 076 */       if (!isNull_6) {
/* 077 */         arrVals_0[4] = value_6;
/* 078 */         biggestCardinality_0 = Math.max(biggestCardinality_0, value_6.numElements());
/* 079 */       } else {
/* 080 */         biggestCardinality_0 = -1;
/* 081 */       }
/* 082 */     }
/* 083 */
/* 084 */     return biggestCardinality_0;
/* 085 */
/* 086 */   }
Here's the scan_row_0 case:
/* 095 */   private int project_getValuesAndCardinalities_0_1(ArrayData[] project_arrVals_0, int project_biggestCardinality_0) {
/* 096 */     if (project_biggestCardinality_0 != -1) {
/* 097 */       boolean scan_isNull_3 = scan_row_0.isNullAt(3);
/* 098 */       ArrayData scan_value_3 = scan_isNull_3 ?
/* 099 */       null : (scan_row_0.getArray(3));
/* 100 */       if (!scan_isNull_3) {
/* 101 */         project_arrVals_0[3] = scan_value_3;
/* 102 */         project_biggestCardinality_0 = Math.max(project_biggestCardinality_0, scan_value_3.numElements());
/* 103 */       } else {
/* 104 */         project_biggestCardinality_0 = -1;
/* 105 */       }
/* 106 */     }
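For what it's worth, here is a hedged sketch (against CodegenContext.splitExpressions, not a claim about the actual patch) of the kind of change that might fix this: thread the current input row through the split as an explicit argument, so the helper bodies can legally reference it. The names evals, arrVals, and biggestCardinality stand in for the real locals in ArraysZip.doGenCode.

// Sketch only: pass ctx.INPUT_ROW into the split-out helpers so references
// like i.isNullAt(4) / scan_row_0.isNullAt(3) resolve inside them.
val getValuesAndCardinalities = ctx.splitExpressions(
  expressions = evals,                 // one extraction snippet per child array
  funcName = "getValuesAndCardinalities",
  returnType = "int",
  makeSplitFunction = body =>
    s"""
       |$body
       |return $biggestCardinality;
     """.stripMargin,
  foldFunctions = _.map(funcCall => s"$biggestCardinality = $funcCall;").mkString("\n"),
  arguments =
    ("InternalRow", ctx.INPUT_ROW) ::  // the parameter that is missing today
    ("ArrayData[]", arrVals) ::
    ("int", biggestCardinality) :: Nil)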
I am marking this as minor, since arrays_zip is new and not yet in a released version of Spark.
cc DylanGuedes