Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-3466

Job might get stuck in restoreState() from HDFS due to interrupt

    XMLWordPrintableJSON

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 1.1.0
    • Fix Version/s: 1.1.0
    • Labels:
      None

      Description

      A user reported the following issue with a failing job:

      10:46:09,223 WARN  org.apache.flink.runtime.taskmanager.Task                     - Task 'XXX -> YYY (3/5)' did not react to cancelling signal, but is stuck in method:
      sun.misc.Unsafe.park(Native Method)
      java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
      java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitUninterruptibly(AbstractQueuedSynchronizer.java:1979)
      org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager$EndpointShmManager.allocSlot(DfsClientShmManager.java:255)
      org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.allocSlot(DfsClientShmManager.java:434)
      org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.allocShmSlot(ShortCircuitCache.java:1016)
      org.apache.hadoop.hdfs.BlockReaderFactory.createShortCircuitReplicaInfo(BlockReaderFactory.java:477)
      org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.create(ShortCircuitCache.java:783)
      org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.fetchOrCreate(ShortCircuitCache.java:717)
      org.apache.hadoop.hdfs.BlockReaderFactory.getBlockReaderLocal(BlockReaderFactory.java:421)
      org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderFactory.java:332)
      org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:576)
      org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:800)
      org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:848)
      java.io.DataInputStream.read(DataInputStream.java:149)
      org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:69)
      java.io.ObjectInputStream$PeekInputStream.read(ObjectInputStream.java:2310)
      java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2323)
      java.io.ObjectInputStream$BlockDataInputStream.readShort(ObjectInputStream.java:2794)
      java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:801)
      java.io.ObjectInputStream.<init>(ObjectInputStream.java:299)
      org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.<init>(InstantiationUtil.java:55)
      org.apache.flink.runtime.state.filesystem.FileSerializableStateHandle.getState(FileSerializableStateHandle.java:52)
      org.apache.flink.runtime.state.filesystem.FileSerializableStateHandle.getState(FileSerializableStateHandle.java:35)
      org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:162)
      org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:440)
      org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:208)
      org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
      java.lang.Thread.run(Thread.java:745)
      

      and

      10:46:09,223 WARN  org.apache.flink.runtime.taskmanager.Task                     - Task 'XXX -> YYY (3/5)' did not react to cancelling signal, but is stuck in method:
      java.lang.Throwable.fillInStackTrace(Native Method)
      java.lang.Throwable.fillInStackTrace(Throwable.java:783)
      java.lang.Throwable.<init>(Throwable.java:250)
      java.lang.Exception.<init>(Exception.java:54)
      java.lang.InterruptedException.<init>(InterruptedException.java:57)
      java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2038)
      org.apache.hadoop.net.unix.DomainSocketWatcher.add(DomainSocketWatcher.java:325)
      org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager$EndpointShmManager.allocSlot(DfsClientShmManager.java:266)
      org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.allocSlot(DfsClientShmManager.java:434)
      org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.allocShmSlot(ShortCircuitCache.java:1016)
      org.apache.hadoop.hdfs.BlockReaderFactory.createShortCircuitReplicaInfo(BlockReaderFactory.java:477)
      org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.create(ShortCircuitCache.java:783)
      org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.fetchOrCreate(ShortCircuitCache.java:717)
      org.apache.hadoop.hdfs.BlockReaderFactory.getBlockReaderLocal(BlockReaderFactory.java:421)
      org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderFactory.java:332)
      org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:576)
      org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:800)
      org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:848)
      java.io.DataInputStream.read(DataInputStream.java:149)
      org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:69)
      java.io.ObjectInputStream$PeekInputStream.read(ObjectInputStream.java:2310)
      java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2323)
      java.io.ObjectInputStream$BlockDataInputStream.readShort(ObjectInputStream.java:2794)
      java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:801)
      java.io.ObjectInputStream.<init>(ObjectInputStream.java:299)
      org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.<init>(InstantiationUtil.java:55)
      org.apache.flink.runtime.state.filesystem.FileSerializableStateHandle.getState(FileSerializableStateHandle.java:52)
      org.apache.flink.runtime.state.filesystem.FileSerializableStateHandle.getState(FileSerializableStateHandle.java:35)
      org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:162)
      org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:440)
      org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:208)
      org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
      java.lang.Thread.run(Thread.java:745)
      

      The issue is most likely that the HDFS client gets stuck in the "org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read()" call when it receives an interrupt.
      By putting the call into a separate thread, the TaskInterrupt would not break the HadoopReadThread.

      The HadoopReadThread would stop eventually with an error or after the read operation has finished.

        Attachments

          Issue Links

            Activity

              People

              • Assignee:
                sewen Stephan Ewen
                Reporter:
                rmetzger Robert Metzger
              • Votes:
                0 Vote for this issue
                Watchers:
                6 Start watching this issue

                Dates

                • Created:
                  Updated:
                  Resolved: