FLINK-6365: Adapt default values of the Kinesis connector

    Details

    • Type: Improvement
    • Status: Closed
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: 1.2.1, 1.3.1, 1.4.0
    • Fix Version/s: 1.4.0, 1.3.2
    • Component/s: Kinesis Connector
    • Labels:
      None
    • Release Note:
      Some default values for AWS API call behavior configurations in the Flink Kinesis Consumer were adapted for better default consumption performance: 1) the SHARD_GETRECORDS_MAX default changed to 10,000, and 2) the SHARD_GETRECORDS_INTERVAL_MILLIS default changed to 200ms.

      Description

      As discussed in http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kinesis-connector-SHARD-GETRECORDS-MAX-default-value-td12332.html, it seems reasonable to change the default values of the Kinesis connector to follow KCL's default settings. I suggest adapting at least the values for SHARD_GETRECORDS_MAX and SHARD_GETRECORDS_INTERVAL_MILLIS.

      As a Kinesis shard is currently limited to 5 get operations per second, you can observe high ReadProvisionedThroughputExceeded rates with the current SHARD_GETRECORDS_INTERVAL_MILLIS default of 0; it seems reasonable to increase it to 200. As described in the email thread, it furthermore seems desirable to increase the default value for SHARD_GETRECORDS_MAX to 10000.

      The values that are used by the KCL can be found here: https://github.com/awslabs/amazon-kinesis-client/blob/master/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java
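
      For illustration (not part of the original report): a minimal sketch of pinning the two discussed settings explicitly on a FlinkKinesisConsumer, using Flink 1.4-era class names. The stream name and region below are placeholders.

      import java.util.Properties;

      import org.apache.flink.api.common.serialization.SimpleStringSchema;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
      import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
      import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

      public class KinesisDefaultsExample {
          public static void main(String[] args) throws Exception {
              Properties consumerConfig = new Properties();
              consumerConfig.setProperty(AWSConfigConstants.AWS_REGION, "eu-west-1");

              // The two settings whose defaults this ticket changes; setting them
              // explicitly pins the behavior regardless of the connector version.
              consumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "10000");
              consumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "200");

              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              env.addSource(new FlinkKinesisConsumer<>(
                      "example-stream", new SimpleStringSchema(), consumerConfig))
                 .print();
              env.execute("kinesis-defaults-example");
          }
      }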

      Thanks for looking into this!

      Steffen

        Activity

        tzulitai Tzu-Li (Gordon) Tai added a comment -

        Thanks for the discussions and contribution Steffen Hausmann and Bowen Li!
        Fixed for master via 35564f25c844b827ce325453b5d518416e1bd5a8.

        phoenixjiangnan Bowen Li added a comment -

        Cool! Glad we reached a consensus, and I can proceed with code change.

        Tzu-Li (Gordon) Tai I may not fully understand how Flink sources work yet. So when (1) > (2), some Flink source instances don't read from any shards at all, making it the same as the case where (1) == (2). Is that right?

        tzulitai Tzu-Li (Gordon) Tai added a comment -

        Alright, I think Steffen Hausmann definitely makes sense here. I'll also agree to increasing SHARD_GETRECORDS_INTERVAL to 200ms.
        One correction to your description Bowen Li: each shard will only ever be picked up by a single parallel source instance across instances of a single FlinkKinesisConsumer (unless you're referring to the case where you have 2 jobs both consuming the same Kinesis stream).

        Regarding the fact that SHARD_GETRECORDS_INTERVAL is currently not a strict interval: there's actually a JIRA and PR for that - FLINK-4574. It's a more critical change that needs more time for careful review, so I haven't gotten back to reviewing it yet. It would be very helpful if you're interested in digging into that and reviewing it as well!

        phoenixjiangnan Bowen Li added a comment -

        Ok. For SHARD_GETRECORDS_MAX, 10,000 it is, since we all agree on the value. We tested it in our prod environment, and it works well, greatly reducing the number of requests to Kinesis.

        For SHARD_GETRECORDS_INTERVAL, I second Steffen Hausmann's proposal. Practically, I set that value to 2,000ms (yes, 2sec) for our prod Flink job, because 0ms exploded our 36-shard Kinesis stream, and setting SHARD_GETRECORDS_MAX to 10,000 makes up for the longer interval. I'm also evaluating it theoretically with respect to the parallelism of the Flink source (1) and the number of shards in the Kinesis stream (2); see the arithmetic sketch after this list.

        • When (1) = (2), each parallel Flink source instance reads from exactly 1 Kinesis shard. So 200ms is much better than 0ms, because 200ms lets the Flink source read at max speed without exceeding the read capacity.
        • When (1) > (2), some (or all) Kinesis shards are read by more than 1 parallel Flink source. 200ms is still better than 0ms, because a) 200ms keeps a shard at no more than 5 requests/sec if it is read by 1 Flink source, and b) 200ms greatly lowers the number of requests, and hence Flink's read latency, if the shard is read by more than 1 Flink source.
        • When (1) < (2), some (or all) Flink sources read from more than 1 Kinesis shard. 200ms probably cannot unleash some shards' full potential, and a shorter interval seems more reasonable. However, 0ms is still too aggressive.

        In short, 200ms at least makes Flink work, and 0ms does not. Besides, given that Steffen works for AWS, I put more weight on his opinion.
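
        Editorial sketch, not from the original comment: back-of-the-envelope arithmetic behind these cases. All names here are hypothetical; the fixed points are that Kinesis allows 5 GetRecords calls per second per shard, and each poller issues at most one call per interval (fetch time would only lower the rate further).

        public class ShardRequestRateSketch {
            static double maxRequestsPerSecond(long intervalMillis, int consumersPerShard) {
                // Each consumer issues at most one GetRecords call per interval;
                // a 0ms interval is clamped to 1ms to stand in for "as fast as possible".
                return consumersPerShard * (1000.0 / Math.max(intervalMillis, 1));
            }

            public static void main(String[] args) {
                System.out.println(maxRequestsPerSecond(200, 1)); // 5.0    -> exactly the per-shard limit
                System.out.println(maxRequestsPerSecond(200, 2)); // 10.0   -> throttling expected
                System.out.println(maxRequestsPerSecond(0, 1));   // 1000.0 -> heavy throttling
            }
        }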

        sthm Steffen Hausmann added a comment -

        I agree to adapting SHARD_GETRECORDS_MAX, but I would still argue that it would decrease latency if the connector only polled every 200 ms.

        Kinesis supports 5 getrecords requests per second, and to maintain low latency it seems desirable to make a request every 200 ms. Correct me if I'm wrong, but here is what I believe happens right now with the default SHARD_GETRECORDS_INTERVAL_MILLIS of 0. The first five getrecords requests will be successful. However, as the connector makes these requests as quickly as possible, chances are that they fall at the beginning of the 1-second interval. So for subsequent requests in that 1-second interval, exponential backoff will wait for some time and retry the request, and chances are small that the retry will be made exactly 1 second after the first request. So either the request gets throttled again, or it is successful but not made as quickly as it could have been.

        To introduce as little latency as possible, the connector would wait exactly 200 ms between two getrecords calls. Looking at the code, it seems like the current implementation reads the records from the stream and then waits SHARD_GETRECORDS_INTERVAL, so setting SHARD_GETRECORDS_INTERVAL to 200 will cause some additional latency beyond the 200 ms (namely, the time it takes to read the records). But even with this implementation, which can be further optimized, I would argue that it's desirable to increase SHARD_GETRECORDS_INTERVAL to 200.
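
        An editorial sketch (assumptions, not the connector's actual code) contrasting the read-then-wait loop described above with a fixed-rate variant that subtracts the fetch time; fetchRecords() is a hypothetical stand-in for the GetRecords call.

        public class PollingLoopSketch {
            static final long INTERVAL_MILLIS = 200;

            static void fetchRecords() throws InterruptedException {
                Thread.sleep(50); // stand-in for a GetRecords call taking ~50 ms
            }

            public static void main(String[] args) throws InterruptedException {
                // Fixed delay (behavior described above): actual period = fetch time + 200 ms.
                for (int i = 0; i < 3; i++) {
                    fetchRecords();
                    Thread.sleep(INTERVAL_MILLIS);
                }

                // Fixed rate: subtract the fetch time so the period stays at ~200 ms.
                for (int i = 0; i < 3; i++) {
                    long start = System.currentTimeMillis();
                    fetchRecords();
                    long remaining = INTERVAL_MILLIS - (System.currentTimeMillis() - start);
                    if (remaining > 0) {
                        Thread.sleep(remaining);
                    }
                }
            }
        }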

        tzulitai Tzu-Li (Gordon) Tai added a comment -

        Bowen Li Stephan Ewen

        As discussed in the original ML, I am +1 to increasing SHARD_GETRECORDS_MAX to the default that the AWS SDKs use (10000). The current 500 is just too low for the usual cases and doesn't provide good throughput out of the box.

        However, I would still suggest not touching SHARD_GETRECORDS_INTERVAL. One main reason is that this directly affects user-perceived behaviour of the consumer. I also am not too fond of the idea that by default we have this sleep interval between fetches. I'm not strongly against this, though, and am open to change it.

        StephanEwen Stephan Ewen added a comment -

        Sounds reasonable to me.

        Tzu-Li (Gordon) Tai What do you think about this?

        phoenixjiangnan Bowen Li added a comment -

        We ran into this issue, too. 100 is too small for SHARD_GETRECORDS_MAX, and we got exceptions like:

        2017-07-14 19:53:02,995 INFO  org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient  - Unable to execute HTTP request: Broken pipe (Write failed)
        java.net.SocketException: Broken pipe (Write failed)
        	at java.net.SocketOutputStream.socketWrite0(Native Method)
        	at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
        	at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
        	at sun.security.ssl.OutputRecord.writeBuffer(OutputRecord.java:431)
        	at sun.security.ssl.OutputRecord.write(OutputRecord.java:417)
        	at sun.security.ssl.SSLSocketImpl.writeRecordInternal(SSLSocketImpl.java:886)
        	at sun.security.ssl.SSLSocketImpl.writeRecord(SSLSocketImpl.java:857)
        	at sun.security.ssl.SSLSocketImpl.writeRecord(SSLSocketImpl.java:727)
        	at sun.security.ssl.Handshaker.sendChangeCipherSpec(Handshaker.java:1124)
        	at sun.security.ssl.ClientHandshaker.sendChangeCipherAndFinish(ClientHandshaker.java:1216)
        	at sun.security.ssl.ClientHandshaker.serverHelloDone(ClientHandshaker.java:1128)
        	at sun.security.ssl.ClientHandshaker.processMessage(ClientHandshaker.java:348)
        	at sun.security.ssl.Handshaker.processLoop(Handshaker.java:1026)
        	at sun.security.ssl.Handshaker.process_record(Handshaker.java:961)
        	at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:1072)
        	at sun.security.ssl.SSLSocketImpl.performInitialHandshake(SSLSocketImpl.java:1385)
        	at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1413)
        	at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1397)
        	at org.apache.http.conn.ssl.SSLSocketFactory.connectSocket(SSLSocketFactory.java:553)
        	at org.apache.http.conn.ssl.SSLSocketFactory.connectSocket(SSLSocketFactory.java:412)
        	at org.apache.flink.kinesis.shaded.com.amazonaws.http.conn.ssl.SdkTLSSocketFactory.connectSocket(SdkTLSSocketFactory.java:134)
        	at org.apache.http.impl.conn.DefaultClientConnectionOperator.openConnection(DefaultClientConnectionOperator.java:179)
        	at org.apache.http.impl.conn.ManagedClientConnectionImpl.open(ManagedClientConnectionImpl.java:328)
        	at org.apache.http.impl.client.DefaultRequestDirector.tryConnect(DefaultRequestDirector.java:612)
        	at org.apache.http.impl.client.DefaultRequestDirector.execute(DefaultRequestDirector.java:447)
        	at org.apache.http.impl.client.AbstractHttpClient.doExecute(AbstractHttpClient.java:884)
        	at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82)
        	at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:55)
        	at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:837)
        	at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:607)
        	at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.doExecute(AmazonHttpClient.java:376)
        	at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.executeWithTimer(AmazonHttpClient.java:338)
        	at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:287)
        	at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:1940)
        	at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:1910)
        	at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.getRecords(AmazonKinesisClient.java:936)
        	at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getRecords(KinesisProxy.java:199)
        	at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.getRecords(ShardConsumer.java:292)
        	at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:200)
        	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        	at java.lang.Thread.run(Thread.java:748)
        

        I'll take this ticket, and propose a default value (5000 or 10000) based on my use case with Flink and Kinesis.


          People

          • Assignee:
            phoenixjiangnan Bowen Li
          • Reporter:
            sthm Steffen Hausmann
          • Votes:
            0
          • Watchers:
            5
