  1. Flink
  2. FLINK-9442

Flink Scaling not working


Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Invalid
    • Affects Version/s: 1.4.2
    • Fix Version/s: None
    • Component/s: Runtime / Coordination
    • Labels: None

    Description

      Hi,

      We are testing how well Flink scales, but we found that scaling does not work, whether we add more slots or more TaskManagers. We expected linear, or at least close-to-linear, scaling, but the results even show degradation. Any comments would be appreciated.

       

      Test details:

      - VMware vSphere
      - A simple pass-through test:
          - Auto-generated source: 3 million records, each 1 KB in size, parallelism = 1
          - The source feeds a map operator that returns the same record unchanged and sends a counter to StatsD; its parallelism is 2, 4, or 6 depending on the test case
      - 3 TMs, 6 slots in total (2 per TM); each JM/TM has 32 vCPUs and 100 GB of memory
      - Results:
          - 2 slots: 26 seconds, 3 million / 26 = 115k TPS
          - 4 slots: 23 seconds, 3 million / 23 = 130k TPS
          - 6 slots: 22 seconds, 3 million / 22 = 136k TPS
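To put a number on how far these results are from linear scaling, a small sketch can recompute the throughput and a scaling efficiency from the measured run times. The class and method names (`ScalingReport`, `throughput`, `efficiency`) are made up for illustration. Efficiency is defined here, as an assumption, as the speedup over the 2-slot run divided by the increase in slot count, so 1.0 would mean perfectly linear scaling.

```java
public class ScalingReport {
    /** Records per second for a run that processed `records` in `seconds`. */
    public static double throughput(long records, double seconds) {
        return records / seconds;
    }

    /**
     * Speedup relative to the baseline run divided by the factor by which
     * the slot count grew. 1.0 means perfectly linear scaling.
     */
    public static double efficiency(int baseSlots, double baseTps, int slots, double tps) {
        double speedup = tps / baseTps;
        double slotFactor = (double) slots / baseSlots;
        return speedup / slotFactor;
    }

    public static void main(String[] args) {
        long records = 3_000_000L;          // record count from the test description
        int[] slots = {2, 4, 6};            // map parallelism per test case
        double[] seconds = {26, 23, 22};    // measured run times

        double baseTps = throughput(records, seconds[0]);
        for (int k = 0; k < slots.length; k++) {
            double tps = throughput(records, seconds[k]);
            System.out.printf("%d slots: %.0f TPS, efficiency %.2f%n",
                    slots[k], tps, efficiency(slots[0], baseTps, slots[k], tps));
        }
    }
}
```

With the figures from this test, the efficiency comes out at roughly 0.57 for 4 slots and 0.39 for 6 slots.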

       

      As shown, there is almost no scaling: going from 2 to 6 slots raises throughput only from about 115k to 136k TPS. Any clue? Thanks.

       

       

       

      public class passthru extends RichMapFunction<String, String> {
          public void open(Configuration configuration) throws Exception {
              ... ...
              stats = new NonBlockingStatsDClient();
          }

          public String map(String value) throws Exception {
              ... ...
              stats.increment();
              return value;
          }
      }

      public class datagen extends RichSourceFunction<String> {
          ... ...
          public void run(SourceContext<String> ctx) throws Exception {
              int i = 0;
              while (run) {
                  String idx = String.format("%09d", i);
                  ctx.collect("{\"<a 1kb json content with idx in certain json field>\"}");
                  i++;
                  if (i == loop)
                      run = false;
              }
          }
          ... ...
      }

      public class Job {
          public static void main(String[] args) throws Exception {
              ... ...
              DataStream<String> stream = env.addSource(new datagen(loop)).rebalance();
              DataStream<String> convert = stream.map(new passthru(statsdUrl));
              env.execute("Flink");
          }
      }

      The reason for this sample test is that the Kafka source, FlinkKafkaConsumer011, shows the same problem: it does not scale. FlinkKafkaConsumer011 is already based on RichParallelSourceFunction, and we always set the number of Kafka partitions equal to the total number of TM slots. The throughput is still capped and does not improve linearly.
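The generator in the sample job is a RichSourceFunction running with parallelism 1, whereas the Kafka consumer is a parallel source. To make the sample test closer to the Kafka case, the generator could be split so that each parallel subtask emits a disjoint slice of the index range. The sketch below shows only the slicing logic in plain Java so that it stays self-contained; `ParallelSlices` and `sliceFor` are hypothetical names, and the round-robin split is an assumption, not something taken from the job above. In a Flink RichParallelSourceFunction, the two arguments would come from `getRuntimeContext().getIndexOfThisSubtask()` and `getRuntimeContext().getNumberOfParallelSubtasks()`.

```java
import java.util.ArrayList;
import java.util.List;

public class ParallelSlices {
    /**
     * Returns the indices in [0, loop) owned by one subtask under a
     * round-robin split: subtask s owns s, s + p, s + 2p, ... for parallelism p.
     */
    public static List<Integer> sliceFor(int loop, int subtaskIndex, int parallelism) {
        List<Integer> owned = new ArrayList<>();
        for (int i = subtaskIndex; i < loop; i += parallelism) {
            owned.add(i);
        }
        return owned;
    }

    public static void main(String[] args) {
        int loop = 10;          // small stand-in for the 3 million records of the test
        int parallelism = 3;    // hypothetical source parallelism
        for (int s = 0; s < parallelism; s++) {
            System.out.println("subtask " + s + " emits " + sliceFor(loop, s, parallelism));
        }
    }
}
```

Every index is emitted by exactly one subtask, so the total record count stays the same while the generation work is spread across the source subtasks.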


          People

            Assignee: Unassigned
            Reporter: yow swy