diff --git a/oak-segment-azure/pom.xml b/oak-segment-azure/pom.xml
new file mode 100644
index 0000000000..beca0095ca
--- /dev/null
+++ b/oak-segment-azure/pom.xml
@@ -0,0 +1,152 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.jackrabbit</groupId>
+        <artifactId>oak-parent</artifactId>
+        <version>1.10-SNAPSHOT</version>
+        <relativePath>../oak-parent/pom.xml</relativePath>
+    </parent>
+
+    <artifactId>oak-segment-azure</artifactId>
+    <packaging>bundle</packaging>
+
+    <name>Oak Segment Azure</name>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.felix</groupId>
+                <artifactId>maven-bundle-plugin</artifactId>
+                <configuration>
+                    <instructions>
+                        <Export-Package>
+                            org.apache.jackrabbit.oak.segment.azure
+                        </Export-Package>
+                        <Embed-Dependency>
+                            azure-storage,
+                            azure-keyvault-core
+                        </Embed-Dependency>
+                    </instructions>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>baseline</id>
+                        <goals>
+                            <goal>baseline</goal>
+                        </goals>
+                        <phase>pre-integration-test</phase>
+                        <configuration>
+                            <skip>true</skip>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.osgi</groupId>
+            <artifactId>org.osgi.core</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.osgi</groupId>
+            <artifactId>org.osgi.compendium</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.felix</groupId>
+            <artifactId>org.apache.felix.scr.annotations</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.jackrabbit</groupId>
+            <artifactId>oak-segment-tar</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.jackrabbit</groupId>
+            <artifactId>oak-store-spi</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.microsoft.azure</groupId>
+            <artifactId>azure-storage</artifactId>
+            <version>5.0.0</version>
+        </dependency>
+        <dependency>
+            <groupId>com.microsoft.azure</groupId>
+            <artifactId>azure-keyvault-core</artifactId>
+            <version>0.9.7</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.jackrabbit</groupId>
+            <artifactId>oak-segment-tar</artifactId>
+            <version>${project.version}</version>
+            <classifier>tests</classifier>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.jackrabbit</groupId>
+            <artifactId>oak-store-spi</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <version>1.10.19</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.arakelian</groupId>
+            <artifactId>docker-junit-rule</artifactId>
+            <version>2.1.0</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+</project>
diff --git a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManager.java b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManager.java
new file mode 100644
index 0000000000..7c8058d6d9
--- /dev/null
+++ b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManager.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.azure;
+
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.BlobListingDetails;
+import com.microsoft.azure.storage.blob.CloudBlob;
+import com.microsoft.azure.storage.blob.CloudBlobDirectory;
+import com.microsoft.azure.storage.blob.CloudBlockBlob;
+import com.microsoft.azure.storage.blob.CopyStatus;
+import org.apache.jackrabbit.oak.segment.SegmentArchiveManager;
+import org.apache.jackrabbit.oak.segment.file.tar.FileStoreMonitor;
+import org.apache.jackrabbit.oak.segment.file.tar.IOMonitor;
+import org.apache.jackrabbit.oak.segment.file.tar.index.Index;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.UUID;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.jackrabbit.oak.segment.azure.AzureUtilities.getName;
+
+public class AzureArchiveManager implements SegmentArchiveManager {
+
+    private static final Logger log = LoggerFactory.getLogger(AzureArchiveManager.class);
+
+    private final CloudBlobDirectory cloudBlobDirectory;
+
+    private final IOMonitor ioMonitor;
+
+    private final FileStoreMonitor monitor;
+
+    public AzureArchiveManager(CloudBlobDirectory cloudBlobDirectory, IOMonitor ioMonitor, FileStoreMonitor fileStoreMonitor) {
+        this.cloudBlobDirectory = cloudBlobDirectory;
+        this.ioMonitor = ioMonitor;
+        this.monitor = fileStoreMonitor;
+    }
+
+    @Override
+    public List<String> listArchives() throws IOException {
+        try {
+            return StreamSupport.stream(cloudBlobDirectory
+                    .listBlobs(null, false, EnumSet.noneOf(BlobListingDetails.class), null, null)
+                    .spliterator(), false)
+                    .filter(i -> i instanceof CloudBlobDirectory)
+                    .map(i -> (CloudBlobDirectory) i)
+                    .map(CloudBlobDirectory::getPrefix)
+                    .map(Paths::get)
+                    .map(Path::getFileName)
+                    .map(Path::toString)
+                    .collect(Collectors.toList());
+        } catch (URISyntaxException | StorageException e) {
+            throw new IOException(e);
+        }
+    }
+
+    @Override
+    public SegmentArchiveReader open(String archiveName) throws IOException {
+        CloudBlobDirectory archiveDirectory = getDirectory(archiveName);
+        Index index = AzureSegmentArchiveReader.loadAndValidateIndex(archiveDirectory);
+        if (index == null) {
+            return null;
+        }
+        return new AzureSegmentArchiveReader(archiveDirectory, ioMonitor, monitor, index);
+    }
+
+    @Override
+    public SegmentArchiveWriter create(String archiveName) throws IOException {
+        return new AzureSegmentArchiveWriter(getDirectory(archiveName), ioMonitor, monitor);
+    }
+
+    @Override
+    public boolean delete(String archiveName) {
+        try {
+            getBlobs(archiveName)
+                    .forEach(cloudBlob -> {
+                        try {
+                            cloudBlob.delete();
+                        } catch (StorageException e) {
+                            log.error("Can't delete segment {}", cloudBlob.getUri().getPath(), e);
+                        }
+                    });
+            return true;
+        } catch (IOException e) {
+            log.error("Can't delete archive {}", archiveName, e);
+            return false;
+        }
+    }
+
+    @Override
+    public boolean renameTo(String from, String to) {
+        try {
+            CloudBlobDirectory targetDirectory = getDirectory(to);
+            getBlobs(from)
+                    .forEach(cloudBlob -> {
+                        try {
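+                            // Copy-then-delete: Azure Blob Storage has no atomic
+                            // rename, so a failure here can leave the archive
+                            // partially renamed.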
+                            renameBlob(cloudBlob, targetDirectory);
+                        } catch (IOException e) {
+                            log.error("Can't rename segment {}", cloudBlob.getUri().getPath(), e);
+                        }
+                    });
+            return true;
+        } catch (IOException e) {
+            log.error("Can't rename archive {} to {}", from, to, e);
+            return false;
+        }
+    }
+
+    @Override
+    public void copyFile(String from, String to) throws IOException {
+        CloudBlobDirectory targetDirectory = getDirectory(to);
+        getBlobs(from)
+                .forEach(cloudBlob -> {
+                    try {
+                        copyBlob(cloudBlob, targetDirectory);
+                    } catch (IOException e) {
+                        log.error("Can't copy segment {}", cloudBlob.getUri().getPath(), e);
+                    }
+                });
+    }
+
+    @Override
+    public boolean exists(String archiveName) {
+        try {
+            return listArchives().contains(archiveName);
+        } catch (IOException e) {
+            log.error("Can't check the existence of {}", archiveName, e);
+            return false;
+        }
+    }
+
+    @Override
+    public void recoverEntries(String archiveName, LinkedHashMap<UUID, byte[]> entries) throws IOException {
+        Pattern pattern = Pattern.compile(AzureUtilities.SEGMENT_FILE_NAME_PATTERN);
+        List<RecoveredEntry> entryList = new ArrayList<>();
+
+        for (CloudBlob b : getBlobList(archiveName)) {
+            String name = getName(b);
+            Matcher m = pattern.matcher(name);
+            if (!m.matches()) {
+                continue;
+            }
+            int position = Integer.parseInt(m.group(1), 16);
+            UUID uuid = UUID.fromString(m.group(2));
+            long length = b.getProperties().getLength();
+            if (length > 0) {
+                byte[] data = new byte[(int) length];
+                try {
+                    b.downloadToByteArray(data, 0);
+                } catch (StorageException e) {
+                    throw new IOException(e);
+                }
+                entryList.add(new RecoveredEntry(position, uuid, data, name));
+            }
+        }
+        Collections.sort(entryList);
+
+        int i = 0;
+        for (RecoveredEntry e : entryList) {
+            if (e.position != i) {
+                log.warn("Missing entry {}.??? when recovering {}. No more segments will be read.", String.format("%04x", i), archiveName);
+                break;
+            }
+            log.info("Recovering segment {}/{}", archiveName, e.fileName);
+            entries.put(e.uuid, e.data);
+            i++;
+        }
+    }
+
+    private CloudBlobDirectory getDirectory(String archiveName) throws IOException {
+        try {
+            return cloudBlobDirectory.getDirectoryReference(archiveName);
+        } catch (URISyntaxException e) {
+            throw new IOException(e);
+        }
+    }
+
+    private Stream<CloudBlob> getBlobs(String archiveName) throws IOException {
+        return AzureUtilities.getBlobs(getDirectory(archiveName));
+    }
+
+    private List<CloudBlob> getBlobList(String archiveName) throws IOException {
+        return getBlobs(archiveName).collect(Collectors.toList());
+    }
+
+    private void renameBlob(CloudBlob blob, CloudBlobDirectory newParent) throws IOException {
+        copyBlob(blob, newParent);
+        try {
+            blob.delete();
+        } catch (StorageException e) {
+            throw new IOException(e);
+        }
+    }
+
+    private void copyBlob(CloudBlob blob, CloudBlobDirectory newParent) throws IOException {
+        checkArgument(blob instanceof CloudBlockBlob, "Only block blobs are supported for the rename");
+        try {
+            String blobName = getName(blob);
+            CloudBlockBlob newBlob = newParent.getBlockBlobReference(blobName);
+            newBlob.startCopy(blob.getUri());
+            while (newBlob.getCopyState().getStatus() == CopyStatus.PENDING) {
+                Thread.sleep(100);
+            }
+
+            CopyStatus finalStatus = newBlob.getCopyState().getStatus();
+            if (finalStatus != CopyStatus.SUCCESS) {
+                throw new IOException("Invalid copy status for " + blob.getUri().getPath() + ": " + finalStatus);
+            }
+        } catch (StorageException | InterruptedException | URISyntaxException e) {
+            throw new IOException(e);
+        }
+    }
+
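+    // A segment blob recovered from an archive directory. Entries are sorted by
+    // their position in the original archive, so recovery stops at the first gap.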
+    private static class RecoveredEntry implements Comparable<RecoveredEntry> {
+
+        private final byte[] data;
+
+        private final UUID uuid;
+
+        private final int position;
+
+        private final String fileName;
+
+        public RecoveredEntry(int position, UUID uuid, byte[] data, String fileName) {
+            this.data = data;
+            this.uuid = uuid;
+            this.position = position;
+            this.fileName = fileName;
+        }
+
+        @Override
+        public int compareTo(RecoveredEntry o) {
+            return Integer.compare(this.position, o.position);
+        }
+    }
+
+}
diff --git a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureGCJournalFile.java b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureGCJournalFile.java
new file mode 100644
index 0000000000..851da686de
--- /dev/null
+++ b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureGCJournalFile.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.azure;
+
+import com.google.common.base.Charsets;
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudAppendBlob;
+import org.apache.commons.io.IOUtils;
+import org.apache.jackrabbit.oak.segment.SegmentNodeStorePersistence;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+public class AzureGCJournalFile implements SegmentNodeStorePersistence.GCJournalFile {
+
+    private final CloudAppendBlob gcJournal;
+
+    public AzureGCJournalFile(CloudAppendBlob gcJournal) {
+        this.gcJournal = gcJournal;
+    }
+
+    @Override
+    public void writeLine(String line) throws IOException {
+        try {
+            if (!gcJournal.exists()) {
+                gcJournal.createOrReplace();
+            }
+            gcJournal.appendText(line + "\n", Charsets.UTF_8.name(), null, null, null);
+        } catch (StorageException e) {
+            throw new IOException(e);
+        }
+    }
+
+    @Override
+    public List<String> readLines() throws IOException {
+        try {
+            if (!gcJournal.exists()) {
+                return Collections.emptyList();
+            }
+            byte[] data = new byte[(int) gcJournal.getProperties().getLength()];
+            gcJournal.downloadToByteArray(data, 0);
+            // read back with the same charset the lines were written with
+            return IOUtils.readLines(new ByteArrayInputStream(data), Charsets.UTF_8);
+        } catch (StorageException e) {
+            throw new IOException(e);
+        }
+    }
+}
diff --git a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureJournalFile.java b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureJournalFile.java
new file mode 100644
index 0000000000..8c969e26e3
--- /dev/null
+++ b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureJournalFile.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.azure;
+
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudAppendBlob;
+import com.microsoft.azure.storage.blob.CloudBlob;
+import com.microsoft.azure.storage.blob.CloudBlobDirectory;
+import com.microsoft.azure.storage.blob.ListBlobItem;
+import org.apache.jackrabbit.oak.segment.SegmentNodeStorePersistence;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class AzureJournalFile implements SegmentNodeStorePersistence.JournalFile {
+
+    private static final Logger log = LoggerFactory.getLogger(AzureJournalFile.class);
+
+    private static final int JOURNAL_LINE_LIMIT = Integer.getInteger("org.apache.jackrabbit.oak.segment.azure.journal.lines", 40_000);
+
+    private final CloudBlobDirectory directory;
+
+    private final String journalNamePrefix;
+
+    private final int lineLimit;
+
+    AzureJournalFile(CloudBlobDirectory directory, String journalNamePrefix, int lineLimit) {
+        this.directory = directory;
+        this.journalNamePrefix = journalNamePrefix;
+        this.lineLimit = lineLimit;
+    }
+
+    public AzureJournalFile(CloudBlobDirectory directory, String journalNamePrefix) {
+        this(directory, journalNamePrefix, JOURNAL_LINE_LIMIT);
+    }
+
+    @Override
+    public SegmentNodeStorePersistence.JournalFileReader openJournalReader() throws IOException {
+        return new CombinedReader(getJournalBlobs());
+    }
+
+    @Override
+    public SegmentNodeStorePersistence.JournalFileWriter openJournalWriter() throws IOException {
+        return new AzureJournalWriter();
+    }
+
+    @Override
+    public String getName() {
+        return journalNamePrefix;
+    }
+
+    @Override
+    public boolean exists() {
+        try {
+            return !getJournalBlobs().isEmpty();
+        } catch (IOException e) {
+            log.error("Can't check if the file exists", e);
+            return false;
+        }
+    }
+
+    private String getJournalFileName(int index) {
+        return String.format("%s.%03d", journalNamePrefix, index);
+    }
+
+    private List<CloudAppendBlob> getJournalBlobs() throws IOException {
+        try {
+            List<CloudAppendBlob> result = new ArrayList<>();
+            for (ListBlobItem b : directory.listBlobs(journalNamePrefix)) {
+                if (b instanceof CloudAppendBlob) {
+                    result.add((CloudAppendBlob) b);
+                } else {
+                    log.warn("Invalid blob type: {} {}", b.getUri(), b.getClass());
+                }
+            }
+            result.sort(Comparator.comparing(AzureUtilities::getName).reversed());
+            return result;
+        } catch (URISyntaxException | StorageException e) {
+            throw new IOException(e);
+        }
+    }
+
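+    // Reads a single journal blob backwards, newest line first, so the most
+    // recent head state can be found without downloading the whole journal.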
+    private static class AzureJournalReader implements SegmentNodeStorePersistence.JournalFileReader {
+
+        private final CloudBlob blob;
+
+        private ReverseFileReader reader;
+
+        private AzureJournalReader(CloudBlob blob) {
+            this.blob = blob;
+        }
+
+        @Override
+        public String readLine() throws IOException {
+            if (reader == null) {
+                try {
+                    reader = new ReverseFileReader(blob);
+                } catch (StorageException e) {
+                    throw new IOException(e);
+                }
+            }
+            return reader.readLine();
+        }
+
+        @Override
+        public void close() throws IOException {
+        }
+    }
+
+    private class AzureJournalWriter implements SegmentNodeStorePersistence.JournalFileWriter {
+
+        private CloudAppendBlob currentBlob;
+
+        private int blockCount;
+
+        public AzureJournalWriter() throws IOException {
+            List<CloudAppendBlob> blobs = getJournalBlobs();
+            if (blobs.isEmpty()) {
+                try {
+                    currentBlob = directory.getAppendBlobReference(getJournalFileName(1));
+                    currentBlob.createOrReplace();
+                } catch (URISyntaxException | StorageException e) {
+                    throw new IOException(e);
+                }
+            } else {
+                currentBlob = blobs.get(0);
+            }
+            Integer bc = currentBlob.getProperties().getAppendBlobCommittedBlockCount();
+            blockCount = bc == null ? 0 : bc;
+        }
+
+        @Override
+        public void truncate() throws IOException {
+            try {
+                for (CloudAppendBlob cloudAppendBlob : getJournalBlobs()) {
+                    cloudAppendBlob.delete();
+                }
+            } catch (StorageException e) {
+                throw new IOException(e);
+            }
+        }
+
+        @Override
+        public void writeLine(String line) throws IOException {
+            if (blockCount >= lineLimit) {
+                createNewFile();
+            }
+            try {
+                currentBlob.appendText(line + "\n");
+                blockCount++;
+            } catch (StorageException e) {
+                throw new IOException(e);
+            }
+        }
+
+        private void createNewFile() throws IOException {
+            String name = AzureUtilities.getName(currentBlob);
+            Pattern pattern = Pattern.compile(Pattern.quote(journalNamePrefix) + "\\.(\\d+)");
+            Matcher matcher = pattern.matcher(name);
+            int parsedSuffix;
+            if (matcher.find()) {
+                String suffix = matcher.group(1);
+                try {
+                    parsedSuffix = Integer.parseInt(suffix);
+                } catch (NumberFormatException e) {
+                    log.warn("Can't parse suffix for journal file {}", name);
+                    parsedSuffix = 0;
+                }
+            } else {
+                log.warn("Can't parse journal file name {}", name);
+                parsedSuffix = 0;
+            }
+            try {
+                currentBlob = directory.getAppendBlobReference(getJournalFileName(parsedSuffix + 1));
+                currentBlob.createOrReplace();
+                blockCount = 0;
+            } catch (URISyntaxException | StorageException e) {
+                throw new IOException(e);
+            }
+        }
+
+        @Override
+        public void close() throws IOException {
+            // do nothing
+        }
+    }
+
+    private static class CombinedReader implements SegmentNodeStorePersistence.JournalFileReader {
+
+        private final Iterator<AzureJournalReader> readers;
+
+        private SegmentNodeStorePersistence.JournalFileReader currentReader;
+
+        private CombinedReader(List<CloudAppendBlob> blobs) {
+            readers = blobs.stream().map(AzureJournalReader::new).iterator();
+        }
+
+        @Override
+        public String readLine() throws IOException {
+            String line;
+            do {
+                if (currentReader == null) {
+                    if (!readers.hasNext()) {
+                        return null;
+                    }
+                    currentReader = readers.next();
+                }
+                do {
+                    line = currentReader.readLine();
+                } while ("".equals(line));
+                if (line == null) {
+                    currentReader.close();
+                    currentReader = null;
+                }
+            } while (line == null);
+            return line;
+        }
+
+        @Override
+        public void close() throws IOException {
+            while (readers.hasNext()) {
+                readers.next().close();
+            }
+            if (currentReader != null) {
+                currentReader.close();
+                currentReader = null;
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureManifestFile.java b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureManifestFile.java
new file mode 100644
index 0000000000..b9174078d9
--- /dev/null
+++ b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureManifestFile.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.azure;
+
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudBlockBlob;
+import org.apache.jackrabbit.oak.segment.SegmentNodeStorePersistence;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Properties;
+
+public class AzureManifestFile implements SegmentNodeStorePersistence.ManifestFile {
+
+    private static final Logger log = LoggerFactory.getLogger(AzureManifestFile.class);
+
+    private final CloudBlockBlob manifestBlob;
+
+    public AzureManifestFile(CloudBlockBlob manifestBlob) {
+        this.manifestBlob = manifestBlob;
+    }
+
+    @Override
+    public boolean exists() {
+        try {
+            return manifestBlob.exists();
+        } catch (StorageException e) {
+            log.error("Can't check if the manifest exists", e);
+            return false;
+        }
+    }
+
+    @Override
+    public Properties load() throws IOException {
+        Properties properties = new Properties();
+        if (exists()) {
+            long length = manifestBlob.getProperties().getLength();
+            byte[] data = new byte[(int) length];
+            try {
+                manifestBlob.downloadToByteArray(data, 0);
+            } catch (StorageException e) {
+                throw new IOException(e);
+            }
+            properties.load(new ByteArrayInputStream(data));
+        }
+        return properties;
+    }
+
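+    // The manifest is a java.util.Properties stream stored in a single block
+    // blob; save() overwrites the previous content.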
+    @Override
+    public void save(Properties properties) throws IOException {
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        properties.store(bos, null);
+
+        byte[] data = bos.toByteArray();
+        try {
+            manifestBlob.uploadFromByteArray(data, 0, data.length);
+        } catch (StorageException e) {
+            throw new IOException(e);
+        }
+    }
+}
diff --git a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzurePersistence.java b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzurePersistence.java
new file mode 100644
index 0000000000..8a4de9615c
--- /dev/null
+++ b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzurePersistence.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.azure;
+
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.BlobListingDetails;
+import com.microsoft.azure.storage.blob.CloudAppendBlob;
+import com.microsoft.azure.storage.blob.CloudBlobDirectory;
+import com.microsoft.azure.storage.blob.CloudBlockBlob;
+import com.microsoft.azure.storage.blob.ListBlobItem;
+import org.apache.jackrabbit.oak.segment.SegmentArchiveManager;
+import org.apache.jackrabbit.oak.segment.SegmentNodeStorePersistence;
+import org.apache.jackrabbit.oak.segment.file.tar.FileStoreMonitor;
+import org.apache.jackrabbit.oak.segment.file.tar.IOMonitor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.file.Paths;
+import java.util.EnumSet;
+
+public class AzurePersistence implements SegmentNodeStorePersistence {
+
+    private static final Logger log = LoggerFactory.getLogger(AzurePersistence.class);
+
+    private final CloudBlobDirectory segmentstoreDirectory;
+
+    public AzurePersistence(CloudBlobDirectory segmentstoreDirectory) {
+        this.segmentstoreDirectory = segmentstoreDirectory;
+    }
+
+    @Override
+    public SegmentArchiveManager createArchiveManager(boolean mmap, IOMonitor ioMonitor, FileStoreMonitor fileStoreMonitor) {
+        return new AzureArchiveManager(segmentstoreDirectory, ioMonitor, fileStoreMonitor);
+    }
+
+    @Override
+    public boolean segmentFilesExist() {
+        try {
+            for (ListBlobItem i : segmentstoreDirectory.listBlobs(null, false, EnumSet.noneOf(BlobListingDetails.class), null, null)) {
+                if (i instanceof CloudBlobDirectory) {
+                    CloudBlobDirectory dir = (CloudBlobDirectory) i;
+                    String name = Paths.get(dir.getPrefix()).getFileName().toString();
+                    if (name.endsWith(".tar")) {
+                        return true;
+                    }
+                }
+            }
+            return false;
+        } catch (StorageException | URISyntaxException e) {
+            log.error("Can't check if the segment archives exist", e);
+            return false;
+        }
+    }
+
+    @Override
+    public JournalFile getJournalFile() {
+        return new AzureJournalFile(segmentstoreDirectory, "journal.log");
+    }
+
+    @Override
+    public GCJournalFile getGCJournalFile() throws IOException {
+        return new AzureGCJournalFile(getAppendBlob("gc.log"));
+    }
+
+    @Override
+    public ManifestFile getManifestFile() throws IOException {
+        return new AzureManifestFile(getBlockBlob("manifest"));
+    }
+
+    @Override
+    public RepositoryLock lockRepository() throws IOException {
+        return new AzureRepositoryLock(getBlockBlob("repo.lock"), new Runnable() {
+            @Override
+            public void run() {
+                log.warn("Lost connection to Azure. The client will be closed.");
+                // TODO close the connection
+            }
+        }).lock();
+    }
+
+    private CloudBlockBlob getBlockBlob(String path) throws IOException {
+        try {
+            return segmentstoreDirectory.getBlockBlobReference(path);
+        } catch (URISyntaxException | StorageException e) {
+            throw new IOException(e);
+        }
+    }
+
+    private CloudAppendBlob getAppendBlob(String path) throws IOException {
+        try {
+            return segmentstoreDirectory.getAppendBlobReference(path);
+        } catch (URISyntaxException | StorageException e) {
+            throw new IOException(e);
+        }
+    }
+
+}
diff --git a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureRepositoryLock.java b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureRepositoryLock.java
new file mode 100644
index 0000000000..41f3d54ed6
--- /dev/null
+++ b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureRepositoryLock.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.azure;
+
+import com.microsoft.azure.storage.AccessCondition;
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudBlockBlob;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+public class AzureRepositoryLock implements AzurePersistence.RepositoryLock {
+
+    private static final Logger log = LoggerFactory.getLogger(AzureRepositoryLock.class);
+
+    private static final int INTERVAL = 60;
+
+    private final Runnable shutdownHook;
+
+    private final CloudBlockBlob blob;
+
+    private final ExecutorService executor;
+
+    private String leaseId;
+
+    private volatile boolean doUpdate;
+
+    public AzureRepositoryLock(CloudBlockBlob blob, Runnable shutdownHook) {
+        this.shutdownHook = shutdownHook;
+        this.blob = blob;
+        this.executor = Executors.newSingleThreadExecutor();
+    }
+
+    public AzureRepositoryLock lock() throws IOException {
+        try {
+            blob.openOutputStream().close();
+            leaseId = blob.acquireLease(INTERVAL, null);
+            log.info("Acquired lease {}", leaseId);
+        } catch (StorageException e) {
+            throw new IOException(e);
+        }
+        executor.submit(this::refreshLease);
+        return this;
+    }
+
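+    // Runs on the background executor: renews the 60-second lease once more than
+    // half of the interval has elapsed, polling every 100 ms until unlock() or a
+    // failed renewal stops the loop.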
+    private void refreshLease() {
+        doUpdate = true;
+        long lastUpdate = 0;
+        while (doUpdate) {
+            try {
+                long timeSinceLastUpdate = (System.currentTimeMillis() - lastUpdate) / 1000;
+                if (timeSinceLastUpdate > INTERVAL / 2) {
+                    blob.renewLease(AccessCondition.generateLeaseCondition(leaseId));
+                    lastUpdate = System.currentTimeMillis();
+                }
+            } catch (StorageException e) {
+                log.error("Can't renew the lease", e);
+                shutdownHook.run();
+                doUpdate = false;
+                return;
+            }
+            try {
+                Thread.sleep(100);
+            } catch (InterruptedException e) {
+                log.error("Interrupted the lease renewal loop", e);
+            }
+        }
+    }
+
+    @Override
+    public void unlock() throws IOException {
+        doUpdate = false;
+        executor.shutdown();
+        try {
+            executor.awaitTermination(1, TimeUnit.MINUTES);
+        } catch (InterruptedException e) {
+            throw new IOException(e);
+        } finally {
+            releaseLease();
+        }
+    }
+
+    private void releaseLease() throws IOException {
+        try {
+            blob.releaseLease(AccessCondition.generateLeaseCondition(leaseId));
+            blob.delete();
+            log.info("Released lease {}", leaseId);
+        } catch (StorageException e) {
+            throw new IOException(e);
+        }
+    }
+}
diff --git a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveReader.java b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveReader.java
new file mode 100644
index 0000000000..248bec2722
--- /dev/null
+++ b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveReader.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.azure;
+
+import com.google.common.base.Stopwatch;
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.BlobProperties;
+import com.microsoft.azure.storage.blob.CloudBlob;
+import com.microsoft.azure.storage.blob.CloudBlobDirectory;
+import com.microsoft.azure.storage.blob.CloudBlockBlob;
+import org.apache.jackrabbit.oak.segment.SegmentArchiveManager;
+import org.apache.jackrabbit.oak.segment.file.tar.FileStoreMonitor;
+import org.apache.jackrabbit.oak.segment.file.tar.GraphLoader;
+import org.apache.jackrabbit.oak.segment.file.tar.IOMonitor;
+import org.apache.jackrabbit.oak.segment.file.tar.binaries.BinaryReferencesIndex;
+import org.apache.jackrabbit.oak.segment.file.tar.binaries.BinaryReferencesIndexLoader;
+import org.apache.jackrabbit.oak.segment.file.tar.binaries.InvalidBinaryReferencesIndexException;
+import org.apache.jackrabbit.oak.segment.file.tar.index.Index;
+import org.apache.jackrabbit.oak.segment.file.tar.index.IndexEntry;
+import org.apache.jackrabbit.oak.segment.file.tar.index.IndexLoader;
+import org.apache.jackrabbit.oak.segment.file.tar.index.InvalidIndexException;
+import org.apache.jackrabbit.oak.segment.util.ReaderAtEnd;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.jackrabbit.oak.segment.file.tar.TarConstants.BLOCK_SIZE;
+import static org.apache.jackrabbit.oak.segment.file.tar.index.IndexLoader.newIndexLoader;
+import static org.apache.jackrabbit.oak.segment.azure.AzureUtilities.getSegmentFileName;
+import static org.apache.jackrabbit.oak.segment.azure.AzureUtilities.readBufferFully;
+
+public class AzureSegmentArchiveReader implements SegmentArchiveManager.SegmentArchiveReader {
+
+    private static final Logger log = LoggerFactory.getLogger(AzureSegmentArchiveReader.class);
+
+    private static final IndexLoader indexLoader = newIndexLoader(BLOCK_SIZE);
+
+    private final CloudBlobDirectory archiveDirectory;
+
+    private final IOMonitor ioMonitor;
+
+    private final FileStoreMonitor monitor;
+
+    private final Index index;
+
+    private final long length;
+
+    private Boolean hasGraph;
+
+    AzureSegmentArchiveReader(CloudBlobDirectory archiveDirectory, IOMonitor ioMonitor, FileStoreMonitor monitor, Index index) throws IOException {
+        this.archiveDirectory = archiveDirectory;
+        this.ioMonitor = ioMonitor;
+        this.monitor = monitor;
+        this.index = index;
+        this.length = AzureUtilities.getBlobs(archiveDirectory)
+                .map(CloudBlob::getProperties)
+                .mapToLong(BlobProperties::getLength)
+                .sum();
+    }
+
+    @Override
+    public ByteBuffer readSegment(long msb, long lsb) throws IOException {
+        int i = index.findEntry(msb, lsb);
+        if (i == -1) {
+            return null;
+        }
+        IndexEntry entry = index.entry(i);
+
+        ByteBuffer buffer = ByteBuffer.allocate(entry.getLength());
+        ioMonitor.beforeSegmentRead(pathAsFile(), msb, lsb, entry.getLength());
+        Stopwatch stopwatch = Stopwatch.createStarted();
+        readBufferFully(getBlob(getSegmentFileName(entry)), buffer);
+        long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS);
+        ioMonitor.afterSegmentRead(pathAsFile(), msb, lsb, entry.getLength(), elapsed);
+        return buffer;
+    }
+
+    @Override
+    public Index getIndex() {
+        return index;
+    }
+
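+    // The segment graph lives in a separate <archive>.gph blob and is loaded
+    // lazily; hasGraph caches the outcome of the first load attempt.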
+    @Override
+    public Map<UUID, List<UUID>> getGraph() throws IOException {
+        ByteBuffer graph = loadGraph();
+        if (graph == null) {
+            return null;
+        } else {
+            return GraphLoader.parseGraph(graph);
+        }
+    }
+
+    private ByteBuffer loadGraph() throws IOException {
+        ByteBuffer graph = GraphLoader.loadGraph(openAsReaderAtEnd(getName() + ".gph"));
+        hasGraph = graph != null;
+        return graph;
+    }
+
+    @Override
+    public boolean hasGraph() {
+        if (hasGraph == null) {
+            try {
+                loadGraph();
+            } catch (IOException ignore) { }
+        }
+        return hasGraph != null && hasGraph;
+    }
+
+    @Override
+    public BinaryReferencesIndex getBinaryReferences() {
+        BinaryReferencesIndex index = null;
+        try {
+            index = loadBinaryReferences();
+        } catch (InvalidBinaryReferencesIndexException | IOException e) {
+            log.warn("Exception while loading binary references index", e);
+        }
+        return index;
+    }
+
+    private BinaryReferencesIndex loadBinaryReferences() throws IOException, InvalidBinaryReferencesIndexException {
+        return BinaryReferencesIndexLoader.loadBinaryReferencesIndex(openAsReaderAtEnd(getName() + ".brf"));
+    }
+
+    @Override
+    public long length() {
+        return length;
+    }
+
+    @Override
+    public String getName() {
+        return AzureUtilities.getName(archiveDirectory);
+    }
+
+    @Override
+    public void close() throws IOException {
+        // do nothing
+    }
+
+    @Override
+    public int getEntrySize(int size) {
+        return size;
+    }
+
+    private ReaderAtEnd openAsReaderAtEnd(String name) throws IOException {
+        return openAsReaderAtEnd(getBlob(name));
+    }
+
+    private static ReaderAtEnd openAsReaderAtEnd(CloudBlob cloudBlob) throws IOException {
+        try {
+            if (!cloudBlob.exists()) {
+                return null;
+            }
+            int length = (int) cloudBlob.getProperties().getLength();
+            ByteBuffer buffer = ByteBuffer.allocate(length);
+            cloudBlob.downloadToByteArray(buffer.array(), 0);
+
+            return (whence, amount) -> {
+                ByteBuffer result = buffer.duplicate();
+                result.position(length - whence);
+                result.limit(length - whence + amount);
+                return result.slice();
+            };
+        } catch (StorageException e) {
+            throw new IOException(e);
+        }
+    }
+
+    private File pathAsFile() {
+        return new File(archiveDirectory.getUri().getPath());
+    }
+
+    public static Index loadAndValidateIndex(CloudBlobDirectory archiveDirectory) throws IOException {
+        CloudBlockBlob blob;
+        try {
+            blob = archiveDirectory.getBlockBlobReference(AzureUtilities.getName(archiveDirectory) + ".idx");
+        } catch (StorageException | URISyntaxException e) {
+            log.error("Can't open index", e);
+            return null;
+        }
+        ReaderAtEnd reader = openAsReaderAtEnd(blob);
+        if (reader == null) {
+            return null;
+        } else {
+            try {
+                return indexLoader.loadIndex(reader);
+            } catch (InvalidIndexException e) {
+                log.warn("Can't open index file: {}", blob.getUri().getPath(), e);
+                return null;
+            }
+        }
+    }
+
+    private CloudBlockBlob getBlob(String name) throws IOException {
+        try {
+            return archiveDirectory.getBlockBlobReference(name);
+        } catch (URISyntaxException | StorageException e) {
+            throw new IOException(e);
+        }
+    }
+}
diff --git a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveWriter.java b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveWriter.java
new file mode 100644
index 0000000000..9b337348dc
--- /dev/null
+++ b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveWriter.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.azure;
+
+import com.google.common.base.Stopwatch;
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudBlobDirectory;
+import com.microsoft.azure.storage.blob.CloudBlockBlob;
+import org.apache.jackrabbit.oak.segment.SegmentArchiveManager;
+import org.apache.jackrabbit.oak.segment.file.tar.FileStoreMonitor;
+import org.apache.jackrabbit.oak.segment.file.tar.GCGeneration;
+import org.apache.jackrabbit.oak.segment.file.tar.IOMonitor;
+import org.apache.jackrabbit.oak.segment.file.tar.TarEntry;
+import org.apache.jackrabbit.oak.segment.azure.queue.SegmentWriteAction;
+import org.apache.jackrabbit.oak.segment.azure.queue.SegmentWriteQueue;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.jackrabbit.oak.segment.file.tar.TarConstants.BLOCK_SIZE;
+import static org.apache.jackrabbit.oak.segment.azure.AzureUtilities.getSegmentFileName;
+import static org.apache.jackrabbit.oak.segment.azure.AzureUtilities.readBufferFully;
+
+public class AzureSegmentArchiveWriter implements SegmentArchiveManager.SegmentArchiveWriter {
+
+    private final CloudBlobDirectory archiveDirectory;
+
+    private final IOMonitor ioMonitor;
+
+    private final FileStoreMonitor monitor;
+
+    private final Optional<SegmentWriteQueue> queue;
+
+    private int entries;
+
+    private long totalLength;
+
+    private volatile boolean created = false;
+
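+    // With oak.segment.azure.threads > 0 segments are uploaded asynchronously
+    // through a SegmentWriteQueue; otherwise every write is synchronous.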
+    public AzureSegmentArchiveWriter(CloudBlobDirectory archiveDirectory, IOMonitor ioMonitor, FileStoreMonitor monitor) {
+        this.archiveDirectory = archiveDirectory;
+        this.ioMonitor = ioMonitor;
+        this.monitor = monitor;
+        this.queue = SegmentWriteQueue.THREADS > 0 ? Optional.of(new SegmentWriteQueue(this::doWriteEntry)) : Optional.empty();
+    }
+
+    @Override
+    public TarEntry writeSegment(long msb, long lsb, byte[] data, int offset, int size, GCGeneration generation) throws IOException {
+        created = true;
+
+        TarEntry tarEntry = new TarEntry(msb, lsb, (entries++) * BLOCK_SIZE, size, generation);
+        if (queue.isPresent()) {
+            queue.get().addToQueue(tarEntry, data, offset, size);
+        } else {
+            doWriteEntry(tarEntry, data, offset, size);
+        }
+        totalLength += size;
+        monitor.written(size);
+        return tarEntry;
+    }
+
+    private void doWriteEntry(TarEntry tarEntry, byte[] data, int offset, int size) throws IOException {
+        long msb = tarEntry.msb();
+        long lsb = tarEntry.lsb();
+        ioMonitor.beforeSegmentWrite(pathAsFile(), msb, lsb, size);
+        Stopwatch stopwatch = Stopwatch.createStarted();
+        try {
+            CloudBlockBlob blob = getBlob(getSegmentFileName(tarEntry));
+            blob.uploadFromByteArray(data, offset, size);
+        } catch (StorageException e) {
+            throw new IOException(e);
+        }
+        ioMonitor.afterSegmentWrite(pathAsFile(), msb, lsb, size, stopwatch.elapsed(TimeUnit.NANOSECONDS));
+    }
+
+    @Override
+    public ByteBuffer readSegment(TarEntry tarEntry) throws IOException {
+        UUID uuid = new UUID(tarEntry.msb(), tarEntry.lsb());
+        Optional<SegmentWriteAction> segment = queue.map(q -> q.read(uuid));
+        if (segment.isPresent()) {
+            return segment.get().toByteBuffer();
+        }
+        ByteBuffer buffer = ByteBuffer.allocate(tarEntry.size());
+        readBufferFully(getBlob(getSegmentFileName(tarEntry)), buffer);
+        return buffer;
+    }
+
+    @Override
+    public void writeIndex(byte[] data) throws IOException {
+        writeDataFile(data, ".idx");
+    }
+
+    @Override
+    public void writeGraph(byte[] data) throws IOException {
+        writeDataFile(data, ".gph");
+    }
+
+    @Override
+    public void writeBinaryReferences(byte[] data) throws IOException {
+        writeDataFile(data, ".brf");
+    }
+
+    private void writeDataFile(byte[] data, String extension) throws IOException {
+        try {
+            getBlob(getName() + extension).uploadFromByteArray(data, 0, data.length);
+        } catch (StorageException e) {
+            throw new IOException(e);
+        }
+        totalLength += data.length;
+        monitor.written(data.length);
+    }
+
+    @Override
+    public long getLength() {
+        return totalLength;
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (queue.isPresent()) { // required to handle IOException
+            SegmentWriteQueue q = queue.get();
+            q.flush();
+            q.close();
+        }
+    }
+
+    @Override
+    public boolean isCreated() {
+        return created || !queueIsEmpty();
+    }
+
+    @Override
+    public void flush() throws IOException {
+        if (queue.isPresent()) { // required to handle IOException
+            queue.get().flush();
+        }
+    }
+
+    private boolean queueIsEmpty() {
+        return queue.map(SegmentWriteQueue::isEmpty).orElse(true);
+    }
+
+    @Override
+    public String getName() {
+        return AzureUtilities.getName(archiveDirectory);
+    }
+
+    private File pathAsFile() {
+        return new File(archiveDirectory.getUri().getPath());
+    }
+
+    private CloudBlockBlob getBlob(String name) throws IOException {
+        try {
+            return archiveDirectory.getBlockBlobReference(name);
+        } catch (URISyntaxException | StorageException e) {
+            throw new IOException(e);
+        }
+    }
+}
diff --git a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentStoreService.java b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentStoreService.java
new file mode 100644
index 0000000000..9c4ae1ade4
--- /dev/null
+++ b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentStoreService.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.segment.azure;
+
+import com.microsoft.azure.storage.CloudStorageAccount;
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudBlobContainer;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.ConfigurationPolicy;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.jackrabbit.oak.commons.PropertiesUtil;
+import org.apache.jackrabbit.oak.segment.SegmentNodeStorePersistence;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.component.ComponentContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.security.InvalidKeyException;
+import java.util.Properties;
+
+import static org.apache.jackrabbit.oak.osgi.OsgiUtil.lookupConfigurationThenFramework;
+import static org.apache.jackrabbit.oak.segment.azure.AzureSegmentStoreService.AZURE_ACCESS_KEY;
+import static org.apache.jackrabbit.oak.segment.azure.AzureSegmentStoreService.AZURE_ACCOUNT_NAME;
+import static org.apache.jackrabbit.oak.segment.azure.AzureSegmentStoreService.AZURE_CONNECTION_URL;
+import static org.apache.jackrabbit.oak.segment.azure.AzureSegmentStoreService.AZURE_CONTAINER_NAME;
+import static org.apache.jackrabbit.oak.segment.azure.AzureSegmentStoreService.AZURE_ROOT_PATH;
+
+@Component(policy = ConfigurationPolicy.REQUIRE,
+        metatype = true,
+        label = "Oak Azure Segment Store",
+        description = "Azure backend for the Oak Segment Node Store"
+)
+public class AzureSegmentStoreService {
+
+    private static final Logger log = LoggerFactory.getLogger(AzureSegmentStoreService.class);
+
+    @Property(
+            label = "Azure account name"
+    )
+    public static final String AZURE_ACCOUNT_NAME = "azure.accountName";
+
+    @Property(
+            label = "Azure container name"
+    )
+    public static final String AZURE_CONTAINER_NAME = "azure.containerName";
+
+    @Property(
+            label = "Azure access key"
+    )
+    public static final String AZURE_ACCESS_KEY = "azure.accessKey";
+
+    @Property(
+            label = "Azure root path",
+            value = "/oak"
+    )
+    public static final String AZURE_ROOT_PATH = "azure.rootPath";
+
+    @Property(
+            label = "Azure connection URL (optional)",
+            description = "The connection URL to be used (it overrides all the other Azure properties)."
+    )
+    public static final String AZURE_CONNECTION_URL = "azure.connectionUrl";
+
+    private ServiceRegistration registration;
+
+    private SegmentNodeStorePersistence persistence;
+
+    @Activate
+    public void activate(ComponentContext context) throws IOException {
+        Configuration config = new Configuration(context);
+        persistence = createAzurePersistence(config);
+        registration = context.getBundleContext().registerService(SegmentNodeStorePersistence.class.getName(), persistence, new Properties());
+    }
+
+    @Deactivate
+    public void deactivate() throws IOException {
+        if (registration != null) {
+            registration.unregister();
+            registration = null;
+        }
+        persistence = null;
+    }
+
+    private static SegmentNodeStorePersistence createAzurePersistence(Configuration configuration) throws IOException {
+        try {
+            StringBuilder connectionString = new StringBuilder();
+            if (configuration.getAzureConnectionUrl() != null) {
+                connectionString.append(configuration.getAzureConnectionUrl());
+            } else {
+                connectionString.append("DefaultEndpointsProtocol=https;");
+                connectionString.append("AccountName=").append(configuration.getAzureAccountName()).append(';');
+                connectionString.append("AccountKey=").append(configuration.getAzureAccessKey()).append(';');
+            }
+            // redact the account key before logging the connection string
+            log.info("Connection string: {}", connectionString.toString().replaceFirst("AccountKey=[^;]*", "AccountKey=***"));
+            CloudStorageAccount cloud = CloudStorageAccount.parse(connectionString.toString());
+            CloudBlobContainer container = cloud.createCloudBlobClient().getContainerReference(configuration.getAzureContainerName());
+            container.createIfNotExists();
+
+            String path = configuration.getAzureRootPath();
+            if (path != null && path.length() > 0 && path.charAt(0) == '/') {
+                path = path.substring(1);
+            }
+
+            return new AzurePersistence(container.getDirectoryReference(path));
+        } catch (StorageException | URISyntaxException | InvalidKeyException e) {
+            throw new IOException(e);
+        }
+    }
+
+}
+
+class Configuration {
+
+    private final ComponentContext context;
+
+    Configuration(ComponentContext context) {
+        this.context = context;
+    }
+
+    String property(String name) {
+        return lookupConfigurationThenFramework(context, name);
+    }
+
+    String getAzureAccountName() {
+        return property(AZURE_ACCOUNT_NAME);
+    }
+
+    String getAzureContainerName() {
+        return property(AZURE_CONTAINER_NAME);
+    }
+
+    String getAzureAccessKey() {
+        return property(AZURE_ACCESS_KEY);
+    }
+
+    String getAzureRootPath() {
+        return PropertiesUtil.toString(property(AZURE_ROOT_PATH), "/oak");
+    }
+
+    String getAzureConnectionUrl() {
+        return property(AZURE_CONNECTION_URL);
+    }
+
+}
diff --git a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureUtilities.java b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureUtilities.java
new file mode 100644
index 0000000000..90962452ec
--- /dev/null
+++ b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureUtilities.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.azure;
+
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudBlob;
+import com.microsoft.azure.storage.blob.CloudBlobDirectory;
+import org.apache.jackrabbit.oak.segment.file.tar.TarEntry;
+import org.apache.jackrabbit.oak.segment.file.tar.index.IndexEntry;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.nio.file.Paths;
+import java.util.UUID;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import static org.apache.jackrabbit.oak.segment.file.tar.TarConstants.BLOCK_SIZE;
+
+public final class AzureUtilities {
+
+    public static final String SEGMENT_FILE_NAME_PATTERN = "^([0-9a-f]{4})\\.([0-9a-f-]+)$";
+
+    private AzureUtilities() {
+    }
+
+    public static String getSegmentFileName(TarEntry tarEntry) {
+        return getSegmentFileName(tarEntry.offset(), tarEntry.msb(), tarEntry.lsb());
+    }
+
+    public static String getSegmentFileName(IndexEntry indexEntry) {
+        return getSegmentFileName(indexEntry.getPosition(), indexEntry.getMsb(), indexEntry.getLsb());
+    }
+
+    public static String getSegmentFileName(long offset, long msb, long lsb) {
+        return String.format("%04x.%s", offset / BLOCK_SIZE, new UUID(msb, lsb).toString());
+    }
+
+    public static String getName(CloudBlob blob) {
+        return Paths.get(blob.getName()).getFileName().toString();
+    }
+
+    public static String getName(CloudBlobDirectory directory) {
+        return Paths.get(directory.getUri().getPath()).getFileName().toString();
+    }
+
+    public static Stream<CloudBlob> getBlobs(CloudBlobDirectory directory) throws IOException {
+        try {
+            return StreamSupport.stream(directory.listBlobs().spliterator(), false)
+                    .filter(i -> i instanceof CloudBlob)
+                    .map(i -> (CloudBlob) i);
+        } catch (StorageException | URISyntaxException e) {
+            throw new IOException(e);
+        }
+    }
+
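+    // Downloads the whole blob into the buffer's backing array; the blob length
+    // is expected to match the buffer limit exactly.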
+    public static void readBufferFully(CloudBlob blob, ByteBuffer buffer) throws IOException {
+        try {
+            buffer.rewind();
+            long readBytes = blob.downloadToByteArray(buffer.array(), 0);
+            if (buffer.limit() != readBytes) {
+                throw new IOException("Buffer size: " + buffer.limit() + ", read bytes: " + readBytes);
+            }
+        } catch (StorageException e) {
+            throw new IOException(e);
+        }
+    }
+}
diff --git a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/ReverseFileReader.java b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/ReverseFileReader.java
new file mode 100644
index 0000000000..439fab5f9c
--- /dev/null
+++ b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/ReverseFileReader.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.azure;
+
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudBlob;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static java.lang.Math.min;
+
+public class ReverseFileReader {
+
+    private static final int BUFFER_SIZE = 16 * 1024;
+
+    private final int bufferSize;
+
+    private final CloudBlob blob;
+
+    private byte[] buffer;
+
+    private int bufferOffset;
+
+    private int fileOffset;
+
+    public ReverseFileReader(CloudBlob blob) throws StorageException {
+        this(blob, BUFFER_SIZE);
+    }
+
+    public ReverseFileReader(CloudBlob blob, int bufferSize) throws StorageException {
+        this.blob = blob;
+        if (blob.exists()) {
+            this.fileOffset = (int) blob.getProperties().getLength();
+        } else {
+            this.fileOffset = 0;
+        }
+        this.bufferSize = bufferSize;
+    }
+
+    private void readBlock() throws IOException {
+        if (buffer == null) {
+            buffer = new byte[min(fileOffset, bufferSize)];
+        } else if (fileOffset < buffer.length) {
+            buffer = new byte[fileOffset];
+        }
+
+        if (buffer.length > 0) {
+            fileOffset -= buffer.length;
+            try {
+                blob.downloadRangeToByteArray(fileOffset, Long.valueOf(buffer.length), buffer, 0);
+            } catch (StorageException e) {
+                throw new IOException(e);
+            }
+        }
+        bufferOffset = buffer.length;
+    }
+
+    private String readUntilNewLine() {
+        if (bufferOffset == -1) {
+            return "";
+        }
+        int stop = bufferOffset;
+        while (--bufferOffset >= 0) {
+            if (buffer[bufferOffset] == '\n') {
+                break;
+            }
+        }
+        // bufferOffset points either at the previous '\n' character or at -1
+        return new String(buffer, bufferOffset + 1, stop - bufferOffset - 1, Charset.defaultCharset());
+    }
+
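+    // Returns the previous line of the blob, reading earlier blocks on demand;
+    // returns null once the beginning of the file has been reached.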
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.segment.azure.queue; + +import org.apache.jackrabbit.oak.segment.file.tar.TarEntry; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.UUID; + +public class SegmentWriteAction { + + private final TarEntry tarEntry; + + private final byte[] buffer; + + private final int offset; + + private final int length; + + public SegmentWriteAction(TarEntry tarEntry, byte[] buffer, int offset, int length) { + this.tarEntry = tarEntry; + + // defensive copy: the caller may reuse its buffer as soon as this constructor returns + this.buffer = new byte[length]; + System.arraycopy(buffer, offset, this.buffer, 0, length); + this.offset = 0; + this.length = length; + } + + public UUID getUuid() { + return new UUID(tarEntry.msb(), tarEntry.lsb()); + } + + public byte[] getBuffer() { + return buffer; + } + + public int getOffset() { + return offset; + } + + public int getLength() { + return length; + } + + public ByteBuffer toByteBuffer() { + return ByteBuffer.wrap(buffer, offset, length); + } + + void passTo(SegmentWriteQueue.SegmentConsumer consumer) throws IOException { + consumer.consume(tarEntry, buffer, offset, length); + } + + @Override + public String toString() { + return getUuid().toString(); + } +} diff --git a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/queue/SegmentWriteQueue.java b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/queue/SegmentWriteQueue.java new file mode 100644 index 0000000000..4ae8e3d8a2 --- /dev/null +++ b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/queue/SegmentWriteQueue.java @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
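The defensive copy above is what lets the enqueueing thread recycle its write buffer immediately. An illustrative consumer on the receiving end of passTo (made-up names, not part of the patch) that dumps each queued segment to a local file keyed by its UUID:

SegmentWriteQueue.SegmentConsumer dumper = (tarEntry, data, offset, size) -> {
    java.nio.file.Path target = java.nio.file.Paths.get(
            "/tmp/" + new java.util.UUID(tarEntry.msb(), tarEntry.lsb()));
    // 'data' is the action's private copy, so it stays stable even if the
    // original caller keeps writing into its own buffer
    java.nio.file.Files.write(target, java.util.Arrays.copyOfRange(data, offset, offset + size));
};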
+ */ +package org.apache.jackrabbit.oak.segment.azure.queue; + +import org.apache.jackrabbit.oak.segment.file.tar.TarEntry; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.BlockingDeque; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +public class SegmentWriteQueue implements Closeable { + + public static final int THREADS = Integer.getInteger("oak.segment.azure.threads", 5); + + private static final int QUEUE_SIZE = Integer.getInteger("oak.segment.azure.queue", 20); + + private static final Logger log = LoggerFactory.getLogger(SegmentWriteQueue.class); + + private final BlockingDeque<SegmentWriteAction> queue; + + private final Map<UUID, SegmentWriteAction> segmentsByUUID; + + private final ExecutorService executor; + + private final ReadWriteLock flushLock; + + private final SegmentConsumer writer; + + private volatile boolean shutdown; + + private final Object brokenMonitor = new Object(); + + private volatile boolean broken; + + public SegmentWriteQueue(SegmentConsumer writer) { + this(writer, QUEUE_SIZE, THREADS); + } + + SegmentWriteQueue(SegmentConsumer writer, int queueSize, int threadNo) { + this.writer = writer; + segmentsByUUID = new ConcurrentHashMap<>(); + flushLock = new ReentrantReadWriteLock(); + + queue = new LinkedBlockingDeque<>(queueSize); + executor = Executors.newFixedThreadPool(threadNo + 1); + for (int i = 0; i < threadNo; i++) { + executor.submit(this::mainLoop); + } + executor.submit(this::emergencyLoop); + } + + private void mainLoop() { + while (!shutdown) { + try { + waitWhileBroken(); + if (shutdown) { + break; + } + consume(); + } catch (SegmentConsumeException e) { + SegmentWriteAction segment = e.segment; + log.error("Can't persist the segment {}", segment.getUuid(), e.getCause()); + try { + queue.put(segment); + } catch (InterruptedException e1) { + log.error("Can't re-add the segment {} to the queue.
It'll be dropped.", segment.getUuid(), e1); + } + } + } + } + + private void consume() throws SegmentConsumeException { + SegmentWriteAction segment = null; + try { + segment = queue.poll(100, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + log.error("Poll from queue interrupted", e); + } + if (segment != null) { + consume(segment); + } + } + + private void consume(SegmentWriteAction segment) throws SegmentConsumeException { + try { + segment.passTo(writer); + } catch (IOException e) { + setBroken(true); + throw new SegmentConsumeException(segment, e); + } + synchronized (segmentsByUUID) { + segmentsByUUID.remove(segment.getUuid()); + segmentsByUUID.notifyAll(); + } + setBroken(false); + } + + private void emergencyLoop() { + while (!shutdown) { + waitUntilBroken(); + if (shutdown) { + break; + } + + boolean success = false; + SegmentWriteAction segmentToRetry = null; + do { + try { + if (segmentToRetry == null) { + consume(); + } else { + consume(segmentToRetry); + } + success = true; + } catch (SegmentConsumeException e) { + segmentToRetry = e.segment; + log.error("Can't persist the segment {}", segmentToRetry.getUuid(), e.getCause()); + try { + Thread.sleep(1000); + } catch (InterruptedException e1) { + log.warn("Interrupted", e1); + } + if (shutdown) { + log.error("Shutdown initiated. The segment {} will be dropped.", segmentToRetry.getUuid()); + } + } + } while (!success && !shutdown); + } + } + + public void addToQueue(TarEntry tarEntry, byte[] data, int offset, int size) throws IOException { + waitWhileBroken(); + if (shutdown) { + throw new IllegalStateException("Can't accept the new segment - shutdown in progress"); + } + + SegmentWriteAction action = new SegmentWriteAction(tarEntry, data, offset, size); + flushLock.readLock().lock(); + try { + segmentsByUUID.put(action.getUuid(), action); + if (!queue.offer(action, 1, TimeUnit.MINUTES)) { + segmentsByUUID.remove(action.getUuid()); + throw new IOException("Can't add segment to the queue"); + } + } catch (InterruptedException e) { + throw new IOException(e); + } finally { + flushLock.readLock().unlock(); + } + } + + public void flush() throws IOException { + flushLock.writeLock().lock(); + try { + synchronized (segmentsByUUID) { + long start = System.currentTimeMillis(); + while (!segmentsByUUID.isEmpty()) { + segmentsByUUID.wait(100); + if (System.currentTimeMillis() - start > TimeUnit.MINUTES.toMillis(1)) { + log.error("Can't flush the queue in 1 minute. Queue: {}.
Segment map: {}", queue, segmentsByUUID); + start = System.currentTimeMillis(); + } + } + } + } catch (InterruptedException e) { + throw new IOException(e); + } finally { + flushLock.writeLock().unlock(); + } + } + + public SegmentWriteAction read(UUID id) { + return segmentsByUUID.get(id); + } + + public void close() throws IOException { + shutdown = true; + try { + executor.shutdown(); + if (!executor.awaitTermination(1, TimeUnit.MINUTES)) { + throw new IOException("The write queue wasn't able to shut down cleanly"); + } + } catch (InterruptedException e) { + throw new IOException(e); + } + } + + public boolean isEmpty() { + return segmentsByUUID.isEmpty(); + } + + boolean isBroken() { + return broken; + } + + int getSize() { + return queue.size(); + } + + private void setBroken(boolean broken) { + synchronized (brokenMonitor) { + this.broken = broken; + brokenMonitor.notifyAll(); + } + } + + private void waitWhileBroken() { + if (!broken) { + return; + } + synchronized (brokenMonitor) { + while (broken && !shutdown) { + try { + brokenMonitor.wait(100); + } catch (InterruptedException e) { + log.warn("Interrupted", e); + } + } + } + } + + private void waitUntilBroken() { + if (broken) { + return; + } + synchronized (brokenMonitor) { + while (!broken && !shutdown) { + try { + brokenMonitor.wait(100); + } catch (InterruptedException e) { + log.warn("Interrupted", e); + } + } + } + } + + public interface SegmentConsumer { + + void consume(TarEntry tarEntry, byte[] data, int offset, int size) throws IOException; + + } + + public static class SegmentConsumeException extends Exception { + + private final SegmentWriteAction segment; + + public SegmentConsumeException(SegmentWriteAction segment, IOException cause) { + super(cause); + this.segment = segment; + } + } +} diff --git a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManagerTest.java b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManagerTest.java new file mode 100644 index 0000000000..c6d4d85fbd --- /dev/null +++ b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManagerTest.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
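End to end, the queue above is used roughly like this (a sketch under the assumption that the consumer performs the actual blob upload, as the archive writer does; the TarEntry values are dummies and exception handling is omitted):

SegmentWriteQueue queue = new SegmentWriteQueue((tarEntry, data, offset, size) -> {
    // the real consumer uploads the segment as a block blob here
});

byte[] segment = new byte[4096]; // serialized segment data (placeholder)
TarEntry entry = new TarEntry(0x1234L, 0x5678L, 0, segment.length, GCGeneration.NULL);

queue.addToQueue(entry, segment, 0, segment.length); // returns quickly, the upload is asynchronous
queue.flush(); // blocks until every queued segment has been consumed
queue.close(); // stops the worker threads and the emergency loop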
+ */ +package org.apache.jackrabbit.oak.segment.azure; + +import com.microsoft.azure.storage.StorageException; +import com.microsoft.azure.storage.blob.CloudBlobContainer; +import org.apache.jackrabbit.oak.segment.SegmentArchiveManager; +import org.apache.jackrabbit.oak.segment.SegmentArchiveManager.SegmentArchiveWriter; +import org.apache.jackrabbit.oak.segment.file.tar.FileStoreMonitorAdapter; +import org.apache.jackrabbit.oak.segment.file.tar.GCGeneration; +import org.apache.jackrabbit.oak.segment.file.tar.IOMonitorAdapter; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.security.InvalidKeyException; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.UUID; + +import static com.google.common.collect.Lists.newArrayList; +import static org.junit.Assert.assertEquals; + +public class AzureArchiveManagerTest { + + @ClassRule + public static AzuriteDockerRule azurite = new AzuriteDockerRule(); + + private CloudBlobContainer container; + + @Before + public void setup() throws StorageException, InvalidKeyException, URISyntaxException { + container = azurite.getContainer("oak-test"); + } + + @Test + public void testRecovery() throws StorageException, URISyntaxException, IOException { + SegmentArchiveManager manager = new AzurePersistence(container.getDirectoryReference("oak")).createArchiveManager(false, new IOMonitorAdapter(), new FileStoreMonitorAdapter()); + SegmentArchiveWriter writer = manager.create("data00000a.tar"); + + List<UUID> uuids = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + UUID u = UUID.randomUUID(); + writer.writeSegment(u.getMostSignificantBits(), u.getLeastSignificantBits(), new byte[10], 0, 10, GCGeneration.NULL); + uuids.add(u); + } + + writer.flush(); + writer.close(); + + container.getBlockBlobReference("oak/data00000a.tar/0005."
+ uuids.get(5).toString()).delete(); + + LinkedHashMap<UUID, byte[]> recovered = new LinkedHashMap<>(); + manager.recoverEntries("data00000a.tar", recovered); + assertEquals(uuids.subList(0, 5), newArrayList(recovered.keySet())); + } + +} diff --git a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureGCJournalTest.java b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureGCJournalTest.java new file mode 100644 index 0000000000..fae3337b5e --- /dev/null +++ b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureGCJournalTest.java @@ -0,0 +1,45 @@ +package org.apache.jackrabbit.oak.segment.azure; + +import com.microsoft.azure.storage.StorageException; +import com.microsoft.azure.storage.blob.CloudBlobContainer; +import org.apache.jackrabbit.oak.segment.SegmentNodeStorePersistence; +import org.apache.jackrabbit.oak.segment.file.GcJournalTest; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Ignore; +import org.junit.Test; + +import java.net.URISyntaxException; +import java.security.InvalidKeyException; + +public class AzureGCJournalTest extends GcJournalTest { + + @ClassRule + public static AzuriteDockerRule azurite = new AzuriteDockerRule(); + + private CloudBlobContainer container; + + @Before + public void setup() throws StorageException, InvalidKeyException, URISyntaxException { + container = azurite.getContainer("oak-test"); + } + + @Override + protected SegmentNodeStorePersistence getPersistence() throws Exception { + return new AzurePersistence(container.getDirectoryReference("oak")); + } + + @Test + @Ignore + @Override + public void testReadOak16GCLog() throws Exception { + super.testReadOak16GCLog(); + } + + @Test + @Ignore + @Override + public void testUpdateOak16GCLog() throws Exception { + super.testUpdateOak16GCLog(); + } +} diff --git a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureJournalFileTest.java b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureJournalFileTest.java new file mode 100644 index 0000000000..934f5a3cfe --- /dev/null +++ b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureJournalFileTest.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
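The recovery assertion above implies a contract that is worth spelling out: recoverEntries appears to walk the segment blobs in position order and stop at the first gap, so with the blob at position 0005 deleted only the first five entries come back, in their original write order. Restated as a sketch, reusing the test's own names:

// After deleting the blob at position 0005:
LinkedHashMap<UUID, byte[]> recovered = new LinkedHashMap<>();
manager.recoverEntries("data00000a.tar", recovered);
// recovered holds exactly the segments written at positions 0000..0004,
// keyed by UUID and iterating in write order; everything from the gap onwards is dropped
assertEquals(uuids.subList(0, 5), newArrayList(recovered.keySet()));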
+ */ +package org.apache.jackrabbit.oak.segment.azure; + +import com.microsoft.azure.storage.StorageException; +import com.microsoft.azure.storage.blob.CloudBlobContainer; +import org.apache.jackrabbit.oak.segment.SegmentNodeStorePersistence; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.security.InvalidKeyException; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class AzureJournalFileTest { + + @ClassRule + public static AzuriteDockerRule azurite = new AzuriteDockerRule(); + + private CloudBlobContainer container; + + private AzureJournalFile journal; + + @Before + public void setup() throws StorageException, InvalidKeyException, URISyntaxException { + container = azurite.getContainer("oak-test"); + journal = new AzureJournalFile(container.getDirectoryReference("journal"), "journal.log", 10); + } + + @Test + public void testSplitJournalFiles() throws IOException { + assertFalse(journal.exists()); + + SegmentNodeStorePersistence.JournalFileWriter writer = journal.openJournalWriter(); + for (int i = 0; i < 100; i++) { + writer.writeLine("line " + i); + } + + assertTrue(journal.exists()); + + writer = journal.openJournalWriter(); + for (int i = 100; i < 200; i++) { + writer.writeLine("line " + i); + } + + SegmentNodeStorePersistence.JournalFileReader reader = journal.openJournalReader(); + for (int i = 199; i >= 0; i--) { + assertEquals("line " + i, reader.readLine()); + } + + + } + +} diff --git a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureManifestFileTest.java b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureManifestFileTest.java new file mode 100644 index 0000000000..67784dbc6a --- /dev/null +++ b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureManifestFileTest.java @@ -0,0 +1,44 @@ +package org.apache.jackrabbit.oak.segment.azure; + +import com.microsoft.azure.storage.StorageException; +import com.microsoft.azure.storage.blob.CloudBlobContainer; +import org.apache.jackrabbit.oak.segment.SegmentNodeStorePersistence.ManifestFile; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.security.InvalidKeyException; +import java.util.Properties; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +public class AzureManifestFileTest { + + @ClassRule + public static AzuriteDockerRule azurite = new AzuriteDockerRule(); + + private CloudBlobContainer container; + + @Before + public void setup() throws StorageException, InvalidKeyException, URISyntaxException { + container = azurite.getContainer("oak-test"); + } + + @Test + public void testManifest() throws URISyntaxException, IOException { + ManifestFile manifestFile = new AzurePersistence(container.getDirectoryReference("oak")).getManifestFile(); + assertFalse(manifestFile.exists()); + + Properties props = new Properties(); + props.setProperty("xyz", "abc"); + props.setProperty("version", "123"); + manifestFile.save(props); + + Properties loaded = manifestFile.load(); + assertEquals(props, loaded); + } + +} diff --git a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureTarFileTest.java 
b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureTarFileTest.java new file mode 100644 index 0000000000..939c2b5331 --- /dev/null +++ b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureTarFileTest.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.segment.azure; + +import com.microsoft.azure.storage.StorageException; +import com.microsoft.azure.storage.blob.CloudBlobContainer; +import org.apache.jackrabbit.oak.segment.file.tar.FileStoreMonitorAdapter; +import org.apache.jackrabbit.oak.segment.file.tar.IOMonitorAdapter; +import org.apache.jackrabbit.oak.segment.file.tar.TarFileTest; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Ignore; +import org.junit.Test; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.security.InvalidKeyException; + +public class AzureTarFileTest extends TarFileTest { + + @ClassRule + public static AzuriteDockerRule azurite = new AzuriteDockerRule(); + + private CloudBlobContainer container; + + @Before + @Override + public void setUp() throws IOException { + try { + container = azurite.getContainer("oak-test"); + archiveManager = new AzurePersistence(container.getDirectoryReference("oak")).createArchiveManager(true, new IOMonitorAdapter(), new FileStoreMonitorAdapter()); + } catch (StorageException | InvalidKeyException | URISyntaxException e) { + throw new IOException(e); + } + } + + @Override + protected long getWriteAndReadExpectedSize() { + return 557; + } + + @Test + @Ignore + @Override + public void graphShouldBeTrimmedDownOnSweep() throws Exception { + super.graphShouldBeTrimmedDownOnSweep(); + } +} \ No newline at end of file diff --git a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureTarFilesTest.java b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureTarFilesTest.java new file mode 100644 index 0000000000..c98ff581b6 --- /dev/null +++ b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureTarFilesTest.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.segment.azure; + +import com.microsoft.azure.storage.blob.CloudBlobContainer; +import org.apache.jackrabbit.oak.segment.file.tar.FileStoreMonitorAdapter; +import org.apache.jackrabbit.oak.segment.file.tar.IOMonitorAdapter; +import org.apache.jackrabbit.oak.segment.file.tar.TarFiles; +import org.apache.jackrabbit.oak.segment.file.tar.TarFilesTest; +import org.junit.Before; +import org.junit.ClassRule; + +public class AzureTarFilesTest extends TarFilesTest { + + @ClassRule + public static AzuriteDockerRule azurite = new AzuriteDockerRule(); + + private CloudBlobContainer container; + + @Before + @Override + public void setUp() throws Exception { + container = azurite.getContainer("oak-test"); + tarFiles = TarFiles.builder() + .withDirectory(folder.newFolder()) + .withTarRecovery((id, data, recovery) -> { + // Intentionally left blank + }) + .withIOMonitor(new IOMonitorAdapter()) + .withFileStoreMonitor(new FileStoreMonitorAdapter()) + .withMaxFileSize(MAX_FILE_SIZE) + .withPersistence(new AzurePersistence(container.getDirectoryReference("oak"))) + .build(); + } +} diff --git a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureTarWriterTest.java b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureTarWriterTest.java new file mode 100644 index 0000000000..01685e36c0 --- /dev/null +++ b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureTarWriterTest.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.jackrabbit.oak.segment.azure; + +import com.microsoft.azure.storage.StorageException; +import com.microsoft.azure.storage.blob.CloudBlobContainer; +import org.apache.jackrabbit.oak.segment.file.tar.IOMonitorAdapter; +import org.apache.jackrabbit.oak.segment.file.tar.TarWriterTest; +import org.junit.Before; +import org.junit.ClassRule; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.security.InvalidKeyException; + +public class AzureTarWriterTest extends TarWriterTest { + + @ClassRule + public static AzuriteDockerRule azurite = new AzuriteDockerRule(); + + private CloudBlobContainer container; + + @Before + @Override + public void setUp() throws IOException { + try { + monitor = new TestFileStoreMonitor(); + container = azurite.getContainer("oak-test"); + archiveManager = new AzurePersistence(container.getDirectoryReference("oak")).createArchiveManager(true, new IOMonitorAdapter(), monitor); + } catch (StorageException | InvalidKeyException | URISyntaxException e) { + throw new IOException(e); + } + } +} diff --git a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzuriteDockerRule.java b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzuriteDockerRule.java new file mode 100644 index 0000000000..7e957d5fb3 --- /dev/null +++ b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzuriteDockerRule.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.jackrabbit.oak.segment.azure; + +import com.arakelian.docker.junit.DockerRule; +import com.arakelian.docker.junit.model.ImmutableDockerConfig; +import com.microsoft.azure.storage.CloudStorageAccount; +import com.microsoft.azure.storage.StorageException; +import com.microsoft.azure.storage.blob.CloudBlobContainer; +import com.spotify.docker.client.DefaultDockerClient; +import org.junit.Assume; +import org.junit.rules.TestRule; +import org.junit.runner.Description; +import org.junit.runners.model.Statement; + +import java.net.URISyntaxException; +import java.security.InvalidKeyException; + +public class AzuriteDockerRule implements TestRule { + + private final DockerRule wrappedRule; + + public AzuriteDockerRule() { + wrappedRule = new DockerRule(ImmutableDockerConfig.builder() + .image("trekawek/azurite") + .name("oak-test-azurite") + .ports("10000") + .addStartedListener(container -> { + container.waitForPort("10000/tcp"); + container.waitForLog("Azure Blob Storage Emulator listening on port 10000"); + }) + .addContainerConfigurer(builder -> builder.env("executable=blob")) + .alwaysRemoveContainer(true) + .build()); + + } + + public CloudBlobContainer getContainer(String name) throws URISyntaxException, StorageException, InvalidKeyException { + int mappedPort = wrappedRule.getContainer().getPortBinding("10000/tcp").getPort(); + CloudStorageAccount cloud = CloudStorageAccount.parse("DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://127.0.0.1:" + mappedPort + "/devstoreaccount1;"); + CloudBlobContainer container = cloud.createCloudBlobClient().getContainerReference(name); + container.deleteIfExists(); + container.create(); + return container; + } + + @Override + public Statement apply(Statement statement, Description description) { + try { + DefaultDockerClient client = DefaultDockerClient.fromEnv().connectTimeoutMillis(5000L).readTimeoutMillis(20000L).build(); + client.ping(); + client.close(); + } catch (Exception e) { + Assume.assumeNoException(e); + } + + return wrappedRule.apply(statement, description); + } +} diff --git a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/fixture/SegmentAzureFixture.java b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/fixture/SegmentAzureFixture.java new file mode 100644 index 0000000000..3f9900a775 --- /dev/null +++ b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/fixture/SegmentAzureFixture.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.jackrabbit.oak.segment.azure.fixture; + +import com.google.common.io.Files; +import com.microsoft.azure.storage.CloudStorageAccount; +import com.microsoft.azure.storage.StorageException; +import com.microsoft.azure.storage.blob.CloudBlobContainer; +import com.microsoft.azure.storage.blob.CloudBlobDirectory; +import org.apache.jackrabbit.oak.fixture.NodeStoreFixture; +import org.apache.jackrabbit.oak.segment.SegmentNodeStoreBuilders; +import org.apache.jackrabbit.oak.segment.azure.AzurePersistence; +import org.apache.jackrabbit.oak.segment.file.FileStore; +import org.apache.jackrabbit.oak.segment.file.FileStoreBuilder; +import org.apache.jackrabbit.oak.segment.file.InvalidFileStoreVersionException; +import org.apache.jackrabbit.oak.spi.state.NodeStore; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.security.InvalidKeyException; +import java.util.HashMap; +import java.util.Map; + +public class SegmentAzureFixture extends NodeStoreFixture { + + private static final String AZURE_CONNECTION_STRING = System.getProperty("oak.segment.azure.connection", "DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;"); + + private static final String AZURE_CONTAINER = System.getProperty("oak.segment.azure.container", "oak"); + + private static final String AZURE_ROOT_PATH = System.getProperty("oak.segment.azure.rootPath", "/oak"); + + private Map<NodeStore, FileStore> fileStoreMap = new HashMap<>(); + + @Override + public NodeStore createNodeStore() { + AzurePersistence persistence; + try { + CloudStorageAccount cloud = CloudStorageAccount.parse(AZURE_CONNECTION_STRING); + CloudBlobContainer container = cloud.createCloudBlobClient().getContainerReference(AZURE_CONTAINER); + container.deleteIfExists(); + container.create(); + CloudBlobDirectory directory = container.getDirectoryReference(AZURE_ROOT_PATH); + persistence = new AzurePersistence(directory); + } catch (StorageException | URISyntaxException | InvalidKeyException e) { + throw new RuntimeException(e); + } + + try { + FileStore fileStore = FileStoreBuilder.fileStoreBuilder(Files.createTempDir()).withCustomPersistence(persistence).build(); + NodeStore nodeStore = SegmentNodeStoreBuilders.builder(fileStore).build(); + fileStoreMap.put(nodeStore, fileStore); + return nodeStore; + } catch (IOException | InvalidFileStoreVersionException e) { + throw new RuntimeException(e); + } + } + + public void dispose(NodeStore nodeStore) { + FileStore fs = fileStoreMap.remove(nodeStore); + if (fs != null) { + fs.close(); + } + } + + @Override + public String toString() { + return "SegmentAzure"; + } +} diff --git a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/journal/AzureJournalReaderTest.java b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/journal/AzureJournalReaderTest.java new file mode 100644 index 0000000000..fcf766d1a3 --- /dev/null +++ b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/journal/AzureJournalReaderTest.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.segment.azure.journal; + +import com.microsoft.azure.storage.StorageException; +import com.microsoft.azure.storage.blob.CloudAppendBlob; +import com.microsoft.azure.storage.blob.CloudBlobContainer; +import org.apache.jackrabbit.oak.segment.azure.AzuriteDockerRule; +import org.apache.jackrabbit.oak.segment.file.JournalReader; +import org.apache.jackrabbit.oak.segment.file.JournalReaderTest; +import org.apache.jackrabbit.oak.segment.azure.AzureJournalFile; +import org.junit.Before; +import org.junit.ClassRule; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.security.InvalidKeyException; + +public class AzureJournalReaderTest extends JournalReaderTest { + + @ClassRule + public static AzuriteDockerRule azurite = new AzuriteDockerRule(); + + private CloudBlobContainer container; + + @Before + public void setup() throws StorageException, InvalidKeyException, URISyntaxException { + container = azurite.getContainer("oak-test"); + } + + protected JournalReader createJournalReader(String s) throws IOException { + try { + CloudAppendBlob blob = container.getAppendBlobReference("journal/journal.log.001"); + blob.createOrReplace(); + blob.appendText(s); + return new JournalReader(new AzureJournalFile(container.getDirectoryReference("journal"), "journal.log")); + } catch (StorageException | URISyntaxException e) { + throw new IOException(e); + } + } +} diff --git a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/journal/AzureTarRevisionsTest.java b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/journal/AzureTarRevisionsTest.java new file mode 100644 index 0000000000..3d0a12245b --- /dev/null +++ b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/journal/AzureTarRevisionsTest.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.jackrabbit.oak.segment.azure.journal; + +import com.microsoft.azure.storage.blob.CloudBlobContainer; +import org.apache.jackrabbit.oak.segment.SegmentNodeStorePersistence; +import org.apache.jackrabbit.oak.segment.azure.AzuriteDockerRule; +import org.apache.jackrabbit.oak.segment.file.TarRevisionsTest; +import org.apache.jackrabbit.oak.segment.azure.AzurePersistence; +import org.junit.Before; +import org.junit.ClassRule; + +import java.io.IOException; +import java.net.URISyntaxException; + +public class AzureTarRevisionsTest extends TarRevisionsTest { + + @ClassRule + public static AzuriteDockerRule azurite = new AzuriteDockerRule(); + + private CloudBlobContainer container; + + @Before + public void setup() throws Exception { + container = azurite.getContainer("oak-test"); + super.setup(); + } + + @Override + protected SegmentNodeStorePersistence getPersistence() throws IOException { + try { + return new AzurePersistence(container.getDirectoryReference("oak")); + } catch (URISyntaxException e) { + throw new IOException(e); + } + } +} diff --git a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/journal/ReverseFileReaderTest.java b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/journal/ReverseFileReaderTest.java new file mode 100644 index 0000000000..f3d1425a60 --- /dev/null +++ b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/journal/ReverseFileReaderTest.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.jackrabbit.oak.segment.azure.journal; + +import com.microsoft.azure.storage.StorageException; +import com.microsoft.azure.storage.blob.CloudAppendBlob; +import com.microsoft.azure.storage.blob.CloudBlobContainer; +import org.apache.jackrabbit.oak.segment.azure.AzuriteDockerRule; +import org.apache.jackrabbit.oak.segment.azure.ReverseFileReader; +import org.junit.Assert; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.security.InvalidKeyException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Random; + +public class ReverseFileReaderTest { + + @ClassRule + public static AzuriteDockerRule azurite = new AzuriteDockerRule(); + + private CloudBlobContainer container; + + @Before + public void setup() throws StorageException, InvalidKeyException, URISyntaxException { + container = azurite.getContainer("oak-test"); + getBlob().createOrReplace(); + } + + private CloudAppendBlob getBlob() throws URISyntaxException, StorageException { + return container.getAppendBlobReference("test-blob"); + } + + @Test + public void testReverseReader() throws IOException, URISyntaxException, StorageException { + List<String> entries = createFile(1024, 80); + ReverseFileReader reader = new ReverseFileReader(getBlob(), 256); + assertEquals(entries, reader); + } + + @Test + public void testEmptyFile() throws IOException, URISyntaxException, StorageException { + List<String> entries = createFile(0, 80); + ReverseFileReader reader = new ReverseFileReader(getBlob(), 256); + assertEquals(entries, reader); + } + + @Test + public void test1ByteBlock() throws IOException, URISyntaxException, StorageException { + List<String> entries = createFile(10, 16); + ReverseFileReader reader = new ReverseFileReader(getBlob(), 1); + assertEquals(entries, reader); + } + + private List<String> createFile(int lines, int maxLineLength) throws IOException, URISyntaxException, StorageException { + Random random = new Random(); + List<String> entries = new ArrayList<>(); + CloudAppendBlob blob = getBlob(); + for (int i = 0; i < lines; i++) { + int entrySize = random.nextInt(maxLineLength) + 1; + String entry = randomString(entrySize); + try { + blob.appendText(entry + '\n'); + } catch (StorageException e) { + throw new IOException(e); + } + entries.add(entry); + } + + entries.add(""); + Collections.reverse(entries); + return entries; + } + + private static void assertEquals(List<String> entries, ReverseFileReader reader) throws IOException { + int i = entries.size(); + for (String e : entries) { + Assert.assertEquals("line " + (--i), e, reader.readLine()); + } + Assert.assertNull(reader.readLine()); + } + + private static String randomString(int entrySize) { + Random r = new Random(); + + StringBuilder result = new StringBuilder(); + for (int i = 0; i < entrySize; i++) { + result.append((char) ('a' + r.nextInt('z' - 'a'))); + } + + return result.toString(); + } +} diff --git a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/queue/SegmentWriteQueueTest.java b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/queue/SegmentWriteQueueTest.java new file mode 100644 index 0000000000..0272b4e532 --- /dev/null +++ b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/queue/SegmentWriteQueueTest.java @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.segment.azure.queue; + +import org.apache.jackrabbit.oak.segment.file.tar.GCGeneration; +import org.apache.jackrabbit.oak.segment.file.tar.TarEntry; +import org.junit.After; +import org.junit.Test; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +public class SegmentWriteQueueTest { + + private static final byte[] EMPTY_DATA = new byte[0]; + + private SegmentWriteQueue queue; + + @After + public void shutdown() throws IOException { + if (queue != null) { + queue.close(); + } + } + + @Test + public void testQueue() throws IOException, InterruptedException { + Set<UUID> added = Collections.synchronizedSet(new HashSet<>()); + Semaphore semaphore = new Semaphore(0); + queue = new SegmentWriteQueue((tarEntry, data, offset, size) -> { + try { + semaphore.acquire(); + } catch (InterruptedException e) { + } + added.add(new UUID(tarEntry.msb(), tarEntry.lsb())); + }); + + for (int i = 0; i < 10; i++) { + queue.addToQueue(tarEntry(i), EMPTY_DATA, 0, 0); + } + + for (int i = 0; i < 10; i++) { + assertNotNull("Segments should be available for read", queue.read(uuid(i))); + } + assertFalse("Queue shouldn't be empty", queue.isEmpty()); + + semaphore.release(Integer.MAX_VALUE); + while (!queue.isEmpty()) { + Thread.sleep(10); + } + + assertEquals("There should be 10 segments consumed", 10, added.size()); + for (int i = 0; i < 10; i++) { + assertTrue("Missing consumed segment", added.contains(uuid(i))); + } + } + + @Test(timeout = 1000) + public void testFlush() throws IOException, InterruptedException { + Set<UUID> added = Collections.synchronizedSet(new HashSet<>()); + Semaphore semaphore = new Semaphore(0); + queue = new SegmentWriteQueue((tarEntry, data, offset, size) -> { + try { + semaphore.acquire(); + } catch (InterruptedException e) { + } + added.add(new UUID(tarEntry.msb(), tarEntry.lsb())); + }); + + for (int i = 0; i < 3; i++) { + queue.addToQueue(tarEntry(i), EMPTY_DATA, 0, 0); + } + + AtomicBoolean flushFinished = new AtomicBoolean(false); + Set<UUID> addedAfterFlush = new HashSet<>(); + new Thread(() -> { + try { + queue.flush(); + flushFinished.set(true); + addedAfterFlush.addAll(added); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }).start(); + + Thread.sleep(100); + assertFalse("Flush should be blocked", flushFinished.get()); + + AtomicBoolean addFinished = new AtomicBoolean(false); + new Thread(() -> { + try {
+ queue.addToQueue(tarEntry(10), EMPTY_DATA, 0, 0); + addFinished.set(true); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }).start(); + + Thread.sleep(100); + assertFalse("Adding segments should be blocked until the flush is finished", addFinished.get()); + + semaphore.release(Integer.MAX_VALUE); + + while (!addFinished.get()) { + Thread.sleep(10); + } + assertTrue("Flush should be finished once the queue is empty", flushFinished.get()); + assertTrue("Adding segments should be unblocked once the flush is finished", addFinished.get()); + + for (int i = 0; i < 3; i++) { + assertTrue(addedAfterFlush.contains(uuid(i))); + } + } + + @Test(expected = IllegalStateException.class) + public void testClose() throws IOException, InterruptedException { + queue = new SegmentWriteQueue((tarEntry, data, offset, size) -> {}); + queue.close(); + queue.addToQueue(tarEntry(10), EMPTY_DATA, 0, 0); + } + + @Test + public void testRecoveryMode() throws IOException, InterruptedException { + Set<UUID> added = Collections.synchronizedSet(new HashSet<>()); + Semaphore semaphore = new Semaphore(0); + AtomicBoolean doBreak = new AtomicBoolean(true); + List<Long> writeAttempts = Collections.synchronizedList(new ArrayList<>()); + queue = new SegmentWriteQueue((tarEntry, data, offset, size) -> { + writeAttempts.add(System.currentTimeMillis()); + try { + semaphore.acquire(); + } catch (InterruptedException e) { + } + if (doBreak.get()) { + throw new IOException(); + } + added.add(new UUID(tarEntry.msb(), tarEntry.lsb())); + }); + + for (int i = 0; i < 10; i++) { + queue.addToQueue(tarEntry(i), EMPTY_DATA, 0, 0); + } + + semaphore.release(Integer.MAX_VALUE); + Thread.sleep(100); + + assertTrue(queue.isBroken()); + assertEquals(9, queue.getSize()); // the 10th segment is handled by the recovery thread + + writeAttempts.clear(); + while (writeAttempts.size() < 5) { + Thread.sleep(100); + } + long lastAttempt = writeAttempts.get(0); + for (int i = 1; i < 5; i++) { + long delay = writeAttempts.get(i) - lastAttempt; + assertTrue("The delay between attempts to persist a segment should be larger than 1s.
Actual: " + delay, delay >= 1000); + lastAttempt = writeAttempts.get(i); + } + + AtomicBoolean addFinished = new AtomicBoolean(false); + new Thread(() -> { + try { + queue.addToQueue(tarEntry(10), EMPTY_DATA, 0, 0); + addFinished.set(true); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }).start(); + + Thread.sleep(100); + assertFalse("Adding segments should be blocked until the recovery mode is finished", addFinished.get()); + + doBreak.set(false); + while (queue.isBroken()) { + Thread.sleep(10); + } + assertFalse("Queue shouldn't be broken anymore", queue.isBroken()); + + while (added.size() < 11) { + Thread.sleep(10); + } + assertEquals("All segments should be consumed", 11, added.size()); + for (int i = 0; i < 11; i++) { + assertTrue("All segments should be consumed", added.contains(uuid(i))); + } + + int i = writeAttempts.size() - 10; + lastAttempt = writeAttempts.get(i); + for (; i < writeAttempts.size(); i++) { + long delay = writeAttempts.get(i) - lastAttempt; + assertTrue("Segments should be persisted immediately", delay < 1000); + lastAttempt = writeAttempts.get(i); + } + } + + private static TarEntry tarEntry(long i) { + return new TarEntry(0, i, 0, 0, GCGeneration.NULL); + } + + private static UUID uuid(long i) { + return new UUID(0, i); + } + +} diff --git a/oak-segment-azure/start-azurite.sh b/oak-segment-azure/start-azurite.sh new file mode 100755 index 0000000000..06e5e61674 --- /dev/null +++ b/oak-segment-azure/start-azurite.sh @@ -0,0 +1,3 @@ +#!/bin/bash + +docker run -e executable=blob --rm -t -p 10000:10000 trekawek/azurite diff --git a/oak-segment-tar/pom.xml b/oak-segment-tar/pom.xml index 17915f82a9..d59f8fb5bc 100644 --- a/oak-segment-tar/pom.xml +++ b/oak-segment-tar/pom.xml @@ -43,7 +43,13 @@ maven-bundle-plugin - + + org.apache.jackrabbit.oak.segment, + org.apache.jackrabbit.oak.segment.file.tar, + org.apache.jackrabbit.oak.segment.file.tar.binaries, + org.apache.jackrabbit.oak.segment.file.tar.index, + org.apache.jackrabbit.oak.segment.util + netty-* diff --git a/pom.xml b/pom.xml index 6443bc6f36..4ecf979ea7 100644 --- a/pom.xml +++ b/pom.xml @@ -70,6 +70,7 @@ oak-examples oak-it oak-segment-tar + oak-segment-azure oak-benchmarks
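For completeness, the minimal wiring this patch enables, condensed from SegmentAzureFixture above (a sketch: the connection string targets a local Azurite instance, the account key is elided, and error handling is omitted):

CloudStorageAccount account = CloudStorageAccount.parse(
        "DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=...;"
        + "BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;");
CloudBlobContainer container = account.createCloudBlobClient().getContainerReference("oak");
container.createIfNotExists();

AzurePersistence persistence = new AzurePersistence(container.getDirectoryReference("oak"));
FileStore fileStore = FileStoreBuilder.fileStoreBuilder(Files.createTempDir())
        .withCustomPersistence(persistence)
        .build();
NodeStore nodeStore = SegmentNodeStoreBuilders.builder(fileStore).build();
// ... work with the node store ...
fileStore.close();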