Index: clients/src/main/java/org/apache/kafka/clients/producer/ProducerInterceptor.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerInterceptor.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerInterceptor.java --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerInterceptor.java (revision 6c9e27c2b51b3653f8a4009ca9712b3bf85ebacd) +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerInterceptor.java (date 1622547166291) @@ -16,6 +16,7 @@ */ package org.apache.kafka.clients.producer; +import org.apache.kafka.clients.producer.internals.ProducerMuteManager; import org.apache.kafka.common.Configurable; /** @@ -92,4 +93,12 @@ * This is called when interceptor is closed */ public void close(); + + /** + * This method is called when the Producer is built to initialize the partition mute manager + * + * @param producerMuteManager + */ + default void initialize(ProducerMuteManager producerMuteManager) { + } } Index: clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java (revision 6c9e27c2b51b3653f8a4009ca9712b3bf85ebacd) +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java (date 1622547211993) @@ -263,6 +263,10 @@ "By default the TransactionId is not configured, which means transactions cannot be used. " + "Note that, by default, transactions require a cluster of at least three brokers which is the recommended setting for production; for development you can change this, by adjusting broker setting transaction.state.log.replication.factor."; + /** enable.partition.mute */ + public static final String ENABLE_MUTE_PARTITION_CONFIG = "enable.mute.partition"; + private static final String ENABLE_MUTE_PARTITION_CONFIG_DOC = "When set to 'true', producer enable mute partition. "; + /** * security.providers */ @@ -409,7 +413,12 @@ null, new ConfigDef.NonEmptyString(), Importance.LOW, - TRANSACTIONAL_ID_DOC); + TRANSACTIONAL_ID_DOC) + .define(ENABLE_MUTE_PARTITION_CONFIG, + Type.BOOLEAN, + false, + Importance.LOW, + ENABLE_MUTE_PARTITION_CONFIG_DOC); } @Override Index: clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java (revision 6c9e27c2b51b3653f8a4009ca9712b3bf85ebacd) +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java (date 1622547452484) @@ -29,6 +29,7 @@ import org.apache.kafka.clients.producer.internals.ProducerInterceptors; import org.apache.kafka.clients.producer.internals.ProducerMetadata; import org.apache.kafka.clients.producer.internals.ProducerMetrics; +import org.apache.kafka.clients.producer.internals.ProducerMuteManager; import org.apache.kafka.clients.producer.internals.RecordAccumulator; import org.apache.kafka.clients.producer.internals.Sender; import org.apache.kafka.clients.producer.internals.TransactionManager; @@ -258,6 +259,7 @@ private final ProducerInterceptors interceptors; private final ApiVersions apiVersions; private final TransactionManager transactionManager; + private final ProducerMuteManager producerMuteManager; /** * A producer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings @@ -425,7 +427,16 @@ this.metadata.bootstrap(addresses); } this.errors = this.metrics.sensor("errors"); + + this.producerMuteManager = configurePartitionMuteManager(config, logContext); + this.sender = newSender(logContext, kafkaClient, this.metadata); + + if (this.producerMuteManager != null) { + this.partitioner.initialize(this.producerMuteManager); + this.interceptors.initialize(this.producerMuteManager); + } + String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId; this.ioThread = new KafkaThread(ioThreadName, this.sender, true); this.ioThread.start(); @@ -479,7 +490,8 @@ requestTimeoutMs, producerConfig.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG), this.transactionManager, - apiVersions); + apiVersions, + this.producerMuteManager); } private static int lingerMs(ProducerConfig config) { @@ -562,6 +574,16 @@ return acks; } + private ProducerMuteManager configurePartitionMuteManager(ProducerConfig config, LogContext logContext) { + boolean enableMutePartition = config.getBoolean(ProducerConfig.ENABLE_MUTE_PARTITION_CONFIG); + ProducerMuteManager producerMuteManager = null; + if (enableMutePartition) { + producerMuteManager = new ProducerMuteManager(this.accumulator, logContext); + log.info("Instantiated a partition mute manager."); + } + return producerMuteManager; + } + /** * Needs to be called before any other methods when the transactional.id is set in the configuration. * @@ -1249,6 +1271,8 @@ Utils.closeQuietly(keySerializer, "producer keySerializer", firstException); Utils.closeQuietly(valueSerializer, "producer valueSerializer", firstException); Utils.closeQuietly(partitioner, "producer partitioner", firstException); + Utils.closeQuietly(producerMuteManager, "producer partition mute manager", firstException); + AppInfoParser.unregisterAppInfo(JMX_PREFIX, clientId, metrics); Throwable exception = firstException.get(); if (exception != null && !swallowException) { Index: clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java (revision 6c9e27c2b51b3653f8a4009ca9712b3bf85ebacd) +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java (date 1622547166291) @@ -123,6 +123,21 @@ } } + /** + * + * @param producerMuteManager + */ + public void initialize(ProducerMuteManager producerMuteManager) { + for (ProducerInterceptor interceptor : this.interceptors) { + try { + interceptor.initialize(producerMuteManager); + } catch (Exception e) { + // do not propagate interceptor exceptions, just log + log.warn("Error executing interceptor initialize", e); + } + } + } + /** * Closes every interceptor in a container. */ Index: clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java (revision 6c9e27c2b51b3653f8a4009ca9712b3bf85ebacd) +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java (date 1622547166294) @@ -336,7 +336,7 @@ metrics = new Metrics(new MetricConfig().tags(clientTags)); SenderMetricsRegistry metricsRegistry = new SenderMetricsRegistry(metrics); Sender sender = new Sender(logContext, client, metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL, - 1, metricsRegistry, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, null, apiVersions); + 1, metricsRegistry, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, null, apiVersions, null); // Append a message so that topic metrics are created appendToAccumulator(tp0, 0L, "key", "value"); @@ -364,7 +364,7 @@ SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m); try { Sender sender = new Sender(logContext, client, metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL, - maxRetries, senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, null, apiVersions); + maxRetries, senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, null, apiVersions, null); // do a successful retry Future future = appendToAccumulator(tp0, 0L, "key", "value"); sender.runOnce(); // connect @@ -422,7 +422,7 @@ try { Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries, - senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, null, apiVersions); + senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, null, apiVersions, null); // Create a two broker cluster, with partition 0 on broker 0 and partition 1 on broker 1 MetadataResponse metadataUpdate1 = RequestTestUtils.metadataUpdateWith(2, Collections.singletonMap("test", 2)); client.prepareMetadataUpdate(metadataUpdate1); @@ -1547,7 +1547,7 @@ SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m); Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries, - senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, transactionManager, apiVersions); + senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, transactionManager, apiVersions, null); appendToAccumulator(tp0); // failed response Future successfulResponse = appendToAccumulator(tp1); @@ -1588,7 +1588,7 @@ SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m); Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, 10, - senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, transactionManager, apiVersions); + senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, transactionManager, apiVersions, null); appendToAccumulator(tp0); // failed response appendToAccumulator(tp1); // success response @@ -1621,7 +1621,7 @@ SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m); Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, 10, - senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, transactionManager, apiVersions); + senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, transactionManager, apiVersions, null); Future failedResponse = appendToAccumulator(tp0); Future successfulResponse = appendToAccumulator(tp1); @@ -1654,7 +1654,7 @@ SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m); Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries, - senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, transactionManager, apiVersions); + senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, transactionManager, apiVersions, null); Future outOfOrderResponse = appendToAccumulator(tp0); Future successfulResponse = appendToAccumulator(tp1); @@ -2191,7 +2191,7 @@ SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m); Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries, - senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, transactionManager, apiVersions); + senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, transactionManager, apiVersions, null); Future responseFuture = appendToAccumulator(tp0); client.prepareResponse(body -> { @@ -2231,7 +2231,7 @@ Metrics m = new Metrics(); SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m); Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries, - senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, transactionManager, apiVersions); + senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, transactionManager, apiVersions, null); Future responseFuture = appendToAccumulator(tp0); sender.runOnce(); // connect. @@ -2267,7 +2267,7 @@ SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m); Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries, - senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, transactionManager, apiVersions); + senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, transactionManager, apiVersions, null); Future responseFuture = appendToAccumulator(tp0); sender.runOnce(); // connect. @@ -2331,7 +2331,7 @@ new BufferPool(totalSize, batchSize, metrics, time, "producer-internal-metrics")); SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m); Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries, - senderMetrics, time, REQUEST_TIMEOUT, 1000L, txnManager, new ApiVersions()); + senderMetrics, time, REQUEST_TIMEOUT, 1000L, txnManager, new ApiVersions(), null); // Create a two broker cluster, with partition 0 on broker 0 and partition 1 on broker 1 MetadataResponse metadataUpdate1 = RequestTestUtils.metadataUpdateWith(2, Collections.singletonMap(topic, 2)); client.prepareMetadataUpdate(metadataUpdate1); @@ -2644,7 +2644,7 @@ try { TransactionManager txnManager = new TransactionManager(logContext, "testTransactionalRequestsSentOnShutdown", 6000, 100, apiVersions); Sender sender = new Sender(logContext, client, metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL, - maxRetries, senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, txnManager, apiVersions); + maxRetries, senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, txnManager, apiVersions, null); ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0); TopicPartition tp = new TopicPartition("testTransactionalRequestsSentOnShutdown", 1); @@ -2679,7 +2679,7 @@ try { TransactionManager txnManager = new TransactionManager(logContext, "testIncompleteTransactionAbortOnShutdown", 6000, 100, apiVersions); Sender sender = new Sender(logContext, client, metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL, - maxRetries, senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, txnManager, apiVersions); + maxRetries, senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, txnManager, apiVersions, null); ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0); TopicPartition tp = new TopicPartition("testIncompleteTransactionAbortOnShutdown", 1); @@ -2714,7 +2714,7 @@ try { TransactionManager txnManager = new TransactionManager(logContext, "testForceShutdownWithIncompleteTransaction", 6000, 100, apiVersions); Sender sender = new Sender(logContext, client, metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL, - maxRetries, senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, txnManager, apiVersions); + maxRetries, senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, txnManager, apiVersions, null); ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0); TopicPartition tp = new TopicPartition("testForceShutdownWithIncompleteTransaction", 1); @@ -3017,7 +3017,7 @@ DELIVERY_TIMEOUT_MS, metrics, metricGrpName, time, apiVersions, transactionManager, pool); this.senderMetricsRegistry = new SenderMetricsRegistry(this.metrics); this.sender = new Sender(logContext, this.client, this.metadata, this.accumulator, guaranteeOrder, MAX_REQUEST_SIZE, ACKS_ALL, - retries, this.senderMetricsRegistry, this.time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, transactionManager, apiVersions); + retries, this.senderMetricsRegistry, this.time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, transactionManager, apiVersions, null); metadata.add("test", time.milliseconds()); if (updateMetadata) Index: clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java (revision 6c9e27c2b51b3653f8a4009ca9712b3bf85ebacd) +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java (date 1622547166295) @@ -176,7 +176,7 @@ this.sender = new Sender(logContext, this.client, this.metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, MAX_RETRIES, new SenderMetricsRegistry(metrics), this.time, REQUEST_TIMEOUT, - 50, transactionManager, apiVersions); + 50, transactionManager, apiVersions, null); } @Test @@ -3043,7 +3043,7 @@ initializeTransactionManager(Optional.empty()); Sender sender = new Sender(logContext, this.client, this.metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL, MAX_RETRIES, new SenderMetricsRegistry(new Metrics(time)), this.time, - REQUEST_TIMEOUT, 50, transactionManager, apiVersions); + REQUEST_TIMEOUT, 50, transactionManager, apiVersions, null); initializeIdempotentProducerId(producerId, epoch); ProducerBatch tp0b1 = writeIdempotentBatchWithValue(transactionManager, tp0, "1"); @@ -3167,7 +3167,7 @@ initializeTransactionManager(Optional.empty()); Sender sender = new Sender(logContext, this.client, this.metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL, MAX_RETRIES, new SenderMetricsRegistry(new Metrics(time)), this.time, - REQUEST_TIMEOUT, 50, transactionManager, apiVersions); + REQUEST_TIMEOUT, 50, transactionManager, apiVersions, null); initializeIdempotentProducerId(producerId, epoch); ProducerBatch tp0b1 = writeIdempotentBatchWithValue(transactionManager, tp0, "1"); Index: clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java b/clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java --- a/clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java (revision 6c9e27c2b51b3653f8a4009ca9712b3bf85ebacd) +++ b/clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java (date 1622547166291) @@ -16,6 +16,7 @@ */ package org.apache.kafka.clients.producer; +import org.apache.kafka.clients.producer.internals.ProducerMuteManager; import org.apache.kafka.common.Configurable; import org.apache.kafka.common.Cluster; @@ -52,4 +53,12 @@ */ default void onNewBatch(String topic, Cluster cluster, int prevPartition) { } + + /** + * This method is called when the Producer is built to initialize the partition mute manager + * + * @param producerMuteManager + */ + default void initialize(ProducerMuteManager producerMuteManager) { + } } Index: clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java (revision 6c9e27c2b51b3653f8a4009ca9712b3bf85ebacd) +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java (date 1622547452476) @@ -118,6 +118,9 @@ /* all the state related to transactions, in particular the producer id, producer epoch, and sequence numbers */ private final TransactionManager transactionManager; + /* the Producer mute manager */ + private final ProducerMuteManager producerMuteManager; + // A per-partition queue of batches ordered by creation time for tracking the in-flight batches private final Map> inFlightBatches; @@ -134,7 +137,8 @@ int requestTimeoutMs, long retryBackoffMs, TransactionManager transactionManager, - ApiVersions apiVersions) { + ApiVersions apiVersions, + ProducerMuteManager producerMuteManager) { this.log = logContext.logger(Sender.class); this.client = client; this.accumulator = accumulator; @@ -151,6 +155,10 @@ this.apiVersions = apiVersions; this.transactionManager = transactionManager; this.inFlightBatches = new HashMap<>(); + this.producerMuteManager = producerMuteManager; + if (this.producerMuteManager != null) { + this.producerMuteManager.setInFlightBatches(this.inFlightBatches); + } } public List inFlightBatches(TopicPartition tp) { Index: clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerMuteManager.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerMuteManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerMuteManager.java new file mode 100644 --- /dev/null (date 1622547166292) +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerMuteManager.java (date 1622547166292) @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.producer.internals; + +import java.io.Closeable; +import java.util.Collections; +import java.util.Deque; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.LogContext; +import org.slf4j.Logger; + +/** + * A class which maintains mute of TopicPartitions. Also keeps the number of TopicPartitions accumulated-batches and in-flight requests. + */ +public class ProducerMuteManager implements Closeable { + + private final Logger log; + private final RecordAccumulator accumulator; + private Map> inFlightBatches; + + private final Set mutedPartitions; + + public ProducerMuteManager(final RecordAccumulator accumulator, final LogContext logContext) { + this.log = logContext.logger(ProducerMuteManager.class); + + this.accumulator = accumulator; + this.mutedPartitions = new HashSet<>(); + } + + /** + * Add mute of TopicPartition + * + * @param topicPartition + */ + public synchronized void mute(TopicPartition topicPartition) { + this.mutedPartitions.add(topicPartition); + } + + /** + * Remove muted of TopicPartition + * + * @param topicPartition + */ + public synchronized void unmute(TopicPartition topicPartition) { + this.mutedPartitions.remove(topicPartition); + } + + public boolean isMute(TopicPartition topicPartition) { + return this.mutedPartitions.contains(topicPartition); + } + + /** + * Return muted of TopicPartitions + * + * @return + */ + public Set getMutedPartitions() { + return Collections.unmodifiableSet(mutedPartitions); + } + + public synchronized void close() { + if (this.mutedPartitions != null) { + this.mutedPartitions.clear(); + } + } + + /** + * Return the number of TopicPartition accumulated-batches requests + * + * @return + */ + public Map getAccumulatedBatches() { + Map accumulatedBatches = new HashMap<>(); + for (Entry> topicPartitionDeque : accumulator.batches().entrySet()) { + accumulatedBatches.put(topicPartitionDeque.getKey(), topicPartitionDeque.getValue().size()); + } + return accumulatedBatches; + } + + void setInFlightBatches(Map> inFlightBatches) { + this.inFlightBatches = inFlightBatches; + } + + /** + * Return the number of TopicPartition in-flight requests + * + * @return The request count. + */ + public Map getInFlightRequestCount() { + Map inFlightRequestCount = new HashMap<>(); + for (Entry> topicPartitionListEntry : this.inFlightBatches.entrySet()) { + Integer count = 0; + TopicPartition topicPartition = topicPartitionListEntry.getKey(); + List producerBatchList = topicPartitionListEntry.getValue(); + for (ProducerBatch producerBatch : producerBatchList) { + count += producerBatch.recordCount; + } + inFlightRequestCount.put(topicPartition, count); + } + return inFlightRequestCount; + } +} Index: clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java --- a/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java (revision 6c9e27c2b51b3653f8a4009ca9712b3bf85ebacd) +++ b/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java (date 1622547166291) @@ -135,6 +135,13 @@ return this.topicPartition.partition(); } + /** + * The topic partition the record was sent to + */ + public TopicPartition topicPartition() { + return this.topicPartition; + } + @Override public String toString() { return topicPartition.toString() + "@" + offset;