Disclaimer: I'm fairly new to Camel, so please bear with me.
I would like to discuss https://github.com/apache/camel/pull/717 and possibly propose an improvement.
In the PR mentioned above, support for publisher confirms has been implemented. The related documentation can be found on https://www.rabbitmq.com/confirms.html#publisher-confirms.
My thoughts is that the current implementation does not perform well. Basically, it has a throughput of one message at a time. I'll try to explain.
- On https://github.com/apache/camel/blob/master/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java#L178, an exchange is retrieved. This contains a payload to be sent to the RabbitMQ broker. The exchanges are retrieved one by one.
- On https://github.com/apache/camel/blob/master/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java#L288, we start with the publication to the broker.
- On https://github.com/apache/camel/blob/master/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessagePublisher.java#L130, the RabbitMQ API `channel.basicPublish()` is used to publish the payload.
- Assuming that publisher confirmation is enabled, we come in https://github.com/apache/camel/blob/master/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessagePublisher.java#L145. Here, we call the RabbitMQ API `channel.waitForConfirmsOrDie()`. This is a blocking call. It waits until the broker has confirmed that the publication has been correctly processed and enqueued. Since we just published one single message, we await the confirmation of that one single message.
I believe this means that our throughput is limited to one message at a time. For each message sent to RabbitMQ, we have to await the confirmation.
I believe it would be better to avoid using `channel.waitForConfirmsOrDie()` and implement a `ConfirmListener` ourselves. See https://github.com/rabbitmq/rabbitmq-java-client/blob/master/src/main/java/com/rabbitmq/client/ConfirmListener.java. This way, we can continue publications of subsequent messages, while we receive ACK's or NACK's of earlier publications.
An implementation gotcha is the fact that upon broker restart/disconnection the callback `ConfirmListener.nack()` will not be automatically called by the RabbitMQ, as the RabbitMQ API does not keep track of the in-flight messages. (In case you are wondering, `channel.waitForConfirmsOrDie()` is implemented by counting the responses.) In other words, we will have to assume a NACK for all in-flight messages when the connection was closed. This also means that we have to keep track of all in-flight messages, together with the corresponding `AsyncCallback`, at our end.
We also might want to limit the number of in-flight messages between the component and the broker.
What do you think?