Details
Type: Bug
Status: Resolved
Priority: Critical
Resolution: Fixed
Description
If an Exception is thrown in GetKafka's consumeFromKafka method, execution enters the following catch block:
    catch (final Exception e) {
        this.shutdownConsumer();
        getLogger().error("Failed to receive FlowFile from Kafka due to {}", new Object[]{e});
        if (flowFile != null) {
            session.remove(flowFile);
        }
    }
This call to shutdownConsumer performs the following:
    if (this.executor != null) {
        this.executor.shutdown();
        try {
            if (!this.executor.awaitTermination(30000, TimeUnit.MILLISECONDS)) {
                this.executor.shutdownNow();
                getLogger().warn("Executor did not stop in 30 sec. Terminated.");
            }
            this.executor = null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
Because this.executor is now null, the onTrigger method throws a NullPointerException on every subsequent invocation when it attempts to call executor.submit:
    synchronized (this.consumerStreamsReady) {
        if (!this.consumerStreamsReady.get()) {
            Future<Void> f = this.executor.submit(new Callable<Void>() {
                ...
and also
    if (this.consumerStreamsReady.get()) {
        Future<Void> consumptionFuture = this.executor.submit(new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                ...
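The failure mode described above can be reproduced outside NiFi with a small stand-alone sketch. It also shows one possible guard: recreating the executor lazily before submitting. This is an illustration only and not necessarily the change that resolved this issue. The class ExecutorLifecycleSketch, the methods triggerUnguarded and triggerGuarded, and the use of a single-thread executor are all hypothetical; only shutdownConsumer mirrors the logic quoted in this report.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the processor; this is NOT the GetKafka source.
class ExecutorLifecycleSketch {

    // Placeholder task standing in for the Kafka consumption callable.
    private static final Callable<String> TASK = () -> "consumed";

    private volatile ExecutorService executor = Executors.newSingleThreadExecutor();

    // Mirrors the quoted shutdown logic: the field is set to null after shutdown.
    public synchronized void shutdownConsumer() {
        if (this.executor != null) {
            this.executor.shutdown();
            try {
                if (!this.executor.awaitTermination(30000, TimeUnit.MILLISECONDS)) {
                    this.executor.shutdownNow();
                }
                this.executor = null;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    // Unguarded submit, as in the quoted onTrigger excerpts:
    // throws NullPointerException once shutdownConsumer() has nulled the field.
    public String triggerUnguarded() {
        Future<String> f = this.executor.submit(TASK);
        return await(f);
    }

    // Assumed guard (one option among several): recreate the executor if it is gone.
    public synchronized String triggerGuarded() {
        if (this.executor == null || this.executor.isShutdown()) {
            this.executor = Executors.newSingleThreadExecutor();
        }
        Future<String> f = this.executor.submit(TASK);
        return await(f);
    }

    // Waits for the task and converts checked exceptions to unchecked ones.
    private static String await(Future<String> f) {
        try {
            return f.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        ExecutorLifecycleSketch sketch = new ExecutorLifecycleSketch();
        sketch.shutdownConsumer(); // simulates the catch block having run
        try {
            sketch.triggerUnguarded();
        } catch (NullPointerException e) {
            System.out.println("unguarded submit failed with NullPointerException");
        }
        System.out.println("guarded submit returned: " + sketch.triggerGuarded());
        sketch.shutdownConsumer(); // release the worker thread so the JVM can exit
    }
}
```

Other approaches would work as well, for example leaving the executor non-null in the error path, or rebuilding it wherever the consumer streams are re-established. The lazy guard is shown here only because it is the smallest change that stops the repeated NullPointerException.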