
FLINK-9242: LocalEnvironment - Operator threads stuck on java.lang.Thread.State: WAITING


Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Duplicate
    • Affects Version/s: 1.5.0, 1.6.0
    • Fix Version/s: None
    • Component/s: Runtime / Coordination
    • Labels: None

    Description

      Hello,

      As per Fabian Hueske's advice on the mailing list, I am detailing the problem here.
      This happens with my code on both 1.5-SNAPSHOT and 1.6-SNAPSHOT, but not on 1.4.2 (stable),
      so I believe it may be a regression introduced after 1.4.2.

      I'm getting different DataSet operators blocked on java.lang.Thread.State: WAITING for no apparent reason.
      I have only tested this with a LocalEnvironment, which is created like so:

      final Configuration conf = new Configuration();
      conf.setString("web.log.path", logPath);
      conf.setString("jobmanager.rpc.address", "127.0.0.1");
      conf.setString("web.port", "8081-9000");
      conf.setString("query.server.ports", "2000-30000");
      conf.setString("query.proxy.ports", "30001-60000");
      
      LocalEnvironment lenv = (LocalEnvironment) ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
      

      (I also tried creating the LocalEnvironment without the web interface, and the problem still occurs.)
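
      For reference, the variant without the web interface looks roughly like this (a sketch, reusing the same conf object as above; my actual code may differ slightly):

      // Local environment without the embedded web UI, using the same configuration.
      ExecutionEnvironment lenv2 = ExecutionEnvironment.createLocalEnvironment(conf);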

      I have debugged with IntelliJ IDEA and obtained thread dumps from different executions, and realized that quite a few operator threads are stuck in java.lang.Thread.State: WAITING.
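
      In case it helps to reproduce the dumps below outside the IDE, this is roughly how the same information can be captured programmatically via the standard JMX thread bean (the class and method names here are just for illustration):

      import java.lang.management.ManagementFactory;
      import java.lang.management.ThreadInfo;
      import java.lang.management.ThreadMXBean;

      public final class ThreadDumpSketch {
          // Prints every live thread with its state and (part of) its stack trace.
          public static void dump() {
              ThreadMXBean threads = ManagementFactory.getThreadMXBean();
              for (ThreadInfo info : threads.dumpAllThreads(true, true)) {
                  System.out.print(info);
              }
          }
      }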

      I cannot share my code at the moment, but essentially I have a series of jobs, some of which use common data (I made sure it was written to disk in job i and read back from disk in job i + 1; a rough sketch of that pattern follows).
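
      The write-then-read pattern between consecutive jobs looks roughly like this (the path, tuple types and sample data are made up for illustration; this assumes the lenv created above and imports of org.apache.flink.api.java.DataSet, org.apache.flink.api.java.tuple.Tuple2 and org.apache.flink.core.fs.FileSystem):

      // Job i: persist the common data to disk before the job finishes.
      DataSet<Tuple2<Long, Long>> common = lenv.fromElements(Tuple2.of(1L, 2L)); // stand-in for the real result of job i
      common.writeAsCsv("/tmp/common-data.csv", FileSystem.WriteMode.OVERWRITE);
      lenv.execute("job i");

      // Job i + 1: read the same data back from disk.
      DataSet<Tuple2<Long, Long>> commonAgain = lenv
              .readCsvFile("/tmp/common-data.csv")
              .types(Long.class, Long.class);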

      There are three major threads that I find to be in this waiting state.

      I'm running in local mode with a parallelism of one.
      The thread dumps I obtained show me where the wait calls originated:

       

      Number 1:
      
      "CHAIN Join (Join at selectEdges(GraphUtils.java:328)) -> Combine (Distinct at selectEdges(GraphUtils.java:330)) (1/1)@9158" prio=5 tid=0xd93 nid=NA waiting
        java.lang.Thread.State: WAITING
            at java.lang.Object.wait(Object.java:-1)
            at java.lang.Object.wait(Object.java:502)
            at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
            at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
            at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
            at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
            at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
            at org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:36)
            at org.apache.flink.runtime.operators.hash.MutableHashTable$ProbeIterator.next(MutableHashTable.java:1929)
            at org.apache.flink.runtime.operators.hash.MutableHashTable.processProbeIter(MutableHashTable.java:505)
            at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
            at org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashJoinIterator.callWithNextKey(ReusingBuildSecondHashJoinIterator.java:122)
            at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)
            at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
            at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
            at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
            at java.lang.Thread.run(Thread.java:748)
      
      Number 2:
      
      "Join (Join at summaryGraph(SummaryGraphBuilder.java:92)) (1/1)@9153" prio=5 tid=0xd8e nid=NA waiting
        java.lang.Thread.State: WAITING
            at java.lang.Object.wait(Object.java:-1)
            at java.lang.Object.wait(Object.java:502)
            at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
            at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
            at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
            at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
            at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
            at org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:36)
            at org.apache.flink.runtime.operators.hash.MutableHashTable$ProbeIterator.next(MutableHashTable.java:1929)
            at org.apache.flink.runtime.operators.hash.MutableHashTable.processProbeIter(MutableHashTable.java:505)
            at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
            at org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashJoinIterator.callWithNextKey(ReusingBuildSecondHashJoinIterator.java:122)
            at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)
            at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
            at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
            at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
            at java.lang.Thread.run(Thread.java:748)
      
      Number 3:
      
      "Join (Join at selectEdges(GraphUtils.java:324)) (1/1)@9118" prio=5 tid=0xd75 nid=NA waiting
        java.lang.Thread.State: WAITING
            at java.lang.Object.wait(Object.java:-1)
            at java.lang.Object.wait(Object.java:502)
            at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
            at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
            at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
            at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
            at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
            at org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:36)
            at org.apache.flink.runtime.operators.hash.MutableHashTable$ProbeIterator.next(MutableHashTable.java:1929)
            at org.apache.flink.runtime.operators.hash.MutableHashTable.processProbeIter(MutableHashTable.java:505)
            at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
            at org.apache.flink.runtime.operators.hash.ReusingBuildFirstHashJoinIterator.callWithNextKey(ReusingBuildFirstHashJoinIterator.java:123)
            at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)
            at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
            at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
            at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
            at java.lang.Thread.run(Thread.java:748)

       
      While I realize these dumps on their own may not be very helpful, they at least indicate (as far as I can tell) that the threads are all waiting on something.
      If it were resource scarcity, I believe the program would terminate with an exception.
      And if it were garbage-collection activity, I believe the JVM process would not sit at 0% CPU usage.

      I realize I haven't yet provided the user code that generates the Flink execution plan which leads to the contexts in which these threads are waiting; my apologies. I will do so as soon as I get a chance.

      To highlight the symptoms:

      • The memory assigned to the JVM is fully used, but there are no exceptions about lack of memory (and the system had plenty more memory available).
      • The CPU usage is at 0% and all threads are in a waiting state, but I don't understand what signal they are waiting for exactly.

      I also noticed something suspicious: I have chains of operators where the first operator ingests the expected number of records but does not emit any, leaving the following operator empty in a "RUNNING" state (see the attached image).

      My scenario has some complexity to it, at least compared to the samples in the Flink documentation: when visualizing the job plan, it is necessary to zoom in and out to inspect specific parts of the execution graph.

      In this sequence of operations, I am (a minimal sketch of this structure follows the list below):

      1 - Creating a DataSet

      2 - Using it as an initial workset in a DeltaIteration

      2.1 - Joining the workset on each iteration with the edges of a graph

      3 - Using the final solution set resulting from the DeltaIteration to build a graph and execute an algorithm over it (.run method).

      • The graph is not prohibitively big and I have a very low limit on the number of iterations (at most 4 or 5).
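
      To make the structure above concrete, here is a minimal, self-contained sketch of that shape (types, keys and data are made up for illustration; the Gelly part of step 3 is only indicated in a comment):

      import org.apache.flink.api.common.functions.JoinFunction;
      import org.apache.flink.api.java.DataSet;
      import org.apache.flink.api.java.ExecutionEnvironment;
      import org.apache.flink.api.java.operators.DeltaIteration;
      import org.apache.flink.api.java.tuple.Tuple2;

      public class DeltaIterationSketch {

          public static void main(String[] args) throws Exception {
              ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();

              // Graph edges as (source, target) pairs.
              DataSet<Tuple2<Long, Long>> edges = env.fromElements(
                      Tuple2.of(1L, 2L), Tuple2.of(2L, 3L), Tuple2.of(3L, 4L));

              // 1 - the initial DataSet, used as both initial solution set and initial workset.
              DataSet<Tuple2<Long, Long>> initial = env.fromElements(
                      Tuple2.of(1L, 1L), Tuple2.of(2L, 2L));

              // Very low iteration limit, as described above (at most 4 or 5).
              int maxIterations = 5;

              // 2 - DeltaIteration keyed on field 0 of the solution set.
              DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
                      initial.iterateDelta(initial, maxIterations, 0);

              // 2.1 - join the workset with the graph edges on each superstep.
              DataSet<Tuple2<Long, Long>> delta = iteration.getWorkset()
                      .join(edges)
                      .where(0).equalTo(0)
                      .with(new JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>() {
                          @Override
                          public Tuple2<Long, Long> join(Tuple2<Long, Long> worksetElement,
                                                         Tuple2<Long, Long> edge) {
                              // Propagate the workset value along the edge to its target vertex.
                              return Tuple2.of(edge.f1, worksetElement.f1);
                          }
                      });

              // 3 - close the iteration; the resulting solution set would then be used to
              //     build a Gelly Graph and execute an algorithm on it via .run() (omitted here).
              DataSet<Tuple2<Long, Long>> result = iteration.closeWith(delta, delta);

              result.print();
          }
      }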

      I will add more information as soon as it is available.
      It seems, however, that some sort of synchronization is missing and perhaps the operators become isolated from each other?

      Attachments

        1. flink_debugging.PNG
          34 kB
          Miguel E. Coimbra


            People

              Assignee: Unassigned
              Reporter: Miguel E. Coimbra (mcoimbra)
              Votes: 0
              Watchers: 3
