Uploaded image for project: 'Spark'
  1. Spark
  2. SPARK-22137

Failed to insert VectorUDT to hive table with DataFrameWriter.insertInto(tableName: String)

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Major
    • Resolution: Incomplete
    • 2.1.1
    • None
    • SQL

    Description

      Inserting a VectorUDT column into a Hive table with DataFrameWriter.insertInto(tableName: String) fails. The issue seems similar to SPARK-17765, which was resolved in 2.1.0.

      Error message:
      Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve '`features`' due to data type mismatch: cannot cast org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7 to StructType(StructField(type,ByteType,true), StructField(size,IntegerType,true), StructField(indices,ArrayType(IntegerType,true),true), StructField(values,ArrayType(DoubleType,true),true));;
      'InsertIntoTable Relationid#21,features#22 parquet, OverwriteOptions(false,Map()), false
      +- 'Project cast(id#13L as int) AS id#27, cast(features#14 as struct<type:tinyint,size:int,indices:array<int>,values:array<double>>) AS features#28
      +- LogicalRDD id#13L, features#14

      Code to reproduce:

      import scala.annotation.varargs
      import org.apache.spark.ml.linalg.SQLDataTypes
      import org.apache.spark.sql.Row
      import org.apache.spark.sql.SparkSession
      import org.apache.spark.sql.types.LongType
      import org.apache.spark.sql.types.StructField
      import org.apache.spark.sql.types.StructType
      
      
      /**
       * Record used by the reproduction: one row of the input data.
       *
       * @param id       row identifier
       * @param features Spark ML vector stored alongside the id
       */
      case class UDT(id: Long, features: org.apache.spark.ml.linalg.Vector)
      
      /**
       * Minimal reproduction for inserting a `VectorUDT` column into a Hive table
       * through `DataFrameWriter.insertInto(tableName: String)`.
       *
       * The program recreates a Parquet-backed Hive table whose `features` column is
       * declared with the struct layout quoted below, builds a two-row DataFrame whose
       * `features` column uses `SQLDataTypes.VectorType`, and inserts it into the table.
       * According to this report, the final `insertInto` call raises an
       * `AnalysisException` ("cannot cast ... VectorUDT ... to StructType(...)").
       */
      object UDTTest {

        /**
         * Runs the reproduction against a local, Hive-enabled Spark session.
         *
         * @param args command-line arguments (unused)
         */
        def main(args: Array[String]): Unit = {
          val tb = "table_udt"
          val spark = SparkSession.builder()
            .master("local[4]")
            .appName("UDTInsertInto")
            .enableHiveSupport()
            .getOrCreate()

          // Release the session even when insertInto throws; the exception itself
          // still propagates out of main unchanged.
          try {
            spark.sql(s"drop table if exists $tb")

            /*
             * VectorUDT sql type definition (as quoted by the reporter):
             *
             *   override def sqlType: StructType = {
             *     StructType(Seq(
             *       StructField("type", ByteType, nullable = false),
             *       StructField("size", IntegerType, nullable = true),
             *       StructField("indices", ArrayType(IntegerType, containsNull = false), nullable = true),
             *       StructField("values", ArrayType(DoubleType, containsNull = false), nullable = true)))
             *   }
             */

            // Create the Hive table based on the VectorUDT sql type.
            // The concatenated pieces form a single-line DDL statement.
            spark.sql(
              s"create table if not exists ${tb}(id int, features struct<type:tinyint,size:int,indices:array<int>,values:array<double>>)" +
                " row format serde 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'" +
                " stored as" +
                " inputformat 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'" +
                " outputformat 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'")

            // Two records with ids 1 and 2 and the same dense vector.
            val records = (1 to 2).map { i =>
              UDT(i.toLong, org.apache.spark.ml.linalg.Vectors.dense(0.2, 0.21, 0.44))
            }

            // Go through an RDD[Row] plus an explicit schema so that the `features`
            // column is typed as VectorType rather than as a plain struct.
            val rowRDD = spark.sparkContext.makeRDD[UDT](records).map { rec =>
              Row(rec.id, rec.features)
            }
            val schema = StructType(Seq(
              StructField("id", LongType, nullable = false),
              StructField("features", SQLDataTypes.VectorType, nullable = false)))
            val df = spark.createDataFrame(rowRDD, schema)

            // Insert into the Hive table; this is the call the report is about.
            df.write.insertInto(tb)
          } finally {
            spark.stop()
          }
        }
      }
      

      Attachments

        Activity

          People

            Unassigned Unassigned
            yzheng616 yzheng616
            Votes:
            0 Vote for this issue
            Watchers:
            3 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: