Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-30304

Possible Deadlock in Kinesis/Firehose/DynamoDB Connector

    XMLWordPrintableJSON

Details

    Description

      AWS Sinks based on Async Sink can enter a deadlock situation if the AWS async client throws error outside of the future. We observed this with a local application:

      java.lang.NullPointerException
      at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.utils.NettyUtils.closedChannelMessage(NettyUtils.java:135)
      at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.utils.NettyUtils.decorateException(NettyUtils.java:71)
      at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.NettyRequestExecutor.handleFailure(NettyRequestExecutor.java:310)
      at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.NettyRequestExecutor.makeRequestListener(NettyRequestExecutor.java:189)
      at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
      at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:552)
      at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491)
      at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616)
      at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:609)
      at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
      at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.CancellableAcquireChannelPool.lambda$acquire$1(CancellableAcquireChannelPool.java:58)
      at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
      at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:552)
      at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491)
      at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616)
      at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:609)
      at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
      at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.HealthCheckedChannelPool.ensureAcquiredChannelIsHealthy(HealthCheckedChannelPool.java:114)
      at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.HealthCheckedChannelPool.lambda$tryAcquire$1(HealthCheckedChannelPool.java:97)
      at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
      at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:571)
      at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:550)
      at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.access$200(DefaultPromise.java:35)
      at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise$1.run(DefaultPromise.java:502)
      at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
      at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)
      at org.apache.flink.kinesis.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
      at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
      at org.apache.flink.kinesis.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
      at java.base/java.lang.Thread.run(Thread.java:829)

      Related AWS SDK issues that can cause this:

      If an error is thrown and not handled by the future then the AsyncSink will never decrement inFlightRequestCount. the job will hang while trying flush for checkpoint

       

      Attachments

        1. sink-deadlock.png
          224 kB
          Danny Cranmer

        Issue Links

          Activity

            People

              dannycranmer Danny Cranmer
              dannycranmer Danny Cranmer
              Votes:
              0 Vote for this issue
              Watchers:
              2 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: