Spark / SPARK-17424

Dataset job fails from unsound substitution in ScalaReflect


    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.6.1, 2.0.0
    • Fix Version/s: 2.0.3, 2.1.2, 2.2.0
    • Component/s: Spark Core
    • Labels: None

      Description

      I have a job that uses datasets in 1.6.1 and is failing with this error:

      16/09/02 17:02:56 ERROR Driver ApplicationMaster: User class threw exception: java.lang.AssertionError: assertion failed: Unsound substitution from List(type T, type U) to List()
      java.lang.AssertionError: assertion failed: Unsound substitution from List(type T, type U) to List()
          at scala.reflect.internal.Types$SubstMap.<init>(Types.scala:4644)
          at scala.reflect.internal.Types$SubstTypeMap.<init>(Types.scala:4761)
          at scala.reflect.internal.Types$Type.subst(Types.scala:796)
          at scala.reflect.internal.Types$TypeApiImpl.substituteTypes(Types.scala:321)
          at scala.reflect.internal.Types$TypeApiImpl.substituteTypes(Types.scala:298)
          at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$getConstructorParameters$1.apply(ScalaReflection.scala:769)
          at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$getConstructorParameters$1.apply(ScalaReflection.scala:768)
          at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
          at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
          at scala.collection.immutable.List.foreach(List.scala:318)
          at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
          at scala.collection.AbstractTraversable.map(Traversable.scala:105)
          at org.apache.spark.sql.catalyst.ScalaReflection$class.getConstructorParameters(ScalaReflection.scala:768)
          at org.apache.spark.sql.catalyst.ScalaReflection$.getConstructorParameters(ScalaReflection.scala:30)
          at org.apache.spark.sql.catalyst.ScalaReflection$.getConstructorParameters(ScalaReflection.scala:610)
          at org.apache.spark.sql.catalyst.trees.TreeNode.org$apache$spark$sql$catalyst$trees$TreeNode$$argNames$lzycompute(TreeNode.scala:418)
          at org.apache.spark.sql.catalyst.trees.TreeNode.org$apache$spark$sql$catalyst$trees$TreeNode$$argNames(TreeNode.scala:418)
          at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$argsMap$1.apply(TreeNode.scala:415)
          at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$argsMap$1.apply(TreeNode.scala:414)
          at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
          at scala.collection.Iterator$class.foreach(Iterator.scala:727)
          at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
          at scala.collection.TraversableOnce$class.toMap(TraversableOnce.scala:279)
          at scala.collection.AbstractIterator.toMap(Iterator.scala:1157)
          at org.apache.spark.sql.catalyst.trees.TreeNode.argsMap(TreeNode.scala:416)
          at org.apache.spark.sql.execution.SparkPlanInfo$.fromSparkPlan(SparkPlanInfo.scala:46)
          at org.apache.spark.sql.execution.SparkPlanInfo$$anonfun$2.apply(SparkPlanInfo.scala:44)
          at org.apache.spark.sql.execution.SparkPlanInfo$$anonfun$2.apply(SparkPlanInfo.scala:44)
          at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
          at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
          at scala.collection.immutable.List.foreach(List.scala:318)
          at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
          at scala.collection.AbstractTraversable.map(Traversable.scala:105)
          at org.apache.spark.sql.execution.SparkPlanInfo$.fromSparkPlan(SparkPlanInfo.scala:44)
          at org.apache.spark.sql.execution.SparkPlanInfo$$anonfun$2.apply(SparkPlanInfo.scala:44)
          at org.apache.spark.sql.execution.SparkPlanInfo$$anonfun$2.apply(SparkPlanInfo.scala:44)
          at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
          at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
          at scala.collection.immutable.List.foreach(List.scala:318)
          at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
          at scala.collection.AbstractTraversable.map(Traversable.scala:105)
          at org.apache.spark.sql.execution.SparkPlanInfo$.fromSparkPlan(SparkPlanInfo.scala:44)
          at org.apache.spark.sql.execution.SparkPlanInfo$$anonfun$2.apply(SparkPlanInfo.scala:44)
          at org.apache.spark.sql.execution.SparkPlanInfo$$anonfun$2.apply(SparkPlanInfo.scala:44)
          at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
          at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
          at scala.collection.immutable.List.foreach(List.scala:318)
          at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
          at scala.collection.AbstractTraversable.map(Traversable.scala:105)
          at org.apache.spark.sql.execution.SparkPlanInfo$.fromSparkPlan(SparkPlanInfo.scala:44)
          at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:51)
          at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:56)
          at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55)
          at org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:193)
          at org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:166)
          at com.netflix.jobs.main(Processing.scala)
          at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
          at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
          at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
          at java.lang.reflect.Method.invoke(Method.java:498)
          at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:557)
      

      I think this is the same bug as SPARK-13067. It looks like that issue wasn't actually fixed; a work-around was just added to get the test passing.

      The problem is that the reflection code tries to substitute concrete types for the type parameters of MapPartitions[T, U], but the concrete types aren't known. Spark therefore ends up calling substituteTypes with T and U on one side and Nil (which gets rendered as List()) on the other, and scala.reflect asserts that the two lists have the same length.
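For context, the length mismatch can be reproduced outside Spark with plain scala.reflect. This is a minimal sketch; Pair and UnsoundSubstDemo are hypothetical names standing in for MapPartitions[T, U], not anything in Spark:

```scala
import scala.reflect.runtime.universe._

// Hypothetical stand-in for a generic plan node like MapPartitions[T, U].
class Pair[T, U](val first: T, val second: U)

object UnsoundSubstDemo {
  // Returns true if substituting two formal type parameters with an empty
  // list of actual types trips the scala.reflect assertion.
  def raisesAssertion(): Boolean = {
    // The formal type parameters: List(type T, type U).
    val formalTypeArgs = typeOf[Pair[_, _]].typeSymbol.asClass.typeParams
    // The concrete type arguments are unknown, so the "actual" list is empty.
    val actualTypeArgs: List[Type] = Nil
    try {
      typeOf[Pair[_, _]].substituteTypes(formalTypeArgs, actualTypeArgs)
      false
    } catch {
      // "assertion failed: Unsound substitution from List(type T, type U) to List()"
      case _: AssertionError => true
    }
  }

  def main(args: Array[String]): Unit =
    println(s"AssertionError raised: ${raisesAssertion()}")
}
```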

      An easy fix that works for me is this:

          // if there are type variables to fill in, do the substitution (SomeClass[T] -> SomeClass[Int])
          if (actualTypeArgs.nonEmpty) {
            params.map { p =>
              p.name.toString -> p.typeSignature.substituteTypes(formalTypeArgs, actualTypeArgs)
            }
          } else {
            // otherwise keep the signatures as-is; substituting into an empty
            // list trips the "Unsound substitution" assertion
            params.map { p =>
              p.name.toString -> p.typeSignature
            }
          }
      
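To illustrate the guard outside of Spark's internals, here is a hypothetical standalone version; GuardedSubstitution, constructorParams, and Pair are illustrative names, not Spark's API:

```scala
import scala.reflect.runtime.universe._

// Illustrative generic class; not part of Spark.
class Pair[T, U](val first: T, val second: U)

object GuardedSubstitution {
  // Sketch of the guarded substitution: only call substituteTypes when the
  // actual type arguments are known.
  def constructorParams(tpe: Type): List[(String, Type)] = {
    val formalTypeArgs = tpe.typeSymbol.asClass.typeParams
    val actualTypeArgs = tpe.typeArgs
    val params = tpe.member(termNames.CONSTRUCTOR).asMethod.paramLists.flatten
    params.map { p =>
      if (actualTypeArgs.nonEmpty) {
        p.name.toString -> p.typeSignature.substituteTypes(formalTypeArgs, actualTypeArgs)
      } else {
        p.name.toString -> p.typeSignature // leave the type parameter in place
      }
    }
  }
}

// With known arguments, T and U resolve: Pair[Int, String] yields
// first -> Int and second -> String.
```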

      Does this sound like a reasonable solution?

      Edit: I think this also affects 2.0.0, because the call to substituteTypes is unchanged there.

      People

      • Assignee: Ryan Blue (rdblue)
      • Reporter: Ryan Blue (rdblue)
      • Votes: 0
      • Watchers: 3

      Dates

      • Created:
      • Updated:
      • Resolved: