Index: src/main/java/org/apache/harmony/unpack200/Archive.java =================================================================== --- src/main/java/org/apache/harmony/unpack200/Archive.java (revision 677594) +++ src/main/java/org/apache/harmony/unpack200/Archive.java (working copy) @@ -25,6 +25,11 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.jar.JarEntry; import java.util.jar.JarInputStream; import java.util.jar.JarOutputStream; @@ -90,6 +95,41 @@ this.outputStream = outputStream; } + + class Worker implements Runnable { + + InputStream iStream; + JarOutputStream oStream; + Segment container; + + public Worker(Segment target, InputStream in, JarOutputStream out) { + oStream = out; + iStream = in; + container = target; + } + + public void run() { + try { + synchronized(iStream) { + container.unpackRead(iStream); + } + + container.unpackProcess(); + + synchronized(oStream) { + container.unpackWrite(oStream); + oStream.flush(); + } + + } catch(IOException e) { + + } catch (Exception e) { + e.printStackTrace(); + } + } + + } + /** * Unpacks the Archive from the input file to the output file * @@ -141,8 +181,12 @@ outputStream.closeEntry(); } } else { + int procCount = Runtime.getRuntime().availableProcessors(); + BlockingQueue workerQueue = new ArrayBlockingQueue(procCount*2); + ThreadPoolExecutor tpWorkers = new ThreadPoolExecutor(procCount, procCount, 10, TimeUnit.SECONDS, workerQueue); + int i = 0; - while (available(inputStream)) { + while (inputStream.available() != 0) { i++; Segment segment = new Segment(); segment.setLogLevel(logLevel); @@ -159,14 +203,24 @@ if (overrideDeflateHint) { segment.overrideDeflateHint(deflateHint); } - segment.unpack(inputStream, outputStream); - outputStream.flush(); + // segment is ready to be dispatched + tpWorkers.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); + tpWorkers.submit(new Worker(segment, inputStream, outputStream)); + if (inputStream instanceof FileInputStream) { inputFileName = ((FileInputStream) inputStream).getFD() .toString(); } } + + try { + tpWorkers.shutdown(); + tpWorkers.awaitTermination(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } } finally { try { @@ -194,10 +248,12 @@ } private boolean available(InputStream inputStream) throws IOException { - inputStream.mark(1); - int check = inputStream.read(); - inputStream.reset(); - return check != -1; + synchronized(inputStream) { + inputStream.mark(1); + int check = inputStream.read(); + inputStream.reset(); + return check != -1; + } } /**