Details
Improvement
Status: Resolved
Major
Resolution: Fixed
3.3.0
None
Description
Currently, unionByName works with two DataFrames whose schemas differ slightly. It would be good if it also worked with array of struct columns.
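For reference, the case that already works can be sketched as follows. This is a minimal illustration, not taken from the ticket: the DataFrames `left` and `right`, their data, and the local `SparkSession` setup are hypothetical, and it assumes Spark 3.1.0 or later, where `allowMissingColumns` is available.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session for illustration; in spark-shell, getOrCreate()
// simply returns the session that already exists.
val spark = SparkSession.builder()
  .master("local[1]")
  .appName("unionByName-flat")
  .getOrCreate()

// Hypothetical data: both sides share `name`, and each has one top-level
// column that the other lacks.
val left = spark.createDataFrame(Seq(("James", 120))).toDF("name", "pages")
val right = spark.createDataFrame(Seq(("Lilly", "XY"))).toDF("name", "author")

// With allowMissingColumns = true, a top-level column missing on one side
// is filled with null for the rows coming from that side.
val merged = left.unionByName(right, allowMissingColumns = true)
merged.printSchema()
merged.show()
```

The ticket asks for the same null filling to apply to fields that are missing inside the element struct of an array column.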
unionByName fails when we try to merge DataFrames that contain an array of struct column whose struct schemas differ slightly.
Below is an example.
Step 1: DataFrame arrayStructDf1 with column booksIntersted of type array of struct
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val arrayStructData = Seq(
  Row("James", List(Row("Java", "XX", 120), Row("Scala", "XA", 300))),
  Row("Lilly", List(Row("Java", "XY", 200), Row("Scala", "XB", 500))))

val arrayStructSchema = new StructType()
  .add("name", StringType)
  .add("booksIntersted", ArrayType(new StructType()
    .add("name", StringType)
    .add("author", StringType)
    .add("pages", IntegerType)))

val arrayStructDf1 = spark.createDataFrame(
  spark.sparkContext.parallelize(arrayStructData), arrayStructSchema)

scala> arrayStructDf1.printSchema()
root
 |-- name: string (nullable = true)
 |-- booksIntersted: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- author: string (nullable = true)
 |    |    |-- pages: integer (nullable = true)
Step 2: Another DataFrame, arrayStructDf2, with column booksIntersted of type array of struct, where the struct contains an extra field called "new_column"
val arrayStructData2 = Seq(
  Row("James", List(Row("Java", "XX", 120, "new_column_data"), Row("Scala", "XA", 300, "new_column_data"))),
  Row("Lilly", List(Row("Java", "XY", 200, "new_column_data"), Row("Scala", "XB", 500, "new_column_data"))))

val arrayStructSchemaNewClm = new StructType()
  .add("name", StringType)
  .add("booksIntersted", ArrayType(new StructType()
    .add("name", StringType)
    .add("author", StringType)
    .add("pages", IntegerType)
    .add("new_column", StringType)))

val arrayStructDf2 = spark.createDataFrame(
  spark.sparkContext.parallelize(arrayStructData2), arrayStructSchemaNewClm)

scala> arrayStructDf2.printSchema()
root
 |-- name: string (nullable = true)
 |-- booksIntersted: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- author: string (nullable = true)
 |    |    |-- pages: integer (nullable = true)
 |    |    |-- new_column: string (nullable = true)
Step 3: Merge arrayStructDf1 and arrayStructDf2 using unionByName
We see the error org.apache.spark.sql.AnalysisException: Union can only be performed on tables with the compatible column types.
scala> arrayStructDf1.unionByName(arrayStructDf2,allowMissingColumns=true)
org.apache.spark.sql.AnalysisException: Union can only be performed on tables with the compatible column types. array<struct<name:string,author:string,pages:int,new_column:string>> <> array<struct<name:string,author:string,pages:int>> at the second column of the second table;
'Union false, false
:- LogicalRDD [name#183, booksIntersted#184], false
+- Project [name#204, booksIntersted#205]
   +- LogicalRDD [name#204, booksIntersted#205], false
unionByName should fill the missing fields with null, as it already does for columns of struct type.
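The requested null filling can be approximated by hand on versions without the fix. The following is a rough sketch of one possible workaround, not the change made for this ticket: it rebuilds each array element with the `transform` function from `org.apache.spark.sql.functions` (Scala API, Spark 3.0+) and adds the missing field as a typed null. The helper `mkDf`, the names `df1`, `df2` and `df1Aligned`, the shortened data, and the local session setup are assumptions made for illustration.

```scala
import org.apache.spark.sql.{Column, DataFrame, Row, SparkSession}
import org.apache.spark.sql.functions.{col, lit, struct, transform}
import org.apache.spark.sql.types._

// Assumption: a local session; in spark-shell the existing one is returned.
val spark = SparkSession.builder()
  .master("local[1]")
  .appName("align-array-struct")
  .getOrCreate()

// Element schemas as in the steps above; the second has the extra field.
val book = new StructType()
  .add("name", StringType)
  .add("author", StringType)
  .add("pages", IntegerType)
val bookWithNew = book.add("new_column", StringType)

// Hypothetical helper: builds a DataFrame with the ticket's outer schema.
def mkDf(rows: Seq[Row], element: StructType): DataFrame =
  spark.createDataFrame(
    spark.sparkContext.parallelize(rows),
    new StructType()
      .add("name", StringType)
      .add("booksIntersted", ArrayType(element)))

val df1 = mkDf(Seq(Row("James", List(Row("Java", "XX", 120)))), book)
val df2 = mkDf(
  Seq(Row("Lilly", List(Row("Java", "XY", 200, "new_column_data")))),
  bookWithNew)

// Rebuild every element of df1's array so that it carries new_column = null.
val df1Aligned = df1.withColumn(
  "booksIntersted",
  transform(col("booksIntersted"), (b: Column) => struct(
    b.getField("name").as("name"),
    b.getField("author").as("author"),
    b.getField("pages").as("pages"),
    lit(null).cast(StringType).as("new_column"))))

// The element types now match, so a plain unionByName is accepted.
val merged = df1Aligned.unionByName(df2)
merged.printSchema()
merged.show(false)
```

One caveat of this sketch: a null array element is turned into a struct whose fields are all null, which may differ from what the built-in handling does.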