diff --git a/oak-doc/src/site/markdown/nodestore/segment/overview.md b/oak-doc/src/site/markdown/nodestore/segment/overview.md
index 991944dc11..14a754f0bb 100644
--- a/oak-doc/src/site/markdown/nodestore/segment/overview.md
+++ b/oak-doc/src/site/markdown/nodestore/segment/overview.md
@@ -678,7 +678,7 @@ Besides the local storage in TAR files (previously known as TarMK), support for
 ### Segment-Copy
 
 ```
-java -jar oak-run.jar segment-copy SOURCE DESTINATION [--last ]
+java -jar oak-run.jar segment-copy SOURCE DESTINATION [--last ] [--flat] [--max-size-gb <MAX_SIZE_GB>]
 ```
 
 The `segment-copy` command allows the "translation" of the Segment Store at `SOURCE` from one persistence type (e.g. local TarMK Segment Store) to a different persistence type (e.g. remote Azure or AWS Segment Store), saving the resulted Segment Store at `DESTINATION`.
@@ -688,11 +688,15 @@ If `--last` option is present, the tool will start with the most recent revision
 `SOURCE` must be a valid path/uri to an existing Segment Store.
 `DESTINATION` must be a valid path/uri for the resulting Segment Store.
 
-The optional `--last [Integer]` argument can be used to control the maximum number of revisions to be copied from the journal (default is 1).
-
 Both are specified as `PATH | cloud-prefix:URI`. Please refer to the [Remote Segment Stores](#remote-segment-stores) section for details on how to correctly specify connection URIs.
 
+The optional `--last [Integer]` argument can be used to control the maximum number of revisions to be copied from the journal (default is 1).
+
+The optional `--flat [Boolean]` argument can be specified to let the copy process write the segments at `DESTINATION` in a flat hierarchy, that is, without packing them into TAR archives.
+
+The optional `--max-size-gb <MAX_SIZE_GB>` argument can be used to copy at most `MAX_SIZE_GB` GB of segments from `SOURCE`.
+
 To enable logging during segment copy a Logback configuration file has to be injected via the `logback.configurationFile` property.
 
 ##### Example
@@ -801,11 +805,11 @@ This option is optional and is disabled by default.
 ### Compact
 
 ```
-java -jar oak-run.jar compact [--force] [--mmap] [--compactor] PATH | cloud-prefix:URI
+java -jar oak-run.jar compact [--force] [--mmap] [--compactor] SOURCE [--target-path DESTINATION] [--persistent-cache-path PERSISTENT_CACHE_PATH] [--persistent-cache-size-gb <PERSISTENT_CACHE_SIZE_GB>]
 ```
 
-The `compact` command performs offline compaction of the local/remote Segment Store at `PATH`/`URI`.
-`PATH`/`URI` must be a valid path/uri to an existing Segment Store. Currently, Azure Segment Store and AWS Segment Store the supported remote Segment Stores.
+The `compact` command performs offline compaction of the local/remote Segment Store at `SOURCE`.
+`SOURCE` must be a valid path/uri to an existing Segment Store. Currently, Azure Segment Store and AWS Segment Store are the supported remote Segment Stores.
 Please refer to the [Remote Segment Stores](#remote-segment-stores) section for details on how to correctly specify connection URIs.
 
 If the optional `--force [Boolean]` argument is set to `true` the tool ignores a non
@@ -820,6 +824,14 @@ Windows, regular file access is always enforced and this option is ignored.
 The optional `--compactor [String]` argument can be used to pick the compactor type to be used. Valid choices are *classic* and *diff*. While the former is slower, it might be more stable, due to lack of optimisations employed by the *diff* compactor which compacts the checkpoints on top of each other. If not specified, *diff* compactor is used.
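As a quick illustration of the `segment-copy` options documented above, a hypothetical invocation copying at most 16 GB of segments from a remote Azure Segment Store into a flat local hierarchy could look like the following. The account, container and local path are placeholders (not part of this patch), and the Azure access key is assumed to be supplied as described in the [Remote Segment Stores](#remote-segment-stores) section.

```
java -jar oak-run.jar segment-copy az:https://myaccount.blob.core.windows.net/oak/repository /tmp/oak-flat-copy --flat --max-size-gb 16
```

Note that `--flat` is only honoured when the destination is a local path, which matches the behaviour implemented in `SegmentCopy` further down in this patch.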
+In order to speed up offline compaction for remote Segment Stores, three new options were introduced: one for configuring the destination Segment Store where the compacted archives will be written, and two for configuring a persistent disk cache that speeds up reading segments during compaction. All three options detailed below **apply only to remote Segment Stores**.
+
+The required `--target-path DESTINATION` argument specifies the destination where compacted segments will be written. `DESTINATION` must be a valid path/uri for the new compacted Segment Store.
+
+The required `--persistent-cache-path PERSISTENT_CACHE_PATH` argument specifies the path of the persistent disk cache. `PERSISTENT_CACHE_PATH` must be a valid path.
+
+The optional `--persistent-cache-size-gb <PERSISTENT_CACHE_SIZE_GB>` argument can be used to limit the maximum size of the persistent disk cache. If not specified, the size is limited to `50` GB by default.
+
 To enable logging during offline compaction a Logback configuration file has to be injected via the `logback.configurationFile` property. In addition the `compaction-progress-log` property controls the number of compacted nodes that will be logged. The default value is 150000.
diff --git a/oak-run/src/main/java/org/apache/jackrabbit/oak/run/CompactCommand.java b/oak-run/src/main/java/org/apache/jackrabbit/oak/run/CompactCommand.java
index a4f34a30d4..77f9dcda04 100644
--- a/oak-run/src/main/java/org/apache/jackrabbit/oak/run/CompactCommand.java
+++ b/oak-run/src/main/java/org/apache/jackrabbit/oak/run/CompactCommand.java
@@ -61,6 +61,20 @@ class CompactCommand implements Command {
                 "by the \"diff\" compactor which compacts the checkpoints on top of each other. If not " +
                 "specified, \"diff\" compactor is used.")
                 .withRequiredArg().ofType(String.class);
+        OptionSpec targetPath = parser.accepts("target-path", "Path/URI to TAR/remote segment store where " +
+                "resulting archives will be written")
+                .withRequiredArg()
+                .ofType(String.class);
+        OptionSpec persistentCachePath = parser.accepts("persistent-cache-path", "Path/URI to persistent cache where " +
+                "resulting segments will be written")
+                .withRequiredArg()
+                .ofType(String.class);
+        OptionSpec persistentCacheSizeGb = parser.accepts("persistent-cache-size-gb", "Size in GB (defaults to 50 GB) for " +
+                "the persistent disk cache")
+                .withRequiredArg()
+                .defaultsTo("50")
+                .ofType(Integer.class);
+
         OptionSet options = parser.parse(args);
 
         String path = directoryArg.value(options);
@@ -74,10 +88,24 @@ class CompactCommand implements Command {
         int code = 0;
 
         if (path.startsWith("az:")) {
+            if (targetPath.value(options) == null) {
+                System.err.println("A destination for the compacted Azure Segment Store needs to be specified");
+                parser.printHelpOn(System.err);
+                System.exit(-1);
+            }
+
+            if (persistentCachePath.value(options) == null) {
+                System.err.println("A path for the persistent disk cache needs to be specified");
+                parser.printHelpOn(System.err);
+                System.exit(-1);
+            }
+
             Builder azureBuilder = AzureCompact.builder()
                     .withPath(path)
+                    .withTargetPath(targetPath.value(options))
+                    .withPersistentCachePath(persistentCachePath.value(options))
+                    .withPersistentCacheSizeGb(persistentCacheSizeGb.value(options))
                     .withForce(isTrue(forceArg.value(options)))
-                    .withSegmentCacheSize(Integer.getInteger("cache", 256))
                     .withGCLogInterval(Long.getLong("compaction-progress-log", 150000));
 
             if (options.has(compactor)) {
diff --git a/oak-run/src/main/java/org/apache/jackrabbit/oak/run/SegmentCopyCommand.java
b/oak-run/src/main/java/org/apache/jackrabbit/oak/run/SegmentCopyCommand.java index cc43a6a16f..b2b9219d41 100644 --- a/oak-run/src/main/java/org/apache/jackrabbit/oak/run/SegmentCopyCommand.java +++ b/oak-run/src/main/java/org/apache/jackrabbit/oak/run/SegmentCopyCommand.java @@ -38,6 +38,14 @@ class SegmentCopyCommand implements Command { .withOptionalArg() .ofType(Integer.class); + OptionSpec flat = parser.accepts("flat", "copy segments in flat hierarchy") + .withOptionalArg() + .ofType(Boolean.class); + + OptionSpec maxSizeGb = parser.accepts("max-size-gb", "define maximum size of archives to be copied") + .withOptionalArg() + .ofType(Integer.class); + OptionSet options = parser.parse(args); PrintWriter out = new PrintWriter(System.out, true); @@ -56,7 +64,7 @@ class SegmentCopyCommand implements Command { .withDestination(destination) .withOutWriter(out) .withErrWriter(err); - + if (options.has(last)) { builder.withRevisionsCount(last.value(options) != null ? last.value(options) : 1); } @@ -68,11 +76,16 @@ class SegmentCopyCommand implements Command { .withDestination(destination) .withOutWriter(out) .withErrWriter(err); - + if (options.has(last)) { builder.withRevisionsCount(last.value(options) != null ? last.value(options) : 1); } + if (options.has(flat) && options.has(maxSizeGb)) { + builder.withMaxSizeGb(maxSizeGb.value(options)); + builder.withFlat(flat.value(options)); + } + System.exit(builder.build().run()); } } diff --git a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureUtilities.java b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureUtilities.java index bf703bfaa8..d37ccf8b97 100644 --- a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureUtilities.java +++ b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureUtilities.java @@ -103,6 +103,8 @@ public final class AzureUtilities { StorageUri storageUri = new StorageUri(new URI(uri)); CloudBlobContainer container = new CloudBlobContainer(storageUri, credentials); + container.createIfNotExists(); + return container.getDirectoryReference(dir); } diff --git a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/AzureCompact.java b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/AzureCompact.java index 68f6c6253d..995649cf11 100644 --- a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/AzureCompact.java +++ b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/AzureCompact.java @@ -19,29 +19,35 @@ package org.apache.jackrabbit.oak.segment.azure.tool; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; -import static org.apache.jackrabbit.oak.segment.SegmentCache.DEFAULT_SEGMENT_CACHE_MB; import static org.apache.jackrabbit.oak.segment.azure.tool.ToolUtils.createArchiveManager; +import static org.apache.jackrabbit.oak.segment.azure.tool.ToolUtils.createCloudBlobDirectory; import static org.apache.jackrabbit.oak.segment.azure.tool.ToolUtils.newFileStore; import static org.apache.jackrabbit.oak.segment.azure.tool.ToolUtils.newSegmentNodeStorePersistence; import static org.apache.jackrabbit.oak.segment.azure.tool.ToolUtils.printableStopwatch; import com.google.common.base.Stopwatch; import com.google.common.io.Files; +import com.microsoft.azure.storage.StorageException; +import com.microsoft.azure.storage.blob.BlobListingDetails; +import 
com.microsoft.azure.storage.blob.CloudBlob; +import com.microsoft.azure.storage.blob.CloudBlobContainer; +import com.microsoft.azure.storage.blob.CloudBlobDirectory; +import com.microsoft.azure.storage.blob.ListBlobItem; import org.apache.jackrabbit.oak.segment.SegmentCache; import org.apache.jackrabbit.oak.segment.azure.tool.ToolUtils.SegmentStoreType; import org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions.CompactorType; import org.apache.jackrabbit.oak.segment.file.FileStore; -import org.apache.jackrabbit.oak.segment.file.JournalReader; -import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFile; -import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFileWriter; import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveManager; import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentNodeStorePersistence; +import org.apache.jackrabbit.oak.segment.spi.persistence.split.SplitPersistence; import org.apache.jackrabbit.oak.segment.tool.Compact; import java.io.IOException; import java.io.PrintStream; +import java.net.URISyntaxException; import java.util.Collections; +import java.util.EnumSet; import java.util.List; /** @@ -65,14 +71,20 @@ public class AzureCompact { private String path; + private String targetPath; + private boolean force; private long gcLogInterval = 150000; - private int segmentCacheSize = DEFAULT_SEGMENT_CACHE_MB; + private int segmentCacheSize = 2048; private CompactorType compactorType = CompactorType.CHECKPOINT_COMPACTOR; + private String persistentCachePath; + + private Integer persistentCacheSizeGb; + private Builder() { // Prevent external instantiation. } @@ -89,6 +101,18 @@ public class AzureCompact { return this; } + /** + * The path (URI) to the target segment store. + * + * @param targetPath + * the path to the target segment store. + * @return this builder + */ + public Builder withTargetPath(String targetPath) { + this.targetPath = checkNotNull(targetPath); + return this; + } + /** * Whether to fail if run on an older version of the store of force upgrading * its format. @@ -144,6 +168,30 @@ public class AzureCompact { return this; } + /** + * The path where segments in the persistent cache will be stored. + * + * @param persistentCachePath + * the path to the persistent cache. + * @return this builder + */ + public Builder withPersistentCachePath(String persistentCachePath) { + this.persistentCachePath = checkNotNull(persistentCachePath); + return this; + } + + /** + * The maximum size in GB of the persistent disk cache. + * + * @param persistentCacheSizeGb + * the maximum size of the persistent cache. + * @return this builder + */ + public Builder withPersistentCacheSizeGb(Integer persistentCacheSizeGb) { + this.persistentCacheSizeGb = checkNotNull(persistentCacheSizeGb); + return this; + } + /** * Create an executable version of the {@link Compact} command. 
* @@ -157,6 +205,8 @@ public class AzureCompact { private final String path; + private final String targetPath; + private final int segmentCacheSize; private final boolean strictVersionCheck; @@ -165,24 +215,37 @@ public class AzureCompact { private final CompactorType compactorType; + private String persistentCachePath; + + private Integer persistentCacheSizeGb; + private AzureCompact(Builder builder) { this.path = builder.path; + this.targetPath = builder.targetPath; this.segmentCacheSize = builder.segmentCacheSize; this.strictVersionCheck = !builder.force; this.gcLogInterval = builder.gcLogInterval; this.compactorType = builder.compactorType; + this.persistentCachePath = builder.persistentCachePath; + this.persistentCacheSizeGb = builder.persistentCacheSizeGb; } - public int run() { + public int run() throws IOException, StorageException, URISyntaxException { Stopwatch watch = Stopwatch.createStarted(); - SegmentNodeStorePersistence persistence = newSegmentNodeStorePersistence(SegmentStoreType.AZURE, path); - SegmentArchiveManager archiveManager = createArchiveManager(persistence); + SegmentNodeStorePersistence roPersistence = newSegmentNodeStorePersistence(SegmentStoreType.AZURE, path, persistentCachePath, persistentCacheSizeGb); + SegmentNodeStorePersistence rwPersistence = newSegmentNodeStorePersistence(SegmentStoreType.AZURE, targetPath); + + SegmentNodeStorePersistence splitPersistence = new SplitPersistence(roPersistence, rwPersistence); + + SegmentArchiveManager roArchiveManager = createArchiveManager(roPersistence); + SegmentArchiveManager rwArchiveManager = createArchiveManager(rwPersistence); System.out.printf("Compacting %s\n", path); + System.out.printf(" to %s\n", targetPath); System.out.printf(" before\n"); List beforeArchives = Collections.emptyList(); try { - beforeArchives = archiveManager.listArchives(); + beforeArchives = roArchiveManager.listArchives(); } catch (IOException e) { System.err.println(e); } @@ -190,25 +253,14 @@ public class AzureCompact { printArchives(System.out, beforeArchives); System.out.printf(" -> compacting\n"); - try (FileStore store = newFileStore(persistence, Files.createTempDir(), strictVersionCheck, segmentCacheSize, + try (FileStore store = newFileStore(splitPersistence, Files.createTempDir(), strictVersionCheck, segmentCacheSize, gcLogInterval, compactorType)) { if (!store.compactFull()) { System.out.printf("Compaction cancelled after %s.\n", printableStopwatch(watch)); return 1; } - System.out.printf(" -> cleaning up\n"); - store.cleanup(); - JournalFile journal = persistence.getJournalFile(); - String head; - try (JournalReader journalReader = new JournalReader(journal)) { - head = String.format("%s root %s\n", journalReader.next().getRevision(), System.currentTimeMillis()); - } - try (JournalFileWriter journalWriter = journal.openJournalWriter()) { - System.out.printf(" -> writing new %s: %s\n", journal.getName(), head); - journalWriter.truncate(); - journalWriter.writeLine(head); - } + System.out.printf(" -> [skipping] cleaning up\n"); } catch (Exception e) { watch.stop(); e.printStackTrace(System.err); @@ -220,15 +272,32 @@ public class AzureCompact { System.out.printf(" after\n"); List afterArchives = Collections.emptyList(); try { - afterArchives = archiveManager.listArchives(); + afterArchives = rwArchiveManager.listArchives(); } catch (IOException e) { System.err.println(e); } printArchives(System.out, afterArchives); System.out.printf("Compaction succeeded in %s.\n", printableStopwatch(watch)); + + CloudBlobDirectory 
targetDirectory = createCloudBlobDirectory(targetPath.substring(3)); + CloudBlobContainer targetContainer = targetDirectory.getContainer(); + printTargetRepoSizeInfo(targetContainer); + return 0; } + private long printTargetRepoSizeInfo(CloudBlobContainer container) { + System.out.printf("Calculating the size of container %s\n", container.getName()); + long size = 0; + for (ListBlobItem i : container.listBlobs(null, true, EnumSet.of(BlobListingDetails.METADATA), null, null)) { + if (i instanceof CloudBlob) { + size += ((CloudBlob) i).getProperties().getLength(); + } + } + System.out.printf("The size is: %d MB \n", size / 1024 / 1024); + return size; + } + private static void printArchives(PrintStream s, List archives) { for (String a : archives) { s.printf(" %s\n", a); diff --git a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/SegmentCopy.java b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/SegmentCopy.java index c2c4ec2621..4d4b7e859e 100644 --- a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/SegmentCopy.java +++ b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/SegmentCopy.java @@ -19,19 +19,43 @@ package org.apache.jackrabbit.oak.segment.azure.tool; import static com.google.common.base.Preconditions.checkNotNull; +import static org.apache.jackrabbit.oak.segment.azure.tool.SegmentStoreMigrator.runWithRetry; import static org.apache.jackrabbit.oak.segment.azure.tool.ToolUtils.newSegmentNodeStorePersistence; import static org.apache.jackrabbit.oak.segment.azure.tool.ToolUtils.printMessage; import static org.apache.jackrabbit.oak.segment.azure.tool.ToolUtils.printableStopwatch; import static org.apache.jackrabbit.oak.segment.azure.tool.ToolUtils.storeDescription; import static org.apache.jackrabbit.oak.segment.azure.tool.ToolUtils.storeTypeFromPathOrUri; -import com.google.common.base.Stopwatch; - +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.PrintWriter; +import java.nio.channels.FileChannel; +import java.nio.file.AtomicMoveNotSupportedException; +import java.nio.file.Files; +import java.nio.file.StandardCopyOption; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import org.apache.jackrabbit.oak.commons.Buffer; +import org.apache.jackrabbit.oak.segment.azure.tool.SegmentStoreMigrator.Segment; import org.apache.jackrabbit.oak.segment.azure.tool.ToolUtils.SegmentStoreType; +import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitorAdapter; +import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitorAdapter; +import org.apache.jackrabbit.oak.segment.spi.monitor.RemoteStoreMonitorAdapter; +import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveEntry; +import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveManager; +import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveReader; import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentNodeStorePersistence; import org.apache.jackrabbit.oak.segment.tool.Check; -import java.io.PrintWriter; +import com.google.common.base.Stopwatch; /** * Perform a full-copy of repository data at segment level. 
@@ -65,6 +89,10 @@ public class SegmentCopy { private Integer revisionsCount = Integer.MAX_VALUE; + private Boolean flat; + + private Integer maxSizeGb; + private Builder() { // Prevent external instantiation. } @@ -154,6 +182,29 @@ public class SegmentCopy { return this; } + /** + * If enabled, the segments hierarchy will be copied without any + * TAR archive being created, in a flat hierarchy. + * + * @param flat flag controlling the copying in flat hierarchy + * @return this builder. + */ + public Builder withFlat(Boolean flat) { + this.flat = flat; + return this; + } + + /** + * Parameter for configuring the maximum size of the segment store transfer + * + * @param maxSizeGb the maximum size up to which repository data will be copied + * @return this builder. + */ + public Builder withMaxSizeGb(Integer maxSizeGb) { + this.maxSizeGb = maxSizeGb; + return this; + } + /** * Create an executable version of the {@link Check} command. * @@ -169,6 +220,8 @@ public class SegmentCopy { } } + private static final int READ_THREADS = 20; + private final String source; private final String destination; @@ -179,10 +232,15 @@ public class SegmentCopy { private final Integer revisionCount; + private final Boolean flat; + + private final Integer maxSizeGb; + private SegmentNodeStorePersistence srcPersistence; private SegmentNodeStorePersistence destPersistence; + private ExecutorService executor = Executors.newFixedThreadPool(READ_THREADS + 1); public SegmentCopy(Builder builder) { this.source = builder.source; @@ -190,6 +248,8 @@ public class SegmentCopy { this.srcPersistence = builder.srcPersistence; this.destPersistence = builder.destPersistence; this.revisionCount = builder.revisionsCount; + this.flat = builder.flat; + this.maxSizeGb = builder.maxSizeGb; this.outWriter = builder.outWriter; this.errWriter = builder.errWriter; } @@ -203,28 +263,113 @@ public class SegmentCopy { String srcDescription = storeDescription(srcType, source); String destDescription = storeDescription(destType, destination); - try { - if (srcPersistence == null || destPersistence == null) { + if (flat && destType == SegmentStoreType.TAR) { + try { srcPersistence = newSegmentNodeStorePersistence(srcType, source); - destPersistence = newSegmentNodeStorePersistence(destType, destination); + + SegmentArchiveManager sourceManager = srcPersistence.createArchiveManager(false, false, + new IOMonitorAdapter(), new FileStoreMonitorAdapter(), new RemoteStoreMonitorAdapter()); + + int maxArchives = maxSizeGb * 4; + int count = 0; + + List archivesList = sourceManager.listArchives(); + archivesList.sort(Collections.reverseOrder()); + + for (String archiveName : archivesList) { + if (count == maxArchives - 1) { + printMessage(outWriter, "Stopping transfer after reaching {0} GB at archive {1}", maxSizeGb, + archiveName); + break; + } + + printMessage(outWriter, "{0}/{1} -> {2}", source, archiveName, destination); + + SegmentArchiveReader reader = sourceManager.forceOpen(archiveName); + + List> futures = new ArrayList<>(); + for (SegmentArchiveEntry entry : reader.listSegments()) { + futures.add(executor.submit(() -> runWithRetry(() -> { + Segment segment = new Segment(entry); + segment.read(reader); + return segment; + }, 16, 5))); + } + + File directory = new File(destination); + directory.mkdir(); + + for (Future future : futures) { + Segment segment = future.get(); + runWithRetry(() -> { + final byte[] array = segment.data.array(); + String segmentId = new UUID(segment.entry.getMsb(), segment.entry.getLsb()).toString(); + File 
segmentFile = new File(directory, segmentId); + File tempSegmentFile = new File(directory, segmentId + System.nanoTime() + ".part"); + Buffer buffer = Buffer.wrap(array); + + Buffer bufferCopy = buffer.duplicate(); + + try { + try (FileChannel channel = new FileOutputStream(tempSegmentFile).getChannel()) { + bufferCopy.write(channel); + } + try { + Files.move(tempSegmentFile.toPath(), segmentFile.toPath(), + StandardCopyOption.ATOMIC_MOVE); + } catch (AtomicMoveNotSupportedException e) { + Files.move(tempSegmentFile.toPath(), segmentFile.toPath()); + } + } catch (Exception e) { + printMessage(errWriter, "Error writing segment {0} to cache: {1} ", segmentId, e); + e.printStackTrace(errWriter); + try { + Files.deleteIfExists(segmentFile.toPath()); + Files.deleteIfExists(tempSegmentFile.toPath()); + } catch (IOException i) { + printMessage(errWriter, "Error while deleting corrupted segment file {0} {1}", + segmentId, i); + } + } + return null; + }, 16, 5); + } + + count++; + } + } catch (IOException | InterruptedException | ExecutionException e) { + watch.stop(); + printMessage(errWriter, "A problem occured while copying archives from {0} to {1} ", source, + destination); + e.printStackTrace(errWriter); + return 1; + } + } else { + try { + if (srcPersistence == null || destPersistence == null) { + srcPersistence = newSegmentNodeStorePersistence(srcType, source); + destPersistence = newSegmentNodeStorePersistence(destType, destination); + } + + printMessage(outWriter, "Started segment-copy transfer!"); + printMessage(outWriter, "Source: {0}", srcDescription); + printMessage(outWriter, "Destination: {0}", destDescription); + + SegmentStoreMigrator migrator = new SegmentStoreMigrator.Builder() + .withSourcePersistence(srcPersistence, srcDescription) + .withTargetPersistence(destPersistence, destDescription).withRevisionCount(revisionCount) + .build(); + + migrator.migrate(); + + } catch (Exception e) { + watch.stop(); + printMessage(errWriter, "A problem occured while copying archives from {0} to {1} ", source, + destination); + e.printStackTrace(errWriter); + return 1; } - printMessage(outWriter, "Started segment-copy transfer!"); - printMessage(outWriter, "Source: {0}", srcDescription); - printMessage(outWriter, "Destination: {0}", destDescription); - - SegmentStoreMigrator migrator = new SegmentStoreMigrator.Builder() - .withSourcePersistence(srcPersistence, srcDescription) - .withTargetPersistence(destPersistence, destDescription) - .withRevisionCount(revisionCount) - .build(); - - migrator.migrate(); - } catch (Exception e) { - watch.stop(); - printMessage(errWriter, "A problem occured while copying archives from {0} to {1} ", source, destination); - e.printStackTrace(errWriter); - return 1; } watch.stop(); diff --git a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/SegmentStoreMigrator.java b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/SegmentStoreMigrator.java index 37b5a16186..6bde57b6d9 100644 --- a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/SegmentStoreMigrator.java +++ b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/SegmentStoreMigrator.java @@ -228,7 +228,7 @@ public class SegmentStoreMigrator implements Closeable { } } - private static T runWithRetry(Producer producer, int maxAttempts, int intervalSec) throws IOException { + static T runWithRetry(Producer producer, int maxAttempts, int intervalSec) throws IOException { IOException ioException = null; 
RepositoryNotReachableException repoNotReachableException = null; for (int i = 0; i < maxAttempts; i++) { @@ -267,25 +267,25 @@ public class SegmentStoreMigrator implements Closeable { } @FunctionalInterface - private interface Producer { + interface Producer { T produce() throws IOException; } - private static class Segment { + static class Segment { - private final SegmentArchiveEntry entry; + final SegmentArchiveEntry entry; - private volatile Buffer data; + volatile Buffer data; - private Segment(SegmentArchiveEntry entry) { + Segment(SegmentArchiveEntry entry) { this.entry = entry; } - private void read(SegmentArchiveReader reader) throws IOException { + void read(SegmentArchiveReader reader) throws IOException { data = reader.readSegment(entry.getMsb(), entry.getLsb()); } - private void write(SegmentArchiveWriter writer) throws IOException { + void write(SegmentArchiveWriter writer) throws IOException { final byte[] array = data.array(); final int offset = 0; writer.writeSegment(entry.getMsb(), entry.getLsb(), array, offset, entry.getLength(), entry.getGeneration(), diff --git a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/ToolUtils.java b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/ToolUtils.java index 0232f5902b..6644a4b9f8 100644 --- a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/ToolUtils.java +++ b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/ToolUtils.java @@ -32,12 +32,6 @@ import java.text.MessageFormat; import java.util.Map; import java.util.concurrent.TimeUnit; -import com.google.common.base.Stopwatch; -import com.microsoft.azure.storage.StorageCredentials; -import com.microsoft.azure.storage.StorageCredentialsAccountAndKey; -import com.microsoft.azure.storage.StorageException; -import com.microsoft.azure.storage.blob.CloudBlobDirectory; - import org.apache.jackrabbit.oak.commons.Buffer; import org.apache.jackrabbit.oak.segment.azure.AzurePersistence; import org.apache.jackrabbit.oak.segment.azure.AzureUtilities; @@ -46,11 +40,20 @@ import org.apache.jackrabbit.oak.segment.file.FileStore; import org.apache.jackrabbit.oak.segment.file.FileStoreBuilder; import org.apache.jackrabbit.oak.segment.file.InvalidFileStoreVersionException; import org.apache.jackrabbit.oak.segment.file.tar.TarPersistence; +import org.apache.jackrabbit.oak.segment.remote.persistentcache.PersistentDiskCache; import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitorAdapter; import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitorAdapter; import org.apache.jackrabbit.oak.segment.spi.monitor.RemoteStoreMonitorAdapter; import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveManager; import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentNodeStorePersistence; +import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.CachingPersistence; +import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.PersistentCache; + +import com.google.common.base.Stopwatch; +import com.microsoft.azure.storage.StorageCredentials; +import com.microsoft.azure.storage.StorageCredentialsAccountAndKey; +import com.microsoft.azure.storage.StorageException; +import com.microsoft.azure.storage.blob.CloudBlobDirectory; /** * Utility class for common stuff pertaining to tooling. 
@@ -91,6 +94,26 @@ public class ToolUtils { return builder.build(); } + public static SegmentNodeStorePersistence newSegmentNodeStorePersistence(SegmentStoreType storeType, + String pathOrUri, String persistentCachePath, Integer persistentCacheSize) { + SegmentNodeStorePersistence persistence = null; + + switch (storeType) { + case AZURE: + CloudBlobDirectory cloudBlobDirectory = createCloudBlobDirectory(pathOrUri.substring(3)); + SegmentNodeStorePersistence basePersistence = new AzurePersistence(cloudBlobDirectory); + + PersistentCache persistentCache = new PersistentDiskCache(new File(persistentCachePath), + persistentCacheSize * 1024, new IOMonitorAdapter()); + persistence = new CachingPersistence(persistentCache, basePersistence); + break; + default: + persistence = new TarPersistence(new File(pathOrUri)); + } + + return persistence; + } + public static SegmentNodeStorePersistence newSegmentNodeStorePersistence(SegmentStoreType storeType, String pathOrUri) { SegmentNodeStorePersistence persistence = null;
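Putting the documentation and code changes together, a hypothetical offline compaction of a remote Azure Segment Store with the new options could be invoked as sketched below. The account, container and cache locations are placeholders (not part of this patch), and the Azure access key is assumed to be supplied as described in the [Remote Segment Stores](#remote-segment-stores) section of the documentation.

```
java -jar oak-run.jar compact az:https://myaccount.blob.core.windows.net/oak/repository \
    --target-path az:https://myaccount.blob.core.windows.net/oak-compacted/repository \
    --persistent-cache-path /tmp/oak-segment-cache \
    --persistent-cache-size-gb 50
```

In this setup the compacted archives are written to the `--target-path` store, while reads from the source are served through the persistent disk cache, mirroring the `SplitPersistence` and `CachingPersistence` wiring introduced in `AzureCompact` and `ToolUtils` above.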