Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-34071

Deadlock in AWS Kinesis Data Streams AsyncSink connector

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Open
    • Major
    • Resolution: Unresolved
    • aws-connector-3.0.0, 1.15.4, aws-connector-4.2.0
    • None
    • None

    Description

      Sink operator hangs while flushing records, similarly to FLINK-32230. Error observed even when using AWS SDK version that contains fix for async client error handling https://github.com/aws/aws-sdk-java-v2/pull/4402

      Thread dump of stuck thread:

      "sdk-async-response-1-6236" Id=11213 RUNNABLE
          at app//org.apache.flink.connector.base.sink.writer.AsyncSinkWriter.lambda$flush$5(AsyncSinkWriter.java:385)
          at app//org.apache.flink.connector.base.sink.writer.AsyncSinkWriter$$Lambda$1778/0x0000000801141040.accept(Unknown Source)
          at org.apache.flink.connector.kinesis.sink.KinesisStreamsSinkWriter.handleFullyFailedRequest(KinesisStreamsSinkWriter.java:210)
          at org.apache.flink.connector.kinesis.sink.KinesisStreamsSinkWriter.lambda$submitRequestEntries$1(KinesisStreamsSinkWriter.java:184)
          at org.apache.flink.connector.kinesis.sink.KinesisStreamsSinkWriter$$Lambda$1965/0x00000008011a0c40.accept(Unknown Source)
          at java.base@11.0.18/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
          at java.base@11.0.18/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
          at java.base@11.0.18/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
          at java.base@11.0.18/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
          at org.apache.flink.kinesis.shaded.software.amazon.awssdk.utils.CompletableFutureUtils.lambda$forwardExceptionTo$0(CompletableFutureUtils.java:79)
          at org.apache.flink.kinesis.shaded.software.amazon.awssdk.utils.CompletableFutureUtils$$Lambda$1925/0x0000000801181840.accept(Unknown Source)
          at java.base@11.0.18/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
          at java.base@11.0.18/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
          at java.base@11.0.18/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
          at java.base@11.0.18/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
          at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncApiCallMetricCollectionStage.lambda$execute$0(AsyncApiCallMetricCollectionStage.java:56)
          at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncApiCallMetricCollectionStage$$Lambda$1961/0x0000000801191c40.accept(Unknown Source)
          at java.base@11.0.18/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
          at java.base@11.0.18/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
          at java.base@11.0.18/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
          at java.base@11.0.18/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
          at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncApiCallTimeoutTrackingStage.lambda$execute$2(AsyncApiCallTimeoutTrackingStage.java:67)
          at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncApiCallTimeoutTrackingStage$$Lambda$1960/0x0000000801191840.accept(Unknown Source)
          at java.base@11.0.18/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
          at java.base@11.0.18/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
          at java.base@11.0.18/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
          at java.base@11.0.18/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
          at org.apache.flink.kinesis.shaded.software.amazon.awssdk.utils.CompletableFutureUtils.lambda$forwardExceptionTo$0(CompletableFutureUtils.java:79)
          at org.apache.flink.kinesis.shaded.software.amazon.awssdk.utils.CompletableFutureUtils$$Lambda$1925/0x0000000801181840.accept(Unknown Source)
          at java.base@11.0.18/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
          at java.base@11.0.18/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
          at java.base@11.0.18/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
          at java.base@11.0.18/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
          at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.maybeAttemptExecute(AsyncRetryableStage.java:103)
          at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.maybeRetryExecute(AsyncRetryableStage.java:184)
          at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.lambda$attemptExecute$1(AsyncRetryableStage.java:170)
          at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor$$Lambda$1956/0x0000000801192840.accept(Unknown Source)
          at java.base@11.0.18/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
          at java.base@11.0.18/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
          at java.base@11.0.18/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
          at java.base@11.0.18/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
          at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage.lambda$null$0(MakeAsyncHttpRequestStage.java:105)
          at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage$$Lambda$1954/0x0000000801193040.accept(Unknown Source)
          at java.base@11.0.18/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
          at java.base@11.0.18/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
          at java.base@11.0.18/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
          at java.base@11.0.18/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
          at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage.completeResponseFuture(MakeAsyncHttpRequestStage.java:238)
          at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage.lambda$executeHttpRequest$3(MakeAsyncHttpRequestStage.java:163)
          at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage$$Lambda$1952/0x0000000801193840.apply(Unknown Source)
          ... 

      Alongside this issue following exception can be observed

      java.io.IOException: An error occurred on the connection: java.nio.channels.ClosedChannelException, [channel: 159aa20c]. All streams will be closed
          at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.http2.MultiplexedChannelRecord.decorateConnectionException(MultiplexedChannelRecord.java:213)
          at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.http2.MultiplexedChannelRecord.lambda$closeChildChannels$10(MultiplexedChannelRecord.java:205)
          at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.http2.MultiplexedChannelRecord.lambda$closeAndExecuteOnChildChannels$11(MultiplexedChannelRecord.java:229)
          at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.utils.NettyUtils.doInEventLoop(NettyUtils.java:248)
          at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.http2.MultiplexedChannelRecord.closeAndExecuteOnChildChannels(MultiplexedChannelRecord.java:220)
          at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.http2.MultiplexedChannelRecord.closeChildChannels(MultiplexedChannelRecord.java:205)
          at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.http2.Http2MultiplexedChannelPool.closeAndReleaseParent(Http2MultiplexedChannelPool.java:353)
          at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.http2.Http2MultiplexedChannelPool.closeAndReleaseParent(Http2MultiplexedChannelPool.java:333)
          at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.http2.Http2MultiplexedChannelPool.access$200(Http2MultiplexedChannelPool.java:76)
          at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.http2.Http2MultiplexedChannelPool$ReleaseOnExceptionHandler.closeAndReleaseParent(Http2MultiplexedChannelPool.java:509)
          at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.http2.Http2MultiplexedChannelPool$ReleaseOnExceptionHandler.channelInactive(Http2MultiplexedChannelPool.java:486)
          at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303)
          at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
          at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
          at org.apache.flink.kinesis.shaded.io.netty.handler.logging.LoggingHandler.channelInactive(LoggingHandler.java:206)
          at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303)
          at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
          at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
          at org.apache.flink.kinesis.shaded.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
          at org.apache.flink.kinesis.shaded.io.netty.handler.timeout.IdleStateHandler.channelInactive(IdleStateHandler.java:277)
          at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303)
          at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
          at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
          at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.http2.Http2PingHandler.channelInactive(Http2PingHandler.java:77)
          at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:305)
          at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
          at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
          at org.apache.flink.kinesis.shaded.io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:411)
          at org.apache.flink.kinesis.shaded.io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:376)
          at org.apache.flink.kinesis.shaded.io.netty.handler.codec.http2.Http2ConnectionHandler.channelInactive(Http2ConnectionHandler.java:430)
          at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:305)
          at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
          at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
          at org.apache.flink.kinesis.shaded.io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:411)
          at org.apache.flink.kinesis.shaded.io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:376)
          at org.apache.flink.kinesis.shaded.io.netty.handler.ssl.SslHandler.channelInactive(SslHandler.java:1085)
          at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:305)
          at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
          at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
          at org.apache.flink.kinesis.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405)
          at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:301)
          at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
          at org.apache.flink.kinesis.shaded.io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901)
          at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:813)
          at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174)
          at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167)
          at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
          at org.apache.flink.kinesis.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:566)
          at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
          at org.apache.flink.kinesis.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
          at java.base/java.lang.Thread.run(Thread.java:829)
      Caused by: java.nio.channels.ClosedChannelException
          ... 41 more
      

       

      Attachments

        Issue Links

          Activity

            People

              Unassigned Unassigned
              a.pilipenko Aleksandr Pilipenko
              Votes:
              0 Vote for this issue
              Watchers:
              4 Start watching this issue

              Dates

                Created:
                Updated: