Details
- Sub-task
- Status: Closed
- Critical
- Resolution: Duplicate
- 1.13.5, 1.14.2, 1.15.0
Description
The test FlinkKafkaInternalProducerITCase.testProducerWhenCommitEmptyPartitionsToOutdatedTxnCoordinator fails on AZP with
2021-12-27T02:56:56.3618475Z java.util.concurrent.TimeoutException: The topic metadata failed to propagate to Kafka broker.
2021-12-27T02:56:56.3619339Z at org.apache.flink.core.testutils.CommonTestUtils.waitUtil(CommonTestUtils.java:214)
2021-12-27T02:56:56.3619958Z at org.apache.flink.core.testutils.CommonTestUtils.waitUtil(CommonTestUtils.java:230)
2021-12-27T02:56:56.3620643Z at org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl.createTestTopic(KafkaTestEnvironmentImpl.java:216)
2021-12-27T02:56:56.3621504Z at org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment.createTestTopic(KafkaTestEnvironment.java:98)
2021-12-27T02:56:56.3625442Z at org.apache.flink.streaming.connectors.kafka.KafkaTestBase.createTestTopic(KafkaTestBase.java:216)
2021-12-27T02:56:56.3626958Z at org.apache.flink.streaming.connectors.kafka.FlinkKafkaInternalProducerITCase.testProducerWhenCommitEmptyPartitionsToOutdatedTxnCoordinator(FlinkKafkaInternalProducerITCase.java:206)
2021-12-27T02:56:56.3627771Z at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
2021-12-27T02:56:56.3628571Z at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
2021-12-27T02:56:56.3629167Z at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
2021-12-27T02:56:56.3629689Z at java.lang.reflect.Method.invoke(Method.java:498)
2021-12-27T02:56:56.3630183Z at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
2021-12-27T02:56:56.3630764Z at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
2021-12-27T02:56:56.3631484Z at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
2021-12-27T02:56:56.3632056Z at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
2021-12-27T02:56:56.3632655Z at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299)
2021-12-27T02:56:56.3633576Z at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293)
2021-12-27T02:56:56.3634123Z at java.util.concurrent.FutureTask.run(FutureTask.java:266)
2021-12-27T02:56:56.3634579Z at java.lang.Thread.run(Thread.java:748)
2021-12-27T02:56:56.3785761Z java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TopicExistsException: Topic 'flink-kafka-producer-txn-coordinator-changed' already exists.
2021-12-27T02:56:56.3787263Z at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
2021-12-27T02:56:56.3787944Z at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
2021-12-27T02:56:56.3788739Z at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
2021-12-27T02:56:56.3789355Z at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
2021-12-27T02:56:56.3790029Z at org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl.createTestTopic(KafkaTestEnvironmentImpl.java:214)
2021-12-27T02:56:56.3791029Z at org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment.createTestTopic(KafkaTestEnvironment.java:98)
2021-12-27T02:56:56.3791831Z at org.apache.flink.streaming.connectors.kafka.KafkaTestBase.createTestTopic(KafkaTestBase.java:216)
2021-12-27T02:56:56.3792748Z at org.apache.flink.streaming.connectors.kafka.FlinkKafkaInternalProducerITCase.testProducerWhenCommitEmptyPartitionsToOutdatedTxnCoordinator(FlinkKafkaInternalProducerITCase.java:206)
2021-12-27T02:56:56.3793447Z at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
2021-12-27T02:56:56.3793931Z at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
2021-12-27T02:56:56.3794501Z at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
2021-12-27T02:56:56.3795020Z at java.lang.reflect.Method.invoke(Method.java:498)
2021-12-27T02:56:56.3795538Z at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
2021-12-27T02:56:56.3796121Z at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
2021-12-27T02:56:56.3796692Z at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
2021-12-27T02:56:56.3797256Z at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
2021-12-27T02:56:56.3797835Z at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299)
2021-12-27T02:56:56.3798546Z at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293)
2021-12-27T02:56:56.3799087Z at java.util.concurrent.FutureTask.run(FutureTask.java:266)
2021-12-27T02:56:56.3799519Z at java.lang.Thread.run(Thread.java:748)
2021-12-27T02:56:56.3800385Z Caused by: org.apache.kafka.common.errors.TopicExistsException: Topic 'flink-kafka-producer-txn-coordinator-changed' already exists.
2021-12-27T02:56:56.3888956Z java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TopicExistsException: Topic 'flink-kafka-producer-txn-coordinator-changed' already exists.
2021-12-27T02:56:56.3890048Z at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
2021-12-27T02:56:56.3890665Z at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
2021-12-27T02:56:56.3891363Z at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
2021-12-27T02:56:56.3891964Z at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
2021-12-27T02:56:56.3892608Z at org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl.createTestTopic(KafkaTestEnvironmentImpl.java:214)
2021-12-27T02:56:56.3893404Z at org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment.createTestTopic(KafkaTestEnvironment.java:98)
2021-12-27T02:56:56.3894039Z at org.apache.flink.streaming.connectors.kafka.KafkaTestBase.createTestTopic(KafkaTestBase.java:216)
2021-12-27T02:56:56.3895053Z at org.apache.flink.streaming.connectors.kafka.FlinkKafkaInternalProducerITCase.testProducerWhenCommitEmptyPartitionsToOutdatedTxnCoordinator(FlinkKafkaInternalProducerITCase.java:206)
2021-12-27T02:56:56.3895717Z at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
2021-12-27T02:56:56.3896201Z at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
2021-12-27T02:56:56.3896756Z at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
2021-12-27T02:56:56.3897243Z at java.lang.reflect.Method.invoke(Method.java:498)
2021-12-27T02:56:56.3897734Z at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
2021-12-27T02:56:56.3898374Z at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
2021-12-27T02:56:56.3898924Z at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
2021-12-27T02:56:56.3899587Z at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
2021-12-27T02:56:56.3900165Z at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299)
2021-12-27T02:56:56.3900800Z at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293)
2021-12-27T02:56:56.3901502Z at java.util.concurrent.FutureTask.run(FutureTask.java:266)
2021-12-27T02:56:56.3901932Z at java.lang.Thread.run(Thread.java:748)
2021-12-27T02:56:56.3902788Z Caused by: org.apache.kafka.common.errors.TopicExistsException: Topic 'flink-kafka-producer-txn-coordinator-changed' already exists.
2021-12-27T02:56:56.3991984Z java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TopicExistsException: Topic 'flink-kafka-producer-txn-coordinator-changed' already exists.
2021-12-27T02:56:56.3992877Z at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
2021-12-27T02:56:56.3993759Z at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
2021-12-27T02:56:56.3994385Z at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
2021-12-27T02:56:56.3994975Z at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
2021-12-27T02:56:56.3995830Z at org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl.createTestTopic(KafkaTestEnvironmentImpl.java:214)
2021-12-27T02:56:56.3996907Z at org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment.createTestTopic(KafkaTestEnvironment.java:98)
2021-12-27T02:56:56.3997773Z at org.apache.flink.streaming.connectors.kafka.KafkaTestBase.createTestTopic(KafkaTestBase.java:216)
2021-12-27T02:56:56.3998719Z at org.apache.flink.streaming.connectors.kafka.FlinkKafkaInternalProducerITCase.testProducerWhenCommitEmptyPartitionsToOutdatedTxnCoordinator(FlinkKafkaInternalProducerITCase.java:206)
2021-12-27T02:56:56.3999400Z at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
2021-12-27T02:56:56.3999895Z at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
2021-12-27T02:56:56.4000621Z at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
2021-12-27T02:56:56.4001473Z at java.lang.reflect.Method.invoke(Method.java:498)
2021-12-27T02:56:56.4002287Z at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
2021-12-27T02:56:56.4003140Z at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
2021-12-27T02:56:56.4003988Z at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
2021-12-27T02:56:56.4004839Z at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
2021-12-27T02:56:56.4005767Z at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299)
2021-12-27T02:56:56.4007030Z at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293)
2021-12-27T02:56:56.4007864Z at java.util.concurrent.FutureTask.run(FutureTask.java:266)
2021-12-27T02:56:56.4008687Z at java.lang.Thread.run(Thread.java:748)
2021-12-27T02:56:56.4010025Z Caused by: org.apache.kafka.common.errors.TopicExistsException: Topic 'flink-kafka-producer-txn-coordinator-changed' already exists.
2021-12-27T02:57:34.2838223Z Dec 27 02:57:34 [ERROR] Tests run: 10, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 71.271 s <<< FAILURE! - in org.apache.flink.streaming.connectors.kafka.FlinkKafkaInternalProducerITCase
2021-12-27T02:57:34.2839174Z Dec 27 02:57:34 [ERROR] testProducerWhenCommitEmptyPartitionsToOutdatedTxnCoordinator Time elapsed: 17.087 s <<< FAILURE!
2021-12-27T02:57:34.2842604Z Dec 27 02:57:34 java.lang.AssertionError: Create test topic : flink-kafka-producer-txn-coordinator-changed failed, org.apache.kafka.common.errors.TopicExistsException: Topic 'flink-kafka-producer-txn-coordinator-changed' already exists.
2021-12-27T02:57:34.2844263Z Dec 27 02:57:34 at org.junit.Assert.fail(Assert.java:89)
2021-12-27T02:57:34.2844951Z Dec 27 02:57:34 at org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl.createTestTopic(KafkaTestEnvironmentImpl.java:223)
2021-12-27T02:57:34.2845760Z Dec 27 02:57:34 at org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment.createTestTopic(KafkaTestEnvironment.java:98)
2021-12-27T02:57:34.2846517Z Dec 27 02:57:34 at org.apache.flink.streaming.connectors.kafka.KafkaTestBase.createTestTopic(KafkaTestBase.java:216)
2021-12-27T02:57:34.2847411Z Dec 27 02:57:34 at org.apache.flink.streaming.connectors.kafka.FlinkKafkaInternalProducerITCase.testProducerWhenCommitEmptyPartitionsToOutdatedTxnCoordinator(FlinkKafkaInternalProducerITCase.java:206)
2021-12-27T02:57:34.2848348Z Dec 27 02:57:34 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
2021-12-27T02:57:34.2849022Z Dec 27 02:57:34 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
2021-12-27T02:57:34.2849894Z Dec 27 02:57:34 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
2021-12-27T02:57:34.2850502Z Dec 27 02:57:34 at java.lang.reflect.Method.invoke(Method.java:498)
2021-12-27T02:57:34.2851187Z Dec 27 02:57:34 at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
2021-12-27T02:57:34.2851863Z Dec 27 02:57:34 at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
2021-12-27T02:57:34.2852519Z Dec 27 02:57:34 at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
2021-12-27T02:57:34.2853347Z Dec 27 02:57:34 at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
2021-12-27T02:57:34.2854045Z Dec 27 02:57:34 at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299)
2021-12-27T02:57:34.2854754Z Dec 27 02:57:34 at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293)
2021-12-27T02:57:34.2855401Z Dec 27 02:57:34 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
2021-12-27T02:57:34.2855902Z Dec 27 02:57:34 at java.lang.Thread.run(Thread.java:748)
2021-12-27T02:57:34.2856297Z Dec 27 02:57:34
2021-12-27T02:57:35.3197736Z Dec 27 02:57:35 [INFO] Running org.apache.flink.streaming.connectors.kafka.shuffle.KafkaShuffleExactlyOnceITCase
2021-12-27T02:58:56.9222554Z Dec 27 02:58:56 [INFO] Tests run: 6, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 81.601 s - in org.apache.flink.streaming.connectors.kafka.shuffle.KafkaShuffleExactlyOnceITCase
2021-12-27T02:58:58.1145773Z Dec 27 02:58:58 [INFO] Running org.apache.flink.streaming.connectors.kafka.shuffle.KafkaShuffleITCase
2021-12-27T02:59:34.6576286Z Dec 27 02:59:34 [INFO] Tests run: 11, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 36.541 s - in org.apache.flink.streaming.connectors.kafka.shuffle.KafkaShuffleITCase
2021-12-27T02:59:35.8126189Z Dec 27 02:59:35 [INFO] Running org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase
2021-12-27T03:03:00.0705828Z Dec 27 03:03:00 [INFO] Tests run: 15, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 204.256 s - in org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase
2021-12-27T03:03:00.5933283Z Dec 27 03:03:00 [INFO]
2021-12-27T03:03:00.5934894Z Dec 27 03:03:00 [INFO] Results:
2021-12-27T03:03:00.5935369Z Dec 27 03:03:00 [INFO]
2021-12-27T03:03:00.5935750Z Dec 27 03:03:00 [ERROR] Failures:
2021-12-27T03:03:00.5939331Z Dec 27 03:03:00 [ERROR] FlinkKafkaInternalProducerITCase.testProducerWhenCommitEmptyPartitionsToOutdatedTxnCoordinator:206->KafkaTestBase.createTestTopic:216 Create test topic : flink-kafka-producer-txn-coordinator-changed failed, org.apache.kafka.common.errors.TopicExistsException: Topic 'flink-kafka-producer-txn-coordinator-changed' already exists.
I suspect that this has something to do with the retries that we introduced as part of FLINK-15493 (cc fpaul).
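To illustrate how a retry around topic creation could produce the sequence seen in the log (one metadata timeout followed by repeated "already exists" errors), here is a minimal sketch. It does not use the real Kafka or Flink test classes: `FakeBroker`, `TopicAlreadyExistsException`, `createNaive`, and `createTolerant` are hypothetical names invented for this example, and the actual retry logic in the Flink tests may differ.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical stand-in for Kafka's TopicExistsException; not the real class. */
class TopicAlreadyExistsException extends RuntimeException {
    TopicAlreadyExistsException(String topic) {
        super("Topic '" + topic + "' already exists.");
    }
}

/** Hypothetical in-memory broker: creation succeeds at once, metadata becomes visible later. */
class FakeBroker {
    private final Set<String> topics = new HashSet<>();
    private int checksUntilVisible;

    FakeBroker(int checksUntilVisible) {
        this.checksUntilVisible = checksUntilVisible;
    }

    void createTopic(String topic) {
        if (!topics.add(topic)) {
            throw new TopicAlreadyExistsException(topic);
        }
    }

    /** Returns false for the first few checks to simulate slow metadata propagation. */
    boolean metadataPropagated(String topic) {
        if (!topics.contains(topic)) {
            return false;
        }
        if (checksUntilVisible > 0) {
            checksUntilVisible--;
            return false;
        }
        return true;
    }
}

class TopicCreationSketch {
    /** Retries "create + wait" as one unit; the second attempt fails on create. */
    static boolean createNaive(FakeBroker broker, String topic, int attempts) {
        for (int i = 0; i < attempts; i++) {
            broker.createTopic(topic); // throws once an earlier attempt has created the topic
            if (broker.metadataPropagated(topic)) {
                return true;
            }
        }
        return false;
    }

    /** Treats "already exists" as success of an earlier attempt and only re-checks metadata. */
    static boolean createTolerant(FakeBroker broker, String topic, int attempts) {
        for (int i = 0; i < attempts; i++) {
            try {
                broker.createTopic(topic);
            } catch (TopicAlreadyExistsException e) {
                // The topic was created by a previous attempt; keep waiting for its metadata.
            }
            if (broker.metadataPropagated(topic)) {
                return true;
            }
        }
        return false;
    }
}
```

In this sketch the naive variant fails on its second attempt because the first one already created the topic, while the tolerant variant keeps polling until the metadata shows up or the attempts run out.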
Issue Links
- duplicates
  - FLINK-26115 Multiple Kafka connector tests failed due to The topic metadata failed to propagate to Kafka broker (Closed)
- is related to
  - FLINK-15493 FlinkKafkaInternalProducerITCase.testProducerWhenCommitEmptyPartitionsToOutdatedTxnCoordinator failed on travis (Resolved)
  - FLINK-24119 KafkaITCase.testTimestamps fails due to "Topic xxx already exist" (Closed)