Description
A similar issue was reported before but received no response:
http://apache-spark-user-list.1001560.n3.nabble.com/Windows-of-windowed-streams-not-displaying-the-expected-results-td466.html
I ran into the same issue recently; it can be reproduced in 1.3.1 with the following piece of code:
    def main(args: Array[String]) {
      val batchInterval = "1234"
      val sparkConf = new SparkConf()
        .setAppName("WindowOnWindowedDStream")
        .setMaster("local[2]")
      val ssc = new StreamingContext(sparkConf, Milliseconds(batchInterval.toInt))
      ssc.checkpoint("checkpoint")

      def createRDD(i: Int) : RDD[(String, Int)] = {
        val count = 1000
        val rawLogs = (1 to count).map
        ssc.sparkContext.parallelize(rawLogs)
      }

      val rddQueue = mutable.Queue[RDD[(String, Int)]]()
      val rawLogStream = ssc.queueStream(rddQueue)
      (1 to 300) foreach { i => rddQueue.enqueue(createRDD(i)) }

      val l1 = rawLogStream.window(Milliseconds(batchInterval.toInt) * 5, Milliseconds(batchInterval.toInt) * 5).reduceByKey(_ + _)
      val l2 = l1.window(Milliseconds(batchInterval.toInt) * 15, Milliseconds(batchInterval.toInt) * 15).reduceByKey(_ + _)
      l1.print()
      l2.print()
      ssc.start()
      ssc.awaitTermination()
    }
Here we have two windowed DStream instances, l1 and l2.
l1 is the DStream produced by calling window() on the source DStream, with both the window and slide durations set to 5 times the batch interval of the source stream.
l2 is the DStream produced by calling window() on l1, with both the window and slide durations set to 3 times l1's batch interval (its slide duration), i.e. 15 times the batch interval of the source stream.
In the output of this simple streaming app, only data from l1 is printed; nothing is printed from l2.
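To make the durations concrete, here is a small sketch in plain Scala (no Spark dependency) that works out the slide durations implied by the reproduction code. The object and value names are illustrative only and do not come from Spark.

```scala
object WindowDurations {
  val batchMs: Long = 1234L           // batch interval of the source stream
  val l1SlideMs: Long = batchMs * 5   // l1 produces one RDD every 5 source batches
  val l2SlideMs: Long = batchMs * 15  // l2 produces one RDD every 15 source batches

  def main(args: Array[String]): Unit = {
    println(s"l1 slide = $l1SlideMs ms")                            // l1 slide = 6170 ms
    println(s"l2 slide = $l2SlideMs ms")                            // l2 slide = 18510 ms
    println(s"l1 RDDs per l2 window = ${l2SlideMs / l1SlideMs}")    // l1 RDDs per l2 window = 3
  }
}
```

So each l2 window should cover exactly three consecutive RDDs of l1, which is why an empty l2 output is unexpected.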
Digging into the source code, I found that the problem most likely lies in the DStream.slice() implementation, shown below.
    def slice(fromTime: Time, toTime: Time): Seq[RDD[T]] = {
      if (!isInitialized)
      if (!(fromTime - zeroTime).isMultipleOf(slideDuration)) {
        logWarning("fromTime (" + fromTime + ") is not a multiple of slideDuration (" + slideDuration + ")")
      }
      if (!(toTime - zeroTime).isMultipleOf(slideDuration)) {
        logWarning("toTime (" + fromTime + ") is not a multiple of slideDuration (" + slideDuration + ")")
      }
      val alignedToTime = toTime.floor(slideDuration)
      val alignedFromTime = fromTime.floor(slideDuration)
      logInfo("Slicing from " + fromTime + " to " + toTime +
        " (aligned to " + alignedFromTime + " and " + alignedToTime + ")")
      alignedFromTime.to(alignedToTime, slideDuration).flatMap(time => {
        if (time >= zeroTime) getOrCompute(time) else None
      })
    }
Here, after floor() is applied to both fromTime and toTime, the results (alignedFromTime - zeroTime) and (alignedToTime - zeroTime) may no longer be multiples of slideDuration, which makes the isTimeValid() check fail for all subsequent computation.
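The misalignment can be illustrated with plain Long arithmetic. The sketch below is not Spark code: floor and isAligned are simplified stand-ins for the one-argument Time.floor and for the alignment part of the validity check, and the zeroTime value is a hypothetical one chosen to be a multiple of the 1234 ms batch interval but not of l1's 6170 ms slide duration.

```scala
object FloorMisalignment {
  // Stand-in for a floor that ignores zeroTime:
  // rounds down to a multiple of `slide` counted from 0.
  def floor(t: Long, slide: Long): Long = (t / slide) * slide

  // Stand-in for the alignment condition: (time - zeroTime) must be a multiple of slide.
  def isAligned(t: Long, zero: Long, slide: Long): Boolean = (t - zero) % slide == 0

  val slide: Long = 5 * 1234L         // l1's slide duration: 6170 ms
  val zero: Long = 7 * 1234L          // hypothetical zeroTime: 8638 ms
  val toTime: Long = zero + 3 * slide // 27148 ms, a time at which l1 has an RDD

  def main(args: Array[String]): Unit = {
    val floored = floor(toTime, slide)
    println(isAligned(toTime, zero, slide))   // true: the requested time is valid
    println(floored)                          // 24680: shifted off l1's grid
    println(isAligned(floored, zero, slide))  // false: the floored time is no longer valid
  }
}
```

With these assumed values, a perfectly valid toTime is moved to a time for which l1 has no RDD, which matches the symptom of l2 never printing anything.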
The fix would be to add a new floor() function in Time.scala that takes zeroTime into account when flooring:
    def floor(that: Duration, zeroTime: Time): Time = {
      val t = that.milliseconds
      new Time(((this.millis - zeroTime.milliseconds) / t) * t + zeroTime.milliseconds)
    }
Then change DStream.slice() to call this new floor() function, passing in its zeroTime:
    val alignedToTime = toTime.floor(slideDuration, zeroTime)
    val alignedFromTime = fromTime.floor(slideDuration, zeroTime)
This way alignedToTime and alignedFromTime are truly aligned with respect to zeroTime, whose value is not actually 0.
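As a rough check of the proposed formula, the sketch below applies the same arithmetic to plain Long millisecond values instead of Spark's Time and Duration. The zeroTime of 8638 ms and the helper names are assumptions made for illustration; only the formula itself is taken from the proposed floor().

```scala
object ZeroAwareFloor {
  // Same arithmetic as the proposed floor(that, zeroTime), on raw milliseconds.
  def floor(t: Long, slide: Long, zero: Long): Long = ((t - zero) / slide) * slide + zero

  val slide: Long = 5 * 1234L   // l1's slide duration: 6170 ms
  val zero: Long = 7 * 1234L    // hypothetical zeroTime: 8638 ms

  // A window over l1 covering three of its RDDs.
  val fromTime: Long = zero + slide      // 14808
  val toTime: Long = zero + 3 * slide    // 27148

  // Times that a slice over [fromTime, toTime] would visit after alignment.
  val sliceTimes: List[Long] =
    (floor(fromTime, slide, zero) to floor(toTime, slide, zero) by slide).toList

  def main(args: Array[String]): Unit = {
    println(sliceTimes)                               // List(14808, 20978, 27148)
    println(sliceTimes.forall(t => (t - zero) % slide == 0))  // true
    println(floor(toTime + 500, slide, zero))         // 27148: off-grid input snaps back onto l1's grid
  }
}
```

Under these assumptions every visited time stays on the grid defined by zeroTime, so each one corresponds to an RDD that l1 actually produces.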