SPARK-4052: Use scala.collection.Map for pattern matching instead of using Predef.Map (it is scala.collection.immutable.Map)


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: 1.1.0
    • Fix Version/s: 1.2.0
    • Component/s: SQL
    • Labels: None

    Description

      ScalaReflection and InsertIntoHiveTable seem to accept only scala.collection.immutable.Map as the value type of a MapType column. Here are test cases showing the errors.

      val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
      import sqlContext.createSchemaRDD
      // Row, StructType, StructField, MapType, and StringType used in Test3/Test4 come from this package.
      import org.apache.spark.sql._
      val rdd = sc.parallelize(("key", "value") :: Nil)
      
      // Test1: This one fails.
      case class Test1(m: scala.collection.Map[String, String])
      val rddOfTest1 = rdd.map { case (k, v) => Test1(Map(k->v)) }
      rddOfTest1.registerTempTable("t1")
      /* Stack trace
      scala.MatchError: scala.collection.Map[String,String] (of class scala.reflect.internal.Types$TypeRef$$anon$5)
      	at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:53)
      	at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:64)
      	at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:62)
      ...
      */
      
      // Test2: This one is fine.
      case class Test2(m: scala.collection.immutable.Map[String, String])
      val rddOfTest2 = rdd.map { case (k, v) => Test2(Map(k->v)) }
      rddOfTest2.registerTempTable("t2")
      sqlContext.sql("SELECT m FROM t2").collect
      sqlContext.sql("SELECT m['key'] FROM t2").collect
      
      // Test3: This one fails.
      val schema = StructType(StructField("m", MapType(StringType, StringType), true) :: Nil)
      val rowRDD = rdd.map { case (k, v) =>  Row(scala.collection.mutable.HashMap(k->v)) }
      val schemaRDD = sqlContext.applySchema(rowRDD, schema)
      schemaRDD.registerTempTable("t3")
      sqlContext.sql("SELECT m FROM t3").collect
      sqlContext.sql("SELECT m['key'] FROM t3").collect
      sqlContext.sql("CREATE TABLE testHiveTable1(m MAP <STRING, STRING>)")
      sqlContext.sql("INSERT OVERWRITE TABLE testHiveTable1 SELECT m FROM t3")
      /* Stack trace
      14/10/22 19:30:56 INFO DAGScheduler: Job 4 failed: runJob at InsertIntoHiveTable.scala:124, took 1.384579 s
      org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 4.0 failed 4 times, most recent failure: Lost task 1.3 in stage 4.0 (TID 12, yins-mbp): java.lang.ClassCastException: scala.collection.mutable.HashMap cannot be cast to scala.collection.immutable.Map
              org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$wrapperFor$5.apply(InsertIntoHiveTable.scala:96)
              org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$wrapperFor$5.apply(InsertIntoHiveTable.scala:96)
              org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1$1.apply(InsertIntoHiveTable.scala:148)
              org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1$1.apply(InsertIntoHiveTable.scala:145)
      */
      
      // Test4: This one is fine.
      val rowRDD = rdd.map { case (k, v) =>  Row(Map(k->v)) }
      val schemaRDD = sqlContext.applySchema(rowRDD, schema)
      schemaRDD.registerTempTable("t4")
      sqlContext.sql("SELECT m FROM t4").collect
      sqlContext.sql("SELECT m['key'] FROM t4").collect
      sqlContext.sql("CREATE TABLE testHiveTable1(m MAP <STRING, STRING>)")
      sqlContext.sql("INSERT OVERWRITE TABLE testHiveTable1 SELECT m FROM t4")
      

          People

            Assignee: Yin Huai (yhuai)
            Reporter: Yin Huai (yhuai)
            Votes: 0
            Watchers: 2
