Flink / FLINK-29845

ThroughputCalculator throws java.lang.IllegalArgumentException: Time should be non negative on a cluster with very low throughput

Details

    Description

      Our team is using Flink 1.14.6 to process data from Kafka.

      Everything works fine, except when the same job jar is deployed with the same arguments in an environment with very low Kafka source throughput. There the job sometimes crashes with the following exception and cannot recover until we restart the TaskManagers, which is unacceptable for a production environment.

      [2022-10-31T15:33:57.153+08:00] [o.a.f.runtime.taskmanager.Task#cess (2/16)#244] - [WARN ] KeyedProcess (2/16)#244 (b9b54f6445419fc43c4d58fcd95cee82) switched from RUNNING to FAILED with failure cause: java.lang.IllegalArgumentException: Time should be non negative
      	at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
      	at org.apache.flink.runtime.throughput.ThroughputEMA.calculateThroughput(ThroughputEMA.java:44)
      	at org.apache.flink.runtime.throughput.ThroughputCalculator.calculateThroughput(ThroughputCalculator.java:80)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.debloat(StreamTask.java:789)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$4(StreamTask.java:781)
      	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
      	at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
      	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
      	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
      	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:806)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:758)
      	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
      	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
      	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
      	at java.lang.Thread.run(Thread.java:748)
      

      After a rough look at the source code, we found that even when buffer debloating is disabled (https://github.com/apache/flink/blob/release-1.14.6/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L427 ), the buffer debloater is still scheduled (https://github.com/apache/flink/blob/release-1.14.6/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L755 ), so the ThroughputCalculator keeps calculating the throughput (https://github.com/apache/flink/blob/release-1.14.6/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L789 ). This appears to cause the division by zero, and the calculation seems unnecessary in that case, I suppose.
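To illustrate the kind of guard we have in mind, here is a minimal, self-contained sketch. It is not Flink's actual code: the class `DebloatGuardSketch` and the methods `throughputBytesPerSecond` and `debloatTick` are hypothetical names, and the throughput formula is a simplified stand-in for the real calculator. It only shows that skipping the periodic measurement when debloating is disabled would keep the precondition from ever being evaluated.

```java
import java.util.OptionalLong;

/** Hypothetical, simplified model for illustration; not Flink's actual classes. */
final class DebloatGuardSketch {

    /** Mimics a throughput meter that rejects negative measurement intervals. */
    static long throughputBytesPerSecond(long bytes, long elapsedMillis) {
        if (elapsedMillis < 0) {
            throw new IllegalArgumentException("Time should be non negative");
        }
        if (elapsedMillis == 0) {
            // Nothing measurable yet; return 0 instead of dividing by zero.
            return 0L;
        }
        return bytes * 1000L / elapsedMillis;
    }

    /** Runs the periodic measurement only when debloating is enabled. */
    static OptionalLong debloatTick(boolean debloatEnabled, long bytes, long elapsedMillis) {
        if (!debloatEnabled) {
            // Assumed guard: with debloating off, skip the calculation entirely.
            return OptionalLong.empty();
        }
        return OptionalLong.of(throughputBytesPerSecond(bytes, elapsedMillis));
    }

    public static void main(String[] args) {
        // Disabled: the bad interval is never inspected, so nothing is thrown.
        System.out.println(debloatTick(false, 0L, -5L));
        // Enabled with a valid interval: the throughput is computed as usual.
        System.out.println(debloatTick(true, 2048L, 1000L));
    }
}
```

With the guard in place, a negative interval can only fail the task when debloating is actually enabled, which is the case where the measurement is needed at all.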

      As a workaround, we currently set taskmanager.network.memory.buffer-debloat.period: 365d so that the buffer debloater is scheduled only very rarely, which avoids the random crashes.

      P.S. We found a bug with a similar stack trace, https://issues.apache.org/jira/browse/FLINK-25454 , which was fixed in 1.14.6.

            People

              Weijie Guo
              dawnwords Jingxiao GU