Details
-
Bug
-
Status: Resolved
-
Major
-
Resolution: Incomplete
-
2.4.0
-
None
Description
There seems to be a bug in the groupByKey API when it is applied to a Dataset that results from a previous groupByKey and flatMapGroups invocation.
In such cases groupByKey throws the following exception:
java.lang.UnsupportedOperationException: fieldIndex on a Row without schema is undefined.
This happens even though the DataFrame has a valid schema, and groupBy("key") or repartition($"key") API calls on the same DataFrame and key succeed.
The following code reproduces the scenario:
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
import scala.collection.mutable.ListBuffer

/**
 * Reproduction script: applies `groupByKey` + `flatMapGroups` twice in a row.
 *
 * The first pass groups a two-column DataFrame by column `_1` and appends the
 * group's row count as a new `Count` column. The second pass repeats the same
 * operation on the result, appending `Count1`. The second pass is the one
 * reported to fail with "fieldIndex on a Row without schema is undefined".
 */
object Test {

  def main(args: Array[String]): Unit = {
    val values = List(("1", "One"), ("1", "Two"), ("2", "Three"), ("2", "4"))

    val session = SparkSession.builder.config("spark.master", "local").getOrCreate()
    import session.implicits._

    val dataFrame = values.toDF
    dataFrame.show()
    dataFrame.printSchema()

    // Both passes group on the same column, so the key selector is shared.
    val keyOf: Row => String = row => row.getAs[String]("_1")

    // Appends the number of rows in the group to every row of that group.
    // The output rows are deliberately built with Row.fromSeq (which attaches
    // no schema), exactly as in the reported scenario.
    val appendGroupSize: (String, Iterator[Row]) => Iterator[Row] = (_, groupRows) => {
      val materialized = groupRows.toSeq
      val groupSize: Int = materialized.size
      val out = ListBuffer.empty[Row]
      materialized.foreach { original =>
        out += Row.fromSeq(original.toSeq ++ Array[Int](groupSize))
      }
      out.iterator
    }

    // First pass: input schema plus a non-nullable "Count" column.
    val newSchema = StructType(
      dataFrame.schema.fields ++ Array(StructField("Count", IntegerType, nullable = false))
    )
    val expr = RowEncoder(newSchema)
    val firstPass = dataFrame.groupByKey(keyOf).flatMapGroups(appendGroupSize)(expr)
    firstPass.show

    // Second pass: same operation applied to the output of the first pass.
    val newSchema1 = StructType(
      firstPass.schema.fields ++ Array(StructField("Count1", IntegerType, nullable = false))
    )
    val expr1 = RowEncoder(newSchema1)
    val secondPass = firstPass.groupByKey(keyOf).flatMapGroups(appendGroupSize)(expr1)
    secondPass.show
  }
}

Test.main(null)