Details
-
Bug
-
Status: Closed
-
Major
-
Resolution: Fixed
-
1.5.0
-
None
Description
I've created a simple class to test the resequencer as listed below:
public final class MyCamelResequencer { private static final Logger LOG = Logger.getLogger(MyCamelResequencer.class); private MyCamelResequencer() {} public static void main(String args[]) throws Exception { CamelContext context = new DefaultCamelContext(); Tracer tracer = new Tracer(); tracer.getFormatter().setShowBreadCrumb(false); tracer.getFormatter().setShowNode(true); context.addInterceptStrategy(tracer); String brokerUrl = "vm://localhost?broker.persistent=false"; // String brokerUrl = "tcp://localhost:61616"; context.addComponent("activemq", ActiveMQComponent.activeMQComponent(brokerUrl)); context.addRoutes(new RouteBuilder() { public void configure() { // from("seda:FOO").to("log:PLANETS"); // from("seda:FOO").resequencer(header("seqnum")).to("log:PLANETS"); // from("activemq:TEST.IN").to("log:PLANETS"); from("activemq:TEST.IN").resequencer(header("seqnum")).to("log:PLANETS"); // from("activemq:TEST.IN").resequencer(header("seq")).to("activemq:TEST.OUT"); } }); ProducerTemplate template = context.createProducerTemplate(); context.start(); List<String> planets = new ArrayList(8); planets.add("Mercury"); planets.add("Venus"); planets.add("Earth"); planets.add("Mars"); planets.add("Jupiter"); planets.add("Saturn"); planets.add("Uranus"); planets.add("Neptune"); List<Integer> numbers = new ArrayList<Integer>(8); numbers.add(1); numbers.add(2); numbers.add(3); numbers.add(4); numbers.add(5); numbers.add(6); numbers.add(7); numbers.add(8); for (String planet: planets) { Collections.shuffle(numbers); int seqnum = numbers.remove(0); String message = "I am the planet " + planet + " with seqnum: " + seqnum; LOG.info("Sending message: " + message); template.sendBodyAndHeader("activemq:TEST.IN", message, "seqnum", seqnum); // template.sendBodyAndHeader("seda:FOO", message, "seqnum", seqnum); } Thread.sleep(10000); LOG.info("Shutting down the Camel context"); context.stop(); System.exit(1); }
The following are my observations:
- SEDA -> log receives all messages
- SEDA -> resequencer -> log receives all messages
- ActiveMQ -> log receives all messages
- ActiveMQ -> resequencer -> log does not receive all messages
Upon enabling the Tracer interceptor, I'm able to see that it doesn't even receive all messages from the ActiveMQ component:
[ main] DefaultCamelContext INFO JMX enabled. Using InstrumentationLifecycleStrategy.
[ main] BrokerService INFO Using Persistence Adapter: MemoryPersistenceAdapter
[ main] BrokerService INFO ActiveMQ null JMS Message Broker (localhost) is starting
[ main] BrokerService INFO For help or more information please see: http://activemq.apache.org/
[ JMX connector] ManagementContext WARN Failed to start jmx connector: Cannot bind to URL [rmi://localhost:1099/jmxrmi]: javax.naming.NameAlreadyBoundException: jmxrmi [Root exception is java.rmi.AlreadyBoundException: jmxrmi]
[ main] BrokerService INFO ActiveMQ JMS Message Broker (localhost, ID:174-155-37-223.pools.spcsdns.net-58792-1229973871555-0:0) started
[ main] TransportConnector INFO Connector vm://localhost Started
[ main] MyCamelResequencer INFO Sending message: I am the planet Mercury with seqnum: 1
[ main] MyCamelResequencer INFO Sending message: I am the planet Venus with seqnum: 3
[ main] MyCamelResequencer INFO Sending message: I am the planet Earth with seqnum: 6
[ main] MyCamelResequencer INFO Sending message: I am the planet Mars with seqnum: 2
[ main] MyCamelResequencer INFO Sending message: I am the planet Jupiter with seqnum: 4
[ main] MyCamelResequencer INFO Sending message: I am the planet Saturn with seqnum: 8
[ main] MyCamelResequencer INFO Sending message: I am the planet Uranus with seqnum: 7
[ main] MyCamelResequencer INFO Sending message: I am the planet Neptune with seqnum: 5
[aultMessageListenerContainer-1] TraceInterceptor INFO -> interceptor1 Interceptor[Delegate(DeadLetterChannel[Delegate(TraceInterceptor[Resequencer[ [] -> [To[log:PLANETS]]]]), RecipientList[log:org.apache.camel.DeadLetterChannel?level=error], RedeliveryPolicy[maximumRedeliveries=6]])] InOnly Properties:{} Headers:
BodyType:String Body:I am the planet Mercury with seqnum: 1
[aultMessageListenerContainer-1] TraceInterceptor INFO -> resequencer1 Resequencer[ [] -> [To[log:PLANETS]]] InOnly Properties:
Headers:
BodyType:String Body:I am the planet Mercury with seqnum: 1
[aultMessageListenerContainer-2] TraceInterceptor INFO -> interceptor1 Interceptor[Delegate(DeadLetterChannel[Delegate(TraceInterceptor[Resequencer[ [] -> [To[log:PLANETS]]]]), RecipientList[log:org.apache.camel.DeadLetterChannel?level=error], RedeliveryPolicy[maximumRedeliveries=6]])] InOnly Properties:{} Headers:
BodyType:String Body:I am the planet Earth with seqnum: 6
[aultMessageListenerContainer-2] TraceInterceptor INFO -> resequencer1 Resequencer[ [] -> [To[log:PLANETS]]] InOnly Properties:
Headers:
BodyType:String Body:I am the planet Earth with seqnum: 6
[aultMessageListenerContainer-3] TraceInterceptor INFO -> interceptor1 Interceptor[Delegate(DeadLetterChannel[Delegate(TraceInterceptor[Resequencer[ [] -> [To[log:PLANETS]]]]), RecipientList[log:org.apache.camel.DeadLetterChannel?level=error], RedeliveryPolicy[maximumRedeliveries=6]])] InOnly Properties:{} Headers:
BodyType:String Body:I am the planet Mars with seqnum: 2
[aultMessageListenerContainer-3] TraceInterceptor INFO -> resequencer1 Resequencer[ [] -> [To[log:PLANETS]]] InOnly Properties:
Headers:
BodyType:String Body:I am the planet Mars with seqnum: 2
[aultMessageListenerContainer-4] TraceInterceptor INFO -> interceptor1 Interceptor[Delegate(DeadLetterChannel[Delegate(TraceInterceptor[Resequencer[ [] -> [To[log:PLANETS]]]]), RecipientList[log:org.apache.camel.DeadLetterChannel?level=error], RedeliveryPolicy[maximumRedeliveries=6]])] InOnly Properties:{} Headers:
BodyType:String Body:I am the planet Jupiter with seqnum: 4
[aultMessageListenerContainer-4] TraceInterceptor INFO -> resequencer1 Resequencer[ [] -> [To[log:PLANETS]]] InOnly Properties:
Headers:
BodyType:String Body:I am the planet Jupiter with seqnum: 4
[aultMessageListenerContainer-5] TraceInterceptor INFO -> interceptor1 Interceptor[Delegate(DeadLetterChannel[Delegate(TraceInterceptor[Resequencer[ [] -> [To[log:PLANETS]]]]), RecipientList[log:org.apache.camel.DeadLetterChannel?level=error], RedeliveryPolicy[maximumRedeliveries=6]])] InOnly Properties:{} Headers:
BodyType:String Body:I am the planet Uranus with seqnum: 7
[aultMessageListenerContainer-5] TraceInterceptor INFO -> resequencer1 Resequencer[ [] -> [To[log:PLANETS]]] InOnly Properties:
Headers:
BodyType:String Body:I am the planet Uranus with seqnum: 7
[aultMessageListenerContainer-6] TraceInterceptor INFO -> interceptor1 Interceptor[Delegate(DeadLetterChannel[Delegate(TraceInterceptor[Resequencer[ [] -> [To[log:PLANETS]]]]), RecipientList[log:org.apache.camel.DeadLetterChannel?level=error], RedeliveryPolicy[maximumRedeliveries=6]])] InOnly Properties:{} Headers:
BodyType:String Body:I am the planet Neptune with seqnum: 5
[aultMessageListenerContainer-6] TraceInterceptor INFO -> resequencer1 Resequencer[ [] -> [To[log:PLANETS]]] InOnly Properties:
Headers:
BodyType:String Body:I am the planet Neptune with seqnum: 5
[[log:PLANETS]]] Polling Thread] TraceInterceptor INFO -> to1 To[log:PLANETS] InOnly Properties:{} Headers:
BodyType:String Body:I am the planet Venus with seqnum: 3
[[log:PLANETS]]] Polling Thread] PLANETS INFO Exchange[BodyType:String, Body:I am the planet Venus with seqnum: 3]
[[log:PLANETS]]] Polling Thread] TraceInterceptor INFO -> to1 To[log:PLANETS] InOnly Properties:{} Headers:
BodyType:String Body:I am the planet Saturn with seqnum: 8
[[log:PLANETS]]] Polling Thread] PLANETS INFO Exchange[BodyType:String, Body:I am the planet Saturn with seqnum: 8]
[ main] MyCamelResequencer INFO Shutting down the Camel context
[ ActiveMQ ShutdownHook] BrokerService INFO ActiveMQ Message Broker (localhost, ID:174-155-37-223.pools.spcsdns.net-58792-1229973871555-0:0) is shutting down
[ ActiveMQ ShutdownHook] TransportConnector INFO Connector vm://localhost Stopped
[ ActiveMQ ShutdownHook] BrokerService INFO ActiveMQ JMS Message Broker (localhost, ID:174-155-37-223.pools.spcsdns.net-58792-1229973871555-0:0) stopped