Flink / FLINK-32023

execution.buffer-timeout cannot be set to -1 ms

Details

    Description

      The description of execution.buffer-timeout is as follows:

      public static final ConfigOption<Duration> BUFFER_TIMEOUT =
              ConfigOptions.key("execution.buffer-timeout")
                      .durationType()
                      .defaultValue(Duration.ofMillis(100))
                      .withDescription(
                              Description.builder()
                                      .text(
                                              "The maximum time frequency (milliseconds) for the flushing of the output buffers. By default "
                                                      + "the output buffers flush frequently to provide low latency and to aid smooth developer "
                                                      + "experience. Setting the parameter can result in three logical modes:")
                                      .list(
                                              text(
                                                      "A positive value triggers flushing periodically by that interval"),
                                              text(
                                                      FLUSH_AFTER_EVERY_RECORD
                                                              + " triggers flushing after every record thus minimizing latency"),
                                              text(
                                                      DISABLED_NETWORK_BUFFER_TIMEOUT
                                                              + " ms triggers flushing only when the output buffer is full thus maximizing "
                                                              + "throughput"))
                                      .build()); 
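
The three modes listed in the description can be sketched as a small classifier. This is only an illustration: BufferTimeoutModes and its Mode enum are hypothetical names, not Flink classes, and the constant values (0 for FLUSH_AFTER_EVERY_RECORD, -1 for DISABLED_NETWORK_BUFFER_TIMEOUT) are assumed here rather than quoted from this issue.

```java
// Hypothetical helper, not part of Flink: maps a buffer timeout in
// milliseconds onto the three modes named in the option description.
final class BufferTimeoutModes {
    // Assumed values of the constants referenced in the description.
    static final long FLUSH_AFTER_EVERY_RECORD = 0L;
    static final long DISABLED_NETWORK_BUFFER_TIMEOUT = -1L;

    enum Mode { PERIODIC, AFTER_EVERY_RECORD, ONLY_WHEN_FULL }

    static Mode classify(long timeoutMillis) {
        if (timeoutMillis > 0) {
            return Mode.PERIODIC; // flush every timeoutMillis
        }
        if (timeoutMillis == FLUSH_AFTER_EVERY_RECORD) {
            return Mode.AFTER_EVERY_RECORD; // lowest latency
        }
        if (timeoutMillis == DISABLED_NETWORK_BUFFER_TIMEOUT) {
            return Mode.ONLY_WHEN_FULL; // highest throughput
        }
        throw new IllegalArgumentException("Unsupported buffer timeout: " + timeoutMillis);
    }

    public static void main(String[] args) {
        for (long t : new long[] {100L, 0L, -1L}) {
            System.out.println(t + " ms -> " + classify(t));
        }
    }
}
```

The third mode is the one this issue is about: it is documented, but the value that selects it has to survive parsing as a Duration first.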

      When we set execution.buffer-timeout to -1 ms, the following error is reported:

      Caused by: java.lang.IllegalArgumentException: Could not parse value '-1 ms' for key 'execution.buffer-timeout'.
          at org.apache.flink.configuration.Configuration.getOptional(Configuration.java:856)
          at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.configure(StreamExecutionEnvironment.java:822)
          at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.<init>(StreamExecutionEnvironment.java:224)
          at org.apache.flink.streaming.api.environment.StreamContextEnvironment.<init>(StreamContextEnvironment.java:51)
          at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.createStreamExecutionEnvironment(StreamExecutionEnvironment.java:1996)
          at java.util.Optional.orElseGet(Optional.java:267)
          at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getExecutionEnvironment(StreamExecutionEnvironment.java:1986)
          at com.kuaishou.flink.examples.api.WordCount.main(WordCount.java:27)
          at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
          at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
          at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
          at java.lang.reflect.Method.invoke(Method.java:498)
          at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:327)
          ... 11 more
      Caused by: java.lang.NumberFormatException: text does not start with a number
          at org.apache.flink.util.TimeUtils.parseDuration(TimeUtils.java:78)
          at org.apache.flink.configuration.Configuration.convertToDuration(Configuration.java:1058)
          at org.apache.flink.configuration.Configuration.convertValue(Configuration.java:996)
          at org.apache.flink.configuration.Configuration.lambda$getOptional$2(Configuration.java:853)
          at java.util.Optional.map(Optional.java:215)
          at org.apache.flink.configuration.Configuration.getOptional(Configuration.java:853)
          ... 23 more 

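      The reason is that a Duration value cannot be negative: TimeUtils.parseDuration requires the text to start with a number, so the leading minus sign in '-1 ms' is rejected. We should either change the documented behavior or add support for a value that triggers flushing only when the output buffer is full.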
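
The failure can be reproduced without Flink by a simplified stand-in for the parser. The sketch below is an assumption modeled on the error message in the stack trace, not a copy of TimeUtils.parseDuration; the class name DurationPrefixSketch is hypothetical, and it only understands the "ms" unit.

```java
// Simplified stand-in for a "<digits> <unit>" duration parser.
// It scans leading ASCII digits only, so a minus sign leaves the
// number part empty and parsing fails, as in the reported trace.
final class DurationPrefixSketch {

    static long parseMillis(String text) {
        String trimmed = text.trim();

        // Advance over the leading run of digits 0-9.
        int pos = 0;
        while (pos < trimmed.length()
                && trimmed.charAt(pos) >= '0'
                && trimmed.charAt(pos) <= '9') {
            pos++;
        }

        String number = trimmed.substring(0, pos);
        String unit = trimmed.substring(pos).trim();

        if (number.isEmpty()) {
            // "-1 ms" ends up here because '-' is not a digit.
            throw new NumberFormatException("text does not start with a number");
        }
        if (!unit.equals("ms")) {
            throw new IllegalArgumentException("this sketch only understands 'ms', got: '" + unit + "'");
        }
        return Long.parseLong(number);
    }

    public static void main(String[] args) {
        System.out.println(parseMillis("100 ms"));
        try {
            parseMillis("-1 ms");
        } catch (NumberFormatException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

With a parser of this shape, no string value can express -1 ms, which is why the documented "flush only when the buffer is full" mode cannot be reached through the configuration key.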

            People

              Jiangang Liu