diff --git oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/SegmentStoreMigrator.java oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/SegmentStoreMigrator.java index a9a3f01dd8..71d144b34f 100644 --- oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/SegmentStoreMigrator.java +++ oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/SegmentStoreMigrator.java @@ -47,14 +47,10 @@ import java.util.Collections; import java.util.List; import java.util.Properties; import java.util.UUID; -import java.util.concurrent.BlockingDeque; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingDeque; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; public class SegmentStoreMigrator { @@ -150,53 +146,19 @@ public class SegmentStoreMigrator { } private void migrateSegments(SegmentArchiveReader reader, SegmentArchiveWriter writer) - throws InterruptedException, ExecutionException { - BlockingDeque readDeque = new LinkedBlockingDeque<>(READ_THREADS); - BlockingDeque writeDeque = new LinkedBlockingDeque<>(READ_THREADS); - AtomicBoolean processingFinished = new AtomicBoolean(false); - AtomicBoolean exception = new AtomicBoolean(false); - List> futures = new ArrayList<>(); - for (int i = 0; i < READ_THREADS; i++) { + throws ExecutionException, InterruptedException, IOException { + List> futures = new ArrayList<>(); + for (SegmentArchiveEntry entry : reader.listSegments()) { futures.add(executor.submit(() -> { - try { - while (!exception.get() && !(readDeque.isEmpty() && processingFinished.get())) { - Segment segment = readDeque.poll(100, TimeUnit.MILLISECONDS); - if (segment != null) { - segment.read(reader); - } - } - return null; - } catch (Exception e) { - exception.set(true); - throw e; - } + Segment segment = new Segment(entry); + segment.read(reader); + return segment; })); } - futures.add(executor.submit(() -> { - try { - while (!exception.get() && !(writeDeque.isEmpty() && processingFinished.get())) { - Segment segment = writeDeque.poll(100, TimeUnit.MILLISECONDS); - if (segment != null) { - while (segment.data == null && !exception.get()) { - Thread.sleep(10); - } - segment.write(writer); - } - } - return null; - } catch (Exception e) { - exception.set(true); - throw e; - } - })); - for (SegmentArchiveEntry entry : reader.listSegments()) { - Segment segment = new Segment(entry); - readDeque.putLast(segment); - writeDeque.putLast(segment); - } - processingFinished.set(true); - for (Future future : futures) { - future.get(); + + for (Future future : futures) { + Segment segment = future.get(); + segment.write(writer); } }