diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index e92e684af..0316ef748 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -2783,4 +2783,9 @@ public class KafkaConsumerTest {
             CLIENT_IDS.add(configs.get(ConsumerConfig.CLIENT_ID_CONFIG).toString());
         }
     }
+
+    @Test
+    public void testConsumerPollWhenTopicDeletedAndRecreated() {
+
+    }
 }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java
index b6211edc6..31509411a 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java
@@ -16,10 +16,15 @@
  */
 package org.apache.kafka.connect.integration;
 
+import org.apache.kafka.clients.admin.DeleteTopicsOptions;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.connect.connector.ConnectRecord;
 import org.apache.kafka.connect.errors.RetriableException;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
@@ -27,6 +32,7 @@ import org.apache.kafka.connect.storage.StringConverter;
 import org.apache.kafka.connect.transforms.Transformation;
 import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
 import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.MockMetricsReporter;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -34,8 +40,10 @@ import org.junit.experimental.categories.Category;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.time.Duration;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
@@ -55,9 +63,7 @@ import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ER
 import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_EXCEPTION_MESSAGE;
 import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_ORIG_TOPIC;
 import static org.apache.kafka.test.TestUtils.waitForCondition;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.*;
 
 /**
  * Integration test for the different error handling policies in Connect (namely, retry policies, skipping bad records,
@@ -102,6 +108,18 @@ public class ErrorHandlingIntegrationTest {
         connect.stop();
     }
 
+    @Test
+    public void testCreateTopicAndDeleteTopic() throws InterruptedException {
+        connect.kafka().createTopic("test-topic");
+        Map<String, Optional<TopicDescription>> generatedTopic = connect.kafka().describeTopics("test-topic");
+        assertEquals(1, generatedTopic.size());
+        connect.kafka().deleteTopic("test-topic", new DeleteTopicsOptions().timeoutMs(300));
+        Thread.sleep(219);
+        connect.kafka().createTopic("test-topic");
+        generatedTopic = connect.kafka().describeTopics("test-topic");
+        assertEquals(1, generatedTopic.size());
+    }
+
     @Test
     public void testSkipRetryAndDLQWithHeaders() throws Exception {
         // create test topic
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
index 8b213f798..1215c70a7 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
@@ -25,11 +25,7 @@ import kafka.utils.CoreUtils;
 import kafka.utils.TestUtils;
 import kafka.zk.EmbeddedZookeeper;
 import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.clients.admin.Admin;
-import org.apache.kafka.clients.admin.AdminClientConfig;
-import org.apache.kafka.clients.admin.DescribeTopicsResult;
-import org.apache.kafka.clients.admin.NewTopic;
-import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.admin.*;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -466,4 +462,22 @@ public class EmbeddedKafkaCluster {
             props.put(propertyKey, propertyValue);
         }
     }
+
+    public void deleteTopic(String topicName) {
+        deleteTopic(topicName, new DeleteTopicsOptions().timeoutMs(60000).retryOnQuotaViolation(true));
+    }
+
+    public void deleteTopic(String topicName, DeleteTopicsOptions options) {
+        if (topicName == null || topicName.isEmpty()) {
+            throw new IllegalArgumentException("Topic name must not be null or empty");
+        }
+        log.debug("Deleting topic { name: {} }",
+                topicName);
+        try (final Admin adminClient = createAdminClient()) {
+            adminClient.deleteTopics(Collections.singletonList(topicName), options).all().get();
+        } catch (final InterruptedException | ExecutionException e) {
+            log.error("Failed to delete topic {}", topicName, e);
+            throw new RuntimeException(e);
+        }
+    }
 }
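Note on the new testCreateTopicAndDeleteTopic: the fixed Thread.sleep(219) between deleting and re-creating the topic relies on timing, since topic deletion completes asynchronously on the broker. Below is a minimal, sleep-free sketch of the same test, not part of the patch itself. It assumes that EmbeddedKafkaCluster.describeTopics() reports a deleted topic as a missing or empty Optional entry, and it reuses waitForCondition, TimeUnit, and Optional, which the test class already imports.

    // Sketch only (illustrative, not part of this patch): poll until the deletion
    // has taken effect instead of sleeping for a fixed interval.
    @Test
    public void testCreateTopicAndDeleteTopic() throws Exception {
        connect.kafka().createTopic("test-topic");
        assertEquals(1, connect.kafka().describeTopics("test-topic").size());

        connect.kafka().deleteTopic("test-topic", new DeleteTopicsOptions().timeoutMs(300));
        // Assumption: a deleted topic shows up as a missing key or an empty Optional.
        waitForCondition(
            () -> !connect.kafka().describeTopics("test-topic")
                    .getOrDefault("test-topic", Optional.empty())
                    .isPresent(),
            TimeUnit.SECONDS.toMillis(30),
            "Topic test-topic was not deleted in time");

        connect.kafka().createTopic("test-topic");
        assertEquals(1, connect.kafka().describeTopics("test-topic").size());
    }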