Spark / SPARK-17424

Dataset job fails from unsound substitution in ScalaReflect


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.6.1, 2.0.0
    • Fix Version/s: 2.0.3, 2.1.2, 2.2.0
    • Component/s: Spark Core
    • Labels: None

    Description

      I have a job that uses datasets in 1.6.1 and is failing with this error:

      16/09/02 17:02:56 ERROR Driver ApplicationMaster: User class threw exception: java.lang.AssertionError: assertion failed: Unsound substitution from List(type T, type U) to List()
      java.lang.AssertionError: assertion failed: Unsound substitution from List(type T, type U) to List()
          at scala.reflect.internal.Types$SubstMap.<init>(Types.scala:4644)
          at scala.reflect.internal.Types$SubstTypeMap.<init>(Types.scala:4761)
          at scala.reflect.internal.Types$Type.subst(Types.scala:796)
          at scala.reflect.internal.Types$TypeApiImpl.substituteTypes(Types.scala:321)
          at scala.reflect.internal.Types$TypeApiImpl.substituteTypes(Types.scala:298)
          at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$getConstructorParameters$1.apply(ScalaReflection.scala:769)
          at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$getConstructorParameters$1.apply(ScalaReflection.scala:768)
          at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
          at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
          at scala.collection.immutable.List.foreach(List.scala:318)
          at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
          at scala.collection.AbstractTraversable.map(Traversable.scala:105)
          at org.apache.spark.sql.catalyst.ScalaReflection$class.getConstructorParameters(ScalaReflection.scala:768)
          at org.apache.spark.sql.catalyst.ScalaReflection$.getConstructorParameters(ScalaReflection.scala:30)
          at org.apache.spark.sql.catalyst.ScalaReflection$.getConstructorParameters(ScalaReflection.scala:610)
          at org.apache.spark.sql.catalyst.trees.TreeNode.org$apache$spark$sql$catalyst$trees$TreeNode$$argNames$lzycompute(TreeNode.scala:418)
          at org.apache.spark.sql.catalyst.trees.TreeNode.org$apache$spark$sql$catalyst$trees$TreeNode$$argNames(TreeNode.scala:418)
          at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$argsMap$1.apply(TreeNode.scala:415)
          at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$argsMap$1.apply(TreeNode.scala:414)
          at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
          at scala.collection.Iterator$class.foreach(Iterator.scala:727)
          at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
          at scala.collection.TraversableOnce$class.toMap(TraversableOnce.scala:279)
          at scala.collection.AbstractIterator.toMap(Iterator.scala:1157)
          at org.apache.spark.sql.catalyst.trees.TreeNode.argsMap(TreeNode.scala:416)
          at org.apache.spark.sql.execution.SparkPlanInfo$.fromSparkPlan(SparkPlanInfo.scala:46)
          at org.apache.spark.sql.execution.SparkPlanInfo$$anonfun$2.apply(SparkPlanInfo.scala:44)
          at org.apache.spark.sql.execution.SparkPlanInfo$$anonfun$2.apply(SparkPlanInfo.scala:44)
          at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
          at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
          at scala.collection.immutable.List.foreach(List.scala:318)
          at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
          at scala.collection.AbstractTraversable.map(Traversable.scala:105)
          at org.apache.spark.sql.execution.SparkPlanInfo$.fromSparkPlan(SparkPlanInfo.scala:44)
          at org.apache.spark.sql.execution.SparkPlanInfo$$anonfun$2.apply(SparkPlanInfo.scala:44)
          at org.apache.spark.sql.execution.SparkPlanInfo$$anonfun$2.apply(SparkPlanInfo.scala:44)
          at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
          at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
          at scala.collection.immutable.List.foreach(List.scala:318)
          at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
          at scala.collection.AbstractTraversable.map(Traversable.scala:105)
          at org.apache.spark.sql.execution.SparkPlanInfo$.fromSparkPlan(SparkPlanInfo.scala:44)
          at org.apache.spark.sql.execution.SparkPlanInfo$$anonfun$2.apply(SparkPlanInfo.scala:44)
          at org.apache.spark.sql.execution.SparkPlanInfo$$anonfun$2.apply(SparkPlanInfo.scala:44)
          at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
          at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
          at scala.collection.immutable.List.foreach(List.scala:318)
          at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
          at scala.collection.AbstractTraversable.map(Traversable.scala:105)
          at org.apache.spark.sql.execution.SparkPlanInfo$.fromSparkPlan(SparkPlanInfo.scala:44)
          at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:51)
          at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:56)
          at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55)
          at org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:193)
          at org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:166)
          at com.netflix.jobs.main(Processing.scala)
          at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
          at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
          at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
          at java.lang.reflect.Method.invoke(Method.java:498)
          at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:557)
      

      I think this is the same bug as SPARK-13067. It looks like that issue wasn't actually fixed; a work-around was added just to get the test passing.

      The problem is that the reflection code tries to substitute concrete types for the type parameters of MapPartitions[T, U], but the concrete types aren't known. Spark ends up calling substituteTypes with the formal parameters List(T, U) and an empty list of actuals, Nil (which is rendered as List() in the assertion message).
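
      For illustration, here is a minimal standalone sketch (assuming Scala 2.11 with scala-reflect on the classpath; the Pair class and every name in it are hypothetical, not from the original job) showing how calling substituteTypes with two formal type parameters but an empty list of actual arguments trips the same assertion:

          import scala.reflect.runtime.universe._

          // Hypothetical two-parameter class standing in for MapPartitions[T, U]
          case class Pair[T, U](first: T, second: U)

          object UnsoundSubstitutionRepro extends App {
            // the formal type parameters of Pair: List(type T, type U)
            val formals = typeOf[Pair[_, _]].typeSymbol.asClass.typeParams
            // Substituting two formal type parameters with zero actual arguments
            // throws java.lang.AssertionError: assertion failed:
            //   Unsound substitution from List(type T, type U) to List()
            typeOf[Pair[_, _]].substituteTypes(formals, Nil)
          }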

      An easy fix that works for me is this:

          // if there are type variables to fill in, do the substitution (SomeClass[T] -> SomeClass[Int])
          if (actualTypeArgs.nonEmpty) {
            params.map { p =>
              p.name.toString -> p.typeSignature.substituteTypes(formalTypeArgs, actualTypeArgs)
            }
          } else {
            // no actual type arguments are known; leave the signatures untouched
            // rather than substituting the type parameters with an empty list
            params.map { p =>
              p.name.toString -> p.typeSignature
            }
          }
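
      In other words, when actualTypeArgs is empty there is nothing to substitute, so the else branch returns the parameters' raw type signatures instead of handing substituteTypes an empty actuals list that would trip the assertion.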
      

      Does this sound like a reasonable solution?

      Edit: I think this also affects 2.0.0, because the call to substituteTypes is unchanged there.


          People

            Assignee: Ryan Blue (rdblue)
            Reporter: Ryan Blue (rdblue)
            Votes: 0
            Watchers: 3

