SPARK-22316

Cannot Select ReduceAggregator Column


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Minor
    • Resolution: Incomplete
    • Affects Version/s: 2.2.0
    • Fix Version/s: None
    • Component/s: Spark Core

    Description

      Given a dataset that has been run through reduceGroups like this:

      case class Person(name: String, age: Int)
      case class Customer(id: Int, person: Person)
      val ds = spark.createDataset(Seq(Customer(1,Person("russ", 85))))
      val grouped = ds.groupByKey(c => c.id).reduceGroups( (x,y) => x )
      

      we end up with a Dataset with the following schema:

      org.apache.spark.sql.types.StructType =
      StructType(
        StructField(value,IntegerType,false),
        StructField(ReduceAggregator(Customer),
          StructType(
            StructField(id,IntegerType,false),
            StructField(person,
              StructType(
                StructField(name,StringType,true),
                StructField(age,IntegerType,false)),
              true)),
          true))
      

      The column names are:

      Array(value, ReduceAggregator(Customer))
      

      But you cannot select the "ReduceAggregator(Customer)" column by name:

      grouped.select(grouped.columns(1))
      org.apache.spark.sql.AnalysisException: cannot resolve '`ReduceAggregator(Customer)`' given input columns: [value, ReduceAggregator(Customer)];;
      'Project ['ReduceAggregator(Customer)]
      +- Aggregate [value#338], [value#338, reduceaggregator(org.apache.spark.sql.expressions.ReduceAggregator@5ada573, Some(newInstance(class Customer)), Some(class Customer), Some(StructType(StructField(id,IntegerType,false), StructField(person,StructType(StructField(name,StringType,true), StructField(age,IntegerType,false)),true))), input[0, scala.Tuple2, true]._1 AS value#340, if ((isnull(input[0, scala.Tuple2, true]._2) || None.equals)) null else named_struct(id, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]._2)).id AS id#195, person, if (isnull(assertnotnull(assertnotnull(input[0, scala.Tuple2, true]._2)).person)) null else named_struct(name, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(assertnotnull(input[0, scala.Tuple2, true]._2)).person).name, true), age, assertnotnull(assertnotnull(assertnotnull(input[0, scala.Tuple2, true]._2)).person).age) AS person#196) AS _2#341, newInstance(class scala.Tuple2), assertnotnull(assertnotnull(input[0, Customer, true])).id AS id#195, if (isnull(assertnotnull(assertnotnull(input[0, Customer, true])).person)) null else named_struct(name, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(assertnotnull(input[0, Customer, true])).person).name, true), age, assertnotnull(assertnotnull(assertnotnull(input[0, Customer, true])).person).age) AS person#196, StructField(id,IntegerType,false), StructField(person,StructType(StructField(name,StringType,true), StructField(age,IntegerType,false)),true), true, 0, 0) AS ReduceAggregator(Customer)#346]
         +- AppendColumns <function1>, class Customer, [StructField(id,IntegerType,false), StructField(person,StructType(StructField(name,StringType,true), StructField(age,IntegerType,false)),true)], newInstance(class Customer), [input[0, int, false] AS value#338]
            +- LocalRelation [id#197, person#198]
      

      You can work around this by using "toDF" to rename the columns:

      scala> grouped.toDF("key", "reduced").select("reduced")
      res55: org.apache.spark.sql.DataFrame = [reduced: struct<id: int, person: struct<name: string, age: int>>]
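
      A minimal sketch of the workaround as a standalone program, in case it helps anyone hitting the same error. The local SparkSession settings, the object name and the run helper are assumptions made for illustration. Option 2 (staying in the typed API) is not part of the original report; it is an alternative that avoids referring to the column by name at all.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Same case classes as in the report above.
case class Person(name: String, age: Int)
case class Customer(id: Int, person: Person)

object ReduceGroupsWorkaround {
  // Hypothetical helper: applies both workarounds and returns their results.
  def run(spark: SparkSession): (Seq[String], Seq[Customer]) = {
    import spark.implicits._

    val ds = spark.createDataset(Seq(Customer(1, Person("russ", 85))))
    val grouped: Dataset[(Int, Customer)] =
      ds.groupByKey(c => c.id).reduceGroups((x, y) => x)

    // Option 1 (from the report): rename the columns positionally,
    // then select by the new names, including nested fields.
    val names = grouped
      .toDF("key", "reduced")
      .select("reduced.person.name")
      .as[String]
      .collect()
      .toSeq

    // Option 2 (assumption, not in the report): use the typed API,
    // so the generated column name is never looked up.
    val customers = grouped.map(_._2).collect().toSeq

    (names, customers)
  }

  def main(args: Array[String]): Unit = {
    // Local session for illustration only.
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("SPARK-22316-workaround")
      .getOrCreate()
    try {
      val (names, customers) = run(spark)
      println(names)
      println(customers)
    } finally {
      spark.stop()
    }
  }
}
```

      Renaming with toDF is positional, so it does not depend on how the aggregate column was named.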
      

      I think that every invocation of

      ds.select(ds.columns(i))
      

      should work for every valid index i, that is, 0 <= i < ds.columns.length.
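
      The expectation above can be written as a small check. This is only a sketch: the object name and the unselectableColumns helper are hypothetical, and the local SparkSession settings are assumptions. It relies on select resolving column names eagerly, so a name that cannot be resolved throws at the select call.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}
import scala.util.Try

object ColumnSelectCheck {
  // Hypothetical helper: returns the names of all columns for which
  // ds.select(name) throws (for example with an AnalysisException).
  def unselectableColumns(ds: Dataset[_]): Seq[String] =
    ds.columns.toSeq.filter(name => Try(ds.select(name)).isFailure)

  def main(args: Array[String]): Unit = {
    // Local session for illustration only.
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("column-select-check")
      .getOrCreate()
    import spark.implicits._

    try {
      // A DataFrame with ordinary column names: every column is selectable.
      val df = Seq((1, "a"), (2, "b")).toDF("key", "reduced")
      assert(unselectableColumns(df).isEmpty)
    } finally {
      spark.stop()
    }
  }
}
```

      Passing the grouped Dataset from the description to the same helper on 2.2.0 would be expected to report the ReduceAggregator(Customer) column, given the exception shown above.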

          People

            Assignee: Unassigned
            Reporter: Russell Spitzer (rspitzer)