diff --git oak-run/src/main/java/org/apache/jackrabbit/oak/run/SegmentCopyCommand.java oak-run/src/main/java/org/apache/jackrabbit/oak/run/SegmentCopyCommand.java index 431ab1242b..09d9ee1ba7 100644 --- oak-run/src/main/java/org/apache/jackrabbit/oak/run/SegmentCopyCommand.java +++ oak-run/src/main/java/org/apache/jackrabbit/oak/run/SegmentCopyCommand.java @@ -25,14 +25,12 @@ import java.io.PrintWriter; import joptsimple.OptionParser; import joptsimple.OptionSet; -import joptsimple.OptionSpec; class SegmentCopyCommand implements Command { @Override public void execute(String... args) throws Exception { OptionParser parser = new OptionParser(); - OptionSpec verbose = parser.accepts("verbose", "print detailed output about individual segments transfered"); OptionSet options = parser.parse(args); PrintWriter out = new PrintWriter(System.out, true); @@ -48,7 +46,6 @@ class SegmentCopyCommand implements Command { int statusCode = SegmentCopy.builder() .withSource(source) .withDestination(destination) - .withVerbose(options.has(verbose)) .withOutWriter(out) .withErrWriter(err) .build() @@ -66,4 +63,4 @@ class SegmentCopyCommand implements Command { parser.printHelpOn(err); System.exit(1); } -} +} \ No newline at end of file diff --git oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/SegmentCopy.java oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/SegmentCopy.java index 754ed744fd..a8305285ec 100644 --- oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/SegmentCopy.java +++ oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/SegmentCopy.java @@ -19,7 +19,6 @@ package org.apache.jackrabbit.oak.segment.azure.tool; import static com.google.common.base.Preconditions.checkNotNull; -import static org.apache.jackrabbit.oak.segment.azure.tool.ToolUtils.fetchByteArray; import static org.apache.jackrabbit.oak.segment.azure.tool.ToolUtils.newSegmentNodeStorePersistence; import static org.apache.jackrabbit.oak.segment.azure.tool.ToolUtils.printMessage; import static org.apache.jackrabbit.oak.segment.azure.tool.ToolUtils.printableStopwatch; @@ -29,33 +28,13 @@ import static org.apache.jackrabbit.oak.segment.azure.tool.ToolUtils.storeTypeFr import com.google.common.base.Stopwatch; import org.apache.jackrabbit.oak.segment.azure.tool.ToolUtils.SegmentStoreType; -import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitor; -import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitorAdapter; -import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitor; -import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitorAdapter; -import org.apache.jackrabbit.oak.segment.spi.persistence.GCJournalFile; -import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFile; -import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFileReader; -import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFileWriter; -import org.apache.jackrabbit.oak.segment.spi.persistence.ManifestFile; import org.apache.jackrabbit.oak.segment.spi.persistence.RepositoryLock; -import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveEntry; -import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveManager; -import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveReader; -import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveWriter; import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentNodeStorePersistence; import org.apache.jackrabbit.oak.segment.tool.Check; -import org.apache.jackrabbit.oak.segment.spi.persistence.Buffer; import java.io.IOException; import java.io.PrintWriter; import java.text.MessageFormat; -import java.util.ArrayDeque; -import java.util.Collections; -import java.util.Deque; -import java.util.List; -import java.util.Properties; -import java.util.UUID; /** * Perform a full-copy of repository data at segment level. @@ -83,8 +62,6 @@ public class SegmentCopy { private SegmentNodeStorePersistence destPersistence; - private boolean verbose; - private PrintWriter outWriter; private PrintWriter errWriter; @@ -166,19 +143,6 @@ public class SegmentCopy { return this; } - /** - * Whether to show detailed output about current copy operation or not. - * - * @param verbose, - * true to print detailed output, false - * otherwise. - * @return this builder. - */ - public Builder withVerbose(boolean verbose) { - this.verbose = verbose; - return this; - } - /** * Create an executable version of the {@link Check} command. * @@ -197,8 +161,6 @@ public class SegmentCopy { private final String destination; - private final boolean verbose; - private final PrintWriter outWriter; private final PrintWriter errWriter; @@ -207,12 +169,12 @@ public class SegmentCopy { private SegmentNodeStorePersistence destPersistence; + public SegmentCopy(Builder builder) { this.source = builder.source; this.destination = builder.destination; this.srcPersistence = builder.srcPersistence; this.destPersistence = builder.destPersistence; - this.verbose = builder.verbose; this.outWriter = builder.outWriter; this.errWriter = builder.errWriter; } @@ -224,6 +186,9 @@ public class SegmentCopy { SegmentStoreType srcType = storeTypeFromPathOrUri(source); SegmentStoreType destType = storeTypeFromPathOrUri(destination); + String srcDescription = storeDescription(srcType, source); + String destDescription = storeDescription(destType, destination); + try { if (srcPersistence == null || destPersistence == null) { srcPersistence = newSegmentNodeStorePersistence(srcType, source); @@ -231,48 +196,23 @@ public class SegmentCopy { } printMessage(outWriter, "Started segment-copy transfer!"); - printMessage(outWriter, "Source: {0}", storeDescription(srcType, source)); - printMessage(outWriter, "Destination: {0}", storeDescription(destType, destination)); + printMessage(outWriter, "Source: {0}", srcDescription); + printMessage(outWriter, "Destination: {0}", destDescription); try { - srcPersistence.lockRepository(); + srcRepositoryLock = srcPersistence.lockRepository(); } catch (Exception e) { throw new Exception(MessageFormat.format( "Cannot lock source segment store {0} for starting copying process. Giving up!", - storeDescription(srcType, source))); + srcDescription)); } - printMessage(outWriter, "Copying archives..."); - // TODO: copy only segments not transfered - IOMonitor ioMonitor = new IOMonitorAdapter(); - FileStoreMonitor fileStoreMonitor = new FileStoreMonitorAdapter(); - - SegmentArchiveManager srcArchiveManager = srcPersistence.createArchiveManager(false, false, ioMonitor, - fileStoreMonitor); - SegmentArchiveManager destArchiveManager = destPersistence.createArchiveManager(false, false, ioMonitor, - fileStoreMonitor); - copyArchives(srcArchiveManager, destArchiveManager); - - printMessage(outWriter, "Copying journal..."); - // TODO: delete destination journal file if present - JournalFile srcJournal = srcPersistence.getJournalFile(); - JournalFile destJournal = destPersistence.getJournalFile(); - copyJournal(srcJournal, destJournal); - - printMessage(outWriter, "Copying gc journal..."); - // TODO: delete destination gc journal file if present - GCJournalFile srcGcJournal = srcPersistence.getGCJournalFile(); - GCJournalFile destGcJournal = destPersistence.getGCJournalFile(); - for (String line : srcGcJournal.readLines()) { - destGcJournal.writeLine(line); - } + SegmentStoreMigrator migrator = new SegmentStoreMigrator.Builder() + .withSourcePersistence(srcPersistence, srcDescription) + .withTargetPersistence(destPersistence, destDescription) + .build(); - printMessage(outWriter, "Copying manifest..."); - // TODO: delete destination manifest file if present - ManifestFile srcManifest = srcPersistence.getManifestFile(); - ManifestFile destManifest = destPersistence.getManifestFile(); - Properties properties = srcManifest.load(); - destManifest.save(properties); + migrator.migrate(); } catch (Exception e) { watch.stop(); printMessage(errWriter, "A problem occured while copying archives from {0} to {1} ", source, destination); @@ -283,8 +223,7 @@ public class SegmentCopy { try { srcRepositoryLock.unlock(); } catch (IOException e) { - printMessage(errWriter, "A problem occured while unlocking source repository {0} ", - storeDescription(srcType, source)); + printMessage(errWriter, "A problem occured while unlocking source repository {0} ", srcDescription); e.printStackTrace(errWriter); } } @@ -295,76 +234,4 @@ public class SegmentCopy { return 0; } - - private void copyArchives(SegmentArchiveManager srcArchiveManager, SegmentArchiveManager destArchiveManager) - throws IOException { - List srcArchiveNames = srcArchiveManager.listArchives(); - Collections.sort(srcArchiveNames); - int archiveCount = srcArchiveNames.size(); - int crtCount = 0; - - for (String archiveName : srcArchiveNames) { - crtCount++; - printMessage(outWriter, "{0} - {1}/{2}", archiveName, crtCount, archiveCount); - if (verbose) { - printMessage(outWriter, " |"); - } - - SegmentArchiveWriter archiveWriter = destArchiveManager.create(archiveName); - SegmentArchiveReader archiveReader = srcArchiveManager.open(archiveName); - List segmentEntries = archiveReader.listSegments(); - for (SegmentArchiveEntry segmentEntry : segmentEntries) { - writeSegment(segmentEntry, archiveReader, archiveWriter); - } - - Buffer binRefBuffer = archiveReader.getBinaryReferences(); - byte[] binRefData = fetchByteArray(binRefBuffer); - - archiveWriter.writeBinaryReferences(binRefData); - - Buffer graphBuffer = archiveReader.getGraph(); - byte[] graphData = fetchByteArray(graphBuffer); - - archiveWriter.writeGraph(graphData); - archiveWriter.close(); - } - } - - private void writeSegment(SegmentArchiveEntry segmentEntry, SegmentArchiveReader archiveReader, - SegmentArchiveWriter archiveWriter) throws IOException { - long msb = segmentEntry.getMsb(); - long lsb = segmentEntry.getLsb(); - if (verbose) { - printMessage(outWriter, " - {0}", new UUID(msb, lsb)); - } - - int size = segmentEntry.getLength(); - int offset = 0; - int generation = segmentEntry.getGeneration(); - int fullGeneration = segmentEntry.getFullGeneration(); - boolean isCompacted = segmentEntry.isCompacted(); - - Buffer byteBuffer = archiveReader.readSegment(msb, lsb); - byte[] data = fetchByteArray(byteBuffer); - - archiveWriter.writeSegment(msb, lsb, data, offset, size, generation, fullGeneration, isCompacted); - archiveWriter.flush(); - } - - private void copyJournal(JournalFile srcJournal, JournalFile destJournal) throws IOException { - try (JournalFileReader srcJournalReader = srcJournal.openJournalReader(); - JournalFileWriter destJournalWriter = destJournal.openJournalWriter()) { - - Deque linesStack = new ArrayDeque<>(); - String line = null; - while ((line = srcJournalReader.readLine()) != null) { - linesStack.push(line); - } - - while (!linesStack.isEmpty()) { - line = linesStack.pop(); - destJournalWriter.writeLine(line); - } - } - } } \ No newline at end of file diff --git oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/SegmentStoreMigrator.java oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/SegmentStoreMigrator.java new file mode 100644 index 0000000000..a9a3f01dd8 --- /dev/null +++ oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/SegmentStoreMigrator.java @@ -0,0 +1,296 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.segment.azure.tool; + +import static org.apache.jackrabbit.oak.segment.azure.tool.ToolUtils.fetchByteArray; +import static org.apache.jackrabbit.oak.segment.azure.tool.ToolUtils.storeDescription; + +import com.microsoft.azure.storage.StorageException; +import com.microsoft.azure.storage.blob.CloudBlobDirectory; + +import org.apache.jackrabbit.oak.segment.azure.AzurePersistence; +import org.apache.jackrabbit.oak.segment.azure.tool.ToolUtils.SegmentStoreType; +import org.apache.jackrabbit.oak.segment.file.tar.TarPersistence; +import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitorAdapter; +import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitorAdapter; +import org.apache.jackrabbit.oak.segment.spi.persistence.Buffer; +import org.apache.jackrabbit.oak.segment.spi.persistence.GCJournalFile; +import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFileReader; +import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFileWriter; +import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveEntry; +import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveManager; +import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveReader; +import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveWriter; +import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentNodeStorePersistence; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Properties; +import java.util.UUID; +import java.util.concurrent.BlockingDeque; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +public class SegmentStoreMigrator { + + private static final Logger log = LoggerFactory.getLogger(SegmentStoreMigrator.class); + + private static final int READ_THREADS = 20; + + private final SegmentNodeStorePersistence source; + + private final SegmentNodeStorePersistence target; + + private final String sourceName; + + private final String targetName; + + private ExecutorService executor = Executors.newFixedThreadPool(READ_THREADS + 1); + + private SegmentStoreMigrator(Builder builder) { + this.source = builder.source; + this.target = builder.target; + this.sourceName = builder.sourceName; + this.targetName = builder.targetName; + } + + public void migrate() throws IOException, ExecutionException, InterruptedException { + migrateJournal(); + migrateGCJournal(); + migrateManifest(); + migrateArchives(); + } + + private void migrateJournal() throws IOException { + log.info("{}/journal.log -> {}", sourceName, targetName); + if (!source.getJournalFile().exists()) { + log.info("No journal at {}; skipping.", sourceName); + return; + } + List journal = new ArrayList<>(); + try (JournalFileReader reader = source.getJournalFile().openJournalReader()) { + String line; + while ((line = reader.readLine()) != null) { + journal.add(line); + } + } + Collections.reverse(journal); + try (JournalFileWriter writer = target.getJournalFile().openJournalWriter()) { + for (String line : journal) { + writer.writeLine(line); + } + } + } + + private void migrateGCJournal() throws IOException { + log.info("{}/gc.log -> {}", sourceName, targetName); + GCJournalFile targetGCJournal = target.getGCJournalFile(); + for (String line : source.getGCJournalFile().readLines()) { + targetGCJournal.writeLine(line); + } + } + + private void migrateManifest() throws IOException { + log.info("{}/manifest -> {}", sourceName, targetName); + if (!source.getManifestFile().exists()) { + log.info("No manifest at {}; skipping.", sourceName); + return; + } + Properties manifest = source.getManifestFile().load(); + target.getManifestFile().save(manifest); + } + + private void migrateArchives() throws IOException, ExecutionException, InterruptedException { + if (!source.segmentFilesExist()) { + log.info("No segment archives at {}; skipping.", sourceName); + return; + } + SegmentArchiveManager sourceManager = source.createArchiveManager(false, false, new IOMonitorAdapter(), + new FileStoreMonitorAdapter()); + SegmentArchiveManager targetManager = target.createArchiveManager(false, false, new IOMonitorAdapter(), + new FileStoreMonitorAdapter()); + for (String archiveName : sourceManager.listArchives()) { + log.info("{}/{} -> {}", sourceName, archiveName, targetName); + try (SegmentArchiveReader reader = sourceManager.forceOpen(archiveName)) { + SegmentArchiveWriter writer = targetManager.create(archiveName); + try { + migrateSegments(reader, writer); + migrateBinaryRef(reader, writer); + migrateGraph(reader, writer); + } finally { + writer.close(); + } + } + } + } + + private void migrateSegments(SegmentArchiveReader reader, SegmentArchiveWriter writer) + throws InterruptedException, ExecutionException { + BlockingDeque readDeque = new LinkedBlockingDeque<>(READ_THREADS); + BlockingDeque writeDeque = new LinkedBlockingDeque<>(READ_THREADS); + AtomicBoolean processingFinished = new AtomicBoolean(false); + AtomicBoolean exception = new AtomicBoolean(false); + List> futures = new ArrayList<>(); + for (int i = 0; i < READ_THREADS; i++) { + futures.add(executor.submit(() -> { + try { + while (!exception.get() && !(readDeque.isEmpty() && processingFinished.get())) { + Segment segment = readDeque.poll(100, TimeUnit.MILLISECONDS); + if (segment != null) { + segment.read(reader); + } + } + return null; + } catch (Exception e) { + exception.set(true); + throw e; + } + })); + } + futures.add(executor.submit(() -> { + try { + while (!exception.get() && !(writeDeque.isEmpty() && processingFinished.get())) { + Segment segment = writeDeque.poll(100, TimeUnit.MILLISECONDS); + if (segment != null) { + while (segment.data == null && !exception.get()) { + Thread.sleep(10); + } + segment.write(writer); + } + } + return null; + } catch (Exception e) { + exception.set(true); + throw e; + } + })); + for (SegmentArchiveEntry entry : reader.listSegments()) { + Segment segment = new Segment(entry); + readDeque.putLast(segment); + writeDeque.putLast(segment); + } + processingFinished.set(true); + for (Future future : futures) { + future.get(); + } + } + + private void migrateBinaryRef(SegmentArchiveReader reader, SegmentArchiveWriter writer) throws IOException { + Buffer binaryReferences = reader.getBinaryReferences(); + if (binaryReferences != null) { + byte[] array = fetchByteArray(binaryReferences); + writer.writeBinaryReferences(array); + } + } + + private void migrateGraph(SegmentArchiveReader reader, SegmentArchiveWriter writer) throws IOException { + if (reader.hasGraph()) { + Buffer graph = reader.getGraph(); + byte[] array = fetchByteArray(graph); + writer.writeGraph(array); + } + } + + private static class Segment { + + private final SegmentArchiveEntry entry; + + private volatile Buffer data; + + private Segment(SegmentArchiveEntry entry) { + this.entry = entry; + } + + private void read(SegmentArchiveReader reader) throws IOException { + data = reader.readSegment(entry.getMsb(), entry.getLsb()); + } + + private void write(SegmentArchiveWriter writer) throws IOException { + final byte[] array = data.array(); + final int offset = 0; + writer.writeSegment(entry.getMsb(), entry.getLsb(), array, offset, entry.getLength(), entry.getGeneration(), + entry.getFullGeneration(), entry.isCompacted()); + } + + @Override + public String toString() { + return new UUID(entry.getMsb(), entry.getLsb()).toString(); + } + } + + public static class Builder { + + private SegmentNodeStorePersistence source; + + private SegmentNodeStorePersistence target; + + private String sourceName; + + private String targetName; + + public Builder withSource(File dir) { + this.source = new TarPersistence(dir); + this.sourceName = storeDescription(SegmentStoreType.TAR, dir.getPath()); + return this; + } + + public Builder withSource(CloudBlobDirectory dir) throws URISyntaxException, StorageException { + this.source = new AzurePersistence(dir); + this.sourceName = storeDescription(SegmentStoreType.AZURE, dir.getContainer().getName() + "/" + dir.getPrefix()); + return this; + } + + public Builder withSourcePersistence(SegmentNodeStorePersistence source, String sourceName) { + this.source = source; + this.sourceName = sourceName; + return this; + } + + public Builder withTargetPersistence(SegmentNodeStorePersistence target, String targetName) { + this.target = target; + this.targetName = targetName; + return this; + } + + public Builder withTarget(File dir) { + this.target = new TarPersistence(dir); + this.targetName = storeDescription(SegmentStoreType.TAR, dir.getPath()); + return this; + } + + public Builder withTarget(CloudBlobDirectory dir) throws URISyntaxException, StorageException { + this.target = new AzurePersistence(dir); + this.targetName = storeDescription(SegmentStoreType.AZURE, dir.getContainer().getName() + "/" + dir.getPrefix()); + return this; + } + + public SegmentStoreMigrator build() { + return new SegmentStoreMigrator(this); + } + } +} \ No newline at end of file diff --git oak-segment-azure/src/test/java/oak/apache/jackrabbit/oak/segment/azure/tool/SegmentCopyTestBase.java oak-segment-azure/src/test/java/oak/apache/jackrabbit/oak/segment/azure/tool/SegmentCopyTestBase.java index 90d2624cea..0298f9ae13 100644 --- oak-segment-azure/src/test/java/oak/apache/jackrabbit/oak/segment/azure/tool/SegmentCopyTestBase.java +++ oak-segment-azure/src/test/java/oak/apache/jackrabbit/oak/segment/azure/tool/SegmentCopyTestBase.java @@ -121,7 +121,7 @@ public abstract class SegmentCopyTestBase { SegmentCopy segmentCopy = SegmentCopy.builder().withSrcPersistencee(srcPersistence) .withDestPersistence(destPersistence).withSource(srcPathOrUri).withDestination(destPathOrUri) - .withOutWriter(outWriter).withErrWriter(errWriter).withVerbose(true).build(); + .withOutWriter(outWriter).withErrWriter(errWriter).build(); return segmentCopy.run(); } @@ -229,4 +229,4 @@ public abstract class SegmentCopyTestBase { return uri.toString(); } -} +} \ No newline at end of file