Description
Some context first
We have a Spring-based Kafka Streams application that listens to two topics; let's call them apartment and visitor. The apartments are stored in a global table, while the visitors form the stream we process, and at one point we join the visitor stream with the apartment table. In our test environment, both topics have 10 partitions.
Issue
On the first deployment, everything goes fine: the global table is built and all entries in the stream are processed.
After everything is finished, we shut down the application, restart it and send out a new set of visitors. The application seemingly does not respond.
After some more debugging, it turned out that the application simply takes 5 minutes to start up: for each and every partition, the global table takes 30 seconds (the default value of the request timeout) to accept that there are no new messages in the apartment topic, and with 10 partitions handled one after another this adds up to 300 seconds. If we send out the list of apartments as new messages, the application starts up immediately.
To make matters worse, we have clients with 96 partitions, where the startup time would be 48 minutes (96 × 30 seconds). Not having messages in the topics between application shutdown and restart is a valid use case, so this is quite a big problem.
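The figures above follow from a simple product: number of partitions times the per-partition wait. A minimal sketch of that arithmetic, assuming (as observed in this report) that each empty partition waits for the full timeout and that partitions are handled strictly one after another. The class and method names are hypothetical and are not part of Kafka.

```java
import java.time.Duration;

final class GlobalTableStartupDelay {

    /**
     * Worst-case startup delay when every partition waits for the full
     * timeout and partitions are processed sequentially.
     */
    static Duration sequentialDelay(int partitions, Duration timeoutPerPartition) {
        return timeoutPerPartition.multipliedBy(partitions);
    }

    public static void main(String[] args) {
        Duration timeout = Duration.ofSeconds(30); // timeout value quoted in this report

        System.out.println(sequentialDelay(10, timeout)); // prints PT5M
        System.out.println(sequentialDelay(96, timeout)); // prints PT48M
    }
}
```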
Possible workarounds
We could reduce the request timeout, but this value is not specific to global table initialization; it is a general request timeout that applies to many other operations. We do not know what else it would affect, so we are not very keen on doing that. Even then, it would mean a 1.5 minute delay for this particular client (more if future use cases require additional global tables), which is far too much considering that the application could otherwise start in about 20 seconds.
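For reference, a minimal sketch of what this workaround could look like if the timeout were lowered only for the global consumer rather than for every client. Kafka Streams forwards settings prefixed with `global.consumer.` to the global consumer only (also available as `StreamsConfig.globalConsumerPrefix(...)`). The sketch assumes, as this report does, that `request.timeout.ms` is the setting that governs the 30 second wait; the 1000 ms value and the helper name are purely illustrative. Note that this still shortens the timeout for every request made by the global consumer, not just for table initialization.

```java
import java.util.Properties;

final class GlobalConsumerTimeoutConfig {

    // Prefix that scopes a consumer setting to the Kafka Streams global consumer.
    static final String GLOBAL_CONSUMER_PREFIX = "global.consumer.";

    /**
     * Returns a copy of the given Streams properties with request.timeout.ms
     * overridden for the global consumer only. The input is left untouched.
     */
    static Properties withGlobalRequestTimeout(Properties base, int timeoutMs) {
        Properties props = new Properties();
        props.putAll(base);
        props.put(GLOBAL_CONSUMER_PREFIX + "request.timeout.ms", Integer.toString(timeoutMs));
        return props;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.put("application.id", "visitor-app");          // hypothetical application id
        base.put("bootstrap.servers", "localhost:9092");    // hypothetical broker address

        // 1000 ms is an illustrative value, not a recommendation.
        Properties tuned = withGlobalRequestTimeout(base, 1000);
        tuned.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```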
Potential solutions we see
- Introduce a dedicated global table initialization timeout in GlobalStateManagerImpl. We would then be able to modify that value safely, without fear of making some other part of Kafka unstable.
- Parallelize the initialization of the global table partitions in GlobalStateManagerImpl: knowing that the delay at startup is constant instead of linear with the number of partitions would be a huge help.
- As long as we receive a response, accept the empty map in the KafkaConsumer, and continue instead of going into a busy-waiting loop.
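To illustrate the parallelization idea from the second option, here is a minimal, self-contained sketch of running a per-partition restore step on a thread pool so that the total wait is bounded by the slowest partition instead of the sum over all partitions. It is not the actual GlobalStateManagerImpl code: `restoreAll` and the `restoreOne` callback are hypothetical stand-ins, and since KafkaConsumer is not thread-safe, a real implementation would presumably need one consumer per worker thread or some other form of coordination.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.IntFunction;

final class ParallelRestoreSketch {

    /**
     * Runs the hypothetical restoreOne step for partitions 0..partitionCount-1
     * on a fixed-size thread pool and returns the restored offset per partition.
     */
    static Map<Integer, Long> restoreAll(int partitionCount,
                                         IntFunction<Long> restoreOne,
                                         int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            // Submit one task per partition so the waits overlap.
            List<Future<Long>> futures = new ArrayList<>();
            for (int p = 0; p < partitionCount; p++) {
                final int partition = p;
                Callable<Long> task = () -> restoreOne.apply(partition);
                futures.add(pool.submit(task));
            }
            // Collect results in partition order.
            Map<Integer, Long> restored = new TreeMap<>();
            for (int p = 0; p < partitionCount; p++) {
                restored.put(p, futures.get(p).get());
            }
            return restored;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while restoring", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Restore failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Simulate an empty partition that blocks for 200 ms before giving up.
        IntFunction<Long> slowEmptyPartition = partition -> {
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return 0L;
        };

        long start = System.nanoTime();
        Map<Integer, Long> restored = restoreAll(10, slowEmptyPartition, 10);
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        System.out.println(restored.size() + " partitions restored in " + elapsedMs + " ms");
    }
}
```

With ten worker threads, the ten simulated waits overlap, so the elapsed time should be close to a single wait rather than ten times that.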
Issue Links
- is fixed by: KAFKA-12980 Allow consumers to return from poll when position advances due to aborted transactions (Resolved)
- relates to: KAFKA-10315 Consider to throw exception for failed fetch requests (Open)