Details
Type: Bug
Status: Resolved
Priority: Critical
Resolution: Fixed
1.16.0, 1.15.3, aws-connector-3.0.0, aws-connector-4.0.0
None
Description
AWS sinks based on the Async Sink can deadlock if the AWS async client throws an error outside of the future it returns. We observed this with a local application:
java.lang.NullPointerException
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.utils.NettyUtils.closedChannelMessage(NettyUtils.java:135)
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.utils.NettyUtils.decorateException(NettyUtils.java:71)
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.NettyRequestExecutor.handleFailure(NettyRequestExecutor.java:310)
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.NettyRequestExecutor.makeRequestListener(NettyRequestExecutor.java:189)
    at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
    at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:552)
    at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491)
    at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616)
    at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:609)
    at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.CancellableAcquireChannelPool.lambda$acquire$1(CancellableAcquireChannelPool.java:58)
    at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
    at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:552)
    at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491)
    at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616)
    at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:609)
    at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.HealthCheckedChannelPool.ensureAcquiredChannelIsHealthy(HealthCheckedChannelPool.java:114)
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.HealthCheckedChannelPool.lambda$tryAcquire$1(HealthCheckedChannelPool.java:97)
    at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
    at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:571)
    at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:550)
    at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.access$200(DefaultPromise.java:35)
    at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise$1.run(DefaultPromise.java:502)
    at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
    at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)
    at org.apache.flink.kinesis.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
    at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
    at org.apache.flink.kinesis.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at java.base/java.lang.Thread.run(Thread.java:829)
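The failure mode in the trace above can be modeled in a few lines of plain Java. This is a hypothetical sketch, not AWS SDK or Netty code: `LostCompletionDemo`, `sendAsync`, and `completesWithin` are illustrative names, and the simulated `NullPointerException` only stands in for the one raised in `NettyUtils.closedChannelMessage`. The caller's future is completed later on a separate "I/O" thread; when that completion path throws, the exception surfaces only on the I/O thread and the caller's future is never completed.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical model of a lost completion; not AWS SDK or Netty code.
class LostCompletionDemo {

    // Stand-in for an async client: the future is completed later on an "I/O" thread.
    static CompletableFuture<String> sendAsync(ExecutorService ioThread, boolean bugInFailureHandler) {
        CompletableFuture<String> result = new CompletableFuture<>();
        ioThread.execute(() -> {
            if (bugInFailureHandler) {
                // Simulates an error raised while handling a failure (compare the NPE in
                // NettyUtils.closedChannelMessage): it escapes on the I/O thread, and the
                // line that would complete the future is never reached.
                throw new NullPointerException("simulated bug in failure handling");
            }
            result.complete("ok");
        });
        return result;
    }

    // Returns true if the caller's future completes (normally or exceptionally) in time.
    static boolean completesWithin(boolean bugInFailureHandler, long timeoutMillis) {
        ExecutorService ioThread = Executors.newSingleThreadExecutor();
        try {
            sendAsync(ioThread, bugInFailureHandler).get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (ExecutionException e) {
            return true; // completed exceptionally: the caller is at least notified
        } catch (TimeoutException e) {
            return false; // nothing will ever complete this future
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            ioThread.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("healthy path completes: " + completesWithin(false, 1000));
        System.out.println("buggy path completes:   " + completesWithin(true, 300));
    }
}
```

Running `main` should report `true` for the healthy path and `false` for the buggy one; the executor's worker thread also prints the uncaught exception to stderr, which is the only trace the caller would ever see.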
Related AWS SDK issues that can cause this:
- https://github.com/aws/aws-sdk-java-v2/issues/3435
- https://github.com/aws/aws-sdk-java-v2/issues/1812
If an error is thrown outside the future, so that the future is never completed, the AsyncSink never decrements inFlightRequestCount. Because flushing for a checkpoint waits for all in-flight requests to finish, the job hangs while trying to flush.
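The resulting hang can be sketched with a simplified in-flight counter. `InFlightTracker`, `submit`, and `flush` are hypothetical names, not Flink's `AsyncSinkWriter` API, and the polling loop with a deadline is an assumption made only so the demo terminates. The two guards (turning a synchronous throw into a failed future, and failing a never-completed future with `CompletableFuture.orTimeout`, Java 9+) are an assumed defensive pattern for illustration; the issue itself is listed as fixed by the AWS SDK upgrade in FLINK-30633.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Supplier;

// Hypothetical, simplified in-flight bookkeeping; not Flink's AsyncSinkWriter.
class InFlightTracker {
    // A "client" that returns a future nobody ever completes (the lost completion).
    static final Supplier<CompletableFuture<Void>> LOST_FUTURE = CompletableFuture::new;
    // A "client" that throws before it returns any future at all.
    static final Supplier<CompletableFuture<Void>> THROWS_SYNCHRONOUSLY = () -> {
        throw new IllegalStateException("thrown outside of the future");
    };

    private final AtomicInteger inFlightRequestCount = new AtomicInteger();
    private final long requestTimeoutMillis; // <= 0 disables the per-request timeout guard

    InFlightTracker(long requestTimeoutMillis) {
        this.requestTimeoutMillis = requestTimeoutMillis;
    }

    void submit(Supplier<CompletableFuture<Void>> client) {
        inFlightRequestCount.incrementAndGet();
        CompletableFuture<Void> future;
        try {
            future = client.get();
        } catch (RuntimeException syncError) {
            // Guard 1: turn a synchronous throw into a failed future.
            future = CompletableFuture.failedFuture(syncError);
        }
        if (requestTimeoutMillis > 0) {
            // Guard 2: fail a future that is never completed.
            future = future.orTimeout(requestTimeoutMillis, TimeUnit.MILLISECONDS);
        }
        // The only place the counter goes down; if this callback never runs,
        // flush() can never observe zero in-flight requests.
        future.whenComplete((ignored, error) -> inFlightRequestCount.decrementAndGet());
    }

    // Waits for all in-flight requests; maxWaitMillis exists only so the demo terminates.
    boolean flush(long maxWaitMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(maxWaitMillis);
        while (inFlightRequestCount.get() > 0) {
            if (System.nanoTime() - deadline > 0) {
                return false;
            }
            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(1));
        }
        return true;
    }

    int inFlight() {
        return inFlightRequestCount.get();
    }

    public static void main(String[] args) {
        InFlightTracker unguarded = new InFlightTracker(0);
        unguarded.submit(LOST_FUTURE);
        System.out.println("unguarded flush finished: " + unguarded.flush(300)
                + ", in flight: " + unguarded.inFlight());

        InFlightTracker guarded = new InFlightTracker(100);
        guarded.submit(LOST_FUTURE);
        System.out.println("guarded flush finished: " + guarded.flush(2000)
                + ", in flight: " + guarded.inFlight());
    }
}
```

Without the timeout guard, the counter stays at 1 and only the demo's artificial deadline ends the wait; a flush with no deadline would wait forever. A per-request timeout trades a possible spurious failure for guaranteed progress, so it would have to be longer than any legitimate request latency.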
Attachments
Issue Links
- is fixed by: FLINK-30633 Update AWS SDKv2 to v2.19.14 (Resolved)