Kafka / KAFKA-7143

Cannot use KafkaConsumer with Kotlin coroutines due to various issues

Details

    • Type: Improvement
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 1.1.0
    • Fix Version/s: None
    • Component/s: clients
    • Labels: None

    Description

      I am using the new KafkaConsumer from Kotlin. My application uses Kotlin coroutines, which support a style of asynchronous programming that avoids the need for callbacks (existing callback-based APIs can usually be adapted to this style with a simple wrapper). Instead of callbacks, coroutines use continuations: a call that would otherwise take a callback suspends, and is resumed once the call completes. With coroutines, access to the KafkaConsumer is still done in a thread-safe way, but it does NOT necessarily happen from a single thread: a different underlying thread may execute the code after the suspension point.
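      To make the thread switch concrete, here is a minimal sketch using only the Kotlin standard library (`kotlin.coroutines`), not kotlinx.coroutines. `CallbackApi`, `request` and `runDemo` are hypothetical names for illustration; the callback API stands in for any callback-based client, not for KafkaConsumer itself. The wrapper uses `suspendCoroutine`, and because the coroutine context has no dispatcher, the code after the suspension point runs on whichever thread invoked the callback.

```kotlin
import java.util.concurrent.CountDownLatch
import kotlin.concurrent.thread
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Hypothetical callback-style API (not part of Kafka): it stores the callback,
// and some other thread completes the request later.
class CallbackApi {
    @Volatile var pending: ((Int) -> Unit)? = null
    fun requestAsync(callback: (Int) -> Unit) { pending = callback }
}

// The "simple wrapper": suspend until the callback fires, then resume with its value.
suspend fun CallbackApi.request(): Int = suspendCoroutine { cont ->
    requestAsync { value -> cont.resume(value) }
}

// Returns the thread names seen before and after the suspension point, plus the result.
fun runDemo(): Triple<String, String, Int> {
    val api = CallbackApi()
    val done = CountDownLatch(1)
    var before = ""
    var after = ""
    var result = 0
    val body: suspend () -> Unit = {
        before = Thread.currentThread().name
        result = api.request()                // suspension point
        after = Thread.currentThread().name   // runs on whichever thread resumed us
    }
    // No dispatcher in the context, so resumption happens on the resuming thread.
    body.startCoroutine(Continuation<Unit>(EmptyCoroutineContext) { done.countDown() })
    // startCoroutine has returned, so the coroutine is suspended inside request().
    thread(name = "io-worker") { api.pending!!.invoke(42) }
    done.await()
    return Triple(before, after, result)
}
```

      Called from `main`, `runDemo()` reports `main` before the suspension point and `io-worker` after it: the same logical sequence of calls ran on two different threads.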

      However, KafkaConsumer also includes a check that verifies not only that access to the client is thread-safe, but that the same thread is being used. If the same thread (by id) is not being used, the consumer throws an exception such as:

      Exception in thread "ForkJoinPool.commonPool-worker-25" java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
      	at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1824)
      	at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1808)
      	at org.apache.kafka.clients.consumer.KafkaConsumer.commitAsync(KafkaConsumer.java:1321)
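      For readers unfamiliar with this kind of guard, the following is a simplified, hypothetical model of an ownership check keyed on thread id (`SingleThreadGuard` and `secondThreadRejected` are illustrative names, not Kafka classes). `acquire()` records the calling thread's id, a different thread that tries to acquire while the owner still holds the guard gets a `ConcurrentModificationException`, and `release()` clears ownership once the reference count drops to zero. That this mirrors the shape of the `KafkaConsumer.acquire` frame in the trace is an assumption; the Kafka source is authoritative for the exact behavior.

```kotlin
import java.util.ConcurrentModificationException
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicLong
import kotlin.concurrent.thread

// Hypothetical, simplified model of a light-weight "one thread at a time" guard.
class SingleThreadGuard {
    private val owner = AtomicLong(NO_OWNER)  // id of the thread currently inside, or NO_OWNER
    private val refcount = AtomicInteger(0)   // allows re-entrant calls from the owner

    fun acquire() {
        val id = Thread.currentThread().id
        // Fail unless we already own the guard or can claim it while it is free.
        if (id != owner.get() && !owner.compareAndSet(NO_OWNER, id))
            throw ConcurrentModificationException("not safe for multi-threaded access")
        refcount.incrementAndGet()
    }

    fun release() {
        if (refcount.decrementAndGet() == 0) owner.set(NO_OWNER)
    }

    companion object {
        const val NO_OWNER = -1L
    }
}

// Returns true if a second thread is rejected while the current thread holds the guard.
fun secondThreadRejected(): Boolean {
    val guard = SingleThreadGuard()
    guard.acquire()
    var rejected = false
    val other = thread {
        try {
            guard.acquire()
            guard.release()
        } catch (e: ConcurrentModificationException) {
            rejected = true
        }
    }
    other.join()
    guard.release()
    return rejected
}
```

      In this model the reference count lets the owning thread re-enter freely, while any other thread is turned away until the owner has released the guard.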
      

      I understand this check exists to protect people from themselves, but I'd like the ability to disable it so that the consumer can be used effectively with libraries such as Kotlin coroutines.

      There is a workaround for the above: run the consumer in a coroutine with a single-threaded context. This isn't ideal, because it dedicates a thread to the consumer.
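      The single-threaded workaround can be sketched without kotlinx.coroutines by installing a `ContinuationInterceptor` that resumes every continuation on one dedicated executor thread. `SingleThreadInterceptor`, `fetchAsync`, `fetch` and `runPinned` are hypothetical names; with kotlinx.coroutines one would more likely reach for `Executors.newSingleThreadExecutor().asCoroutineDispatcher()`. Even though the callbacks fire on an `io-worker` thread, every step of the coroutine, including the code after each suspension point, runs on `consumer-thread`.

```kotlin
import java.util.concurrent.CountDownLatch
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import kotlin.concurrent.thread
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.Continuation
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Interceptor that resumes every continuation on one dedicated executor thread.
class SingleThreadInterceptor(private val executor: ExecutorService) :
    AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        object : Continuation<T> {
            override val context = continuation.context
            override fun resumeWith(result: Result<T>) {
                executor.execute { continuation.resumeWith(result) }
            }
        }
}

// Hypothetical callback API whose callback fires on a background "io-worker" thread.
fun fetchAsync(callback: (Int) -> Unit) {
    thread(name = "io-worker") { callback(7) }
}

suspend fun fetch(): Int = suspendCoroutine { cont -> fetchAsync { cont.resume(it) } }

// Runs a coroutine pinned to one thread and records the thread seen at each step.
fun runPinned(): List<String> {
    val executor = Executors.newSingleThreadExecutor { r -> Thread(r, "consumer-thread") }
    val names = mutableListOf<String>()
    val done = CountDownLatch(1)
    val body: suspend () -> Unit = {
        names += Thread.currentThread().name
        fetch()                               // first suspension point
        names += Thread.currentThread().name
        fetch()                               // second suspension point
        names += Thread.currentThread().name
    }
    body.startCoroutine(Continuation<Unit>(SingleThreadInterceptor(executor)) { done.countDown() })
    done.await()
    executor.shutdown()
    return names
}
```

      The trade-off is visible in the sketch: the executor's single thread must stay reserved for this coroutine for as long as the consumer is in use.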

      However, further problems await: the `commitAsync` method also cannot be used with coroutines, because the callback is never executed and therefore the coroutine is never resumed past the suspension point. On investigation, it seems the callback is only executed during later calls to `poll`. In a regular polling loop with coroutines, that next `poll` never happens because the loop is suspended on `commitAsync`, so we have a deadlock.

      I guess the idea behind this API design is that consuming new messages may continue even though commits of previous offsets (which may have been issued an arbitrarily long time ago) have not necessarily been processed. With a coroutine-based API, however, `commitAsync` could be sequential before the next `poll`, like `commitSync`, yet still happen asynchronously without tying up a client application thread.
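      The deadlock can be reproduced with a toy model. `ModelConsumer`, `commitAwait` and `demoDeadlock` below are hypothetical and are not Kafka's API; the model only assumes, as the investigation above found, that completed-commit callbacks are run from inside a later `poll()`. A naive suspending wrapper then parks the polling loop on a callback that only the loop's own next `poll()` would deliver.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Hypothetical toy consumer (not Kafka's API): completed-commit callbacks are
// queued and only invoked when the application calls poll().
class ModelConsumer {
    private val completedCallbacks = ArrayDeque<() -> Unit>()

    fun commitAsync(callback: () -> Unit) {
        // Pretend the commit already succeeded; the callback still waits for poll().
        completedCallbacks.addLast(callback)
    }

    fun poll() {
        while (completedCallbacks.isNotEmpty()) {
            val callback = completedCallbacks.removeFirst()
            callback()
        }
    }
}

// Naive wrapper: suspend until the commit callback runs.
suspend fun ModelConsumer.commitAwait() = suspendCoroutine<Unit> { cont ->
    commitAsync { cont.resume(Unit) }
}

// Returns (stuck after starting the loop, finished once someone else polls).
fun demoDeadlock(): Pair<Boolean, Boolean> {
    val consumer = ModelConsumer()
    var reachedNextPoll = false
    val pollingLoop: suspend () -> Unit = {
        consumer.commitAwait()   // waits for a callback that only poll() will run...
        consumer.poll()          // ...so the loop cannot get here on its own
        reachedNextPoll = true
    }
    pollingLoop.startCoroutine(Continuation<Unit>(EmptyCoroutineContext) {})
    val stuck = !reachedNextPoll
    consumer.poll()              // only an external poll() drains the callback and resumes the loop
    return Pair(stuck, reachedNextPoll)
}
```

      `demoDeadlock()` returns `(true, true)`: the loop is stuck right after it starts, and it only finishes because something outside the loop calls `poll()`, which a single sequential polling loop never does.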

People

    • Assignee: Unassigned
    • Reporter: Raman Gupta (rocketraman)
    • Votes: 0
    • Watchers: 3
