TOREE-523

Problem reading Kudu tables using Spark (Jupyter Notebook with Apache Toree - Scala Kernel)


    Details

    • Type: Bug
    • Status: Open
    • Priority: Blocker
    • Resolution: Unresolved
    • Affects Version/s: 0.3.0, 0.4.0
    • Fix Version/s: None
    • Component/s: Build, Kernel
    • Environment:
      Jupyter Notebook - Spark version: 2.2.0, Scala version: 2.11, Apache Toree version: 0.3
    • Flags:
      Important

      Description

      I am trying to read a Kudu table using Apache Spark within a Jupyter Notebook running with an Apache Toree - Scala Kernel.

      Spark version: 2.2.0, Scala version: 2.11, Apache Toree version: 0.3

      This is the code I am using to read the Kudu table:

      // Kudu-Spark imports: provide KuduContext and the .kudu implicit on DataFrameReader
      import org.apache.kudu.spark.kudu._
      import org.apache.spark.sql.DataFrame
      
      // Placeholder for the comma-separated list of Kudu master addresses
      val kuduMasterAddresses = "KUDU_MASTER_ADDRESSES_HERE"
      val kuduMasters: String = Seq(kuduMasterAddresses).mkString(",")
      
      val kuduContext = new KuduContext(kuduMasters, spark.sparkContext)
      
      // Placeholder for the Kudu table name
      val table = "TABLE_NAME_HERE"
      
      def readKudu(table: String): DataFrame = {
          val tableKuduOptions: Map[String, String] = Map(
              "kudu.table"  -> table,
              "kudu.master" -> kuduMasters
          )
          spark.sqlContext.read.options(tableKuduOptions).kudu
      }
      
      val kuduTableDF = readKudu(table)
      

       

      Calling kuduContext.tableExists(table) returns true, and kuduTableDF.columns returns an Array[String] with the correct column names.
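
      For reference, these checks are along the lines of the following sketch (using the variables defined above; the exact cells in my notebook may differ slightly):

      kuduContext.tableExists(table)   // returns true
      kuduTableDF.columns              // Array[String] with the expected column names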

      The problem occurs when I try to apply an action such as count or show; the following exception is thrown:

      Name: org.apache.spark.SparkException
      Message: Job aborted due to stage failure: Exception while getting task result: java.io.IOException: java.lang.ClassNotFoundException: org.apache.kudu.spark.kudu.KuduContext$TimestampAccumulator
      StackTrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1567)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1555)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1554)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1554)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:803)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:803)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:803)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1782)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1737)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1726)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:619)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2031)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2052)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2071)
        at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:336)
        at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
        at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:2865)
        at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2154)
        at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2154)
        at org.apache.spark.sql.Dataset$$anonfun$55.apply(Dataset.scala:2846)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2845)
        at org.apache.spark.sql.Dataset.head(Dataset.scala:2154)
        at org.apache.spark.sql.Dataset.take(Dataset.scala:2367)
        at org.apache.spark.sql.Dataset.showString(Dataset.scala:241)
        at org.apache.spark.sql.Dataset.show(Dataset.scala:641)
        at org.apache.spark.sql.Dataset.show(Dataset.scala:600)
        at org.apache.spark.sql.Dataset.show(Dataset.scala:609)
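
      The actions that trigger this failure are as simple as the following (a sketch; any Spark action on the DataFrame reproduces it):

      kuduTableDF.count()
      kuduTableDF.show()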

      I have already used the AddDeps magic in the Apache Toree notebook as follows:

      %AddDeps org.apache.kudu kudu-spark2_2.11 1.6.0 --transitive --trace
      %AddDeps org.apache.kudu kudu-client 1.6.0 --transitive --trace
      

      I have no problems doing the following import:

      import org.apache.kudu.spark.kudu._
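
      As an extra sanity check (a sketch on my side, not output from the failing job), loading the exact class named in the exception on the driver shows whether it is visible to the notebook's classloader at all:

      // Sketch: try to load the class named in the stack trace in the driver JVM.
      // If this throws ClassNotFoundException, the kudu-spark jar is not visible to
      // that classloader; if it succeeds, the failure is likely happening elsewhere
      // (for example on the executors or in the task-result deserialization path).
      Class.forName("org.apache.kudu.spark.kudu.KuduContext$TimestampAccumulator")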

            People

            • Assignee: Unassigned
            • Reporter: cnemri Chouaieb Nemri
