Details
-
Sub-task
-
Status: Resolved
-
Blocker
-
Resolution: Fixed
-
None
-
None
-
None
Description
See https://github.com/apache/kafka/pull/16841#discussion_r1715730246 for context.
Sample code:
/**
 * Sample scenario from the issue description.
 *
 * Steps, in order:
 *  1. create topic "ikea" (1 partition, replication factor 1);
 *  2. produce 100 records with 10-byte keys and values, then flush and close the producer;
 *  3. reassign partition 0 to replicas 0 and 1;
 *  4. delete the records before offset 3;
 *  5. wait, then assert that the ISR of partition 0 contains 2 replicas.
 *
 * @param quorum value injected by `@ValueSource` ("kraft"); it is not read in the body
 */
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def test(quorum: String): Unit = {
  val topic = "ikea"

  client = createAdminClient
  client.createTopics(Collections.singletonList(new NewTopic(topic, 1, 1.toShort))).all().get()
  // Fixed pause kept from the sample; presumably lets the new topic settle -- TODO confirm.
  TimeUnit.SECONDS.sleep(1)

  val producerProps = Collections.singletonMap(
    ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
    plaintextBootstrapServers(brokers).asInstanceOf[Object])
  val producer = new KafkaProducer[Array[Byte], Array[Byte]](
    producerProps, new ByteArraySerializer, new ByteArraySerializer)
  try {
    // A fresh 10-byte key and value are allocated for every record, as in the sample.
    for (_ <- 0 until 100) {
      producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
        topic, new Array[Byte](10), new Array[Byte](10)))
    }
    producer.flush()
  } finally producer.close()

  val partition = new TopicPartition(topic, 0)

  // Move partition 0 onto replicas 0 and 1 and block until the request completes.
  val reassignment = util.Collections.singletonMap(
    partition,
    Optional.of(new NewPartitionReassignment(util.Arrays.asList(0, 1))))
  client.alterPartitionReassignments(reassignment).all().get()

  // Delete everything before offset 3 on the same partition.
  val deletion = util.Collections.singletonMap(partition, RecordsToDelete.beforeOffset(3))
  client.deleteRecords(deletion).all().get()

  // Fixed pause kept from the sample; presumably gives the new replica time to join the ISR
  // -- TODO confirm.
  TimeUnit.SECONDS.sleep(5)

  val description = client
    .describeTopics(util.Collections.singletonList(topic))
    .topicNameValues()
    .get(topic)
    .get()
  val inSyncReplicas = description.partitions().get(0).isr()
  assertEquals(2, inSyncReplicas.size())
}
Attachments
Issue Links
- links to