Type: Improvement
Status: Resolved
Priority: Minor
Resolution: Invalid
Affects Version/s: None
Fix Version/s: None
Component/s: None
Labels: None
Estimated Complexity: Unknown
Is there Camel documentation covering points 1 and 2 below?
1) When Java calls a seda component directly and the seda queue is full, the Java caller must handle the failure itself.
2) When Java calls a direct component first, which in turn calls seda, and the seda queue is full, the Camel dead letter channel (DLQ) error handler handles it.
The underlying problem we experienced was Java calling seda and getting a 'queue full' exception. Because the Java code did not check the exception on the response object, the failure went unnoticed.
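To make the failure mode concrete, here is a minimal plain-Java sketch of it. It does not use Camel: `QueueFullDemo` and `SendResult` are hypothetical names, and a `java.util.concurrent.ArrayBlockingQueue` of size 2 stands in for the seda queue. The assumption being modeled is that the send call returns normally and the 'queue full' failure is only visible on the returned object, so a caller that never inspects it silently drops the message.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class QueueFullDemo {

    /** Hypothetical stand-in for a response object that carries a failure instead of throwing. */
    static final class SendResult {
        final String body;
        final Exception exception; // null when the message was accepted

        SendResult(String body, Exception exception) {
            this.body = body;
            this.exception = exception;
        }

        boolean isFailed() {
            return exception != null;
        }
    }

    private final BlockingQueue<String> queue;

    QueueFullDemo(int size) {
        this.queue = new ArrayBlockingQueue<>(size);
    }

    /** Never throws: a full queue is reported on the result, as in the scenario above. */
    SendResult send(String body) {
        try {
            queue.add(body); // add() throws IllegalStateException when the queue is full
            return new SendResult(body, null);
        } catch (IllegalStateException e) {
            return new SendResult(body, e);
        }
    }

    public static void main(String[] args) {
        QueueFullDemo demo = new QueueFullDemo(2); // same capacity as size=2 in the sample route
        int rejected = 0;
        for (int i = 0; i < 5; i++) {
            SendResult result = demo.send("test message " + i);
            // This check is the step the calling code was missing.
            if (result.isFailed()) {
                rejected++;
                System.out.println("rejected '" + result.body + "': " + result.exception);
            }
        }
        System.out.println("rejected=" + rejected);
    }
}
```

Nothing consumes from the queue in this sketch, so every send after the second is rejected. In real code the equivalent check would go on whatever response object the send call returns.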
Because the queue was full, we tend to have in-flight messages that are effectively lost, since the batch shuts down without waiting for the in-flight messages to finish processing.
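The shutdown side of the problem can be sketched the same way. The example below is plain Java, not Camel's shutdown mechanism: `GracefulShutdownDemo` and `shutdownAndCountUnprocessed` are hypothetical names, and a single-threaded `ExecutorService` stands in for the queue consumer. It shows the general idea of waiting a bounded time for queued work to drain and then reporting how many items never started, rather than dropping them silently.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class GracefulShutdownDemo {

    /**
     * Stops accepting new work, waits up to timeoutMillis for queued tasks to finish,
     * then force-stops and returns the number of tasks that never started.
     */
    static int shutdownAndCountUnprocessed(ExecutorService worker, long timeoutMillis)
            throws InterruptedException {
        worker.shutdown(); // rejects new tasks; already queued tasks still run
        if (worker.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS)) {
            return 0; // everything drained within the timeout
        }
        // Timeout expired: interrupt the running task and collect the ones still queued.
        List<Runnable> neverStarted = worker.shutdownNow();
        return neverStarted.size();
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 5; i++) {
            final int id = i;
            worker.submit(() -> {
                try {
                    Thread.sleep(200L); // simulated slow processing of one message
                    System.out.println("done " + id);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        // A timeout shorter than the total processing time leaves work unprocessed.
        int unprocessed = shutdownAndCountUnprocessed(worker, 100L);
        System.out.println("unprocessed=" + unprocessed);
    }
}
```

With a short timeout the returned count is the number of messages that would have been lost. Raising the timeout, or logging and persisting the leftover items, are the two obvious ways to avoid losing them.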
Sample code that reproduces the behaviour is below:
import org.apache.camel.CamelContext;
import org.apache.camel.EndpointInject;
import org.apache.camel.FluentProducerTemplate;
import org.apache.camel.LoggingLevel;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.test.spring.ShutdownTimeout;
import org.junit.jupiter.api.Test;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.TestPropertySource;

import javax.inject.Inject;
import java.util.stream.IntStream;

//@TestPropertySource(properties="camel.springboot.shutdownTimeout=5")
@ShutdownTimeout(value = 1)
@ContextConfiguration(classes = CamelShutdownTest.TestConfiguration.class)
public class CamelShutdownTest extends AbstractCamelTest {

    @Configuration
    public static class TestConfiguration {
        @Bean
        public RouteBuilder sedaRoute() {
            return new RouteBuilder() {
                @Override
                public void configure() {
                    errorHandler(deadLetterChannel("direct:dead"));

                    from("direct:test")
                        .to("seda:test?size=2");

                    from("seda:test?size=2")
                        .delayer(2000L)
                        .log(LoggingLevel.INFO, LoggerFactory.getLogger(CamelShutdownTest.class), "done ${body}");

                    from("direct:dead")
                        .log(LoggingLevel.INFO, LoggerFactory.getLogger(CamelShutdownTest.class), "dead ${body}");
                }
            };
        }
    }

    @EndpointInject(uri = "direct:test")
    private FluentProducerTemplate fluentProducerTemplate;

    @Inject
    private CamelContext context;

    @Test
    public void test() throws Exception {
        context.start();
        fluentProducerTemplate.withBody("warm up").send();
        Thread.sleep(1000L);
        IntStream.range(0, 100).boxed().forEach((i) -> {
            logger.info("Queued {}", i);
            try {
                Thread.sleep(5L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            fluentProducerTemplate.withBody("test message " + i).send();
        });
    }
}