Details
Type: Bug
Status: Resolved
Priority: Major
Resolution: Fixed
1.2.0
None
Description
The PutHiveStreaming processor sometimes throws an IllegalStateException with the following stack trace, which causes the session to be rolled back.
2017-05-04 03:35:04,190 ERROR [Timer-Driven Process Thread-2] o.a.n.processors.hive.PutHiveStreaming PutHiveStreaming[id=d17c8654-015b-1000-0000-0000701816b4] org.apache.nifi.processors.hive.PutHiveStreaming$$Lambda$1254/2096694862@5bf15592 failed to process due to java.lang.IllegalStateException; rolling back session: {}
java.lang.IllegalStateException: null
    at org.apache.nifi.controller.repository.StandardProcessSession.checkpoint(StandardProcessSession.java:210)
    at org.apache.nifi.controller.repository.StandardProcessSession.commit(StandardProcessSession.java:306)
    at org.apache.nifi.processor.util.pattern.PartialFunctions.onTrigger(PartialFunctions.java:115)
    at org.apache.nifi.processor.util.pattern.RollbackOnFailure.onTrigger(RollbackOnFailure.java:184)
    at org.apache.nifi.processors.hive.PutHiveStreaming.onTrigger(PutHiveStreaming.java:582)
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1118)
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:144)
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:132)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
This is caused by incorrect use of ProcessSession in PutHiveStreaming, introduced by NIFI-3145. The processor reads Avro records from the incoming FlowFile and also writes the records that succeeded and the records that failed to outgoing FlowFiles (success/failure).
To write the outgoing FlowFiles, the processor uses separate threads so that it can keep appending Avro records without closing the OutputStream provided by the ProcessSession.append method.
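ProcessSession.append hands its OutputStream to a callback, and that stream is meant to be used only while the callback runs. The sketch below models that constraint with a hypothetical `ScopedSink` class (not NiFi's API) and shows the general thread-based workaround described above: the append callback is parked on a worker thread that blocks on a queue, so the stream stays open while the calling thread keeps producing records. `ScopedSink` and `ThreadedAppender` are illustrative names; this is an assumption about the general pattern, not the NIFI-3145 code.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical stand-in for a callback-style append: the stream is valid only
// while the callback runs and is unusable as soon as the callback returns.
class ScopedSink {
    private final ByteArrayOutputStream content = new ByteArrayOutputStream();
    private volatile boolean open;

    void append(Consumer<OutputStream> callback) {
        open = true;
        try {
            callback.accept(new OutputStream() {
                @Override
                public void write(int b) {
                    if (!open) throw new IllegalStateException("stream already closed");
                    content.write(b);
                }
            });
        } finally {
            open = false; // any later write through this stream is rejected
        }
    }

    String contents() {
        return new String(content.toByteArray(), StandardCharsets.UTF_8);
    }
}

// The workaround pattern: run the callback on a worker thread and feed it records
// through a queue, so the stream stays open across many records.
class ThreadedAppender {
    private static final Object END = new Object(); // sentinel: no more records

    static String appendAll(List<String> records) {
        ScopedSink sink = new ScopedSink();
        BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
        Thread writer = new Thread(() -> sink.append(out -> {
            try {
                for (Object r = queue.take(); r != END; r = queue.take()) {
                    out.write(((String) r).getBytes(StandardCharsets.UTF_8));
                }
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));
        writer.start();
        try {
            for (String r : records) {
                queue.put(r); // producer side; in the processor this thread also kept using the session
            }
            queue.put(END);
            writer.join(); // join makes the worker's writes visible to this thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sink.contents();
    }
}
```

The pattern works for the stream itself, but it means one session object is driven from two threads at once: the worker sits inside the append callback while the calling thread keeps invoking other session methods.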
However, StandardProcessSession is not thread-safe. It keeps its state in many unsynchronized HashMaps, and in particular the 'recursionSet' (a HashSet) caused this issue: when different threads add and remove entries concurrently, the set's 'isEmpty' method can return a wrong result, which leads to the IllegalStateException reported in this JIRA.
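The check that fails in StandardProcessSession.checkpoint (line 210 in the trace) can be pictured as a guard over the set of FlowFiles whose callbacks are still running. The sketch below is a simplified, hypothetical model (`GuardedSession` is not NiFi's code), assuming the guard is a plain HashSet that commit requires to be empty. Used from a single thread it behaves correctly and rejects only a commit issued from inside a callback. A HashSet mutated by several threads without synchronization can lose updates to its internal size counter, so `isEmpty()` may report entries that were already removed, and a legitimate commit then fails the same way.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified model of a session-level recursion guard.
class GuardedSession {
    // Not thread-safe: HashSet updates its size without any synchronization.
    private final Set<String> recursionSet = new HashSet<>();
    private int commits;

    void append(String flowFileId, Runnable callback) {
        recursionSet.add(flowFileId); // a callback for this FlowFile is in progress
        try {
            callback.run();
        } finally {
            recursionSet.remove(flowFileId); // callback finished (normally or not)
        }
    }

    void commit() {
        checkpoint();
        commits++;
    }

    private void checkpoint() {
        // Committing while any callback is still active is illegal.
        if (!recursionSet.isEmpty()) {
            throw new IllegalStateException();
        }
    }

    int commits() {
        return commits;
    }
}
```

Confining each session to the thread that owns it keeps the guard's bookkeeping consistent.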
NIFI-3145 changed how records are appended to outgoing FlowFiles because earlier versions (up to NiFi 1.1.2) did not write outgoing FlowFiles correctly: whenever additional records were written, the processor added the Avro header to the FlowFile again, so the content could no longer be read as an Avro file. NIFI-3145 fixed that by using separate threads, but in doing so it introduced this issue.
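An Avro container file starts with a single header (magic bytes, metadata including the schema, and a sync marker) followed by data blocks, so writing more records as if starting a new file puts a second header in the middle of the content. The sketch below reproduces that failure with a deliberately tiny, hypothetical container format (`ToyContainer`: a 4-byte magic header followed by length-prefixed records) using only the JDK. It is not Avro; it only illustrates why the header must be written once per output, here tracked simply by checking whether the output is still empty.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy container format (hypothetical, NOT Avro): MAGIC, then [int length][UTF-8 bytes]*.
class ToyContainer {
    static final byte[] MAGIC = {'T', 'O', 'Y', 1};

    // Buggy append: every call behaves like a new file and writes the header again.
    static void appendNaive(ByteArrayOutputStream file, List<String> records) {
        file.write(MAGIC, 0, MAGIC.length);
        writeRecords(file, records);
    }

    // Correct append: the header is written only while the file is still empty.
    static void appendCorrect(ByteArrayOutputStream file, List<String> records) {
        if (file.size() == 0) {
            file.write(MAGIC, 0, MAGIC.length);
        }
        writeRecords(file, records);
    }

    private static void writeRecords(ByteArrayOutputStream file, List<String> records) {
        for (String r : records) {
            byte[] bytes = r.getBytes(StandardCharsets.UTF_8);
            byte[] len = ByteBuffer.allocate(4).putInt(bytes.length).array(); // big-endian
            file.write(len, 0, len.length);
            file.write(bytes, 0, bytes.length);
        }
    }

    // Reader: expects exactly one header, then records until the end of the data.
    static List<String> read(byte[] file) {
        ByteBuffer in = ByteBuffer.wrap(file);
        byte[] magic = new byte[MAGIC.length];
        in.get(magic);
        if (!Arrays.equals(magic, MAGIC)) {
            throw new IllegalArgumentException("missing header");
        }
        List<String> records = new ArrayList<>();
        while (in.hasRemaining()) {
            int len = in.getInt();
            if (len < 0 || len > in.remaining()) {
                // A stray second header is misread as a huge record length.
                throw new IllegalArgumentException("corrupt record length " + len);
            }
            byte[] bytes = new byte[len];
            in.get(bytes);
            records.add(new String(bytes, StandardCharsets.UTF_8));
        }
        return records;
    }
}
```

With two naive appends, the reader hits the second header where it expects a record length and rejects the content, mirroring the unreadable Avro output described above.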
We need to implement single-threaded logic that can append Avro records to the outgoing FlowFiles correctly.
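One way to meet that requirement, sketched here under stated assumptions, is to keep all work on the onTrigger thread: route each incoming record to an in-memory success or failure buffer, then write each outgoing FlowFile in a single pass so it gets exactly one header and no stream has to outlive its callback. `SingleThreadedRouter`, `Route`, the `writeSucceeded` predicate (standing in for the outcome of the Hive write), and the text header are all hypothetical; the sketch assumes a batch small enough to buffer in memory, and the actual fix may take a different approach.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical relationship names for the sketch.
enum Route { SUCCESS, FAILURE }

// Hypothetical single-threaded routing: no worker threads, and each output is
// produced in one pass on the calling thread.
class SingleThreadedRouter {
    static final String HEADER = "#header\n"; // stands in for the Avro header

    static Map<Route, String> process(List<String> records, Predicate<String> writeSucceeded) {
        // 1. Route every record into an in-memory buffer for its relationship.
        Map<Route, List<String>> buffers = new EnumMap<>(Route.class);
        for (Route route : Route.values()) {
            buffers.put(route, new ArrayList<>());
        }
        for (String record : records) {
            Route route = writeSucceeded.test(record) ? Route.SUCCESS : Route.FAILURE;
            buffers.get(route).add(record);
        }
        // 2. Write each outgoing "FlowFile" once: one header, then all of its records.
        Map<Route, String> outputs = new EnumMap<>(Route.class);
        for (Map.Entry<Route, List<String>> entry : buffers.entrySet()) {
            if (entry.getValue().isEmpty()) {
                continue; // nothing to transfer on this relationship
            }
            StringBuilder content = new StringBuilder(HEADER);
            for (String record : entry.getValue()) {
                content.append(record).append('\n');
            }
            outputs.put(entry.getKey(), content.toString());
        }
        return outputs;
    }
}
```

Because both buffers are filled and written by the same thread, the session is never touched concurrently; the trade-off is memory proportional to the batch size.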