Details
- Type: Bug
- Status: Resolved
- Priority: Critical
- Resolution: Duplicate
- Affects Version/s: 1.2.1
- Fix Version/s: None
- Component/s: None
- Environment: Ubuntu, MacOS. Tried builds with Scala 2.11 and 2.10 (for the Kafka receiver). Also tried the pre-built spark-1.2.1-bin-hadoop2.4.tgz. The bug reproduces in all cases on three different computers we've tried.
Description
Below is the complete source code of a very simple test application.
Run it in one terminal window, and run "nc -lk 9999" in another.
Once per second, enter a number into the "nc" terminal (so that the window slides over several non-empty RDDs). Two or three such iterations are enough for the program to stall completely (no new events are processed), with the following output:
-------------------------------------------
Time: 1425922369000 ms
-------------------------------------------

-------------------------------------------
Time: 1425922370000 ms
-------------------------------------------
(1.0,4.0)

-------------------------------------------
Time: 1425922371000 ms
-------------------------------------------
(1.0,4.0)

[Stage 17:=============================> (1 + 0) / 2]
The "Stage..." progress message is printed to stderr.
We've tried both standalone (local master) and clustered setups; the bug reproduces in all cases. We also tried both raw sockets and Kafka as the receiver; it reproduces in both cases.
NOTE that the bug does not reproduce under the following conditions:
- the receiver reads from a queue (StreamingContext.queueStream)
- the commented-out "print" call is un-commented
- (window + reduceByKey) is replaced with reduceByKeyAndWindow
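For reference, the result the job should produce can be sketched with plain Scala collections. The helper below (windowedReduce, a hypothetical name, not part of any Spark API) reduces by key over the last N input batches, which is the computation that window(...).reduceByKey(_ + _), and equivalently reduceByKeyAndWindow, is expected to perform per batch interval:

```scala
object WindowedReduceSketch {
  // Reduce (key, value) pairs over a sliding window covering the last
  // `window` batches. One result Map is produced per batch interval,
  // mirroring the per-interval output of the streaming job.
  def windowedReduce(batches: Seq[Seq[(Double, Double)]],
                     window: Int): Seq[Map[Double, Double]] =
    batches.indices.map { i =>
      batches.slice(math.max(0, i - window + 1), i + 1)
        .flatten
        .groupBy(_._1)
        .map { case (k, pairs) => k -> pairs.map(_._2).sum }
    }

  def main(args: Array[String]): Unit = {
    // Three one-second batches, each carrying one (1.0, n) pair,
    // reduced over a 4-batch window as in the test application below.
    val batches = Seq(Seq((1.0, 1.0)), Seq((1.0, 2.0)), Seq((1.0, 1.0)))
    windowedReduce(batches, window = 4).foreach(println)
    // After the third batch the window covers all three inputs,
    // so the last result is Map(1.0 -> 4.0), matching the (1.0,4.0)
    // printed in the output above.
  }
}
```

This only illustrates the expected semantics; the bug is about the streaming job stalling, not about the values it computes.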
Here is the simple test application:
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming._

object SparkStreamingTest extends App {
  val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreamingTest")
  val ssc = new StreamingContext(sparkConf, Seconds(1))

  val lines0 = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER)
  val words = lines0.map(x => (1.0, x.toDouble))

  // words.print() // TODO: enable this print to avoid the program freeze

  val windowed = words.window(Seconds(4), Seconds(1))
  val grouped = windowed.reduceByKey(_ + _)
  grouped.print()

  ssc.start()
  ssc.awaitTermination()
}
Attachments
Issue Links
- duplicates
-
SPARK-4939 Python updateStateByKey example hang in local mode
- Resolved