Flink / FLINK-7508

Switch FlinkKinesisProducer to use KPL's POOLED ThreadingModel rather than PER_REQUEST mode

    Details

    • Type: Improvement
    • Status: Closed
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: 1.3.0
    • Fix Version/s: 1.4.0
    • Component/s: Kinesis Connector
    • Labels:
      None
    • Flags:
      Important

      Description

      Kinesis Producer Library (KPL) 0.10.x used a one-new-thread-per-request model for all requests sent to AWS Kinesis, which is very expensive.

      KPL 0.12.4 introduced a new ThreadingModel, POOLED, which uses a thread pool instead. This greatly improves KPL's performance and reduces resource consumption. By default, however, KPL still uses PER_REQUEST mode, so we should explicitly switch FlinkKinesisProducer's KPL threading model to POOLED.

      This work depends on FLINK-7366 and FLINK-7508
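      To make the difference between the two models concrete, here is a minimal plain-JDK sketch. It assumes nothing about KPL internals (as far as I know, KPL's threads live in its native daemon process, which is not modeled here): PER_REQUEST is imitated by starting a new Thread per request, and POOLED by reusing a fixed-size ExecutorService. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ThreadingModelSketch {

    // PER_REQUEST analogy: start a brand-new thread for every request.
    // Returns how many distinct threads ended up doing work.
    static int distinctThreadsPerRequest(int requests) {
        Set<Thread> workers = ConcurrentHashMap.newKeySet();
        List<Thread> started = new ArrayList<>();
        for (int i = 0; i < requests; i++) {
            Thread t = new Thread(() -> workers.add(Thread.currentThread()));
            started.add(t);
            t.start();
        }
        try {
            for (Thread t : started) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return workers.size();
    }

    // POOLED analogy: a fixed-size pool serves every request.
    // Returns how many distinct threads ended up doing work (never more than poolSize).
    static int distinctThreadsPooled(int requests, int poolSize) {
        Set<Thread> workers = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        for (int i = 0; i < requests; i++) {
            pool.execute(() -> workers.add(Thread.currentThread()));
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("pool did not drain in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while draining pool", e);
        }
        return workers.size();
    }

    public static void main(String[] args) {
        int requests = 1_000;
        System.out.println("per-request threads: " + distinctThreadsPerRequest(requests));
        System.out.println("pooled threads:      " + distinctThreadsPooled(requests, 10));
    }
}
```

      The per-request variant creates as many threads as there are requests, while the pooled variant never exceeds its pool size; thread creation and scheduling overhead is what the benchmark below attributes the CPU saturation to.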

      Benchmarking I did:

      • Environment: a Flink job with hourly sliding windows, running on an 18-node EMR cluster of r4.2xlarge instances. Each hourly sliding window generates about 21 million UserRecords, so the test load is a burst of 21 million UserRecords in the first minute of each hour.
      • Criteria: KPL throughput per minute. Since KPL's default RecordTTL is 30 seconds, either all UserRecords are sent by KPL within a minute, or we see UserRecord expiration errors.
      • One-new-thread-per-request model: maximum throughput is about 2 million UserRecords per minute. It doesn't go beyond that because CPU utilization reaches 100%; everything stopped working and the Flink job crashed.
      • Thread-pool model with a pool size of 10: it sends out all 21 million UserRecords within 30 seconds without any UserRecord expiration errors, with an average peak CPU utilization of about 20% to 30%. So 21 million UserRecords/min is not the maximum throughput of the thread-pool model. We didn't push further because 1) this throughput is already a couple of times more than what we really need, and 2) we don't have a quick way of increasing the test load.

      Thus, I propose switching FlinkKinesisProducer to thread-pool mode. Tzu-Li (Gordon) Tai, what do you think?
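      A quick back-of-the-envelope check of the numbers above, as a small Java sketch. The constants come straight from the benchmark; the class name is hypothetical. Because the pooled run finished within 30 seconds without being saturated, its rate is only a lower bound.

```java
public class KplBenchmarkMath {

    // Figures quoted in the benchmark description.
    static final double RECORDS_PER_WINDOW = 21_000_000;
    static final double POOLED_DRAIN_SECONDS = 30;          // pooled mode drained the burst within ~30 s
    static final double PER_REQUEST_MAX_PER_MINUTE = 2_000_000;

    /** Observed lower bound for pooled mode, in records per second. */
    static double pooledRecordsPerSecond() {
        return RECORDS_PER_WINDOW / POOLED_DRAIN_SECONDS;
    }

    /** Observed ceiling for per-request mode, in records per second. */
    static double perRequestRecordsPerSecond() {
        return PER_REQUEST_MAX_PER_MINUTE / 60.0;
    }

    /** Minutes per-request mode would need to drain one window at its ceiling. */
    static double perRequestMinutesToDrain() {
        return RECORDS_PER_WINDOW / PER_REQUEST_MAX_PER_MINUTE;
    }

    public static void main(String[] args) {
        System.out.printf("pooled      >= %.0f records/s%n", pooledRecordsPerSecond());
        System.out.printf("per-request <= %.0f records/s%n", perRequestRecordsPerSecond());
        System.out.printf("speedup     >= %.1fx%n", pooledRecordsPerSecond() / perRequestRecordsPerSecond());
        System.out.printf("per-request drain time ~ %.1f min%n", perRequestMinutesToDrain());
    }
}
```

      At its ceiling, per-request mode would need about 10.5 minutes to send one window's worth of records, far beyond the 30-second RecordTTL, which is consistent with the expiration errors and crash described above.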

          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user bowenli86 opened a pull request:

          https://github.com/apache/flink/pull/4656

          FLINK-7508 [kinesis] switch FlinkKinesisProducer to use KPL's ThreadingMode to ThreadedPool mode rather than Per_Request mode

          What is the purpose of the change

          Kinesis Producer Library (KPL) 0.10.x used a one-new-thread-per-request model for all requests sent to AWS Kinesis, which is very expensive.

          KPL 0.12.4 introduced a new ThreadingModel, POOLED, which uses a thread pool instead. This greatly improves KPL's performance and reduces resource consumption. By default, however, KPL still uses PER_REQUEST mode, so we should explicitly switch FlinkKinesisProducer's KPL threading model to POOLED.
          This work depends on FLINK-7366 and FLINK-7508

          Benchmarking I did:

          • Environment: a Flink job with hourly sliding windows, running on an 18-node EMR cluster of r4.2xlarge instances. Each hourly sliding window generates about 21 million UserRecords, so the test load is a burst of 21 million UserRecords in the first minute of each hour.
          • Criteria: KPL throughput per minute. Since KPL's default RecordTTL is 30 seconds, either all UserRecords are sent by KPL within a minute, or we see UserRecord expiration errors.
          • One-new-thread-per-request model: maximum throughput is about 2 million UserRecords per minute. It doesn't go beyond that because CPU utilization reaches 100%; everything stopped working and the Flink job crashed.
          • Thread-pool model with a pool size of 10: it sends out all 21 million UserRecords within 30 seconds without any UserRecord expiration errors, with an average peak CPU utilization of about 20% to 30%. So 21 million UserRecords/min is not the maximum throughput of the thread-pool model. We didn't push further because 1) this throughput is already a couple of times more than what we really need, and 2) we don't have a quick way of increasing the test load.

          Thus, I propose switching FlinkKinesisProducer to thread-pool mode.

          Brief change log
          • Switch FlinkKinesisProducer's KPL ThreadingModel from PER_REQUEST mode to POOLED mode
          • Update docs

          Verifying this change

          This change added tests and can be verified as follows:

          • Added unit tests in flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java

          Does this pull request potentially affect one of the following parts:

          Documentation
          • Does this pull request introduce a new feature? (yes)
          • If yes, how is the feature documented? (docs)

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/bowenli86/flink FLINK-7508

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/4656.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #4656


          commit 6386983239bd3024b395c865ec4fd33e232ca5a3
          Author: Bowen Li <bowenli86@gmail.com>
          Date: 2017-08-30T16:35:03Z

          FLINK-7422 Upgrade Kinesis Client Library (KCL) and AWS SDK in flink-connector-kinesis

          commit 381cd4156b84673a1d32d2db3f7b2d748d90d980
          Author: Bowen Li <bowenli86@gmail.com>
          Date: 2017-09-07T06:33:37Z

          Merge remote-tracking branch 'upstream/master'

          commit 893ec61bebfa20a038819bf1929791e57b98f33b
          Author: Bowen Li <bowenli86@gmail.com>
          Date: 2017-09-07T20:34:09Z

          FLINK-7508 switch FlinkKinesisProducer to use KPL's ThreadingMode to threaded-pool mode rather than one_thread_per_request mode


          githubbot ASF GitHub Bot added a comment -

          Github user bowenli86 commented on the issue:

          https://github.com/apache/flink/pull/4656

          @tzulitai Hi Gordon, can you please take a look at this PR?

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4656#discussion_r138836822

          --- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java ---
          @@ -36,7 +37,7 @@
            * Tests for KinesisConfigUtil.
            */
           @RunWith(PowerMockRunner.class)
          -@PrepareForTest({FlinkKinesisConsumer.class, KinesisConfigUtil.class})
          +@PrepareForTest({KinesisConfigUtil.class})
          --- End diff --

          Can remove the now unnecessary `{}`.
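          The suggestion relies on a Java language rule: when an annotation element has an array type, a single value may be written without braces and is treated as a one-element array. As far as I know, PowerMock's @PrepareForTest declares value() as a Class<?>[], so @PrepareForTest(KinesisConfigUtil.class) should be equivalent to @PrepareForTest({KinesisConfigUtil.class}). The sketch below checks the rule with a hypothetical stand-in annotation (PrepareFor) rather than PowerMock itself.

```java
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.util.Arrays;

public class AnnotationArraySketch {

    // Hypothetical stand-in for an annotation whose value() is a class array.
    @Retention(RetentionPolicy.RUNTIME)
    @interface PrepareFor {
        Class<?>[] value();
    }

    @PrepareFor({String.class})   // explicit braces around a single element
    static class WithBraces {}

    @PrepareFor(String.class)     // braces omitted: still a one-element array
    static class WithoutBraces {}

    /** Reads the class array declared on the given type at runtime. */
    static Class<?>[] declared(Class<?> target) {
        return target.getAnnotation(PrepareFor.class).value();
    }

    public static void main(String[] args) {
        // Both forms yield the same one-element array, so this prints true.
        System.out.println(Arrays.equals(declared(WithBraces.class), declared(WithoutBraces.class)));
    }
}
```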

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4656#discussion_r138837100

          --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java ---
          @@ -171,8 +171,9 @@ public void open(Configuration parameters) throws Exception {
               super.open(parameters);

               // check and pass the configuration properties
          -    KinesisProducerConfiguration producerConfig = KinesisConfigUtil.validateProducerConfiguration(configProps);
          +    KinesisProducerConfiguration producerConfig = KinesisConfigUtil.getValidatedProducerConfiguration(configProps);
               producerConfig.setCredentialsProvider(AWSUtil.getCredentialsProvider(configProps));
          +    producerConfig.setThreadingModel(KinesisProducerConfiguration.ThreadingModel.POOLED);
          --- End diff --

          Do you think it would make sense to allow users to configure different threading models?

          githubbot ASF GitHub Bot added a comment -

          Github user bowenli86 commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4656#discussion_r138965647

          --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java ---
          @@ -171,8 +171,9 @@ public void open(Configuration parameters) throws Exception {
               super.open(parameters);

               // check and pass the configuration properties
          -    KinesisProducerConfiguration producerConfig = KinesisConfigUtil.validateProducerConfiguration(configProps);
          +    KinesisProducerConfiguration producerConfig = KinesisConfigUtil.getValidatedProducerConfiguration(configProps);
               producerConfig.setCredentialsProvider(AWSUtil.getCredentialsProvider(configProps));
          +    producerConfig.setThreadingModel(KinesisProducerConfiguration.ThreadingModel.POOLED);
          --- End diff --

          On second thought: although I believe the POOLED model is the best option for most of the use cases I can think of, we should give users the flexibility to decide.

          I'm adding the PER_REQUEST model as an option.
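          A minimal sketch of how a user-selectable threading model could be read from the producer Properties, defaulting to POOLED. The property keys ThreadingModel and ThreadPoolSize, the default pool size of 10 (the size used in the benchmark), and the helper names are assumptions for illustration; the local enum only mirrors the two constants of KPL's KinesisProducerConfiguration.ThreadingModel so the example runs without the KPL dependency.

```java
import java.util.Locale;
import java.util.Properties;

public class ThreadingModelConfigSketch {

    // Local mirror of the two constants in KPL's KinesisProducerConfiguration.ThreadingModel.
    enum ThreadingModel { PER_REQUEST, POOLED }

    // Hypothetical property keys and defaults, for illustration only.
    static final String THREADING_MODEL = "ThreadingModel";
    static final String THREAD_POOL_SIZE = "ThreadPoolSize";
    static final ThreadingModel DEFAULT_THREADING_MODEL = ThreadingModel.POOLED;
    static final int DEFAULT_THREAD_POOL_SIZE = 10;

    /** Returns the configured threading model, falling back to POOLED when none is set. */
    static ThreadingModel threadingModel(Properties config) {
        String raw = config.getProperty(THREADING_MODEL);
        if (raw == null) {
            return DEFAULT_THREADING_MODEL;
        }
        try {
            // Accept any letter case, e.g. "per_request" or "POOLED".
            return ThreadingModel.valueOf(raw.trim().toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException("Unsupported " + THREADING_MODEL + ": '" + raw + "'", e);
        }
    }

    /** Returns the configured pool size (only meaningful for POOLED), defaulting to 10. */
    static int threadPoolSize(Properties config) {
        String raw = config.getProperty(THREAD_POOL_SIZE);
        if (raw == null) {
            return DEFAULT_THREAD_POOL_SIZE;
        }
        int size;
        try {
            size = Integer.parseInt(raw.trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Invalid " + THREAD_POOL_SIZE + ": '" + raw + "'", e);
        }
        if (size <= 0) {
            throw new IllegalArgumentException(THREAD_POOL_SIZE + " must be positive, got " + size);
        }
        return size;
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        System.out.println(threadingModel(config) + ", pool size " + threadPoolSize(config));
        config.setProperty(THREADING_MODEL, "per_request");
        System.out.println(threadingModel(config));
    }
}
```

      Keeping POOLED as the default preserves the benchmarked behavior, while an explicit PER_REQUEST setting remains available to anyone who needs the old model.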

          githubbot ASF GitHub Bot added a comment -

          Github user bowenli86 commented on the issue:

          https://github.com/apache/flink/pull/4656

          @tzulitai Thanks, Gordon! I watched your presentation on "managing state of Flink", and you did a great job explaining all the details.

          I added an option for the PER_REQUEST model, since it doesn't hurt anything.

          This PR is for 1.4. There's [another PR here](https://github.com/apache/flink/pull/4657) that I think needs to be merged into both 1.4 and 1.3.x.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4656#discussion_r139931718

          --- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java ---
          @@ -50,7 +51,66 @@ public void testUnparsableLongForProducerConfiguration() {
               testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
               testConfig.setProperty("RateLimit", "unparsableLong");

          -    KinesisConfigUtil.validateProducerConfiguration(testConfig);
          +    KinesisConfigUtil.getValidatedProducerConfiguration(testConfig);
          +  }
          +
          +  @Test
          +  public void testDefaultRateLimitInProducerConfiguration() {
          +    Properties testConfig = new Properties();
          +    testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
          +
          +    KinesisProducerConfiguration kpc = KinesisConfigUtil.getValidatedProducerConfiguration(testConfig);
          +
          +    assertEquals(100, kpc.getRateLimit());
          +  }
          +
          +  @Test
          +  public void testCustomizedRateLimitInProducerConfiguration() {
          +    Properties testConfig = new Properties();
          +    testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
          +    testConfig.setProperty(KinesisConfigUtil.RATE_LIMIT, "150");
          +
          +    KinesisProducerConfiguration kpc = KinesisConfigUtil.getValidatedProducerConfiguration(testConfig);
          +
          +    assertEquals(150, kpc.getRateLimit());
          +  }
          +
          +  @Test
          +  public void testDefaultThreadingModelInProducerConfiguration() {
          +    Properties testConfig = new Properties();
          +    testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
          +    KinesisProducerConfiguration kpc = KinesisConfigUtil.getValidatedProducerConfiguration(testConfig);
          +
          +    assertEquals(KinesisProducerConfiguration.ThreadingModel.POOLED, kpc.getThreadingModel());
          +  }
          +
          +  @Test
          +  public void testCustomizedThreadingModelSizeInProducerConfiguration() {
          --- End diff --

          nit: I think the "Size" in the test name is redundant here. I'll remove it.
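          The RateLimit tests in this diff pin down a simple contract: no property means the default of 100, a numeric value such as 150 is used as-is, and an unparsable value is rejected. A plain-Java sketch of that contract follows; the rateLimit helper and its choice of IllegalArgumentException are assumptions, and it does not reproduce how KinesisConfigUtil actually parses the value.

```java
import java.util.Properties;

public class RateLimitConfigSketch {

    // "RateLimit" is the key used in the test diff; 100 is the default the tests expect.
    static final String RATE_LIMIT = "RateLimit";
    static final long DEFAULT_RATE_LIMIT = 100L;

    /** Hypothetical helper: returns the configured RateLimit, or the default if it is absent. */
    static long rateLimit(Properties config) {
        String raw = config.getProperty(RATE_LIMIT);
        if (raw == null) {
            return DEFAULT_RATE_LIMIT;
        }
        try {
            return Long.parseLong(raw.trim());
        } catch (NumberFormatException e) {
            // Reject unparsable values up front instead of failing later inside the producer.
            throw new IllegalArgumentException("Invalid value for " + RATE_LIMIT + ": '" + raw + "'", e);
        }
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        System.out.println(rateLimit(config));   // default: 100
        config.setProperty(RATE_LIMIT, "150");
        System.out.println(rateLimit(config));   // customized: 150
    }
}
```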

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/4656

          Thanks

          LGTM! Merging this ..

          githubbot ASF GitHub Bot added a comment -

          Github user bowenli86 commented on the issue:

          https://github.com/apache/flink/pull/4656

          @tzulitai Thank you!

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/4656

          tzulitai Tzu-Li (Gordon) Tai added a comment -

          Thanks for the contribution, Bowen Li.

          Resolved for master via 637dde889fe2d21ff6990749a750356d20fcd965.


            People

            • Assignee:
              phoenixjiangnan Bowen Li
              Reporter:
              phoenixjiangnan Bowen Li
            • Votes:
              0
              Watchers:
              4