Zeppelin / ZEPPELIN-332

CNFE when running SQL query against Cassandra temp table

Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 0.6.0
    • Fix Version/s: None
    • Component/s: Interpreters
    • Labels: None
    • Environment: Ubuntu 14.04, Spark 1.4.1, Zeppelin 0.6.0, Hadoop 2.6.0, Cassandra 2.1.8, Cassandra-Spark Connector 1.4.0

    Description

      When running a %sql statement against a Cassandra temp table whose records have not previously been materialized via the SQLContext, a ClassNotFoundException (CNFE) is thrown.

      For example, we run the following code to register the table:

      import com.datastax.spark.connector._

      case class Stats(queue: String, time: Long, host: String, successes: Long)

      // The scan is defined lazily; no rows are read from Cassandra at this point.
      val stats2 = sc.cassandraTable[Stats]("prod_analytics_events", "stats")
        .select("queue", "time", "host", "successes")
        .where("time >= 1442707200000 and time < 1442793600000")
      stats2.toDF.registerTempTable("stats2")
      

      If we immediately try to run a %sql query, such as:

      %sql
      select * from stats2 limit 10
      

      we will get the following stack trace:

      java.lang.ClassNotFoundException: $line551.$read
      	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
      	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
      	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
      	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
      	at java.lang.Class.forName0(Native Method)
      	at java.lang.Class.forName(Class.java:348)
      	at scala.reflect.runtime.JavaMirrors$JavaMirror.javaClass(JavaMirrors.scala:500)
      	at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$classToJava$1.apply(JavaMirrors.scala:1167)
      	at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$classToJava$1.apply(JavaMirrors.scala:1159)
      	at scala.reflect.runtime.TwoWayCache.toJava(TwoWayCache.scala:49)
      	at scala.reflect.runtime.JavaMirrors$JavaMirror.classToJava(JavaMirrors.scala:1159)
      	at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$classToJava$1.apply(JavaMirrors.scala:1174)
      	at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$classToJava$1.apply(JavaMirrors.scala:1159)
      	at scala.reflect.runtime.TwoWayCache.toJava(TwoWayCache.scala:49)
      	at scala.reflect.runtime.JavaMirrors$JavaMirror.classToJava(JavaMirrors.scala:1159)
      	at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$classToJava$1.apply(JavaMirrors.scala:1174)
      	at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$classToJava$1.apply(JavaMirrors.scala:1159)
      	at scala.reflect.runtime.TwoWayCache.toJava(TwoWayCache.scala:49)
      	at scala.reflect.runtime.JavaMirrors$JavaMirror.classToJava(JavaMirrors.scala:1159)
      	at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$classToJava$1.apply(JavaMirrors.scala:1174)
      	at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$classToJava$1.apply(JavaMirrors.scala:1159)
      	at scala.reflect.runtime.TwoWayCache.toJava(TwoWayCache.scala:49)
      	at scala.reflect.runtime.JavaMirrors$JavaMirror.classToJava(JavaMirrors.scala:1159)
      	at scala.reflect.runtime.JavaMirrors$JavaMirror.typeToJavaClass(JavaMirrors.scala:1255)
      	at scala.reflect.runtime.JavaMirrors$JavaMirror.runtimeClass(JavaMirrors.scala:202)
      	at scala.reflect.runtime.JavaMirrors$JavaMirror.runtimeClass(JavaMirrors.scala:65)
      	at com.datastax.spark.connector.rdd.reader.AnyObjectFactory.<init>(AnyObjectFactory.scala:30)
      	at com.datastax.spark.connector.rdd.reader.GettableDataToMappedTypeConverter.<init>(GettableDataToMappedTypeConverter.scala:45)
      	at com.datastax.spark.connector.rdd.reader.ClassBasedRowReader.<init>(ClassBasedRowReader.scala:22)
      	at com.datastax.spark.connector.rdd.reader.ClassBasedRowReaderFactory.rowReader(ClassBasedRowReader.scala:47)
      	at com.datastax.spark.connector.rdd.reader.ClassBasedRowReaderFactory.rowReader(ClassBasedRowReader.scala:42)
      	at com.datastax.spark.connector.rdd.CassandraTableRowReaderProvider$class.rowReader(CassandraTableRowReaderProvider.scala:48)
      	at com.datastax.spark.connector.rdd.CassandraTableScanRDD.rowReader$lzycompute(CassandraTableScanRDD.scala:59)
      	at com.datastax.spark.connector.rdd.CassandraTableScanRDD.rowReader(CassandraTableScanRDD.scala:59)
      	at com.datastax.spark.connector.rdd.CassandraTableRowReaderProvider$class.verify(CassandraTableRowReaderProvider.scala:151)
      	at com.datastax.spark.connector.rdd.CassandraTableScanRDD.verify(CassandraTableScanRDD.scala:59)
      	at com.datastax.spark.connector.rdd.CassandraTableScanRDD.getPartitions(CassandraTableScanRDD.scala:143)
      	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
      	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
      	at scala.Option.getOrElse(Option.scala:120)
      	at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
      	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
      	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
      	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
      	at scala.Option.getOrElse(Option.scala:120)
      	at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
      	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
      	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
      	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
      	at scala.Option.getOrElse(Option.scala:120)
      	at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
      	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:121)
      	at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:125)
      	at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1269)
      	at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1203)
      	at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1262)
      	at sun.reflect.GeneratedMethodAccessor106.invoke(Unknown Source)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.lang.reflect.Method.invoke(Method.java:497)
      	at org.apache.zeppelin.spark.ZeppelinContext.showDF(ZeppelinContext.java:300)
      	at org.apache.zeppelin.spark.SparkSqlInterpreter.interpret(SparkSqlInterpreter.java:142)
      	at org.apache.zeppelin.interpreter.ClassloaderInterpreter.interpret(ClassloaderInterpreter.java:57)
      	at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:93)
      	at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:276)
      	at org.apache.zeppelin.scheduler.Job.run(Job.java:170)
      	at org.apache.zeppelin.scheduler.FIFOScheduler$1.run(FIFOScheduler.java:118)
      	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
      	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      	at java.lang.Thread.run(Thread.java:745)
      

      However, it is possible to run a query directly using the SQLContext without issue:

      sqlContext.sql("select * from stats2 limit 10").collect
      

      returns the expected results:

      stats2: com.datastax.spark.connector.rdd.CassandraTableScanRDD[Stats] = CassandraTableScanRDD[637] at RDD at CassandraRDD.scala:15
      res155: Array[org.apache.spark.sql.Row] = Array([events_ANDROID_LocationUpdate,1442707206499,sink4x056,1821024], [events_ANDROID_LocationUpdate,1442707207062,sink4x019,1480357], [events_ANDROID_LocationUpdate,1442707266854,sink4x056,1821394], [events_ANDROID_LocationUpdate,1442707268281,sink4x019,1480675], [events_ANDROID_LocationUpdate,1442707329595,sink4x056,1821771], [events_ANDROID_LocationUpdate,1442707332608,sink4x019,1480979], [events_ANDROID_LocationUpdate,1442707389853,sink4x056,1822088], [events_ANDROID_LocationUpdate,1442707393107,sink4x019,1481257], [events_ANDROID_LocationUpdate,1442707451639,sink4x056,1822413], [events_ANDROID_LocationUpdate,1442707457504,sink4x019,1481591])
      

      Additionally, if we first materialize some rows using the SQLContext (such as in the above example), further queries using %sql work fine.
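
      A minimal sketch of the workaround, assuming the stats2 temp table registered above (any query that forces rows through the SQLContext should suffice; the limit value is arbitrary):

      // Materialize at least one row via the SQLContext first...
      sqlContext.sql("select * from stats2 limit 1").collect()
      // ...after which %sql queries against stats2 run without the CNFE.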

      Relevant config from zeppelin-env.sh:

      export ZEPPELIN_JAVA_OPTS="-Dspark.jars=/opt/spark/lib/spark-cassandra-connector-assembly.jar:/opt/hadoop/share/hadoop/tools/lib/*:/opt/jars/*:/opt/spark/lib/pyspark-cassandra.jar -Dspark.cassandra.connection.host=x.x.x.x -Dspark.cassandra.read.timeout_ms=300000 -Dspark.cassandra.auth.username=zeppelin -Dspark.cassandra.auth.password=[password]"
      
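      One hypothetical sanity check (not part of the original report) is to read these settings back from a notebook paragraph to confirm the -D options actually reached the Spark context; sc.getConf is standard Spark API:

      // Hypothetical verification: print the connector settings as seen by Spark.
      println(sc.getConf.get("spark.cassandra.connection.host"))
      println(sc.getConf.get("spark.jars"))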

      It's also worth noting that this was NOT a problem under Spark 1.3.1.

      Attachments

        1. zepp_cass_bug_sqlctx.png (118 kB, Robert Strickland)
        2. zepp_cass_bug_workaround.png (168 kB, Robert Strickland)
        3. zepp_cass_bug.png (248 kB, Robert Strickland)
        4. work-around.png (83 kB, DuyHai Doan)

      People

        Assignee: Unassigned
        Reporter: Robert Strickland (rstrickland)
        Votes: 0
        Watchers: 4
