Description
I would expect the following code to raise an exception, either during AdminClient creation or as a TimeoutException when getting the future (there is no Kafka broker running on localhost on that port).
Properties config = new Properties();
config.setProperty("bootstrap.servers", "localhost:1234");
AdminClient admin = AdminClient.create(config);
admin.listTopics().names().get(1, TimeUnit.SECONDS);
However, the code seems to hang forever in the last step.
A possible cause for the behavior might be a bug in the KafkaFutureImpl class:
private static class SingleWaiter<R> extends BiConsumer<R, Throwable> {
    [...]
    R await(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        long startMs = System.currentTimeMillis();
        long waitTimeMs = (unit.toMillis(timeout) > 0) ? unit.toMillis(timeout) : 1;
        long delta = 0;
        synchronized (this) {
            while (true) {
                if (exception != null)
                    wrapAndThrow(exception);
                if (done)
                    return value;
                if (delta > waitTimeMs) {
                    throw new TimeoutException();
                }
                this.wait(waitTimeMs - delta);
                delta = System.currentTimeMillis() - startMs;
            }
        }
    }
While debugging, I observed that waitTimeMs and delta become equal after one iteration. Because the timeout check uses a strict comparison (delta > waitTimeMs), it does not fire in that case, and the next iteration calls this.wait(0), which according to the documentation http://docs.oracle.com/javase/8/docs/api/java/lang/Object.html#wait-long- results in an indefinite wait.
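One possible way to avoid the wait(0) call is to compute the remaining time first and throw as soon as it is zero or negative. The following is only a minimal, self-contained sketch of that idea, not the actual Kafka code or a proposed patch: the class TimedWaiter and its complete method are hypothetical names made up for illustration, and exception propagation from the original SingleWaiter is left out.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for SingleWaiter, reduced to the timed-wait logic.
final class TimedWaiter<R> {
    private R value;
    private boolean done;

    // Marks the waiter as completed and wakes up any thread blocked in await().
    synchronized void complete(R newValue) {
        value = newValue;
        done = true;
        notifyAll();
    }

    synchronized R await(long timeout, TimeUnit unit)
            throws InterruptedException, TimeoutException {
        // Same clamping as the original: wait at least 1 ms.
        long waitTimeMs = Math.max(unit.toMillis(timeout), 1);
        long startNs = System.nanoTime();
        while (true) {
            if (done)
                return value;
            long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs);
            long remainingMs = waitTimeMs - elapsedMs;
            // Time out when nothing is left, so wait() is never called with 0,
            // which would block until notified.
            if (remainingMs <= 0)
                throw new TimeoutException();
            wait(remainingMs);
        }
    }
}
```

With this shape, await(1, TimeUnit.SECONDS) on a waiter that is never completed should throw a TimeoutException after roughly one second instead of blocking. The sketch uses System.nanoTime() rather than System.currentTimeMillis() so that wall-clock adjustments do not affect the elapsed-time calculation; that is a design choice of the sketch, not something taken from the original code.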