Details
Type: Improvement
Status: Resolved
Priority: Minor
Resolution: Fixed
1.10.0
None
Description
If any exception is thrown while ConsumeJMS is reading messages, the processor retries immediately, with no back-off.
try {
    consumer.consume(destinationName, errorQueueName, durable, shared, subscriptionName, charset, new ConsumerCallback() {
        @Override
        public void accept(final JMSResponse response) {
            if (response == null) {
                return;
            }
            FlowFile flowFile = processSession.create();
            flowFile = processSession.write(flowFile, out -> out.write(response.getMessageBody()));
            final Map<String, String> jmsHeaders = response.getMessageHeaders();
            final Map<String, String> jmsProperties = response.getMessageProperties();
            flowFile = ConsumeJMS.this.updateFlowFileAttributesWithJMSAttributes(jmsHeaders, flowFile, processSession);
            flowFile = ConsumeJMS.this.updateFlowFileAttributesWithJMSAttributes(jmsProperties, flowFile, processSession);
            flowFile = processSession.putAttribute(flowFile, JMS_SOURCE_DESTINATION_NAME, destinationName);
            processSession.getProvenanceReporter().receive(flowFile, destinationName);
            processSession.putAttribute(flowFile, JMS_MESSAGETYPE, response.getMessageType());
            processSession.transfer(flowFile, REL_SUCCESS);
            processSession.commit();
        }
    });
} catch(Exception e) {
    consumer.setValid(false);
    throw e; // for backward compatibility with exception handling in flows
}
}
It should call context.yield() in the exception block. Note that PublishJMS already yields in the same scenario, so the change is required only in the ConsumeJMS processor.
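A minimal sketch of the proposed change, not the actual patch. The Context and Consumer interfaces below are hypothetical stand-ins for NiFi's ProcessContext and the JMS consumer used in the snippet above, reduced to the calls that matter here so the example compiles on its own. The only behavioral difference from the current code is the context.yield() call in the catch block.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class YieldOnFailureSketch {

    /** Hypothetical stand-in for ProcessContext; only yield() is modeled. */
    interface Context {
        void yield();
    }

    /** Hypothetical stand-in for the JMS consumer used by ConsumeJMS. */
    interface Consumer {
        void consume() throws Exception;
        void setValid(boolean valid);
    }

    /** Mirrors the try/catch from the issue, with the yield added. */
    static void onTrigger(Context context, Consumer consumer) throws Exception {
        try {
            consumer.consume();
        } catch (Exception e) {
            consumer.setValid(false);
            // Proposed change: ask the framework to back off before the next run.
            context.yield();
            throw e; // still rethrown, as in the existing code
        }
    }

    /** Runs onTrigger against a consumer that always fails and counts the yields. */
    static int countYieldsOnFailure() {
        AtomicInteger yields = new AtomicInteger();
        Context context = yields::incrementAndGet;
        Consumer failing = new Consumer() {
            @Override
            public void consume() throws Exception {
                throw new Exception("simulated broker failure");
            }

            @Override
            public void setValid(boolean valid) {
                // no state needed for this sketch
            }
        };
        try {
            onTrigger(context, failing);
        } catch (Exception expected) {
            // the exception is rethrown after yielding
        }
        return yields.get();
    }

    public static void main(String[] args) {
        System.out.println("yields after one failure: " + countYieldsOnFailure());
    }
}
```

Yielding before the rethrow keeps the existing exception-handling behavior for flows while preventing the processor from being rescheduled immediately after a failure.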