  1. Kafka
  2. KAFKA-10474

Kafka Java client introduces CPU overhead when there are many consumers


    Details

    • Type: Improvement
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 2.4.1
    • Fix Version/s: None
    • Component/s: clients
    • Labels:
      None

      Description

      We are using the Kafka Java client (version 2.4.1) and started noticing a gradual increase in the CPU usage of our production instances. The increase was not correlated with load: it rose in steps whenever we acquired a new customer. We concluded that the likely cause is the steadily growing number of consumers, as each registration adds one consumer per account by default.

       

      We did some profiling, and by running a local sample with 3000 consumers we saw that the JmxReporter generates some CPU overhead. Unfortunately, there is no way to disable it, so we created a class with the same name and package on the classpath and let the classloader pick it up first. That alone led to a drop from 60% to 40% sustained CPU usage.
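
      One rough way to see how much JMX state the consumers register is to count the MBeans in the client's JMX domain. The sketch below uses only the JDK management API. The domain name `kafka.consumer` is an assumption about the consumer's JMX prefix, and `MBeanCounter` and `countMBeans` are hypothetical names, not part of the Kafka client.

```java
import java.lang.management.ManagementFactory;
import java.util.Set;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

// Hypothetical diagnostic helper, not part of the Kafka client.
final class MBeanCounter {

    // Counts the MBeans registered on the platform MBean server under the given JMX domain.
    static int countMBeans(String domain) {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        try {
            // "domain:*" matches every MBean whose object name is in that domain.
            Set<ObjectName> names = server.queryNames(new ObjectName(domain + ":*"), null);
            return names.size();
        } catch (MalformedObjectNameException ex) {
            throw new IllegalArgumentException("Invalid JMX domain: " + domain, ex);
        }
    }

    public static void main(String[] args) {
        // Assumption: the consumer registers its metrics under "kafka.consumer".
        System.out.println("kafka.consumer MBeans: " + countMBeans("kafka.consumer"));
        System.out.println("java.lang MBeans: " + countMBeans("java.lang"));
    }
}
```

      Calling this inside the application before and after the consumers are created would show how many MBeans each consumer adds. It does not measure the CPU cost itself.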

       

      We then changed the code to create consumers only for active customers (so no more idle consumers). With the idle consumers removed, CPU usage dropped to 8% sustained.
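
      The change described above, creating a consumer only while a customer is active, might look roughly like the registry below. This is a sketch under assumptions: `ActiveConsumerRegistry`, `markActive`, `markIdle`, and `activeCount` are hypothetical names, and the worker is a generic `AutoCloseable` standing in for whatever owns the `KafkaConsumer` and its polling thread.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

// Hypothetical registry: holds one worker per active application and none for idle ones.
final class ActiveConsumerRegistry<W extends AutoCloseable> {

    private final ConcurrentMap<String, W> workers = new ConcurrentHashMap<>();
    private final Function<String, W> workerFactory;

    ActiveConsumerRegistry(Function<String, W> workerFactory) {
        this.workerFactory = workerFactory;
    }

    // Creates a worker for the application if none exists; otherwise returns the existing one.
    W markActive(String applicationId) {
        return workers.computeIfAbsent(applicationId, workerFactory);
    }

    // Closes and forgets the worker when the application goes idle.
    // Returns true if a worker was registered for that application.
    boolean markIdle(String applicationId) {
        W worker = workers.remove(applicationId);
        if (worker == null) {
            return false;
        }
        try {
            worker.close();
        } catch (Exception ex) {
            throw new IllegalStateException("Failed to close worker for " + applicationId, ex);
        }
        return true;
    }

    // Number of applications that currently have a worker.
    int activeCount() {
        return workers.size();
    }
}
```

      In a setup like ours, the factory would start the polling loop and `close()` would have to stop it from another thread, for example via `KafkaConsumer.wakeup()`, which is the consumer method documented as safe to call from a different thread.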

      Apart from the JMX metrics reporter, I don't know what contributes to idle consumers generating such CPU overhead. If it helps, our consumer does this:

       

       

      try (KafkaConsumer<Integer, Message> consumer = new KafkaConsumer<>(props)) {
      	consumer.subscribe(Collections.singletonList(applicationId.toString()));
      	while (true) {
      		try {
      			synchronized (applicationId.toString().intern()) {
      				ConsumerRecords<Integer, Message> messages = consumer.poll(Duration.ofMillis(2000));
      				List<Message> entries = StreamSupport.stream(messages.spliterator(), false)
      						.map(ConsumerRecord::value).collect(Collectors.toList());
      				// idle consumers don't get here, because they have no entries to process
      				if (!entries.isEmpty()) {
      					try {
      						// logic
      						consumer.commitSync();
      					} catch (ExecutionException ex) {
      						// revert to the last successful batch start offset
      						// poll() advances its internal position, which is different from the committed offset
      						consumer.committed(consumer.assignment()).forEach(consumer::seek);
      					}
      				}
      			}
      			// we want to process more records at once (due to business logic specifics) so we sleep for a preconfigured period of time, in our case - 10 seconds
      			Thread.sleep(kafkaSleepMillis);
      		} catch (Exception ex) {
      			logger.error("Exception while trying to get data from kafka consumer for appId={}, stopping.", applicationId, ex);
      			break;
      		}
      	}
      } catch (Exception ex) {
      	logger.error("Failed to run kafka consumer", ex);
      }

       Each consumer runs in a separate thread in a cached executor service.
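
       For context on this threading model: a cached executor creates a new thread for every task submitted while no idle thread is available, so long-running consumer loops end up with one live thread each. The sketch below demonstrates that with placeholder tasks. `PerConsumerThreads` and `poolSizeFor` are hypothetical names, and the tasks only block on a latch instead of polling Kafka.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; the tasks are stand-ins for per-application consumer loops.
final class PerConsumerThreads {

    // Submits `count` long-running placeholder tasks and returns the pool size once all are running.
    static int poolSizeFor(int count) {
        ExecutorService pool = Executors.newCachedThreadPool();
        CountDownLatch started = new CountDownLatch(count);
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < count; i++) {
                pool.submit(() -> {
                    started.countDown();
                    try {
                        release.await(); // stands in for the endless poll/sleep loop
                    } catch (InterruptedException ex) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            if (!started.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Tasks did not start in time");
            }
            // Assumption: the cached pool is a ThreadPoolExecutor, as in current OpenJDK builds.
            return ((ThreadPoolExecutor) pool).getPoolSize();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for tasks", ex);
        } finally {
            release.countDown(); // let the placeholder tasks finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Threads for 100 blocked tasks: " + poolSizeFor(100));
    }
}
```

       With 3000 consumers this model means 3000 application threads, in addition to whatever background threads the client itself starts.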


            People

            • Assignee:
              Unassigned
            • Reporter:
              bozho Bozhidar Bozhanov