Details
-
Improvement
-
Status: Open
-
Major
-
Resolution: Unresolved
-
None
-
None
-
None
Description
A reproduction looks something like this (several threads, each driving a KafkaProducer against a MockClient that fakes broker produce responses with artificial latency):
// Multi-threaded KafkaProducer stress test: three concurrent workers each
// create a producer backed by a MockClient whose PRODUCE responses are
// delayed by 20 ms to simulate broker latency.
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
configs.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none");

Time time = Time.SYSTEM;
AtomicInteger offset = new AtomicInteger(0);
MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 2));
ProducerMetadata metadata = newMetadata(0, Long.MAX_VALUE);

// Build a ~1 KB record payload.
StringBuilder value = new StringBuilder("foo");
for (int i = 0; i < 1000; i++)
    value.append("x");

AtomicInteger totalRecords = new AtomicInteger(0);
long start = time.milliseconds();

// FIX: wildcard-typed futures instead of the raw CompletableFuture[] the
// original used (raw type triggers unchecked warnings).
CompletableFuture<?>[] futures = new CompletableFuture[3];
for (int i = 0; i < futures.length; i++) {
    futures[i] = CompletableFuture.runAsync(() -> {
        // Scheduler used to delay the mocked broker replies.
        ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
        try {
            MockClient client = new MockClient(time, metadata) {
                @Override
                public void send(ClientRequest request, long now) {
                    super.send(request, now);
                    if (request.apiKey() == ApiKeys.PRODUCE) {
                        // Prepare a success response mirroring the request's
                        // topics and partitions.
                        ProduceResponseData responseData = new ProduceResponseData();
                        ProduceRequest produceRequest = (ProduceRequest) request.requestBuilder().build();
                        produceRequest.data().topicData().forEach(topicData ->
                            topicData.partitionData().forEach(partitionData -> {
                                String topic = topicData.name();
                                ProduceResponseData.TopicProduceResponse tpr = responseData.responses().find(topic);
                                if (tpr == null) {
                                    tpr = new ProduceResponseData.TopicProduceResponse().setName(topic);
                                    responseData.responses().add(tpr);
                                }
                                tpr.partitionResponses().add(new ProduceResponseData.PartitionProduceResponse()
                                    .setIndex(partitionData.index())
                                    .setRecordErrors(Collections.emptyList())
                                    .setBaseOffset(offset.addAndGet(1))
                                    .setLogAppendTimeMs(time.milliseconds())
                                    .setLogStartOffset(0)
                                    .setErrorMessage("")
                                    .setErrorCode(Errors.NONE.code()));
                            }));
                        // Schedule the reply after 20 ms to mock broker latency.
                        executorService.schedule(() -> respond(new ProduceResponse(responseData)), 20, TimeUnit.MILLISECONDS);
                    }
                }
            };
            client.updateMetadata(initialUpdateResponse);

            // Canned InitProducerId response so producer startup succeeds.
            InitProducerIdResponseData responseData = new InitProducerIdResponseData()
                .setErrorCode(Errors.NONE.code())
                .setProducerEpoch((short) 0)
                .setProducerId(42)
                .setThrottleTimeMs(0);
            client.prepareResponse(body -> body instanceof InitProducerIdRequest,
                new InitProducerIdResponse(responseData), false);

            // try-with-resources closes (and flushes) the producer.
            try (KafkaProducer<String, String> producer = kafkaProducer(
                    configs, new StringSerializer(), new StringSerializer(), metadata, client, null, time)) {
                final int records = 20_000_000;
                for (int k = 0; k < records; k++) {
                    producer.send(new ProducerRecord<>("topic", null, start, "key-" + k, value.toString()));
                }
                totalRecords.addAndGet(records);
            }
        } finally {
            // FIX: the original never shut this down, leaking one non-daemon
            // scheduler thread per worker. shutdown() still lets already
            // queued replies run to completion.
            executorService.shutdown();
        }
    });
}

// FIX: join() instead of get() — get() throws checked exceptions
// (InterruptedException/ExecutionException) the original never handled, so
// the snippet would not compile outside a throwing method. join() wraps
// failures in an unchecked CompletionException instead.
for (CompletableFuture<?> future : futures) {
    future.join();
}