Spark / SPARK-5035

Streaming ReceiverMessage trait should extend Serializable


    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: 1.0.2, 1.1.2, 1.2.1, 1.3.0
    • Fix Version/s: 1.0.3, 1.1.2, 1.2.1, 1.3.0
    • Component/s: DStreams
    • Labels:
      None

      Description

      Spark Streaming's ReceiverMessage trait should extend Serializable in order to fix a subtle bug that only occurs when running on a real cluster:

      If you send a fire-and-forget message to a remote Akka actor and that message cannot be serialized, the send appears to fail more or less silently. As an optimization, Akka skips serialization for messages sent within the same JVM. As a result, Spark's unit tests never fail because of non-serializable Akka messages, but such messages cause mostly silent failures when running on a real cluster.
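      The failure mode can be sketched without Akka. The toy model below is hypothetical and is not Akka's API: `tell`, `toBytes`, `PlainStop`, and `errorLog` are illustrative names. It assumes plain Java serialization as the wire format, serializes only messages marked as remote (mirroring Akka's same-JVM optimization), and records a serialization failure in a log instead of reporting it to the sender. Run it as a standalone Scala script (for example with scala-cli), not inside the Spark shell.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}
import scala.collection.mutable.ArrayBuffer

// Hypothetical message that, like the original StopReceiver, is not Serializable.
class PlainStop

// Stand-in for an error log: failures are recorded here, never reported to the sender.
val errorLog = ArrayBuffer.empty[String]

// Java-serializes a message; throws NotSerializableException if it cannot be serialized.
def toBytes(msg: AnyRef): Array[Byte] = {
  val buffer = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(buffer)
  try out.writeObject(msg) finally out.close()
  buffer.toByteArray
}

// Toy fire-and-forget send (hypothetical, not Akka's API). It returns Unit, so the
// caller never learns whether the message arrived. Only "remote" sends serialize.
def tell(msg: AnyRef, inbox: ArrayBuffer[AnyRef], remote: Boolean): Unit = {
  if (!remote) {
    inbox += msg // same JVM: the reference is handed over, nothing is serialized
  } else {
    try {
      toBytes(msg) // a real transport would ship these bytes and deserialize them remotely
      inbox += msg
    } catch {
      case e: NotSerializableException => errorLog += s"dropped message: $e"
    }
  }
}

val localInbox = ArrayBuffer.empty[AnyRef]
val remoteInbox = ArrayBuffer.empty[AnyRef]
tell(new PlainStop, localInbox, remote = false) // "unit test" path: delivered
tell(new PlainStop, remoteInbox, remote = true) // "cluster" path: dropped, only logged
println(s"local: ${localInbox.size}, remote: ${remoteInbox.size}, logged: ${errorLog.size}")
// prints "local: 1, remote: 0, logged: 1"
```

      Akka also provides an `akka.actor.serialize-messages` setting that, when enabled, serializes local messages too; turning it on in tests is one way to exercise the remote code path inside a single JVM.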

      Here's the current code for ReceiverMessage:

      /** Messages sent to the NetworkReceiver. */
      private[streaming] sealed trait ReceiverMessage
      private[streaming] object StopReceiver extends ReceiverMessage
      

      Since ReceiverMessage does not extend Serializable, and StopReceiver is a regular object rather than a case object (case objects are automatically Serializable), StopReceiver cannot be serialized, so sending it to a receiver running in another JVM fails. As a result, graceful receiver shutdown is broken on real clusters but works in local and local-cluster modes. To reproduce this, run the word count example from the Streaming Programming Guide in the Spark shell:
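      The difference between the original definition and the proposed fix can be checked directly. This is a minimal sketch using local stand-ins for the types above: `private[streaming]` is dropped so the snippet compiles on its own, and `FixedReceiverMessage`, `FixedStopReceiver`, `CaseStopReceiver`, and `isJavaSerializable` are hypothetical names for illustration. It approximates what a remote send needs with plain Java serialization, which is an assumption; Akka's own serializer selection differs in detail. Run it as a single Scala script (for example with scala-cli) rather than line by line in a REPL, because a sealed trait can only be extended in the same compilation unit.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Returns true if Java serialization of `msg` succeeds.
def isJavaSerializable(msg: AnyRef): Boolean = {
  val out = new ObjectOutputStream(new ByteArrayOutputStream())
  try { out.writeObject(msg); true }
  catch { case _: NotSerializableException => false }
  finally out.close()
}

// As in the issue: neither the trait nor the plain object is Serializable.
sealed trait ReceiverMessage
object StopReceiver extends ReceiverMessage

// The fix proposed by the issue: the trait itself extends Serializable.
sealed trait FixedReceiverMessage extends Serializable
object FixedStopReceiver extends FixedReceiverMessage

// Alternative: a case object is Serializable even when its parent trait is not.
case object CaseStopReceiver extends ReceiverMessage

println(isJavaSerializable(StopReceiver))      // false
println(isJavaSerializable(FixedStopReceiver)) // true
println(isJavaSerializable(CaseStopReceiver))  // true
```

      Making the trait extend Serializable, as the issue proposes, covers every current and future message in the hierarchy; turning StopReceiver into a case object would also work, but only for that one message.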

      import org.apache.spark._
      import org.apache.spark.streaming._
      import org.apache.spark.streaming.StreamingContext._
      val ssc = new StreamingContext(sc, Seconds(10))
      // Create a DStream that will connect to hostname:port, like localhost:9999
      val lines = ssc.socketTextStream("localhost", 9999)
      // Split each line into words
      val words = lines.flatMap(_.split(" "))
      // Count each word in each batch
      val pairs = words.map(word => (word, 1))
      val wordCounts = pairs.reduceByKey(_ + _)

      // Print the first ten elements of each RDD generated in this DStream to the console
      wordCounts.print()
      ssc.start()
      // Let the job run for one 10-second batch interval before stopping it
      Thread.sleep(10000)
      // stop(stopSparkContext = true, stopGracefully = true): the graceful
      // shutdown path that is broken on a real cluster
      ssc.stop(true, true)
      

      This works correctly in local mode but fails when running against a real cluster (to test this on a single machine, start a standalone cluster with sbin/start-all.sh and connect the shell to it).


              People

              • Assignee: Josh Rosen (joshrosen)
              • Reporter: Josh Rosen (joshrosen)
              • Votes: 0
              • Watchers: 3
