Uploaded image for project: 'Spark'
  1. Spark
  2. SPARK-26436

Dataframe resulting from a GroupByKey and flatMapGroups operation throws java.lang.UnsupportedOperationException when groupByKey is applied on it.

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Major
    • Resolution: Incomplete
    • 2.4.0
    • None
    • SQL

    Description

      There seems to be a bug in the groupByKey API when it is applied to a Dataset produced by an earlier groupByKey and flatMapGroups invocation.

      In such cases groupByKey throws the following exception:

      java.lang.UnsupportedOperationException: fieldIndex on a Row without schema is undefined.

       

      This happens even though the DataFrame has a valid schema, and groupBy("key") or repartition($"key") calls on the same DataFrame and key succeed.

       

      Following is the code that reproduces the scenario:

       

       
         import org.apache.spark.sql.catalyst.encoders.RowEncoder
      import org.apache.spark.sql.{Row, SparkSession}
      import org.apache.spark.sql.types.{ IntegerType, StructField, StructType}
      
      import scala.collection.mutable.ListBuffer
      
      
      
        /**
          * Minimal reproduction for SPARK-26436.
          *
          * Builds a two-column DataFrame, then twice groups it by the `_1` column and
          * appends the size of each group as a new trailing integer column. According
          * to the accompanying report, the second `groupByKey` (applied to the output
          * of the first `groupByKey` + `flatMapGroups`) is the call that fails with
          * "fieldIndex on a Row without schema is undefined".
          */
        object Test {

          /** Returns `schema` extended with one trailing non-nullable integer column called `name`. */
          private def withIntColumn(schema: StructType, name: String): StructType =
            StructType(schema.fields :+ StructField(name, IntegerType, nullable = false))

          /**
            * Appends the number of rows in the group to every row of that group.
            *
            * @param rows all rows belonging to a single grouping key
            * @return the same rows, in the same order, each with the group size added
            *         as a trailing value
            */
          private def withGroupSize(rows: Iterator[Row]): Iterator[Row] = {
            // The iterator can only be traversed once, so materialise it before
            // both counting the rows and rebuilding them.
            val group = rows.toVector
            val groupSize = group.size
            // Row.fromSeq is kept deliberately: the reproduction emits rows built
            // from a plain sequence of values.
            group.map(row => Row.fromSeq(row.toSeq :+ groupSize)).iterator
          }

          /**
            * Runs the reproduction end to end, printing the intermediate and final
            * datasets.
            *
            * @param args ignored
            */
          def main(args: Array[String]): Unit = {
            val values = List(("1", "One"), ("1", "Two"), ("2", "Three"), ("2", "4"))
            val session = SparkSession.builder.config("spark.master", "local").getOrCreate
            import session.implicits._
            val dataFrame = values.toDF

            dataFrame.show()
            dataFrame.printSchema()

            // First pass: group the original DataFrame and add the "Count" column.
            val countEncoder = RowEncoder(withIntColumn(dataFrame.schema, "Count"))
            val counted = dataFrame
              .groupByKey(row => row.getAs[String]("_1"))
              .flatMapGroups((_, rows) => withGroupSize(rows))(countEncoder)

            counted.show

            // Second pass: the same operation applied to the result of the first
            // pass. This is the step the report describes as throwing.
            val recountEncoder = RowEncoder(withIntColumn(counted.schema, "Count1"))
            val recounted = counted
              .groupByKey(row => row.getAs[String]("_1"))
              .flatMapGroups((_, rows) => withGroupSize(rows))(recountEncoder)

            recounted.show
          }
        }
      
      // main never reads its arguments, so pass an empty array instead of null.
      Test.main(Array.empty[String])
      
      
      

       

      Attachments

        Activity

          People

            Unassigned Unassigned
            manish.bhatt Manish
            Votes:
            0 Vote for this issue
            Watchers:
            2 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: