Spark / SPARK-19347

ReceiverSupervisorImpl can add block to ReceiverTracker multiple times because of askWithRetry


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 2.2.0
    • Component/s: DStreams, Spark Core
    • Labels: None

    Description

      ReceiverSupervisorImpl on the executor side reports a block's metadata back to ReceiverTracker on the driver side. The current code uses askWithRetry. However, ReceiverTracker is not idempotent for AddBlock, so a retried message may be processed multiple times.
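The failure mode can be sketched with a toy model (not Spark's actual classes, and the block id below is made up): a retry layer that resends when a reply times out, delivering to a handler that, like ReceiverTracker's AddBlock path, appends state on every delivery.

```scala
import scala.collection.mutable.ArrayBuffer

object AskWithRetrySimulation {
  // Stand-in for ReceiverTracker's per-stream block queue.
  val receivedBlocks = ArrayBuffer[String]()

  // Non-idempotent handler: every delivery appends, like addBlock does.
  def handleAddBlock(blockId: String): Unit =
    receivedBlocks += blockId

  // askWithRetry semantics: if the reply times out, resend the message.
  // Here the first reply is "lost", so the message is delivered twice.
  def askWithRetry(blockId: String, maxRetries: Int): Boolean = {
    var attempts = 0
    var replied = false
    while (!replied && attempts <= maxRetries) {
      attempts += 1
      handleAddBlock(blockId)  // the message reaches the endpoint each time
      replied = attempts > 1   // simulate: only the first reply times out
    }
    replied
  }

  def main(args: Array[String]): Unit = {
    askWithRetry("input-0-1485318850000", maxRetries = 3)
    // The same block id now appears twice in the tracker's state.
    println(receivedBlocks.mkString(","))
  }
}
```

The point is that even a single lost or slow reply is enough: the endpoint processed the message, but the caller cannot tell, so the resend duplicates the block.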

      To reproduce:
      1. Patch ReceiverTracker so that, on the first AddBlock it receives, it sleeps long enough (say 200 seconds) that the first RPC call times out in askWithRetry and AddBlock is resent.
      2. Rebuild Spark and run the following job:

        def streamProcessing(): Unit = {
          val masterUrl = "local[2]"  // assumption for illustration; use your cluster's master URL
          val conf = new SparkConf()
            .setAppName("StreamingTest")
            .setMaster(masterUrl)
          val ssc = new StreamingContext(conf, Seconds(200))
          // Needs a socket source feeding localhost:1234, e.g. `nc -lk 1234`
          val stream = ssc.socketTextStream("localhost", 1234)
          stream.print()
          ssc.start()
          ssc.awaitTermination()
        }
      

      To fix:
      It makes sense to provide a blocking version of ask in RpcEndpointRef. As vanzin mentioned in SPARK-18113 (https://github.com/apache/spark/pull/16503#event-927953218), askWithRetry is a leftover from the Akka days. It imposes restrictions on the caller (e.g. idempotency) and other things that people generally don't pay much attention to when using it.
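A minimal sketch of what such a blocking, single-attempt ask could look like, built on Scala's standard Future/Await (the names here are illustrative, not Spark's actual RPC API): a timeout surfaces as an exception for the caller to handle, rather than triggering a resend that a non-idempotent endpoint would process twice.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object BlockingAskSketch {
  // Blocking single-attempt ask: wait for the reply Future, and let a
  // timeout propagate as a TimeoutException instead of resending.
  def askSync[T](reply: Future[T], timeout: FiniteDuration): T =
    Await.result(reply, timeout)

  def main(args: Array[String]): Unit = {
    // A reply that arrives in time is simply returned to the caller.
    println(askSync(Future.successful("block added"), 1.second))
  }
}
```

With this shape, the retry decision moves to the caller, who knows whether the target endpoint is idempotent, instead of being baked into the RPC layer.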


          People

            Assignee: Jin Xing (jinxing6042@126.com)
            Reporter: Jin Xing (jinxing6042@126.com)
            Votes: 0
            Watchers: 2
