  Spark / SPARK-5035

Streaming ReceiverMessage trait should extend Serializable



    • Type: Bug
    • Status: Resolved
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: 1.0.2, 1.1.2, 1.2.1, 1.3.0
    • Fix Version/s: 1.0.3, 1.1.2, 1.2.1, 1.3.0
    • Component/s: DStreams
    • Labels: None


      Spark Streaming's ReceiverMessage trait should extend Serializable in order to fix a subtle bug that only occurs when running on a real cluster:

      If you send a fire-and-forget message to a remote Akka actor and that message cannot be serialized, the send appears to fail more-or-less silently. As an optimization, Akka skips message serialization for messages sent within the same JVM. As a result, Spark's unit tests never fail because of non-serializable Akka messages, but such messages cause mostly-silent failures when running on a real cluster.
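      The asymmetry can be illustrated without Akka at all. The sketch below is a minimal illustration, assuming plain Java serialization stands in for whatever serializer a remote transport applies: a same-JVM send can hand over the object reference untouched, while a cross-JVM send must first turn the message into bytes, which is exactly where a non-serializable message fails. The names `DeliverySketch`, `PlainMessage`, `localSend`, `remoteSend`, and `survivesRemoteSend` are hypothetical and exist only for this illustration.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, NotSerializableException,
  ObjectInputStream, ObjectOutputStream}

object DeliverySketch {
  // Hypothetical message that, like ReceiverMessage today, does not extend Serializable.
  object PlainMessage

  // Hypothetical same-JVM delivery: the reference is passed along as-is,
  // so serializability is never checked.
  def localSend(msg: AnyRef): AnyRef = msg

  // Hypothetical cross-JVM delivery: the message must survive a round trip
  // through Java serialization before the receiving side can see it.
  def remoteSend(msg: AnyRef): AnyRef = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(msg) // throws NotSerializableException for PlainMessage
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject() finally in.close()
  }

  // True if remoteSend succeeds, false if serialization rejects the message.
  def survivesRemoteSend(msg: AnyRef): Boolean =
    try { remoteSend(msg); true }
    catch { case _: NotSerializableException => false }

  def main(args: Array[String]): Unit = {
    println(localSend(PlainMessage) eq PlainMessage) // true: nothing was serialized
    println(survivesRemoteSend(PlainMessage))        // false: the remote path rejects it
    println(survivesRemoteSend("a String message"))  // true: String is Serializable
  }
}
```

      This is why a test suite that only exercises same-JVM delivery cannot notice the problem: the failing step is never executed.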

      Here's the current code for ReceiverMessage:

      /** Messages sent to the NetworkReceiver. */
      private[streaming] sealed trait ReceiverMessage
      private[streaming] object StopReceiver extends ReceiverMessage

      Since ReceiverMessage does not extend Serializable, and StopReceiver is a regular object rather than a case object (case objects are serializable by default), sending StopReceiver to a remote receiver fails with a serialization error. As a result, graceful receiver shutdown is broken on real clusters even though it works in local and local-cluster modes. To reproduce this, try running the word count example from the Streaming Programming Guide in the Spark shell:
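      The fix proposed in the title amounts to adding `extends Serializable` to the trait, so that StopReceiver inherits serializability. The sketch below compares the current and fixed definitions by writing each StopReceiver with Java serialization, and also shows the case-object alternative for contrast. It assumes Java serialization as the remote wire format; the `private[streaming]` qualifiers are dropped so the snippet compiles outside the `org.apache.spark.streaming` package, and the wrapper names `Current`, `Fixed`, `CaseObjectVariant`, and `SerializationCheck` are hypothetical.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Current definitions (private[streaming] dropped so this compiles standalone).
object Current {
  sealed trait ReceiverMessage
  object StopReceiver extends ReceiverMessage
}

// Proposed fix: the trait extends Serializable, so every message object inherits it.
object Fixed {
  sealed trait ReceiverMessage extends Serializable
  object StopReceiver extends ReceiverMessage
}

// For comparison only: a case object is Serializable even if the trait is not.
object CaseObjectVariant {
  sealed trait ReceiverMessage
  case object StopReceiver extends ReceiverMessage
}

object SerializationCheck {
  // True if the value can be written with Java serialization
  // (assumption: this is what a cross-JVM send requires).
  def canSerialize(msg: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try { out.writeObject(msg); true }
    catch { case _: NotSerializableException => false }
    finally out.close()
  }

  def main(args: Array[String]): Unit = {
    println(canSerialize(Current.StopReceiver))           // false
    println(canSerialize(Fixed.StopReceiver))             // true
    println(canSerialize(CaseObjectVariant.StopReceiver)) // true
  }
}
```

      Either change makes StopReceiver serializable; putting `extends Serializable` on the sealed trait has the advantage of also covering any message type added to it later.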

      import org.apache.spark._
      import org.apache.spark.streaming._
      import org.apache.spark.streaming.StreamingContext._
      // Create a StreamingContext with a 10-second batch interval, reusing the shell's SparkContext `sc`
      val ssc = new StreamingContext(sc, Seconds(10))
      // Create a DStream that will connect to hostname:port, like localhost:9999
      val lines = ssc.socketTextStream("localhost", 9999)
      // Split each line into words
      val words = lines.flatMap(_.split(" "))
      // Count each word in each batch
      val pairs = words.map(word => (word, 1))
      val wordCounts = pairs.reduceByKey(_ + _)
      // Print the first ten elements of each RDD generated in this DStream to the console
      wordCounts.print()
      // Start the computation; once some batches have run, stop it
      ssc.start()
      // stop(stopSparkContext = true, stopGracefully = true)
      ssc.stop(true, true)

      This works correctly in local mode but fails when running against a real cluster (to test this on a single machine, start a standalone cluster with sbin/start-all.sh).


              Assignee: Josh Rosen (joshrosen)
              Reporter: Josh Rosen (joshrosen)
              Votes: 0
              Watchers: 3