Details
Type: Bug
Status: Resolved
Priority: Major
Resolution: Fixed
Description
On the executor side, ReceiverSupervisorImpl reports each block's metadata back to ReceiverTracker on the driver side. The current code uses askWithRetry for this. However, ReceiverTracker's handling of AddBlock is not idempotent, so if askWithRetry resends an AddBlock, the same message may be processed multiple times.
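The failure mode can be shown without Spark. The sketch below is a hypothetical, deterministic simulation: `NonIdempotentTracker`, `askWithRetrySim` and the local `AddBlock` case class are illustrative stand-ins, not Spark classes. A "lost reply" models a timeout: the handler has already run, but the caller never sees the answer and resends.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the AddBlock message (not Spark's class).
final case class AddBlock(blockId: String)

// Hypothetical tracker: recording a block is a plain append, so handling
// the same AddBlock twice records the block twice (not idempotent).
final class NonIdempotentTracker {
  val received = mutable.ArrayBuffer.empty[String]
  def handle(msg: AddBlock): Boolean = { received += msg.blockId; true }
}

// Hypothetical retry helper, loosely modelled on askWithRetry: it resends
// the message whenever the reply is lost, up to maxAttempts times.
// replyLost(attempt) == true simulates a timeout after the handler ran.
def askWithRetrySim(tracker: NonIdempotentTracker, msg: AddBlock, maxAttempts: Int)(
    replyLost: Int => Boolean): Option[Boolean] = {
  @annotation.tailrec
  def loop(attempt: Int): Option[Boolean] =
    if (attempt > maxAttempts) None
    else {
      val reply = tracker.handle(msg) // side effect runs on every attempt
      if (replyLost(attempt)) loop(attempt + 1) else Some(reply)
    }
  loop(1)
}

val tracker = new NonIdempotentTracker
// Only the first reply is lost, so the message is sent exactly twice.
val result = askWithRetrySim(tracker, AddBlock("input-0-1"), maxAttempts = 3)(_ == 1)
println(tracker.received.count(_ == "input-0-1")) // → 2
```

The caller sees a single successful reply, while the tracker has recorded the block twice; this is the duplication the issue describes.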
To reproduce:
1. Add a hook to ReceiverTracker: the first time it receives AddBlock, it sleeps long enough (say, 200 seconds) that the first RPC call in askWithRetry times out, so AddBlock is resent.
2. Rebuild Spark and run the following job:
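    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // masterUrl is not defined in the original snippet; supply your cluster's master URL.
    def streamProcessing(): Unit = {
      val conf = new SparkConf()
        .setAppName("StreamingTest")
        .setMaster(masterUrl)
      // 200-second batch interval
      val ssc = new StreamingContext(conf, Seconds(200))
      // Socket receiver on localhost:1234; its blocks are reported to the driver via AddBlock
      val stream = ssc.socketTextStream("localhost", 1234)
      stream.print()
      ssc.start()
      ssc.awaitTermination()
    }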
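The hook from step 1 can be sketched without Spark as a scaled-down simulation. This is a hypothetical illustration: `handleAddBlock` and `askWithRetryTimed` are made-up names, not Spark APIs, the sleep is 300 ms instead of 200 seconds, and the retry timeout is 100 ms. The first call times out and is resent; the timed-out call still finishes in the background, so the same block is handled twice.

```scala
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import scala.concurrent.{Await, Future, TimeoutException, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical repro hook: only the very first AddBlock sleeps longer than
// the caller's timeout; every call, slow or fast, counts as handled.
val firstCall = new AtomicBoolean(true)
val timesHandled = new AtomicInteger(0)

def handleAddBlock(blockId: String): Boolean = {
  if (firstCall.getAndSet(false)) blocking { Thread.sleep(300) }
  timesHandled.incrementAndGet()
  true
}

// Hypothetical caller that, like askWithRetry, resends on timeout.
def askWithRetryTimed(blockId: String, maxAttempts: Int, timeout: FiniteDuration): Boolean = {
  def attempt(n: Int): Boolean =
    try Await.result(Future(handleAddBlock(blockId)), timeout)
    catch { case _: TimeoutException if n < maxAttempts => attempt(n + 1) }
  attempt(1)
}

val ok = askWithRetryTimed("input-0-1", maxAttempts = 3, timeout = 100.millis)
Thread.sleep(500) // give the first, timed-out call time to finish
println(timesHandled.get()) // how many times the same AddBlock was handled
```

`blocking` around the sleep lets the global execution context start another thread, so the retry is not stuck behind the sleeping first call on a machine with few cores.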
To fix:
It makes sense to provide a blocking version of ask in RpcEndpointRef, one that sends the message once and waits for the reply without retrying automatically. As vanzin mentioned in SPARK-18113 (https://github.com/apache/spark/pull/16503#event-927953218), askWithRetry is a leftover from the Akka days. It imposes restrictions on the caller (e.g. idempotency) and other requirements that people generally don't pay much attention to when using it.
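A minimal sketch of the blocking, send-once semantics, assuming a plain Scala `Future` in place of Spark's RPC layer. `blockingAsk` and `handleAddBlock` are hypothetical names; later Spark versions expose a blocking `askSync` on `RpcEndpointRef`, but this sketch does not reproduce its signature. On timeout the helper throws instead of resending, leaving the retry decision to a caller that knows whether the message is idempotent.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

// Hypothetical blocking ask: sends the message exactly once and waits up to
// `timeout` for the reply. A timeout surfaces as an exception, never a resend.
def blockingAsk[T](send: () => Future[T], timeout: FiniteDuration): T =
  Await.result(send(), timeout)

// Hypothetical non-idempotent handler, counting how often it runs.
val timesHandled = new AtomicInteger(0)
def handleAddBlock(blockId: String): Future[Boolean] = Future {
  timesHandled.incrementAndGet()
  true
}

val reply = blockingAsk(() => handleAddBlock("input-0-1"), 1.second)

// A slow reply fails the call instead of silently triggering a second send.
val slow = Try(blockingAsk(() => Future { Thread.sleep(300); true }, 50.millis))
```

With this shape, a slow AddBlock yields one failed call and one handled message, rather than one successful call and two handled messages.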