Uploaded image for project: 'Spark'
  1. Spark
  2. SPARK-15870

DataFrame can't execute after uncacheTable.

Attach filesAttach ScreenshotVotersWatch issueWatchersCreate sub-taskLinkCloneUpdate Comment AuthorReplace String in CommentUpdate Comment VisibilityDelete Comments
    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Major
    • Resolution: Fixed
    • None
    • 2.0.0
    • SQL
    • None

    Description

      If a cached DataFrame executed more than once and then do uncacheTable like the following:

          val selectStar = sql("SELECT * FROM testData WHERE key = 1")
          selectStar.createOrReplaceTempView("selectStar")
      
          spark.catalog.cacheTable("selectStar")
          checkAnswer(
            selectStar,
            Seq(Row(1, "1")))
      
          spark.catalog.uncacheTable("selectStar")
          checkAnswer(
            selectStar,
            Seq(Row(1, "1")))
      

      , the uncached DataFrame can't execute because of Task not serializable exception like:

      org.apache.spark.SparkException: Task not serializable
      	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
      	at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
      	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
      	at org.apache.spark.SparkContext.clean(SparkContext.scala:2038)
      	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1897)
      	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1912)
      	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:884)
      	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
      	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
      	at org.apache.spark.rdd.RDD.withScope(RDD.scala:357)
      	at org.apache.spark.rdd.RDD.collect(RDD.scala:883)
      	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:290)
      	at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2163)
      	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
      	at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2489)
      	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2162)
      	at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$collect$1.apply(Dataset.scala:2167)
      	at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$collect$1.apply(Dataset.scala:2167)
      	at org.apache.spark.sql.Dataset.withCallback(Dataset.scala:2502)
      	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2167)
      	at org.apache.spark.sql.Dataset.collect(Dataset.scala:2143)
      ...
      Caused by: java.lang.UnsupportedOperationException: Accumulator must be registered before send to executor
      	at org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:153)
      	at sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.lang.reflect.Method.invoke(Method.java:498)
      	at java.io.ObjectStreamClass.invokeWriteReplace(ObjectStreamClass.java:1118)
      	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1136)
      	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
      	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
      	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
      	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
      	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
      	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
      	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
      	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
      	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
      	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
      	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
      	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
      	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
      	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
      	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
      	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
      	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
      	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
      	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
      	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
      	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
      	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
      	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
      	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
      	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
      	at scala.collection.immutable.List$SerializationProxy.writeObject(List.scala:468)
      	at sun.reflect.GeneratedMethodAccessor6.invoke(Unknown Source)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.lang.reflect.Method.invoke(Method.java:498)
      	at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
      	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
      	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
      	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
      	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
      	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
      	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
      	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
      	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
      	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
      	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
      	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
      	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
      	at scala.collection.immutable.List$SerializationProxy.writeObject(List.scala:468)
      	at sun.reflect.GeneratedMethodAccessor6.invoke(Unknown Source)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.lang.reflect.Method.invoke(Method.java:498)
      	at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
      	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
      	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
      	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
      	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
      	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
      	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
      	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
      	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
      	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
      	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
      	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
      	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
      	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
      	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
      	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
      	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
      	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
      	at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
      	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
      	... 66 more
      

      Notice that DataFrame uncached with DataFrame.unpersist() works, but with spark.catalog.uncacheTable doesn't work.

      Attachments

        Activity

          This comment will be Viewable by All Users Viewable by All Users
          Cancel

          People

            ueshin Takuya Ueshin
            ueshin Takuya Ueshin
            Votes:
            0 Vote for this issue
            Watchers:
            3 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved:

              Slack

                Issue deployment