Description
Looks like exchanges expired by CXF continuation timeout are being accumulated in InflightRepository. Tested with Camel 2.17.1 and cxf-rt-transports-http-jetty:
Dependencies:
<dependencies> <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-core</artifactId> <version>2.17.1</version> </dependency> <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-cxf</artifactId> <version>2.17.1</version> </dependency> <dependency> <groupId>org.apache.cxf</groupId> <artifactId>cxf-rt-transports-http-jetty</artifactId> <version>3.1.5</version> </dependency> </dependencies>
Reproducer:
import org.apache.camel.CamelContext; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.cxf.CxfEndpoint; import org.apache.camel.component.cxf.DataFormat; import org.apache.camel.impl.DefaultCamelContext; import org.springframework.util.StreamUtils; import org.w3c.dom.Document; import javax.xml.parsers.DocumentBuilder; import javax.xml.parsers.DocumentBuilderFactory; import javax.xml.soap.MessageFactory; import javax.xml.soap.SOAPMessage; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.net.HttpURLConnection; import java.net.URL; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.Executor; import java.util.concurrent.Executors; public class Sample { private final static String URI = "http://127.0.0.1:8080/"; private final static long CONTINUATION_TIMEOUT = 100L; private final static long DELAYER_VALUE = 200L; private final static int SENDER_THREADS = Runtime.getRuntime().availableProcessors(); private final static int MESSAGES_PER_SENDER = 10000; private static void setupCamel() throws Exception { final CamelContext camelContext = new DefaultCamelContext(); final CxfEndpoint endpoint = (CxfEndpoint)camelContext.getEndpoint( "cxf://" + URI ); endpoint.setContinuationTimeout( CONTINUATION_TIMEOUT ); endpoint.setDataFormat( DataFormat.PAYLOAD ); camelContext.addRoutes( new RouteBuilder() { public void configure() throws Exception { from( endpoint ) .threads() .setBody( constant( "<ok />" ) ) .delay( DELAYER_VALUE ) .end(); } }); final TimerTask repoSizeReporter = new TimerTask() { public void run() { System.out.println( "Inflight repository size: " + camelContext.getInflightRepository().size() ); System.gc(); System.out.println( "Memory usage: " + (Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory())/(1024*1024) + "MB" ); } }; final Timer repoSizeReporterTimer = new Timer(); repoSizeReporterTimer.schedule( repoSizeReporter, 1000, 1000 ); camelContext.start(); } private static byte[] createSoapMessage() throws Exception { final StringBuilder payloadBuilder = new StringBuilder( "<payload>" ); for( int i = 0; i < 5000; i++ ) { payloadBuilder.append( "<payloadElement />" ); } final String payload = payloadBuilder.append( "</payload>" ).toString(); final DocumentBuilder documentBuilder = DocumentBuilderFactory.newInstance().newDocumentBuilder(); final Document payloadDocument = documentBuilder.parse( new ByteArrayInputStream( payload.getBytes() ) ); final ByteArrayOutputStream soapOutStream = new ByteArrayOutputStream(); final SOAPMessage message = MessageFactory.newInstance().createMessage(); message.getSOAPBody().addDocument( payloadDocument ); message.writeTo( soapOutStream ); return soapOutStream.toByteArray(); } private static Runnable soapSender() { return () -> { try { final byte[] soapMessage = createSoapMessage(); for( int i = 0; i < MESSAGES_PER_SENDER; i++ ) { final HttpURLConnection connection = (HttpURLConnection)new URL( URI ).openConnection(); connection.setDoOutput( true ); connection.setRequestProperty( "Content-Type", "text/xml" ); connection.setRequestProperty( "SOAPAction", "\"\"" ); connection.setRequestMethod( "POST" ); connection.setRequestProperty( "Accept", "*/*" ); connection.connect(); StreamUtils.copy( soapMessage, connection.getOutputStream() ); connection.getResponseCode(); connection.disconnect(); } } catch ( final Exception ex ) { ex.printStackTrace(); } }; } public static void main(String[] args) throws Exception { setupCamel(); final Executor executor = Executors.newFixedThreadPool( SENDER_THREADS ); for( int i = 0; i < SENDER_THREADS; i++ ) { executor.execute( soapSender() ); } } }