Description
1.Example
KafkaClient version is 3.1.0, KafkaProducer init properties:
Properties props = new Properties(); props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false); props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 60003);
Partial log of KafkaProducer initialization:
ssl.truststore.location = C:\Personal File\documents\KafkaSSL\client.truststore.jks ssl.truststore.password = [hidden] ssl.truststore.type = JKS transaction.timeout.ms = 60003 transactional.id = null value.serializer = class org.apache.kafka.common.serialization.StringSerializer[main] INFO org.apache.kafka.common.security.authenticator.AbstractLogin - Successfully logged in. [main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration 'transaction.timeout.ms' was supplied but isn't a known config. [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.1.0 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 37edeed0777bacb3 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1645602332999
From the above log, you can see that KafkaProducer has applied the user's configuration, transaction.timeout.ms=60003, the default value of this configuration is 60000.
But we can see another line of log:
[main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration 'transaction.timeout.ms' was supplied but isn't a known config.
2.RootCause:
1) ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG is set to false.
So the configurations related to the KafkaProducer transaction will not be requested.
See the source code: KafkaProducer#configureTransactionState(...) .
2) AbstractConfig#logUnused() -> AbstractConfig#unused()
public Set<String> unused() { Set<String> keys = new HashSet<>(originals.keySet()); keys.removeAll(used); return keys; }
If a configuration has not been requested, the configuration will not be put into the used variable. SourceCode see as below:
AbstractConfig#get(String key)
protected Object get(String key) { if (!values.containsKey(key)) throw new ConfigException(String.format("Unknown configuration '%s'", key)); used.add(key); return values.get(key); }
Solution:
1. AbstractConfig#logUnused() method
Modify the log printing information of this method,and the unused configuration log print level can be changed to INFO, what do you think?
/** * Log infos for any unused configurations */ public void logUnused() { for (String key : unused()) log.info("The configuration '{}' was supplied but isn't a used config.", key); }
2. AbstractConfig provides two new methods: logUnknown() and unknown()
/** * Log warnings for any unknown configurations */ public void logUnknown() { for (String key : unknown()) log.warn("The configuration '{}' was supplied but isn't a known config.", key); }
public Set<String> unknown() { Set<String> keys = new HashSet<>(originals.keySet()); keys.removeAll(values.keySet()); return keys; }