Uploaded image for project: 'Spark'
  1. Spark
  2. SPARK-15870

DataFrame can't execute after uncacheTable.

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Major
    • Resolution: Fixed
    • None
    • 2.0.0
    • SQL
    • None

    Description

      If a cached DataFrame executed more than once and then do uncacheTable like the following:

          val selectStar = sql("SELECT * FROM testData WHERE key = 1")
          selectStar.createOrReplaceTempView("selectStar")
      
          spark.catalog.cacheTable("selectStar")
          checkAnswer(
            selectStar,
            Seq(Row(1, "1")))
      
          spark.catalog.uncacheTable("selectStar")
          checkAnswer(
            selectStar,
            Seq(Row(1, "1")))
      

      , the uncached DataFrame can't execute because of Task not serializable exception like:

      org.apache.spark.SparkException: Task not serializable
      	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
      	at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
      	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
      	at org.apache.spark.SparkContext.clean(SparkContext.scala:2038)
      	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1897)
      	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1912)
      	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:884)
      	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
      	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
      	at org.apache.spark.rdd.RDD.withScope(RDD.scala:357)
      	at org.apache.spark.rdd.RDD.collect(RDD.scala:883)
      	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:290)
      	at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2163)
      	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
      	at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2489)
      	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2162)
      	at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$collect$1.apply(Dataset.scala:2167)
      	at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$collect$1.apply(Dataset.scala:2167)
      	at org.apache.spark.sql.Dataset.withCallback(Dataset.scala:2502)
      	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2167)
      	at org.apache.spark.sql.Dataset.collect(Dataset.scala:2143)
      ...
      Caused by: java.lang.UnsupportedOperationException: Accumulator must be registered before send to executor
      	at org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:153)
      	at sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.lang.reflect.Method.invoke(Method.java:498)
      	at java.io.ObjectStreamClass.invokeWriteReplace(ObjectStreamClass.java:1118)
      	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1136)
      	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
      	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
      	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
      	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
      	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
      	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
      	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
      	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
      	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
      	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
      	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
      	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
      	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
      	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
      	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
      	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
      	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
      	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
      	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
      	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
      	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
      	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
      	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
      	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
      	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
      	at scala.collection.immutable.List$SerializationProxy.writeObject(List.scala:468)
      	at sun.reflect.GeneratedMethodAccessor6.invoke(Unknown Source)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.lang.reflect.Method.invoke(Method.java:498)
      	at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
      	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
      	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
      	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
      	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
      	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
      	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
      	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
      	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
      	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
      	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
      	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
      	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
      	at scala.collection.immutable.List$SerializationProxy.writeObject(List.scala:468)
      	at sun.reflect.GeneratedMethodAccessor6.invoke(Unknown Source)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.lang.reflect.Method.invoke(Method.java:498)
      	at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
      	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
      	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
      	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
      	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
      	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
      	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
      	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
      	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
      	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
      	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
      	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
      	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
      	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
      	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
      	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
      	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
      	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
      	at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
      	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
      	... 66 more
      

      Notice that DataFrame uncached with DataFrame.unpersist() works, but with spark.catalog.uncacheTable doesn't work.

      Attachments

        Activity

          People

            ueshin Takuya Ueshin
            ueshin Takuya Ueshin
            Votes:
            0 Vote for this issue
            Watchers:
            3 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: