diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index d0df5792bb..83f418db12 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -336,6 +336,10 @@ public class Metadata implements Closeable {
             topicIds.put(topicName, topicId);
             oldTopicId = oldTopicIds.get(topicName);
         } else {
+            // Retain the old topicId for comparison against a newer topicId created later; this is only needed until retainMs elapses.
+            if (metadata.error() != Errors.NONE && oldTopicIds.get(topicName) != null && retainTopic(topicName, false, nowMs))
+                topicIds.put(topicName, oldTopicIds.get(topicName));
+            else
             topicId = null;
         }
 
@@ -394,7 +398,8 @@
         if (hasReliableLeaderEpoch && partitionMetadata.leaderEpoch.isPresent()) {
             int newEpoch = partitionMetadata.leaderEpoch.get();
             Integer currentEpoch = lastSeenLeaderEpochs.get(tp);
-            if (topicId != null && oldTopicId != null && !topicId.equals(oldTopicId)) {
+            // Alternative solution: remove the null check on oldTopicId, so the epoch is also reset when the old topicId is unknown.
+            if (topicId != null && !topicId.equals(oldTopicId)) {
                 // If both topic IDs were valid and the topic ID changed, update the metadata
                 log.info("Resetting the last seen epoch of partition {} to {} since the associated topicId changed from {} to {}",
                         tp, newEpoch, oldTopicId, topicId);
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
index 38febbcd7a..3465f60826 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
@@ -19,20 +19,33 @@ package kafka.api
 
 import java.util.Properties
 import java.util.concurrent.{ExecutionException, Future, TimeUnit}
-
 import kafka.log.LogConfig
-import kafka.server.Defaults
+import kafka.server.{Defaults, KafkaConfig}
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.producer.{BufferExhaustedException, KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}
+import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.{InvalidTimestampException, RecordTooLargeException, SerializationException, TimeoutException}
 import org.apache.kafka.common.record.{DefaultRecord, DefaultRecordBatch, Records, TimestampType}
 import org.apache.kafka.common.serialization.ByteArraySerializer
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Test
 
+import java.nio.charset.StandardCharsets
+
 class PlaintextProducerSendTest extends BaseProducerSendTest {
+
+  override def generateConfigs = {
+    val overridingProps = new Properties()
+    val numServers = 2
+    overridingProps.put(KafkaConfig.NumPartitionsProp, 2.toString)
+    overridingProps.put(KafkaConfig.DefaultReplicationFactorProp, 2.toString)
+    overridingProps.put(KafkaConfig.AutoLeaderRebalanceEnableProp, false.toString)
+    TestUtils.createBrokerConfigs(numServers, zkConnect, false, interBrokerSecurityProtocol = Some(securityProtocol),
+      trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties).map(KafkaConfig.fromProps(_, overridingProps))
+  }
+
   @Test
   def testWrongSerializer(): Unit = {
     val producerProps = new Properties()
@@ -193,4 +206,45 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
     assertEquals(classOf[RecordTooLargeException], assertThrows(classOf[ExecutionException], () =>
       producer.send(record1).get).getCause.getClass)
   }
+
+  /**
+   * Tests producer behavior when a topic is deleted midway through producing.
+   *
+   * Sends succeed as long as the partition specified in each record is present in the metadata;
+   * once the topic is deleted, the next send is expected to time out rather than self-recover.
+   */
+  @Test
+  def testSendWithTopicDeletionMidWay(): Unit = {
+    val numRecords = 10
+
+    // Create the topic with broker 0 as the leader of both partitions.
+    createTopic(topic, Map(0 -> Seq(0, 1), 1 -> Seq(0, 1)))
+
+    val reassignment = Map(
+      new TopicPartition(topic, 0) -> Seq(1, 0),
+      new TopicPartition(topic, 1) -> Seq(1, 0)
+    )
+
+    // Move leadership to broker 1 for both partitions, bumping the leader epoch from 0 to 1.
+    zkClient.createPartitionReassignment(reassignment)
+    TestUtils.waitUntilTrue(() => !zkClient.reassignPartitionsInProgress,
+      "failed to remove reassign partitions path after completion")
+
+    val producer = createProducer(brokerList, maxBlockMs = 5 * 1000L, deliveryTimeoutMs = 20 * 1000)
+
+    (1 to numRecords).foreach { i =>
+      val resp = producer.send(new ProducerRecord(topic, null, ("value" + i).getBytes(StandardCharsets.UTF_8))).get
+      assertEquals(topic, resp.topic())
+    }
+
+    // Start topic deletion.
+    adminZkClient.deleteTopic(topic)
+
+    // Verify that the topic is deleted while no metadata requests are coming in.
+    TestUtils.verifyTopicDeletion(zkClient, topic, 2, servers)
+
+    // The producer currently times out rather than self-recovering after the topic deletion.
+    val e = assertThrows(classOf[ExecutionException], () => producer.send(new ProducerRecord(topic, null, "value".getBytes(StandardCharsets.UTF_8))).get)
+    assertEquals(classOf[TimeoutException], e.getCause.getClass)
+  }
+
 }
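
Not part of the patch: a minimal sketch, in plain Java, of the condition change in the second Metadata.java hunk. The class and method names below are illustrative only; the two boolean expressions are copied verbatim from the removed and added lines above, and org.apache.kafka.common.Uuid is the real client type.

    import org.apache.kafka.common.Uuid;

    public class EpochResetConditionSketch {
        // Old condition: both IDs had to be non-null, so a topic whose previous
        // topicId was unknown (e.g. deleted and recreated while the producer held
        // stale metadata) never had its last-seen leader epoch reset.
        static boolean oldCondition(Uuid topicId, Uuid oldTopicId) {
            return topicId != null && oldTopicId != null && !topicId.equals(oldTopicId);
        }

        // New condition: any valid topicId that differs from the previously seen
        // value, including a previously unknown (null) one, triggers the reset.
        static boolean newCondition(Uuid topicId, Uuid oldTopicId) {
            return topicId != null && !topicId.equals(oldTopicId);
        }

        public static void main(String[] args) {
            Uuid recreatedId = Uuid.randomUuid(); // stands in for the recreated topic's new ID
            System.out.println(oldCondition(recreatedId, null)); // false -> stale epoch kept
            System.out.println(newCondition(recreatedId, null)); // true  -> epoch reset
        }
    }

The only input pair on which the two conditions disagree is a valid new topicId paired with a null oldTopicId, which is exactly the deleted-and-recreated-topic case the new integration test exercises.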