Index: clients/src/main/java/org/apache/kafka/clients/producer/ProducerInterceptor.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerInterceptor.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerInterceptor.java
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerInterceptor.java (revision 6c9e27c2b51b3653f8a4009ca9712b3bf85ebacd)
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerInterceptor.java (date 1622547166291)
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.clients.producer;
+import org.apache.kafka.clients.producer.internals.ProducerMuteManager;
import org.apache.kafka.common.Configurable;
/**
@@ -92,4 +93,12 @@
* This is called when interceptor is closed
*/
public void close();
+
+ /**
+ * This method is called when the Producer is built to initialize the partition mute manager
+ *
+ * @param producerMuteManager the manager used to mute and unmute partitions
+ */
+ default void initialize(ProducerMuteManager producerMuteManager) {
+ }
}
Index: clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java (revision 6c9e27c2b51b3653f8a4009ca9712b3bf85ebacd)
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java (date 1622547211993)
@@ -263,6 +263,10 @@
"By default the TransactionId is not configured, which means transactions cannot be used. " +
"Note that, by default, transactions require a cluster of at least three brokers which is the recommended setting for production; for development you can change this, by adjusting broker setting transaction.state.log.replication.factor.";
+ /** enable.partition.mute */
+ public static final String ENABLE_MUTE_PARTITION_CONFIG = "enable.mute.partition";
+ private static final String ENABLE_MUTE_PARTITION_CONFIG_DOC = "When set to 'true', the producer enables muting of individual partitions.";
+
/**
* security.providers
*/
@@ -409,7 +413,12 @@
null,
new ConfigDef.NonEmptyString(),
Importance.LOW,
- TRANSACTIONAL_ID_DOC);
+ TRANSACTIONAL_ID_DOC)
+ .define(ENABLE_MUTE_PARTITION_CONFIG,
+ Type.BOOLEAN,
+ false,
+ Importance.LOW,
+ ENABLE_MUTE_PARTITION_CONFIG_DOC);
}
@Override
Index: clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java (revision 6c9e27c2b51b3653f8a4009ca9712b3bf85ebacd)
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java (date 1622547452484)
@@ -29,6 +29,7 @@
import org.apache.kafka.clients.producer.internals.ProducerInterceptors;
import org.apache.kafka.clients.producer.internals.ProducerMetadata;
import org.apache.kafka.clients.producer.internals.ProducerMetrics;
+import org.apache.kafka.clients.producer.internals.ProducerMuteManager;
import org.apache.kafka.clients.producer.internals.RecordAccumulator;
import org.apache.kafka.clients.producer.internals.Sender;
import org.apache.kafka.clients.producer.internals.TransactionManager;
@@ -258,6 +259,7 @@
private final ProducerInterceptors interceptors;
private final ApiVersions apiVersions;
private final TransactionManager transactionManager;
+ private final ProducerMuteManager producerMuteManager;
/**
* A producer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings
@@ -425,7 +427,16 @@
this.metadata.bootstrap(addresses);
}
this.errors = this.metrics.sensor("errors");
+
+ this.producerMuteManager = configurePartitionMuteManager(config, logContext);
+
this.sender = newSender(logContext, kafkaClient, this.metadata);
+
+ if (this.producerMuteManager != null) {
+ this.partitioner.initialize(this.producerMuteManager);
+ this.interceptors.initialize(this.producerMuteManager);
+ }
+
String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();
@@ -479,7 +490,8 @@
requestTimeoutMs,
producerConfig.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),
this.transactionManager,
- apiVersions);
+ apiVersions,
+ this.producerMuteManager);
}
private static int lingerMs(ProducerConfig config) {
@@ -562,6 +574,16 @@
return acks;
}
+ private ProducerMuteManager configurePartitionMuteManager(ProducerConfig config, LogContext logContext) {
+ boolean enableMutePartition = config.getBoolean(ProducerConfig.ENABLE_MUTE_PARTITION_CONFIG);
+ ProducerMuteManager producerMuteManager = null;
+ if (enableMutePartition) {
+ producerMuteManager = new ProducerMuteManager(this.accumulator, logContext);
+ log.info("Instantiated a partition mute manager.");
+ }
+ return producerMuteManager;
+ }
+
/**
* Needs to be called before any other methods when the transactional.id is set in the configuration.
*
@@ -1249,6 +1271,8 @@
Utils.closeQuietly(keySerializer, "producer keySerializer", firstException);
Utils.closeQuietly(valueSerializer, "producer valueSerializer", firstException);
Utils.closeQuietly(partitioner, "producer partitioner", firstException);
+ Utils.closeQuietly(producerMuteManager, "producer partition mute manager", firstException);
+
AppInfoParser.unregisterAppInfo(JMX_PREFIX, clientId, metrics);
Throwable exception = firstException.get();
if (exception != null && !swallowException) {
Index: clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java (revision 6c9e27c2b51b3653f8a4009ca9712b3bf85ebacd)
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java (date 1622547166291)
@@ -123,6 +123,21 @@
}
}
+ /**
+ * Initializes every interceptor in this container with the producer mute manager; exceptions thrown by an interceptor are logged and not propagated.
+ * @param producerMuteManager the manager used to mute and unmute partitions
+ */
+ public void initialize(ProducerMuteManager producerMuteManager) {
+ for (ProducerInterceptor interceptor : this.interceptors) {
+ try {
+ interceptor.initialize(producerMuteManager);
+ } catch (Exception e) {
+ // do not propagate interceptor exceptions, just log
+ log.warn("Error executing interceptor initialize", e);
+ }
+ }
+ }
+
/**
* Closes every interceptor in a container.
*/
Index: clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java (revision 6c9e27c2b51b3653f8a4009ca9712b3bf85ebacd)
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java (date 1622547166294)
@@ -336,7 +336,7 @@
metrics = new Metrics(new MetricConfig().tags(clientTags));
SenderMetricsRegistry metricsRegistry = new SenderMetricsRegistry(metrics);
Sender sender = new Sender(logContext, client, metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
- 1, metricsRegistry, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, null, apiVersions);
+ 1, metricsRegistry, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, null, apiVersions, null);
// Append a message so that topic metrics are created
appendToAccumulator(tp0, 0L, "key", "value");
@@ -364,7 +364,7 @@
SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
try {
Sender sender = new Sender(logContext, client, metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
- maxRetries, senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, null, apiVersions);
+ maxRetries, senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, null, apiVersions, null);
// do a successful retry
Future future = appendToAccumulator(tp0, 0L, "key", "value");
sender.runOnce(); // connect
@@ -422,7 +422,7 @@
try {
Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
- senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, null, apiVersions);
+ senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, null, apiVersions, null);
// Create a two broker cluster, with partition 0 on broker 0 and partition 1 on broker 1
MetadataResponse metadataUpdate1 = RequestTestUtils.metadataUpdateWith(2, Collections.singletonMap("test", 2));
client.prepareMetadataUpdate(metadataUpdate1);
@@ -1547,7 +1547,7 @@
SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
- senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, transactionManager, apiVersions);
+ senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, transactionManager, apiVersions, null);
appendToAccumulator(tp0); // failed response
Future successfulResponse = appendToAccumulator(tp1);
@@ -1588,7 +1588,7 @@
SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, 10,
- senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, transactionManager, apiVersions);
+ senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, transactionManager, apiVersions, null);
appendToAccumulator(tp0); // failed response
appendToAccumulator(tp1); // success response
@@ -1621,7 +1621,7 @@
SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, 10,
- senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, transactionManager, apiVersions);
+ senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, transactionManager, apiVersions, null);
Future failedResponse = appendToAccumulator(tp0);
Future successfulResponse = appendToAccumulator(tp1);
@@ -1654,7 +1654,7 @@
SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
- senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, transactionManager, apiVersions);
+ senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, transactionManager, apiVersions, null);
Future outOfOrderResponse = appendToAccumulator(tp0);
Future successfulResponse = appendToAccumulator(tp1);
@@ -2191,7 +2191,7 @@
SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
- senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, transactionManager, apiVersions);
+ senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, transactionManager, apiVersions, null);
Future responseFuture = appendToAccumulator(tp0);
client.prepareResponse(body -> {
@@ -2231,7 +2231,7 @@
Metrics m = new Metrics();
SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
- senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, transactionManager, apiVersions);
+ senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, transactionManager, apiVersions, null);
Future responseFuture = appendToAccumulator(tp0);
sender.runOnce(); // connect.
@@ -2267,7 +2267,7 @@
SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
- senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, transactionManager, apiVersions);
+ senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, transactionManager, apiVersions, null);
Future responseFuture = appendToAccumulator(tp0);
sender.runOnce(); // connect.
@@ -2331,7 +2331,7 @@
new BufferPool(totalSize, batchSize, metrics, time, "producer-internal-metrics"));
SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
- senderMetrics, time, REQUEST_TIMEOUT, 1000L, txnManager, new ApiVersions());
+ senderMetrics, time, REQUEST_TIMEOUT, 1000L, txnManager, new ApiVersions(), null);
// Create a two broker cluster, with partition 0 on broker 0 and partition 1 on broker 1
MetadataResponse metadataUpdate1 = RequestTestUtils.metadataUpdateWith(2, Collections.singletonMap(topic, 2));
client.prepareMetadataUpdate(metadataUpdate1);
@@ -2644,7 +2644,7 @@
try {
TransactionManager txnManager = new TransactionManager(logContext, "testTransactionalRequestsSentOnShutdown", 6000, 100, apiVersions);
Sender sender = new Sender(logContext, client, metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
- maxRetries, senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, txnManager, apiVersions);
+ maxRetries, senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, txnManager, apiVersions, null);
ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
TopicPartition tp = new TopicPartition("testTransactionalRequestsSentOnShutdown", 1);
@@ -2679,7 +2679,7 @@
try {
TransactionManager txnManager = new TransactionManager(logContext, "testIncompleteTransactionAbortOnShutdown", 6000, 100, apiVersions);
Sender sender = new Sender(logContext, client, metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
- maxRetries, senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, txnManager, apiVersions);
+ maxRetries, senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, txnManager, apiVersions, null);
ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
TopicPartition tp = new TopicPartition("testIncompleteTransactionAbortOnShutdown", 1);
@@ -2714,7 +2714,7 @@
try {
TransactionManager txnManager = new TransactionManager(logContext, "testForceShutdownWithIncompleteTransaction", 6000, 100, apiVersions);
Sender sender = new Sender(logContext, client, metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
- maxRetries, senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, txnManager, apiVersions);
+ maxRetries, senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, txnManager, apiVersions, null);
ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
TopicPartition tp = new TopicPartition("testForceShutdownWithIncompleteTransaction", 1);
@@ -3017,7 +3017,7 @@
DELIVERY_TIMEOUT_MS, metrics, metricGrpName, time, apiVersions, transactionManager, pool);
this.senderMetricsRegistry = new SenderMetricsRegistry(this.metrics);
this.sender = new Sender(logContext, this.client, this.metadata, this.accumulator, guaranteeOrder, MAX_REQUEST_SIZE, ACKS_ALL,
- retries, this.senderMetricsRegistry, this.time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, transactionManager, apiVersions);
+ retries, this.senderMetricsRegistry, this.time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, transactionManager, apiVersions, null);
metadata.add("test", time.milliseconds());
if (updateMetadata)
Index: clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java (revision 6c9e27c2b51b3653f8a4009ca9712b3bf85ebacd)
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java (date 1622547166295)
@@ -176,7 +176,7 @@
this.sender = new Sender(logContext, this.client, this.metadata, this.accumulator, true,
MAX_REQUEST_SIZE, ACKS_ALL, MAX_RETRIES, new SenderMetricsRegistry(metrics), this.time, REQUEST_TIMEOUT,
- 50, transactionManager, apiVersions);
+ 50, transactionManager, apiVersions, null);
}
@Test
@@ -3043,7 +3043,7 @@
initializeTransactionManager(Optional.empty());
Sender sender = new Sender(logContext, this.client, this.metadata, this.accumulator, false,
MAX_REQUEST_SIZE, ACKS_ALL, MAX_RETRIES, new SenderMetricsRegistry(new Metrics(time)), this.time,
- REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
+ REQUEST_TIMEOUT, 50, transactionManager, apiVersions, null);
initializeIdempotentProducerId(producerId, epoch);
ProducerBatch tp0b1 = writeIdempotentBatchWithValue(transactionManager, tp0, "1");
@@ -3167,7 +3167,7 @@
initializeTransactionManager(Optional.empty());
Sender sender = new Sender(logContext, this.client, this.metadata, this.accumulator, false,
MAX_REQUEST_SIZE, ACKS_ALL, MAX_RETRIES, new SenderMetricsRegistry(new Metrics(time)), this.time,
- REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
+ REQUEST_TIMEOUT, 50, transactionManager, apiVersions, null);
initializeIdempotentProducerId(producerId, epoch);
ProducerBatch tp0b1 = writeIdempotentBatchWithValue(transactionManager, tp0, "1");
Index: clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java b/clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java
--- a/clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java (revision 6c9e27c2b51b3653f8a4009ca9712b3bf85ebacd)
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java (date 1622547166291)
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.clients.producer;
+import org.apache.kafka.clients.producer.internals.ProducerMuteManager;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.Cluster;
@@ -52,4 +53,12 @@
*/
default void onNewBatch(String topic, Cluster cluster, int prevPartition) {
}
+
+ /**
+ * This method is called when the Producer is built to initialize the partition mute manager
+ *
+ * @param producerMuteManager the manager used to mute and unmute partitions
+ */
+ default void initialize(ProducerMuteManager producerMuteManager) {
+ }
}
Index: clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java (revision 6c9e27c2b51b3653f8a4009ca9712b3bf85ebacd)
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java (date 1622547452476)
@@ -118,6 +118,9 @@
/* all the state related to transactions, in particular the producer id, producer epoch, and sequence numbers */
private final TransactionManager transactionManager;
+ /* the Producer mute manager */
+ private final ProducerMuteManager producerMuteManager;
+
// A per-partition queue of batches ordered by creation time for tracking the in-flight batches
private final Map> inFlightBatches;
@@ -134,7 +137,8 @@
int requestTimeoutMs,
long retryBackoffMs,
TransactionManager transactionManager,
- ApiVersions apiVersions) {
+ ApiVersions apiVersions,
+ ProducerMuteManager producerMuteManager) {
this.log = logContext.logger(Sender.class);
this.client = client;
this.accumulator = accumulator;
@@ -151,6 +155,10 @@
this.apiVersions = apiVersions;
this.transactionManager = transactionManager;
this.inFlightBatches = new HashMap<>();
+ this.producerMuteManager = producerMuteManager;
+ if (this.producerMuteManager != null) {
+ this.producerMuteManager.setInFlightBatches(this.inFlightBatches);
+ }
}
public List inFlightBatches(TopicPartition tp) {
Index: clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerMuteManager.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerMuteManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerMuteManager.java
new file mode 100644
--- /dev/null (date 1622547166292)
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerMuteManager.java (date 1622547166292)
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer.internals;
+
+import java.io.Closeable;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+/**
+ * Maintains the set of muted TopicPartitions and exposes per-partition counts of accumulated batches and in-flight records.
+ */
+public class ProducerMuteManager implements Closeable {
+
+ private final Logger log;
+ private final RecordAccumulator accumulator;
+ private Map> inFlightBatches;
+
+ private final Set mutedPartitions;
+
+ public ProducerMuteManager(final RecordAccumulator accumulator, final LogContext logContext) {
+ this.log = logContext.logger(ProducerMuteManager.class);
+
+ this.accumulator = accumulator;
+ this.mutedPartitions = new HashSet<>();
+ }
+
+ /**
+ * Marks the given TopicPartition as muted.
+ *
+ * @param topicPartition the partition to mute
+ */
+ public synchronized void mute(TopicPartition topicPartition) {
+ this.mutedPartitions.add(topicPartition);
+ }
+
+ /**
+ * Removes the given TopicPartition from the muted set.
+ *
+ * @param topicPartition the partition to unmute
+ */
+ public synchronized void unmute(TopicPartition topicPartition) {
+ this.mutedPartitions.remove(topicPartition);
+ }
+
+ public boolean isMute(TopicPartition topicPartition) {
+ return this.mutedPartitions.contains(topicPartition);
+ }
+
+ /**
+ * Returns the currently muted TopicPartitions.
+ *
+ * @return an unmodifiable view of the muted partitions
+ */
+ public Set getMutedPartitions() {
+ return Collections.unmodifiableSet(mutedPartitions);
+ }
+
+ public synchronized void close() {
+ if (this.mutedPartitions != null) {
+ this.mutedPartitions.clear();
+ }
+ }
+
+ /**
+ * Returns the number of accumulated (not yet sent) batches per TopicPartition.
+ *
+ * @return a map from partition to its accumulated batch count
+ */
+ public Map getAccumulatedBatches() {
+ Map accumulatedBatches = new HashMap<>();
+ for (Entry> topicPartitionDeque : accumulator.batches().entrySet()) {
+ accumulatedBatches.put(topicPartitionDeque.getKey(), topicPartitionDeque.getValue().size());
+ }
+ return accumulatedBatches;
+ }
+
+ void setInFlightBatches(Map> inFlightBatches) {
+ this.inFlightBatches = inFlightBatches;
+ }
+
+ /**
+ * Returns the number of in-flight records per TopicPartition, summed over that partition's in-flight batches.
+ *
+ * @return a map from partition to its in-flight record count
+ */
+ public Map getInFlightRequestCount() {
+ Map inFlightRequestCount = new HashMap<>();
+ for (Entry> topicPartitionListEntry : this.inFlightBatches.entrySet()) {
+ Integer count = 0;
+ TopicPartition topicPartition = topicPartitionListEntry.getKey();
+ List producerBatchList = topicPartitionListEntry.getValue();
+ for (ProducerBatch producerBatch : producerBatchList) {
+ count += producerBatch.recordCount;
+ }
+ inFlightRequestCount.put(topicPartition, count);
+ }
+ return inFlightRequestCount;
+ }
+}
Index: clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java
--- a/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java (revision 6c9e27c2b51b3653f8a4009ca9712b3bf85ebacd)
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java (date 1622547166291)
@@ -135,6 +135,13 @@
return this.topicPartition.partition();
}
+ /**
+ * The topic partition the record was sent to
+ */
+ public TopicPartition topicPartition() {
+ return this.topicPartition;
+ }
+
@Override
public String toString() {
return topicPartition.toString() + "@" + offset;