diff --git a/oak-segment-azure/pom.xml b/oak-segment-azure/pom.xml new file mode 100644 index 0000000000..ccd0795dd8 --- /dev/null +++ b/oak-segment-azure/pom.xml @@ -0,0 +1,160 @@ + + + + + + 4.0.0 + + + org.apache.jackrabbit + oak-parent + 1.10-SNAPSHOT + ../oak-parent/pom.xml + + + oak-segment-azure + bundle + + Oak Segment Azure + + + + + org.apache.felix + maven-bundle-plugin + + + + + azure-storage, + azure-keyvault-core + + + + + + baseline + + baseline + + pre-integration-test + + + true + + + + + + + + + + + + + + org.osgi + org.osgi.core + provided + + + org.osgi + org.osgi.compendium + provided + + + org.osgi + org.osgi.annotation + provided + + + org.osgi + org.osgi.service.component.annotations + provided + + + org.osgi + org.osgi.service.metatype.annotations + provided + + + + + org.apache.jackrabbit + oak-segment-tar + ${project.version} + provided + + + org.apache.jackrabbit + oak-store-spi + ${project.version} + provided + + + + + com.microsoft.azure + azure-storage + 5.0.0 + + + com.microsoft.azure + azure-keyvault-core + 0.9.7 + + + + + org.apache.jackrabbit + oak-segment-tar + ${project.version} + tests + test + + + org.apache.jackrabbit + oak-store-spi + ${project.version} + test-jar + test + + + org.mockito + mockito-core + 1.10.19 + test + + + junit + junit + test + + + com.arakelian + docker-junit-rule + 2.1.0 + test + + + + diff --git a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManager.java b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManager.java new file mode 100644 index 0000000000..7e84e93ae6 --- /dev/null +++ b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManager.java @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.segment.azure; + +import com.microsoft.azure.storage.StorageException; +import com.microsoft.azure.storage.blob.BlobListingDetails; +import com.microsoft.azure.storage.blob.CloudBlob; +import com.microsoft.azure.storage.blob.CloudBlobDirectory; +import com.microsoft.azure.storage.blob.CloudBlockBlob; +import com.microsoft.azure.storage.blob.CopyStatus; +import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveManager; +import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitor; +import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitor; +import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveReader; +import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Collections; +import java.util.EnumSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.UUID; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +import static com.google.common.base.Preconditions.checkArgument; +import static 
org.apache.jackrabbit.oak.segment.azure.AzureUtilities.getName; + +public class AzureArchiveManager implements SegmentArchiveManager { + + private static final Logger log = LoggerFactory.getLogger(AzureSegmentArchiveReader.class); + + private final CloudBlobDirectory cloudBlobDirectory; + + private final IOMonitor ioMonitor; + + private final FileStoreMonitor monitor; + + public AzureArchiveManager(CloudBlobDirectory cloudBlobDirectory, IOMonitor ioMonitor, FileStoreMonitor fileStoreMonitor) { + this.cloudBlobDirectory = cloudBlobDirectory; + this.ioMonitor = ioMonitor; + this.monitor = fileStoreMonitor; + } + + @Override + public List listArchives() throws IOException { + try { + return StreamSupport.stream(cloudBlobDirectory + .listBlobs(null, false, EnumSet.noneOf(BlobListingDetails.class), null, null) + .spliterator(), false) + .filter(i -> i instanceof CloudBlobDirectory) + .map(i -> (CloudBlobDirectory) i) + .map(CloudBlobDirectory::getPrefix) + .map(Paths::get) + .map(Path::getFileName) + .map(Path::toString) + .collect(Collectors.toList()); + } catch (URISyntaxException | StorageException e) { + throw new IOException(e); + } + } + + @Override + public SegmentArchiveReader open(String archiveName) throws IOException { + try { + CloudBlobDirectory archiveDirectory = getDirectory(archiveName); + if (!archiveDirectory.getBlockBlobReference("closed").exists()) { + throw new IOException("The archive " + archiveName + " hasn't been closed correctly."); + } + return new AzureSegmentArchiveReader(archiveDirectory, ioMonitor, monitor); + } catch (StorageException | URISyntaxException e) { + throw new IOException(e); + } + } + + @Override + public SegmentArchiveWriter create(String archiveName) throws IOException { + return new AzureSegmentArchiveWriter(getDirectory(archiveName), ioMonitor, monitor); + } + + @Override + public boolean delete(String archiveName) { + try { + getBlobs(archiveName) + .forEach(cloudBlob -> { + try { + cloudBlob.delete(); + } catch 
(StorageException e) { + log.error("Can't delete segment {}", cloudBlob.getUri().getPath(), e); + } + }); + return true; + } catch (IOException e) { + log.error("Can't delete archive {}", archiveName, e); + return false; + } + } + + @Override + public boolean renameTo(String from, String to) { + try { + CloudBlobDirectory targetDirectory = getDirectory(to); + getBlobs(from) + .forEach(cloudBlob -> { + try { + renameBlob(cloudBlob, targetDirectory); + } catch (IOException e) { + log.error("Can't rename segment {}", cloudBlob.getUri().getPath(), e); + } + }); + return true; + } catch (IOException e) { + log.error("Can't rename archive {} to {}", from, to, e); + return false; + } + } + + @Override + public void copyFile(String from, String to) throws IOException { + CloudBlobDirectory targetDirectory = getDirectory(to); + getBlobs(from) + .forEach(cloudBlob -> { + try { + copyBlob(cloudBlob, targetDirectory); + } catch (IOException e) { + log.error("Can't copy segment {}", cloudBlob.getUri().getPath(), e); + } + }); + } + + @Override + public boolean exists(String archiveName) { + try { + return listArchives().contains(archiveName); + } catch (IOException e) { + log.error("Can't check the existence of {}", archiveName, e); + return false; + } + } + + @Override + public void recoverEntries(String archiveName, LinkedHashMap entries) throws IOException { + Pattern pattern = Pattern.compile(AzureUtilities.SEGMENT_FILE_NAME_PATTERN); + List entryList = new ArrayList<>(); + + for (CloudBlob b : getBlobList(archiveName)) { + String name = getName(b); + Matcher m = pattern.matcher(name); + if (!m.matches()) { + continue; + } + int position = Integer.parseInt(m.group(1), 16); + UUID uuid = UUID.fromString(m.group(2)); + long length = b.getProperties().getLength(); + if (length > 0) { + byte[] data = new byte[(int) length]; + try { + b.downloadToByteArray(data, 0); + } catch (StorageException e) { + throw new IOException(e); + } + entryList.add(new RecoveredEntry(position, 
uuid, data, name)); + } + } + Collections.sort(entryList); + + int i = 0; + for (RecoveredEntry e : entryList) { + if (e.position != i) { + log.warn("Missing entry {}.??? when recovering {}. No more segments will be read.", String.format("%04X", i), archiveName); + break; + } + log.info("Recovering segment {}/{}", archiveName, e.fileName); + entries.put(e.uuid, e.data); + i++; + } + } + + + private CloudBlobDirectory getDirectory(String archiveName) throws IOException { + try { + return cloudBlobDirectory.getDirectoryReference(archiveName); + } catch (URISyntaxException e) { + throw new IOException(e); + } + } + + private Stream getBlobs(String archiveName) throws IOException { + return AzureUtilities.getBlobs(getDirectory(archiveName)); + } + + private List getBlobList(String archiveName) throws IOException { + return getBlobs(archiveName).collect(Collectors.toList()); + } + + private void renameBlob(CloudBlob blob, CloudBlobDirectory newParent) throws IOException { + copyBlob(blob, newParent); + try { + blob.delete(); + } catch (StorageException e) { + throw new IOException(e); + } + } + + private void copyBlob(CloudBlob blob, CloudBlobDirectory newParent) throws IOException { + checkArgument(blob instanceof CloudBlockBlob, "Only page blobs are supported for the rename"); + try { + String blobName = getName(blob); + CloudBlockBlob newBlob = newParent.getBlockBlobReference(blobName); + newBlob.startCopy(blob.getUri()); + while (newBlob.getCopyState().getStatus() == CopyStatus.PENDING) { + Thread.sleep(100); + } + + CopyStatus finalStatus = newBlob.getCopyState().getStatus(); + if (newBlob.getCopyState().getStatus() != CopyStatus.SUCCESS) { + throw new IOException("Invalid copy status for " + blob.getUri().getPath() + ": " + finalStatus); + } + } catch (StorageException | InterruptedException | URISyntaxException e) { + throw new IOException(e); + } + } + + private static class RecoveredEntry implements Comparable { + + private final byte[] data; + + private final 
UUID uuid; + + private final int position; + + private final String fileName; + + public RecoveredEntry(int position, UUID uuid, byte[] data, String fileName) { + this.data = data; + this.uuid = uuid; + this.position = position; + this.fileName = fileName; + } + + @Override + public int compareTo(RecoveredEntry o) { + return Integer.compare(this.position, o.position); + } + } + +} diff --git a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureBlobMetadata.java b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureBlobMetadata.java new file mode 100644 index 0000000000..103a706f82 --- /dev/null +++ b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureBlobMetadata.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.jackrabbit.oak.segment.azure; + +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +public final class AzureBlobMetadata { + + private static final String METADATA_TYPE = "type"; + + private static final String METADATA_SEGMENT_UUID = "segment-uuid"; + + private static final String METADATA_SEGMENT_POSITION = "segment-position"; + + private static final String METADATA_SEGMENT_GENERATION = "segment-generation"; + + private static final String METADATA_SEGMENT_FULL_GENERATION = "segment-fullGeneration"; + + private static final String METADATA_SEGMENT_COMPACTED = "segment-compacted"; + + private static final String TYPE_SEGMENT = "segment"; + + public static HashMap toSegmentMetadata(AzureSegmentArchiveEntry indexEntry) { + HashMap map = new HashMap<>(); + map.put(METADATA_TYPE, TYPE_SEGMENT); + map.put(METADATA_SEGMENT_UUID, new UUID(indexEntry.getMsb(), indexEntry.getLsb()).toString()); + map.put(METADATA_SEGMENT_POSITION, String.valueOf(indexEntry.getPosition())); + map.put(METADATA_SEGMENT_GENERATION, String.valueOf(indexEntry.getGeneration())); + map.put(METADATA_SEGMENT_FULL_GENERATION, String.valueOf(indexEntry.getFullGeneration())); + map.put(METADATA_SEGMENT_COMPACTED, String.valueOf(indexEntry.isCompacted())); + return map; + } + + public static AzureSegmentArchiveEntry toIndexEntry(Map metadata, int length) { + UUID uuid = UUID.fromString(metadata.get(METADATA_SEGMENT_UUID)); + long msb = uuid.getMostSignificantBits(); + long lsb = uuid.getLeastSignificantBits(); + int position = Integer.parseInt(metadata.get(METADATA_SEGMENT_POSITION)); + int generation = Integer.parseInt(metadata.get(METADATA_SEGMENT_GENERATION)); + int fullGeneration = Integer.parseInt(metadata.get(METADATA_SEGMENT_FULL_GENERATION)); + boolean compacted = Boolean.parseBoolean(metadata.get(METADATA_SEGMENT_COMPACTED)); + return new AzureSegmentArchiveEntry(msb, lsb, position, length, generation, fullGeneration, compacted); + } + + public 
static boolean isSegment(Map metadata) { + return metadata != null && TYPE_SEGMENT.equals(metadata.get(METADATA_TYPE)); + } + +} diff --git a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureGCJournalFile.java b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureGCJournalFile.java new file mode 100644 index 0000000000..519da4d7ab --- /dev/null +++ b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureGCJournalFile.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.jackrabbit.oak.segment.azure; + +import com.google.common.base.Charsets; +import com.microsoft.azure.storage.StorageException; +import com.microsoft.azure.storage.blob.CloudAppendBlob; +import org.apache.commons.io.IOUtils; +import org.apache.jackrabbit.oak.segment.spi.persistence.GCJournalFile; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.Collections; +import java.util.List; + +public class AzureGCJournalFile implements GCJournalFile { + + private final CloudAppendBlob gcJournal; + + public AzureGCJournalFile(CloudAppendBlob gcJournal) { + this.gcJournal = gcJournal; + } + + @Override + public void writeLine(String line) throws IOException { + try { + if (!gcJournal.exists()) { + gcJournal.createOrReplace(); + } + gcJournal.appendText(line + "\n", Charsets.UTF_8.name(), null, null, null); + } catch (StorageException e) { + throw new IOException(e); + } + } + + @Override + public List readLines() throws IOException { + try { + if (!gcJournal.exists()) { + return Collections.emptyList(); + } + byte[] data = new byte[(int) gcJournal.getProperties().getLength()]; + gcJournal.downloadToByteArray(data, 0); + return IOUtils.readLines(new ByteArrayInputStream(data), Charset.defaultCharset()); + } catch (StorageException e) { + throw new IOException(e); + } + } +} diff --git a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureJournalFile.java b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureJournalFile.java new file mode 100644 index 0000000000..47397e40c6 --- /dev/null +++ b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureJournalFile.java @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.segment.azure; + +import com.microsoft.azure.storage.StorageException; +import com.microsoft.azure.storage.blob.CloudAppendBlob; +import com.microsoft.azure.storage.blob.CloudBlob; +import com.microsoft.azure.storage.blob.CloudBlobDirectory; +import com.microsoft.azure.storage.blob.ListBlobItem; +import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFile; +import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFileReader; +import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFileWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class AzureJournalFile implements JournalFile { + + private static final Logger log = LoggerFactory.getLogger(AzureJournalFile.class); + + private static final int JOURNAL_LINE_LIMIT = Integer.getInteger("org.apache.jackrabbit.oak.segment.azure.journal.lines", 40_000); + + private final CloudBlobDirectory directory; + + private final String journalNamePrefix; + + private final int lineLimit; + + AzureJournalFile(CloudBlobDirectory directory, String journalNamePrefix, int lineLimit) { + this.directory = directory; + 
this.journalNamePrefix = journalNamePrefix; + this.lineLimit = lineLimit; + } + + public AzureJournalFile(CloudBlobDirectory directory, String journalNamePrefix) { + this(directory, journalNamePrefix, JOURNAL_LINE_LIMIT); + } + + @Override + public JournalFileReader openJournalReader() throws IOException { + return new CombinedReader(getJournalBlobs()); + } + + @Override + public JournalFileWriter openJournalWriter() throws IOException { + return new AzureJournalWriter(); + } + + @Override + public String getName() { + return journalNamePrefix; + } + + @Override + public boolean exists() { + try { + return !getJournalBlobs().isEmpty(); + } catch (IOException e) { + log.error("Can't check if the file exists", e); + return false; + } + } + + private String getJournalFileName(int index) { + return String.format("%s.%03d", journalNamePrefix, index); + } + + private List getJournalBlobs() throws IOException { + try { + List result = new ArrayList<>(); + for (ListBlobItem b : directory.listBlobs(journalNamePrefix)) { + if (b instanceof CloudAppendBlob) { + result.add((CloudAppendBlob) b); + } else { + log.warn("Invalid blob type: {} {}", b.getUri(), b.getClass()); + } + } + result.sort(Comparator.comparing(AzureUtilities::getName).reversed()); + return result; + } catch (URISyntaxException | StorageException e) { + throw new IOException(e); + } + } + + private static class AzureJournalReader implements JournalFileReader { + + private final CloudBlob blob; + + private ReverseFileReader reader; + + private AzureJournalReader(CloudBlob blob) { + this.blob = blob; + } + + @Override + public String readLine() throws IOException { + if (reader == null) { + try { + reader = new ReverseFileReader(blob); + } catch (StorageException e) { + throw new IOException(e); + } + } + return reader.readLine(); + } + + @Override + public void close() throws IOException { + } + } + + private class AzureJournalWriter implements JournalFileWriter { + + private CloudAppendBlob currentBlob; + + 
private int blockCount; + + public AzureJournalWriter() throws IOException { + List blobs = getJournalBlobs(); + if (blobs.isEmpty()) { + try { + currentBlob = directory.getAppendBlobReference(getJournalFileName(1)); + currentBlob.createOrReplace(); + } catch (URISyntaxException | StorageException e) { + throw new IOException(e); + } + } else { + currentBlob = blobs.get(0); + } + Integer bc = currentBlob.getProperties().getAppendBlobCommittedBlockCount(); + blockCount = bc == null ? 0 : bc; + } + + @Override + public void truncate() throws IOException { + try { + for (CloudAppendBlob cloudAppendBlob : getJournalBlobs()) { + cloudAppendBlob.delete(); + } + } catch (StorageException e) { + throw new IOException(e); + } + } + + @Override + public void writeLine(String line) throws IOException { + if (blockCount >= lineLimit) { + createNewFile(); + } + try { + currentBlob.appendText(line + "\n"); + blockCount++; + } catch (StorageException e) { + throw new IOException(e); + } + } + + private void createNewFile() throws IOException { + String name = AzureUtilities.getName(currentBlob); + Pattern pattern = Pattern.compile(Pattern.quote(journalNamePrefix) + "\\.(\\d+)" ); + Matcher matcher = pattern.matcher(name); + int parsedSuffix; + if (matcher.find()) { + String suffix = matcher.group(1); + try { + parsedSuffix = Integer.parseInt(suffix); + } catch (NumberFormatException e) { + log.warn("Can't parse suffix for journal file {}", name); + parsedSuffix = 0; + } + } else { + log.warn("Can't parse journal file name {}", name); + parsedSuffix = 0; + } + try { + currentBlob = directory.getAppendBlobReference(getJournalFileName(parsedSuffix + 1)); + currentBlob.createOrReplace(); + blockCount = 0; + } catch (URISyntaxException | StorageException e) { + throw new IOException(e); + } + } + + @Override + public void close() throws IOException { + // do nothing + } + } + + private static class CombinedReader implements JournalFileReader { + + private final Iterator readers; + + 
private JournalFileReader currentReader; + + private CombinedReader(List blobs) { + readers = blobs.stream().map(AzureJournalReader::new).iterator(); + } + + @Override + public String readLine() throws IOException { + String line; + do { + if (currentReader == null) { + if (!readers.hasNext()) { + return null; + } + currentReader = readers.next(); + } + do { + line = currentReader.readLine(); + } while ("".equals(line)); + if (line == null) { + currentReader.close(); + currentReader = null; + } + } while (line == null); + return line; + } + + @Override + public void close() throws IOException { + while (readers.hasNext()) { + readers.next().close(); + } + if (currentReader != null) { + currentReader.close(); + currentReader = null; + } + } + } +} \ No newline at end of file diff --git a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureManifestFile.java b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureManifestFile.java new file mode 100644 index 0000000000..aae72c1200 --- /dev/null +++ b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureManifestFile.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.jackrabbit.oak.segment.azure; + +import com.microsoft.azure.storage.StorageException; +import com.microsoft.azure.storage.blob.CloudBlockBlob; +import org.apache.jackrabbit.oak.segment.spi.persistence.ManifestFile; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.Properties; + +public class AzureManifestFile implements ManifestFile { + + private static final Logger log = LoggerFactory.getLogger(AzureManifestFile.class); + + private final CloudBlockBlob manifestBlob; + + public AzureManifestFile(CloudBlockBlob manifestBlob) { + this.manifestBlob = manifestBlob; + } + + @Override + public boolean exists() { + try { + return manifestBlob.exists(); + } catch (StorageException e) { + log.error("Can't check if the manifest exists", e); + return false; + } + } + + @Override + public Properties load() throws IOException { + Properties properties = new Properties(); + if (exists()) { + long length = manifestBlob.getProperties().getLength(); + byte[] data = new byte[(int) length]; + try { + manifestBlob.downloadToByteArray(data, 0); + } catch (StorageException e) { + throw new IOException(e); + } + properties.load(new ByteArrayInputStream(data)); + } + return properties; + } + + @Override + public void save(Properties properties) throws IOException { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + properties.store(bos, null); + + byte[] data = bos.toByteArray(); + try { + manifestBlob.uploadFromByteArray(data, 0, data.length); + } catch (StorageException e) { + throw new IOException(e); + } + } +} diff --git a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzurePersistence.java b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzurePersistence.java new file mode 100644 index 0000000000..f1d3cbd892 --- /dev/null +++ 
b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzurePersistence.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.segment.azure; + +import com.microsoft.azure.storage.StorageException; +import com.microsoft.azure.storage.blob.BlobListingDetails; +import com.microsoft.azure.storage.blob.CloudAppendBlob; +import com.microsoft.azure.storage.blob.CloudBlobDirectory; +import com.microsoft.azure.storage.blob.CloudBlockBlob; +import com.microsoft.azure.storage.blob.ListBlobItem; +import org.apache.jackrabbit.oak.segment.spi.persistence.GCJournalFile; +import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFile; +import org.apache.jackrabbit.oak.segment.spi.persistence.ManifestFile; +import org.apache.jackrabbit.oak.segment.spi.persistence.RepositoryLock; +import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveManager; +import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentNodeStorePersistence; +import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitor; +import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import 
java.io.IOException; +import java.net.URISyntaxException; +import java.nio.file.Paths; +import java.util.EnumSet; + +public class AzurePersistence implements SegmentNodeStorePersistence { + + private static final Logger log = LoggerFactory.getLogger(AzurePersistence.class); + + private final CloudBlobDirectory segmentstoreDirectory; + + public AzurePersistence(CloudBlobDirectory segmentstoreDirectory) { + this.segmentstoreDirectory = segmentstoreDirectory; + } + + @Override + public SegmentArchiveManager createArchiveManager(boolean mmap, IOMonitor ioMonitor, FileStoreMonitor fileStoreMonitor) { + return new AzureArchiveManager(segmentstoreDirectory, ioMonitor, fileStoreMonitor); + } + + @Override + public boolean segmentFilesExist() { + try { + for (ListBlobItem i : segmentstoreDirectory.listBlobs(null, false, EnumSet.noneOf(BlobListingDetails.class), null, null)) { + if (i instanceof CloudBlobDirectory) { + CloudBlobDirectory dir = (CloudBlobDirectory) i; + String name = Paths.get(dir.getPrefix()).getFileName().toString(); + if (name.endsWith(".tar")) { + return true; + } + } + } + return false; + } catch (StorageException | URISyntaxException e) { + log.error("Can't check if the segment archives exists", e); + return false; + } + } + + @Override + public JournalFile getJournalFile() { + return new AzureJournalFile(segmentstoreDirectory, "journal.log"); + } + + @Override + public GCJournalFile getGCJournalFile() throws IOException { + return new AzureGCJournalFile(getAppendBlob("gc.log")); + } + + @Override + public ManifestFile getManifestFile() throws IOException { + return new AzureManifestFile(getBlockBlob("manifest")); + } + + @Override + public RepositoryLock lockRepository() throws IOException { + return new AzureRepositoryLock(getBlockBlob("repo.lock"), new Runnable() { + @Override + public void run() { + log.warn("Lost connection to the Azure. 
The client will be closed."); + // TODO close the connection + } + }).lock(); + } + + private CloudBlockBlob getBlockBlob(String path) throws IOException { + try { + return segmentstoreDirectory.getBlockBlobReference(path); + } catch (URISyntaxException | StorageException e) { + throw new IOException(e); + } + } + + private CloudAppendBlob getAppendBlob(String path) throws IOException { + try { + return segmentstoreDirectory.getAppendBlobReference(path); + } catch (URISyntaxException | StorageException e) { + throw new IOException(e); + } + } + +} diff --git a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureRepositoryLock.java b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureRepositoryLock.java new file mode 100644 index 0000000000..42542db3e5 --- /dev/null +++ b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureRepositoryLock.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.jackrabbit.oak.segment.azure;

import com.microsoft.azure.storage.AccessCondition;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.CloudBlockBlob;
import org.apache.jackrabbit.oak.segment.spi.persistence.RepositoryLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Exclusive repository lock implemented with an Azure blob lease.
 * <p>
 * {@link #lock()} creates the lock blob (if needed), acquires a time-limited
 * lease on it and starts a background thread that keeps renewing the lease.
 * If a renewal fails the supplied shutdown hook is invoked, since another
 * process may acquire the lease once it expires.
 */
public class AzureRepositoryLock implements RepositoryLock {

    private static final Logger log = LoggerFactory.getLogger(AzureRepositoryLock.class);

    /**
     * Lease duration in seconds. The background thread renews the lease once
     * more than half of this interval has elapsed since the last renewal.
     * Was a mutable non-final static; made final — nothing ever reassigns it.
     */
    private static final int INTERVAL = 60;

    /** Invoked when the lease is irrecoverably lost. */
    private final Runnable shutdownHook;

    private final CloudBlockBlob blob;

    private final ExecutorService executor;

    private String leaseId;

    /** Written by unlock()/the renewal thread, read by the renewal loop. */
    private volatile boolean doUpdate;

    public AzureRepositoryLock(CloudBlockBlob blob, Runnable shutdownHook) {
        this.shutdownHook = shutdownHook;
        this.blob = blob;
        this.executor = Executors.newSingleThreadExecutor();
    }

    /**
     * Acquires the lease and starts the renewal thread.
     *
     * @return this lock, already held
     * @throws IOException if the blob can't be created or the lease can't be acquired
     */
    public AzureRepositoryLock lock() throws IOException {
        try {
            // Creates an empty blob so there is something to lease.
            blob.openOutputStream().close();
            leaseId = blob.acquireLease(INTERVAL, null);
            log.info("Acquired lease {}", leaseId);
        } catch (StorageException e) {
            throw new IOException(e);
        }
        executor.submit(this::refreshLease);
        return this;
    }

    /**
     * Renewal loop run on the single-thread executor. Renews the lease when
     * more than INTERVAL/2 seconds have passed since the last renewal, polling
     * every 100 ms so unlock() is noticed promptly.
     */
    private void refreshLease() {
        doUpdate = true;
        long lastUpdate = 0;
        while (doUpdate) {
            try {
                long timeSinceLastUpdate = (System.currentTimeMillis() - lastUpdate) / 1000;
                if (timeSinceLastUpdate > INTERVAL / 2) {
                    blob.renewLease(AccessCondition.generateLeaseCondition(leaseId));
                    lastUpdate = System.currentTimeMillis();
                }
            } catch (StorageException e) {
                // The lease will expire on its own; warn the owner and stop renewing.
                log.error("Can't renew the lease", e);
                shutdownHook.run();
                doUpdate = false;
                return;
            }
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                log.error("Interrupted the lease renewal loop", e);
                // Fix: the original swallowed the interrupt and kept spinning.
                // Restore the flag and exit — an interrupt here means the
                // executor is being torn down.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /**
     * Stops the renewal thread and releases the lease, deleting the lock blob.
     *
     * @throws IOException if waiting for the renewal thread is interrupted or
     *         the lease can't be released
     */
    @Override
    public void unlock() throws IOException {
        doUpdate = false;
        executor.shutdown();
        try {
            executor.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            throw new IOException(e);
        } finally {
            // Release even if awaiting termination failed; the renewal loop
            // only touches the lease while doUpdate is true.
            releaseLease();
        }
    }

    private void releaseLease() throws IOException {
        try {
            blob.releaseLease(AccessCondition.generateLeaseCondition(leaseId));
            blob.delete();
            log.info("Released lease {}", leaseId);
        } catch (StorageException e) {
            throw new IOException(e);
        }
    }
}
+ */ +package org.apache.jackrabbit.oak.segment.azure; + +import org.apache.jackrabbit.oak.segment.file.tar.index.IndexEntry; + +public class AzureSegmentArchiveEntry implements IndexEntry { + + private final long msb; + + private final long lsb; + + private final int position; + + private final int length; + + private final int generation; + + private final int fullGeneration; + + private final boolean compacted; + + public AzureSegmentArchiveEntry(long msb, long lsb, int position, int length, int generation, int fullGeneration, boolean compacted) { + this.msb = msb; + this.lsb = lsb; + this.position = position; + this.length = length; + this.generation = generation; + this.fullGeneration = fullGeneration; + this.compacted = compacted; + } + + @Override + public long getMsb() { + return msb; + } + + @Override + public long getLsb() { + return lsb; + } + + @Override + public int getPosition() { + return position; + } + + @Override + public int getLength() { + return length; + } + + @Override + public int getGeneration() { + return generation; + } + + @Override + public int getFullGeneration() { + return fullGeneration; + } + + @Override + public boolean isCompacted() { + return compacted; + } +} diff --git a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveReader.java b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveReader.java new file mode 100644 index 0000000000..77dbd07d51 --- /dev/null +++ b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveReader.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
package org.apache.jackrabbit.oak.segment.azure;

import com.google.common.base.Stopwatch;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.CloudBlob;
import com.microsoft.azure.storage.blob.CloudBlobDirectory;
import com.microsoft.azure.storage.blob.CloudBlockBlob;
import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitor;
import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitor;
import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveEntry;
import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveReader;

import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.apache.jackrabbit.oak.segment.azure.AzureUtilities.getSegmentFileName;
import static org.apache.jackrabbit.oak.segment.azure.AzureUtilities.readBufferFully;

/**
 * Reads a closed segment archive stored as an Azure blob directory. The
 * in-memory index (UUID -> entry) is built eagerly in the constructor from
 * the per-blob metadata.
 */
public class AzureSegmentArchiveReader implements SegmentArchiveReader {

    private final CloudBlobDirectory archiveDirectory;

    private final IOMonitor ioMonitor;

    private final FileStoreMonitor monitor;

    /** Total size, in bytes, of all blobs in the archive directory. */
    private final long length;

    /** Insertion-ordered index of segment UUID to its archive entry. */
    private final Map<UUID, AzureSegmentArchiveEntry> index = new LinkedHashMap<>();

    /** Lazily computed; null means "not checked yet". */
    private Boolean hasGraph;

    AzureSegmentArchiveReader(CloudBlobDirectory archiveDirectory, IOMonitor ioMonitor, FileStoreMonitor monitor) throws IOException {
        this.archiveDirectory = archiveDirectory;
        this.ioMonitor = ioMonitor;
        this.monitor = monitor;
        long length = 0;
        try {
            for (CloudBlob blob : AzureUtilities.getBlobs(archiveDirectory).collect(Collectors.toList())) {
                blob.downloadAttributes();
                Map<String, String> metadata = blob.getMetadata();
                if (AzureBlobMetadata.isSegment(metadata)) {
                    AzureSegmentArchiveEntry indexEntry = AzureBlobMetadata.toIndexEntry(metadata, (int) blob.getProperties().getLength());
                    index.put(new UUID(indexEntry.getMsb(), indexEntry.getLsb()), indexEntry);
                }
                // Non-segment blobs (graph, binary refs, "closed") still count
                // towards the archive length.
                length += blob.getProperties().getLength();
            }
        } catch (StorageException e) {
            throw new IOException(e);
        }
        this.length = length;
    }

    /**
     * Reads a segment into a freshly allocated buffer.
     *
     * @return the segment data, or {@code null} if the segment is not in this archive
     */
    @Override
    public ByteBuffer readSegment(long msb, long lsb) throws IOException {
        AzureSegmentArchiveEntry indexEntry = index.get(new UUID(msb, lsb));
        if (indexEntry == null) {
            // Fix: the original dereferenced indexEntry unconditionally and
            // threw NPE for unknown segments; null is the documented "absent"
            // result, consistent with AzureSegmentArchiveWriter.readSegment.
            return null;
        }
        ByteBuffer buffer = ByteBuffer.allocate(indexEntry.getLength());
        ioMonitor.beforeSegmentRead(pathAsFile(), msb, lsb, indexEntry.getLength());
        Stopwatch stopwatch = Stopwatch.createStarted();
        readBufferFully(getBlob(getSegmentFileName(indexEntry)), buffer);
        long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS);
        ioMonitor.afterSegmentRead(pathAsFile(), msb, lsb, indexEntry.getLength(), elapsed);
        return buffer;
    }

    @Override
    public boolean containsSegment(long msb, long lsb) {
        return index.containsKey(new UUID(msb, lsb));
    }

    @Override
    public List<SegmentArchiveEntry> listSegments() {
        return new ArrayList<>(index.values());
    }

    @Override
    public ByteBuffer getGraph() throws IOException {
        ByteBuffer graph = readBlob(getName() + ".gph");
        hasGraph = graph != null;
        return graph;
    }

    @Override
    public boolean hasGraph() {
        if (hasGraph == null) {
            try {
                getGraph();
            } catch (IOException ignore) {
                // Fix: when getGraph() threw, hasGraph remained null and the
                // unboxing below threw NPE. Treat an unreadable graph as absent.
                hasGraph = false;
            }
        }
        return hasGraph;
    }

    @Override
    public ByteBuffer getBinaryReferences() throws IOException {
        return readBlob(getName() + ".brf");
    }

    @Override
    public long length() {
        return length;
    }

    @Override
    public String getName() {
        return AzureUtilities.getName(archiveDirectory);
    }

    @Override
    public void close() throws IOException {
        // do nothing - no resources are held open between reads
    }

    @Override
    public int getEntrySize(int size) {
        // Blob storage has no per-entry framing overhead.
        return size;
    }

    /** The archive path reported to the IOMonitor, as a File for API compatibility. */
    private File pathAsFile() {
        return new File(archiveDirectory.getUri().getPath());
    }

    private CloudBlockBlob getBlob(String name) throws IOException {
        try {
            return archiveDirectory.getBlockBlobReference(name);
        } catch (URISyntaxException | StorageException e) {
            throw new IOException(e);
        }
    }

    /** Reads a whole named blob, or returns null if it doesn't exist. */
    private ByteBuffer readBlob(String name) throws IOException {
        try {
            CloudBlockBlob blob = getBlob(name);
            if (!blob.exists()) {
                return null;
            }
            long length = blob.getProperties().getLength();
            ByteBuffer buffer = ByteBuffer.allocate((int) length);
            AzureUtilities.readBufferFully(blob, buffer);
            return buffer;
        } catch (StorageException e) {
            throw new IOException(e);
        }
    }
}
package org.apache.jackrabbit.oak.segment.azure;

import com.google.common.base.Stopwatch;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.CloudBlobDirectory;
import com.microsoft.azure.storage.blob.CloudBlockBlob;
import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitor;
import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitor;
import org.apache.jackrabbit.oak.segment.azure.queue.SegmentWriteAction;
import org.apache.jackrabbit.oak.segment.azure.queue.SegmentWriteQueue;
import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveWriter;

import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import static org.apache.jackrabbit.oak.segment.azure.AzureUtilities.getSegmentFileName;
import static org.apache.jackrabbit.oak.segment.azure.AzureUtilities.readBufferFully;

/**
 * Writes a segment archive as one Azure blob per segment. When
 * {@link SegmentWriteQueue#THREADS} &gt; 0, uploads are performed
 * asynchronously through a bounded write queue; otherwise they are
 * performed inline on the caller's thread.
 */
public class AzureSegmentArchiveWriter implements SegmentArchiveWriter {

    private final CloudBlobDirectory archiveDirectory;

    private final IOMonitor ioMonitor;

    private final FileStoreMonitor monitor;

    /** Present only when asynchronous uploads are enabled. */
    private final Optional<SegmentWriteQueue> queue;

    /** Made final: the reference is never reassigned after construction. */
    private final Map<UUID, AzureSegmentArchiveEntry> index = Collections.synchronizedMap(new LinkedHashMap<>());

    /** Number of segments written so far; doubles as the next entry position. */
    private int entries;

    private long totalLength;

    private volatile boolean created = false;

    public AzureSegmentArchiveWriter(CloudBlobDirectory archiveDirectory, IOMonitor ioMonitor, FileStoreMonitor monitor) {
        this.archiveDirectory = archiveDirectory;
        this.ioMonitor = ioMonitor;
        this.monitor = monitor;
        this.queue = SegmentWriteQueue.THREADS > 0 ? Optional.of(new SegmentWriteQueue(this::doWriteEntry)) : Optional.empty();
    }

    @Override
    public void writeSegment(long msb, long lsb, byte[] data, int offset, int size, int generation, int fullGeneration, boolean compacted) throws IOException {
        created = true;

        AzureSegmentArchiveEntry entry = new AzureSegmentArchiveEntry(msb, lsb, entries++, size, generation, fullGeneration, compacted);
        if (queue.isPresent()) {
            queue.get().addToQueue(entry, data, offset, size);
        } else {
            doWriteEntry(entry, data, offset, size);
        }
        index.put(new UUID(msb, lsb), entry);

        totalLength += size;
        monitor.written(size);
    }

    /** Uploads one segment blob together with its index metadata. */
    private void doWriteEntry(AzureSegmentArchiveEntry indexEntry, byte[] data, int offset, int size) throws IOException {
        long msb = indexEntry.getMsb();
        long lsb = indexEntry.getLsb();
        ioMonitor.beforeSegmentWrite(pathAsFile(), msb, lsb, size);
        Stopwatch stopwatch = Stopwatch.createStarted();
        try {
            CloudBlockBlob blob = getBlob(getSegmentFileName(indexEntry));
            blob.setMetadata(AzureBlobMetadata.toSegmentMetadata(indexEntry));
            blob.uploadFromByteArray(data, offset, size);
            // setMetadata only changes local state; uploadMetadata persists it.
            blob.uploadMetadata();
        } catch (StorageException e) {
            throw new IOException(e);
        }
        ioMonitor.afterSegmentWrite(pathAsFile(), msb, lsb, size, stopwatch.elapsed(TimeUnit.NANOSECONDS));
    }

    /**
     * Reads back a segment written to this archive, serving it from the write
     * queue if the upload is still pending.
     *
     * @return the segment data, or {@code null} if this writer doesn't know the segment
     */
    @Override
    public ByteBuffer readSegment(long msb, long lsb) throws IOException {
        UUID uuid = new UUID(msb, lsb);
        Optional<SegmentWriteAction> segment = queue.map(q -> q.read(uuid));
        if (segment.isPresent()) {
            return segment.get().toByteBuffer();
        }
        // Reuse uuid instead of re-building new UUID(msb, lsb).
        AzureSegmentArchiveEntry indexEntry = index.get(uuid);
        if (indexEntry == null) {
            return null;
        }
        ByteBuffer buffer = ByteBuffer.allocate(indexEntry.getLength());
        readBufferFully(getBlob(getSegmentFileName(indexEntry)), buffer);
        return buffer;
    }

    @Override
    public boolean containsSegment(long msb, long lsb) {
        UUID uuid = new UUID(msb, lsb);
        Optional<SegmentWriteAction> segment = queue.map(q -> q.read(uuid));
        if (segment.isPresent()) {
            return true;
        }
        return index.containsKey(uuid);
    }

    @Override
    public void writeGraph(byte[] data) throws IOException {
        writeDataFile(data, ".gph");
    }

    @Override
    public void writeBinaryReferences(byte[] data) throws IOException {
        writeDataFile(data, ".brf");
    }

    /** Uploads an auxiliary file (graph / binary references) next to the segments. */
    private void writeDataFile(byte[] data, String extension) throws IOException {
        try {
            getBlob(getName() + extension).uploadFromByteArray(data, 0, data.length);
        } catch (StorageException e) {
            throw new IOException(e);
        }
        totalLength += data.length;
        monitor.written(data.length);
    }

    @Override
    public long getLength() {
        return totalLength;
    }

    @Override
    public void close() throws IOException {
        if (queue.isPresent()) { // required to handle IOException
            SegmentWriteQueue q = queue.get();
            q.flush();
            q.close();
        }
        try {
            // The empty "closed" blob marks the archive as complete.
            getBlob("closed").uploadFromByteArray(new byte[0], 0, 0);
        } catch (StorageException e) {
            throw new IOException(e);
        }
    }

    @Override
    public boolean isCreated() {
        return created || !queueIsEmpty();
    }

    @Override
    public void flush() throws IOException {
        if (queue.isPresent()) { // required to handle IOException
            queue.get().flush();
        }
    }

    private boolean queueIsEmpty() {
        return queue.map(SegmentWriteQueue::isEmpty).orElse(true);
    }

    @Override
    public String getName() {
        return AzureUtilities.getName(archiveDirectory);
    }

    /** The archive path reported to the IOMonitor, as a File for API compatibility. */
    private File pathAsFile() {
        return new File(archiveDirectory.getUri().getPath());
    }

    private CloudBlockBlob getBlob(String name) throws IOException {
        try {
            return archiveDirectory.getBlockBlobReference(name);
        } catch (URISyntaxException | StorageException e) {
            throw new IOException(e);
        }
    }
}
package org.apache.jackrabbit.oak.segment.azure;

import com.microsoft.azure.storage.CloudStorageAccount;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentNodeStorePersistence;
import org.osgi.framework.ServiceRegistration;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.ConfigurationPolicy;
import org.osgi.service.component.annotations.Deactivate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.URISyntaxException;
import java.security.InvalidKeyException;
import java.util.Properties;

/**
 * OSGi component that registers a {@link SegmentNodeStorePersistence} backed
 * by Azure Blob Storage, configured through {@link Configuration}.
 */
@Component(
        configurationPolicy = ConfigurationPolicy.REQUIRE,
        configurationPid = {Configuration.PID})
public class AzureSegmentStoreService {

    private static final Logger log = LoggerFactory.getLogger(AzureSegmentStoreService.class);

    public static final String DEFAULT_CONTAINER_NAME = "oak";

    public static final String DEFAULT_ROOT_PATH = "/oak";

    private ServiceRegistration registration;

    private SegmentNodeStorePersistence persistence;

    @Activate
    public void activate(ComponentContext context, Configuration config) throws IOException {
        persistence = createAzurePersistence(config);
        registration = context.getBundleContext().registerService(SegmentNodeStorePersistence.class.getName(), persistence, new Properties());
    }

    @Deactivate
    public void deactivate() throws IOException {
        if (registration != null) {
            registration.unregister();
            registration = null;
        }
        persistence = null;
    }

    /**
     * Builds the persistence from the configuration: either a full connection
     * URL or account name + access key. Ensures the target container exists.
     *
     * @throws IOException if the connection string is invalid or the container
     *         can't be created/resolved
     */
    private static SegmentNodeStorePersistence createAzurePersistence(Configuration configuration) throws IOException {
        try {
            StringBuilder connectionString = new StringBuilder();
            // Treat an empty string like an unset value; OSGi configs often
            // deliver "" for blank fields.
            if (configuration.connectionURL() != null && !configuration.connectionURL().isEmpty()) {
                connectionString.append(configuration.connectionURL());
            } else {
                connectionString.append("DefaultEndpointsProtocol=https;");
                connectionString.append("AccountName=").append(configuration.accountName()).append(';');
                connectionString.append("AccountKey=").append(configuration.accessKey()).append(';');
            }
            // Security fix: the original logged the full connection string,
            // which contains the account key. Log only non-secret parts.
            log.info("Connecting to Azure storage account '{}', container '{}'",
                    configuration.accountName(), configuration.containerName());
            CloudStorageAccount cloud = CloudStorageAccount.parse(connectionString.toString());
            CloudBlobContainer container = cloud.createCloudBlobClient().getContainerReference(configuration.containerName());
            container.createIfNotExists();

            // Blob directory names must not start with '/'.
            String path = configuration.rootPath();
            if (path != null && path.length() > 0 && path.charAt(0) == '/') {
                path = path.substring(1);
            }

            return new AzurePersistence(container.getDirectoryReference(path));
        } catch (StorageException | URISyntaxException | InvalidKeyException e) {
            throw new IOException(e);
        }
    }
}
package org.apache.jackrabbit.oak.segment.azure;

import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.CloudBlob;
import com.microsoft.azure.storage.blob.CloudBlobDirectory;

import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.nio.file.Paths;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

/**
 * Static helpers for naming, listing and reading segment blobs.
 */
public final class AzureUtilities {

    /**
     * Regex matching segment blob names: a 4-hex-digit position, a dot, then
     * the segment UUID. Made {@code final} — it was a mutable public static.
     */
    public static final String SEGMENT_FILE_NAME_PATTERN = "^([0-9a-f]{4})\\.([0-9a-f-]+)$";

    private AzureUtilities() {
        // utility class, no instances
    }

    public static String getSegmentFileName(AzureSegmentArchiveEntry indexEntry) {
        return getSegmentFileName(indexEntry.getPosition(), indexEntry.getMsb(), indexEntry.getLsb());
    }

    /** Formats "0000.uuid" style names; offset rendered as 4 lowercase hex digits. */
    public static String getSegmentFileName(long offset, long msb, long lsb) {
        return String.format("%04x.%s", offset, new UUID(msb, lsb).toString());
    }

    /** Last path segment of a blob name. */
    public static String getName(CloudBlob blob) {
        return Paths.get(blob.getName()).getFileName().toString();
    }

    /** Last path segment of a directory URI. */
    public static String getName(CloudBlobDirectory directory) {
        return Paths.get(directory.getUri().getPath()).getFileName().toString();
    }

    /**
     * Streams the blobs directly inside a directory (sub-directories are
     * filtered out).
     */
    public static Stream<CloudBlob> getBlobs(CloudBlobDirectory directory) throws IOException {
        try {
            return StreamSupport.stream(directory.listBlobs().spliterator(), false)
                    .filter(i -> i instanceof CloudBlob)
                    .map(i -> (CloudBlob) i);
        } catch (StorageException | URISyntaxException e) {
            throw new IOException(e);
        }
    }

    /** Sum of blob lengths in a directory; fetches attributes per blob. */
    public static long getDirectorySize(CloudBlobDirectory directory) throws IOException {
        long size = 0;
        for (CloudBlob b : getBlobs(directory).collect(Collectors.toList())) {
            try {
                b.downloadAttributes();
            } catch (StorageException e) {
                throw new IOException(e);
            }
            size += b.getProperties().getLength();
        }
        return size;
    }

    /**
     * Downloads a whole blob into the buffer's backing array.
     *
     * @throws IOException if the blob size doesn't match the buffer capacity
     *         or the download fails
     */
    public static void readBufferFully(CloudBlob blob, ByteBuffer buffer) throws IOException {
        try {
            buffer.rewind();
            long readBytes = blob.downloadToByteArray(buffer.array(), 0);
            if (buffer.limit() != readBytes) {
                throw new IOException("Buffer size: " + buffer.limit() + ", read bytes: " + readBytes);
            }
        } catch (StorageException e) {
            throw new IOException(e);
        }
    }
}
package org.apache.jackrabbit.oak.segment.azure;

import org.osgi.service.metatype.annotations.AttributeDefinition;
import org.osgi.service.metatype.annotations.ObjectClassDefinition;

import static org.apache.jackrabbit.oak.segment.azure.Configuration.PID;

/**
 * OSGi metatype configuration for {@link AzureSegmentStoreService}.
 * <p>
 * Either set {@link #connectionURL()} alone, or set {@link #accountName()}
 * and {@link #accessKey()} (with optional {@link #containerName()}); the
 * connection URL, when present, takes precedence.
 */
@ObjectClassDefinition(
        pid = {PID},
        name = "Apache Jackrabbit Oak Azure Segment Store Service",
        description = "Azure backend for the Oak Segment Node Store")
@interface Configuration {

    // Configuration PID under which AzureSegmentStoreService is configured.
    String PID = "org.apache.jackrabbit.oak.segment.azure.AzureSegmentStoreService";

    @AttributeDefinition(
            name = "Azure account name",
            description = "Name of the Azure Storage account to use.")
    String accountName();

    @AttributeDefinition(
            name = "Azure container name",
            description = "Name of the container to use. If it doesn't exists, it'll be created.")
    String containerName() default AzureSegmentStoreService.DEFAULT_CONTAINER_NAME;

    @AttributeDefinition(
            name = "Azure account access key",
            description = "Access key which should be used to authenticate on the account")
    String accessKey();

    @AttributeDefinition(
            name = "Root path",
            description = "Names of all the created blobs will be prefixed with this path")
    String rootPath() default AzureSegmentStoreService.DEFAULT_ROOT_PATH;

    // NOTE(review): no default is declared here, so metatype treats this as a
    // required attribute even though AzureSegmentStoreService handles its
    // absence — consider `default ""` if it is meant to be optional. Verify
    // against the deployment descriptors before changing.
    @AttributeDefinition(
            name = "Azure connection URL (optional)",
            description = "Connection URL to be used to connect to the Azure Storage. " +
                    "Setting it will override the accountName, containerName and accessKey properties.")
    String connectionURL();
}
package org.apache.jackrabbit.oak.segment.azure;

import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.CloudBlob;

import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import static java.lang.Math.min;

/**
 * Reads a blob line-by-line from the END towards the beginning, downloading
 * it in fixed-size blocks. Used to scan the journal, whose most recent
 * entries are at the tail.
 * <p>
 * Invariants: {@code fileOffset} is the absolute position of the first byte
 * of the current {@code buffer}; {@code bufferOffset} walks backwards inside
 * the buffer and rests either on a '\n' or at -1 (buffer exhausted).
 * Not thread-safe.
 */
public class ReverseFileReader {

    // Default block size for range downloads.
    private static final int BUFFER_SIZE = 16 * 1024;

    private int bufferSize;

    private final CloudBlob blob;

    // Current downloaded block; null until the first readBlock().
    private byte[] buffer;

    // Read position within buffer; -1 means the block has been fully consumed.
    private int bufferOffset;

    // Absolute offset of buffer[0] in the blob; decreases as we read backwards.
    private int fileOffset;

    public ReverseFileReader(CloudBlob blob) throws StorageException {
        this(blob, BUFFER_SIZE);
    }

    /**
     * @param blob       blob to read; a missing blob behaves like an empty one
     * @param bufferSize size of each range download, in bytes
     */
    public ReverseFileReader(CloudBlob blob, int bufferSize) throws StorageException {
        this.blob = blob;
        if (blob.exists()) {
            // Start just past the last byte; note the int cast caps supported
            // blob size at Integer.MAX_VALUE — TODO confirm acceptable for journals.
            this.fileOffset = (int) blob.getProperties().getLength();
        } else {
            this.fileOffset = 0;
        }
        this.bufferSize = bufferSize;
    }

    /**
     * Downloads the block of bytes immediately preceding the current one.
     * Shrinks the buffer when fewer than bufferSize bytes remain.
     */
    private void readBlock() throws IOException {
        if (buffer == null) {
            // First block: may be smaller than bufferSize for short blobs.
            buffer = new byte[min(fileOffset, bufferSize)];
        } else if (fileOffset < buffer.length) {
            // Last block at the start of the file: shrink to the remainder.
            buffer = new byte[fileOffset];
        }

        if (buffer.length > 0) {
            fileOffset -= buffer.length;
            try {
                blob.downloadRangeToByteArray(fileOffset, Long.valueOf(buffer.length), buffer, 0);
            } catch (StorageException e) {
                throw new IOException(e);
            }
        }
        // Start consuming from the end of the freshly read block.
        bufferOffset = buffer.length;
    }

    /**
     * Scans backwards from bufferOffset to the previous '\n' (or the start of
     * the buffer) and returns the bytes in between as a String. Afterwards,
     * bufferOffset points at the '\n' itself, or is -1 if none was found.
     */
    private String readUntilNewLine() {
        if (bufferOffset == -1) {
            return "";
        }
        int stop = bufferOffset;
        while (--bufferOffset >= 0) {
            if (buffer[bufferOffset] == '\n') {
                break;
            }
        }
        // bufferOffset points either the previous '\n' character or -1
        return new String(buffer, bufferOffset + 1, stop - bufferOffset - 1, Charset.defaultCharset());
    }

    /**
     * Returns the previous line of the blob (reading backwards), or null once
     * the beginning of the file has been reached. A line may span several
     * blocks; its fragments are collected and re-joined in order.
     */
    public String readLine() throws IOException {
        if (bufferOffset == -1 && fileOffset == 0) {
            return null;
        }

        if (buffer == null) {
            readBlock();
        }

        List<String> result = new ArrayList<>(1);
        while (true) {
            result.add(readUntilNewLine());
            if (bufferOffset > -1) { // stopped on the '\n'
                break;
            }
            if (fileOffset == 0) { // reached the beginning of the file
                break;
            }
            readBlock();
        }
        // Fragments were collected tail-first; restore natural order.
        Collections.reverse(result);
        return String.join("", result);
    }
}
package org.apache.jackrabbit.oak.segment.azure.queue;

import org.apache.jackrabbit.oak.segment.azure.AzureSegmentArchiveEntry;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.UUID;

/**
 * A unit of work for the {@link SegmentWriteQueue}: one segment together with
 * the archive index entry describing it. The segment bytes are copied on
 * construction, so the caller may reuse its buffer immediately.
 */
public class SegmentWriteAction {

    private final AzureSegmentArchiveEntry indexEntry;

    /** Private copy of the segment data; never exposed for mutation. */
    private final byte[] buffer;

    /** Always 0 after the defensive copy; kept so the consumer signature stays uniform. */
    private final int offset;

    private final int length;

    /**
     * @param indexEntry archive index entry identifying the segment
     * @param buffer     source buffer; the relevant range is copied
     * @param offset     start of the segment data within {@code buffer}
     * @param length     number of bytes of segment data
     */
    public SegmentWriteAction(AzureSegmentArchiveEntry indexEntry, byte[] buffer, int offset, int length) {
        this.indexEntry = indexEntry;

        // Defensive copy of just the requested range.
        this.buffer = new byte[length];
        System.arraycopy(buffer, offset, this.buffer, 0, length);
        this.offset = 0;
        this.length = length;
    }

    /** Returns the segment identifier derived from the index entry. */
    public UUID getUuid() {
        return new UUID(indexEntry.getMsb(), indexEntry.getLsb());
    }

    /** Returns a read view over the copied segment bytes. */
    public ByteBuffer toByteBuffer() {
        return ByteBuffer.wrap(buffer, offset, length);
    }

    /**
     * Hands this segment to the given consumer (the actual Azure writer).
     *
     * @throws IOException if the consumer fails to persist the segment
     */
    void passTo(SegmentWriteQueue.SegmentConsumer consumer) throws IOException {
        consumer.consume(indexEntry, buffer, offset, length);
    }

    @Override
    public String toString() {
        return getUuid().toString();
    }
}
package org.apache.jackrabbit.oak.segment.azure.queue;

import org.apache.jackrabbit.oak.segment.azure.AzureSegmentArchiveEntry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.io.IOException;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * An asynchronous write-behind queue for segment uploads. Segments added via
 * {@link #addToQueue} are persisted by a pool of background threads; until a
 * segment is persisted it remains readable from {@link #read(UUID)}. When a
 * write fails, the queue enters a "broken" state: producers block and a
 * dedicated emergency thread retries until the backend recovers.
 */
public class SegmentWriteQueue implements Closeable {

    public static final int THREADS = Integer.getInteger("oak.segment.azure.threads", 5);

    // NOTE(review): the original property name read
    // "oak.segment.org.apache.jackrabbit.oak.segment.azure.queue", which looks
    // like an accidental IDE package-rename; restored to the intended form.
    private static final int QUEUE_SIZE = Integer.getInteger("oak.segment.azure.queue", 20);

    private static final Logger log = LoggerFactory.getLogger(SegmentWriteQueue.class);

    /** Segments waiting to be persisted, in arrival order. */
    private final BlockingDeque<SegmentWriteAction> queue;

    /** All not-yet-persisted segments, for lookup by {@link #read(UUID)}. */
    private final Map<UUID, SegmentWriteAction> segmentsByUUID;

    private final ExecutorService executor;

    /** Read side taken by producers; write side taken by {@link #flush()}. */
    private final ReadWriteLock flushLock;

    /** The actual backend writer. */
    private final SegmentConsumer writer;

    private volatile boolean shutdown;

    /** Guards state changes of {@link #broken} and wakes waiting threads. */
    private final Object brokenMonitor = new Object();

    /** True while the last write attempt failed and the backend is suspect. */
    private volatile boolean broken;

    public SegmentWriteQueue(SegmentConsumer writer) {
        this(writer, QUEUE_SIZE, THREADS);
    }

    SegmentWriteQueue(SegmentConsumer writer, int queueSize, int threadNo) {
        this.writer = writer;
        segmentsByUUID = new ConcurrentHashMap<>();
        flushLock = new ReentrantReadWriteLock();

        queue = new LinkedBlockingDeque<>(queueSize);
        // threadNo workers plus one emergency-retry thread.
        executor = Executors.newFixedThreadPool(threadNo + 1);
        for (int i = 0; i < threadNo; i++) {
            executor.submit(this::mainLoop);
        }
        executor.submit(this::emergencyLoop);
    }

    /** Worker loop: drain the queue while the backend is healthy. */
    private void mainLoop() {
        while (!shutdown) {
            try {
                waitWhileBroken();
                if (shutdown) {
                    break;
                }
                consume();
            } catch (SegmentConsumeException e) {
                SegmentWriteAction segment = e.segment;
                log.error("Can't persist the segment {}", segment.getUuid(), e.getCause());
                try {
                    // Re-queue the failed segment so the emergency loop can retry it.
                    queue.put(segment);
                } catch (InterruptedException e1) {
                    log.error("Can't re-add the segment {} to the queue. It'll be dropped.", segment.getUuid(), e1);
                }
            }
        }
    }

    /** Takes one segment off the queue (with a short timeout) and persists it. */
    private void consume() throws SegmentConsumeException {
        SegmentWriteAction segment = null;
        try {
            segment = queue.poll(100, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            log.error("Poll from queue interrupted", e);
        }
        if (segment != null) {
            consume(segment);
        }
    }

    /**
     * Persists the given segment. On success the segment is removed from the
     * in-flight map and the queue is marked healthy; on failure it is marked
     * broken.
     */
    private void consume(SegmentWriteAction segment) throws SegmentConsumeException {
        try {
            segment.passTo(writer);
        } catch (IOException e) {
            setBroken(true);
            throw new SegmentConsumeException(segment, e);
        }
        synchronized (segmentsByUUID) {
            segmentsByUUID.remove(segment.getUuid());
            // Wake flush() waiters, which wait on the map monitor.
            segmentsByUUID.notifyAll();
        }
        setBroken(false);
    }

    /**
     * Emergency loop: when the queue is broken, retry a single segment with a
     * 1s backoff until it succeeds (which clears the broken flag) or shutdown.
     */
    private void emergencyLoop() {
        while (!shutdown) {
            waitUntilBroken();
            if (shutdown) {
                break;
            }

            boolean success = false;
            SegmentWriteAction segmentToRetry = null;
            do {
                try {
                    if (segmentToRetry == null) {
                        consume();
                    } else {
                        consume(segmentToRetry);
                    }
                    success = true;
                } catch (SegmentConsumeException e) {
                    segmentToRetry = e.segment;
                    log.error("Can't persist the segment {}", segmentToRetry.getUuid(), e.getCause());
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e1) {
                        // Fixed: log the interruption itself (was logging the
                        // unrelated SegmentConsumeException).
                        log.warn("Interrupted", e1);
                    }
                    if (shutdown) {
                        log.error("Shutdown initiated. The segment {} will be dropped.", segmentToRetry.getUuid());
                    }
                }
            } while (!success && !shutdown);
        }
    }

    /**
     * Enqueues a segment for asynchronous persistence. Blocks while the queue
     * is broken or full (up to 1 minute).
     *
     * @throws IOException if the segment cannot be enqueued within the timeout
     * @throws IllegalStateException if the queue is shutting down
     */
    public void addToQueue(AzureSegmentArchiveEntry indexEntry, byte[] data, int offset, int size) throws IOException {
        waitWhileBroken();
        if (shutdown) {
            throw new IllegalStateException("Can't accept the new segment - shutdown in progress");
        }

        SegmentWriteAction action = new SegmentWriteAction(indexEntry, data, offset, size);
        flushLock.readLock().lock();
        try {
            // Register in the map first so read() finds it as soon as it is queued.
            segmentsByUUID.put(action.getUuid(), action);
            if (!queue.offer(action, 1, TimeUnit.MINUTES)) {
                segmentsByUUID.remove(action.getUuid());
                throw new IOException("Can't add segment to the queue");
            }
        } catch (InterruptedException e) {
            throw new IOException(e);
        } finally {
            flushLock.readLock().unlock();
        }
    }

    /**
     * Blocks until every enqueued segment has been persisted. Takes the write
     * lock, so no new segments can be added while flushing.
     *
     * @throws IOException if waiting is interrupted
     */
    public void flush() throws IOException {
        flushLock.writeLock().lock();
        try {
            synchronized (segmentsByUUID) {
                long start = System.currentTimeMillis();
                while (!segmentsByUUID.isEmpty()) {
                    segmentsByUUID.wait(100);
                    if (System.currentTimeMillis() - start > TimeUnit.MINUTES.toMillis(1)) {
                        // Keep waiting, but surface the stall once a minute.
                        log.error("Can't flush the queue in 1 minute. Queue: {}. Segment map: {}", queue, segmentsByUUID);
                        start = System.currentTimeMillis();
                    }
                }
            }
        } catch (InterruptedException e) {
            throw new IOException(e);
        } finally {
            flushLock.writeLock().unlock();
        }
    }

    /** Returns the not-yet-persisted segment with the given id, or {@code null}. */
    public SegmentWriteAction read(UUID id) {
        return segmentsByUUID.get(id);
    }

    @Override
    public void close() throws IOException {
        shutdown = true;
        try {
            executor.shutdown();
            if (!executor.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IOException("The write wasn't able to shut down clearly");
            }
        } catch (InterruptedException e) {
            throw new IOException(e);
        }
    }

    public boolean isEmpty() {
        return segmentsByUUID.isEmpty();
    }

    boolean isBroken() {
        return broken;
    }

    int getSize() {
        return queue.size();
    }

    private void setBroken(boolean broken) {
        synchronized (brokenMonitor) {
            this.broken = broken;
            brokenMonitor.notifyAll();
        }
    }

    /** Blocks producers/workers while the queue is broken (or until shutdown). */
    private void waitWhileBroken() {
        if (!broken) {
            return;
        }
        synchronized (brokenMonitor) {
            while (broken && !shutdown) {
                try {
                    brokenMonitor.wait(100);
                } catch (InterruptedException e) {
                    log.warn("Interrupted", e);
                }
            }
        }
    }

    /** Blocks the emergency thread until the queue becomes broken (or shutdown). */
    private void waitUntilBroken() {
        if (broken) {
            return;
        }
        synchronized (brokenMonitor) {
            while (!broken && !shutdown) {
                try {
                    brokenMonitor.wait(100);
                } catch (InterruptedException e) {
                    log.warn("Interrupted", e);
                }
            }
        }
    }

    /** Backend sink that actually persists a segment. */
    public interface SegmentConsumer {

        void consume(AzureSegmentArchiveEntry indexEntry, byte[] data, int offset, int size) throws IOException;

    }

    /** Carries the failed segment alongside the underlying {@link IOException}. */
    public static class SegmentConsumeException extends Exception {

        private final SegmentWriteAction segment;

        public SegmentConsumeException(SegmentWriteAction segment, IOException cause) {
            super(cause);
            this.segment = segment;
        }
    }
}
b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManagerTest.java new file mode 100644 index 0000000000..579e6acf74 --- /dev/null +++ b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManagerTest.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.jackrabbit.oak.segment.azure;

import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveManager;
import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitorAdapter;
import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitorAdapter;
import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveWriter;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;

import java.io.IOException;
import java.net.URISyntaxException;
import java.security.InvalidKeyException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.UUID;

import static com.google.common.collect.Lists.newArrayList;
import static org.junit.Assert.assertEquals;

public class AzureArchiveManagerTest {

    @ClassRule
    public static AzuriteDockerRule azurite = new AzuriteDockerRule();

    private CloudBlobContainer container;

    @Before
    public void setup() throws StorageException, InvalidKeyException, URISyntaxException {
        container = azurite.getContainer("oak-test");
    }

    /**
     * Writes 10 segments, deletes the blob of segment #5 and verifies that
     * recovery returns only the entries preceding the gap, in order.
     */
    @Test
    public void testRecovery() throws StorageException, URISyntaxException, IOException {
        SegmentArchiveManager manager = new AzurePersistence(container.getDirectoryReference("oak")).createArchiveManager(false, new IOMonitorAdapter(), new FileStoreMonitorAdapter());
        SegmentArchiveWriter writer = manager.create("data00000a.tar");

        List<UUID> uuids = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            UUID u = UUID.randomUUID();
            writer.writeSegment(u.getMostSignificantBits(), u.getLeastSignificantBits(), new byte[10], 0, 10, 0, 0, false);
            uuids.add(u);
        }

        writer.flush();
        writer.close();

        // Simulate a partially-written archive by removing one segment blob.
        container.getBlockBlobReference("oak/data00000a.tar/0005." + uuids.get(5).toString()).delete();

        LinkedHashMap<UUID, byte[]> recovered = new LinkedHashMap<>();
        manager.recoverEntries("data00000a.tar", recovered);
        // Recovery must stop at the first missing entry and preserve order.
        assertEquals(uuids.subList(0, 5), newArrayList(recovered.keySet()));
    }

}

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.jackrabbit.oak.segment.azure;

import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentNodeStorePersistence;
import org.apache.jackrabbit.oak.segment.file.GcJournalTest;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;

import java.net.URISyntaxException;
import java.security.InvalidKeyException;

/**
 * Runs the generic GC-journal test suite against the Azure persistence
 * backed by an Azurite emulator container.
 */
public class AzureGCJournalTest extends GcJournalTest {

    @ClassRule
    public static AzuriteDockerRule azurite = new AzuriteDockerRule();

    private CloudBlobContainer container;

    @Before
    public void setup() throws StorageException, InvalidKeyException, URISyntaxException {
        container = azurite.getContainer("oak-test");
    }

    @Override
    protected SegmentNodeStorePersistence getPersistence() throws Exception {
        return new AzurePersistence(container.getDirectoryReference("oak"));
    }

    // The Oak 1.6 compatibility fixtures are file-based and do not apply to
    // the Azure backend, hence the tests are disabled here.
    @Test
    @Ignore
    @Override
    public void testReadOak16GCLog() throws Exception {
        super.testReadOak16GCLog();
    }

    @Test
    @Ignore
    @Override
    public void testUpdateOak16GCLog() throws Exception {
        super.testUpdateOak16GCLog();
    }
}
b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureJournalFileTest.java new file mode 100644 index 0000000000..828cf48b6b --- /dev/null +++ b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureJournalFileTest.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.jackrabbit.oak.segment.azure;

import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFileReader;
import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFileWriter;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;

import java.io.IOException;
import java.net.URISyntaxException;
import java.security.InvalidKeyException;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

public class AzureJournalFileTest {

    @ClassRule
    public static AzuriteDockerRule azurite = new AzuriteDockerRule();

    private CloudBlobContainer container;

    private AzureJournalFile journal;

    @Before
    public void setup() throws StorageException, InvalidKeyException, URISyntaxException {
        container = azurite.getContainer("oak-test");
        // Split the journal into chunks of at most 10 lines each.
        journal = new AzureJournalFile(container.getDirectoryReference("journal"), "journal.log", 10);
    }

    /**
     * Writes 200 lines through two writer sessions (forcing the journal to
     * split across multiple blobs) and verifies they read back newest-first.
     */
    @Test
    public void testSplitJournalFiles() throws IOException {
        assertFalse(journal.exists());

        JournalFileWriter writer = journal.openJournalWriter();
        for (int i = 0; i < 100; i++) {
            writer.writeLine("line " + i);
        }
        // Close the writer so the session's output is complete (the original
        // test leaked the writer).
        writer.close();

        assertTrue(journal.exists());

        writer = journal.openJournalWriter();
        for (int i = 100; i < 200; i++) {
            writer.writeLine("line " + i);
        }
        writer.close();

        JournalFileReader reader = journal.openJournalReader();
        for (int i = 199; i >= 0; i--) {
            assertEquals("line " + i, reader.readLine());
        }
        reader.close();
    }

}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.jackrabbit.oak.segment.azure;

import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import org.apache.jackrabbit.oak.segment.spi.persistence.ManifestFile;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;

import java.io.IOException;
import java.net.URISyntaxException;
import java.security.InvalidKeyException;
import java.util.Properties;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;

public class AzureManifestFileTest {

    @ClassRule
    public static AzuriteDockerRule azurite = new AzuriteDockerRule();

    private CloudBlobContainer container;

    @Before
    public void setup() throws StorageException, InvalidKeyException, URISyntaxException {
        container = azurite.getContainer("oak-test");
    }

    /** Round-trips a Properties object through the Azure-backed manifest. */
    @Test
    public void testManifest() throws URISyntaxException, IOException {
        ManifestFile manifestFile = new AzurePersistence(container.getDirectoryReference("oak")).getManifestFile();
        assertFalse(manifestFile.exists());

        Properties props = new Properties();
        props.setProperty("xyz", "abc");
        props.setProperty("version", "123");
        manifestFile.save(props);

        Properties loaded = manifestFile.load();
        assertEquals(props, loaded);
    }

}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.jackrabbit.oak.segment.azure;

import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitorAdapter;
import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitorAdapter;
import org.apache.jackrabbit.oak.segment.file.tar.TarFileTest;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;

import java.io.IOException;
import java.net.URISyntaxException;
import java.security.InvalidKeyException;

/**
 * Runs the generic tar-file test suite against an archive manager backed by
 * the Azurite Blob Storage emulator.
 */
public class AzureTarFileTest extends TarFileTest {

    @ClassRule
    public static AzuriteDockerRule azurite = new AzuriteDockerRule();

    private CloudBlobContainer container;

    @Before
    @Override
    public void setUp() throws IOException {
        // The base class only declares IOException, so wrap the Azure-specific
        // checked exceptions.
        try {
            container = azurite.getContainer("oak-test");
            AzurePersistence persistence = new AzurePersistence(container.getDirectoryReference("oak"));
            archiveManager = persistence.createArchiveManager(true, new IOMonitorAdapter(), new FileStoreMonitorAdapter());
        } catch (StorageException | InvalidKeyException | URISyntaxException e) {
            throw new IOException(e);
        }
    }

    @Override
    protected long getWriteAndReadExpectedSize() {
        // Azure archives carry different bookkeeping overhead than local tars.
        return 45;
    }

    // Graph sweeping is exercised elsewhere; the base test's assumptions do
    // not hold for the Azure archive layout.
    @Test
    @Ignore
    @Override
    public void graphShouldBeTrimmedDownOnSweep() throws Exception {
        super.graphShouldBeTrimmedDownOnSweep();
    }
}
+ */ +package org.apache.jackrabbit.oak.segment.azure; + +import com.microsoft.azure.storage.blob.CloudBlobContainer; +import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitorAdapter; +import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitorAdapter; +import org.apache.jackrabbit.oak.segment.file.tar.TarFiles; +import org.apache.jackrabbit.oak.segment.file.tar.TarFilesTest; +import org.junit.Before; +import org.junit.ClassRule; + +public class AzureTarFilesTest extends TarFilesTest { + + @ClassRule + public static AzuriteDockerRule azurite = new AzuriteDockerRule(); + + private CloudBlobContainer container; + + @Before + @Override + public void setUp() throws Exception { + container = azurite.getContainer("oak-test"); + tarFiles = TarFiles.builder() + .withDirectory(folder.newFolder()) + .withTarRecovery((id, data, recovery) -> { + // Intentionally left blank + }) + .withIOMonitor(new IOMonitorAdapter()) + .withFileStoreMonitor(new FileStoreMonitorAdapter()) + .withMaxFileSize(MAX_FILE_SIZE) + .withPersistence(new AzurePersistence(container.getDirectoryReference("oak"))) + .build(); + } +} diff --git a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureTarWriterTest.java b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureTarWriterTest.java new file mode 100644 index 0000000000..ab4ee852c2 --- /dev/null +++ b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureTarWriterTest.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
package org.apache.jackrabbit.oak.segment.azure;

import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitorAdapter;
import org.apache.jackrabbit.oak.segment.file.tar.TarWriterTest;
import org.junit.Before;
import org.junit.ClassRule;

import java.io.IOException;
import java.net.URISyntaxException;
import java.security.InvalidKeyException;

/**
 * Runs the generic tar-writer test suite against an Azure-backed archive
 * manager provided by the Azurite emulator container.
 */
public class AzureTarWriterTest extends TarWriterTest {

    @ClassRule
    public static AzuriteDockerRule azurite = new AzuriteDockerRule();

    private CloudBlobContainer container;

    @Before
    @Override
    public void setUp() throws IOException {
        // Wrap Azure-specific checked exceptions: the overridden signature
        // only allows IOException.
        try {
            monitor = new TestFileStoreMonitor();
            container = azurite.getContainer("oak-test");
            AzurePersistence persistence = new AzurePersistence(container.getDirectoryReference("oak"));
            archiveManager = persistence.createArchiveManager(true, new IOMonitorAdapter(), monitor);
        } catch (StorageException | InvalidKeyException | URISyntaxException e) {
            throw new IOException(e);
        }
    }
}
package org.apache.jackrabbit.oak.segment.azure;

import com.arakelian.docker.junit.DockerRule;
import com.arakelian.docker.junit.model.ImmutableDockerConfig;
import com.microsoft.azure.storage.CloudStorageAccount;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import com.spotify.docker.client.DefaultDockerClient;
import org.junit.Assume;
import org.junit.rules.TestRule;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;

import java.net.URISyntaxException;
import java.security.InvalidKeyException;

/**
 * JUnit rule that starts an Azurite (Azure Storage emulator) Docker container
 * for the duration of the test class. If no Docker daemon is reachable, the
 * tests are skipped rather than failed.
 */
public class AzuriteDockerRule implements TestRule {

    private final DockerRule delegate;

    public AzuriteDockerRule() {
        delegate = new DockerRule(ImmutableDockerConfig.builder()
                .image("trekawek/azurite")
                .name("oak-test-azurite")
                .ports("10000")
                .addStartedListener(container -> {
                    // Wait until the blob endpoint is actually accepting connections.
                    container.waitForPort("10000/tcp");
                    container.waitForLog("Azure Blob Storage Emulator listening on port 10000");
                })
                .addContainerConfigurer(builder -> builder.env("executable=blob"))
                .alwaysRemoveContainer(true)
                .build());
    }

    /**
     * Returns a freshly (re-)created blob container on the emulator, using the
     * well-known Azurite development account credentials.
     */
    public CloudBlobContainer getContainer(String name) throws URISyntaxException, StorageException, InvalidKeyException {
        int mappedPort = delegate.getContainer().getPortBinding("10000/tcp").getPort();
        CloudStorageAccount cloud = CloudStorageAccount.parse("DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://127.0.0.1:" + mappedPort + "/devstoreaccount1;");
        CloudBlobContainer container = cloud.createCloudBlobClient().getContainerReference(name);
        // Start from a clean slate for every test class.
        container.deleteIfExists();
        container.create();
        return container;
    }

    @Override
    public Statement apply(Statement statement, Description description) {
        // Probe the Docker daemon first; skip the whole suite if unavailable.
        try {
            DefaultDockerClient client = DefaultDockerClient.fromEnv().connectTimeoutMillis(5000L).readTimeoutMillis(20000L).build();
            client.ping();
            client.close();
        } catch (Exception e) {
            Assume.assumeNoException(e);
        }

        return delegate.apply(statement, description);
    }
}
package org.apache.jackrabbit.oak.segment.azure.fixture;

import com.google.common.io.Files;
import com.microsoft.azure.storage.CloudStorageAccount;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import com.microsoft.azure.storage.blob.CloudBlobDirectory;
import org.apache.jackrabbit.oak.fixture.NodeStoreFixture;
import org.apache.jackrabbit.oak.segment.SegmentNodeStoreBuilders;
import org.apache.jackrabbit.oak.segment.azure.AzurePersistence;
import org.apache.jackrabbit.oak.segment.file.FileStore;
import org.apache.jackrabbit.oak.segment.file.FileStoreBuilder;
import org.apache.jackrabbit.oak.segment.file.InvalidFileStoreVersionException;
import org.apache.jackrabbit.oak.spi.state.NodeStore;

import java.io.IOException;
import java.net.URISyntaxException;
import java.security.InvalidKeyException;
import java.util.HashMap;
import java.util.Map;

/**
 * A {@link NodeStoreFixture} creating segment node stores persisted in Azure
 * Blob Storage. Connection string, container name and root path can be
 * overridden via the {@code oak.segment.azure.*} system properties.
 */
public class SegmentAzureFixture extends NodeStoreFixture {

    private static final String AZURE_CONNECTION_STRING = System.getProperty("oak.segment.azure.connection", "DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;");

    private static final String AZURE_CONTAINER = System.getProperty("oak.segment.azure.container", "oak");

    private static final String AZURE_ROOT_PATH = System.getProperty("oak.segment.azure.rootPath", "/oak");

    /** File stores created by this fixture, keyed by their node store. */
    private Map<NodeStore, FileStore> fileStoreMap = new HashMap<>();

    /** Containers created by this fixture, keyed by their node store. */
    private Map<NodeStore, CloudBlobContainer> containerMap = new HashMap<>();

    @Override
    public NodeStore createNodeStore() {
        AzurePersistence persistence;
        CloudBlobContainer container;
        try {
            CloudStorageAccount cloud = CloudStorageAccount.parse(AZURE_CONNECTION_STRING);

            // Find the first unused container name: "oak", "oak-2", "oak-3", ...
            int i = 1;
            while (true) {
                String containerName;
                if (i == 1) {
                    containerName = AZURE_CONTAINER;
                } else {
                    // Fixed: the original used '_' as separator, but Azure
                    // container names may only contain lowercase letters,
                    // digits and hyphens.
                    containerName = AZURE_CONTAINER + "-" + i;
                }
                container = cloud.createCloudBlobClient().getContainerReference(containerName);
                if (!container.exists()) {
                    container.create();
                    break;
                }
                i++;
            }
            CloudBlobDirectory directory = container.getDirectoryReference(AZURE_ROOT_PATH);
            persistence = new AzurePersistence(directory);
        } catch (StorageException | URISyntaxException | InvalidKeyException e) {
            throw new RuntimeException(e);
        }

        try {
            // A local scratch directory is still required by the file store builder.
            FileStore fileStore = FileStoreBuilder.fileStoreBuilder(Files.createTempDir()).withCustomPersistence(persistence).build();
            NodeStore nodeStore = SegmentNodeStoreBuilders.builder(fileStore).build();
            fileStoreMap.put(nodeStore, fileStore);
            containerMap.put(nodeStore, container);
            return nodeStore;
        } catch (IOException | InvalidFileStoreVersionException e) {
            throw new RuntimeException(e);
        }
    }

    /** Closes the backing file store and removes the Azure container. */
    @Override
    public void dispose(NodeStore nodeStore) {
        FileStore fs = fileStoreMap.remove(nodeStore);
        if (fs != null) {
            fs.close();
        }
        try {
            CloudBlobContainer container = containerMap.remove(nodeStore);
            if (container != null) {
                container.deleteIfExists();
            }
        } catch (StorageException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public String toString() {
        return "SegmentAzure";
    }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.jackrabbit.oak.segment.azure.journal;

import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.CloudAppendBlob;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import org.apache.jackrabbit.oak.segment.azure.AzuriteDockerRule;
import org.apache.jackrabbit.oak.segment.file.JournalReader;
import org.apache.jackrabbit.oak.segment.file.JournalReaderTest;
import org.apache.jackrabbit.oak.segment.azure.AzureJournalFile;
import org.junit.Before;
import org.junit.ClassRule;

import java.io.IOException;
import java.net.URISyntaxException;
import java.security.InvalidKeyException;

/**
 * Runs the generic {@link JournalReaderTest} suite against a journal that is
 * stored as an Azure append blob (served by the Azurite emulator).
 */
public class AzureJournalReaderTest extends JournalReaderTest {

    @ClassRule
    public static AzuriteDockerRule azurite = new AzuriteDockerRule();

    private CloudBlobContainer container;

    @Before
    public void setup() throws StorageException, InvalidKeyException, URISyntaxException {
        container = azurite.getContainer("oak-test");
    }

    /**
     * Writes the given journal content to a fresh append blob and opens a
     * {@link JournalReader} on top of it.
     */
    protected JournalReader createJournalReader(String s) throws IOException {
        try {
            // The AzureJournalFile expects journal fragments named
            // journal.log.NNN below the "journal" directory.
            CloudAppendBlob journalBlob = container.getAppendBlobReference("journal/journal.log.001");
            journalBlob.createOrReplace();
            journalBlob.appendText(s);
            return new JournalReader(
                    new AzureJournalFile(container.getDirectoryReference("journal"), "journal.log"));
        } catch (StorageException | URISyntaxException e) {
            // Surface storage problems as IOException, matching the method contract.
            throw new IOException(e);
        }
    }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.jackrabbit.oak.segment.azure.journal;

import com.microsoft.azure.storage.blob.CloudBlobContainer;
import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentNodeStorePersistence;
import org.apache.jackrabbit.oak.segment.azure.AzuriteDockerRule;
import org.apache.jackrabbit.oak.segment.file.TarRevisionsTest;
import org.apache.jackrabbit.oak.segment.azure.AzurePersistence;
import org.junit.Before;
import org.junit.ClassRule;

import java.io.IOException;
import java.net.URISyntaxException;

/**
 * Runs the generic {@link TarRevisionsTest} suite on top of the Azure
 * segment persistence, backed by the Azurite emulator.
 */
public class AzureTarRevisionsTest extends TarRevisionsTest {

    @ClassRule
    public static AzuriteDockerRule azurite = new AzuriteDockerRule();

    private CloudBlobContainer container;

    @Before
    public void setup() throws Exception {
        // Claim a fresh container before the superclass builds its stores.
        container = azurite.getContainer("oak-test");
        super.setup();
    }

    /**
     * Supplies the Azure-backed persistence used by the inherited tests,
     * rooted at the "oak" directory of the test container.
     */
    @Override
    protected SegmentNodeStorePersistence getPersistence() throws IOException {
        try {
            return new AzurePersistence(container.getDirectoryReference("oak"));
        } catch (URISyntaxException e) {
            throw new IOException(e);
        }
    }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.jackrabbit.oak.segment.azure.journal;

import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.CloudAppendBlob;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import org.apache.jackrabbit.oak.segment.azure.AzuriteDockerRule;
import org.apache.jackrabbit.oak.segment.azure.ReverseFileReader;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;

import java.io.IOException;
import java.net.URISyntaxException;
import java.security.InvalidKeyException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

/**
 * Tests {@link ReverseFileReader} against an Azure append blob filled with
 * random newline-separated entries, using various buffer sizes.
 */
public class ReverseFileReaderTest {

    @ClassRule
    public static AzuriteDockerRule azurite = new AzuriteDockerRule();

    private CloudBlobContainer container;

    @Before
    public void setup() throws StorageException, InvalidKeyException, URISyntaxException {
        container = azurite.getContainer("oak-test");
        getBlob().createOrReplace();
    }

    /** Returns a reference to the blob all tests read from and write to. */
    private CloudAppendBlob getBlob() throws URISyntaxException, StorageException {
        return container.getAppendBlobReference("test-blob");
    }

    @Test
    public void testReverseReader() throws IOException, URISyntaxException, StorageException {
        List<String> expectedLines = createFile(1024, 80);
        ReverseFileReader reader = new ReverseFileReader(getBlob(), 256);
        assertEquals(expectedLines, reader);
    }

    @Test
    public void testEmptyFile() throws IOException, URISyntaxException, StorageException {
        List<String> expectedLines = createFile(0, 80);
        ReverseFileReader reader = new ReverseFileReader(getBlob(), 256);
        assertEquals(expectedLines, reader);
    }

    @Test
    public void test1ByteBlock() throws IOException, URISyntaxException, StorageException {
        // A 1-byte buffer forces the reader to cross a block boundary on
        // every single character.
        List<String> expectedLines = createFile(10, 16);
        ReverseFileReader reader = new ReverseFileReader(getBlob(), 1);
        assertEquals(expectedLines, reader);
    }

    /**
     * Appends {@code lines} random lines (each 1..maxLineLength characters,
     * newline-terminated) to the test blob and returns the expected read
     * order: the entries reversed, preceded by the empty string produced by
     * the trailing newline.
     */
    private List<String> createFile(int lines, int maxLineLength) throws IOException, URISyntaxException, StorageException {
        Random random = new Random();
        List<String> written = new ArrayList<>();
        CloudAppendBlob blob = getBlob();
        for (int line = 0; line < lines; line++) {
            String entry = randomString(random.nextInt(maxLineLength) + 1);
            try {
                blob.appendText(entry + '\n');
            } catch (StorageException e) {
                throw new IOException(e);
            }
            written.add(entry);
        }

        // The final '\n' yields one empty line when reading backwards.
        written.add("");
        Collections.reverse(written);
        return written;
    }

    /** Asserts that the reader yields exactly {@code entries}, then null (EOF). */
    private static void assertEquals(List<String> entries, ReverseFileReader reader) throws IOException {
        int remaining = entries.size();
        for (String expected : entries) {
            remaining--;
            Assert.assertEquals("line " + remaining, expected, reader.readLine());
        }
        Assert.assertNull(reader.readLine());
    }

    /** Builds a random lowercase string of the given length. */
    private static String randomString(int entrySize) {
        Random generator = new Random();
        StringBuilder sb = new StringBuilder(entrySize);
        while (sb.length() < entrySize) {
            sb.append((char) ('a' + generator.nextInt('z' - 'a')));
        }
        return sb.toString();
    }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.jackrabbit.oak.segment.azure.queue;

import org.apache.jackrabbit.oak.segment.azure.AzureSegmentArchiveEntry;
import org.junit.After;
import org.junit.Test;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

/**
 * Tests for SegmentWriteQueue. The write-consumer passed to the queue is a
 * test stub whose progress is gated by a {@link Semaphore}, which lets each
 * test freeze the background writer threads, observe the queue's intermediate
 * state, and then release the writers to drain the queue.
 */
public class SegmentWriteQueueTest {

    private static final byte[] EMPTY_DATA = new byte[0];

    // Queue under test; created per test, closed in shutdown().
    private SegmentWriteQueue queue;

    @After
    public void shutdown() throws IOException {
        if (queue != null) {
            queue.close();
        }
    }

    /**
     * Segments added to the queue must be readable before they are persisted;
     * once the consumer is unblocked, all of them must be written out.
     */
    @Test
    public void testQueue() throws IOException, InterruptedException {
        Set<UUID> added = Collections.synchronizedSet(new HashSet<>());
        Semaphore semaphore = new Semaphore(0);
        queue = new SegmentWriteQueue((tarEntry, data, offset, size) -> {
            try {
                // Block the writer until the test releases the semaphore.
                semaphore.acquire();
            } catch (InterruptedException e) {
                // Deliberately ignored: interruption only happens on close.
            }
            added.add(new UUID(tarEntry.getMsb(), tarEntry.getLsb()));
        });

        for (int i = 0; i < 10; i++) {
            queue.addToQueue(tarEntry(i), EMPTY_DATA, 0, 0);
        }

        // Not yet persisted, but already visible via read().
        for (int i = 0; i < 10; i++) {
            assertNotNull("Segments should be available for read", queue.read(uuid(i)));
        }
        assertFalse("Queue shouldn't be empty", queue.isEmpty());

        // Unblock the consumer and wait for the queue to drain.
        semaphore.release(Integer.MAX_VALUE);
        while (!queue.isEmpty()) {
            Thread.sleep(10);
        }

        assertEquals("There should be 10 segments consumed",10, added.size());
        for (int i = 0; i < 10; i++) {
            assertTrue("Missing consumed segment", added.contains(uuid(i)));
        }
    }

    /**
     * flush() must block until all queued segments are persisted, and
     * addToQueue() must block while a flush is in progress.
     */
    @Test(timeout = 1000)
    public void testFlush() throws IOException, InterruptedException {
        Set<UUID> added = Collections.synchronizedSet(new HashSet<>());
        Semaphore semaphore = new Semaphore(0);
        queue = new SegmentWriteQueue((tarEntry, data, offset, size) -> {
            try {
                semaphore.acquire();
            } catch (InterruptedException e) {
                // Deliberately ignored: interruption only happens on close.
            }
            added.add(new UUID(tarEntry.getMsb(), tarEntry.getLsb()));
        });

        for (int i = 0; i < 3; i++) {
            queue.addToQueue(tarEntry(i), EMPTY_DATA, 0, 0);
        }

        // Start a flush; it must not complete while the consumer is blocked.
        AtomicBoolean flushFinished = new AtomicBoolean(false);
        Set<UUID> addedAfterFlush = new HashSet<>();
        new Thread(() -> {
            try {
                queue.flush();
                flushFinished.set(true);
                // Snapshot what had been persisted by the time flush returned.
                addedAfterFlush.addAll(added);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }).start();

        Thread.sleep(100);
        assertFalse("Flush should be blocked", flushFinished.get());

        // A concurrent add must also block until the flush completes.
        AtomicBoolean addFinished = new AtomicBoolean(false);
        new Thread(() -> {
            try {
                queue.addToQueue(tarEntry(10), EMPTY_DATA, 0, 0);
                addFinished.set(true);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }).start();

        Thread.sleep(100);
        assertFalse("Adding segments should be blocked until the flush is finished", addFinished.get());

        semaphore.release(Integer.MAX_VALUE);

        while (!addFinished.get()) {
            Thread.sleep(10);
        }
        // NOTE(review): assertion message appears truncated — presumably
        // "... once the queue is drained".
        assertTrue("Flush should be finished once the ", flushFinished.get());
        assertTrue("Adding segments should be blocked until the flush is finished", addFinished.get());

        // All three pre-flush segments must have been persisted before
        // flush() returned.
        for (int i = 0; i < 3; i++) {
            assertTrue(addedAfterFlush.contains(uuid(i)));
        }
    }

    /** Adding to a closed queue must fail fast. */
    @Test(expected = IllegalStateException.class)
    public void testClose() throws IOException, InterruptedException {
        queue = new SegmentWriteQueue((tarEntry, data, offset, size) -> {});
        queue.close();
        queue.addToQueue(tarEntry(10), EMPTY_DATA, 0, 0);
    }

    /**
     * When the consumer throws, the queue enters a broken state: writes are
     * retried with a back-off of at least 1s, new adds block, and once the
     * consumer recovers all segments are persisted promptly.
     */
    @Test
    public void testRecoveryMode() throws IOException, InterruptedException {
        Set<UUID> added = Collections.synchronizedSet(new HashSet<>());
        Semaphore semaphore = new Semaphore(0);
        AtomicBoolean doBreak = new AtomicBoolean(true);
        List<Long> writeAttempts = Collections.synchronizedList(new ArrayList<>());
        queue = new SegmentWriteQueue((tarEntry, data, offset, size) -> {
            writeAttempts.add(System.currentTimeMillis());
            try {
                semaphore.acquire();
            } catch (InterruptedException e) {
                // Deliberately ignored: interruption only happens on close.
            }
            if (doBreak.get()) {
                // Simulate a persistent backend failure.
                throw new IOException();
            }
            added.add(new UUID(tarEntry.getMsb(), tarEntry.getLsb()));
        });

        for (int i = 0; i < 10; i++) {
            queue.addToQueue(tarEntry(i), EMPTY_DATA, 0, 0);
        }

        semaphore.release(Integer.MAX_VALUE);
        Thread.sleep(100);

        assertTrue(queue.isBroken());
        assertEquals(9, queue.getSize()); // the 10th segment is handled by the recovery thread

        // Measure retry spacing: each attempt must be >= 1s apart.
        writeAttempts.clear();
        while (writeAttempts.size() < 5) {
            Thread.sleep(100);
        }
        long lastAttempt = writeAttempts.get(0);
        for (int i = 1; i < 5; i++) {
            long delay = writeAttempts.get(i) - lastAttempt;
            assertTrue("The delay between attempts to persist segment should be larger than 1s. Actual: " + delay, delay >= 1000);
            lastAttempt = writeAttempts.get(i);
        }

        // While broken, new adds must block.
        AtomicBoolean addFinished = new AtomicBoolean(false);
        new Thread(() -> {
            try {
                queue.addToQueue(tarEntry(10), EMPTY_DATA, 0, 0);
                addFinished.set(true);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }).start();

        Thread.sleep(100);
        assertFalse("Adding segments should be blocked until the recovery mode is finished", addFinished.get());

        // Let the consumer succeed again and wait for recovery.
        doBreak.set(false);
        while (queue.isBroken()) {
            Thread.sleep(10);
        }
        assertFalse("Queue shouldn't be broken anymore", queue.isBroken());

        while (added.size() < 11) {
            Thread.sleep(10);
        }
        assertEquals("All segments should be consumed",11, added.size());
        for (int i = 0; i < 11; i++) {
            assertTrue("All segments should be consumed", added.contains(uuid(i)));
        }

        // After recovery, the last 10 writes must happen without back-off.
        int i = writeAttempts.size() - 10;
        lastAttempt = writeAttempts.get(i);
        for (; i < writeAttempts.size(); i++) {
            long delay = writeAttempts.get(i) - lastAttempt;
            assertTrue("Segments should be persisted immediately", delay < 1000);
            lastAttempt = writeAttempts.get(i);
        }
    }

    // Builds a minimal archive entry whose UUID lsb is i (msb 0).
    private static AzureSegmentArchiveEntry tarEntry(long i) {
        return new AzureSegmentArchiveEntry(0, i, 0, 0, 0, 0, false);
    }

    // UUID matching the entry produced by tarEntry(i).
    private static UUID uuid(long i) {
        return new UUID(0, i);
    }

}