Spark / SPARK-4830

Spark Streaming Java Application: java.lang.ClassNotFoundException


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Duplicate
    • Affects Version/s: 1.1.0
    • Fix Version/s: None
    • Component/s: DStreams
    • Labels: None

    Description

      Application Overview:

      We have a Spark Streaming application that consumes messages from RabbitMQ and processes them. When we generate hundreds of events on RabbitMQ and run the application on a Spark standalone cluster, we see java.lang.ClassNotFoundException exceptions in the log.

      Our domain model is a simple POJO that represents the RabbitMQ events we want to consume and contains the custom properties we are interested in:

      com.xxx.Event.java

          public class Event implements java.io.Externalizable {

              // custom properties

              // custom implementation of writeExternal(), readExternal() methods
          }
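
      For illustration, here is a minimal sketch of what such an Externalizable event might look like. The payload field and its wire format are assumptions made for the example, not the reporter's actual properties:

      Event.java (illustrative sketch)

          package com.xxx;

          import java.io.Externalizable;
          import java.io.IOException;
          import java.io.ObjectInput;
          import java.io.ObjectOutput;

          public class Event implements Externalizable {

              private byte[] payload; // hypothetical custom property

              // Externalizable requires a public no-arg constructor for deserialization
              public Event() {
              }

              public Event(byte[] body) {
                  this.payload = body;
              }

              @Override
              public void writeExternal(ObjectOutput out) throws IOException {
                  out.writeInt(payload.length);
                  out.write(payload);
              }

              @Override
              public void readExternal(ObjectInput in) throws IOException {
                  payload = new byte[in.readInt()];
                  in.readFully(payload);
              }
          }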
      

      We have implemented a custom Spark Streaming receiver that receives messages from a RabbitMQ queue by means of a custom consumer (see "Receiving messages by subscription" at https://www.rabbitmq.com/api-guide.html), converts them to our custom domain event objects (com.xxx.Event), and stores them in Spark memory:

      RabbitMQReceiver.java

          byte[] body = // data received from RabbitMQ using the custom consumer
          Event event = new Event(body);
          store(event); // store into Spark memory
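
      A fuller illustrative sketch of what such a receiver might look like, assuming the standard RabbitMQ Java client and hypothetical host/queue parameters (this is not the reporter's actual code):

      RabbitMQReceiver.java (illustrative sketch)

          package com.xxx;

          import com.rabbitmq.client.AMQP;
          import com.rabbitmq.client.Channel;
          import com.rabbitmq.client.Connection;
          import com.rabbitmq.client.ConnectionFactory;
          import com.rabbitmq.client.DefaultConsumer;
          import com.rabbitmq.client.Envelope;
          import org.apache.spark.storage.StorageLevel;
          import org.apache.spark.streaming.receiver.Receiver;

          public class RabbitMQReceiver extends Receiver<Event> {

              private final String host;      // hypothetical configuration
              private final String queueName; // hypothetical configuration

              public RabbitMQReceiver(String host, String queueName) {
                  super(StorageLevel.MEMORY_ONLY());
                  this.host = host;
                  this.queueName = queueName;
              }

              @Override
              public void onStart() {
                  // consume on a separate thread so onStart() returns immediately
                  new Thread(this::receive, "rabbitmq-receiver").start();
              }

              @Override
              public void onStop() {
                  // connection teardown would go here
              }

              private void receive() {
                  try {
                      ConnectionFactory factory = new ConnectionFactory();
                      factory.setHost(host);
                      Connection connection = factory.newConnection();
                      Channel channel = connection.createChannel();

                      // push-style subscription, as in "Receiving messages by subscription"
                      channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
                          @Override
                          public void handleDelivery(String consumerTag, Envelope envelope,
                                                     AMQP.BasicProperties properties, byte[] body) {
                              store(new Event(body)); // store into Spark memory
                          }
                      });
                  } catch (Exception e) {
                      restart("Error receiving from RabbitMQ", e);
                  }
              }
          }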
      

      The main program is simple; it just sets up the Spark streaming context:

      Application.java
                  SparkConf sparkConf = new SparkConf().setAppName(APPLICATION_NAME);
                  sparkConf.setJars(SparkContext.jarOfClass(Application.class).toList());  
      
                  JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(BATCH_DURATION_MS));
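
      For reference, the Java API also provides JavaSparkContext.jarOfClass (in org.apache.spark.api.java), which returns a String[] that SparkConf.setJars accepts directly, avoiding the Scala Option-to-List conversion above; a minimal sketch:

      Application.java (illustrative sketch)

          SparkConf sparkConf = new SparkConf().setAppName(APPLICATION_NAME);
          sparkConf.setJars(JavaSparkContext.jarOfClass(Application.class));

          JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(BATCH_DURATION_MS));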
      

      Initialize the input stream:

      Application.java
                  
                  ReceiverInputDStream<Event> stream = // create input stream from RabbitMQ
                  JavaReceiverInputDStream<Event> events = new JavaReceiverInputDStream<Event>(stream, classTag(Event.class));
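
      An equivalent route in the Java API is JavaStreamingContext.receiverStream, which wraps a custom receiver directly instead of constructing the Scala ReceiverInputDStream by hand. A sketch, assuming the RabbitMQReceiver shown earlier and hypothetical RABBITMQ_HOST / QUEUE_NAME constants:

      Application.java (illustrative sketch)

          JavaReceiverInputDStream<Event> events =
                  ssc.receiverStream(new RabbitMQReceiver(RABBITMQ_HOST, QUEUE_NAME));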
      

      Process events:

      Application.java
                  
          events.foreachRDD(rdd -> {
              rdd.foreachPartition(partition -> {
                  // process partition
              });
              return null; // foreachRDD expects a Function<JavaRDD<Event>, Void> in Spark 1.1
          });

          ssc.start();
          ssc.awaitTermination();
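
      For clarity: in Spark 1.1 the Java foreachRDD is declared as taking a Function<JavaRDD<T>, Void>, which is why the lambda above returns null. The same loop written with explicit anonymous classes (an illustrative sketch):

          // Function and VoidFunction come from org.apache.spark.api.java.function
          events.foreachRDD(new Function<JavaRDD<Event>, Void>() {
              @Override
              public Void call(JavaRDD<Event> rdd) {
                  rdd.foreachPartition(new VoidFunction<java.util.Iterator<Event>>() {
                      @Override
                      public void call(java.util.Iterator<Event> partition) {
                          // process partition
                      }
                  });
                  return null;
              }
          });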
      

      Application submission:

      The application is packaged as a single fat jar using the Maven Shade Plugin (http://maven.apache.org/plugins/maven-shade-plugin/) and is compiled against Spark 1.1.0.
      We run the application on a Spark 1.1.0 standalone cluster that consists of a driver host, a master host, and two worker hosts. We submit the application from the driver host.

      On one of the workers we see java.lang.ClassNotFoundException exceptions:

      app.log

      14/11/27 10:27:10 ERROR BlockManagerWorker: Exception handling buffer message
      java.lang.ClassNotFoundException: com.xxx.Event
          at java.net.URLClassLoader$1.run(URLClassLoader.java:372)
          at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
          at java.security.AccessController.doPrivileged(Native Method)
          at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
          at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
          at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
          at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
          at java.lang.Class.forName0(Native Method)
          at java.lang.Class.forName(Class.java:344)
          at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:59)
          at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
          at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
          at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
          at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
          at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
          at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
          at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
          at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
          at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:235)
          at org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:126)
          at org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:104)
          at org.apache.spark.storage.MemoryStore.putBytes(MemoryStore.scala:76)
          at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:748)
          at org.apache.spark.storage.BlockManager.putBytes(BlockManager.scala:639)
          at org.apache.spark.storage.BlockManagerWorker.putBlock(BlockManagerWorker.scala:92)
          at org.apache.spark.storage.BlockManagerWorker.processBlockMessage(BlockManagerWorker.scala:73)
          at org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:48)
          at org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:48)
          at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
          at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
          at scala.collection.Iterator$class.foreach(Iterator.scala:727)
          at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
          at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
          at org.apache.spark.storage.BlockMessageArray.foreach(BlockMessageArray.scala:28)
          at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
          at org.apache.spark.storage.BlockMessageArray.map(BlockMessageArray.scala:28)
          at org.apache.spark.storage.BlockManagerWorker.onBlockMessageReceive(BlockManagerWorker.scala:48)
          at org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:38)
          at org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:38)
          at org.apache.spark.network.ConnectionManager.org$apache$spark$network$ConnectionManager$$handleMessage(ConnectionManager.scala:682)
          at org.apache.spark.network.ConnectionManager$$anon$10.run(ConnectionManager.scala:520)
          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
          at java.lang.Thread.run(Thread.java:745)

      We can see that the worker has downloaded application.jar and added it to its class loader:

      app.log

      14/11/27 10:26:59 INFO Executor: Fetching http://xx.xx.xx.xx:38287/jars/application.jar with timestamp 1417084011213
      14/11/27 10:26:59 INFO Utils: Fetching http://xx.xx.xx.xx:38287/jars/application.jar to /tmp/fetchFileTemp8223721356974787443.tmp
      14/11/27 10:27:00 INFO BlockManager: Removing RDD 4
      14/11/27 10:27:01 INFO Executor: Adding file:/path/to/spark/work/app-20141127102651-0001/1/./application.jar to class loader

      We also manually checked the jar file and confirmed that it contains the com.xxx.Event class.

People

    • Assignee: Unassigned
    • Reporter: Mykhaylo Telizhyn (mtelizhyn)
    • Votes: 1
    • Watchers: 2
