Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-22085

KafkaSourceLegacyITCase hangs/fails on azure

    XMLWordPrintableJSON

Details

    Description

      1) Observations

      a) The Azure pipeline would occasionally hang without printing any test error information.

      https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=15939&view=logs&j=c5f0071e-1851-543e-9a45-9ac140befc32&t=1fb1a56f-e8b5-5a82-00a0-a2db7757b4f5&l=8219

      b) By running the test KafkaSourceLegacyITCase::testBrokerFailure() with INFO level logging, the the test would hang with the following error message printed repeatedly:

      20451 [New I/O boss #50] ERROR org.apache.flink.networking.NetworkFailureHandler [] - Closing communication channel because of an exception
      java.net.ConnectException: Connection refused: localhost/127.0.0.1:50073
              at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[?:1.8.0_151]
              at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) ~[?:1.8.0_151]
              at org.apache.flink.shaded.testutils.org.jboss.netty.channel.socket.nio.NioClientBoss.connect(NioClientBoss.java:152) ~[flink-test-utils_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
              at org.apache.flink.shaded.testutils.org.jboss.netty.channel.socket.nio.NioClientBoss.processSelectedKeys(NioClientBoss.java:105) [flink-test-utils_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
              at org.apache.flink.shaded.testutils.org.jboss.netty.channel.socket.nio.NioClientBoss.process(NioClientBoss.java:79) [flink-test-utils_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
              at org.apache.flink.shaded.testutils.org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:337) [flink-test-utils_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
              at org.apache.flink.shaded.testutils.org.jboss.netty.channel.socket.nio.NioClientBoss.run(NioClientBoss.java:42) [flink-test-utils_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
              at org.apache.flink.shaded.testutils.org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108) [flink-test-utils_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
              at org.apache.flink.shaded.testutils.org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42) [flink-test-utils_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
              at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_151]
              at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_151]
              at java.lang.Thread.run(Thread.java:748) [?:1.8.0_151]
      

      2) Root cause explanations

      The test would hang because it enters the following loop:

      • closeOnFlush() is called for a given channel
      • closeOnFlush() calls channel.write(..)
      • channel.write() triggers the exceptionCaught(...) callback
      • closeOnFlush() is called for the same channel again.

      3) Solution

      Update closeOnFlush() so that, if a channel is being closed by this method, then closeOnFlush() would not try to write to this channel if it is called on this channel again.

      Attachments

        Issue Links

          Activity

            People

              gaoyunhaii Yun Gao
              dwysakowicz Dawid Wysakowicz
              Votes:
              0 Vote for this issue
              Watchers:
              7 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: