Flink / FLINK-3466

Job might get stuck in restoreState() from HDFS due to interrupt

Details

    • Type: Bug
    • Status: Closed
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 1.1.0
    • Fix Version/s: 1.1.0
    • None

    Description

      A user reported the following issue with a failing job:

      10:46:09,223 WARN  org.apache.flink.runtime.taskmanager.Task                     - Task 'XXX -> YYY (3/5)' did not react to cancelling signal, but is stuck in method:
      sun.misc.Unsafe.park(Native Method)
      java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
      java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitUninterruptibly(AbstractQueuedSynchronizer.java:1979)
      org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager$EndpointShmManager.allocSlot(DfsClientShmManager.java:255)
      org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.allocSlot(DfsClientShmManager.java:434)
      org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.allocShmSlot(ShortCircuitCache.java:1016)
      org.apache.hadoop.hdfs.BlockReaderFactory.createShortCircuitReplicaInfo(BlockReaderFactory.java:477)
      org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.create(ShortCircuitCache.java:783)
      org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.fetchOrCreate(ShortCircuitCache.java:717)
      org.apache.hadoop.hdfs.BlockReaderFactory.getBlockReaderLocal(BlockReaderFactory.java:421)
      org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderFactory.java:332)
      org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:576)
      org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:800)
      org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:848)
      java.io.DataInputStream.read(DataInputStream.java:149)
      org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:69)
      java.io.ObjectInputStream$PeekInputStream.read(ObjectInputStream.java:2310)
      java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2323)
      java.io.ObjectInputStream$BlockDataInputStream.readShort(ObjectInputStream.java:2794)
      java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:801)
      java.io.ObjectInputStream.<init>(ObjectInputStream.java:299)
      org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.<init>(InstantiationUtil.java:55)
      org.apache.flink.runtime.state.filesystem.FileSerializableStateHandle.getState(FileSerializableStateHandle.java:52)
      org.apache.flink.runtime.state.filesystem.FileSerializableStateHandle.getState(FileSerializableStateHandle.java:35)
      org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:162)
      org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:440)
      org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:208)
      org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
      java.lang.Thread.run(Thread.java:745)
      

      and

      10:46:09,223 WARN  org.apache.flink.runtime.taskmanager.Task                     - Task 'XXX -> YYY (3/5)' did not react to cancelling signal, but is stuck in method:
      java.lang.Throwable.fillInStackTrace(Native Method)
      java.lang.Throwable.fillInStackTrace(Throwable.java:783)
      java.lang.Throwable.<init>(Throwable.java:250)
      java.lang.Exception.<init>(Exception.java:54)
      java.lang.InterruptedException.<init>(InterruptedException.java:57)
      java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2038)
      org.apache.hadoop.net.unix.DomainSocketWatcher.add(DomainSocketWatcher.java:325)
      org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager$EndpointShmManager.allocSlot(DfsClientShmManager.java:266)
      org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.allocSlot(DfsClientShmManager.java:434)
      org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.allocShmSlot(ShortCircuitCache.java:1016)
      org.apache.hadoop.hdfs.BlockReaderFactory.createShortCircuitReplicaInfo(BlockReaderFactory.java:477)
      org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.create(ShortCircuitCache.java:783)
      org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.fetchOrCreate(ShortCircuitCache.java:717)
      org.apache.hadoop.hdfs.BlockReaderFactory.getBlockReaderLocal(BlockReaderFactory.java:421)
      org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderFactory.java:332)
      org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:576)
      org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:800)
      org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:848)
      java.io.DataInputStream.read(DataInputStream.java:149)
      org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:69)
      java.io.ObjectInputStream$PeekInputStream.read(ObjectInputStream.java:2310)
      java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2323)
      java.io.ObjectInputStream$BlockDataInputStream.readShort(ObjectInputStream.java:2794)
      java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:801)
      java.io.ObjectInputStream.<init>(ObjectInputStream.java:299)
      org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.<init>(InstantiationUtil.java:55)
      org.apache.flink.runtime.state.filesystem.FileSerializableStateHandle.getState(FileSerializableStateHandle.java:52)
      org.apache.flink.runtime.state.filesystem.FileSerializableStateHandle.getState(FileSerializableStateHandle.java:35)
      org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:162)
      org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:440)
      org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:208)
      org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
      java.lang.Thread.run(Thread.java:745)
      

      The most likely cause is that the HDFS client gets stuck inside the "org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read()" call when the task thread is interrupted during cancellation.
      If the read were moved into a separate thread (a HadoopReadThread), the task interrupt would no longer reach the thread that performs the HDFS read.

      The HadoopReadThread would eventually terminate on its own, either with an error or once the read operation has finished.
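
The separate-thread idea can be sketched roughly as follows. This is only a minimal illustration and not the actual Flink change: the class name `InterruptShieldedRead` and the method `readShielded` are hypothetical, and a plain `java.io.InputStream` stands in for the HDFS stream. The caller waits with `Thread.join()`, which is interruptible, while the helper thread that performs the read is never interrupted.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical helper, for illustration only; not part of Flink or Hadoop.
class InterruptShieldedRead {

    /**
     * Performs a single read on a helper thread and returns the number of
     * bytes read (or -1 at end of stream). If the calling thread is
     * interrupted while waiting, InterruptedException is thrown to the
     * caller, but the helper thread keeps running until the read returns
     * or fails.
     */
    static int readShielded(final InputStream in, final byte[] buf)
            throws IOException, InterruptedException {
        final int[] result = new int[1];
        final AtomicReference<IOException> error = new AtomicReference<>();

        Thread reader = new Thread(() -> {
            try {
                result[0] = in.read(buf, 0, buf.length);
            } catch (IOException e) {
                error.set(e);
            }
        }, "HadoopReadThread");
        // Daemon thread, so a read that never returns cannot keep the JVM alive.
        reader.setDaemon(true);
        reader.start();

        // join() reacts to interrupts of the calling (task) thread only;
        // the interrupt is not forwarded to the reader thread.
        reader.join();

        if (error.get() != null) {
            throw error.get();
        }
        return result[0];
    }

    public static void main(String[] args) throws Exception {
        byte[] buf = new byte[4];
        InputStream in = new ByteArrayInputStream(new byte[] {1, 2, 3, 4});
        int n = readShielded(in, buf);
        System.out.println("read " + n + " bytes");
    }
}
```

With this shape, a cancelled task gets its `InterruptedException` from `join()` right away instead of from inside the HDFS client. The trade-off is that the abandoned read may continue in the background for a while, so the stream should only be closed once the helper thread has finished.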

            People

              Assignee: Stephan Ewen (sewen)
              Reporter: Robert Metzger (rmetzger)
              Votes: 0
              Watchers: 6
