Details
- Type: Bug
- Status: Open
- Priority: Major
- Resolution: Unresolved
- Affects Version/s: 0.6.0
- Fix Version/s: None
- Component/s: None
- Environment: Ubuntu 14.04, Spark 1.4.1, Zeppelin 0.6.0, Hadoop 2.6.0, Cassandra 2.1.8, Cassandra-Spark Connector 1.4.0
Description
Running a SQL statement against a Cassandra temp table before any records have been materialized through the SQLContext throws a ClassNotFoundException.
For example, we run the following code to register the table:
import com.datastax.spark.connector._

case class Stats(queue: String, time: Long, host: String, successes: Long)

val stats2 = sc.cassandraTable[Stats]("prod_analytics_events", "stats")
  .select("queue", "time", "host", "successes")
  .where("time >= 1442707200000 and time < 1442793600000")

stats2.toDF.registerTempTable("stats2")
If we immediately try to run a %sql query, such as:
%sql select * from stats2 limit 10
we will get the following stack trace:
java.lang.ClassNotFoundException: $line551.$read
  at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
  at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
  at java.lang.Class.forName0(Native Method)
  at java.lang.Class.forName(Class.java:348)
  at scala.reflect.runtime.JavaMirrors$JavaMirror.javaClass(JavaMirrors.scala:500)
  at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$classToJava$1.apply(JavaMirrors.scala:1167)
  at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$classToJava$1.apply(JavaMirrors.scala:1159)
  at scala.reflect.runtime.TwoWayCache.toJava(TwoWayCache.scala:49)
  at scala.reflect.runtime.JavaMirrors$JavaMirror.classToJava(JavaMirrors.scala:1159)
  at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$classToJava$1.apply(JavaMirrors.scala:1174)
  at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$classToJava$1.apply(JavaMirrors.scala:1159)
  at scala.reflect.runtime.TwoWayCache.toJava(TwoWayCache.scala:49)
  at scala.reflect.runtime.JavaMirrors$JavaMirror.classToJava(JavaMirrors.scala:1159)
  at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$classToJava$1.apply(JavaMirrors.scala:1174)
  at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$classToJava$1.apply(JavaMirrors.scala:1159)
  at scala.reflect.runtime.TwoWayCache.toJava(TwoWayCache.scala:49)
  at scala.reflect.runtime.JavaMirrors$JavaMirror.classToJava(JavaMirrors.scala:1159)
  at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$classToJava$1.apply(JavaMirrors.scala:1174)
  at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$classToJava$1.apply(JavaMirrors.scala:1159)
  at scala.reflect.runtime.TwoWayCache.toJava(TwoWayCache.scala:49)
  at scala.reflect.runtime.JavaMirrors$JavaMirror.classToJava(JavaMirrors.scala:1159)
  at scala.reflect.runtime.JavaMirrors$JavaMirror.typeToJavaClass(JavaMirrors.scala:1255)
  at scala.reflect.runtime.JavaMirrors$JavaMirror.runtimeClass(JavaMirrors.scala:202)
  at scala.reflect.runtime.JavaMirrors$JavaMirror.runtimeClass(JavaMirrors.scala:65)
  at com.datastax.spark.connector.rdd.reader.AnyObjectFactory.<init>(AnyObjectFactory.scala:30)
  at com.datastax.spark.connector.rdd.reader.GettableDataToMappedTypeConverter.<init>(GettableDataToMappedTypeConverter.scala:45)
  at com.datastax.spark.connector.rdd.reader.ClassBasedRowReader.<init>(ClassBasedRowReader.scala:22)
  at com.datastax.spark.connector.rdd.reader.ClassBasedRowReaderFactory.rowReader(ClassBasedRowReader.scala:47)
  at com.datastax.spark.connector.rdd.reader.ClassBasedRowReaderFactory.rowReader(ClassBasedRowReader.scala:42)
  at com.datastax.spark.connector.rdd.CassandraTableRowReaderProvider$class.rowReader(CassandraTableRowReaderProvider.scala:48)
  at com.datastax.spark.connector.rdd.CassandraTableScanRDD.rowReader$lzycompute(CassandraTableScanRDD.scala:59)
  at com.datastax.spark.connector.rdd.CassandraTableScanRDD.rowReader(CassandraTableScanRDD.scala:59)
  at com.datastax.spark.connector.rdd.CassandraTableRowReaderProvider$class.verify(CassandraTableRowReaderProvider.scala:151)
  at com.datastax.spark.connector.rdd.CassandraTableScanRDD.verify(CassandraTableScanRDD.scala:59)
  at com.datastax.spark.connector.rdd.CassandraTableScanRDD.getPartitions(CassandraTableScanRDD.scala:143)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
  at scala.Option.getOrElse(Option.scala:120)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
  at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
  at scala.Option.getOrElse(Option.scala:120)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
  at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
  at scala.Option.getOrElse(Option.scala:120)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
  at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:121)
  at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:125)
  at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1269)
  at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1203)
  at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1262)
  at sun.reflect.GeneratedMethodAccessor106.invoke(Unknown Source)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:497)
  at org.apache.zeppelin.spark.ZeppelinContext.showDF(ZeppelinContext.java:300)
  at org.apache.zeppelin.spark.SparkSqlInterpreter.interpret(SparkSqlInterpreter.java:142)
  at org.apache.zeppelin.interpreter.ClassloaderInterpreter.interpret(ClassloaderInterpreter.java:57)
  at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:93)
  at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:276)
  at org.apache.zeppelin.scheduler.Job.run(Job.java:170)
  at org.apache.zeppelin.scheduler.FIFOScheduler$1.run(FIFOScheduler.java:118)
  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)
However, it is possible to run a query directly using the SQLContext without issue:
sqlContext.sql("select * from stats2 limit 10").collect
which returns the expected results:
stats2: com.datastax.spark.connector.rdd.CassandraTableScanRDD[Stats] = CassandraTableScanRDD[637] at RDD at CassandraRDD.scala:15
res155: Array[org.apache.spark.sql.Row] = Array(
  [events_ANDROID_LocationUpdate,1442707206499,sink4x056,1821024],
  [events_ANDROID_LocationUpdate,1442707207062,sink4x019,1480357],
  [events_ANDROID_LocationUpdate,1442707266854,sink4x056,1821394],
  [events_ANDROID_LocationUpdate,1442707268281,sink4x019,1480675],
  [events_ANDROID_LocationUpdate,1442707329595,sink4x056,1821771],
  [events_ANDROID_LocationUpdate,1442707332608,sink4x019,1480979],
  [events_ANDROID_LocationUpdate,1442707389853,sink4x056,1822088],
  [events_ANDROID_LocationUpdate,1442707393107,sink4x019,1481257],
  [events_ANDROID_LocationUpdate,1442707451639,sink4x056,1822413],
  [events_ANDROID_LocationUpdate,1442707457504,sink4x019,1481591])
Additionally, if we first materialize some rows using the SQLContext (such as in the above example), further queries using %sql work fine.
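The workaround above can be sketched as a one-off notebook paragraph run right after registering the table. This is a minimal sketch, not a fix: it assumes any action that pulls at least one row through the SQLContext in the Scala interpreter is enough to avoid the failure, which matches the behavior we observed but has not been confirmed against the root cause.

```scala
// Workaround sketch: run this in a Scala paragraph immediately after the
// registerTempTable call above, in the same interpreter session.
// Materializing a single row through the SQLContext appears to be enough;
// "limit 1" is an arbitrary choice to keep the warm-up query cheap.
sqlContext.sql("select * from stats2 limit 1").collect()

// After this, %sql paragraphs such as
//   %sql select * from stats2 limit 10
// complete without the ClassNotFoundException.
```

The stack trace suggests the generated REPL class (`$line551.$read`) is not visible to the classloader used by the %sql interpreter until the Scala interpreter has evaluated the RDD once, which would explain why the warm-up query helps; that reading is an inference from the trace, not a confirmed diagnosis.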
Relevant config from zeppelin-env.sh:
export ZEPPELIN_JAVA_OPTS="-Dspark.jars=/opt/spark/lib/spark-cassandra-connector-assembly.jar:/opt/hadoop/share/hadoop/tools/lib/*:/opt/jars/*:/opt/spark/lib/pyspark-cassandra.jar -Dspark.cassandra.connection.host=x.x.x.x -Dspark.cassandra.read.timeout_ms=300000 -Dspark.cassandra.auth.username=zeppelin -Dspark.cassandra.auth.password=[password]"
It's also worth noting that this was NOT a problem under Spark 1.3.1.