Details
-
Bug
-
Status: Resolved
-
Minor
-
Resolution: Fixed
-
2.8.1
-
None
-
Unknown
Description
Using a NettyProducer without the disconnect=true configuration is causing the route to block after 10 messages on the to("netty://tcp....") call.
It appears that a new socket connection is created for every message, and then after 10 connections no new connection is allowed (must be a default thread pool limit?).
Using the disconnect=true option fixes the problem as a socket is connected, message sent, then disconnected. But this does not seem viable for implementations where that overhead is undesirable or where more than one response is expected on a channel.
–
This is a small Unit Test that shows the problem (http://camel.465427.n5.nabble.com/Camel-Netty-Producer-creating-new-connection-on-every-message-td4844805.html#none)
package netty;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import junit.framework.TestCase;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@RunWith(Parameterized.class)
public class NettyTest extends TestCase
{
private final static Logger logger = LoggerFactory.getLogger(NettyTest.class);
private final static CamelContext serverContext = new DefaultCamelContext();
private final CamelContext clientContext = new DefaultCamelContext();
private final AtomicInteger responseCounter = new AtomicInteger(0);
private final AtomicBoolean passedTen = new AtomicBoolean(false);
private Boolean disconnectClient;
public NettyTest(Boolean disconnectClient)
{ this.disconnectClient = disconnectClient; } @Parameters
public static Collection<Object[]> configs()
{
return Arrays.asList(new Object[][] {
,
{ false } });
}
@BeforeClass
public static void createServer() throws Exception
{
serverContext.addRoutes(new RouteBuilder()
{
@Override
public void configure() throws Exception
{
from("netty:tcp://localhost:9000?sync=true&disconnectOnNoReply=false&allowDefaultCodec=true&tcpNoDelay=true&reuseAddress=true&keepAlive=false")
.setExchangePattern(ExchangePattern.InOut)
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception
{
Object body = exchange.getIn().getBody();
logger.info("Request received : Value = {}", body);
}
})
.transform(constant(3)).stop();
}
});
serverContext.start();
}
@Before
public void createClient() throws Exception
{
clientContext.addRoutes(new RouteBuilder()
{
@Override
public void configure() throws Exception
{
// Generate an Echo message and ensure a Response is sent
from("timer://echoTimer?delay=1s&fixedRate=true&period=1s")
.setExchangePattern(ExchangePattern.InOut)
.transform()
.constant(2)
.to(ExchangePattern.InOut, "netty:tcp://localhost:9000?allowDefaultCodec=true&tcpNoDelay=true&reuseAddress=true&keepAlive=false&sync=true&disconnect=" + disconnectClient.toString())
.process(new Processor()
{
@Override
public void process(Exchange exchange) throws Exception
{
Object body = exchange.getIn().getBody();
logger.info("Response number {} : Value = {}",
responseCounter.incrementAndGet(), body);
if (responseCounter.get() > 10)
{ passedTen.set(true); }
}
}).stop();
}
});
}
@Test
public void test() throws Exception
{
clientContext.getShutdownStrategy().setTimeout(1);
clientContext.start();
logger.info("Disconnect = {}", this.disconnectClient);
Thread.sleep(TimeUnit.SECONDS.toMillis(15));
clientContext.stop();
assertTrue("More than 10 responses have been received", passedTen.get());
}
}