Details
-
Bug
-
Status: Resolved
-
Minor
-
Resolution: Fixed
-
3.14.10, 3.22.2
-
None
-
Unknown
Description
A transacted route with ZipSplitter and Aggregation Strategy does not aggregate the last zip file entry. The issue only occurs for transacted routes.
Example:
Zip Archive
- A.xml
- B.xml
Both splits are processed, but the aggregate method is called only for the first exchange (A.xml).
For a zip archive with two entries, the doRun() method of MulticastTransactedTask is called three times. On the third call, iterator.next() returns null even though hasNext() returned true. As a result, doDone() is called while there is still a task in the queue (holding the second exchange). That task is processed after doDone() has executed, but its exchange is not aggregated because of the done check in the aggregate() method of MulticastTransactedTask.
We found the problem in Camel 3.14, but it is still present in Camel 3.22.
It can be reproduced with the following test (aggregation works correctly if you remove the transacted tag from the route):
import org.apache.camel.AggregationStrategy; import org.apache.camel.Exchange; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; import org.apache.camel.dataformat.zipfile.ZipSplitter; import org.apache.camel.spring.spi.SpringTransactionPolicy; import org.apache.camel.test.junit4.CamelTestSupport; import org.h2.jdbcx.JdbcDataSource; import org.junit.Test; import org.springframework.jdbc.datasource.DataSourceTransactionManager; import org.springframework.transaction.support.TransactionTemplate; public class ZipSplitterTest extends CamelTestSupport { String zipArchiveWithTwoFiles = "UEsDBBQAAAAIAFlrtFDFAfecUAAAAB4BAAALAAAAT3JkZXJzMS54bWyzyS9KSS0qtuPl4oQwQSxOm8wUOxMb/cwUCK+gKD+lNLkEzOG0yUvMTbWDCik42uiD+WB1+kgKbfThxqEZbEqUwU6kG2xGlMHOhA2GsortAFBLAwQUAAAACABBW9hQgBf0tVgAAAAqAQAACwAAAE9yZGVyczIueG1ss8kvSkktKrbj5eKEMEEsTpvMFDtDQ0Mb/cwUCL+gKD+lNLkEzOG0yUvMTbWDCimA1YFFwCr1kZTa6MONRDPcyMiIKMPB6kg13NjYmCjDweoIGQ5lFdsBAFBLAQIfABQAAAAIAFlrtFDFAfecUAAAAB4BAAALACQAAAAAAAAAIAAAAAAAAABPcmRlcnMxLnhtbAoAIAAAAAAAAQAYAAD57I2ZLtYBg97kuHn02gEA+eyNmS7WAVBLAQIfABQAAAAIAEFb2FCAF/S1WAAAACoBAAALACQAAAAAAAAAIAAAAHkAAABPcmRlcnMyLnhtbAoAIAAAAAAAAQAYAAAxPXoJStYBjn3iuHn02gEAMT16CUrWAVBLBQYAAAAAAgACALoAAAD6AAAAAAA="; @Test public void testIfAllSplitsAggregated() throws Exception { MockEndpoint mock = getMockEndpoint("mock:result"); template.sendBody("direct:start", ""); // Check if second file was processed in aggregate() method of AggregationStrategy assertEquals("Orders2.xml", mock.getExchanges().get(0).getMessage().getHeader("CamelFileName", String.class)); } @Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { @Override public void configure() throws Exception { JdbcDataSource dataSource = new JdbcDataSource(); dataSource.setURL("jdbc:h2:mem:testdb;DB_CLOSE_DELAY=-1"); dataSource.setUser("sa"); dataSource.setPassword(""); DataSourceTransactionManager txManager = new 
DataSourceTransactionManager(dataSource); TransactionTemplate transactionTemplate = new TransactionTemplate(txManager); transactionTemplate.setPropagationBehaviorName("PROPAGATION_REQUIRED"); transactionTemplate.setIsolationLevelName("ISOLATION_READ_COMMITTED"); transactionTemplate.setTimeout(1800); SpringTransactionPolicy springTransactionPolicy = new SpringTransactionPolicy(); springTransactionPolicy.setTransactionManager(txManager); springTransactionPolicy.setTransactionTemplate(transactionTemplate); getContext().getRegistry().bind("transacted", springTransactionPolicy); getContext().getRegistry().bind("zipSplitter", new ZipSplitter()); from("direct:start") .transacted("transacted") .setBody().simple(zipArchiveWithTwoFiles) .unmarshal().base64() .split().ref("zipSplitter").streaming().aggregationStrategy(new StringAggregationStrategy()) .log("Splitted") .end() .to("mock:result"); } }; } private static class StringAggregationStrategy implements AggregationStrategy { @Override public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { System.out.println(newExchange.getMessage().getHeader("CamelFileName")); return newExchange; } } }
Attachments
Issue Links
- links to