SPARK-25775

Race between end-of-task and completion iterator read lock release

    Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 2.2.2
    • Fix Version/s: None
    • Component/s: Block Manager, Spark Core
    • Labels:
      None

      Description

      The following issue comes from a production Spark job in which executors die from uncaught exceptions during block release. When the job runs with certain combinations of executor-cores and total-executor-cores (e.g. 4 and 8, or 1 and 8), it consistently fails at the same code segment. The logs from our run follow:

       

      18/10/18 23:06:18 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 27 (PythonRDD[94] at RDD at PythonRDD.scala:49) (first 15 tasks are for partitions Vector(0))
      18/10/18 23:06:18 INFO TaskSchedulerImpl: Adding task set 27.0 with 1 tasks
      18/10/18 23:06:18 INFO TaskSetManager: Starting task 0.0 in stage 27.0 (TID 112, 10.248.7.2, executor 0, partition 0, PROCESS_LOCAL, 5585 bytes)
      18/10/18 23:06:18 INFO BlockManagerInfo: Added broadcast_37_piece0 in memory on 10.248.7.2:44871 (size: 9.1 KB, free: 13.0 GB)
      18/10/18 23:06:24 WARN TaskSetManager: Lost task 0.0 in stage 27.0 (TID 112, 10.248.7.2, executor 0): java.lang.AssertionError: assertion failed
      at scala.Predef$.assert(Predef.scala:156)
      at org.apache.spark.storage.BlockInfo.checkInvariants(BlockInfoManager.scala:84)
      at org.apache.spark.storage.BlockInfo.readerCount_$eq(BlockInfoManager.scala:66)
      at org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2$$anonfun$apply$2.apply(BlockInfoManager.scala:367)
      at org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2$$anonfun$apply$2.apply(BlockInfoManager.scala:366)
      at scala.Option.foreach(Option.scala:257)
      at org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2.apply(BlockInfoManager.scala:366)
      at org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2.apply(BlockInfoManager.scala:361)
      at scala.collection.Iterator$class.foreach(Iterator.scala:893)
      at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
      at org.apache.spark.storage.BlockInfoManager.releaseAllLocksForTask(BlockInfoManager.scala:361)
      at org.apache.spark.storage.BlockManager.releaseAllLocksForTask(BlockManager.scala:736)
      at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      at java.lang.Thread.run(Thread.java:748)
      
      18/10/18 23:06:24 INFO TaskSetManager: Starting task 0.1 in stage 27.0 (TID 113, 10.248.7.2, executor 0, partition 0, PROCESS_LOCAL, 5585 bytes)
      18/10/18 23:06:31 INFO TaskSetManager: Lost task 0.1 in stage 27.0 (TID 113) on 10.248.7.2, executor 0: java.lang.AssertionError (assertion failed) [duplicate 1]
      18/10/18 23:06:31 INFO TaskSetManager: Starting task 0.2 in stage 27.0 (TID 114, 10.248.7.2, executor 0, partition 0, PROCESS_LOCAL, 5585 bytes)
      18/10/18 23:06:31 ERROR TaskSchedulerImpl: Lost executor 0 on 10.248.7.2: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
      18/10/18 23:06:31 INFO StandaloneAppClient$ClientEndpoint: Executor updated: app-20181018230546-0040/0 is now EXITED (Command exited with code 50)
      18/10/18 23:06:31 INFO StandaloneSchedulerBackend: Executor app-20181018230546-0040/0 removed: Command exited with code 50
      18/10/18 23:06:31 INFO StandaloneAppClient$ClientEndpoint: Executor added: app-20181018230546-0040/2 on worker-20181018173742-10.248.110.2-40787 (10.248.110.2:40787) with 4 cores
      18/10/18 23:06:31 INFO StandaloneSchedulerBackend: Granted executor ID app-20181018230546-0040/2 on hostPort 10.248.110.2:40787 with 4 cores, 25.0 GB RAM
      18/10/18 23:06:31 WARN TaskSetManager: Lost task 0.2 in stage 27.0 (TID 114, 10.248.7.2, executor 0): ExecutorLostFailure (executor 0 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
      18/10/18 23:06:31 INFO StandaloneAppClient$ClientEndpoint: Executor updated: app-20181018230546-0040/2 is now RUNNING
      18/10/18 23:06:31 INFO DAGScheduler: Executor lost: 0 (epoch 11)
      18/10/18 23:06:31 INFO BlockManagerMaster: Removal of executor 0 requested
      18/10/18 23:06:31 INFO BlockManagerMasterEndpoint: Trying to remove executor 0 from BlockManagerMaster.
      18/10/18 23:06:31 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Asked to remove non-existent executor 0
      18/10/18 23:06:31 INFO TaskSetManager: Starting task 0.3 in stage 27.0 (TID 115, 10.248.21.2, executor 1, partition 0, ANY, 5585 bytes)
      18/10/18 23:06:31 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_27_2 !
      18/10/18 23:06:31 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_44_7 !
      18/10/18 23:06:31 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_27_4 !
      18/10/18 23:06:31 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_44_3 !
      18/10/18 23:06:31 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_27_0 !
      18/10/18 23:06:31 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_44_5 !
      18/10/18 23:06:31 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_78_0 !
      18/10/18 23:06:31 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_10_2 !
      18/10/18 23:06:31 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_10_0 !
      18/10/18 23:06:31 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_10_4 !
      18/10/18 23:06:31 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_78_6 !
      18/10/18 23:06:31 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_78_4 !
      18/10/18 23:06:31 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_78_2 !
      18/10/18 23:06:31 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_61_4 !
      18/10/18 23:06:31 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_61_6 !
      18/10/18 23:06:31 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_44_1 !
      18/10/18 23:06:31 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_61_0 !
      18/10/18 23:06:31 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_61_2 !
      18/10/18 23:06:31 INFO BlockManagerMasterEndpoint: Removing block manager BlockManagerId(0, 10.248.7.2, 44871, None)
      18/10/18 23:06:31 INFO BlockManagerMasterEndpoint: Trying to remove executor 0 from BlockManagerMaster.
      18/10/18 23:06:31 INFO BlockManagerMaster: Removed 0 successfully in removeExecutor
      18/10/18 23:06:31 INFO DAGScheduler: Shuffle files lost for executor: 0 (epoch 11)
      18/10/18 23:06:31 INFO BlockManagerInfo: Added broadcast_37_piece0 in memory on 10.248.21.2:43127 (size: 9.1 KB, free: 13.0 GB)
      18/10/18 23:06:32 INFO BlockManagerInfo: Added rdd_10_0 in memory on 10.248.21.2:43127 (size: 2.2 MB, free: 13.0 GB)
      18/10/18 23:06:33 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (10.248.110.2:40562) with ID 2
      18/10/18 23:06:33 INFO BlockManagerMasterEndpoint: Registering block manager 10.248.110.2:32835 with 13.2 GB RAM, BlockManagerId(2, 10.248.110.2, 32835, None)
      18/10/18 23:06:38 WARN TaskSetManager: Lost task 0.3 in stage 27.0 (TID 115, 10.248.21.2, executor 1): java.lang.AssertionError: assertion failed
      at scala.Predef$.assert(Predef.scala:156)
      at org.apache.spark.storage.BlockInfo.checkInvariants(BlockInfoManager.scala:84)
      at org.apache.spark.storage.BlockInfo.readerCount_$eq(BlockInfoManager.scala:66)
      at org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2$$anonfun$apply$2.apply(BlockInfoManager.scala:367)
      at org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2$$anonfun$apply$2.apply(BlockInfoManager.scala:366)
      at scala.Option.foreach(Option.scala:257)
      at org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2.apply(BlockInfoManager.scala:366)
      at org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2.apply(BlockInfoManager.scala:361)
      at scala.collection.Iterator$class.foreach(Iterator.scala:893)
      at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
      at org.apache.spark.storage.BlockInfoManager.releaseAllLocksForTask(BlockInfoManager.scala:361)
      at org.apache.spark.storage.BlockManager.releaseAllLocksForTask(BlockManager.scala:736)
      at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      at java.lang.Thread.run(Thread.java:748)
      
      18/10/18 23:06:38 ERROR TaskSetManager: Task 0 in stage 27.0 failed 4 times; aborting job
      18/10/18 23:06:38 INFO TaskSchedulerImpl: Removed TaskSet 27.0, whose tasks have all completed, from pool
      18/10/18 23:06:38 INFO TaskSchedulerImpl: Cancelling stage 27
      18/10/18 23:06:38 INFO DAGScheduler: ResultStage 27 (runJob at PythonRDD.scala:463) failed in 20.437 s due to Job aborted due to stage failure: Task 0 in stage 27.0 failed 4 times, most recent failure: Lost task 0.3 in stage 27.0 (TID 115, 10.248.21.2, executor 1): java.lang.AssertionError: assertion failed
      at scala.Predef$.assert(Predef.scala:156)
      at org.apache.spark.storage.BlockInfo.checkInvariants(BlockInfoManager.scala:84)
      at org.apache.spark.storage.BlockInfo.readerCount_$eq(BlockInfoManager.scala:66)
      at org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2$$anonfun$apply$2.apply(BlockInfoManager.scala:367)
      at org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2$$anonfun$apply$2.apply(BlockInfoManager.scala:366)
      at scala.Option.foreach(Option.scala:257)
      at org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2.apply(BlockInfoManager.scala:366)
      at org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2.apply(BlockInfoManager.scala:361)
      at scala.collection.Iterator$class.foreach(Iterator.scala:893)
      at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
      at org.apache.spark.storage.BlockInfoManager.releaseAllLocksForTask(BlockInfoManager.scala:361)
      at org.apache.spark.storage.BlockManager.releaseAllLocksForTask(BlockManager.scala:736)
      at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      at java.lang.Thread.run(Thread.java:748)
      
      Driver stacktrace:
      18/10/18 23:06:38 INFO DAGScheduler: Job 16 failed: runJob at PythonRDD.scala:463, took 20.450274 s
      Traceback (most recent call last):
      
      File "/usr/spark-2.2.2-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/session.py", line 582, in createDataFrame
      File "/usr/spark-2.2.2-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/session.py", line 380, in _createFromRDD
      File "/usr/spark-2.2.2-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/session.py", line 351, in _inferSchema
      File "/usr/spark-2.2.2-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/rdd.py", line 1368, in first
      File "/usr/spark-2.2.2-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/rdd.py", line 1350, in take
      File "/usr/spark-2.2.2-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/context.py", line 992, in runJob
      File "/usr/spark-2.2.2-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
      File "/usr/spark-2.2.2-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
      File "/usr/spark-2.2.2-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
      py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
      : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 27.0 failed 4 times, most recent failure: Lost task 0.3 in stage 27.0 (TID 115, 10.248.21.2, executor 1): java.lang.AssertionError: assertion failed
      at scala.Predef$.assert(Predef.scala:156)
      at org.apache.spark.storage.BlockInfo.checkInvariants(BlockInfoManager.scala:84)
      at org.apache.spark.storage.BlockInfo.readerCount_$eq(BlockInfoManager.scala:66)
      at org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2$$anonfun$apply$2.apply(BlockInfoManager.scala:367)
      at org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2$$anonfun$apply$2.apply(BlockInfoManager.scala:366)
      at scala.Option.foreach(Option.scala:257)
      at org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2.apply(BlockInfoManager.scala:366)
      at org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2.apply(BlockInfoManager.scala:361)
      at scala.collection.Iterator$class.foreach(Iterator.scala:893)
      at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
      at org.apache.spark.storage.BlockInfoManager.releaseAllLocksForTask(BlockInfoManager.scala:361)
      at org.apache.spark.storage.BlockManager.releaseAllLocksForTask(BlockManager.scala:736)
      at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      at java.lang.Thread.run(Thread.java:748)
      
      Driver stacktrace:
      at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1533)
      at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1521)
      at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1520)
      at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
      at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
      at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1520)
      at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
      at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
      at scala.Option.foreach(Option.scala:257)
      at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
      at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1748)
      at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1703)
      at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1692)
      at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
      at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
      at org.apache.spark.SparkContext.runJob(SparkContext.scala:2029)
      at org.apache.spark.SparkContext.runJob(SparkContext.scala:2050)
      at org.apache.spark.SparkContext.runJob(SparkContext.scala:2069)
      at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:463)
      at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      at java.lang.reflect.Method.invoke(Method.java:498)
      at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
      at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
      at py4j.Gateway.invoke(Gateway.java:282)
      at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
      at py4j.commands.CallCommand.execute(CallCommand.java:79)
      at py4j.GatewayConnection.run(GatewayConnection.java:238)
      at java.lang.Thread.run(Thread.java:748)
      Caused by: java.lang.AssertionError: assertion failed
      at scala.Predef$.assert(Predef.scala:156)
      at org.apache.spark.storage.BlockInfo.checkInvariants(BlockInfoManager.scala:84)
      at org.apache.spark.storage.BlockInfo.readerCount_$eq(BlockInfoManager.scala:66)
      at org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2$$anonfun$apply$2.apply(BlockInfoManager.scala:367)
      at org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2$$anonfun$apply$2.apply(BlockInfoManager.scala:366)
      at scala.Option.foreach(Option.scala:257)
      at org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2.apply(BlockInfoManager.scala:366)
      at org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2.apply(BlockInfoManager.scala:361)
      at scala.collection.Iterator$class.foreach(Iterator.scala:893)
      at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
      at org.apache.spark.storage.BlockInfoManager.releaseAllLocksForTask(BlockInfoManager.scala:361)
      at org.apache.spark.storage.BlockManager.releaseAllLocksForTask(BlockManager.scala:736)
      at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      ... 1 more
      
      18/10/18 23:06:38 INFO SparkContext: Invoking stop() from shutdown hook
      18/10/18 23:06:38 INFO AbstractConnector: Stopped Spark@47d1385c{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
      18/10/18 23:06:38 INFO SparkUI: Stopped Spark web UI at http://10.248.67.5:4040
      18/10/18 23:06:38 INFO StandaloneSchedulerBackend: Shutting down all executors
      18/10/18 23:06:38 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Asking each executor to shut down
      18/10/18 23:06:38 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
      18/10/18 23:06:38 INFO MemoryStore: MemoryStore cleared
      18/10/18 23:06:38 INFO BlockManager: BlockManager stopped
      18/10/18 23:06:38 INFO BlockManagerMaster: BlockManagerMaster stopped
      18/10/18 23:06:38 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
      18/10/18 23:06:38 INFO SparkContext: Successfully stopped SparkContext
      18/10/18 23:06:38 INFO ShutdownHookManager: Shutdown hook called
      18/10/18 23:06:38 INFO ShutdownHookManager: Deleting directory /tmp/spark-f22d0534-184d-4602-bec0-825e9c15ed2d/pyspark-be23722b-1f19-4608-b47a-31579b8218e3
      18/10/18 23:06:38 INFO ShutdownHookManager: Deleting directory /tmp/spark-f22d0534-184d-4602-bec0-825e9c15ed2d
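
Every failure in the log above ends in BlockInfo.checkInvariants, reached through the readerCount setter inside releaseAllLocksForTask. The sketch below is a toy Python model of how a double release of one read lock could trip such a check. It is not Spark's code: the class and method names (BlockInfo, ToyLockManager, snapshot_locks, and so on) are hypothetical, and the invariant "reader count never goes negative" is an assumption about what the failing assertion guards. The interleaving is replayed deterministically rather than with real threads.

```python
class BlockInfo:
    """Toy stand-in for per-block lock metadata (not Spark's actual class)."""

    def __init__(self):
        self._reader_count = 0

    @property
    def reader_count(self):
        return self._reader_count

    @reader_count.setter
    def reader_count(self, value):
        self._reader_count = value
        self._check_invariants()

    def _check_invariants(self):
        # Assumed invariant: a block never has a negative number of readers.
        if self._reader_count < 0:
            raise AssertionError("assertion failed")


class ToyLockManager:
    """Hypothetical, single-threaded model of per-task read-lock bookkeeping."""

    def __init__(self):
        self.infos = {}       # block_id -> BlockInfo
        self.read_locks = {}  # task_id -> {block_id: number of read locks held}

    def lock_for_reading(self, task_id, block_id):
        info = self.infos.setdefault(block_id, BlockInfo())
        info.reader_count += 1
        held = self.read_locks.setdefault(task_id, {})
        held[block_id] = held.get(block_id, 0) + 1

    def unlock(self, task_id, block_id):
        # Path 1: a completion iterator releases its lock once it is exhausted.
        self.read_locks[task_id][block_id] -= 1
        self.infos[block_id].reader_count -= 1

    def snapshot_locks(self, task_id):
        # Path 2, step 1: end-of-task cleanup reads how many locks the task holds.
        return dict(self.read_locks.get(task_id, {}))

    def release_snapshot(self, snapshot):
        # Path 2, step 2: end-of-task cleanup subtracts the counts it read earlier.
        for block_id, count in snapshot.items():
            self.infos[block_id].reader_count -= count


def simulate_race():
    """Iterator release lands between the two end-of-task steps."""
    mgr = ToyLockManager()
    mgr.lock_for_reading(task_id=1, block_id="block_a")
    stale = mgr.snapshot_locks(1)      # cleanup sees one read lock
    mgr.unlock(1, "block_a")           # iterator releases it: count 1 -> 0
    try:
        mgr.release_snapshot(stale)    # cleanup releases it again: 0 -> -1
    except AssertionError as err:
        return str(err)
    return None


def simulate_ordered():
    """Iterator release completes before end-of-task cleanup starts."""
    mgr = ToyLockManager()
    mgr.lock_for_reading(task_id=1, block_id="block_a")
    mgr.unlock(1, "block_a")
    mgr.release_snapshot(mgr.snapshot_locks(1))  # nothing left to subtract
    return mgr.infos["block_a"].reader_count
```

In this model, simulate_race() hits the invariant check, while simulate_ordered() leaves the reader count at zero. Whether the production failure follows exactly this interleaving is not established by the logs.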
      

       

       

      However, adding a redundant rdd.count() or rdd.cache() statement right before the failing createDataFrame() call makes the problem go away.
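
A minimal sketch of that workaround, assuming `spark` is an existing SparkSession and `rdd` is the RDD handed to createDataFrame(). The wrapper function and its `use_cache` flag are hypothetical names introduced for illustration; only count(), cache(), and createDataFrame() come from the report.

```python
def create_dataframe_with_workaround(spark, rdd, use_cache=False):
    """Touch the RDD once before schema inference, as described in the report.

    `spark` is expected to behave like a SparkSession and `rdd` like a PySpark
    RDD; both are passed in, so this module does not import pyspark itself.
    """
    if use_cache:
        rdd.cache()   # mark the RDD for caching (the report's second variant)
    else:
        rdd.count()   # redundant action that runs a job over the RDD first
    # The call that fails in the traceback (createDataFrame -> _inferSchema -> first).
    return spark.createDataFrame(rdd)
```

This only avoids the symptom in the reporter's job; it does not address the underlying lock-release problem.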

       

            People

            • Assignee: Unassigned
            • Reporter: Ablimit A. Keskin (ablimit)
            • Votes: 0
            • Watchers: 4
