Uploaded image for project: 'Spark'
  1. Spark
  2. SPARK-28818

FrequentItems applies an incorrect schema to the resulting dataframe when nulls are present

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Minor
    • Resolution: Fixed
    • 2.4.3
    • 2.4.7, 3.0.0
    • SQL
    • None

    Description

      A trivially reproducible bug in the code for `FrequentItems`. The schema for the resulting arrays of frequent items is hard coded] to have non-nullable array elements:

      val outputCols = colInfo.map { v =>
      StructField(v._1 + "_freqItems", ArrayType(v._2, false))
       }
       val schema = StructType(outputCols).toAttributes
       Dataset.ofRows(df.sparkSession, LocalRelation.fromExternalRows(schema, Seq(resultRow)))
      

       

      However if the column contains frequent nulls then these nulls are included in the frequent items array. This results in various errors such as any attempt to `collect()` resulting in a null pointer exception:

      from pyspark.sql import SparkSession
      
      spark = SparkSession.Builder().getOrCreate()
      df = spark.createDataFrame([
          (1, 'a'),
          (2, None),
          (3, 'b'),
      ], schema="id INTEGER, val STRING")
      
      rows = df.freqItems(df.columns).collect()
      

       Results in:

      Traceback (most recent call last):                                              
        File "<stdin>", line 1, in <module>
        File "/usr/local/bin/spark-2.4.3-bin-hadoop2.7/python/pyspark/sql/dataframe.py", line 533, in collect
          sock_info = self._jdf.collectToPython()
        File "/usr/local/bin/spark-2.4.3-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
        File "/usr/local/bin/spark-2.4.3-bin-hadoop2.7/python/pyspark/sql/utils.py", line 63, in deco
          return f(*a, **kw)
        File "/usr/local/bin/spark-2.4.3-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
      py4j.protocol.Py4JJavaError: An error occurred while calling o40.collectToPython.
      : java.lang.NullPointerException
      	at org.apache.spark.sql.catalyst.expressions.codegen.UnsafeWriter.write(UnsafeWriter.java:109)
      	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
      	at org.apache.spark.sql.execution.LocalTableScanExec$$anonfun$unsafeRows$1.apply(LocalTableScanExec.scala:44)
      	at org.apache.spark.sql.execution.LocalTableScanExec$$anonfun$unsafeRows$1.apply(LocalTableScanExec.scala:44)
      	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
      	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
      	at scala.collection.immutable.List.foreach(List.scala:392)
      	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
      	at scala.collection.immutable.List.map(List.scala:296)
      	at org.apache.spark.sql.execution.LocalTableScanExec.unsafeRows$lzycompute(LocalTableScanExec.scala:44)
      	at org.apache.spark.sql.execution.LocalTableScanExec.unsafeRows(LocalTableScanExec.scala:39)
      	at org.apache.spark.sql.execution.LocalTableScanExec.executeCollect(LocalTableScanExec.scala:70)
      	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3257)
      	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3254)
      	at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3364)
      	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
      	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
      	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
      	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3363)
      	at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3254)
      	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.lang.reflect.Method.invoke(Method.java:498)
      	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
      	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
      	at py4j.Gateway.invoke(Gateway.java:282)
      	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
      	at py4j.commands.CallCommand.execute(CallCommand.java:79)
      	at py4j.GatewayConnection.run(GatewayConnection.java:238)
      	at java.lang.Thread.run(Thread.java:748)
      

      Unclear if the hardcoding is at fault or if the algorithm is actually designed to not return nulls even if they are frequent. In which case the hard coding would be appropriate. I'll put a PR in that assumes that the hardcoding is the bug unless people know otherwise?

      Attachments

        Activity

          People

            mhawes Matt Hawes
            mhawes Matt Hawes
            Votes:
            0 Vote for this issue
            Watchers:
            3 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: