Spark / SPARK-24633

arrays_zip function's code generator splits input processing incorrectly


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: 2.4.0
    • Fix Version/s: 2.4.0
    • Component/s: SQL
    • Labels: None
    • Environment: Mac OS High Sierra

    Description

      This works:

      scala> val df = spark.read.parquet("many_arrays_per_row")
      df: org.apache.spark.sql.DataFrame = [k0: array<bigint>, k1: array<bigint> ... 98 more fields]
      scala> df.selectExpr("arrays_zip(k0, k1, k2)").show(truncate=false)
      +----------------------------------------+
      |arrays_zip(k0, k1, k2)                  |
      +----------------------------------------+
      |[[6583, 1312, 7460], [668, 1626, 4129]] |
      |[[5415, 5251, 1514], [1631, 2224, 2553]]|
      +----------------------------------------+
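
      The "many_arrays_per_row" data isn't attached; here is a sketch of how
      equivalent data could be generated (the column count, array length, and
      value range are assumptions inferred from the schema and output above;
      the rand() values won't match the exact numbers shown):

      scala> import org.apache.spark.sql.functions.expr
      scala> val cols = (0 until 100).map { i =>
           |   expr(s"array(cast(rand() * 10000 as bigint), cast(rand() * 10000 as bigint)) as k$i")
           | }
      scala> spark.range(2).select(cols: _*).write.parquet("many_arrays_per_row")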
      

      If I add one more array to the parameter list, I get this:

      scala> df.selectExpr("arrays_zip(k0, k1, k2, k3)").show(truncate=false)
      18/06/22 18:06:41 ERROR CodeGenerator: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 92, Column 35: Unknown variable or type "scan_row_0"
      org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 92, Column 35: Unknown variable or type "scan_row_0"
              at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:11821)
              at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6521)
              at org.codehaus.janino.UnitCompiler.access$13100(UnitCompiler.java:212)
              at org.codehaus.janino.UnitCompiler$18.visitPackage(UnitCompiler.java:6133)
      ... much exception trace ...
      18/06/22 18:06:41 WARN WholeStageCodegenExec: Whole-stage codegen disabled for plan (id=1):
       *(1) LocalLimit 21
      +- *(1) Project [cast(arrays_zip(k0#375, k1#376, k2#387, k3#398) as string) AS arrays_zip(k0, k1, k2, k3)#619]
         +- *(1) FileScan parquet [k0#375,k1#376,k2#387,k3#398] Batched: false, Format: Parquet, Location: InMemoryFileIndex[file:/Users/brobbins/github/spark_upstream/many_arrays_per_row], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<k0:array<bigint>,k1:array<bigint>,k2:array<bigint>,k3:array<bigint>>
      
      +----------------------------------------------------+
      |arrays_zip(k0, k1, k2, k3)                          |
      +----------------------------------------------------+
      |[[6583, 1312, 7460, 3712], [668, 1626, 4129, 2815]] |
      |[[5415, 5251, 1514, 1580], [1631, 2224, 2553, 7555]]|
      +----------------------------------------------------+
      

      I still got the answer! (Whole-stage codegen was disabled for the plan, so execution fell back to a non-whole-stage path.)

      If I add a 5th parameter:

      scala> df.selectExpr("arrays_zip(k0, k1, k2, k3, k4)").show(truncate=false)
      18/06/22 18:07:53 ERROR CodeGenerator: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 97, Column 35: Unknown variable or type "scan_row_0"
      org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 97, Column 35: Unknown variable or type "scan_row_0"
              at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:11821)
              at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6521)
              at org.codehaus.janino.UnitCompiler.access$13100(UnitCompiler.java:212)
              at org.codehaus.janino.UnitCompiler$18.visitPackage(UnitCompiler.java:6133)
              at org.codehaus.janino.UnitCompiler$18.visitPackage(UnitCompiler.java:6130)
              at org.codehaus.janino.Java$Package.accept(Java.java:4077)
      ... much exception trace ...
      Caused by: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 73, Column 21: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 73, Column 21: Unknown variable or type "i"
  at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:1361)
        at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1423)
        at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1420)
        at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
        at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
        ... 31 more
      scala> 
      

      This time, no result: the fallback code path hits the same compile failure (note the Unknown variable or type "i" in the cause), so the exception propagates.

      It looks like the generated code expects the input row to be in a parameter (either i or scan_row_x), but that parameter is never passed to the input handler function (see lines 069 and 073 below).

      /* 069 */   private int getValuesAndCardinalities_0_1(ArrayData[] arrVals_0, int biggestCardinality_0) {
      /* 070 */
      /* 071 */
      /* 072 */     if (biggestCardinality_0 != -1) {
      /* 073 */       boolean isNull_6 = i.isNullAt(4);
      /* 074 */       ArrayData value_6 = isNull_6 ?
      /* 075 */       null : (i.getArray(4));
      /* 076 */       if (!isNull_6) {
      /* 077 */         arrVals_0[4] = value_6;
      /* 078 */         biggestCardinality_0 = Math.max(biggestCardinality_0, value_6.numElements());
      /* 079 */       } else {
      /* 080 */         biggestCardinality_0 = -1;
      /* 081 */       }
      /* 082 */     }
      /* 083 */
      /* 084 */     return biggestCardinality_0;
      /* 085 */
      /* 086 */   }
      

      Here's the scan_row_0 case:

      /* 095 */   private int project_getValuesAndCardinalities_0_1(ArrayData[] project_arrVals_0, int project_biggestCardinality_0) {
      /* 096 */     if (project_biggestCardinality_0 != -1) {
      /* 097 */       boolean scan_isNull_3 = scan_row_0.isNullAt(3);
      /* 098 */       ArrayData scan_value_3 = scan_isNull_3 ?
      /* 099 */       null : (scan_row_0.getArray(3));
      /* 100 */       if (!scan_isNull_3) {
      /* 101 */         project_arrVals_0[3] = scan_value_3;
      /* 102 */         project_biggestCardinality_0 = Math.max(project_biggestCardinality_0, scan_value_3.numElements());
      /* 103 */       } else {
      /* 104 */         project_biggestCardinality_0 = -1;
      /* 105 */       }
      /* 106 */     }
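
      Presumably the fix is to thread the input row through the split
      functions. Below is a sketch of that direction, not the actual patch:
      it assumes the split happens in ArraysZip.doGenCode, with evals,
      arrVals, and biggestCardinality standing for the surrounding locals.
      CodegenContext.splitExpressionsWithCurrentInputs passes the current
      input row/variables (i, scan_row_0, ...) into the generated helper
      functions, unlike a plain splitExpressions call whose arguments list
      omits them:

      val getValuesAndCardinalities = ctx.splitExpressionsWithCurrentInputs(
        expressions = evals,  // per-child null-check / value / cardinality snippets
        funcName = "getValuesAndCardinalities",
        returnType = "int",
        makeSplitFunction = body =>
          s"""
             |if ($biggestCardinality != -1) {
             |  $body
             |}
             |return $biggestCardinality;
           """.stripMargin,
        foldFunctions = _.map(call => s"$biggestCardinality = $call;").mkString("\n"),
        extraArguments =
          ("ArrayData[]", arrVals) ::
          ("int", biggestCardinality) :: Nil)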
      

      I am marking this as minor since this function is new and not in a released version of Spark.

      cc DylanGuedes

People

    Assignee: Marco Gaido (mgaido)
    Reporter: Bruce Robbins (bersprockets)
    Votes: 0
    Watchers: 3
