Camel / CAMEL-11777

Transactional hazelcast:seda component uses a queue that is not transaction-aware

Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.19.2
    • Fix Version/s: 2.20.0
    • Component/s: camel-hazelcast
    • Labels: None
    • Estimated Complexity: Unknown

    Description

      As mentioned in the docs:
      http://docs.hazelcast.org/docs/3.8.4/manual/html-single/index.html#creating-a-transaction-interface

      Data structures must be transaction-aware and obtained inside the transaction context. Currently the hazelcast seda component uses a queue that is initialized outside the transaction. As a result, when we roll back the transaction, nothing is undone: the state is the same as if the changes had already been committed (the rollback is simply a no-op).

      The patch should look roughly like this (this exact version fails the tests, so I need to investigate further):

      index 7e3b24cc8f..cebd69ce16 100644
      --- a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java
      +++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java
      @@ -16,10 +16,10 @@
        */
       package org.apache.camel.component.hazelcast.seda;
       
      -import java.util.concurrent.BlockingQueue;
       import java.util.concurrent.ExecutorService;
       import java.util.concurrent.TimeUnit;
       
      +import com.hazelcast.core.BaseQueue;
       import com.hazelcast.transaction.TransactionContext;
       
       import org.apache.camel.AsyncCallback;
      @@ -71,7 +71,7 @@ public class HazelcastSedaConsumer extends DefaultConsumer implements Runnable {
           }
       
           public void run() {
      -        final BlockingQueue<?> queue = endpoint.getQueue();
      +        BaseQueue<?> queue = endpoint.getHazelcastInstance().getQueue(endpoint.getConfiguration().getQueueName());
       
               while (queue != null && isRunAllowed()) {
                   final Exchange exchange = this.getEndpoint().createExchange();
      @@ -85,6 +85,7 @@ public class HazelcastSedaConsumer extends DefaultConsumer implements Runnable {
                           if (transactionCtx != null) {
                               log.trace("Begin transaction: {}", transactionCtx.getTxnId());
                               transactionCtx.beginTransaction();
      +                        queue = transactionCtx.getQueue(endpoint.getConfiguration().getQueueName());
                           }
                       }
       
      

            People

              Assignee: Claus Ibsen (davsclaus)
              Reporter: Kirill Merkushev (lanwen)
              Votes: 0
              Watchers: 3
