Uploaded image for project: 'Spark'
  1. Spark
  2. SPARK-12783

Dataset map serialization error

    XML | Word | Printable | JSON

Details

    • Bug
    • Status: Resolved
    • Critical
    • Resolution: Fixed
    • 1.6.0
    • 1.6.1
    • SQL
    • None

    Description

      When the Dataset API is used to map to another case class that contains a Map field, a java.io.NotSerializableException is thrown.

      /** Wrapper around a single `Map[String, String]` field. */
      case class MyMap(map: Map[String, String])

      /** Two string fields, with conversions to a one-entry [[MyMap]] and to a plain string. */
      case class TestCaseClass(a: String, b: String) {

        /** Builds a `MyMap` whose only entry maps `a` to `b`. */
        def toMyMap: MyMap = MyMap(Map((a, b)))

        /** Returns `a` unchanged. */
        def toStr: String = a
      }
      // Main method section below
      import sqlContext.implicits._
      // Two sample rows, turned into a Dataset and then into a DataFrame.
      val rows = Seq(
        TestCaseClass("2015-05-01", "data1"),
        TestCaseClass("2015-05-01", "data2")
      )
      val df1 = sqlContext.createDataset(rows).toDF()
      // View the DataFrame as a typed Dataset again before mapping.
      val typed = df1.as[TestCaseClass]
      typed.map(_.toStr).show()   // works fine
      typed.map(_.toMyMap).show() // fails
      

      Error message:

      Caused by: java.io.NotSerializableException: scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$1
      Serialization stack:

      • object not serializable (class: scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$1, value: package lang)
      • field (class: scala.reflect.internal.Types$ThisType, name: sym, type: class scala.reflect.internal.Symbols$Symbol)
      • object (class scala.reflect.internal.Types$UniqueThisType, java.lang.type)
      • field (class: scala.reflect.internal.Types$TypeRef, name: pre, type: class scala.reflect.internal.Types$Type)
      • object (class scala.reflect.internal.Types$ClassNoArgsTypeRef, String)
      • field (class: scala.reflect.internal.Types$TypeRef, name: normalized, type: class scala.reflect.internal.Types$Type)
      • object (class scala.reflect.internal.Types$AliasNoArgsTypeRef, String)
      • field (class: org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$6, name: keyType$1, type: class scala.reflect.api.Types$TypeApi)
      • object (class org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$6, <function1>)
      • field (class: org.apache.spark.sql.catalyst.expressions.MapObjects, name: function, type: interface scala.Function1)
      • object (class org.apache.spark.sql.catalyst.expressions.MapObjects, mapobjects(<function1>,invoke(upcast('map,MapType(StringType,StringType,true),- field (class: "scala.collection.immutable.Map", name: "map"),- root class: "collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType))
      • field (class: org.apache.spark.sql.catalyst.expressions.Invoke, name: targetObject, type: class org.apache.spark.sql.catalyst.expressions.Expression)
      • object (class org.apache.spark.sql.catalyst.expressions.Invoke, invoke(mapobjects(<function1>,invoke(upcast('map,MapType(StringType,StringType,true),- field (class: "scala.collection.immutable.Map", name: "map"),- root class: "collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType),array,ObjectType(class [Ljava.lang.Object))
      • writeObject data (class: scala.collection.immutable.List$SerializationProxy)
      • object (class scala.collection.immutable.List$SerializationProxy, scala.collection.immutable.List$SerializationProxy@4c7e3aab)
      • writeReplace data (class: scala.collection.immutable.List$SerializationProxy)
      • object (class scala.collection.immutable.$colon$colon, List(invoke(mapobjects(<function1>,invoke(upcast('map,MapType(StringType,StringType,true),- field (class: "scala.collection.immutable.Map", name: "map"),- root class: "collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType),array,ObjectType(class [Ljava.lang.Object), invoke(mapobjects(<function1>,invoke(upcast('map,MapType(StringType,StringType,true),- field (class: "scala.collection.immutable.Map", name: "map"),- root class: "collector.MyMap"),valueArray,ArrayType(StringType,true)),StringType),array,ObjectType(class [Ljava.lang.Object)))
      • field (class: org.apache.spark.sql.catalyst.expressions.StaticInvoke, name: arguments, type: interface scala.collection.Seq)
      • object (class org.apache.spark.sql.catalyst.expressions.StaticInvoke, staticinvoke(class org.apache.spark.sql.catalyst.util.ArrayBasedMapData$,ObjectType(interface scala.collection.Map),toScalaMap,invoke(mapobjects(<function1>,invoke(upcast('map,MapType(StringType,StringType,true),- field (class: "scala.collection.immutable.Map", name: "map"),- root class: "collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType),array,ObjectType(class [Ljava.lang.Object),invoke(mapobjects(<function1>,invoke(upcast('map,MapType(StringType,StringType,true),- field (class: "scala.collection.immutable.Map", name: "map"),- root class: "collector.MyMap"),valueArray,ArrayType(StringType,true)),StringType),array,ObjectType(class [Ljava.lang.Object),true))
      • writeObject data (class: scala.collection.immutable.List$SerializationProxy)
      • object (class scala.collection.immutable.List$SerializationProxy, scala.collection.immutable.List$SerializationProxy@78d9820)
      • writeReplace data (class: scala.collection.immutable.List$SerializationProxy)
      • object (class scala.collection.immutable.$colon$colon, List(staticinvoke(class org.apache.spark.sql.catalyst.util.ArrayBasedMapData$,ObjectType(interface scala.collection.Map),toScalaMap,invoke(mapobjects(<function1>,invoke(upcast('map,MapType(StringType,StringType,true),- field (class: "scala.collection.immutable.Map", name: "map"),- root class: "collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType),array,ObjectType(class [Ljava.lang.Object),invoke(mapobjects(<function1>,invoke(upcast('map,MapType(StringType,StringType,true),- field (class: "scala.collection.immutable.Map", name: "map"),- root class: "collector.MyMap"),valueArray,ArrayType(StringType,true)),StringType),array,ObjectType(class [Ljava.lang.Object),true)))
      • field (class: org.apache.spark.sql.catalyst.expressions.NewInstance, name: arguments, type: interface scala.collection.Seq)
      • object (class org.apache.spark.sql.catalyst.expressions.NewInstance, newinstance(class collector.MyMap,staticinvoke(class org.apache.spark.sql.catalyst.util.ArrayBasedMapData$,ObjectType(interface scala.collection.Map),toScalaMap,invoke(mapobjects(<function1>,invoke(upcast('map,MapType(StringType,StringType,true),- field (class: "scala.collection.immutable.Map", name: "map"),- root class: "collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType),array,ObjectType(class [Ljava.lang.Object),invoke(mapobjects(<function1>,invoke(upcast('map,MapType(StringType,StringType,true),- field (class: "scala.collection.immutable.Map", name: "map"),- root class: "collector.MyMap"),valueArray,ArrayType(StringType,true)),StringType),array,ObjectType(class [Ljava.lang.Object),true),false,ObjectType(class collector.MyMap),None))
      • field (class: org.apache.spark.sql.catalyst.encoders.ExpressionEncoder, name: fromRowExpression, type: class org.apache.spark.sql.catalyst.expressions.Expression)
      • object (class org.apache.spark.sql.catalyst.encoders.ExpressionEncoder, class[map#ExprId(9,5d198984-4022-43b2-a2a3-ddbb214ba0ef): map<string,string>])
      • field (class: org.apache.spark.sql.execution.MapPartitions, name: uEncoder, type: class org.apache.spark.sql.catalyst.encoders.ExpressionEncoder)
      • object (class org.apache.spark.sql.execution.MapPartitions, !MapPartitions <function1>, class[a[0]: string, b[0]: string], class[map#ExprId(9,5d198984-4022-43b2-a2a3-ddbb214ba0ef): map<string,string>], map#13
        +- LocalTableScan a#2,b#3, [[0,180000000a,2800000005,2d35302d35313032,3130,3161746164],[0,180000000a,2800000005,2d35302d35313032,3130,3261746164]]
        )
      • field (class: org.apache.spark.sql.execution.MapPartitions$$anonfun$8, name: $outer, type: class org.apache.spark.sql.execution.MapPartitions)
      • object (class org.apache.spark.sql.execution.MapPartitions$$anonfun$8, <function1>)
      • field (class: org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1, name: f$22, type: interface scala.Function1)
      • object (class org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1, <function0>)
      • field (class: org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21, name: $outer, type: class org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1)
      • object (class org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21, <function3>)
      • field (class: org.apache.spark.rdd.MapPartitionsRDD, name: f, type: interface scala.Function3)
      • object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[1] at show at CollectorSparkTest.scala:50)
      • field (class: org.apache.spark.NarrowDependency, name: _rdd, type: class org.apache.spark.rdd.RDD)
      • object (class org.apache.spark.OneToOneDependency, org.apache.spark.OneToOneDependency@4d60c27d)
      • writeObject data (class: scala.collection.immutable.List$SerializationProxy)
      • object (class scala.collection.immutable.List$SerializationProxy, scala.collection.immutable.List$SerializationProxy@6c436651)
      • writeReplace data (class: scala.collection.immutable.List$SerializationProxy)
      • object (class scala.collection.immutable.$colon$colon, List(org.apache.spark.OneToOneDependency@4d60c27d))
      • field (class: org.apache.spark.rdd.RDD, name: org$apache$spark$rdd$RDD$$dependencies_, type: interface scala.collection.Seq)
      • object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[2] at show at CollectorSparkTest.scala:50)
      • field (class: scala.Tuple2, name: _1, type: class java.lang.Object)
      • object (class scala.Tuple2, (MapPartitionsRDD[2] at show at CollectorSparkTest.scala:50,<function2>))
        at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
        at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1003)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:921)
        at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:861)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1607)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

      Attachments

        1. MyMap.scala
          0.3 kB
          Muthu Jayakumar

        Activity

          People

            cloud_fan Wenchen Fan
            babloo80 Muthu Jayakumar
            Votes:
            1 Vote for this issue
            Watchers:
            9 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: