Spark / SPARK-24309

AsyncEventQueue should handle an interrupt from a Listener


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 2.3.0
    • Fix Version/s: 2.3.1, 2.4.0
    • Component/s: Scheduler, Spark Core
    • Labels: None

    Description

      AsyncEventQueue does not properly handle an interrupt from a Listener – when this happens, the Spark app won't even stop!

      I observed this on an actual workload, as the EventLoggingListener can generate an interrupt from the underlying HDFS calls:

      18/05/16 17:46:36 WARN hdfs.DFSClient: Error transferring data from DatanodeInfoWithStorage[10.17.206.36:20002,DS-3adac910-5d0a-418b-b0f7-6332b35bf6a1,DISK] to DatanodeInfoWithStorage[10.17.206.42:20002,DS-2e7ed0aa-0e68-441e-b5b2-96ad4a9ce7a5,DISK]: 100000 millis timeout while waiting for channel to be ready for read. ch : java.nio.channels.SocketChannel[connected local=/10.17.206.35:33950 remote=/10.17.206.36:20002]
      18/05/16 17:46:36 WARN hdfs.DFSClient: DataStreamer Exception
      java.net.SocketTimeoutException: 100000 millis timeout while waiting for channel to be ready for read. ch : java.nio.channels.SocketChannel[connected local=/10.17.206.35:33950 remote=/10.17.206.36:20002]
              at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:164)
              at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
              at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:131)
              at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:118)
              at java.io.FilterInputStream.read(FilterInputStream.java:83)
              at java.io.FilterInputStream.read(FilterInputStream.java:83)
              at org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:2305)
              at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$StreamerStreams.sendTransferBlock(DFSOutputStream.java:516)
              at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.transfer(DFSOutputStream.java:1450)
              at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.addDatanode2ExistingPipeline(DFSOutputStream.java:1408)
              at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.setupPipelineForAppendOrRecovery(DFSOutputStream.java:1559)
              at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.processDatanodeError(DFSOutputStream.java:1254)
              at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:739)
      18/05/16 17:46:36 ERROR scheduler.AsyncEventQueue: Listener EventLoggingListener threw an exception
      [... a few more of these ...]
      18/05/16 17:46:36 INFO scheduler.AsyncEventQueue: Stopping listener queue eventLog.
      java.lang.InterruptedException
              at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1220)
              at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:335)
              at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:439)
              at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:94)
              at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
              at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:83)
              at org.apache.spark.scheduler.AsyncEventQueue$$anon$1$$anonfun$run$1.apply$mcV$sp(AsyncEventQueue.scala:79)
              at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1319)
              at org.apache.spark.scheduler.AsyncEventQueue$$anon$1.run(AsyncEventQueue.scala:78)
      

      When this happens, the AsyncEventQueue will continue to pile up events in its queue, though it's no longer processing them. Then, in the call to stop(), it blocks on queue.put(POISON_PILL) forever, so the SparkContext won't stop.
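      The hang mechanism can be sketched with plain java.util.concurrent primitives (this is a minimal standalone demo, not Spark's actual AsyncEventQueue code; the queue capacity, class, and field names here are illustrative assumptions):

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class QueueHangDemo {
    static final Object POISON_PILL = new Object();

    public static void main(String[] args) throws Exception {
        // Bounded queue standing in for the event buffer.
        LinkedBlockingQueue<Object> queue = new LinkedBlockingQueue<>(4);

        // Dispatch thread: exits as soon as take() is interrupted,
        // mirroring the listener-thrown InterruptedException above.
        Thread dispatch = new Thread(() -> {
            try {
                while (true) {
                    Object event = queue.take();
                    if (event == POISON_PILL) return;
                }
            } catch (InterruptedException e) {
                // Thread dies silently; nobody drains the queue any more.
            }
        });
        dispatch.start();
        dispatch.interrupt();
        dispatch.join();

        // Events keep piling up with no consumer until the queue is full...
        for (int i = 0; i < 4; i++) queue.offer(new Object());

        // ...so an unconditional queue.put(POISON_PILL) in stop() would
        // block forever. A timed offer is used here so the demo terminates.
        boolean accepted = queue.offer(POISON_PILL, 100, TimeUnit.MILLISECONDS);
        System.out.println("poison pill accepted: " + accepted);
    }
}
```

      The timed offer fails because the dead dispatch thread will never free a slot – which is exactly why the blocking put in stop() hangs the SparkContext shutdown.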


    People

      Assignee: Imran Rashid (irashid)
      Reporter: Imran Rashid (irashid)
      Votes: 0
      Watchers: 6
