Description
The KafkaProducer is not properly joining the thread it creates. The code is like this:
try {
    this.ioThread.join(timeUnit.toMillis(timeout));
} catch (InterruptedException t) {
    firstException.compareAndSet(null, t);
    log.error("Interrupted while joining ioThread", t);
}
If the calling thread is interrupted while performing the join, the io thread is left running. The correct way of handling this is as follows:
try {
    this.ioThread.join(timeUnit.toMillis(timeout));
} catch (InterruptedException t) {
    // propagate the interrupt
    this.ioThread.interrupt();
    try {
        this.ioThread.join();
    } catch (InterruptedException e) {
        firstException.compareAndSet(null, e);
        log.error("Interrupted while joining ioThread", e);
    } finally {
        // make sure we maintain the interrupted status
        Thread.currentThread().interrupt();
    }
}
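The handling proposed above can be tried outside of Kafka with a small standalone sketch. This is not the KafkaProducer source: the class `JoinOnInterruptDemo` and the methods `startWorker` and `closeAndJoin` are hypothetical names made up for illustration, the worker is a plain sleeping thread standing in for the io thread, and the logger call is left out to keep the example self-contained.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

final class JoinOnInterruptDemo {

    // Hypothetical stand-in for the producer's io thread: runs until interrupted.
    static Thread startWorker() {
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    Thread.sleep(50);
                }
            } catch (InterruptedException e) {
                // Interrupted: fall through and let the thread terminate.
            }
        }, "demo-io-thread");
        worker.start();
        return worker;
    }

    // Hypothetical helper mirroring the proposed join handling.
    // Returns true if the worker thread has terminated when the method returns.
    static boolean closeAndJoin(Thread ioThread, long timeout, TimeUnit timeUnit,
                                AtomicReference<Throwable> firstException) {
        try {
            ioThread.join(timeUnit.toMillis(timeout));
        } catch (InterruptedException t) {
            // Propagate the interrupt so the worker stops instead of being left running.
            ioThread.interrupt();
            try {
                ioThread.join();
            } catch (InterruptedException e) {
                firstException.compareAndSet(null, e);
            } finally {
                // Restore the caller's interrupted status.
                Thread.currentThread().interrupt();
            }
        }
        return !ioThread.isAlive();
    }

    public static void main(String[] args) {
        Thread io = startWorker();
        AtomicReference<Throwable> firstException = new AtomicReference<>();
        // Simulate an interrupt arriving during the join: with the flag already set,
        // the timed join throws InterruptedException right away.
        Thread.currentThread().interrupt();
        boolean stopped = closeAndJoin(io, 1, TimeUnit.SECONDS, firstException);
        System.out.println("io thread stopped: " + stopped);
        System.out.println("interrupt status restored: " + Thread.interrupted());
        System.out.println("recorded exception: " + firstException.get());
    }
}
```

With the original handling (record the exception and return), the worker in this sketch would still be alive after the interrupted join, which is the leak this issue describes.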
Issue Links
- is related to: KAFKA-5936 KafkaProducer should not wrap InterruptedException in close() with KafkaException (Resolved)