Flink > FLINK-30531 Reduce operator chain call stack depth > FLINK-30709

NetworkInput#emitNext() should push records to DataOutput in a while loop


Details

    Description

      This is similar to FLINK-30533: FLINK-30533 focuses on source operators, while this JIRA focuses on network input.

       

      Currently, each invocation of AbstractStreamTaskNetworkInput#emitNext() pushes at most one record to the given DataOutput. This unnecessarily increases the average Java call stack depth needed to produce a record.

      Take the following program as an example. For each element produced by this program, the Flink runtime has to include these 3 function calls in the call stack:

      • StreamTask#processInput()
      • StreamOneInputProcessor#processInput()
      • AbstractStreamTaskNetworkInput#emitNext()

      This ticket proposes updating AbstractStreamTaskNetworkInput#emitNext() to push records to DataOutput in a while loop. This improves Flink performance by removing an average of 3 function calls from the call stack needed to produce a record.
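
      To illustrate the difference, here is a minimal, self-contained Java sketch. It does not use Flink's real classes: Input, Output, Status, emitNextSingle, emitNextLoop and countInvocations are hypothetical stand-ins, and the input is assumed to hold records that are already fully deserialized in a buffer. The sketch counts how many times the caller has to re-enter emitNext() (the frame that sits under processInput() in the list above) to drain all buffered records.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class EmitNextSketch {

    // Hypothetical stand-in for a "more data available?" status; not Flink's enum.
    enum Status { MORE_AVAILABLE, NOTHING_AVAILABLE }

    // Hypothetical stand-in for a DataOutput: simply collects emitted records.
    static final class Output {
        final List<Integer> records = new ArrayList<>();
        void emitRecord(int record) { records.add(record); }
    }

    // Hypothetical network input whose buffer already holds deserialized records.
    static final class Input {
        private final Queue<Integer> buffer = new ArrayDeque<>();

        Input(int n) {
            for (int i = 0; i < n; i++) buffer.add(i);
        }

        // Current behavior: push at most one record per call.
        Status emitNextSingle(Output out) {
            Integer record = buffer.poll();
            if (record == null) return Status.NOTHING_AVAILABLE;
            out.emitRecord(record);
            return buffer.isEmpty() ? Status.NOTHING_AVAILABLE : Status.MORE_AVAILABLE;
        }

        // Proposed behavior: push every available record in a while loop.
        Status emitNextLoop(Output out) {
            Integer record;
            while ((record = buffer.poll()) != null) {
                out.emitRecord(record);
            }
            return Status.NOTHING_AVAILABLE;
        }
    }

    // Counts how often the outer processInput() -> emitNext() chain is entered
    // before all records have been moved from the input to the output.
    static int countInvocations(int records, boolean loop) {
        Input in = new Input(records);
        Output out = new Output();
        int invocations = 0;
        Status status;
        do {
            invocations++;
            status = loop ? in.emitNextLoop(out) : in.emitNextSingle(out);
        } while (status == Status.MORE_AVAILABLE);
        return invocations;
    }

    public static void main(String[] args) {
        System.out.println("single: " + countInvocations(1000, false));
        System.out.println("loop:   " + countInvocations(1000, true));
    }
}
```

      With 1000 buffered records, the single-record version needs 1000 invocations and the loop version needs 1, so the outer frames are entered once per batch rather than once per record. The sketch drains the buffer unconditionally; a real implementation would presumably also have to hand control back to the task between batches so that mailbox actions such as checkpoints are not delayed, which is not modeled here.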

      Here are the benchmark results obtained by running InputBenchmark#mapSink with env.disableOperatorChaining(). I ran it 4 times on my Mac.

       

      Before the proposed change, the average is 12429.0605 ops/ms. Here are the detailed results:
      
      Benchmark                (sourceType)   Mode  Cnt      Score     Error   Units
      InputBenchmark.mapSink  F27_UNBOUNDED  thrpt   30  12339.771 ± 414.649  ops/ms
      InputBenchmark.mapSink  F27_UNBOUNDED  thrpt   30  12687.872 ± 320.084  ops/ms
      InputBenchmark.mapSink  F27_UNBOUNDED  thrpt   30  12256.445 ± 512.219  ops/ms
      InputBenchmark.mapSink  F27_UNBOUNDED  thrpt   30  12432.154 ± 405.083  ops/ms
      
      
      After the proposed change, the average is 13836.845 ops/ms. Here are the detailed results:
      
      Benchmark                (sourceType)   Mode  Cnt      Score     Error   Units
      InputBenchmark.mapSink  F27_UNBOUNDED  thrpt   30  13092.451 ± 490.886  ops/ms
      InputBenchmark.mapSink  F27_UNBOUNDED  thrpt   30  13881.138 ± 370.249  ops/ms
      InputBenchmark.mapSink  F27_UNBOUNDED  thrpt   30  13960.280 ± 389.505  ops/ms
      InputBenchmark.mapSink  F27_UNBOUNDED  thrpt   30  14413.511 ± 727.844  ops/ms

       

      The proposed change increases throughput by 11.3%.
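
      The averages and the 11.3% figure can be re-derived from the four JMH scores in each table. The following small Java sketch does that arithmetic; the class and method names (ThroughputDelta, mean, gainPercent) are illustrative only, and the sketch ignores the per-run error bars.

```java
public class ThroughputDelta {

    // Arithmetic mean of the per-run JMH scores (ops/ms).
    static double mean(double[] scores) {
        double sum = 0.0;
        for (double s : scores) sum += s;
        return sum / scores.length;
    }

    // Relative throughput change in percent.
    static double gainPercent(double before, double after) {
        return (after - before) / before * 100.0;
    }

    public static void main(String[] args) {
        // Scores copied from the "before" and "after" tables in this ticket.
        double[] before = {12339.771, 12687.872, 12256.445, 12432.154};
        double[] after = {13092.451, 13881.138, 13960.280, 14413.511};
        double b = mean(before);
        double a = mean(after);
        System.out.printf("before=%.4f after=%.4f gain=%.1f%%%n", b, a, gainPercent(b, a));
    }
}
```

      Running it reproduces the averages quoted above (about 12429.06 and 13836.85 ops/ms) and a gain of roughly 11.3%.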


            People

              fanrui Rui Fan
