[SPARK-17975] EMLDAOptimizer fails with ClassCastException on YARN


    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.0.1
    • Fix Version/s: 2.0.3, 2.1.1, 2.2.0
    • Component/s: MLlib
    • Labels: None
    • Environment: CentOS 6, CDH 5.7, Java 1.7u80

      Description

      I'm able to reproduce the error consistently with a 2,000-record text file, each record containing 1-5 terms, and checkpointing enabled. It looks like the problem was introduced by the resolution of SPARK-13355.
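
      For reference, the input is an RDD of (document id, term-count vector) pairs. The sketch below is a hypothetical reconstruction of what the elided "// snip" in the reproduction might look like; the HashingTF vectorizer, feature count, and input path are my assumptions, not details from the report (it assumes a SparkContext named sc):

      import org.apache.spark.mllib.feature.HashingTF
      import org.apache.spark.mllib.linalg.Vector
      import org.apache.spark.rdd.RDD

      // Hypothetical input construction (not the reporter's actual code):
      // one document per line, whitespace-separated terms, hashed into
      // fixed-size term-count vectors.
      val tf = new HashingTF(1 << 12)
      val data: RDD[(Long, Vector)] = sc
        .textFile("hdfs:///tmp/docs.txt")                // e.g. the attached docs.txt
        .map(line => tf.transform(line.split("\\s+").toSeq))
        .zipWithIndex()
        .map { case (counts, docId) => (docId, counts) }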

      The EdgeRDD class seems to be lying about its element type in a way that makes the RDD.mapPartitionsWithIndex method unusable when the RDD is referenced as an RDD of Edge elements.
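
      Illustratively, EdgeRDD[ED] is declared as a subtype of RDD[Edge[ED]], so the generic RDD API compiles against it and the mismatch only surfaces at runtime. Here is a minimal sketch of that relationship (my illustration, assuming a SparkContext named sc; this snippet alone does not reproduce the crash):

      import org.apache.spark.graphx.{Edge, Graph}
      import org.apache.spark.rdd.RDD

      val graph = Graph.fromEdges(
        sc.parallelize(Seq(Edge(1L, 2L, 1.0), Edge(2L, 3L, 2.0))),
        defaultValue = 0.0)

      // The upcast compiles because EdgeRDD[ED] extends RDD[Edge[ED]].
      val edges: RDD[Edge[Double]] = graph.edges

      // Fine while the lineage is intact: EdgeRDD.compute unpacks its internal
      // edge partitions into Edge values. Per this report, once the RDD has
      // been checkpointed and read back, the iterator instead yields the raw
      // Tuple2 partition data, producing the ClassCastException below.
      edges.mapPartitionsWithIndex((pid, iter) => iter.map(e => (pid, e.attr))).collect()

      The reproduction and the resulting executor trace follow: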

      import org.apache.spark.mllib.clustering.{EMLDAOptimizer, LDA}
      import org.apache.spark.mllib.linalg.Vector
      import org.apache.spark.rdd.RDD
      import org.apache.spark.sql.SparkSession

      val spark = SparkSession.builder.appName("lda").getOrCreate()
      spark.sparkContext.setCheckpointDir("hdfs:///tmp/checkpoints")

      val data: RDD[(Long, Vector)] = // snip
      data.setName("data").cache()

      val lda = new LDA
      val optimizer = new EMLDAOptimizer
      lda.setOptimizer(optimizer)
        .setK(10)
        .setMaxIterations(400)
        .setAlpha(-1)                 // -1 selects the default document concentration
        .setBeta(-1)                  // -1 selects the default topic concentration
        .setCheckpointInterval(7)     // checkpoint every 7 iterations
      val ldaModel = lda.run(data)
      
      16/10/16 23:53:54 WARN TaskSetManager: Lost task 3.0 in stage 348.0 (TID 1225, server2.domain): java.lang.ClassCastException: scala.Tuple2 cannot be cast to org.apache.spark.graphx.Edge
      	at org.apache.spark.graphx.EdgeRDD$$anonfun$1$$anonfun$apply$1.apply(EdgeRDD.scala:107)
      	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
      	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
      	at org.apache.spark.graphx.EdgeRDD$$anonfun$1.apply(EdgeRDD.scala:107)
      	at org.apache.spark.graphx.EdgeRDD$$anonfun$1.apply(EdgeRDD.scala:105)
      	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:820)
      	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:820)
      	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
      	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
      	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
      	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
      	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
      	at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:332)
      	at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:330)
      	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:935)
      	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:926)
      	at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866)
      	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:926)
      	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:670)
      	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
      	at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
      	at org.apache.spark.graphx.EdgeRDD.compute(EdgeRDD.scala:50)
      	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
      	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
      	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
      	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
      	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
      	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
      	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
      	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
      	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
      	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
      	at org.apache.spark.scheduler.Task.run(Task.scala:86)
      	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
      	at java.lang.Thread.run(Thread.java:722)
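
      Since the error only reproduces with checkpointing enabled, one possible mitigation until a fix lands (my suggestion, not something verified in this report) is to disable LDA checkpointing, at the cost of an ever-growing lineage on long runs:

      // Possible mitigation (untested here): -1 disables checkpointing in MLlib's LDA.
      lda.setCheckpointInterval(-1)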
      

        Attachments

        1. docs.txt (70 kB, attached by Jeff Stein)

        People

        • Assignee: Tathagata Das (tdas)
        • Reporter: Jeff Stein (jvstein)
        • Votes: 1
        • Watchers: 9
