Camel / CAMEL-11777

Transactional hazelcast:seda component uses a queue that is not transaction-aware

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.19.2
    • Fix Version/s: 2.20.0
    • Component/s: camel-hazelcast
    • Labels:
      None
    • Estimated Complexity:
      Unknown

      Description

      As mentioned in the Hazelcast docs:
      http://docs.hazelcast.org/docs/3.8.4/manual/html-single/index.html#creating-a-transaction-interface

      Data structures must be transaction-aware and obtained inside the transaction context. Currently the hazelcast seda component uses a queue that was obtained outside the transaction. As a result, rolling back the transaction has no effect: the queue stays in the same state as if the changes had been committed (the rollback is effectively a no-op).
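To illustrate why the rollback turns into a no-op, here is a minimal, self-contained sketch in plain Java. It is a toy model only: `ToyTransaction` and both helper methods are hypothetical names made up for this illustration, and none of this is the Hazelcast or Camel API. It only shows that an item polled directly from the shared queue is invisible to the transaction, while an item polled through the transaction can be restored on rollback.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

public class TxQueueSketch {

    /** Hypothetical stand-in for a transaction context; NOT the Hazelcast API. */
    static final class ToyTransaction<E> {
        private final Deque<E> backing;
        private final List<E> taken = new ArrayList<>();

        ToyTransaction(Deque<E> backing) {
            this.backing = backing;
        }

        /** Polls through the transaction, remembering the item so it can be restored. */
        E poll() {
            E item = backing.pollFirst();
            if (item != null) {
                taken.add(item);
            }
            return item;
        }

        /** Puts every item taken through this transaction back at the head, in original order. */
        void rollback() {
            for (int i = taken.size() - 1; i >= 0; i--) {
                backing.addFirst(taken.get(i));
            }
            taken.clear();
        }
    }

    /** Mirrors the reported behaviour: the queue is used outside the transaction. */
    static int sizeAfterDirectPollAndRollback() {
        Deque<String> queue = new ArrayDeque<>(Arrays.asList("a", "b"));
        ToyTransaction<String> tx = new ToyTransaction<>(queue);
        queue.pollFirst(); // bypasses the transaction entirely
        tx.rollback();     // nothing was recorded, so nothing is restored
        return queue.size();
    }

    /** Mirrors the intended behaviour: the queue is accessed through the transaction. */
    static int sizeAfterTransactionalPollAndRollback() {
        Deque<String> queue = new ArrayDeque<>(Arrays.asList("a", "b"));
        ToyTransaction<String> tx = new ToyTransaction<>(queue);
        tx.poll();     // recorded by the transaction
        tx.rollback(); // the polled item is put back
        return queue.size();
    }

    public static void main(String[] args) {
        System.out.println("direct poll + rollback: " + sizeAfterDirectPollAndRollback());
        System.out.println("transactional poll + rollback: " + sizeAfterTransactionalPollAndRollback());
    }
}
```

In the direct case the message is lost despite the rollback, which matches the behaviour described in this issue. In the transactional case the queue returns to its original size.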

      The patch should look roughly like this (this exact version fails the tests, so I need to investigate further):

      index 7e3b24cc8f..cebd69ce16 100644
      --- a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java
      +++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java
      @@ -16,10 +16,10 @@
        */
       package org.apache.camel.component.hazelcast.seda;
       
      -import java.util.concurrent.BlockingQueue;
       import java.util.concurrent.ExecutorService;
       import java.util.concurrent.TimeUnit;
       
      +import com.hazelcast.core.BaseQueue;
       import com.hazelcast.transaction.TransactionContext;
       
       import org.apache.camel.AsyncCallback;
      @@ -71,7 +71,7 @@ public class HazelcastSedaConsumer extends DefaultConsumer implements Runnable {
           }
       
           public void run() {
      -        final BlockingQueue<?> queue = endpoint.getQueue();
      +        BaseQueue<?> queue = endpoint.getHazelcastInstance().getQueue(endpoint.getConfiguration().getQueueName());
       
               while (queue != null && isRunAllowed()) {
                   final Exchange exchange = this.getEndpoint().createExchange();
      @@ -85,6 +85,7 @@ public class HazelcastSedaConsumer extends DefaultConsumer implements Runnable {
                           if (transactionCtx != null) {
                               log.trace("Begin transaction: {}", transactionCtx.getTxnId());
                               transactionCtx.beginTransaction();
      +                        queue = transactionCtx.getQueue(endpoint.getConfiguration().getQueueName());
                           }
                       }
       
      

              People

              • Assignee:
                davsclaus Claus Ibsen
                Reporter:
                lanwen Kirill Merkushev