Camel / CAMEL-16953

camel-zip-deflater - Use Commons Compress to be able to un-zip large payloads


Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.11.0
    • Fix Version/s: 3.12.0
    • Component/s: None
    • Labels: None
    • Environment: Ubuntu 19.04

      openjdk 11.0.11 2021-04-20
      OpenJDK Runtime Environment (build 11.0.11+9-Ubuntu-0ubuntu2.20.04)
      OpenJDK 64-Bit Server VM (build 11.0.11+9-Ubuntu-0ubuntu2.20.04, mixed mode, sharing)

      Camel 3.11.1

    • Estimated Complexity: Unknown

    Description

      ZulipChat-Discussion: https://camel.zulipchat.com/#narrow/stream/257298-camel/topic/tar.2Egz.20unmarshalling

      In a very simple route that reads any tar.gz archive found in the configured directory and logs the names of the files inside it, processing fails for larger tar.gz archives.

      PreProcessingRoute.java
      from(file("archiveFile"))
          .routeId("pre-processing")
          .process(exchange -> {
              LOG.info("Processing archive: {}", exchange.getIn().getHeader(Exchange.FILE_NAME, String.class));
          })
          .unmarshal().gzipDeflater()
          .split(new TarSplitter()).streaming()
          .process(exchange -> {
              final String name = exchange.getIn().getHeader(Exchange.FILE_NAME, String.class);
              LOG.debug("name: {}", name);
          })
          .end();
      

      The JVM quickly runs out of memory, as the data format apparently copies all of the decompressed bytes into an in-memory buffer instead of passing the decompressing stream on, as indicated by the exception being thrown in the `hugeCapacity(...)` method of the `ByteArrayOutputStream` class (see the sketch after the stack trace below):

      StackTrace
      org.apache.camel.CamelExecutionException: Exception occurred during execution on the exchange: Exchange[]
       at org.apache.camel.CamelExecutionException.wrapCamelExecutionException(CamelExecutionException.java:45)
       at org.apache.camel.support.AbstractExchange.setException(AbstractExchange.java:589)
       at org.apache.camel.support.DefaultExchange.setException(DefaultExchange.java:27)
       at org.apache.camel.support.processor.UnmarshalProcessor.process(UnmarshalProcessor.java:81)
       at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$SimpleTask.run(RedeliveryErrorHandler.java:463)
       at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:179)
       at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:64)
       at org.apache.camel.processor.Pipeline.process(Pipeline.java:184)
       at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:398)
       at org.apache.camel.component.file.GenericFileConsumer.processExchange(GenericFileConsumer.java:492)
       at org.apache.camel.component.file.GenericFileConsumer.processBatch(GenericFileConsumer.java:245)
       at org.apache.camel.component.file.GenericFileConsumer.poll(GenericFileConsumer.java:206)
       at org.apache.camel.support.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:190)
       at org.apache.camel.support.ScheduledPollConsumer.run(ScheduledPollConsumer.java:107)
       at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
       at java.util.concurrent.FutureTask.runAndReset$$$capture(FutureTask.java:308)
       at java.util.concurrent.FutureTask.runAndReset(FutureTask.java)
       at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
       at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
       at java.lang.Thread.run(Thread.java:748)
      Caused by: java.lang.OutOfMemoryError: null
       at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
       at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
       at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
       at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
       at org.apache.camel.support.builder.OutputStreamBuilder.write(OutputStreamBuilder.java:58)
       at org.apache.camel.util.IOHelper.copy(IOHelper.java:193)
       at org.apache.camel.util.IOHelper.copy(IOHelper.java:148)
       at org.apache.camel.util.IOHelper.copy(IOHelper.java:143)
       at org.apache.camel.util.IOHelper.copy(IOHelper.java:139)
       at org.apache.camel.dataformat.deflater.GzipDeflaterDataFormat.unmarshal(GzipDeflaterDataFormat.java:63)
       at org.apache.camel.support.processor.UnmarshalProcessor.process(UnmarshalProcessor.java:64)
       ... 18 common frames omitted
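
      Based on the stack trace, the built-in data format seems to decompress the payload eagerly and copy everything into an in-memory buffer before handing the result to the next processor. A minimal sketch of that pattern (a simplified illustration inferred from the stack trace, not the actual GzipDeflaterDataFormat source; class and method names are made up):

      EagerGunzipSketch.java
      import java.io.ByteArrayInputStream;
      import java.io.ByteArrayOutputStream;
      import java.io.InputStream;
      import java.util.zip.GZIPInputStream;

      public class EagerGunzipSketch {

          // Decompresses the whole payload into a byte[] before returning it.
          // For very large archives the growing ByteArrayOutputStream eventually
          // fails in hugeCapacity(...), matching the OutOfMemoryError above.
          public static InputStream gunzipFullyBuffered(InputStream compressed) throws Exception {
              try (GZIPInputStream gzip = new GZIPInputStream(compressed);
                   ByteArrayOutputStream buffer = new ByteArrayOutputStream()) {
                  byte[] chunk = new byte[8192];
                  int read;
                  while ((read = gzip.read(chunk)) != -1) {
                      buffer.write(chunk, 0, read);
                  }
                  return new ByteArrayInputStream(buffer.toByteArray());
              }
          }
      }

      Returning the decompressing stream itself, as the custom DataFormat below does, avoids materializing the payload in memory.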
      

      If I instead use a custom DataFormat class that looks like this:

      GZipDataFormat.java
      import org.apache.camel.Exchange;
      import org.apache.camel.spi.DataFormat;
      import org.apache.camel.util.IOHelper;
      import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
      import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;

      import java.io.InputStream;
      import java.io.OutputStream;

      public class GZipDataFormat implements DataFormat {

          @Override
          public void marshal(Exchange exchange, Object graph, OutputStream stream) throws Exception {
              InputStream is = exchange.getContext().getTypeConverter().mandatoryConvertTo(InputStream.class, exchange, graph);

              GzipCompressorOutputStream zipOutput = new GzipCompressorOutputStream(stream);
              try {
                  IOHelper.copy(is, zipOutput);
              } finally {
                  // must close all input streams
                  IOHelper.close(is, zipOutput);
              }
          }

          @Override
          public Object unmarshal(Exchange exchange, InputStream stream) throws Exception {
              // return the decompressing stream itself rather than copying its
              // content into memory, so downstream processors can read it lazily
              return new GzipCompressorInputStream(exchange.getIn().getMandatoryBody(InputStream.class));
          }

          @Override
          public void start() {
          }

          @Override
          public void stop() {
          }
      }
      

      and change `unmarshal().gzipDeflater()` to `unmarshal(new GZipDataFormat())` in the route, Camel decompresses the bytes correctly and passes the stream on, so that the `TarSplitter` can iterate over the entries of the archive.
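
      For reference, the adjusted route then looks roughly like this (a minimal sketch reusing the two snippets above; the surrounding route builder boilerplate is the same as in the original route):

      PreProcessingRoute.java (adjusted)
      from(file("archiveFile"))
          .routeId("pre-processing")
          // the custom Commons Compress based data format replaces gzipDeflater()
          // and returns a streaming GzipCompressorInputStream
          .unmarshal(new GZipDataFormat())
          // iterate the tar entries without buffering the whole archive
          .split(new TarSplitter()).streaming()
          .process(exchange -> {
              final String name = exchange.getIn().getHeader(Exchange.FILE_NAME, String.class);
              LOG.debug("name: {}", name);
          })
          .end();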


          People

            Assignee: davsclaus Claus Ibsen
            Reporter: RovoMe Roman Vottner
