diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index e92e684af..0316ef748 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -2783,4 +2783,9 @@ public class KafkaConsumerTest { CLIENT_IDS.add(configs.get(ConsumerConfig.CLIENT_ID_CONFIG).toString()); } } + + @Test + public void testConsumerPollWhenTopicDeletedAndRecreated() { + + } } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java index b6211edc6..5598a628b 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java @@ -16,6 +16,8 @@ */ package org.apache.kafka.connect.integration; +import org.apache.kafka.clients.admin.DeleteTopicsOptions; +import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.common.config.ConfigDef; @@ -31,11 +33,13 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.jupiter.api.Assertions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.HashMap; import java.util.Map; +import java.util.Optional; import java.util.concurrent.TimeUnit; import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG; @@ -55,9 +59,9 @@ import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ER import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_EXCEPTION_MESSAGE; import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_ORIG_TOPIC; import static org.apache.kafka.test.TestUtils.waitForCondition; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.fail; +//import static org.junit.Assert.*; /** * Integration test for the different error handling policies in Connect (namely, retry policies, skipping bad records, @@ -102,6 +106,34 @@ public class ErrorHandlingIntegrationTest { connect.stop(); } + @Test + public void testCreateTopicAndDeleteTopic() throws InterruptedException { + String topicName = "test-topic"; + connect.kafka().createTopic(topicName); + Map> generatedTopic = connect.kafka().describeTopics(topicName); + assertEquals(1, generatedTopic.size()); + for(int i=0;i<5;i++) { + connect.kafka().produce(topicName, i + "", i + ""); + } + connect.kafka().deleteTopic(topicName, new DeleteTopicsOptions().timeoutMs(300)); + Thread.sleep(219); + connect.kafka().createTopic(topicName); + for(int i=5;i<10;i++) { + connect.kafka().produce(topicName, "" + i, i + ""); + } + generatedTopic = connect.kafka().describeTopics(topicName); + assertEquals(1, generatedTopic.size()); + for (ConsumerRecord rec : connect.kafka().consume(5, CONSUME_MAX_DURATION_MS, topicName)) { + try { + String k = new String(rec.key()); + String v = new String(rec.value()); + log.debug("Consumed record (key='{}', value='{}') from topic {}", k, v, rec.topic()); + }catch (Exception e) { + fail(e); + } + } + } + @Test public void testSkipRetryAndDLQWithHeaders() throws Exception { // create test topic @@ -169,7 +201,7 @@ public class ErrorHandlingIntegrationTest { for (ConsumerRecord recs : messages) { log.debug("Consumed record (key={}, value={}) from dead letter queue topic {}", new String(recs.key()), new String(recs.value()), DLQ_TOPIC); - assertTrue(recs.headers().toArray().length > 0); + Assertions.assertTrue(recs.headers().toArray().length > 0); assertValue("test-topic", recs.headers(), ERROR_HEADER_ORIG_TOPIC); assertValue(RetriableException.class.getName(), recs.headers(), ERROR_HEADER_EXCEPTION); assertValue("Error when value='value-7'", recs.headers(), ERROR_HEADER_EXCEPTION_MESSAGE); @@ -275,7 +307,7 @@ public class ErrorHandlingIntegrationTest { return; } if (expected == null || actual == null) { - fail(); + Assertions.fail(); } assertEquals(expected, new String(actual)); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java index 8b213f798..4a35da8f5 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java @@ -27,6 +27,7 @@ import kafka.zk.EmbeddedZookeeper; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.DeleteTopicsOptions; import org.apache.kafka.clients.admin.DescribeTopicsResult; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.admin.TopicDescription; @@ -466,4 +467,22 @@ public class EmbeddedKafkaCluster { props.put(propertyKey, propertyValue); } } + + public void deleteTopic(String topicName) { + deleteTopic(topicName, new DeleteTopicsOptions().timeoutMs(60000).retryOnQuotaViolation(true)); + } + + public void deleteTopic(String topicName, DeleteTopicsOptions options) { + if (topicName == null || topicName.isEmpty()) { + throw new IllegalArgumentException("topic name is illegal"); + } + log.debug("Deleting topic { name: {} }", + topicName); + try (final Admin adminClient = createAdminClient()) { + adminClient.deleteTopics(Collections.singletonList(topicName), options).all().get(); + } catch (final InterruptedException | ExecutionException e) { + log.error("Deleting topic {name : {}", topicName, e); + throw new RuntimeException(e); + } + } }