Spark / SPARK-7326

Performing window() on a WindowedDStream doesn't work all the time


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.3.1
    • Fix Version/s: 1.4.0
    • Component/s: DStreams
    • Labels: None

    Description

      Someone reported a similar issue before but got no response:
      http://apache-spark-user-list.1001560.n3.nabble.com/Windows-of-windowed-streams-not-displaying-the-expected-results-td466.html

      I ran into the same issue recently; it can be reproduced in 1.3.1 with the following code:

      import scala.collection.mutable
      import scala.util.Random

      import org.apache.spark.SparkConf
      import org.apache.spark.rdd.RDD
      import org.apache.spark.streaming.{Milliseconds, StreamingContext}

      object WindowOnWindowedDStream {
        def main(args: Array[String]): Unit = {
          val batchInterval = Milliseconds(1234)
          val sparkConf = new SparkConf()
            .setAppName("WindowOnWindowedDStream")
            .setMaster("local[2]")

          val ssc = new StreamingContext(sparkConf, batchInterval)
          ssc.checkpoint("checkpoint")

          // Each queued RDD holds 1000 (word, 1) pairs spread over five distinct words.
          def createRDD(i: Int): RDD[(String, Int)] = {
            val count = 1000
            val rawLogs = (1 to count).map { _ =>
              val word = "word" + Random.nextInt(5)
              (word, 1)
            }
            ssc.sparkContext.parallelize(rawLogs)
          }

          val rddQueue = mutable.Queue[RDD[(String, Int)]]()
          val rawLogStream = ssc.queueStream(rddQueue)

          (1 to 300).foreach { i => rddQueue.enqueue(createRDD(i)) }

          // l1: window and slide of 5 source batches.
          val l1 = rawLogStream.window(batchInterval * 5, batchInterval * 5).reduceByKey(_ + _)
          // l2: window and slide of 15 source batches, i.e. 3 batches of l1.
          val l2 = l1.window(batchInterval * 15, batchInterval * 15).reduceByKey(_ + _)

          l1.print()
          l2.print()

          ssc.start()
          ssc.awaitTermination()
        }
      }

      Here we have two windowed DStream instances, l1 and l2.

      l1 is the DStream produced by calling window() on the source DStream, with both the window duration and the slide duration set to 5 times the source stream's batch interval.

      l2 is the DStream produced by calling window() on l1, with both durations set to 3 times l1's batch interval (its slide duration), which is 15 times the source stream's batch interval.
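The relationship between the two windows can be checked with a little arithmetic. The sketch below is a simplified model on plain Long milliseconds, not Spark code. It assumes that, as in WindowedDStream, the window ending at validTime covers the parent batches in (validTime - windowDuration, validTime]; WindowCoverageDemo and coveredTimes are hypothetical names, and the l2 batch time is an arbitrary illustrative value.

```scala
// Simplified model (plain Long milliseconds, not Spark code) of which parent
// batch times a single windowed batch covers. Assumption: the window ending at
// validTime covers parent times in (validTime - windowMs, validTime].
object WindowCoverageDemo {
  def coveredTimes(validTime: Long, windowMs: Long, parentSlideMs: Long): Seq[Long] =
    (validTime - windowMs + parentSlideMs) to validTime by parentSlideMs

  def main(args: Array[String]): Unit = {
    val batchMs = 1234L
    val l1Slide = 5 * batchMs    // 6170 ms: l1's slide duration
    val l2Window = 15 * batchMs  // 18510 ms: l2's window duration
    val t = 100 * l2Window       // hypothetical l2 batch time
    // Each l2 batch should be built from exactly 3 consecutive l1 batches.
    println(coveredTimes(t, l2Window, l1Slide).size) // 3
  }
}
```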

      In the output of this simple streaming app, only l1 prints data; no data is ever printed for l2.

      Digging into the source code, I found that the problem most likely lies in the DStream.slice() implementation:

      def slice(fromTime: Time, toTime: Time): Seq[RDD[T]] = {
        if (!isInitialized) {
          throw new SparkException(this + " has not been initialized")
        }
        if (!(fromTime - zeroTime).isMultipleOf(slideDuration)) {
          logWarning("fromTime (" + fromTime + ") is not a multiple of slideDuration (" + slideDuration + ")")
        }
        if (!(toTime - zeroTime).isMultipleOf(slideDuration)) {
          logWarning("toTime (" + toTime + ") is not a multiple of slideDuration (" + slideDuration + ")")
        }
        // floor(slideDuration) rounds down to a multiple of slideDuration counted from time 0, ignoring zeroTime.
        val alignedToTime = toTime.floor(slideDuration)
        val alignedFromTime = fromTime.floor(slideDuration)

        logInfo("Slicing from " + fromTime + " to " + toTime +
          " (aligned to " + alignedFromTime + " and " + alignedToTime + ")")

        alignedFromTime.to(alignedToTime, slideDuration).flatMap { time =>
          if (time >= zeroTime) getOrCompute(time) else None
        }
      }

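      Here floor() rounds fromTime and toTime down to a multiple of slideDuration counted from time 0, not from zeroTime. Unless zeroTime is itself a multiple of slideDuration, (alignedFromTime - zeroTime) and (alignedToTime - zeroTime) are then no longer multiples of slideDuration, so the isTimeValid() check fails for every time in the aligned range and getOrCompute() returns None for all of them. Because zeroTime is aligned to the batch interval, this goes unnoticed for l1, whose parent slides once per batch, but it breaks l2, whose parent l1 slides only once every 5 batches.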
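The misalignment can be reproduced with concrete numbers. This is a simplified model on plain Long milliseconds, not Spark code: floorFromEpoch mirrors a floor that ignores zeroTime, and isTimeValid mirrors the "after zeroTime and on a slide boundary" check. The names and the zeroTime value (1001 batches of 1234 ms, a multiple of the batch interval but not of l1's 6170 ms slide) are hypothetical.

```scala
// Simplified model of the slice() alignment bug for l1, on plain Long milliseconds.
// Hypothetical values: 1234 ms batches, l1 slides every 5 batches, and zeroTime
// is a multiple of the batch interval but not of the 6170 ms slide.
object SliceAlignmentDemo {
  val batchMs = 1234L
  val slideMs = 5 * batchMs      // 6170 ms
  val zeroTime = 1001 * batchMs  // 1235234 ms

  // Rounds down to a multiple of t counted from time 0 (ignores zeroTime).
  def floorFromEpoch(time: Long, t: Long): Long = (time / t) * t

  // Valid means: strictly after zeroTime and an exact number of slides after it.
  def isTimeValid(time: Long): Boolean =
    time > zeroTime && (time - zeroTime) % slideMs == 0

  def main(args: Array[String]): Unit = {
    val toTime = zeroTime + 3 * slideMs                                // a real l1 batch time
    val alignedTo = floorFromEpoch(toTime, slideMs)                    // 1252510, not toTime
    val alignedFrom = floorFromEpoch(zeroTime + slideMs, slideMs)      // 1240170
    println(isTimeValid(toTime))    // true
    println(isTimeValid(alignedTo)) // false
    // Every time in the aligned range is off by the same amount, so none is valid.
    println((alignedFrom to alignedTo by slideMs).exists(isTimeValid)) // false
  }
}
```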

      The fix would be to add a new floor() overload to Time.scala that takes zeroTime into account:

      def floor(that: Duration, zeroTime: Time): Time = {
        val t = that.milliseconds
        // Round down to a multiple of t counted from zeroTime rather than from time 0.
        new Time(((this.millis - zeroTime.milliseconds) / t) * t + zeroTime.milliseconds)
      }

      Then change DStream.slice() to call this new floor() overload, passing in the stream's zeroTime:

      val alignedToTime = toTime.floor(slideDuration, zeroTime)
      val alignedFromTime = fromTime.floor(slideDuration, zeroTime)

      This way, alignedFromTime and alignedToTime are genuinely aligned with respect to zeroTime, which in general is not 0.
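A quick check of the proposed floor(that, zeroTime), again as a simplified model on plain Long milliseconds with hypothetical numbers (6170 ms slide, zeroTime = 1235234 ms): a time already on a slide boundary is left unchanged, and a time between boundaries is rounded down to the previous boundary, so the result minus zeroTime is always a multiple of the slide. ZeroTimeFloorDemo and floorFromZero are hypothetical names, and the model assumes time >= zeroTime (integer division truncates toward zero, so a negative offset would round up instead).

```scala
// Simplified model of the proposed zeroTime-aware floor, on plain Long milliseconds.
// Assumes time >= zeroTime; hypothetical values throughout.
object ZeroTimeFloorDemo {
  def floorFromZero(time: Long, t: Long, zeroTime: Long): Long =
    ((time - zeroTime) / t) * t + zeroTime

  def main(args: Array[String]): Unit = {
    val slideMs = 6170L
    val zeroTime = 1235234L                  // not a multiple of slideMs
    val onBoundary = zeroTime + 3 * slideMs  // 1253744
    val offBoundary = onBoundary + 1000      // between two slide boundaries
    println(floorFromZero(onBoundary, slideMs, zeroTime) == onBoundary)  // true: already aligned
    println(floorFromZero(offBoundary, slideMs, zeroTime) == onBoundary) // true: rounded down
  }
}
```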


    People

      Assignee: Wesley Miao
      Reporter: Wesley Miao
      Votes: 0
      Watchers: 2
