Uploaded image for project: 'Camel'
  1. Camel
  2. CAMEL-10171

Camel CXF expired continuations cause memory leak

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Major
    • Resolution: Fixed
    • 2.17.1
    • 2.17.4, 2.18.0
    • camel-cxf
    • None
    • Unknown

    Description

      It looks like exchanges that expire because of the CXF continuation timeout accumulate in the InflightRepository. Tested with Camel 2.17.1 and cxf-rt-transports-http-jetty 3.1.5:

      Dependencies:

      <dependencies>
              <dependency>
                  <groupId>org.apache.camel</groupId>
                  <artifactId>camel-core</artifactId>
                  <version>2.17.1</version>
              </dependency>
              <dependency>
                  <groupId>org.apache.camel</groupId>
                  <artifactId>camel-cxf</artifactId>
                  <version>2.17.1</version>
              </dependency>
              <dependency>
                  <groupId>org.apache.cxf</groupId>
                  <artifactId>cxf-rt-transports-http-jetty</artifactId>
                  <version>3.1.5</version>
              </dependency>
          </dependencies>

      Reproducer:

      import org.apache.camel.CamelContext;
      import org.apache.camel.builder.RouteBuilder;
      import org.apache.camel.component.cxf.CxfEndpoint;
      import org.apache.camel.component.cxf.DataFormat;
      import org.apache.camel.impl.DefaultCamelContext;
      import org.springframework.util.StreamUtils;
      import org.w3c.dom.Document;
      
      import javax.xml.parsers.DocumentBuilder;
      import javax.xml.parsers.DocumentBuilderFactory;
      import javax.xml.soap.MessageFactory;
      import javax.xml.soap.SOAPMessage;
      import java.io.ByteArrayInputStream;
      import java.io.ByteArrayOutputStream;
      import java.io.OutputStream;
      import java.net.HttpURLConnection;
      import java.net.URL;
      import java.nio.charset.StandardCharsets;
      import java.util.Timer;
      import java.util.TimerTask;
      import java.util.concurrent.Executor;
      import java.util.concurrent.ExecutorService;
      import java.util.concurrent.Executors;
      
      public class Sample {
      
          private final static String URI = "http://127.0.0.1:8080/";
          private final static long CONTINUATION_TIMEOUT = 100L;
          private final static long DELAYER_VALUE = 200L;
          private final static int SENDER_THREADS = Runtime.getRuntime().availableProcessors();
          private final static int MESSAGES_PER_SENDER = 10000;
      
          private static void setupCamel() throws Exception {
              final CamelContext camelContext = new DefaultCamelContext();
              final CxfEndpoint endpoint = (CxfEndpoint)camelContext.getEndpoint( "cxf://" + URI );
              endpoint.setContinuationTimeout( CONTINUATION_TIMEOUT );
              endpoint.setDataFormat( DataFormat.PAYLOAD );
              camelContext.addRoutes( new RouteBuilder() {
                  public void configure() throws Exception {
                      from( endpoint )
                      .threads()
                      .setBody( constant( "<ok />" ) )
                      .delay( DELAYER_VALUE )
                      .end();
                  }
              });
              final TimerTask repoSizeReporter = new TimerTask() {
                  public void run() {
                      System.out.println( "Inflight repository size: " + camelContext.getInflightRepository().size() );
                      System.gc();
                      System.out.println( "Memory usage: " + (Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory())/(1024*1024) + "MB" );
                  }
              };
              final Timer repoSizeReporterTimer = new Timer();
              repoSizeReporterTimer.schedule( repoSizeReporter, 1000, 1000 );
              camelContext.start();
          }
      
          private static byte[] createSoapMessage() throws Exception {
              final StringBuilder payloadBuilder = new StringBuilder( "<payload>" );
              for( int i = 0; i < 5000; i++ ) {
                  payloadBuilder.append( "<payloadElement />" );
              }
              final String payload = payloadBuilder.append( "</payload>" ).toString();
              final DocumentBuilder documentBuilder = DocumentBuilderFactory.newInstance().newDocumentBuilder();
              final Document payloadDocument = documentBuilder.parse( new ByteArrayInputStream( payload.getBytes() ) );
              final ByteArrayOutputStream soapOutStream = new ByteArrayOutputStream();
              final SOAPMessage message = MessageFactory.newInstance().createMessage();
              message.getSOAPBody().addDocument( payloadDocument );
              message.writeTo( soapOutStream );
              return soapOutStream.toByteArray();
          }
      
          private static Runnable soapSender() {
              return () -> {
                  try {
                      final byte[] soapMessage = createSoapMessage();
                      for( int i = 0; i < MESSAGES_PER_SENDER; i++ ) {
                          final HttpURLConnection connection = (HttpURLConnection)new URL( URI ).openConnection();
                          connection.setDoOutput( true );
                          connection.setRequestProperty( "Content-Type", "text/xml" );
                          connection.setRequestProperty( "SOAPAction", "\"\"" );
                          connection.setRequestMethod( "POST" );
                          connection.setRequestProperty( "Accept", "*/*" );
                          connection.connect();
                          StreamUtils.copy( soapMessage, connection.getOutputStream() );
                          connection.getResponseCode();
                          connection.disconnect();
                      }
                  } catch ( final Exception ex ) {
                      ex.printStackTrace();
                  }
              };
          }
      
          public static void main(String[] args) throws Exception {
              setupCamel();
              final Executor executor = Executors.newFixedThreadPool( SENDER_THREADS );
              for( int i = 0; i < SENDER_THREADS; i++ ) {
                  executor.execute( soapSender() );
              }
          }
      }

      Attachments

        Activity

          People

            davsclaus Claus Ibsen
            ddms Damian Malczyk
            Votes:
            0 Vote for this issue
            Watchers:
            4 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: