Description
Spark Streaming's ReceiverMessage trait should extend Serializable in order to fix a subtle bug that only occurs when running on a real cluster:
If you send a fire-and-forget message to a remote Akka actor and that message cannot be serialized, the send appears to fail more or less silently. As an optimization, Akka skips message serialization for messages sent within the same JVM. As a result, Spark's unit tests will never fail due to non-serializable Akka messages, but such messages fail mostly silently when running on a real cluster.
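Because the serialization step only happens when a message leaves the JVM, one way to catch this class of bug without a cluster is to serialize the message explicitly. The sketch below is a minimal illustration, not Spark code: `BrokenMessage`/`BrokenStop` and `FixedMessage`/`FixedStop` are hypothetical stand-ins for the current and proposed `ReceiverMessage`/`StopReceiver`, and `javaSerializes` is a hypothetical helper that tries plain Java serialization, assumed here to approximate what Akka's default serializer requires of such messages.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-ins (the real Spark types are private[streaming]).
sealed trait BrokenMessage                      // like the current ReceiverMessage
object BrokenStop extends BrokenMessage         // plain object, like StopReceiver

sealed trait FixedMessage extends Serializable  // the proposed fix: the trait extends Serializable
object FixedStop extends FixedMessage

// Hypothetical helper: returns false if Java serialization rejects the object
// because its class does not implement java.io.Serializable.
def javaSerializes(msg: AnyRef): Boolean =
  try {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try out.writeObject(msg) finally out.close()
    true
  } catch {
    case _: NotSerializableException => false
  }

println(s"BrokenStop serializable: ${javaSerializes(BrokenStop)}")            // prints "BrokenStop serializable: false"
println(s"FixedStop is java.io.Serializable: ${FixedStop.isInstanceOf[java.io.Serializable]}") // prints "... true"
```

Extending `Serializable` on the sealed trait makes every message object in the hierarchy, including `StopReceiver`, an instance of `java.io.Serializable`; turning `StopReceiver` into a case object would have a similar effect, since case objects are `Serializable` automatically.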
Here's the current code for ReceiverMessage:
    /** Messages sent to the NetworkReceiver. */
    private[streaming] sealed trait ReceiverMessage
    private[streaming] object StopReceiver extends ReceiverMessage
Since ReceiverMessage does not extend Serializable and StopReceiver is a plain object rather than a case object (case objects are Serializable automatically), sending StopReceiver to a remote receiver fails with a serialization error. As a result, graceful receiver shutdown is broken on real clusters but works in local and local-cluster modes. To reproduce this, try running the word count example from the Streaming Programming Guide in the Spark shell:
    import org.apache.spark._
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.StreamingContext._

    val ssc = new StreamingContext(sc, Seconds(10))
    // Create a DStream that will connect to hostname:port, like localhost:9999
    val lines = ssc.socketTextStream("localhost", 9999)
    // Split each line into words
    val words = lines.flatMap(_.split(" "))
    // Count each word in each batch
    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_ + _)
    // Print the first ten elements of each RDD generated in this DStream to the console
    wordCounts.print()
    ssc.start()
    Thread.sleep(10000)
    // Stop the SparkContext too, and stop gracefully by waiting for received data to be processed
    ssc.stop(stopSparkContext = true, stopGracefully = true)
This works correctly in local mode but fails when running against a real cluster (to test this on a single machine, start a standalone cluster with sbin/start-all.sh and connect the shell to it).
Issue Links
- relates to: SPARK-2892 Socket Receiver does not stop when streaming context is stopped (Resolved)