Details
- Type: Improvement
- Status: Resolved
- Priority: Major
- Resolution: Fixed
- Affects Version/s: 3.11.0
- Component/s: None
- Labels: None
- Environment:
  Ubuntu 19.04
  openjdk 11.0.11 2021-04-20
  OpenJDK Runtime Environment (build 11.0.11+9-Ubuntu-0ubuntu2.20.04)
  OpenJDK 64-Bit Server VM (build 11.0.11+9-Ubuntu-0ubuntu2.20.04, mixed mode, sharing)
  Camel 3.11.1
- Estimated Complexity: Unknown
Description
Zulip chat discussion: https://camel.zulipchat.com/#narrow/stream/257298-camel/topic/tar.2Egz.20unmarshalling
In a very simple route that just reads every tar.gz archive found in a given directory and logs the names of the files inside it, processing fails on larger tar.gz archives:
from(file("archiveFile"))
    .routeId("pre-processing")
    .process(exchange -> {
        LOG.info("Processing archive: {}", exchange.getIn().getHeader(Exchange.FILE_NAME, String.class));
    })
    .unmarshal().gzipDeflater()
    .split(new TarSplitter()).streaming()
    .process(exchange -> {
        final String name = exchange.getIn().getHeader(Exchange.FILE_NAME, String.class);
        LOG.debug("name: {}", name);
    })
    .end();
The JVM quickly runs out of memory, presumably because the decompressed bytes are copied from the original stream into an in-memory buffer instead of being streamed onward, as indicated by the exception being thrown in the `hugeCapacity(...)` method of the `ByteArrayOutputStream` class:
org.apache.camel.CamelExecutionException: Exception occurred during execution on the exchange: Exchange[]
    at org.apache.camel.CamelExecutionException.wrapCamelExecutionException(CamelExecutionException.java:45)
    at org.apache.camel.support.AbstractExchange.setException(AbstractExchange.java:589)
    at org.apache.camel.support.DefaultExchange.setException(DefaultExchange.java:27)
    at org.apache.camel.support.processor.UnmarshalProcessor.process(UnmarshalProcessor.java:81)
    at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$SimpleTask.run(RedeliveryErrorHandler.java:463)
    at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:179)
    at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:64)
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:184)
    at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:398)
    at org.apache.camel.component.file.GenericFileConsumer.processExchange(GenericFileConsumer.java:492)
    at org.apache.camel.component.file.GenericFileConsumer.processBatch(GenericFileConsumer.java:245)
    at org.apache.camel.component.file.GenericFileConsumer.poll(GenericFileConsumer.java:206)
    at org.apache.camel.support.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:190)
    at org.apache.camel.support.ScheduledPollConsumer.run(ScheduledPollConsumer.java:107)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset$$$capture(FutureTask.java:308)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.OutOfMemoryError: null
    at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
    at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
    at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
    at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
    at org.apache.camel.support.builder.OutputStreamBuilder.write(OutputStreamBuilder.java:58)
    at org.apache.camel.util.IOHelper.copy(IOHelper.java:193)
    at org.apache.camel.util.IOHelper.copy(IOHelper.java:148)
    at org.apache.camel.util.IOHelper.copy(IOHelper.java:143)
    at org.apache.camel.util.IOHelper.copy(IOHelper.java:139)
    at org.apache.camel.dataformat.deflater.GzipDeflaterDataFormat.unmarshal(GzipDeflaterDataFormat.java:63)
    at org.apache.camel.support.processor.UnmarshalProcessor.process(UnmarshalProcessor.java:64)
    ... 18 common frames omitted
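The buffering behaviour can be illustrated outside Camel. This is a minimal sketch (not Camel's actual implementation; the file name "large.tar.gz" is a placeholder): copying the decompressed stream into a ByteArrayOutputStream grows an in-memory array until allocation fails, whereas simply wrapping the compressed stream in a GZIPInputStream lets the consumer pull bytes on demand with flat memory usage.

import java.io.ByteArrayOutputStream;
import java.io.FileInputStream;
import java.io.InputStream;
import java.util.zip.GZIPInputStream;

public class GzipBufferingDemo {

    // Mirrors the failing behaviour: the whole decompressed payload is
    // accumulated in a ByteArrayOutputStream, which eventually throws
    // OutOfMemoryError once the backing array can no longer grow.
    static byte[] unmarshalEager(InputStream compressed) throws Exception {
        try (GZIPInputStream gzip = new GZIPInputStream(compressed);
             ByteArrayOutputStream buffer = new ByteArrayOutputStream()) {
            byte[] chunk = new byte[8192];
            int read;
            while ((read = gzip.read(chunk)) != -1) {
                buffer.write(chunk, 0, read); // grows without bound
            }
            return buffer.toByteArray();
        }
    }

    // Mirrors the working custom DataFormat shown below: nothing is copied
    // up front, the caller pulls decompressed bytes as it reads.
    static InputStream unmarshalStreaming(InputStream compressed) throws Exception {
        return new GZIPInputStream(compressed);
    }

    public static void main(String[] args) throws Exception {
        // "large.tar.gz" is a hypothetical multi-gigabyte archive; the
        // streaming variant processes it in constant memory.
        try (InputStream in = unmarshalStreaming(new FileInputStream("large.tar.gz"))) {
            long total = 0;
            byte[] chunk = new byte[8192];
            int read;
            while ((read = in.read(chunk)) != -1) {
                total += read;
            }
            System.out.println("Decompressed bytes: " + total);
        }
    }
}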
If I instead use a custom DataFormat class that looks like this:
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.camel.Exchange;
import org.apache.camel.spi.DataFormat;
import org.apache.camel.util.IOHelper;
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;

public class GZipDataFormat implements DataFormat {

    @Override
    public void marshal(Exchange exchange, Object graph, OutputStream stream) throws Exception {
        InputStream is = exchange.getContext().getTypeConverter().mandatoryConvertTo(InputStream.class, exchange, graph);
        GzipCompressorOutputStream zipOutput = new GzipCompressorOutputStream(stream);
        try {
            IOHelper.copy(is, zipOutput);
        } finally {
            // must close all input streams
            IOHelper.close(is, zipOutput);
        }
    }

    @Override
    public Object unmarshal(Exchange exchange, InputStream stream) throws Exception {
        return new GzipCompressorInputStream(exchange.getIn().getMandatoryBody(InputStream.class));
    }

    @Override
    public void start() {
    }

    @Override
    public void stop() {
    }
}
and replace `unmarshal().gzipDeflater()` with `unmarshal(new GZipDataFormat())`, Camel decompresses the bytes correctly and passes the stream on, so that the `TarSplitter` can iterate over the entries of the archive.
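For reference, the adjusted route is the same as above with only the unmarshal step swapped (a sketch based on the route shown earlier):

from(file("archiveFile"))
    .routeId("pre-processing")
    .process(exchange -> {
        LOG.info("Processing archive: {}", exchange.getIn().getHeader(Exchange.FILE_NAME, String.class));
    })
    // custom streaming gzip data format instead of unmarshal().gzipDeflater()
    .unmarshal(new GZipDataFormat())
    .split(new TarSplitter()).streaming()
    .process(exchange -> {
        final String name = exchange.getIn().getHeader(Exchange.FILE_NAME, String.class);
        LOG.debug("name: {}", name);
    })
    .end();

With this variant memory usage stays flat, because the tar entries are only decompressed as the `TarSplitter` reads them from the wrapped stream.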