Details
-
Bug
-
Status: Open
-
Critical
-
Resolution: Unresolved
-
1.9.0
-
None
-
None
Description
I encountered an exception, as following:
KafkaSource EXCEPTION, {} java.lang.NullPointerException: null at org.apache.flume.channel.SpillableMemoryChannel$SpillableMemoryTransaction.doRollback(SpillableMemoryChannel.java:587) at org.apache.flume.channel.BasicTransactionSemantics.rollback(BasicTransactionSemantics.java:168) at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:196) at org.apache.flume.source.kafka.KafkaSource.doProcess(KafkaSource.java:311) at org.apache.flume.source.AbstractPollableSource.process(AbstractPollableSource.java:60) at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:133) at java.lang.Thread.run(Thread.java:748)
ChannelProcessor#processEventBatch() method catch exception if put evet to channel error, but the catch code block first call tx.rollback(), however tx.rollback() method may throw new exception, this moment, the actual exception is overrided, we did't know the actual exception.We should first log the excpetion, then call tx.rollback().
public void processEventBatch(List<Event> events) { for (Channel reqChannel : reqChannelQueue.keySet()) { Transaction tx = reqChannel.getTransaction(); Preconditions.checkNotNull(tx, "Transaction object must not be null"); try { tx.begin(); List<Event> batch = reqChannelQueue.get(reqChannel); for (Event event : batch) { reqChannel.put(event); } tx.commit(); } catch (Throwable t) { // this line may throw new exception, the actual exception is overrided, we did't know the actual exception tx.rollback(); if (t instanceof Error) { LOG.error("Error while writing to required channel: " + reqChannel, t); throw (Error) t; } else if (t instanceof ChannelException) { throw (ChannelException) t; } else { throw new ChannelException("Unable to put batch on required " + "channel: " + reqChannel, t); } } finally { } } }
Attachments
Issue Links
- links to