Spark / SPARK-30675

Spark Streaming Job stopped reading events from Queue upon Deregister Exception


    Details

    • Flags:
      Important

      Description

       

      Stream

      We observed a discrepancy in the Kinesis stream: records were arriving continuously, but no GetRecords.Records metric data was available for the same period.

      On analysis, we found that the Spark job made no GetRecords calls during that period, which is why no GetRecords count was reported. The stream itself was therefore not at fault, since messages were still being received.

      Spark/EMR

      The driver logs show that the driver deregistered the receiver for the stream:

      Driver Logs

      2020-01-03 11:11:40 ERROR ReceiverTracker:70 - Deregistered receiver for stream 0: Error while storing block into Spark - java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
              at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
              at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
              at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:201)
              at org.apache.spark.streaming.receiver.WriteAheadLogBasedBlockHandler.storeBlock(ReceivedBlockHandler.scala:210)
              at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushAndReportBlock(ReceiverSupervisorImpl.scala:158)
              at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushArrayBuffer(ReceiverSupervisorImpl.scala:129)
              at org.apache.spark.streaming.receiver.Receiver.store(Receiver.scala:133)
              at org.apache.spark.streaming.kinesis.KinesisReceiver.org$apache$spark$streaming$kinesis$KinesisReceiver$$storeBlockWithRanges(KinesisReceiver.scala:293)
              at org.apache.spark.streaming.kinesis.KinesisReceiver$GeneratedBlockHandler.onPushBlock(KinesisReceiver.scala:344)
              at org.apache.spark.streaming.receiver.BlockGenerator.pushBlock(BlockGenerator.scala:297)
              at org.apache.spark.streaming.receiver.BlockGenerator.org$apache$spark$streaming$receiver$BlockGenerator$$keepPushingBlocks(BlockGenerator.scala:269)
              at org.apache.spark.streaming.receiver.BlockGenerator$$anon$1.run(BlockGenerator.scala:110)
              ...

      Since then, no receiver has been started or re-registered. The executor logs below show that one of the executors was running in the container.

       

      Executor Logs

      2020-01-03 11:11:30 INFO  BlockManager:54 - Removing RDD 2851002
      2020-01-03 11:11:31 INFO  ReceiverSupervisorImpl:54 - Stopping receiver with message: Error while storing block into Spark: java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
      2020-01-03 11:11:31 INFO  Worker:593 - Worker shutdown requested.
      2020-01-03 11:11:31 INFO  LeaseCoordinator:298 - Worker ip-10-61-71-29.ap-southeast-2.compute.internal:a7567f14-16be-4aca-8f64-401b0b29aea2 has successfully stopped lease-tracking threads
      2020-01-03 11:11:31 INFO  KinesisRecordProcessor:54 - Shutdown:  Shutting down workerId ip-10-61-71-29.ap-southeast-2.compute.internal:a7567f14-16be-4aca-8f64-401b0b29aea2 with reason ZOMBIE
      2020-01-03 11:11:32 INFO  MemoryStore:54 - Block input-0-1575374565339 stored as bytes in memory (estimated size 7.3 KB, free 3.4 GB)
      2020-01-03 11:11:33 INFO  Worker:634 - All record processors have been shut down successfully.

       

      After this point, the KCL worker that was reading from the Kinesis stream appears to have been terminated, which explains the gap in GetRecords.
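      To check other runs for the same failure, the driver and executor logs can be searched for the messages quoted above. The following is a minimal sketch in POSIX shell. `scan_receiver_log` is a hypothetical helper, not part of Spark or the KCL, and its match strings are copied from the log lines in this issue, so it assumes the same log4j line format.

```shell
#!/bin/sh
# Minimal sketch (hypothetical helper, not part of Spark or the KCL):
# report which parts of the receiver-failure signature from this issue
# appear in a Spark driver or executor log. Assumes the log4j line
# format shown in the excerpts above.

scan_receiver_log() {
  log_file="$1"
  # Driver: ReceiverTracker deregistered the receiver
  if grep -q 'Deregistered receiver for stream' "$log_file"; then
    echo "driver: receiver deregistered"
  fi
  # Executor: ReceiverSupervisorImpl stopped the receiver
  if grep -q 'Stopping receiver with message' "$log_file"; then
    echo "executor: receiver stopped"
  fi
  # Executor: the KCL worker shut down its record processor with reason ZOMBIE
  if grep -q 'with reason ZOMBIE' "$log_file"; then
    echo "executor: KCL worker shut down (ZOMBIE)"
  fi
  # Both sides: the underlying error was a block-store timeout
  if grep -q 'Error while storing block into Spark' "$log_file"; then
    echo "cause: timed out storing block"
  fi
}
```

      For example, running `scan_receiver_log` against an executor's stderr log (file name up to the caller) prints one label per matching message; empty output means none of these messages were logged.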

       

      Mitigation

      We increased the following timeouts:

      • 'spark.streaming.receiver.blockStoreTimeout' to 59 seconds (default: 30 seconds)
      • 'spark.streaming.driver.writeAheadLog.batchingTimeout' to 30 seconds, set as 30000 ms because this property is specified in milliseconds (default: 5 seconds)
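      The two properties above are read in different units: as far as we can tell from the Spark 2.x source, `spark.streaming.receiver.blockStoreTimeout` is parsed as whole seconds and `spark.streaming.driver.writeAheadLog.batchingTimeout` as milliseconds. The sketch below emits both as spark-submit arguments in the right unit. The variable names and the `timeout_confs` function are hypothetical, chosen for illustration.

```shell
#!/bin/sh
# Sketch: emit the two mitigation timeouts as spark-submit arguments,
# each in the unit Spark reads it in (per the Spark 2.x source):
#   spark.streaming.receiver.blockStoreTimeout           -> whole seconds (default 30)
#   spark.streaming.driver.writeAheadLog.batchingTimeout -> milliseconds  (default 5000)

BLOCK_STORE_TIMEOUT_S=59   # hypothetical variable: mitigation value, in seconds
WAL_BATCHING_TIMEOUT_S=30  # hypothetical variable: mitigation value, in seconds

timeout_confs() {
  printf '%s\n' "--conf" "spark.streaming.receiver.blockStoreTimeout=${BLOCK_STORE_TIMEOUT_S}"
  # batchingTimeout is parsed as milliseconds, so convert from seconds
  printf '%s\n' "--conf" "spark.streaming.driver.writeAheadLog.batchingTimeout=$((WAL_BATCHING_TIMEOUT_S * 1000))"
}
```

      Usage would be `spark-submit $(timeout_confs) ...`, which relies on the shell splitting the output into separate words. That is safe here because none of the values contain spaces or glob characters.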

       

      Notes:
      1. Write-ahead logs and checkpoints are maintained in an AWS S3 bucket.

      2. spark-submit configuration:

      spark-submit --deploy-mode cluster --executor-memory 4608M --driver-memory 4608M \
        --conf spark.yarn.driver.memoryOverhead=710M \
        --conf spark.yarn.executor.memoryOverhead=710M --driver-cores 3 --executor-cores 3 \
        --conf spark.dynamicAllocation.minExecutors=1 \
        --conf spark.dynamicAllocation.maxExecutors=2 \
        --conf spark.dynamicAllocation.initialExecutors=2 \
        --conf spark.locality.wait.node=0 \
        --conf spark.dynamicAllocation.enabled=true \
        --conf maximizeResourceAllocation=false --class XXXXXXXXXXXX \
        --conf spark.streaming.driver.writeAheadLog.closeFileAfterWrite=true \
        --conf spark.scheduler.mode=FAIR \
        --conf spark.metrics.conf=XXXXXXXXXXXX.properties --files=s3://XXXXXXXXXXXX/XXXXXXXXXXXX.properties \
        --conf spark.streaming.receiver.writeAheadLog.closeFileAfterWrite=true \
        --conf spark.streaming.receiver.writeAheadLog.enable=true \
        --conf spark.streaming.receiver.blockStoreTimeout=59 \
        --conf spark.streaming.driver.writeAheadLog.batchingTimeout=30000 \
        --conf spark.streaming.receiver.maxRate=120 s3://XXXXXXXXXXXX/XXXXXXXXXXXX.jar yarn XXXXXXXXXXXX applicationContext-XXXXXXXXXXXX-streaming.xml root kinesis 60 &

      3. EMR Version - 5.26

      4. Hadoop Distribution - Amazon 2.8.5

      5. Hardware Config

      • Master (3 instances, multi-master cluster)
        c5.2xlarge
        8 vCore, 16 GiB memory, EBS-only storage
        EBS storage: 64 GiB
      • Core (6 instances; min 2, max 6)
        c5.4xlarge
        16 vCore, 32 GiB memory, EBS-only storage
        EBS storage: 1000 GiB

      6. Three Spark jobs are running on the same cluster.

      7. Streaming - Kinesis

      8. Cluster and instance configuration screenshots are attached.

       

      Please let us know if any additional information is required.

        Attachments

        1. Instance-Config-P1.JPG (118 kB, uploaded by Mullaivendhan Ariaputhri)
        2. Cluster-Config-P1.JPG (116 kB, uploaded by Mullaivendhan Ariaputhri)
        3. Instance-Config-P2.JPG (35 kB, uploaded by Mullaivendhan Ariaputhri)
        4. Cluster-Config-P2.JPG (32 kB, uploaded by Mullaivendhan Ariaputhri)


            People

            • Assignee: Unassigned
            • Reporter: Mullaivendhan Ariaputhri (jasmineemullai@gmail.com)
            • Votes: 0
            • Watchers: 3
