Details

Type: Bug
Status: Resolved
Priority: Major
Resolution: Incomplete
Affects Version/s: 2.1.1
Fix Version/s: None
Description
Inserting a VectorUDT column into a Hive table with DataFrameWriter.insertInto(tableName: String) fails. The issue looks similar to SPARK-17765, which was resolved in 2.1.0.
Error message:
Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve '`features`' due to data type mismatch: cannot cast org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7 to StructType(StructField(type,ByteType,true), StructField(size,IntegerType,true), StructField(indices,ArrayType(IntegerType,true),true), StructField(values,ArrayType(DoubleType,true),true));;
'InsertIntoTable Relation[id#21,features#22] parquet, OverwriteOptions(false,Map()), false
+- 'Project [cast(id#13L as int) AS id#27, cast(features#14 as struct<type:tinyint,size:int,indices:array<int>,values:array<double>>) AS features#28]
   +- LogicalRDD [id#13L, features#14]
Code to reproduce:
import org.apache.spark.ml.linalg.SQLDataTypes
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{LongType, StructField, StructType}

case class UDT(id: Long, features: org.apache.spark.ml.linalg.Vector)

object UDTTest {
  def main(args: Array[String]): Unit = {
    val tb = "table_udt"
    val spark = SparkSession.builder()
      .master("local[4]")
      .appName("UDTInsertInto")
      .enableHiveSupport()
      .getOrCreate()

    spark.sql("drop table if exists " + tb)

    /*
     * VectorUDT sql type definition:
     *
     * override def sqlType: StructType = {
     *   StructType(Seq(
     *     StructField("type", ByteType, nullable = false),
     *     StructField("size", IntegerType, nullable = true),
     *     StructField("indices", ArrayType(IntegerType, containsNull = false), nullable = true),
     *     StructField("values", ArrayType(DoubleType, containsNull = false), nullable = true)))
     * }
     */

    // Create the Hive table based on VectorUDT's sql type
    spark.sql("create table if not exists " + tb +
      "(id int, features struct<type:tinyint,size:int,indices:array<int>,values:array<double>>)" +
      " row format serde 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'" +
      " stored as" +
      " inputformat 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'" +
      " outputformat 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'")

    val seq = new scala.collection.mutable.ArrayBuffer[UDT]()
    for (x <- 1 to 2) {
      seq += UDT(x, org.apache.spark.ml.linalg.Vectors.dense(0.2, 0.21, 0.44))
    }

    val rowRDD = spark.sparkContext.makeRDD[UDT](seq).map { x =>
      Row.fromSeq(Seq(x.id, x.features))
    }
    val schema = StructType(Array(
      StructField("id", LongType, false),
      StructField("features", SQLDataTypes.VectorType, false)))
    val df = spark.createDataFrame(rowRDD, schema)

    // Insert into the Hive table -- this throws the AnalysisException above
    df.write.insertInto(tb)
  }
}
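A possible workaround, not part of the original report and only a sketch: flatten the Vector column into the plain struct layout the Hive table expects before calling insertInto, so the analyzer no longer has to cast the UDT. The VectorAsStruct case class and toStruct UDF below are hypothetical names; the type byte encoding (0 for sparse, 1 for dense) mirrors the VectorUDT sqlType shown in the comment above.

import org.apache.spark.ml.linalg.{DenseVector, SparseVector, Vector}
import org.apache.spark.sql.functions.udf

// Hypothetical mirror of VectorUDT's sqlType; `type` needs backticks because
// it is a Scala keyword. Define this at top level (outside main) so Spark
// can reflect on it when deriving the UDF's return schema.
case class VectorAsStruct(`type`: Byte,
                          size: Option[Int],
                          indices: Option[Seq[Int]],
                          values: Option[Seq[Double]])

// 1 marks a dense vector, 0 a sparse one, matching VectorUDT's encoding
val toStruct = udf((v: Vector) => v match {
  case d: DenseVector  => VectorAsStruct(1.toByte, None, None, Some(d.values))
  case s: SparseVector => VectorAsStruct(0.toByte, Some(s.size), Some(s.indices), Some(s.values))
})

// With the UDT flattened to a plain struct, the insert no longer needs a
// UDT-to-struct cast
df.withColumn("features", toStruct(df("features")))
  .write.insertInto(tb)

Alternatively, letting Spark create the table itself via df.write.saveAsTable may sidestep the cast entirely, since the UDT schema is then recorded in the table metadata.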