Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-14087

Add jmh benchmark for producer with MockClient

    XMLWordPrintableJSON

Details

    • Improvement
    • Status: Open
    • Major
    • Resolution: Unresolved
    • None
    • None
    • producer
    • None

    Description

      Something like this

              Map<String, Object> configs = new HashMap<>();
              configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
              configs.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none");
              Time time = Time.SYSTEM;
              AtomicInteger offset = new AtomicInteger(0);
              MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 2));
              ProducerMetadata metadata = newMetadata(0, Long.MAX_VALUE);
              StringBuilder value = new StringBuilder("foo");
              for (int i = 0; i < 1000; i++)
                  value.append("x");
              AtomicInteger totalRecords = new AtomicInteger(0);
              long start = time.milliseconds();
              CompletableFuture[] futures = new CompletableFuture[3];
              for (int i = 0; i < futures.length; i++) {
                  futures[i] = CompletableFuture.runAsync(() -> {
                      ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
                      MockClient client = new MockClient(time, metadata) {
                          @Override
                          public void send(ClientRequest request, long now) {
                              super.send(request, now);
                              if (request.apiKey() == ApiKeys.PRODUCE) {
                                  // Prepare response data from request.
                                  ProduceResponseData responseData = new ProduceResponseData();                            ProduceRequest produceRequest = (ProduceRequest) request.requestBuilder().build();
                                  produceRequest.data().topicData().forEach(topicData ->
                                          topicData.partitionData().forEach(partitionData -> {
                                              String topic = topicData.name();
                                              ProduceResponseData.TopicProduceResponse tpr = responseData.responses().find(topic);
                                              if (tpr == null) {
                                                  tpr = new ProduceResponseData.TopicProduceResponse().setName(topic);
                                                  responseData.responses().add(tpr);
                                              }
                                              tpr.partitionResponses().add(new ProduceResponseData.PartitionProduceResponse()
                                                      .setIndex(partitionData.index())
                                                      .setRecordErrors(Collections.emptyList())
                                                      .setBaseOffset(offset.addAndGet(1))
                                                      .setLogAppendTimeMs(time.milliseconds())
                                                      .setLogStartOffset(0)
                                                      .setErrorMessage("")
                                                      .setErrorCode(Errors.NONE.code()));
                                          }));                            // Schedule a reply to come after some time to mock broker latency.
                                  executorService.schedule(() -> respond(new ProduceResponse(responseData)), 20, TimeUnit.MILLISECONDS);
                              }
                          }
                      };                client.updateMetadata(initialUpdateResponse);                InitProducerIdResponseData responseData = new InitProducerIdResponseData()
                              .setErrorCode(Errors.NONE.code())
                              .setProducerEpoch((short) 0)
                              .setProducerId(42)
                              .setThrottleTimeMs(0);
                      client.prepareResponse(body -> body instanceof InitProducerIdRequest,
                              new InitProducerIdResponse(responseData), false);                try (KafkaProducer<String, String> producer = kafkaProducer(
                              configs,
                              new StringSerializer(),
                              new StringSerializer(),
                              metadata,
                              client,
                              null,
                              time
                      )) {
                          final int records = 20_000_000;                    for (int k = 0; k < records; k++) {
                              producer.send(new ProducerRecord<>("topic", null, start, "key-" + k, value.toString()));
                          }                    totalRecords.addAndGet(records);
                      }
                  });
              }        for (CompletableFuture future : futures) {
                  future.get();
              } 

       

      Attachments

        Activity

          People

            Unassigned Unassigned
            alivshits Artem Livshits
            Votes:
            0 Vote for this issue
            Watchers:
            1 Start watching this issue

            Dates

              Created:
              Updated: