Project: Flink / Issue: FLINK-10672

Task stuck while writing output to flink


    Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 1.5.4
    • Fix Version/s: None
    • Component/s: Runtime / Coordination
    • Labels:
    • Environment:

      Description

      I am running a fairly complex pipeline with more than 200 tasks.

      The pipeline works fine with small data (input on the order of 10 kB) but gets stuck with slightly larger data (300 kB of input).

       

      The task gets stuck while writing its output to Flink. More specifically, it blocks while requesting a memory segment from the local buffer pool (LocalBufferPool). The TaskManager UI shows that it has enough memory and memory segments to work with.
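The symptom above is a writer parked inside `LocalBufferPool.requestMemorySegment` while the TaskManager still reports free memory. That is consistent with how any bounded per-task pool behaves: a requester waits on the pool's monitor until another thread returns a segment to *that* pool, no matter how much memory exists elsewhere in the process.

The sketch below is a minimal, hypothetical illustration of that wait/notify pattern. The class `BoundedSegmentPool` and its methods are invented for this example and are not Flink's `LocalBufferPool`. It assumes a fixed segment count and a 2-second timed wait, which is why a blocked requester would show up as `TIMED_WAITING (on object monitor)` on a `java.util.ArrayDeque` in a thread dump.

```java
import java.util.ArrayDeque;

// Hypothetical illustration of a fixed-size pool with blocking requests.
// NOT Flink's LocalBufferPool; it only mirrors the general wait/notify shape.
public class BoundedSegmentPool {
    private final ArrayDeque<byte[]> available = new ArrayDeque<>();

    public BoundedSegmentPool(int numSegments, int segmentSize) {
        for (int i = 0; i < numSegments; i++) {
            available.add(new byte[segmentSize]);
        }
    }

    /** Blocks, in repeated timed waits, until a segment is returned to this pool. */
    public byte[] request() throws InterruptedException {
        synchronized (available) {
            while (available.isEmpty()) {
                // A thread parked here appears in jstack as TIMED_WAITING (on object monitor),
                // "waiting on <...> (a java.util.ArrayDeque)".
                available.wait(2000);
            }
            return available.poll();
        }
    }

    /** Returns a segment to the pool and wakes one blocked requester. */
    public void recycle(byte[] segment) {
        synchronized (available) {
            available.add(segment);
            available.notify();
        }
    }

    public int availableCount() {
        synchronized (available) {
            return available.size();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BoundedSegmentPool pool = new BoundedSegmentPool(2, 32 * 1024);
        byte[] first = pool.request();
        pool.request(); // second and last segment: the pool is now exhausted

        Thread writer = new Thread(() -> {
            try {
                pool.request(); // blocks until someone calls recycle()
                System.out.println("writer got a segment");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        writer.start();
        Thread.sleep(200);
        System.out.println("writer state: " + writer.getState());

        pool.recycle(first); // simulates a consumer releasing a buffer
        writer.join();
    }
}
```

In this model only `recycle` can unblock a writer. By analogy, a writer that stays parked would point at segments not being returned (for example, a slow or stalled consumer) rather than at total memory. This is an interpretation of the pattern, not a confirmed diagnosis of this issue.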

      The relevant stack trace is:

      "grpc-default-executor-0" #138 daemon prio=5 os_prio=0 tid=0x00007fedb0163800 nid=0x30b7f in Object.wait() [0x00007fedb4f90000]
         java.lang.Thread.State: TIMED_WAITING (on object monitor)
        at (C/C++) 0x00007fef201c7dae (Unknown Source)
        at (C/C++) 0x00007fef1f2aea07 (Unknown Source)
        at (C/C++) 0x00007fef1f241cd3 (Unknown Source)
        at java.lang.Object.wait(Native Method)
        - waiting on <0x00000000f6d56450> (a java.util.ArrayDeque)
        at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:247)
        - locked <0x00000000f6d56450> (a java.util.ArrayDeque)
        at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:204)
        at org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:213)
        at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:144)
        at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107)
        at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
        at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
        at org.apache.beam.runners.flink.translation.functions.FlinkExecutableStagePruningFunction.flatMap(FlinkExecutableStagePruningFunction.java:42)
        at org.apache.beam.runners.flink.translation.functions.FlinkExecutableStagePruningFunction.flatMap(FlinkExecutableStagePruningFunction.java:26)
        at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:80)
        at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
        at org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction$MyDataReceiver.accept(FlinkExecutableStageFunction.java:230)
        - locked <0x00000000f6a60bd0> (a java.lang.Object)
        at org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.accept(BeamFnDataInboundObserver.java:81)
        at org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.accept(BeamFnDataInboundObserver.java:32)
        at org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer$InboundObserver.onNext(BeamFnDataGrpcMultiplexer.java:139)
        at org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer$InboundObserver.onNext(BeamFnDataGrpcMultiplexer.java:125)
        at org.apache.beam.vendor.grpc.v1.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:248)
        at org.apache.beam.vendor.grpc.v1.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
        at org.apache.beam.vendor.grpc.v1.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
        at org.apache.beam.vendor.grpc.v1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:263)
        at org.apache.beam.vendor.grpc.v1.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:683)
        at org.apache.beam.vendor.grpc.v1.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
        at org.apache.beam.vendor.grpc.v1.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
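The trace also shows the gRPC thread still holding a monitor while it waits for a buffer. The monitor is `<0x00000000f6a60bd0>` (a `java.lang.Object`), taken in `FlinkExecutableStageFunction$MyDataReceiver.accept`. Any other thread that needs that monitor would therefore block as well.

As a hedged sketch, the standard `java.lang.management.ThreadMXBean` API can list threads in this "waiting on one monitor while holding another" state from inside a JVM. The class `WaitWhileHoldingLock` and the demo thread are hypothetical; they only reproduce the shape of the trace above.

```java
import java.lang.management.LockInfo;
import java.lang.management.ManagementFactory;
import java.lang.management.MonitorInfo;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.ArrayList;
import java.util.List;

// Hypothetical diagnostic: find threads that are WAITING/TIMED_WAITING on one
// monitor while still owning at least one other monitor.
public class WaitWhileHoldingLock {

    public static List<String> findWaitersHoldingLocks() {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        List<String> result = new ArrayList<>();
        // true = include locked monitors; false = skip ownable synchronizers.
        for (ThreadInfo info : mx.dumpAllThreads(true, false)) {
            if (info == null) {
                continue;
            }
            Thread.State state = info.getThreadState();
            LockInfo waitingOn = info.getLockInfo();
            if (waitingOn == null
                    || (state != Thread.State.WAITING && state != Thread.State.TIMED_WAITING)) {
                continue;
            }
            for (MonitorInfo held : info.getLockedMonitors()) {
                // Skip the monitor being waited on (Object.wait() releases it).
                if (held.getIdentityHashCode() != waitingOn.getIdentityHashCode()) {
                    result.add(info.getThreadName() + " waits on " + waitingOn
                            + " while holding " + held + " at " + held.getLockedStackFrame());
                    break;
                }
            }
        }
        return result;
    }

    public static void main(String[] args) throws InterruptedException {
        Object receiverLock = new Object(); // stands in for the lock taken in MyDataReceiver.accept
        Object pool = new Object();         // stands in for the exhausted buffer pool
        Thread t = new Thread(() -> {
            synchronized (receiverLock) {
                synchronized (pool) {
                    try {
                        pool.wait(); // never notified in this demo
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
        }, "demo-receiver");
        t.setDaemon(true);
        t.start();
        while (t.getState() != Thread.State.WAITING) {
            Thread.sleep(10);
        }
        findWaitersHoldingLocks().forEach(System.out::println);
        t.interrupt();
        t.join();
    }
}
```

`LockInfo` exposes object identity only through `getIdentityHashCode()`, and identity hash codes can collide in principle. Treat the output as a hint to follow up with a full thread dump, not as proof of a deadlock.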

       

      The full stack trace and logs are attached.
      Please take a look and let me know if more information is needed.

        Attachments

        1. WithBroadcastJob.png (410 kB, Yun Gao)
        2. Po89UGDn58V.png (138 kB, Ankur Goenka)
        3. jstack_66985.log (298 kB, Ankur Goenka)
        4. jstack_163822.log (280 kB, Ankur Goenka)
        5. jstack_129827.log (275 kB, Ankur Goenka)
        6. jmx_dump.json (1.38 MB, Ankur Goenka)
        7. jmx_dump_detailed.json (1.97 MB, Ankur Goenka)
        8. 3aDKQ24WvKk.png (79 kB, Ankur Goenka)
        9. 1uruvakHxBu.png (119 kB, Ankur Goenka)
        10. 0.14_all_jobs.jpg (301 kB, Yun Gao)

              People

              • Assignee: Yun Gao (gaoyunhaii)
              • Reporter: Ankur Goenka (angoenka)
              • Votes: 0
              • Watchers: 15
