import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.junit.ClassRule; import org.testcontainers.containers.KafkaContainer; import java.util.Properties; import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG; import static org.apache.kafka.clients.producer.ProducerConfig.*; public class Test { @ClassRule public static KafkaContainer kafka = new KafkaContainer(); @org.junit.Test public void testIt() { Properties producerProps = new Properties(); producerProps.put(BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers().replace("PLAINTEXT://","")); // producerProps.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); producerProps.put(KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); producerProps.put(VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); producerProps.put(ENABLE_IDEMPOTENCE_CONFIG, "true"); producerProps.put(TRANSACTIONAL_ID_CONFIG, "prod-0"); KafkaProducer producer = new KafkaProducer(producerProps); ProducerRecord record = new ProducerRecord<>("something", "A message"); producer.initTransactions(); producer.beginTransaction(); producer.send(record); producer.commitTransaction(); } }