Description
The same code runs fine on Linux. I am trying to send a transactional record with exactly-once semantics. These are my producer, consumer, and broker setups.
public void sendWithTTemp(String topic, EHEvent event) {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
            "localhost:9092,localhost:9093,localhost:9094");
    // props.put("bootstrap.servers", "34.240.248.190:9092,52.50.95.30:9092,52.50.95.30:9092");
    props.put(ProducerConfig.ACKS_CONFIG, "all");
    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
    props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
    props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
    props.put("transactional.id", "TID" + transactionId.incrementAndGet());
    props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "5000");
    Producer<String, String> producer =
            new KafkaProducer<>(props,
                    new StringSerializer(),
                    new StringSerializer());
    Logger.log(this, "Initializing transaction...");
    producer.initTransactions();
    Logger.log(this, "Initializing done.");
    try {
        Logger.log(this, "Begin transaction...");
        producer.beginTransaction();
        Logger.log(this, "Begin transaction done.");
        Logger.log(this, "Sending events...");
        producer.send(new ProducerRecord<>(topic, event.getKey().toString(), event.getValue().toString()));
        Logger.log(this, "Sending events done.");
        Logger.log(this, "Committing...");
        producer.commitTransaction();
        Logger.log(this, "Committing done.");
    } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
        // Unrecoverable errors: the producer can only be closed.
        producer.close();
        e.printStackTrace();
    } catch (KafkaException e) {
        // Any other error: abort the open transaction.
        producer.abortTransaction();
        e.printStackTrace();
    }
    producer.close();
}
In Consumer
I have set isolation.level=read_committed
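For reference, the consumer setting described above could be written as a plain `java.util.Properties` object, roughly as sketched below. This is only an illustration, not the reporter's actual consumer code: the class name `ReadCommittedConsumerProps` and the group id `eh-consumer-group` are hypothetical, and the bootstrap servers are copied from the producer snippet. The string keys are the standard Kafka consumer configuration names.

```java
import java.util.Properties;

// Hypothetical helper: builds consumer settings for reading only committed records.
final class ReadCommittedConsumerProps {

    static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Only return records from committed transactions (the default is read_uncommitted).
        props.setProperty("isolation.level", "read_committed");
        return props;
    }

    public static void main(String[] args) {
        // The group id below is an assumption made for this example.
        Properties props = build("localhost:9092,localhost:9093,localhost:9094",
                "eh-consumer-group");
        System.out.println(props.getProperty("isolation.level")); // prints "read_committed"
    }
}
```

With `read_committed`, a consumer only reads up to the last stable offset of a partition, so records that belong to a transaction stay invisible until the commit or abort marker for that transaction has been written to the partition.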
In 3 Brokers
I'm running with the following properties
Properties props = new Properties();
props.setProperty("broker.id", "" + i);
props.setProperty("listeners", "PLAINTEXT://:909" + (2 + i));
props.setProperty("log.dirs", Configuration.KAFKA_DATA_PATH + "\\B" + i);
props.setProperty("num.partitions", "1");
props.setProperty("zookeeper.connect", "localhost:2181");
props.setProperty("zookeeper.connection.timeout.ms", "6000");
props.setProperty("min.insync.replicas", "2");
props.setProperty("offsets.topic.replication.factor", "2");
props.setProperty("offsets.topic.num.partitions", "1");
props.setProperty("transaction.state.log.num.partitions", "2");
props.setProperty("transaction.state.log.replication.factor", "2");
props.setProperty("transaction.state.log.min.isr", "2");
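As a side note on the values above, the replication factor and the minimum in-sync replica count together decide how many replicas of a partition can be out of sync before writes with acks=all are rejected. The sketch below is a hypothetical helper (the class and method names are not part of Kafka) that computes this headroom. Whether it has any bearing on the reported behaviour is not established by this report.

```java
// Hypothetical helper: how many replicas of a partition may fall out of sync
// before writes that require the minimum ISR are rejected.
final class TxnLogReplicationCheck {

    static int tolerableReplicaFailures(int replicationFactor, int minIsr) {
        if (minIsr < 1 || minIsr > replicationFactor) {
            throw new IllegalArgumentException(
                    "min ISR must be between 1 and the replication factor");
        }
        return replicationFactor - minIsr;
    }

    public static void main(String[] args) {
        // Values from the broker settings above:
        // transaction.state.log.replication.factor=2, transaction.state.log.min.isr=2
        System.out.println(tolerableReplicaFailures(2, 2)); // prints 0
    }
}
```

With a replication factor of 2 and a minimum ISR of 2, the result is 0: both replicas of a transaction state log partition must be in sync for writes to that partition to succeed.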
I am not getting any records in the consumer. When I set isolation.level=read_uncommitted, I get the records. I assume that the records are not getting committed. What could be the problem? Log attached.
Attachments
Issue Links
- is duplicated by
  - KAFKA-6196 Kafka Transactional producer with broker on windows (Resolved)
  - KAFKA-6153 Kafka Transactional Messaging does not work on windows but on linux (Resolved)
  - KAFKA-6076 Using new producer api of transaction twice failed when server run on Windows OS (Resolved)
- links to