diff --git oak-doc/src/site/markdown/nodestore/segment/overview.md oak-doc/src/site/markdown/nodestore/segment/overview.md
index c839232d7c..5026636c53 100644
--- oak-doc/src/site/markdown/nodestore/segment/overview.md
+++ oak-doc/src/site/markdown/nodestore/segment/overview.md
@@ -671,6 +671,24 @@ Besides the local storage in TAR files (previously known as TarMK), support for
 
 * **Microsoft Azure** The `cloud-prefix` for MS Azure is `az`, therefore a valid connection argument would be `az:https://myaccount.blob.core.windows.net/container/repository`, where the part after `:` is the Azure URL identifier for the _repository_ directory inside the specified _container_ of the _myaccount_ Azure storage account. The last missing piece is the secret key which will be supplied as an environment variable, i.e. `AZURE_SECRET_KEY`.
 
+### Segment-Copy
+```
+java -jar oak-run.jar segment-copy [--verbose] SOURCE DESTINATION
+```
+
+The `segment-copy` command allows the "translation" of the Segment Store at `SOURCE` from one persistence type (e.g. local TarMK Segment Store) to a different persistence type (e.g. remote Azure Segment Store), saving the resulting Segment Store at `DESTINATION`.
+Unlike a sidegrade performed with `oak-upgrade` (see [Repository Migration](../../migration.md)), which includes only the current head state, this translation includes __all previous state persisted in the Segment Store__, therefore also stale, unreferenced content.
+
+`SOURCE` must be a valid path/URI to an existing Segment Store.
+`DESTINATION` must be a valid path/URI for the resulting Segment Store.
+Both are specified as `PATH | cloud-prefix:URI`.
+Please refer to the [Remote Segment Stores](#remote-segment-stores) section for details on how to correctly specify connection URIs.
+
+If the `--verbose` option is specified, the command will print detailed progress information messages.
+These include the individual segments being transferred from `SOURCE` to `DESTINATION` at a certain point in time.
+If not specified, progress information messages will be disabled.
+
+
 ### Backup
 
 ```
diff --git oak-run/src/main/java/org/apache/jackrabbit/oak/run/AvailableModes.java oak-run/src/main/java/org/apache/jackrabbit/oak/run/AvailableModes.java
index 6423a2641c..33b219c1e1 100644
--- oak-run/src/main/java/org/apache/jackrabbit/oak/run/AvailableModes.java
+++ oak-run/src/main/java/org/apache/jackrabbit/oak/run/AvailableModes.java
@@ -59,5 +59,6 @@ public final class AvailableModes {
             .put(IOTraceCommand.NAME, new IOTraceCommand())
             .put("server", new ServerCommand())
             .put(DataStoreCommand.NAME, new DataStoreCommand())
+            .put("segment-copy", new SegmentCopyCommand())
             .build());
 }
diff --git oak-run/src/main/java/org/apache/jackrabbit/oak/run/SegmentCopyCommand.java oak-run/src/main/java/org/apache/jackrabbit/oak/run/SegmentCopyCommand.java
new file mode 100644
index 0000000000..431ab1242b
--- /dev/null
+++ oak-run/src/main/java/org/apache/jackrabbit/oak/run/SegmentCopyCommand.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.run;
+
+import org.apache.jackrabbit.oak.run.commons.Command;
+import org.apache.jackrabbit.oak.segment.azure.tool.SegmentCopy;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+
+import joptsimple.OptionParser;
+import joptsimple.OptionSet;
+import joptsimple.OptionSpec;
+
+class SegmentCopyCommand implements Command {
+
+    @Override
+    public void execute(String... args) throws Exception {
+        OptionParser parser = new OptionParser();
+        OptionSpec verbose = parser.accepts("verbose", "print detailed output about individual segments transferred");
+        OptionSet options = parser.parse(args);
+
+        PrintWriter out = new PrintWriter(System.out, true);
+        PrintWriter err = new PrintWriter(System.err, true);
+
+        if (options.nonOptionArguments().size() != 2) {
+            printUsage(parser, err);
+        }
+
+        String source = options.nonOptionArguments().get(0).toString();
+        String destination = options.nonOptionArguments().get(1).toString();
+
+        int statusCode = SegmentCopy.builder()
+                .withSource(source)
+                .withDestination(destination)
+                .withVerbose(options.has(verbose))
+                .withOutWriter(out)
+                .withErrWriter(err)
+                .build()
+                .run();
+        System.exit(statusCode);
+    }
+
+    private void printUsage(OptionParser parser, PrintWriter err, String... messages) throws IOException {
+        for (String message : messages) {
+            err.println(message);
+        }
+
+        err.println("usage: segment-copy src dest [options]\n");
+        err.println("       where src/dest are specified as PATH | cloud-prefix:URI");
+        parser.printHelpOn(err);
+        System.exit(1);
+    }
+}
diff --git oak-segment-azure/pom.xml oak-segment-azure/pom.xml
index 66a7415288..f0371884ae 100644
--- oak-segment-azure/pom.xml
+++ oak-segment-azure/pom.xml
@@ -159,6 +159,12 @@
       <version>${project.version}</version>
       <type>test-jar</type>
       <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.jackrabbit</groupId>
+      <artifactId>oak-blob</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>org.mockito</groupId>
diff --git oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/AzureCompact.java oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/AzureCompact.java
index b79bc7ed87..0533399857 100644
--- oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/AzureCompact.java
+++ oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/AzureCompact.java
@@ -20,42 +20,28 @@ package org.apache.jackrabbit.oak.segment.azure.tool;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.jackrabbit.oak.segment.SegmentCache.DEFAULT_SEGMENT_CACHE_MB;
-import static org.apache.jackrabbit.oak.segment.azure.util.AzureConfigurationParserUtils.KEY_ACCOUNT_NAME;
-import static org.apache.jackrabbit.oak.segment.azure.util.AzureConfigurationParserUtils.KEY_DIR;
-import static org.apache.jackrabbit.oak.segment.azure.util.AzureConfigurationParserUtils.KEY_STORAGE_URI;
-import static org.apache.jackrabbit.oak.segment.azure.util.AzureConfigurationParserUtils.parseAzureConfigurationFromUri;
-import static org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions.defaultGCOptions;
+import static
org.apache.jackrabbit.oak.segment.azure.tool.ToolUtils.createArchiveManager; +import static org.apache.jackrabbit.oak.segment.azure.tool.ToolUtils.newFileStore; +import static org.apache.jackrabbit.oak.segment.azure.tool.ToolUtils.newSegmentNodeStorePersistence; +import static org.apache.jackrabbit.oak.segment.azure.tool.ToolUtils.printableStopwatch; -import java.io.IOException; -import java.io.PrintStream; -import java.net.URISyntaxException; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; +import com.google.common.base.Stopwatch; +import com.google.common.io.Files; import org.apache.jackrabbit.oak.segment.SegmentCache; -import org.apache.jackrabbit.oak.segment.azure.AzureJournalFile; -import org.apache.jackrabbit.oak.segment.azure.AzurePersistence; -import org.apache.jackrabbit.oak.segment.azure.AzureUtilities; +import org.apache.jackrabbit.oak.segment.azure.tool.ToolUtils.SegmentStoreType; import org.apache.jackrabbit.oak.segment.file.FileStore; -import org.apache.jackrabbit.oak.segment.file.FileStoreBuilder; -import org.apache.jackrabbit.oak.segment.file.InvalidFileStoreVersionException; import org.apache.jackrabbit.oak.segment.file.JournalReader; -import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitorAdapter; -import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitorAdapter; import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFile; import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFileWriter; import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveManager; import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentNodeStorePersistence; import org.apache.jackrabbit.oak.segment.tool.Compact; -import com.google.common.base.Stopwatch; -import com.google.common.io.Files; -import com.microsoft.azure.storage.StorageCredentials; -import com.microsoft.azure.storage.StorageCredentialsAccountAndKey; -import com.microsoft.azure.storage.StorageException; -import com.microsoft.azure.storage.blob.CloudBlobDirectory; +import java.io.IOException; +import java.io.PrintStream; +import java.util.Collections; +import java.util.List; /** * Perform an offline compaction of an existing Azure Segment Store. @@ -153,11 +139,6 @@ public class AzureCompact { checkNotNull(path); return new AzureCompact(this); } - - } - - private static String printableStopwatch(Stopwatch s) { - return String.format("%s (%ds)", s, s.elapsed(TimeUnit.SECONDS)); } private final String path; @@ -177,23 +158,8 @@ public class AzureCompact { public int run() { Stopwatch watch = Stopwatch.createStarted(); - CloudBlobDirectory cloudBlobDirectory = null; - try { - cloudBlobDirectory = createCloudBlobDirectory(); - } catch (URISyntaxException | StorageException e1) { - throw new IllegalArgumentException( - "Could not connect to the Azure Storage. Please verify the path provided!"); - } - - SegmentNodeStorePersistence persistence = new AzurePersistence(cloudBlobDirectory); - SegmentArchiveManager archiveManager = null; - try { - archiveManager = persistence.createArchiveManager(false, new IOMonitorAdapter(), - new FileStoreMonitorAdapter()); - } catch (IOException e) { - throw new IllegalArgumentException( - "Could not access the Azure Storage. 
Please verify the path provided!"); - } + SegmentNodeStorePersistence persistence = newSegmentNodeStorePersistence(SegmentStoreType.AZURE, path); + SegmentArchiveManager archiveManager = createArchiveManager(persistence); System.out.printf("Compacting %s\n", path); System.out.printf(" before\n"); @@ -207,14 +173,15 @@ public class AzureCompact { printArchives(System.out, beforeArchives); System.out.printf(" -> compacting\n"); - try (FileStore store = newFileStore(persistence)) { + try (FileStore store = newFileStore(persistence, Files.createTempDir(), strictVersionCheck, segmentCacheSize, + gcLogInterval)) { if (!store.compactFull()) { System.out.printf("Compaction cancelled after %s.\n", printableStopwatch(watch)); return 1; } System.out.printf(" -> cleaning up\n"); store.cleanup(); - JournalFile journal = new AzureJournalFile(cloudBlobDirectory, "journal.log"); + JournalFile journal = persistence.getJournalFile(); String head; try (JournalReader journalReader = new JournalReader(journal)) { head = String.format("%s root %s\n", journalReader.next().getRevision(), System.currentTimeMillis()); @@ -250,27 +217,4 @@ public class AzureCompact { s.printf(" %s\n", a); } } - - private FileStore newFileStore(SegmentNodeStorePersistence persistence) - throws IOException, InvalidFileStoreVersionException, URISyntaxException, StorageException { - FileStoreBuilder builder = FileStoreBuilder.fileStoreBuilder(Files.createTempDir()) - .withCustomPersistence(persistence).withMemoryMapping(false).withStrictVersionCheck(strictVersionCheck) - .withSegmentCacheSize(segmentCacheSize) - .withGCOptions(defaultGCOptions().setOffline().setGCLogInterval(gcLogInterval)); - - return builder.build(); - } - - private CloudBlobDirectory createCloudBlobDirectory() throws URISyntaxException, StorageException { - Map config = parseAzureConfigurationFromUri(path); - - String accountName = config.get(KEY_ACCOUNT_NAME); - String key = System.getenv("AZURE_SECRET_KEY"); - StorageCredentials credentials = new StorageCredentialsAccountAndKey(accountName, key); - - String uri = config.get(KEY_STORAGE_URI); - String dir = config.get(KEY_DIR); - - return AzureUtilities.cloudBlobDirectoryFrom(credentials, uri, dir); - } } diff --git oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/SegmentCopy.java oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/SegmentCopy.java new file mode 100644 index 0000000000..aee3fec271 --- /dev/null +++ oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/SegmentCopy.java @@ -0,0 +1,370 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.jackrabbit.oak.segment.azure.tool; + +import static com.google.common.base.Preconditions.checkNotNull; +import static org.apache.jackrabbit.oak.segment.azure.tool.ToolUtils.fetchByteArray; +import static org.apache.jackrabbit.oak.segment.azure.tool.ToolUtils.newSegmentNodeStorePersistence; +import static org.apache.jackrabbit.oak.segment.azure.tool.ToolUtils.printMessage; +import static org.apache.jackrabbit.oak.segment.azure.tool.ToolUtils.printableStopwatch; +import static org.apache.jackrabbit.oak.segment.azure.tool.ToolUtils.storeDescription; +import static org.apache.jackrabbit.oak.segment.azure.tool.ToolUtils.storeTypeFromPathOrUri; + +import com.google.common.base.Stopwatch; + +import org.apache.jackrabbit.oak.segment.azure.tool.ToolUtils.SegmentStoreType; +import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitor; +import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitorAdapter; +import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitor; +import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitorAdapter; +import org.apache.jackrabbit.oak.segment.spi.persistence.GCJournalFile; +import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFile; +import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFileReader; +import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFileWriter; +import org.apache.jackrabbit.oak.segment.spi.persistence.ManifestFile; +import org.apache.jackrabbit.oak.segment.spi.persistence.RepositoryLock; +import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveEntry; +import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveManager; +import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveReader; +import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveWriter; +import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentNodeStorePersistence; +import org.apache.jackrabbit.oak.segment.tool.Check; + +import java.io.IOException; +import java.io.PrintWriter; +import java.nio.ByteBuffer; +import java.text.MessageFormat; +import java.util.ArrayDeque; +import java.util.Collections; +import java.util.Deque; +import java.util.List; +import java.util.Properties; +import java.util.UUID; + +/** + * Perform a full-copy of repository data at segment level. + */ +public class SegmentCopy { + /** + * Create a builder for the {@link SegmentCopy} command. + * + * @return an instance of {@link Builder}. + */ + public static Builder builder() { + return new Builder(); + } + + /** + * Collect options for the {@link SegmentCopy} command. + */ + public static class Builder { + + private String source; + + private String destination; + + private SegmentNodeStorePersistence srcPersistence; + + private SegmentNodeStorePersistence destPersistence; + + private boolean verbose; + + private PrintWriter outWriter; + + private PrintWriter errWriter; + + private Builder() { + // Prevent external instantiation. + } + + /** + * The source path/URI to an existing segment store. This parameter is required. + * + * @param source + * the source path/URI to an existing segment store. + * @return this builder. + */ + public Builder withSource(String source) { + this.source = checkNotNull(source); + return this; + } + + /** + * The destination path/URI to an existing segment store. This parameter is + * required. + * + * @param destination + * the destination path/URI to an existing segment store. + * @return this builder. 
+         */
+        public Builder withDestination(String destination) {
+            this.destination = checkNotNull(destination);
+            return this;
+        }
+
+        /**
+         * The source {@link SegmentNodeStorePersistence}.
+         *
+         * @param srcPersistence
+         *            the source {@link SegmentNodeStorePersistence}.
+         * @return this builder.
+         */
+        public Builder withSrcPersistence(SegmentNodeStorePersistence srcPersistence) {
+            this.srcPersistence = checkNotNull(srcPersistence);
+            return this;
+        }
+
+        /**
+         * The destination {@link SegmentNodeStorePersistence}.
+         *
+         * @param destPersistence
+         *            the destination {@link SegmentNodeStorePersistence}.
+         * @return this builder.
+         */
+        public Builder withDestPersistence(SegmentNodeStorePersistence destPersistence) {
+            this.destPersistence = checkNotNull(destPersistence);
+            return this;
+        }
+
+        /**
+         * The text output stream writer used to print normal output.
+         *
+         * @param outWriter
+         *            the output writer.
+         * @return this builder.
+         */
+        public Builder withOutWriter(PrintWriter outWriter) {
+            this.outWriter = outWriter;
+            return this;
+        }
+
+        /**
+         * The text error stream writer used to print erroneous output.
+         *
+         * @param errWriter
+         *            the error writer.
+         * @return this builder.
+         */
+        public Builder withErrWriter(PrintWriter errWriter) {
+            this.errWriter = errWriter;
+            return this;
+        }
+
+        /**
+         * Whether to show detailed output about the current copy operation or not.
+         *
+         * @param verbose
+         *            {@code true} to print detailed output, {@code false}
+         *            otherwise.
+         * @return this builder.
+         */
+        public Builder withVerbose(boolean verbose) {
+            this.verbose = verbose;
+            return this;
+        }
+
+        /**
+         * Create an executable version of the {@link SegmentCopy} command.
+         *
+         * @return an instance of {@link SegmentCopy}.
+         */
+        public SegmentCopy build() {
+            if (srcPersistence == null && destPersistence == null) {
+                checkNotNull(source);
+                checkNotNull(destination);
+            }
+            return new SegmentCopy(this);
+        }
+    }
+
+    private final String source;
+
+    private final String destination;
+
+    private final boolean verbose;
+
+    private final PrintWriter outWriter;
+
+    private final PrintWriter errWriter;
+
+    private SegmentNodeStorePersistence srcPersistence;
+
+    private SegmentNodeStorePersistence destPersistence;
+
+    public SegmentCopy(Builder builder) {
+        this.source = builder.source;
+        this.destination = builder.destination;
+        this.srcPersistence = builder.srcPersistence;
+        this.destPersistence = builder.destPersistence;
+        this.verbose = builder.verbose;
+        this.outWriter = builder.outWriter;
+        this.errWriter = builder.errWriter;
+    }
+
+    public int run() {
+        Stopwatch watch = Stopwatch.createStarted();
+        RepositoryLock srcRepositoryLock = null;
+
+        SegmentStoreType srcType = storeTypeFromPathOrUri(source);
+        SegmentStoreType destType = storeTypeFromPathOrUri(destination);
+
+        try {
+            if (srcPersistence == null || destPersistence == null) {
+                srcPersistence = newSegmentNodeStorePersistence(srcType, source);
+                destPersistence = newSegmentNodeStorePersistence(destType, destination);
+            }
+
+            printMessage(outWriter, "Started segment-copy transfer!");
+            printMessage(outWriter, "Source: {0}", storeDescription(srcType, source));
+            printMessage(outWriter, "Destination: {0}", storeDescription(destType, destination));
+
+            try {
+                srcRepositoryLock = srcPersistence.lockRepository();
+            } catch (Exception e) {
+                throw new Exception(MessageFormat.format(
+                        "Cannot lock source segment store {0} for starting the copying process. Giving up!",
+                        storeDescription(srcType, source)), e);
+            }
+
+            printMessage(outWriter, "Copying archives...");
+            // TODO: copy only segments not transferred
+            IOMonitor ioMonitor = new IOMonitorAdapter();
+            FileStoreMonitor fileStoreMonitor = new FileStoreMonitorAdapter();
+
+            SegmentArchiveManager srcArchiveManager = srcPersistence.createArchiveManager(false, ioMonitor,
+                    fileStoreMonitor);
+            SegmentArchiveManager destArchiveManager = destPersistence.createArchiveManager(false, ioMonitor,
+                    fileStoreMonitor);
+            copyArchives(srcArchiveManager, destArchiveManager);
+
+            printMessage(outWriter, "Copying journal...");
+            // TODO: delete destination journal file if present
+            JournalFile srcJournal = srcPersistence.getJournalFile();
+            JournalFile destJournal = destPersistence.getJournalFile();
+            copyJournal(srcJournal, destJournal);
+
+            printMessage(outWriter, "Copying gc journal...");
+            // TODO: delete destination gc journal file if present
+            GCJournalFile srcGcJournal = srcPersistence.getGCJournalFile();
+            GCJournalFile destGcJournal = destPersistence.getGCJournalFile();
+            for (String line : srcGcJournal.readLines()) {
+                destGcJournal.writeLine(line);
+            }
+
+            printMessage(outWriter, "Copying manifest...");
+            // TODO: delete destination manifest file if present
+            ManifestFile srcManifest = srcPersistence.getManifestFile();
+            ManifestFile destManifest = destPersistence.getManifestFile();
+            Properties properties = srcManifest.load();
+            destManifest.save(properties);
+        } catch (Exception e) {
+            watch.stop();
+            printMessage(errWriter, "A problem occurred while copying archives from {0} to {1} ", source, destination);
+            e.printStackTrace(errWriter);
+            return 1;
+        } finally {
+            if (srcRepositoryLock != null) {
+                try {
+                    srcRepositoryLock.unlock();
+                } catch (IOException e) {
+                    printMessage(errWriter, "A problem occurred while unlocking source repository {0} ",
+                            storeDescription(srcType, source));
+                    e.printStackTrace(errWriter);
+                }
+            }
+        }
+
+        watch.stop();
+        printMessage(outWriter, "Segment-copy succeeded in {0}", printableStopwatch(watch));
+
+        return 0;
+    }
+
+    private void copyArchives(SegmentArchiveManager srcArchiveManager, SegmentArchiveManager destArchiveManager)
+            throws IOException {
+        List<String> srcArchiveNames = srcArchiveManager.listArchives();
+        Collections.sort(srcArchiveNames);
+        int archiveCount = srcArchiveNames.size();
+        int crtCount = 0;
+
+        for (String archiveName : srcArchiveNames) {
+            crtCount++;
+            printMessage(outWriter, "{0} - {1}/{2}", archiveName, crtCount, archiveCount);
+            if (verbose) {
+                printMessage(outWriter, " |");
+            }
+
+            SegmentArchiveWriter archiveWriter = destArchiveManager.create(archiveName);
+            SegmentArchiveReader archiveReader = srcArchiveManager.open(archiveName);
+            List<SegmentArchiveEntry> segmentEntries = archiveReader.listSegments();
+            for (SegmentArchiveEntry segmentEntry : segmentEntries) {
+                writeSegment(segmentEntry, archiveReader, archiveWriter);
+            }
+
+            ByteBuffer binRefBuffer = archiveReader.getBinaryReferences();
+            byte[] binRefData = fetchByteArray(binRefBuffer);
+
+            archiveWriter.writeBinaryReferences(binRefData);
+
+            ByteBuffer graphBuffer = archiveReader.getGraph();
+            byte[] graphData = fetchByteArray(graphBuffer);
+
+            archiveWriter.writeGraph(graphData);
+            archiveWriter.close();
+        }
+    }
+
+    private void writeSegment(SegmentArchiveEntry segmentEntry, SegmentArchiveReader archiveReader,
+            SegmentArchiveWriter archiveWriter) throws IOException {
+        long msb = segmentEntry.getMsb();
+        long lsb = segmentEntry.getLsb();
+        if (verbose) {
+            printMessage(outWriter, " - {0}", new
UUID(msb, lsb)); + } + + int size = segmentEntry.getLength(); + int offset = 0; + int generation = segmentEntry.getGeneration(); + int fullGeneration = segmentEntry.getFullGeneration(); + boolean isCompacted = segmentEntry.isCompacted(); + + ByteBuffer byteBuffer = archiveReader.readSegment(msb, lsb); + byte[] data = fetchByteArray(byteBuffer); + + archiveWriter.writeSegment(msb, lsb, data, offset, size, generation, fullGeneration, isCompacted); + archiveWriter.flush(); + } + + private void copyJournal(JournalFile srcJournal, JournalFile destJournal) throws IOException { + try (JournalFileReader srcJournalReader = srcJournal.openJournalReader(); + JournalFileWriter destJournalWriter = destJournal.openJournalWriter()) { + + Deque linesStack = new ArrayDeque<>(); + String line = null; + while ((line = srcJournalReader.readLine()) != null) { + linesStack.push(line); + } + + while (!linesStack.isEmpty()) { + line = linesStack.pop(); + destJournalWriter.writeLine(line); + } + } + } +} \ No newline at end of file diff --git oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/ToolUtils.java oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/ToolUtils.java new file mode 100644 index 0000000000..907a460886 --- /dev/null +++ oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/ToolUtils.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.jackrabbit.oak.segment.azure.tool; + +import static org.apache.jackrabbit.oak.segment.azure.util.AzureConfigurationParserUtils.KEY_ACCOUNT_NAME; +import static org.apache.jackrabbit.oak.segment.azure.util.AzureConfigurationParserUtils.KEY_DIR; +import static org.apache.jackrabbit.oak.segment.azure.util.AzureConfigurationParserUtils.KEY_STORAGE_URI; +import static org.apache.jackrabbit.oak.segment.azure.util.AzureConfigurationParserUtils.parseAzureConfigurationFromUri; +import static org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions.defaultGCOptions; + +import com.google.common.base.Stopwatch; +import com.microsoft.azure.storage.StorageCredentials; +import com.microsoft.azure.storage.StorageCredentialsAccountAndKey; +import com.microsoft.azure.storage.StorageException; +import com.microsoft.azure.storage.blob.CloudBlobDirectory; + +import org.apache.jackrabbit.oak.segment.azure.AzurePersistence; +import org.apache.jackrabbit.oak.segment.azure.AzureUtilities; +import org.apache.jackrabbit.oak.segment.file.FileStore; +import org.apache.jackrabbit.oak.segment.file.FileStoreBuilder; +import org.apache.jackrabbit.oak.segment.file.InvalidFileStoreVersionException; +import org.apache.jackrabbit.oak.segment.file.tar.TarPersistence; +import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitorAdapter; +import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitorAdapter; +import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveManager; +import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentNodeStorePersistence; + +import java.io.File; +import java.io.IOException; +import java.io.PrintWriter; +import java.net.URISyntaxException; +import java.nio.ByteBuffer; +import java.text.MessageFormat; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +/** + * Utility class for common stuff pertaining to tooling. 
+ */ +public class ToolUtils { + + private ToolUtils() { + // prevent instantiation + } + + public enum SegmentStoreType { + TAR("TarMK Segment Store"), AZURE("Azure Segment Store"); + + private String type; + + SegmentStoreType(String type) { + this.type = type; + } + + public String description(String pathOrUri) { + String location = pathOrUri; + if (pathOrUri.startsWith("az:")) { + location = pathOrUri.substring(3); + } + + return type + "@" + location; + } + } + + public static FileStore newFileStore(SegmentNodeStorePersistence persistence, File directory, + boolean strictVersionCheck, int segmentCacheSize, long gcLogInterval) + throws IOException, InvalidFileStoreVersionException, URISyntaxException, StorageException { + FileStoreBuilder builder = FileStoreBuilder.fileStoreBuilder(directory) + .withCustomPersistence(persistence).withMemoryMapping(false).withStrictVersionCheck(strictVersionCheck) + .withSegmentCacheSize(segmentCacheSize) + .withGCOptions(defaultGCOptions().setOffline().setGCLogInterval(gcLogInterval)); + + return builder.build(); + } + + public static SegmentNodeStorePersistence newSegmentNodeStorePersistence(SegmentStoreType storeType, + String pathOrUri) { + SegmentNodeStorePersistence persistence = null; + + switch (storeType) { + case AZURE: + CloudBlobDirectory cloudBlobDirectory = createCloudBlobDirectory(pathOrUri.substring(3)); + persistence = new AzurePersistence(cloudBlobDirectory); + break; + default: + persistence = new TarPersistence(new File(pathOrUri)); + } + + return persistence; + } + + public static SegmentArchiveManager createArchiveManager(SegmentNodeStorePersistence persistence) { + SegmentArchiveManager archiveManager = null; + try { + archiveManager = persistence.createArchiveManager(false, new IOMonitorAdapter(), + new FileStoreMonitorAdapter()); + } catch (IOException e) { + throw new IllegalArgumentException( + "Could not access the Azure Storage. Please verify the path provided!"); + } + + return archiveManager; + } + + public static CloudBlobDirectory createCloudBlobDirectory(String path) { + Map config = parseAzureConfigurationFromUri(path); + + String accountName = config.get(KEY_ACCOUNT_NAME); + String key = System.getenv("AZURE_SECRET_KEY"); + StorageCredentials credentials = new StorageCredentialsAccountAndKey(accountName, key); + + String uri = config.get(KEY_STORAGE_URI); + String dir = config.get(KEY_DIR); + + try { + return AzureUtilities.cloudBlobDirectoryFrom(credentials, uri, dir); + } catch (URISyntaxException | StorageException e) { + throw new IllegalArgumentException( + "Could not connect to the Azure Storage. Please verify the path provided!"); + } + } + + public static SegmentStoreType storeTypeFromPathOrUri(String pathOrUri) { + if (pathOrUri.startsWith("az:")) { + return SegmentStoreType.AZURE; + } + + return SegmentStoreType.TAR; + } + + public static String storeDescription(SegmentStoreType storeType, String pathOrUri) { + return storeType.description(pathOrUri); + } + + public static String printableStopwatch(Stopwatch s) { + return String.format("%s (%ds)", s, s.elapsed(TimeUnit.SECONDS)); + } + + public static void printMessage(PrintWriter pw, String format, Object... 
arg) { + pw.println(MessageFormat.format(format, arg)); + } + + public static byte[] fetchByteArray(ByteBuffer buffer) throws IOException { + byte[] data = new byte[buffer.remaining()]; + buffer.get(data); + return data; + } +} diff --git oak-segment-azure/src/test/java/oak/apache/jackrabbit/oak/segment/azure/tool/SegmentCopyAzureToTarTest.java oak-segment-azure/src/test/java/oak/apache/jackrabbit/oak/segment/azure/tool/SegmentCopyAzureToTarTest.java new file mode 100644 index 0000000000..7d1fc442c4 --- /dev/null +++ oak-segment-azure/src/test/java/oak/apache/jackrabbit/oak/segment/azure/tool/SegmentCopyAzureToTarTest.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package oak.apache.jackrabbit.oak.segment.azure.tool; + +import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentNodeStorePersistence; + +public class SegmentCopyAzureToTarTest extends SegmentCopyTestBase { + + @Override + protected SegmentNodeStorePersistence getSrcPersistence() throws Exception { + return getAzurePersistence(); + } + + @Override + protected SegmentNodeStorePersistence getDestPersistence() { + return getTarPersistence(); + } + + @Override + protected String getSrcPathOrUri() { + return getAzurePersistencePathOrUri(); + } + + @Override + protected String getDestPathOrUri() { + return getTarPersistencePathOrUri(); + } +} diff --git oak-segment-azure/src/test/java/oak/apache/jackrabbit/oak/segment/azure/tool/SegmentCopyTarToAzureTest.java oak-segment-azure/src/test/java/oak/apache/jackrabbit/oak/segment/azure/tool/SegmentCopyTarToAzureTest.java new file mode 100644 index 0000000000..a5ad6a889a --- /dev/null +++ oak-segment-azure/src/test/java/oak/apache/jackrabbit/oak/segment/azure/tool/SegmentCopyTarToAzureTest.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package oak.apache.jackrabbit.oak.segment.azure.tool; + +import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentNodeStorePersistence; + +public class SegmentCopyTarToAzureTest extends SegmentCopyTestBase { + + @Override + protected SegmentNodeStorePersistence getSrcPersistence() { + return getTarPersistence(); + } + + @Override + protected SegmentNodeStorePersistence getDestPersistence() throws Exception { + return getAzurePersistence(); + } + + @Override + protected String getSrcPathOrUri() { + return getTarPersistencePathOrUri(); + } + + @Override + protected String getDestPathOrUri() { + return getAzurePersistencePathOrUri(); + } +} diff --git oak-segment-azure/src/test/java/oak/apache/jackrabbit/oak/segment/azure/tool/SegmentCopyTestBase.java oak-segment-azure/src/test/java/oak/apache/jackrabbit/oak/segment/azure/tool/SegmentCopyTestBase.java new file mode 100644 index 0000000000..c8f8aeac7a --- /dev/null +++ oak-segment-azure/src/test/java/oak/apache/jackrabbit/oak/segment/azure/tool/SegmentCopyTestBase.java @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package oak.apache.jackrabbit.oak.segment.azure.tool; + +import static org.apache.jackrabbit.oak.segment.azure.tool.ToolUtils.newFileStore; +import static org.apache.jackrabbit.oak.segment.azure.tool.ToolUtils.newSegmentNodeStorePersistence; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import org.apache.jackrabbit.oak.segment.SegmentCache; +import org.apache.jackrabbit.oak.segment.SegmentNodeStore; +import org.apache.jackrabbit.oak.segment.SegmentNodeStoreBuilders; +import org.apache.jackrabbit.oak.segment.azure.AzurePersistence; +import org.apache.jackrabbit.oak.segment.azure.AzuriteDockerRule; +import org.apache.jackrabbit.oak.segment.azure.tool.SegmentCopy; +import org.apache.jackrabbit.oak.segment.azure.tool.ToolUtils.SegmentStoreType; +import org.apache.jackrabbit.oak.segment.file.FileStore; +import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitor; +import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitorAdapter; +import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitor; +import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitorAdapter; +import org.apache.jackrabbit.oak.segment.spi.persistence.GCJournalFile; +import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFileReader; +import org.apache.jackrabbit.oak.segment.spi.persistence.ManifestFile; +import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveEntry; +import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveManager; +import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveReader; +import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentNodeStorePersistence; +import org.apache.jackrabbit.oak.spi.commit.CommitInfo; +import org.apache.jackrabbit.oak.spi.commit.EmptyHook; +import org.apache.jackrabbit.oak.spi.state.NodeBuilder; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.io.PrintWriter; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.List; + +public abstract class SegmentCopyTestBase { + private static final String AZURE_DIRECTORY = "repository"; + private static final String AZURE_CONTAINER = "oak-test"; + + @ClassRule + public static AzuriteDockerRule azurite = new AzuriteDockerRule(); + + @Rule + public TemporaryFolder folder = new TemporaryFolder(new File("target")); + + protected abstract SegmentNodeStorePersistence getSrcPersistence() throws Exception; + + protected abstract SegmentNodeStorePersistence getDestPersistence() throws Exception; + + protected abstract String getSrcPathOrUri(); + + protected abstract String getDestPathOrUri(); + + @Test + public void testSegmentCopy() throws Exception { + SegmentNodeStorePersistence srcPersistence = getSrcPersistence(); + SegmentNodeStorePersistence destPersistence = getDestPersistence(); + + String srcPathOrUri = getSrcPathOrUri(); + String destPathOrUri = getDestPathOrUri(); + + int code = runSegmentCopy(srcPersistence, destPersistence, srcPathOrUri, destPathOrUri); + + assertEquals(0, code); + + IOMonitor ioMonitor = new IOMonitorAdapter(); + FileStoreMonitor fileStoreMonitor = new FileStoreMonitorAdapter(); + SegmentArchiveManager srcArchiveManager = srcPersistence.createArchiveManager(false, ioMonitor, + fileStoreMonitor); + SegmentArchiveManager destArchiveManager = destPersistence.createArchiveManager(false, ioMonitor, + fileStoreMonitor); + + 
checkArchives(srcArchiveManager, destArchiveManager);
+        checkJournal(srcPersistence, destPersistence);
+        checkGCJournal(srcPersistence, destPersistence);
+        checkManifest(srcPersistence, destPersistence);
+    }
+
+    private int runSegmentCopy(SegmentNodeStorePersistence srcPersistence, SegmentNodeStorePersistence destPersistence,
+            String srcPathOrUri, String destPathOrUri) throws Exception {
+        // Repeatedly add content and close FileStore to obtain a new tar file each time
+        for (int i = 0; i < 10; i++) {
+            try (FileStore fileStore = newFileStore(srcPersistence, folder.getRoot(), true,
+                    SegmentCache.DEFAULT_SEGMENT_CACHE_MB, 150_000L)) {
+                SegmentNodeStore sns = SegmentNodeStoreBuilders.builder(fileStore).build();
+                addContent(sns, i);
+
+                if (i == 9) {
+                    boolean gcSuccess = fileStore.compactFull();
+                    assertTrue(gcSuccess);
+                }
+            }
+        }
+
+        PrintWriter outWriter = new PrintWriter(System.out, true);
+        PrintWriter errWriter = new PrintWriter(System.err, true);
+
+        SegmentCopy segmentCopy = SegmentCopy.builder().withSrcPersistence(srcPersistence)
+                .withDestPersistence(destPersistence).withSource(srcPathOrUri).withDestination(destPathOrUri)
+                .withOutWriter(outWriter).withErrWriter(errWriter).withVerbose(true).build();
+        return segmentCopy.run();
+    }
+
+    private void addContent(SegmentNodeStore nodeStore, int i) throws Exception {
+        NodeBuilder extra = nodeStore.getRoot().builder();
+        NodeBuilder content = extra.child("content");
+        NodeBuilder c = content.child("c" + i);
+        for (int j = 0; j < 10; j++) {
+            c.setProperty("p" + j, "v" + j);
+        }
+        nodeStore.merge(extra, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+    }
+
+    private void checkArchives(SegmentArchiveManager srcArchiveManager, SegmentArchiveManager destArchiveManager)
+            throws IOException {
+        // check archives
+        List<String> srcArchives = srcArchiveManager.listArchives();
+        List<String> destArchives = destArchiveManager.listArchives();
+        Collections.sort(srcArchives);
+        Collections.sort(destArchives);
+        assertTrue(srcArchives.equals(destArchives));
+
+        // check archives contents
+        for (String archive : srcArchives) {
+            assertEquals(srcArchiveManager.exists(archive), destArchiveManager.exists(archive));
+
+            SegmentArchiveReader srcArchiveReader = srcArchiveManager.open(archive);
+            SegmentArchiveReader destArchiveReader = destArchiveManager.open(archive);
+
+            List<SegmentArchiveEntry> srcSegments = srcArchiveReader.listSegments();
+            List<SegmentArchiveEntry> destSegments = destArchiveReader.listSegments();
+
+            for (int i = 0; i < srcSegments.size(); i++) {
+                SegmentArchiveEntry srcSegment = srcSegments.get(i);
+                SegmentArchiveEntry destSegment = destSegments.get(i);
+
+                assertEquals(srcSegment.getMsb(), destSegment.getMsb());
+                assertEquals(srcSegment.getLsb(), destSegment.getLsb());
+                assertEquals(srcSegment.getLength(), destSegment.getLength());
+                assertEquals(srcSegment.getFullGeneration(), destSegment.getFullGeneration());
+                assertEquals(srcSegment.getGeneration(), destSegment.getGeneration());
+
+                ByteBuffer srcDataBuffer = srcArchiveReader.readSegment(srcSegment.getMsb(), srcSegment.getLsb());
+                ByteBuffer destDataBuffer = destArchiveReader.readSegment(destSegment.getMsb(), destSegment.getLsb());
+
+                assertEquals(srcDataBuffer, destDataBuffer);
+            }
+
+            ByteBuffer srcBinRefBuffer = srcArchiveReader.getBinaryReferences();
+            ByteBuffer destBinRefBuffer = destArchiveReader.getBinaryReferences();
+            assertEquals(srcBinRefBuffer, destBinRefBuffer);
+
+            assertEquals(srcArchiveReader.hasGraph(), destArchiveReader.hasGraph());
+
+            ByteBuffer srcGraphBuffer = srcArchiveReader.getGraph();
+            ByteBuffer
destGraphBuffer = destArchiveReader.getGraph(); + assertEquals(srcGraphBuffer, destGraphBuffer); + } + } + + private void checkJournal(SegmentNodeStorePersistence srcPersistence, SegmentNodeStorePersistence destPersistence) + throws IOException { + JournalFileReader srcJournalFileReader = srcPersistence.getJournalFile().openJournalReader(); + JournalFileReader destJournalFileReader = destPersistence.getJournalFile().openJournalReader(); + + String srcJournalLine = null; + while ((srcJournalLine = srcJournalFileReader.readLine()) != null) { + String destJournalLine = destJournalFileReader.readLine(); + assertEquals(srcJournalLine, destJournalLine); + } + } + + private void checkGCJournal(SegmentNodeStorePersistence srcPersistence, SegmentNodeStorePersistence destPersistence) + throws IOException { + GCJournalFile srcGCJournalFile = srcPersistence.getGCJournalFile(); + GCJournalFile destGCJournalFile = destPersistence.getGCJournalFile(); + assertEquals(srcGCJournalFile.readLines(), destGCJournalFile.readLines()); + } + + private void checkManifest(SegmentNodeStorePersistence srcPersistence, SegmentNodeStorePersistence destPersistence) + throws IOException { + ManifestFile srcManifestFile = srcPersistence.getManifestFile(); + ManifestFile destManifestFile = destPersistence.getManifestFile(); + assertEquals(srcManifestFile.load(), destManifestFile.load()); + } + + protected SegmentNodeStorePersistence getTarPersistence() { + return newSegmentNodeStorePersistence(SegmentStoreType.TAR, folder.getRoot().getAbsolutePath()); + } + + protected SegmentNodeStorePersistence getAzurePersistence() throws Exception { + return new AzurePersistence(azurite.getContainer(AZURE_CONTAINER).getDirectoryReference(AZURE_DIRECTORY)); + } + + protected String getTarPersistencePathOrUri() { + return folder.getRoot().getAbsolutePath(); + } + + protected String getAzurePersistencePathOrUri() { + StringBuilder uri = new StringBuilder("az:"); + uri.append("http://127.0.0.1:"); + uri.append(azurite.getMappedPort()).append("/"); + uri.append(AZURE_CONTAINER).append("/"); + uri.append(AZURE_DIRECTORY); + + return uri.toString(); + } +}
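
For illustration only (not part of the patch): based on the documentation added to overview.md above, a typical `segment-copy` invocation that copies a local TarMK Segment Store into a remote Azure Segment Store might look roughly as follows. The account, container and repository names are placeholders, and the secret key is supplied through the `AZURE_SECRET_KEY` environment variable as described in the Remote Segment Stores section.

```
export AZURE_SECRET_KEY=...
java -jar oak-run.jar segment-copy --verbose \
    /path/to/local/segmentstore \
    az:https://myaccount.blob.core.windows.net/container/repository
```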