[SPARK-24309] AsyncEventQueue should handle an interrupt from a Listener


    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 2.3.0
    • Fix Version/s: 2.3.1, 2.4.0
    • Component/s: Scheduler, Spark Core
    • Labels: None

      Description

      AsyncEventQueue does not properly handle an interrupt from a listener; when this happens, the Spark app won't even stop!
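
      A minimal sketch of one way to reproduce this (hypothetical listener and object names; the tiny event-queue capacity just makes the queue fill up quickly once the dispatch thread is dead). On 2.3.0 the final stop() can hang as described below:

          import org.apache.spark.{SparkConf, SparkContext}
          import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd}

          // Hypothetical listener that simulates what EventLoggingListener can do
          // when its HDFS calls are interrupted: InterruptedException is not matched
          // by scala.util.control.NonFatal, so it escapes the listener and kills the
          // AsyncEventQueue dispatch thread.
          class InterruptingListener extends SparkListener {
            override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit =
              throw new InterruptedException("simulating an interrupt from HDFS I/O")
          }

          object Repro {
            def main(args: Array[String]): Unit = {
              val conf = new SparkConf()
                .setMaster("local[2]")
                .setAppName("SPARK-24309-repro")
                // small capacity so the dead queue fills after just a few events
                .set("spark.scheduler.listenerbus.eventqueue.capacity", "1")
              val sc = new SparkContext(conf)
              sc.addSparkListener(new InterruptingListener)
              sc.parallelize(1 to 10).count() // onJobEnd fires; dispatch thread dies
              sc.parallelize(1 to 10).count() // these events pile up in the dead queue
              sc.stop()                       // on 2.3.0 this can hang in queue.put(POISON_PILL)
            }
          }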

      I observed this on a real workload, since the EventLoggingListener can generate an interrupt from its underlying HDFS calls:

      18/05/16 17:46:36 WARN hdfs.DFSClient: Error transferring data from DatanodeInfoWithStorage[10.17.206.36:20002,DS-3adac910-5d0a-418b-b0f7-6332b35bf6a1,DISK] to DatanodeInfoWithStorage[10.17.206.42:20002,DS-2e7ed0aa-0e68-441e-b5b2-96ad4a9ce7a5,DISK]: 100000 millis timeout while waiting for channel to be ready for read. ch : java.nio.channels.SocketChannel[connected local=/10.17.206.35:33950 remote=/10.17.206.36:20002]
      18/05/16 17:46:36 WARN hdfs.DFSClient: DataStreamer Exception
      java.net.SocketTimeoutException: 100000 millis timeout while waiting for channel to be ready for read. ch : java.nio.channels.SocketChannel[connected local=/10.17.206.35:33950 remote=/10.17.206.36:20002]
              at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:164)
              at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
              at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:131)
              at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:118)
              at java.io.FilterInputStream.read(FilterInputStream.java:83)
              at java.io.FilterInputStream.read(FilterInputStream.java:83)
              at org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:2305)
              at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$StreamerStreams.sendTransferBlock(DFSOutputStream.java:516)
              at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.transfer(DFSOutputStream.java:1450)
              at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.addDatanode2ExistingPipeline(DFSOutputStream.java:1408)
              at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.setupPipelineForAppendOrRecovery(DFSOutputStream.java:1559)
              at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.processDatanodeError(DFSOutputStream.java:1254)
              at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:739)
      18/05/16 17:46:36 ERROR scheduler.AsyncEventQueue: Listener EventLoggingListener threw an exception
      [... a few more of these ...]
      18/05/16 17:46:36 INFO scheduler.AsyncEventQueue: Stopping listener queue eventLog.
      java.lang.InterruptedException
              at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1220)
              at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:335)
              at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:439)
              at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:94)
              at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
              at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:83)
              at org.apache.spark.scheduler.AsyncEventQueue$$anon$1$$anonfun$run$1.apply$mcV$sp(AsyncEventQueue.scala:79)
              at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1319)
              at org.apache.spark.scheduler.AsyncEventQueue$$anon$1.run(AsyncEventQueue.scala:78)
      

      When this happens, the AsyncEventQueue continues to pile up events in its (bounded) queue, even though it's no longer processing them. Once the queue fills, the call to stop() blocks forever on queue.put(POISON_PILL), so the SparkContext won't stop.
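
      The shape of the hang, reduced to a self-contained sketch (not Spark's actual code, just the same pattern: a bounded queue whose only consumer has died, so a blocking put can never complete):

          import java.util.concurrent.LinkedBlockingQueue

          object HangSketch {
            def main(args: Array[String]): Unit = {
              val queue = new LinkedBlockingQueue[Int](2) // bounded, like AsyncEventQueue's queue

              // The consumer dies on the first interrupt, just as the dispatch
              // thread does when a listener lets an InterruptedException escape.
              val consumer = new Thread(new Runnable {
                override def run(): Unit =
                  try { while (true) queue.take() }
                  catch { case _: InterruptedException => () } // thread exits
              })
              consumer.start()
              consumer.interrupt()
              consumer.join()

              queue.put(1) // fine
              queue.put(2) // fine; the queue is now full
              println("a third put() would now block forever, like stop() does on 2.3.0")
              // queue.put(3) // uncomment to reproduce the hang
            }
          }

      The fix shipped in 2.3.1 and 2.4.0 (see Fix Version/s above) makes the queue handle an interrupt from a listener rather than letting it kill the dispatch thread.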


            People

            • Assignee: Imran Rashid (irashid)
            • Reporter: Imran Rashid (irashid)
            • Votes: 0
            • Watchers: 6
