Spark / SPARK-3926

result of JavaRDD collectAsMap() is not serializable


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.0.2, 1.1.0, 1.1.1, 1.2.0
    • Fix Version/s: 1.2.1, 1.3.0
    • Component/s: Java API
    • Labels: None
    • Environment: CentOS / Spark 1.1 / Hadoop Hortonworks 2.4.0.2.1.2.0-402

    Description

      Using the Java API, I want to collect the result of an RDD<String, String> as a HashMap using the collectAsMap() function:
      Map<String, String> map = myJavaRDD.collectAsMap();
      This works fine, but when passing this map to another function, such as...
      myOtherJavaRDD.mapToPair(new CustomFunction(map))
      ...this leads to the following error:

      Exception in thread "main" org.apache.spark.SparkException: Task not serializable
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
        at org.apache.spark.rdd.RDD.map(RDD.scala:270)
        at org.apache.spark.api.java.JavaRDDLike$class.mapToPair(JavaRDDLike.scala:99)
        at org.apache.spark.api.java.JavaPairRDD.mapToPair(JavaPairRDD.scala:44)
        ../.. MY CLASS ../..
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
      Caused by: java.io.NotSerializableException: scala.collection.convert.Wrappers$MapWrapper
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
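      For context, a minimal sketch of the kind of function involved (the body of CustomFunction below is a hypothetical reconstruction, not the actual class from this report): it simply captures the collected map in a field, so the map has to be serialized along with the closure when the task is shipped to the executors.

      import java.util.Map;
      import org.apache.spark.api.java.function.PairFunction;
      import scala.Tuple2;

      public class CustomFunction implements PairFunction<String, String, String> {
          // The captured map is serialized together with this function instance.
          private final Map<String, String> map;

          public CustomFunction(Map<String, String> map) {
              this.map = map;
          }

          @Override
          public Tuple2<String, String> call(String key) throws Exception {
              // Look up the value for the incoming record in the captured map.
              return new Tuple2<String, String>(key, map.get(key));
          }
      }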

      This seems to be due to the MapWrapper created by mapAsJavaMap in WrapAsJava.scala being non-serializable:
      ../..
      implicit def mapAsJavaMap[A, B](m: Map[A, B]): ju.Map[A, B] = m match {
        //case JConcurrentMapWrapper(wrapped) => wrapped
        case JMapWrapper(wrapped) => wrapped.asInstanceOf[ju.Map[A, B]]
        case _ => new MapWrapper(m)
      }
      ../..
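      A quick way to confirm this outside of Spark (a hypothetical check, not part of the original report) is to push the returned map through plain Java serialization; it fails because the MapWrapper produced by mapAsJavaMap does not implement java.io.Serializable:

      import java.io.ByteArrayOutputStream;
      import java.io.ObjectOutputStream;
      import java.util.Map;

      // collectAsMap() hands back a scala.collection.convert.Wrappers$MapWrapper.
      Map<String, String> map = myJavaRDD.collectAsMap();
      ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream());
      out.writeObject(map); // throws java.io.NotSerializableException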

      The workaround is to manually copy this map into another one (a serializable java.util.HashMap):
      Map<String, String> map = myJavaRDD.collectAsMap();
      Map<String, String> tmp = new HashMap<String, String>(map);
      myOtherJavaRDD.mapToPair(new CustomFunction(tmp))
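      With the copy in place, the same check as above goes through, since java.util.HashMap implements java.io.Serializable and the closure cleaner can then serialize the task (again a hypothetical check, reusing the names from the snippets above):

      import java.util.HashMap;

      // Copying into a plain java.util.HashMap yields a serializable map.
      Map<String, String> tmp = new HashMap<String, String>(myJavaRDD.collectAsMap());
      ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream());
      out.writeObject(tmp); // succeeds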


      People

        Assignee: Sean R. Owen (srowen)
        Reporter: Antoine Amend (aamend)
        Votes: 0
        Watchers: 4
