Affects Version/s: 1.3.0, 1.4.0, 1.5.0, 1.6.0
Fix Version/s: None
The AMQP consumer performs a "basicGet()". The way basicGet is called (with auto-acknowledge set to true) results in the message being dequeued from the AMQP queue as soon as it is fetched.
If a processor instance fails to submit a flow file to the output, either because of an error in "session.write()" or because the processor is unexpectedly halted before the flow file is created and persisted, the message consumed from the AMQP queue is lost and can't be recovered.
A potential fix here would be to:
- AMQPConsumer.java: Change the call "basicGet(this.queueName, true)" -> "basicGet(this.queueName, false)" so that messages are no longer acknowledged automatically
- AMQPConsumer.java: Add a new method that wraps the basicAck() and basicNack() methods, taking a long (the delivery tag) and a boolean (success). If success is true, basicAck() is called; if it is false, basicNack() with requeue is called
- ConsumeAMQP.java: Add call(s) on "consumer" to invoke the new method as needed, on both success and error.
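
A minimal sketch of the wrapper described above, not the actual patch. To keep it runnable without the RabbitMQ client on the classpath, `AckChannel` is a hypothetical stand-in that copies only the `basicAck(long, boolean)` and `basicNack(long, boolean, boolean)` signatures from `com.rabbitmq.client.Channel`. The names `AckingConsumer`, `acknowledge`, `RecordingChannel` and `AckSketchDemo` are also made up for illustration. With the real client, the delivery tag would come from `GetResponse.getEnvelope().getDeliveryTag()` on the response returned by `basicGet(this.queueName, false)`.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the two com.rabbitmq.client.Channel methods used here.
interface AckChannel {
    void basicAck(long deliveryTag, boolean multiple) throws IOException;
    void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
}

// Sketch of the new method proposed for AMQPConsumer (class and method names are assumptions).
class AckingConsumer {
    private final AckChannel channel;

    AckingConsumer(AckChannel channel) {
        this.channel = channel;
    }

    // success == true  -> acknowledge this single delivery
    // success == false -> reject it and ask the broker to requeue it
    void acknowledge(long deliveryTag, boolean success) throws IOException {
        if (success) {
            channel.basicAck(deliveryTag, false);
        } else {
            channel.basicNack(deliveryTag, false, true);
        }
    }
}

// Test double that records which broker calls were made.
class RecordingChannel implements AckChannel {
    final List<String> calls = new ArrayList<>();

    @Override
    public void basicAck(long deliveryTag, boolean multiple) {
        calls.add("ack:" + deliveryTag);
    }

    @Override
    public void basicNack(long deliveryTag, boolean multiple, boolean requeue) {
        calls.add("nack:" + deliveryTag + ":requeue=" + requeue);
    }
}

class AckSketchDemo {
    // Simulates one successful and one failed flow file write.
    static List<String> run() {
        RecordingChannel channel = new RecordingChannel();
        AckingConsumer consumer = new AckingConsumer(channel);
        try {
            consumer.acknowledge(1L, true);   // flow file persisted
            consumer.acknowledge(2L, false);  // e.g. session.write() failed
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return channel.calls;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [ack:1, nack:2:requeue=true]
    }
}
```

In the processor, the call with `true` would go after the flow file has been written and transferred, and the call with `false` would go in the error path, so a message only leaves the queue once it has been persisted.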