Uploaded image for project: 'Camel'
  1. Camel
  2. CAMEL-21114

camel-zipfile - ZipSplitter with AggregationStrategy does not aggregate all splits

    XML Word Printable JSON

Details

    • Bug
    • Status: Resolved
    • Minor
    • Resolution: Fixed
    • 3.14.10, 3.22.2
    • 4.8.0
    • camel-zipfile
    • None
    • Unknown

    Description

      A transacted route that uses ZipSplitter together with an AggregationStrategy does not aggregate the last zip file entry. The issue occurs only for transacted routes.

       

      Example:

       

      Zip Archive

      • A.xml
      • B.xml

       

      Both splits are processed, but the aggregate method is called only for the first exchange (A.xml).

      For a zip archive with two entries, the doRun() method of MulticastTransactedTask is called three times. On the third call, iterator.next() returns null although hasNext() returned true. As a result, doDone() is called while there is still a task in the queue (the one holding the second exchange). That task is processed after doDone() has executed, but its result is not aggregated because of the done check in aggregate() of MulticastTransactedTask.

       

      We found the problem in Camel 3.14, but it is still present in Camel 3.22.

       

      It can be reproduced with the following test (the test passes if you remove the transacted step from the route):

      import org.apache.camel.AggregationStrategy;
      import org.apache.camel.Exchange;
      import org.apache.camel.builder.RouteBuilder;
      import org.apache.camel.component.mock.MockEndpoint;
      import org.apache.camel.dataformat.zipfile.ZipSplitter;
      import org.apache.camel.spring.spi.SpringTransactionPolicy;
      import org.apache.camel.test.junit4.CamelTestSupport;
      import org.h2.jdbcx.JdbcDataSource;
      import org.junit.Test;
      import org.springframework.jdbc.datasource.DataSourceTransactionManager;
      import org.springframework.transaction.support.TransactionTemplate;
      
      public class ZipSplitterTest extends CamelTestSupport  {    
      
      String zipArchiveWithTwoFiles = "UEsDBBQAAAAIAFlrtFDFAfecUAAAAB4BAAALAAAAT3JkZXJzMS54bWyzyS9KSS0qtuPl4oQwQSxOm8wUOxMb/cwUCK+gKD+lNLkEzOG0yUvMTbWDCik42uiD+WB1+kgKbfThxqEZbEqUwU6kG2xGlMHOhA2GsortAFBLAwQUAAAACABBW9hQgBf0tVgAAAAqAQAACwAAAE9yZGVyczIueG1ss8kvSkktKrbj5eKEMEEsTpvMFDtDQ0Mb/cwUCL+gKD+lNLkEzOG0yUvMTbWDCimA1YFFwCr1kZTa6MONRDPcyMiIKMPB6kg13NjYmCjDweoIGQ5lFdsBAFBLAQIfABQAAAAIAFlrtFDFAfecUAAAAB4BAAALACQAAAAAAAAAIAAAAAAAAABPcmRlcnMxLnhtbAoAIAAAAAAAAQAYAAD57I2ZLtYBg97kuHn02gEA+eyNmS7WAVBLAQIfABQAAAAIAEFb2FCAF/S1WAAAACoBAAALACQAAAAAAAAAIAAAAHkAAABPcmRlcnMyLnhtbAoAIAAAAAAAAQAYAAAxPXoJStYBjn3iuHn02gEAMT16CUrWAVBLBQYAAAAAAgACALoAAAD6AAAAAAA=";
          
          @Test
          public void testIfAllSplitsAggregated() throws Exception {
              MockEndpoint mock = getMockEndpoint("mock:result");
              template.sendBody("direct:start", "");
            
              // Check if second file was processed in aggregate() method of AggregationStrategy
              assertEquals("Orders2.xml", mock.getExchanges().get(0).getMessage().getHeader("CamelFileName", String.class));
          }    
      
          @Override
          protected RouteBuilder createRouteBuilder() throws Exception {
              return new RouteBuilder() {
                  @Override
                  public void configure() throws Exception {
        
                      JdbcDataSource dataSource = new JdbcDataSource();
                      dataSource.setURL("jdbc:h2:mem:testdb;DB_CLOSE_DELAY=-1");
                      dataSource.setUser("sa");
                      dataSource.setPassword("");                
      
                      DataSourceTransactionManager txManager = new DataSourceTransactionManager(dataSource);
                      
                      TransactionTemplate transactionTemplate = new TransactionTemplate(txManager);
                      transactionTemplate.setPropagationBehaviorName("PROPAGATION_REQUIRED");
                      transactionTemplate.setIsolationLevelName("ISOLATION_READ_COMMITTED");
                      transactionTemplate.setTimeout(1800);
      
                      SpringTransactionPolicy springTransactionPolicy = new SpringTransactionPolicy();
                      springTransactionPolicy.setTransactionManager(txManager);
                      springTransactionPolicy.setTransactionTemplate(transactionTemplate);
                      
                      getContext().getRegistry().bind("transacted", springTransactionPolicy);
                      getContext().getRegistry().bind("zipSplitter", new ZipSplitter());
      
                      from("direct:start")
                          .transacted("transacted")
                          .setBody().simple(zipArchiveWithTwoFiles)
                          .unmarshal().base64()
                          .split().ref("zipSplitter").streaming().aggregationStrategy(new StringAggregationStrategy())
                              .log("Splitted")
                          .end()
                          .to("mock:result");
                  }
              };
          }
          
          private static class StringAggregationStrategy implements AggregationStrategy {
              @Override
              public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
                 System.out.println(newExchange.getMessage().getHeader("CamelFileName"));
                 return newExchange;
              }
          }
      }

       

       

      Attachments

        Issue Links

          Activity

            People

              davsclaus Claus Ibsen
              AWeickel Andre Weickel
              Votes:
              0 Vote for this issue
              Watchers:
              3 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: