diff --git a/oak-segment-azure/pom.xml b/oak-segment-azure/pom.xml
new file mode 100644
index 0000000000..ccd0795dd8
--- /dev/null
+++ b/oak-segment-azure/pom.xml
@@ -0,0 +1,160 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.jackrabbit</groupId>
+        <artifactId>oak-parent</artifactId>
+        <version>1.10-SNAPSHOT</version>
+        <relativePath>../oak-parent/pom.xml</relativePath>
+    </parent>
+
+    <artifactId>oak-segment-azure</artifactId>
+    <packaging>bundle</packaging>
+
+    <name>Oak Segment Azure</name>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.felix</groupId>
+                <artifactId>maven-bundle-plugin</artifactId>
+                <configuration>
+                    <instructions>
+                        <Embed-Dependency>
+                            azure-storage,
+                            azure-keyvault-core
+                        </Embed-Dependency>
+                    </instructions>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>baseline</id>
+                        <goals>
+                            <goal>baseline</goal>
+                        </goals>
+                        <phase>pre-integration-test</phase>
+                        <configuration>
+                            <skip>true</skip>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+    <dependencies>
+
+        <!-- OSGi dependencies -->
+        <dependency>
+            <groupId>org.osgi</groupId>
+            <artifactId>org.osgi.core</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.osgi</groupId>
+            <artifactId>org.osgi.compendium</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.osgi</groupId>
+            <artifactId>org.osgi.annotation</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.osgi</groupId>
+            <artifactId>org.osgi.service.component.annotations</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.osgi</groupId>
+            <artifactId>org.osgi.service.metatype.annotations</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <!-- Oak dependencies -->
+        <dependency>
+            <groupId>org.apache.jackrabbit</groupId>
+            <artifactId>oak-segment-tar</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.jackrabbit</groupId>
+            <artifactId>oak-store-spi</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <!-- Azure Blob Storage -->
+        <dependency>
+            <groupId>com.microsoft.azure</groupId>
+            <artifactId>azure-storage</artifactId>
+            <version>5.0.0</version>
+        </dependency>
+        <dependency>
+            <groupId>com.microsoft.azure</groupId>
+            <artifactId>azure-keyvault-core</artifactId>
+            <version>0.9.7</version>
+        </dependency>
+
+        <!-- Test dependencies -->
+        <dependency>
+            <groupId>org.apache.jackrabbit</groupId>
+            <artifactId>oak-segment-tar</artifactId>
+            <version>${project.version}</version>
+            <classifier>tests</classifier>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.jackrabbit</groupId>
+            <artifactId>oak-store-spi</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <version>1.10.19</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.arakelian</groupId>
+            <artifactId>docker-junit-rule</artifactId>
+            <version>2.1.0</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+</project>
diff --git a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManager.java b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManager.java
new file mode 100644
index 0000000000..7e84e93ae6
--- /dev/null
+++ b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManager.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.azure;
+
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.BlobListingDetails;
+import com.microsoft.azure.storage.blob.CloudBlob;
+import com.microsoft.azure.storage.blob.CloudBlobDirectory;
+import com.microsoft.azure.storage.blob.CloudBlockBlob;
+import com.microsoft.azure.storage.blob.CopyStatus;
+import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveManager;
+import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitor;
+import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitor;
+import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveReader;
+import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.UUID;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.jackrabbit.oak.segment.azure.AzureUtilities.getName;
+
+public class AzureArchiveManager implements SegmentArchiveManager {
+
+ private static final Logger log = LoggerFactory.getLogger(AzureArchiveManager.class);
+
+ private final CloudBlobDirectory cloudBlobDirectory;
+
+ private final IOMonitor ioMonitor;
+
+ private final FileStoreMonitor monitor;
+
+ public AzureArchiveManager(CloudBlobDirectory cloudBlobDirectory, IOMonitor ioMonitor, FileStoreMonitor fileStoreMonitor) {
+ this.cloudBlobDirectory = cloudBlobDirectory;
+ this.ioMonitor = ioMonitor;
+ this.monitor = fileStoreMonitor;
+ }
+
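+ /**
+ * Lists the archive names by enumerating the top-level "virtual directories"
+ * (blob name prefixes) under the segment store directory; each one represents
+ * a single archive.
+ */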
+ @Override
+ public List<String> listArchives() throws IOException {
+ try {
+ return StreamSupport.stream(cloudBlobDirectory
+ .listBlobs(null, false, EnumSet.noneOf(BlobListingDetails.class), null, null)
+ .spliterator(), false)
+ .filter(i -> i instanceof CloudBlobDirectory)
+ .map(i -> (CloudBlobDirectory) i)
+ .map(CloudBlobDirectory::getPrefix)
+ .map(Paths::get)
+ .map(Path::getFileName)
+ .map(Path::toString)
+ .collect(Collectors.toList());
+ } catch (URISyntaxException | StorageException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public SegmentArchiveReader open(String archiveName) throws IOException {
+ try {
+ CloudBlobDirectory archiveDirectory = getDirectory(archiveName);
+ if (!archiveDirectory.getBlockBlobReference("closed").exists()) {
+ throw new IOException("The archive " + archiveName + " hasn't been closed correctly.");
+ }
+ return new AzureSegmentArchiveReader(archiveDirectory, ioMonitor, monitor);
+ } catch (StorageException | URISyntaxException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public SegmentArchiveWriter create(String archiveName) throws IOException {
+ return new AzureSegmentArchiveWriter(getDirectory(archiveName), ioMonitor, monitor);
+ }
+
+ @Override
+ public boolean delete(String archiveName) {
+ try {
+ getBlobs(archiveName)
+ .forEach(cloudBlob -> {
+ try {
+ cloudBlob.delete();
+ } catch (StorageException e) {
+ log.error("Can't delete segment {}", cloudBlob.getUri().getPath(), e);
+ }
+ });
+ return true;
+ } catch (IOException e) {
+ log.error("Can't delete archive {}", archiveName, e);
+ return false;
+ }
+ }
+
+ @Override
+ public boolean renameTo(String from, String to) {
+ try {
+ CloudBlobDirectory targetDirectory = getDirectory(to);
+ getBlobs(from)
+ .forEach(cloudBlob -> {
+ try {
+ renameBlob(cloudBlob, targetDirectory);
+ } catch (IOException e) {
+ log.error("Can't rename segment {}", cloudBlob.getUri().getPath(), e);
+ }
+ });
+ return true;
+ } catch (IOException e) {
+ log.error("Can't rename archive {} to {}", from, to, e);
+ return false;
+ }
+ }
+
+ @Override
+ public void copyFile(String from, String to) throws IOException {
+ CloudBlobDirectory targetDirectory = getDirectory(to);
+ getBlobs(from)
+ .forEach(cloudBlob -> {
+ try {
+ copyBlob(cloudBlob, targetDirectory);
+ } catch (IOException e) {
+ log.error("Can't copy segment {}", cloudBlob.getUri().getPath(), e);
+ }
+ });
+ }
+
+ @Override
+ public boolean exists(String archiveName) {
+ try {
+ return listArchives().contains(archiveName);
+ } catch (IOException e) {
+ log.error("Can't check the existence of {}", archiveName, e);
+ return false;
+ }
+ }
+
+ @Override
+ public void recoverEntries(String archiveName, LinkedHashMap<UUID, byte[]> entries) throws IOException {
+ Pattern pattern = Pattern.compile(AzureUtilities.SEGMENT_FILE_NAME_PATTERN);
+ List<RecoveredEntry> entryList = new ArrayList<>();
+
+ for (CloudBlob b : getBlobList(archiveName)) {
+ String name = getName(b);
+ Matcher m = pattern.matcher(name);
+ if (!m.matches()) {
+ continue;
+ }
+ int position = Integer.parseInt(m.group(1), 16);
+ UUID uuid = UUID.fromString(m.group(2));
+ long length = b.getProperties().getLength();
+ if (length > 0) {
+ byte[] data = new byte[(int) length];
+ try {
+ b.downloadToByteArray(data, 0);
+ } catch (StorageException e) {
+ throw new IOException(e);
+ }
+ entryList.add(new RecoveredEntry(position, uuid, data, name));
+ }
+ }
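+ // Replay the entries in the order of their position within the archive and
+ // stop at the first gap: segments following a missing entry can't be trusted.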
+ Collections.sort(entryList);
+
+ int i = 0;
+ for (RecoveredEntry e : entryList) {
+ if (e.position != i) {
+ log.warn("Missing entry {}.??? when recovering {}. No more segments will be read.", String.format("%04X", i), archiveName);
+ break;
+ }
+ log.info("Recovering segment {}/{}", archiveName, e.fileName);
+ entries.put(e.uuid, e.data);
+ i++;
+ }
+ }
+
+
+ private CloudBlobDirectory getDirectory(String archiveName) throws IOException {
+ try {
+ return cloudBlobDirectory.getDirectoryReference(archiveName);
+ } catch (URISyntaxException e) {
+ throw new IOException(e);
+ }
+ }
+
+ private Stream<CloudBlob> getBlobs(String archiveName) throws IOException {
+ return AzureUtilities.getBlobs(getDirectory(archiveName));
+ }
+
+ private List<CloudBlob> getBlobList(String archiveName) throws IOException {
+ return getBlobs(archiveName).collect(Collectors.toList());
+ }
+
+ private void renameBlob(CloudBlob blob, CloudBlobDirectory newParent) throws IOException {
+ copyBlob(blob, newParent);
+ try {
+ blob.delete();
+ } catch (StorageException e) {
+ throw new IOException(e);
+ }
+ }
+
+ private void copyBlob(CloudBlob blob, CloudBlobDirectory newParent) throws IOException {
+ checkArgument(blob instanceof CloudBlockBlob, "Only block blobs are supported for rename and copy operations");
+ try {
+ String blobName = getName(blob);
+ CloudBlockBlob newBlob = newParent.getBlockBlobReference(blobName);
+ newBlob.startCopy(blob.getUri());
+ while (newBlob.getCopyState().getStatus() == CopyStatus.PENDING) {
+ Thread.sleep(100);
+ }
+
+ CopyStatus finalStatus = newBlob.getCopyState().getStatus();
+ if (finalStatus != CopyStatus.SUCCESS) {
+ throw new IOException("Invalid copy status for " + blob.getUri().getPath() + ": " + finalStatus);
+ }
+ } catch (StorageException | InterruptedException | URISyntaxException e) {
+ throw new IOException(e);
+ }
+ }
+
+ private static class RecoveredEntry implements Comparable<RecoveredEntry> {
+
+ private final byte[] data;
+
+ private final UUID uuid;
+
+ private final int position;
+
+ private final String fileName;
+
+ public RecoveredEntry(int position, UUID uuid, byte[] data, String fileName) {
+ this.data = data;
+ this.uuid = uuid;
+ this.position = position;
+ this.fileName = fileName;
+ }
+
+ @Override
+ public int compareTo(RecoveredEntry o) {
+ return Integer.compare(this.position, o.position);
+ }
+ }
+
+}
diff --git a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureBlobMetadata.java b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureBlobMetadata.java
new file mode 100644
index 0000000000..103a706f82
--- /dev/null
+++ b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureBlobMetadata.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.azure;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
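+/**
+ * Converts between {@link AzureSegmentArchiveEntry} index entries and the
+ * user-defined metadata attached to each segment blob, so that the archive
+ * index can be rebuilt from a plain blob listing.
+ */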
+public final class AzureBlobMetadata {
+
+ private static final String METADATA_TYPE = "type";
+
+ private static final String METADATA_SEGMENT_UUID = "segment-uuid";
+
+ private static final String METADATA_SEGMENT_POSITION = "segment-position";
+
+ private static final String METADATA_SEGMENT_GENERATION = "segment-generation";
+
+ private static final String METADATA_SEGMENT_FULL_GENERATION = "segment-fullGeneration";
+
+ private static final String METADATA_SEGMENT_COMPACTED = "segment-compacted";
+
+ private static final String TYPE_SEGMENT = "segment";
+
+ public static HashMap<String, String> toSegmentMetadata(AzureSegmentArchiveEntry indexEntry) {
+ HashMap<String, String> map = new HashMap<>();
+ map.put(METADATA_TYPE, TYPE_SEGMENT);
+ map.put(METADATA_SEGMENT_UUID, new UUID(indexEntry.getMsb(), indexEntry.getLsb()).toString());
+ map.put(METADATA_SEGMENT_POSITION, String.valueOf(indexEntry.getPosition()));
+ map.put(METADATA_SEGMENT_GENERATION, String.valueOf(indexEntry.getGeneration()));
+ map.put(METADATA_SEGMENT_FULL_GENERATION, String.valueOf(indexEntry.getFullGeneration()));
+ map.put(METADATA_SEGMENT_COMPACTED, String.valueOf(indexEntry.isCompacted()));
+ return map;
+ }
+
+ public static AzureSegmentArchiveEntry toIndexEntry(Map<String, String> metadata, int length) {
+ UUID uuid = UUID.fromString(metadata.get(METADATA_SEGMENT_UUID));
+ long msb = uuid.getMostSignificantBits();
+ long lsb = uuid.getLeastSignificantBits();
+ int position = Integer.parseInt(metadata.get(METADATA_SEGMENT_POSITION));
+ int generation = Integer.parseInt(metadata.get(METADATA_SEGMENT_GENERATION));
+ int fullGeneration = Integer.parseInt(metadata.get(METADATA_SEGMENT_FULL_GENERATION));
+ boolean compacted = Boolean.parseBoolean(metadata.get(METADATA_SEGMENT_COMPACTED));
+ return new AzureSegmentArchiveEntry(msb, lsb, position, length, generation, fullGeneration, compacted);
+ }
+
+ public static boolean isSegment(Map<String, String> metadata) {
+ return metadata != null && TYPE_SEGMENT.equals(metadata.get(METADATA_TYPE));
+ }
+
+}
diff --git a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureGCJournalFile.java b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureGCJournalFile.java
new file mode 100644
index 0000000000..519da4d7ab
--- /dev/null
+++ b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureGCJournalFile.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.azure;
+
+import com.google.common.base.Charsets;
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudAppendBlob;
+import org.apache.commons.io.IOUtils;
+import org.apache.jackrabbit.oak.segment.spi.persistence.GCJournalFile;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+public class AzureGCJournalFile implements GCJournalFile {
+
+ private final CloudAppendBlob gcJournal;
+
+ public AzureGCJournalFile(CloudAppendBlob gcJournal) {
+ this.gcJournal = gcJournal;
+ }
+
+ @Override
+ public void writeLine(String line) throws IOException {
+ try {
+ if (!gcJournal.exists()) {
+ gcJournal.createOrReplace();
+ }
+ gcJournal.appendText(line + "\n", Charsets.UTF_8.name(), null, null, null);
+ } catch (StorageException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public List<String> readLines() throws IOException {
+ try {
+ if (!gcJournal.exists()) {
+ return Collections.emptyList();
+ }
+ byte[] data = new byte[(int) gcJournal.getProperties().getLength()];
+ gcJournal.downloadToByteArray(data, 0);
+ return IOUtils.readLines(new ByteArrayInputStream(data), Charsets.UTF_8);
+ } catch (StorageException e) {
+ throw new IOException(e);
+ }
+ }
+}
diff --git a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureJournalFile.java b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureJournalFile.java
new file mode 100644
index 0000000000..47397e40c6
--- /dev/null
+++ b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureJournalFile.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.azure;
+
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudAppendBlob;
+import com.microsoft.azure.storage.blob.CloudBlob;
+import com.microsoft.azure.storage.blob.CloudBlobDirectory;
+import com.microsoft.azure.storage.blob.ListBlobItem;
+import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFile;
+import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFileReader;
+import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFileWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
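+/**
+ * The journal is split into a series of append blobs named
+ * {@code <prefix>.001}, {@code <prefix>.002}, etc. A new blob is started when
+ * the current one reaches the configured line limit, and readers consume the
+ * blobs in reverse order, newest first.
+ */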
+public class AzureJournalFile implements JournalFile {
+
+ private static final Logger log = LoggerFactory.getLogger(AzureJournalFile.class);
+
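+ // Every appended line becomes a committed block, so the default of 40k lines
+ // keeps a single journal blob safely below the committed-block limit that
+ // Azure imposes on append blobs (50,000 blocks at the time of writing).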
+ private static final int JOURNAL_LINE_LIMIT = Integer.getInteger("org.apache.jackrabbit.oak.segment.azure.journal.lines", 40_000);
+
+ private final CloudBlobDirectory directory;
+
+ private final String journalNamePrefix;
+
+ private final int lineLimit;
+
+ AzureJournalFile(CloudBlobDirectory directory, String journalNamePrefix, int lineLimit) {
+ this.directory = directory;
+ this.journalNamePrefix = journalNamePrefix;
+ this.lineLimit = lineLimit;
+ }
+
+ public AzureJournalFile(CloudBlobDirectory directory, String journalNamePrefix) {
+ this(directory, journalNamePrefix, JOURNAL_LINE_LIMIT);
+ }
+
+ @Override
+ public JournalFileReader openJournalReader() throws IOException {
+ return new CombinedReader(getJournalBlobs());
+ }
+
+ @Override
+ public JournalFileWriter openJournalWriter() throws IOException {
+ return new AzureJournalWriter();
+ }
+
+ @Override
+ public String getName() {
+ return journalNamePrefix;
+ }
+
+ @Override
+ public boolean exists() {
+ try {
+ return !getJournalBlobs().isEmpty();
+ } catch (IOException e) {
+ log.error("Can't check if the file exists", e);
+ return false;
+ }
+ }
+
+ private String getJournalFileName(int index) {
+ return String.format("%s.%03d", journalNamePrefix, index);
+ }
+
+ private List<CloudAppendBlob> getJournalBlobs() throws IOException {
+ try {
+ List<CloudAppendBlob> result = new ArrayList<>();
+ for (ListBlobItem b : directory.listBlobs(journalNamePrefix)) {
+ if (b instanceof CloudAppendBlob) {
+ result.add((CloudAppendBlob) b);
+ } else {
+ log.warn("Invalid blob type: {} {}", b.getUri(), b.getClass());
+ }
+ }
+ result.sort(Comparator.comparing(AzureUtilities::getName).reversed());
+ return result;
+ } catch (URISyntaxException | StorageException e) {
+ throw new IOException(e);
+ }
+ }
+
+ private static class AzureJournalReader implements JournalFileReader {
+
+ private final CloudBlob blob;
+
+ private ReverseFileReader reader;
+
+ private AzureJournalReader(CloudBlob blob) {
+ this.blob = blob;
+ }
+
+ @Override
+ public String readLine() throws IOException {
+ if (reader == null) {
+ try {
+ reader = new ReverseFileReader(blob);
+ } catch (StorageException e) {
+ throw new IOException(e);
+ }
+ }
+ return reader.readLine();
+ }
+
+ @Override
+ public void close() throws IOException {
+ }
+ }
+
+ private class AzureJournalWriter implements JournalFileWriter {
+
+ private CloudAppendBlob currentBlob;
+
+ private int blockCount;
+
+ public AzureJournalWriter() throws IOException {
+ List<CloudAppendBlob> blobs = getJournalBlobs();
+ if (blobs.isEmpty()) {
+ try {
+ currentBlob = directory.getAppendBlobReference(getJournalFileName(1));
+ currentBlob.createOrReplace();
+ } catch (URISyntaxException | StorageException e) {
+ throw new IOException(e);
+ }
+ } else {
+ currentBlob = blobs.get(0);
+ }
+ Integer bc = currentBlob.getProperties().getAppendBlobCommittedBlockCount();
+ blockCount = bc == null ? 0 : bc;
+ }
+
+ @Override
+ public void truncate() throws IOException {
+ try {
+ for (CloudAppendBlob cloudAppendBlob : getJournalBlobs()) {
+ cloudAppendBlob.delete();
+ }
+ } catch (StorageException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public void writeLine(String line) throws IOException {
+ if (blockCount >= lineLimit) {
+ createNewFile();
+ }
+ try {
+ currentBlob.appendText(line + "\n");
+ blockCount++;
+ } catch (StorageException e) {
+ throw new IOException(e);
+ }
+ }
+
+ private void createNewFile() throws IOException {
+ String name = AzureUtilities.getName(currentBlob);
+ Pattern pattern = Pattern.compile(Pattern.quote(journalNamePrefix) + "\\.(\\d+)");
+ Matcher matcher = pattern.matcher(name);
+ int parsedSuffix;
+ if (matcher.find()) {
+ String suffix = matcher.group(1);
+ try {
+ parsedSuffix = Integer.parseInt(suffix);
+ } catch (NumberFormatException e) {
+ log.warn("Can't parse suffix for journal file {}", name);
+ parsedSuffix = 0;
+ }
+ } else {
+ log.warn("Can't parse journal file name {}", name);
+ parsedSuffix = 0;
+ }
+ try {
+ currentBlob = directory.getAppendBlobReference(getJournalFileName(parsedSuffix + 1));
+ currentBlob.createOrReplace();
+ blockCount = 0;
+ } catch (URISyntaxException | StorageException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ // do nothing
+ }
+ }
+
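+ /**
+ * Presents the journal blobs as a single stream of lines: when the current
+ * reader is exhausted, the next (older) blob is opened transparently; empty
+ * lines are skipped.
+ */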
+ private static class CombinedReader implements JournalFileReader {
+
+ private final Iterator<AzureJournalReader> readers;
+
+ private JournalFileReader currentReader;
+
+ private CombinedReader(List<CloudAppendBlob> blobs) {
+ readers = blobs.stream().map(AzureJournalReader::new).iterator();
+ }
+
+ @Override
+ public String readLine() throws IOException {
+ String line;
+ do {
+ if (currentReader == null) {
+ if (!readers.hasNext()) {
+ return null;
+ }
+ currentReader = readers.next();
+ }
+ do {
+ line = currentReader.readLine();
+ } while ("".equals(line));
+ if (line == null) {
+ currentReader.close();
+ currentReader = null;
+ }
+ } while (line == null);
+ return line;
+ }
+
+ @Override
+ public void close() throws IOException {
+ while (readers.hasNext()) {
+ readers.next().close();
+ }
+ if (currentReader != null) {
+ currentReader.close();
+ currentReader = null;
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureManifestFile.java b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureManifestFile.java
new file mode 100644
index 0000000000..aae72c1200
--- /dev/null
+++ b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureManifestFile.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.azure;
+
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudBlockBlob;
+import org.apache.jackrabbit.oak.segment.spi.persistence.ManifestFile;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Properties;
+
+public class AzureManifestFile implements ManifestFile {
+
+ private static final Logger log = LoggerFactory.getLogger(AzureManifestFile.class);
+
+ private final CloudBlockBlob manifestBlob;
+
+ public AzureManifestFile(CloudBlockBlob manifestBlob) {
+ this.manifestBlob = manifestBlob;
+ }
+
+ @Override
+ public boolean exists() {
+ try {
+ return manifestBlob.exists();
+ } catch (StorageException e) {
+ log.error("Can't check if the manifest exists", e);
+ return false;
+ }
+ }
+
+ @Override
+ public Properties load() throws IOException {
+ Properties properties = new Properties();
+ if (exists()) {
+ long length = manifestBlob.getProperties().getLength();
+ byte[] data = new byte[(int) length];
+ try {
+ manifestBlob.downloadToByteArray(data, 0);
+ } catch (StorageException e) {
+ throw new IOException(e);
+ }
+ properties.load(new ByteArrayInputStream(data));
+ }
+ return properties;
+ }
+
+ @Override
+ public void save(Properties properties) throws IOException {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ properties.store(bos, null);
+
+ byte[] data = bos.toByteArray();
+ try {
+ manifestBlob.uploadFromByteArray(data, 0, data.length);
+ } catch (StorageException e) {
+ throw new IOException(e);
+ }
+ }
+}
diff --git a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzurePersistence.java b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzurePersistence.java
new file mode 100644
index 0000000000..f1d3cbd892
--- /dev/null
+++ b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzurePersistence.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.azure;
+
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.BlobListingDetails;
+import com.microsoft.azure.storage.blob.CloudAppendBlob;
+import com.microsoft.azure.storage.blob.CloudBlobDirectory;
+import com.microsoft.azure.storage.blob.CloudBlockBlob;
+import com.microsoft.azure.storage.blob.ListBlobItem;
+import org.apache.jackrabbit.oak.segment.spi.persistence.GCJournalFile;
+import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFile;
+import org.apache.jackrabbit.oak.segment.spi.persistence.ManifestFile;
+import org.apache.jackrabbit.oak.segment.spi.persistence.RepositoryLock;
+import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveManager;
+import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentNodeStorePersistence;
+import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitor;
+import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.file.Paths;
+import java.util.EnumSet;
+
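+/**
+ * Implements the segment store persistence on top of a single Azure blob
+ * directory, which contains the archive directories along with the
+ * {@code journal.log}, {@code gc.log}, {@code manifest} and {@code repo.lock}
+ * blobs.
+ */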
+public class AzurePersistence implements SegmentNodeStorePersistence {
+
+ private static final Logger log = LoggerFactory.getLogger(AzurePersistence.class);
+
+ private final CloudBlobDirectory segmentstoreDirectory;
+
+ public AzurePersistence(CloudBlobDirectory segmentstoreDirectory) {
+ this.segmentstoreDirectory = segmentstoreDirectory;
+ }
+
+ @Override
+ public SegmentArchiveManager createArchiveManager(boolean mmap, IOMonitor ioMonitor, FileStoreMonitor fileStoreMonitor) {
+ return new AzureArchiveManager(segmentstoreDirectory, ioMonitor, fileStoreMonitor);
+ }
+
+ @Override
+ public boolean segmentFilesExist() {
+ try {
+ for (ListBlobItem i : segmentstoreDirectory.listBlobs(null, false, EnumSet.noneOf(BlobListingDetails.class), null, null)) {
+ if (i instanceof CloudBlobDirectory) {
+ CloudBlobDirectory dir = (CloudBlobDirectory) i;
+ String name = Paths.get(dir.getPrefix()).getFileName().toString();
+ if (name.endsWith(".tar")) {
+ return true;
+ }
+ }
+ }
+ return false;
+ } catch (StorageException | URISyntaxException e) {
+ log.error("Can't check if the segment archives exists", e);
+ return false;
+ }
+ }
+
+ @Override
+ public JournalFile getJournalFile() {
+ return new AzureJournalFile(segmentstoreDirectory, "journal.log");
+ }
+
+ @Override
+ public GCJournalFile getGCJournalFile() throws IOException {
+ return new AzureGCJournalFile(getAppendBlob("gc.log"));
+ }
+
+ @Override
+ public ManifestFile getManifestFile() throws IOException {
+ return new AzureManifestFile(getBlockBlob("manifest"));
+ }
+
+ @Override
+ public RepositoryLock lockRepository() throws IOException {
+ return new AzureRepositoryLock(getBlockBlob("repo.lock"), new Runnable() {
+ @Override
+ public void run() {
+ log.warn("Lost connection to the Azure. The client will be closed.");
+ // TODO close the connection
+ }
+ }).lock();
+ }
+
+ private CloudBlockBlob getBlockBlob(String path) throws IOException {
+ try {
+ return segmentstoreDirectory.getBlockBlobReference(path);
+ } catch (URISyntaxException | StorageException e) {
+ throw new IOException(e);
+ }
+ }
+
+ private CloudAppendBlob getAppendBlob(String path) throws IOException {
+ try {
+ return segmentstoreDirectory.getAppendBlobReference(path);
+ } catch (URISyntaxException | StorageException e) {
+ throw new IOException(e);
+ }
+ }
+
+}
diff --git a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureRepositoryLock.java b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureRepositoryLock.java
new file mode 100644
index 0000000000..42542db3e5
--- /dev/null
+++ b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureRepositoryLock.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.azure;
+
+import com.microsoft.azure.storage.AccessCondition;
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudBlockBlob;
+import org.apache.jackrabbit.oak.segment.spi.persistence.RepositoryLock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+public class AzureRepositoryLock implements RepositoryLock {
+
+ private static final Logger log = LoggerFactory.getLogger(AzureRepositoryLock.class);
+
+ private static final int INTERVAL = 60;
+
+ private final Runnable shutdownHook;
+
+ private final CloudBlockBlob blob;
+
+ private final ExecutorService executor;
+
+ private String leaseId;
+
+ private volatile boolean doUpdate;
+
+ public AzureRepositoryLock(CloudBlockBlob blob, Runnable shutdownHook) {
+ this.shutdownHook = shutdownHook;
+ this.blob = blob;
+ this.executor = Executors.newSingleThreadExecutor();
+ }
+
+ public AzureRepositoryLock lock() throws IOException {
+ try {
+ blob.openOutputStream().close();
+ leaseId = blob.acquireLease(INTERVAL, null);
+ log.info("Acquired lease {}", leaseId);
+ } catch (StorageException e) {
+ throw new IOException(e);
+ }
+ executor.submit(this::refreshLease);
+ return this;
+ }
+
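+ /**
+ * Renews the lease once more than half of the lease interval has elapsed,
+ * checking roughly every 100ms. If a renewal fails, the shutdown hook is
+ * invoked and the loop stops, leaving the lease to expire on its own.
+ */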
+ private void refreshLease() {
+ doUpdate = true;
+ long lastUpdate = 0;
+ while (doUpdate) {
+ try {
+ long timeSinceLastUpdate = (System.currentTimeMillis() - lastUpdate) / 1000;
+ if (timeSinceLastUpdate > INTERVAL / 2) {
+ blob.renewLease(AccessCondition.generateLeaseCondition(leaseId));
+ lastUpdate = System.currentTimeMillis();
+ }
+ } catch (StorageException e) {
+ log.error("Can't renew the lease", e);
+ shutdownHook.run();
+ doUpdate = false;
+ return;
+ }
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ log.error("Interrupted the lease renewal loop", e);
+ }
+ }
+ }
+
+ @Override
+ public void unlock() throws IOException {
+ doUpdate = false;
+ executor.shutdown();
+ try {
+ executor.awaitTermination(1, TimeUnit.MINUTES);
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ } finally {
+ releaseLease();
+ }
+ }
+
+ private void releaseLease() throws IOException {
+ try {
+ blob.releaseLease(AccessCondition.generateLeaseCondition(leaseId));
+ blob.delete();
+ log.info("Released lease {}", leaseId);
+ } catch (StorageException e) {
+ throw new IOException(e);
+ }
+ }
+}
diff --git a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveEntry.java b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveEntry.java
new file mode 100644
index 0000000000..08a1068ba5
--- /dev/null
+++ b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveEntry.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.azure;
+
+import org.apache.jackrabbit.oak.segment.file.tar.index.IndexEntry;
+
+public class AzureSegmentArchiveEntry implements IndexEntry {
+
+ private final long msb;
+
+ private final long lsb;
+
+ private final int position;
+
+ private final int length;
+
+ private final int generation;
+
+ private final int fullGeneration;
+
+ private final boolean compacted;
+
+ public AzureSegmentArchiveEntry(long msb, long lsb, int position, int length, int generation, int fullGeneration, boolean compacted) {
+ this.msb = msb;
+ this.lsb = lsb;
+ this.position = position;
+ this.length = length;
+ this.generation = generation;
+ this.fullGeneration = fullGeneration;
+ this.compacted = compacted;
+ }
+
+ @Override
+ public long getMsb() {
+ return msb;
+ }
+
+ @Override
+ public long getLsb() {
+ return lsb;
+ }
+
+ @Override
+ public int getPosition() {
+ return position;
+ }
+
+ @Override
+ public int getLength() {
+ return length;
+ }
+
+ @Override
+ public int getGeneration() {
+ return generation;
+ }
+
+ @Override
+ public int getFullGeneration() {
+ return fullGeneration;
+ }
+
+ @Override
+ public boolean isCompacted() {
+ return compacted;
+ }
+}
diff --git a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveReader.java b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveReader.java
new file mode 100644
index 0000000000..77dbd07d51
--- /dev/null
+++ b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveReader.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.azure;
+
+import com.google.common.base.Stopwatch;
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudBlob;
+import com.microsoft.azure.storage.blob.CloudBlobDirectory;
+import com.microsoft.azure.storage.blob.CloudBlockBlob;
+import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitor;
+import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitor;
+import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveEntry;
+import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveReader;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.jackrabbit.oak.segment.azure.AzureUtilities.getSegmentFileName;
+import static org.apache.jackrabbit.oak.segment.azure.AzureUtilities.readBufferFully;
+
+public class AzureSegmentArchiveReader implements SegmentArchiveReader {
+
+ private final CloudBlobDirectory archiveDirectory;
+
+ private final IOMonitor ioMonitor;
+
+ private final FileStoreMonitor monitor;
+
+ private final long length;
+
+ private final Map<UUID, AzureSegmentArchiveEntry> index = new LinkedHashMap<>();
+
+ private Boolean hasGraph;
+
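+ /**
+ * Builds the in-memory segment index by downloading the attributes of every
+ * blob in the archive directory and decoding the segment metadata attached
+ * to each of them.
+ */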
+ AzureSegmentArchiveReader(CloudBlobDirectory archiveDirectory, IOMonitor ioMonitor, FileStoreMonitor monitor) throws IOException {
+ this.archiveDirectory = archiveDirectory;
+ this.ioMonitor = ioMonitor;
+ this.monitor = monitor;
+ long length = 0;
+ try {
+ for (CloudBlob blob : AzureUtilities.getBlobs(archiveDirectory).collect(Collectors.toList())) {
+ blob.downloadAttributes();
+ Map<String, String> metadata = blob.getMetadata();
+ if (AzureBlobMetadata.isSegment(metadata)) {
+ AzureSegmentArchiveEntry indexEntry = AzureBlobMetadata.toIndexEntry(metadata, (int) blob.getProperties().getLength());
+ index.put(new UUID(indexEntry.getMsb(), indexEntry.getLsb()), indexEntry);
+ }
+ length += blob.getProperties().getLength();
+ }
+ } catch (StorageException e) {
+ throw new IOException(e);
+ }
+ this.length = length;
+ }
+
+ @Override
+ public ByteBuffer readSegment(long msb, long lsb) throws IOException {
+ AzureSegmentArchiveEntry indexEntry = index.get(new UUID(msb, lsb));
+ ByteBuffer buffer = ByteBuffer.allocate(indexEntry.getLength());
+ ioMonitor.beforeSegmentRead(pathAsFile(), msb, lsb, indexEntry.getLength());
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ readBufferFully(getBlob(getSegmentFileName(indexEntry)), buffer);
+ long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS);
+ ioMonitor.afterSegmentRead(pathAsFile(), msb, lsb, indexEntry.getLength(), elapsed);
+ return buffer;
+ }
+
+ @Override
+ public boolean containsSegment(long msb, long lsb) {
+ return index.containsKey(new UUID(msb, lsb));
+ }
+
+ @Override
+ public List<SegmentArchiveEntry> listSegments() {
+ return new ArrayList<>(index.values());
+ }
+
+ @Override
+ public ByteBuffer getGraph() throws IOException {
+ ByteBuffer graph = readBlob(getName() + ".gph");
+ hasGraph = graph != null;
+ return graph;
+ }
+
+ @Override
+ public boolean hasGraph() {
+ if (hasGraph == null) {
+ try {
+ getGraph();
+ } catch (IOException ignore) { }
+ }
+ return hasGraph != null && hasGraph; // getGraph() may have failed; treat that as "no graph"
+ }
+
+ @Override
+ public ByteBuffer getBinaryReferences() throws IOException {
+ return readBlob(getName() + ".brf");
+ }
+
+ @Override
+ public long length() {
+ return length;
+ }
+
+ @Override
+ public String getName() {
+ return AzureUtilities.getName(archiveDirectory);
+ }
+
+ @Override
+ public void close() throws IOException {
+ // do nothing
+ }
+
+ @Override
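+ /**
+ * Starts a server-side copy of the blob and polls its state every 100ms until
+ * it leaves the PENDING status; any final status other than SUCCESS is
+ * reported as an I/O failure.
+ */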
+ public int getEntrySize(int size) {
+ return size;
+ }
+
+ private File pathAsFile() {
+ return new File(archiveDirectory.getUri().getPath());
+ }
+
+ private CloudBlockBlob getBlob(String name) throws IOException {
+ try {
+ return archiveDirectory.getBlockBlobReference(name);
+ } catch (URISyntaxException | StorageException e) {
+ throw new IOException(e);
+ }
+ }
+
+ private ByteBuffer readBlob(String name) throws IOException {
+ try {
+ CloudBlockBlob blob = getBlob(name);
+ if (!blob.exists()) {
+ return null;
+ }
+ long length = blob.getProperties().getLength();
+ ByteBuffer buffer = ByteBuffer.allocate((int) length);
+ AzureUtilities.readBufferFully(blob, buffer);
+ return buffer;
+ } catch (StorageException e) {
+ throw new IOException(e);
+ }
+ }
+
+}
diff --git a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveWriter.java b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveWriter.java
new file mode 100644
index 0000000000..98bdda91e1
--- /dev/null
+++ b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveWriter.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.azure;
+
+import com.google.common.base.Stopwatch;
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudBlobDirectory;
+import com.microsoft.azure.storage.blob.CloudBlockBlob;
+import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitor;
+import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitor;
+import org.apache.jackrabbit.oak.segment.azure.queue.SegmentWriteAction;
+import org.apache.jackrabbit.oak.segment.azure.queue.SegmentWriteQueue;
+import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveEntry;
+import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveWriter;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.jackrabbit.oak.segment.azure.AzureUtilities.getSegmentFileName;
+import static org.apache.jackrabbit.oak.segment.azure.AzureUtilities.readBufferFully;
+
+public class AzureSegmentArchiveWriter implements SegmentArchiveWriter {
+
+ private final CloudBlobDirectory archiveDirectory;
+
+ private final IOMonitor ioMonitor;
+
+ private final FileStoreMonitor monitor;
+
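+ // If SegmentWriteQueue.THREADS is positive, segments are uploaded
+ // asynchronously by a background write queue; otherwise every segment is
+ // uploaded synchronously within writeSegment().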
+ private final Optional<SegmentWriteQueue> queue;
+
+ private Map<UUID, AzureSegmentArchiveEntry> index = Collections.synchronizedMap(new LinkedHashMap<>());
+
+ private int entries;
+
+ private long totalLength;
+
+ private volatile boolean created = false;
+
+ public AzureSegmentArchiveWriter(CloudBlobDirectory archiveDirectory, IOMonitor ioMonitor, FileStoreMonitor monitor) {
+ this.archiveDirectory = archiveDirectory;
+ this.ioMonitor = ioMonitor;
+ this.monitor = monitor;
+ this.queue = SegmentWriteQueue.THREADS > 0 ? Optional.of(new SegmentWriteQueue(this::doWriteEntry)) : Optional.empty();
+ }
+
+ @Override
+ public void writeSegment(long msb, long lsb, byte[] data, int offset, int size, int generation, int fullGeneration, boolean compacted) throws IOException {
+ created = true;
+
+ AzureSegmentArchiveEntry entry = new AzureSegmentArchiveEntry(msb, lsb, entries++, size, generation, fullGeneration, compacted);
+ if (queue.isPresent()) {
+ queue.get().addToQueue(entry, data, offset, size);
+ } else {
+ doWriteEntry(entry, data, offset, size);
+ }
+ index.put(new UUID(msb, lsb), entry);
+
+ totalLength += size;
+ monitor.written(size);
+ }
+
+ private void doWriteEntry(AzureSegmentArchiveEntry indexEntry, byte[] data, int offset, int size) throws IOException {
+ long msb = indexEntry.getMsb();
+ long lsb = indexEntry.getLsb();
+ ioMonitor.beforeSegmentWrite(pathAsFile(), msb, lsb, size);
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ try {
+ CloudBlockBlob blob = getBlob(getSegmentFileName(indexEntry));
+ blob.setMetadata(AzureBlobMetadata.toSegmentMetadata(indexEntry));
+ blob.uploadFromByteArray(data, offset, size);
+ blob.uploadMetadata();
+ } catch (StorageException e) {
+ throw new IOException(e);
+ }
+ ioMonitor.afterSegmentWrite(pathAsFile(), msb, lsb, size, stopwatch.elapsed(TimeUnit.NANOSECONDS));
+ }
+
+ @Override
+ public ByteBuffer readSegment(long msb, long lsb) throws IOException {
+ UUID uuid = new UUID(msb, lsb);
+ Optional<SegmentWriteAction> segment = queue.map(q -> q.read(uuid));
+ if (segment.isPresent()) {
+ return segment.get().toByteBuffer();
+ }
+ AzureSegmentArchiveEntry indexEntry = index.get(uuid);
+ if (indexEntry == null) {
+ return null;
+ }
+ ByteBuffer buffer = ByteBuffer.allocate(indexEntry.getLength());
+ readBufferFully(getBlob(getSegmentFileName(indexEntry)), buffer);
+ return buffer;
+ }
+
+ @Override
+ public boolean containsSegment(long msb, long lsb) {
+ UUID uuid = new UUID(msb, lsb);
+ Optional<SegmentWriteAction> segment = queue.map(q -> q.read(uuid));
+ if (segment.isPresent()) {
+ return true;
+ }
+ return index.containsKey(uuid);
+ }
+
+ @Override
+ public void writeGraph(byte[] data) throws IOException {
+ writeDataFile(data, ".gph");
+ }
+
+ @Override
+ public void writeBinaryReferences(byte[] data) throws IOException {
+ writeDataFile(data, ".brf");
+ }
+
+ private void writeDataFile(byte[] data, String extension) throws IOException {
+ try {
+ getBlob(getName() + extension).uploadFromByteArray(data, 0, data.length);
+ } catch (StorageException e) {
+ throw new IOException(e);
+ }
+ totalLength += data.length;
+ monitor.written(data.length);
+ }
+
+ @Override
+ public long getLength() {
+ return totalLength;
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (queue.isPresent()) { // required to handle IOException
+ SegmentWriteQueue q = queue.get();
+ q.flush();
+ q.close();
+ }
+ try {
+ getBlob("closed").uploadFromByteArray(new byte[0], 0, 0);
+ } catch (StorageException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public boolean isCreated() {
+ return created || !queueIsEmpty();
+ }
+
+ @Override
+ public void flush() throws IOException {
+ if (queue.isPresent()) { // required to handle IOException
+ queue.get().flush();
+ }
+ }
+
+ private boolean queueIsEmpty() {
+ return queue.map(SegmentWriteQueue::isEmpty).orElse(true);
+ }
+
+ @Override
+ public String getName() {
+ return AzureUtilities.getName(archiveDirectory);
+ }
+
+ private File pathAsFile() {
+ return new File(archiveDirectory.getUri().getPath());
+ }
+
+ private CloudBlockBlob getBlob(String name) throws IOException {
+ try {
+ return archiveDirectory.getBlockBlobReference(name);
+ } catch (URISyntaxException | StorageException e) {
+ throw new IOException(e);
+ }
+ }
+}
diff --git a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentStoreService.java b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentStoreService.java
new file mode 100644
index 0000000000..f0ead9e093
--- /dev/null
+++ b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentStoreService.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.segment.azure;
+
+import com.microsoft.azure.storage.CloudStorageAccount;
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudBlobContainer;
+import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentNodeStorePersistence;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.component.ComponentContext;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.ConfigurationPolicy;
+import org.osgi.service.component.annotations.Deactivate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.security.InvalidKeyException;
+import java.util.Properties;
+
+
+@Component(
+ configurationPolicy = ConfigurationPolicy.REQUIRE,
+ configurationPid = {Configuration.PID})
+public class AzureSegmentStoreService {
+
+ private static final Logger log = LoggerFactory.getLogger(AzureSegmentStoreService.class);
+
+ public static final String DEFAULT_CONTAINER_NAME = "oak";
+
+ public static final String DEFAULT_ROOT_PATH = "/oak";
+
+ private ServiceRegistration registration;
+
+ private SegmentNodeStorePersistence persistence;
+
+ @Activate
+ public void activate(ComponentContext context, Configuration config) throws IOException {
+ persistence = createAzurePersistence(config);
+ registration = context.getBundleContext().registerService(SegmentNodeStorePersistence.class.getName(), persistence, new Properties());
+ }
+
+ @Deactivate
+ public void deactivate() throws IOException {
+ if (registration != null) {
+ registration.unregister();
+ registration = null;
+ }
+ persistence = null;
+ }
+
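+ // Unless an explicit connectionURL is configured (which then takes
+ // precedence), the connection string is assembled as:
+ // DefaultEndpointsProtocol=https;AccountName=<name>;AccountKey=<key>;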
+ private static SegmentNodeStorePersistence createAzurePersistence(Configuration configuration) throws IOException {
+ try {
+ StringBuilder connectionString = new StringBuilder();
+ if (configuration.connectionURL() != null) {
+ connectionString.append(configuration.connectionURL());
+ } else {
+ connectionString.append("DefaultEndpointsProtocol=https;");
+ connectionString.append("AccountName=").append(configuration.accountName()).append(';');
+ connectionString.append("AccountKey=").append(configuration.accessKey()).append(';');
+ }
+ log.info("Connection string: {}", connectionString.toString());
+ CloudStorageAccount cloud = CloudStorageAccount.parse(connectionString.toString());
+ CloudBlobContainer container = cloud.createCloudBlobClient().getContainerReference(configuration.containerName());
+ container.createIfNotExists();
+
+ String path = configuration.rootPath();
+ if (path != null && path.length() > 0 && path.charAt(0) == '/') {
+ path = path.substring(1);
+ }
+
+ return new AzurePersistence(container.getDirectoryReference(path));
+ } catch (StorageException | URISyntaxException | InvalidKeyException e) {
+ throw new IOException(e);
+ }
+ }
+
+}
+
diff --git a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureUtilities.java b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureUtilities.java
new file mode 100644
index 0000000000..1e5d7f214c
--- /dev/null
+++ b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureUtilities.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.azure;
+
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudBlob;
+import com.microsoft.azure.storage.blob.CloudBlobDirectory;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.nio.file.Paths;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+public final class AzureUtilities {
+
+ public static final String SEGMENT_FILE_NAME_PATTERN = "^([0-9a-f]{4})\\.([0-9a-f-]+)$";
+
+ private AzureUtilities() {
+ }
+
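+ /**
+ * Returns the blob name of a segment: the position within the archive as four
+ * hexadecimal digits, followed by a dot and the segment UUID, e.g.
+ * {@code 002a.<uuid>}.
+ */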
+ public static String getSegmentFileName(AzureSegmentArchiveEntry indexEntry) {
+ return getSegmentFileName(indexEntry.getPosition(), indexEntry.getMsb(), indexEntry.getLsb());
+ }
+
+ public static String getSegmentFileName(long offset, long msb, long lsb) {
+ return String.format("%04x.%s", offset, new UUID(msb, lsb).toString());
+ }
+
+ public static String getName(CloudBlob blob) {
+ return Paths.get(blob.getName()).getFileName().toString();
+ }
+
+ public static String getName(CloudBlobDirectory directory) {
+ return Paths.get(directory.getUri().getPath()).getFileName().toString();
+ }
+
+ public static Stream<CloudBlob> getBlobs(CloudBlobDirectory directory) throws IOException {
+ try {
+ return StreamSupport.stream(directory.listBlobs().spliterator(), false)
+ .filter(i -> i instanceof CloudBlob)
+ .map(i -> (CloudBlob) i);
+ } catch (StorageException | URISyntaxException e) {
+ throw new IOException(e);
+ }
+ }
+
+ public static long getDirectorySize(CloudBlobDirectory directory) throws IOException {
+ long size = 0;
+ for (CloudBlob b : getBlobs(directory).collect(Collectors.toList())) {
+ try {
+ b.downloadAttributes();
+ } catch (StorageException e) {
+ throw new IOException(e);
+ }
+ size += b.getProperties().getLength();
+ }
+ return size;
+ }
+
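+ /**
+ * Downloads the whole blob into the backing array of the given buffer. The
+ * buffer must be array-backed; an {@link IOException} is thrown if the
+ * number of bytes read doesn't match the buffer limit.
+ */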
+ public static void readBufferFully(CloudBlob blob, ByteBuffer buffer) throws IOException {
+ try {
+ buffer.rewind();
+ long readBytes = blob.downloadToByteArray(buffer.array(), 0);
+ if (buffer.limit() != readBytes) {
+ throw new IOException("Buffer size: " + buffer.limit() + ", read bytes: " + readBytes);
+ }
+ } catch (StorageException e) {
+ throw new IOException(e);
+ }
+ }
+}
diff --git a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/Configuration.java b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/Configuration.java
new file mode 100644
index 0000000000..34edd746b4
--- /dev/null
+++ b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/Configuration.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.segment.azure;
+
+import org.osgi.service.metatype.annotations.AttributeDefinition;
+import org.osgi.service.metatype.annotations.ObjectClassDefinition;
+
+import static org.apache.jackrabbit.oak.segment.azure.Configuration.PID;
+
+@ObjectClassDefinition(
+ pid = {PID},
+ name = "Apache Jackrabbit Oak Azure Segment Store Service",
+ description = "Azure backend for the Oak Segment Node Store")
+@interface Configuration {
+
+ String PID = "org.apache.jackrabbit.oak.segment.azure.AzureSegmentStoreService";
+
+ @AttributeDefinition(
+ name = "Azure account name",
+ description = "Name of the Azure Storage account to use.")
+ String accountName();
+
+ @AttributeDefinition(
+ name = "Azure container name",
+ description = "Name of the container to use. If it doesn't exists, it'll be created.")
+ String containerName() default AzureSegmentStoreService.DEFAULT_CONTAINER_NAME;
+
+ @AttributeDefinition(
+ name = "Azure account access key",
+ description = "Access key which should be used to authenticate on the account")
+ String accessKey();
+
+ @AttributeDefinition(
+ name = "Root path",
+ description = "Names of all the created blobs will be prefixed with this path")
+ String rootPath() default AzureSegmentStoreService.DEFAULT_ROOT_PATH;
+
+ @AttributeDefinition(
+ name = "Azure connection URL (optional)",
+ description = "Connection URL to be used to connect to the Azure Storage. " +
+ "Setting it will override the accountName, containerName and accessKey properties.")
+ String connectionURL();
+}
\ No newline at end of file
diff --git a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/ReverseFileReader.java b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/ReverseFileReader.java
new file mode 100644
index 0000000000..439fab5f9c
--- /dev/null
+++ b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/ReverseFileReader.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.azure;
+
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudBlob;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static java.lang.Math.min;
+
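+/**
+ * Reads a blob line by line, starting from the end. Blocks of
+ * {@code bufferSize} bytes are downloaded backwards on demand and scanned
+ * right-to-left for line breaks, so reading the most recent entries of a
+ * large journal only touches the tail of the blob.
+ */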
+public class ReverseFileReader {
+
+ private static final int BUFFER_SIZE = 16 * 1024;
+
+ private int bufferSize;
+
+ private final CloudBlob blob;
+
+ private byte[] buffer;
+
+ private int bufferOffset;
+
+ private int fileOffset;
+
+ public ReverseFileReader(CloudBlob blob) throws StorageException {
+ this(blob, BUFFER_SIZE);
+ }
+
+ public ReverseFileReader(CloudBlob blob, int bufferSize) throws StorageException {
+ this.blob = blob;
+ if (blob.exists()) {
+ this.fileOffset = (int) blob.getProperties().getLength();
+ } else {
+ this.fileOffset = 0;
+ }
+ this.bufferSize = bufferSize;
+ }
+
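+ /**
+ * Downloads the next block, moving backwards through the blob. Close to
+ * the beginning of the file the block shrinks to the remaining bytes, so
+ * no byte is fetched twice.
+ */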
+ private void readBlock() throws IOException {
+ if (buffer == null) {
+ buffer = new byte[min(fileOffset, bufferSize)];
+ } else if (fileOffset < buffer.length) {
+ buffer = new byte[fileOffset];
+ }
+
+ if (buffer.length > 0) {
+ fileOffset -= buffer.length;
+ try {
+ blob.downloadRangeToByteArray(fileOffset, Long.valueOf(buffer.length), buffer, 0);
+ } catch (StorageException e) {
+ throw new IOException(e);
+ }
+ }
+ bufferOffset = buffer.length;
+ }
+
+ private String readUntilNewLine() {
+ if (bufferOffset == -1) {
+ return "";
+ }
+ int stop = bufferOffset;
+ while (--bufferOffset >= 0) {
+ if (buffer[bufferOffset] == '\n') {
+ break;
+ }
+ }
+ // bufferOffset points either the previous '\n' character or -1
+ return new String(buffer, bufferOffset + 1, stop - bufferOffset - 1, Charset.defaultCharset());
+ }
+
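+ /**
+ * Returns the next line, moving from the end of the blob towards its
+ * beginning, or {@code null} once everything has been read. A line that
+ * spans multiple blocks is assembled from its reversed fragments.
+ */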
+ public String readLine() throws IOException {
+ if (bufferOffset == -1 && fileOffset == 0) {
+ return null;
+ }
+
+ if (buffer == null) {
+ readBlock();
+ }
+
+ List<String> result = new ArrayList<>(1);
+ while (true) {
+ result.add(readUntilNewLine());
+ if (bufferOffset > -1) { // stopped on the '\n'
+ break;
+ }
+ if (fileOffset == 0) { // reached the beginning of the file
+ break;
+ }
+ readBlock();
+ }
+ Collections.reverse(result);
+ return String.join("", result);
+ }
+}
diff --git a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/queue/SegmentWriteAction.java b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/queue/SegmentWriteAction.java
new file mode 100644
index 0000000000..b06fdac880
--- /dev/null
+++ b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/queue/SegmentWriteAction.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.azure.queue;
+
+import org.apache.jackrabbit.oak.segment.azure.AzureSegmentArchiveEntry;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.UUID;
+
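+/**
+ * A single segment-write request, as queued in the {@link SegmentWriteQueue}.
+ * The relevant range of the source buffer is copied on construction, so the
+ * caller may reuse its array as soon as the constructor returns.
+ */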
+public class SegmentWriteAction {
+
+ private final AzureSegmentArchiveEntry indexEntry;
+
+ private final byte[] buffer;
+
+ private final int offset;
+
+ private final int length;
+
+ public SegmentWriteAction(AzureSegmentArchiveEntry indexEntry, byte[] buffer, int offset, int length) {
+ this.indexEntry = indexEntry;
+
+ this.buffer = new byte[length];
+ System.arraycopy(buffer, offset, this.buffer, 0, length);
+ this.offset = 0;
+ this.length = length;
+ }
+
+ public UUID getUuid() {
+ return new UUID(indexEntry.getMsb(), indexEntry.getLsb());
+ }
+
+ public ByteBuffer toByteBuffer() {
+ return ByteBuffer.wrap(buffer, offset, length);
+ }
+
+ void passTo(SegmentWriteQueue.SegmentConsumer consumer) throws IOException {
+ consumer.consume(indexEntry, buffer, offset, length);
+ }
+
+ @Override
+ public String toString() {
+ return getUuid().toString();
+ }
+}
diff --git a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/queue/SegmentWriteQueue.java b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/queue/SegmentWriteQueue.java
new file mode 100644
index 0000000000..86a7dbdf74
--- /dev/null
+++ b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/queue/SegmentWriteQueue.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.azure.queue;
+
+import org.apache.jackrabbit.oak.segment.azure.AzureSegmentArchiveEntry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
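+/**
+ * An asynchronous write queue for segments. Writes are performed by a pool of
+ * background threads, and queued segments can still be read while they await
+ * persistence. When a write fails the queue enters a "broken" state: adding
+ * new segments blocks, and a dedicated emergency thread keeps retrying the
+ * failed segment until the storage becomes available again.
+ */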
+public class SegmentWriteQueue implements Closeable {
+
+ public static final int THREADS = Integer.getInteger("oak.segment.azure.threads", 5);
+
+ private static final int QUEUE_SIZE = Integer.getInteger("oak.segment.azure.queue", 20);
+
+ private static final Logger log = LoggerFactory.getLogger(SegmentWriteQueue.class);
+
+ private final BlockingDeque<SegmentWriteAction> queue;
+
+ private final Map<UUID, SegmentWriteAction> segmentsByUUID;
+
+ private final ExecutorService executor;
+
+ private final ReadWriteLock flushLock;
+
+ private final SegmentConsumer writer;
+
+ private volatile boolean shutdown;
+
+ private final Object brokenMonitor = new Object();
+
+ private volatile boolean broken;
+
+ public SegmentWriteQueue(SegmentConsumer writer) {
+ this(writer, QUEUE_SIZE, THREADS);
+ }
+
+ SegmentWriteQueue(SegmentConsumer writer, int queueSize, int threadNo) {
+ this.writer = writer;
+ segmentsByUUID = new ConcurrentHashMap<>();
+ flushLock = new ReentrantReadWriteLock();
+
+ queue = new LinkedBlockingDeque<>(queueSize);
+ executor = Executors.newFixedThreadPool(threadNo + 1);
+ for (int i = 0; i < threadNo; i++) {
+ executor.submit(this::mainLoop);
+ }
+ executor.submit(this::emergencyLoop);
+ }
+
+ private void mainLoop() {
+ while (!shutdown) {
+ try {
+ waitWhileBroken();
+ if (shutdown) {
+ break;
+ }
+ consume();
+ } catch (SegmentConsumeException e) {
+ SegmentWriteAction segment = e.segment;
+ log.error("Can't persist the segment {}", segment.getUuid(), e.getCause());
+ try {
+ queue.put(segment);
+ } catch (InterruptedException e1) {
+ log.error("Can't re-add the segment {} to the queue. It'll be dropped.", segment.getUuid(), e1);
+ }
+ }
+ }
+ }
+
+ private void consume() throws SegmentConsumeException {
+ SegmentWriteAction segment = null;
+ try {
+ segment = queue.poll(100, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ log.error("Poll from queue interrupted", e);
+ }
+ if (segment != null) {
+ consume(segment);
+ }
+ }
+
+ private void consume(SegmentWriteAction segment) throws SegmentConsumeException {
+ try {
+ segment.passTo(writer);
+ } catch (IOException e) {
+ setBroken(true);
+ throw new SegmentConsumeException(segment, e);
+ }
+ synchronized (segmentsByUUID) {
+ segmentsByUUID.remove(segment.getUuid());
+ segmentsByUUID.notifyAll();
+ }
+ setBroken(false);
+ }
+
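+ /**
+ * Waits until the queue becomes broken and then retries the failed write
+ * once a second. After a successful write the queue is marked as healthy
+ * and the loop goes back to waiting.
+ */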
+ private void emergencyLoop() {
+ while (!shutdown) {
+ waitUntilBroken();
+ if (shutdown) {
+ break;
+ }
+
+ boolean success = false;
+ SegmentWriteAction segmentToRetry = null;
+ do {
+ try {
+ if (segmentToRetry == null) {
+ consume();
+ } else {
+ consume(segmentToRetry);
+ }
+ success = true;
+ } catch (SegmentConsumeException e) {
+ segmentToRetry = e.segment;
+ log.error("Can't persist the segment {}", segmentToRetry.getUuid(), e.getCause());
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e1) {
+ log.warn("Interrupted", e);
+ }
+ if (shutdown) {
+ log.error("Shutdown initiated. The segment {} will be dropped.", segmentToRetry.getUuid());
+ }
+ }
+ } while (!success && !shutdown);
+ }
+ }
+
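+ /**
+ * Enqueues a segment for asynchronous persistence and registers it in the
+ * in-memory map, so it can be read back before the write completes. The
+ * flush lock is taken as a reader: many threads may add concurrently, but
+ * not while a flush is in progress. If the queue is broken, the call
+ * blocks until the emergency thread has recovered it.
+ */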
+ public void addToQueue(AzureSegmentArchiveEntry indexEntry, byte[] data, int offset, int size) throws IOException {
+ waitWhileBroken();
+ if (shutdown) {
+ throw new IllegalStateException("Can't accept the new segment - shutdown in progress");
+ }
+
+ SegmentWriteAction action = new SegmentWriteAction(indexEntry, data, offset, size);
+ flushLock.readLock().lock();
+ try {
+ segmentsByUUID.put(action.getUuid(), action);
+ if (!queue.offer(action, 1, TimeUnit.MINUTES)) {
+ segmentsByUUID.remove(action.getUuid());
+ throw new IOException("Can't add segment to the queue");
+ }
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ } finally {
+ flushLock.readLock().unlock();
+ }
+ }
+
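+ /**
+ * Blocks until all enqueued segments have been persisted. Holding the
+ * write lock prevents new segments from being added for the duration of
+ * the flush.
+ */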
+ public void flush() throws IOException {
+ flushLock.writeLock().lock();
+ try {
+ synchronized (segmentsByUUID) {
+ long start = System.currentTimeMillis();
+ while (!segmentsByUUID.isEmpty()) {
+ segmentsByUUID.wait(100);
+ if (System.currentTimeMillis() - start > TimeUnit.MINUTES.toMillis(1)) {
+ log.error("Can't flush the queue in 1 minute. Queue: {}. Segment map: {}", queue, segmentsByUUID);
+ start = System.currentTimeMillis();
+ }
+ }
+ }
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ } finally {
+ flushLock.writeLock().unlock();
+ }
+ }
+
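+ /**
+ * Returns the not-yet-persisted segment with the given UUID, or
+ * {@code null} if it has already been written (or was never enqueued).
+ */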
+ public SegmentWriteAction read(UUID id) {
+ return segmentsByUUID.get(id);
+ }
+
+ public void close() throws IOException {
+ shutdown = true;
+ try {
+ executor.shutdown();
+ if (!executor.awaitTermination(1, TimeUnit.MINUTES)) {
+ throw new IOException("The write wasn't able to shut down clearly");
+ }
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ }
+
+ public boolean isEmpty() {
+ return segmentsByUUID.isEmpty();
+ }
+
+ boolean isBroken() {
+ return broken;
+ }
+
+ int getSize() {
+ return queue.size();
+ }
+
+ private void setBroken(boolean broken) {
+ synchronized (brokenMonitor) {
+ this.broken = broken;
+ brokenMonitor.notifyAll();
+ }
+ }
+
+ private void waitWhileBroken() {
+ if (!broken) {
+ return;
+ }
+ synchronized (brokenMonitor) {
+ while (broken && !shutdown) {
+ try {
+ brokenMonitor.wait(100);
+ } catch (InterruptedException e) {
+ log.warn("Interrupted", e);
+ }
+ }
+ }
+ }
+
+ private void waitUntilBroken() {
+ if (broken) {
+ return;
+ }
+ synchronized (brokenMonitor) {
+ while (!broken && !shutdown) {
+ try {
+ brokenMonitor.wait(100);
+ } catch (InterruptedException e) {
+ log.warn("Interrupted", e);
+ }
+ }
+ }
+ }
+
+ public interface SegmentConsumer {
+
+ void consume(AzureSegmentArchiveEntry indexEntry, byte[] data, int offset, int size) throws IOException;
+
+ }
+
+ public static class SegmentConsumeException extends Exception {
+
+ private final SegmentWriteAction segment;
+
+ public SegmentConsumeException(SegmentWriteAction segment, IOException cause) {
+ super(cause);
+ this.segment = segment;
+ }
+ }
+}
diff --git a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManagerTest.java b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManagerTest.java
new file mode 100644
index 0000000000..579e6acf74
--- /dev/null
+++ b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManagerTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.azure;
+
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudBlobContainer;
+import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveManager;
+import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitorAdapter;
+import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitorAdapter;
+import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveWriter;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.security.InvalidKeyException;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.UUID;
+
+import static com.google.common.collect.Lists.newArrayList;
+import static org.junit.Assert.assertEquals;
+
+public class AzureArchiveManagerTest {
+
+ @ClassRule
+ public static AzuriteDockerRule azurite = new AzuriteDockerRule();
+
+ private CloudBlobContainer container;
+
+ @Before
+ public void setup() throws StorageException, InvalidKeyException, URISyntaxException {
+ container = azurite.getContainer("oak-test");
+ }
+
+ @Test
+ public void testRecovery() throws StorageException, URISyntaxException, IOException {
+ SegmentArchiveManager manager = new AzurePersistence(container.getDirectoryReference("oak")).createArchiveManager(false, new IOMonitorAdapter(), new FileStoreMonitorAdapter());
+ SegmentArchiveWriter writer = manager.create("data00000a.tar");
+
+ List<UUID> uuids = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ UUID u = UUID.randomUUID();
+ writer.writeSegment(u.getMostSignificantBits(), u.getLeastSignificantBits(), new byte[10], 0, 10, 0, 0, false);
+ uuids.add(u);
+ }
+
+ writer.flush();
+ writer.close();
+
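+ // Delete one segment in the middle of the archive; recovery should
+ // return only the entries that precede the gap.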
+ container.getBlockBlobReference("oak/data00000a.tar/0005." + uuids.get(5).toString()).delete();
+
+ LinkedHashMap<UUID, byte[]> recovered = new LinkedHashMap<>();
+ manager.recoverEntries("data00000a.tar", recovered);
+ assertEquals(uuids.subList(0, 5), newArrayList(recovered.keySet()));
+ }
+
+}
diff --git a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureGCJournalTest.java b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureGCJournalTest.java
new file mode 100644
index 0000000000..6db6a1da8a
--- /dev/null
+++ b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureGCJournalTest.java
@@ -0,0 +1,45 @@
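+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */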
+package org.apache.jackrabbit.oak.segment.azure;
+
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudBlobContainer;
+import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentNodeStorePersistence;
+import org.apache.jackrabbit.oak.segment.file.GcJournalTest;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.net.URISyntaxException;
+import java.security.InvalidKeyException;
+
+public class AzureGCJournalTest extends GcJournalTest {
+
+ @ClassRule
+ public static AzuriteDockerRule azurite = new AzuriteDockerRule();
+
+ private CloudBlobContainer container;
+
+ @Before
+ public void setup() throws StorageException, InvalidKeyException, URISyntaxException {
+ container = azurite.getContainer("oak-test");
+ }
+
+ @Override
+ protected SegmentNodeStorePersistence getPersistence() throws Exception {
+ return new AzurePersistence(container.getDirectoryReference("oak"));
+ }
+
+ @Test
+ @Ignore
+ @Override
+ public void testReadOak16GCLog() throws Exception {
+ super.testReadOak16GCLog();
+ }
+
+ @Test
+ @Ignore
+ @Override
+ public void testUpdateOak16GCLog() throws Exception {
+ super.testUpdateOak16GCLog();
+ }
+}
diff --git a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureJournalFileTest.java b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureJournalFileTest.java
new file mode 100644
index 0000000000..828cf48b6b
--- /dev/null
+++ b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureJournalFileTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.azure;
+
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudBlobContainer;
+import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFileReader;
+import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFileWriter;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.security.InvalidKeyException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class AzureJournalFileTest {
+
+ @ClassRule
+ public static AzuriteDockerRule azurite = new AzuriteDockerRule();
+
+ private CloudBlobContainer container;
+
+ private AzureJournalFile journal;
+
+ @Before
+ public void setup() throws StorageException, InvalidKeyException, URISyntaxException {
+ container = azurite.getContainer("oak-test");
+ journal = new AzureJournalFile(container.getDirectoryReference("journal"), "journal.log", 10);
+ }
+
+ @Test
+ public void testSplitJournalFiles() throws IOException {
+ assertFalse(journal.exists());
+
+ JournalFileWriter writer = journal.openJournalWriter();
+ for (int i = 0; i < 100; i++) {
+ writer.writeLine("line " + i);
+ }
+
+ assertTrue(journal.exists());
+
+ writer = journal.openJournalWriter();
+ for (int i = 100; i < 200; i++) {
+ writer.writeLine("line " + i);
+ }
+
+ JournalFileReader reader = journal.openJournalReader();
+ for (int i = 199; i >= 0; i--) {
+ assertEquals("line " + i, reader.readLine());
+ }
+ }
+
+}
diff --git a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureManifestFileTest.java b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureManifestFileTest.java
new file mode 100644
index 0000000000..89aad51a2e
--- /dev/null
+++ b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureManifestFileTest.java
@@ -0,0 +1,44 @@
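+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */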
+package org.apache.jackrabbit.oak.segment.azure;
+
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudBlobContainer;
+import org.apache.jackrabbit.oak.segment.spi.persistence.ManifestFile;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.security.InvalidKeyException;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+public class AzureManifestFileTest {
+
+ @ClassRule
+ public static AzuriteDockerRule azurite = new AzuriteDockerRule();
+
+ private CloudBlobContainer container;
+
+ @Before
+ public void setup() throws StorageException, InvalidKeyException, URISyntaxException {
+ container = azurite.getContainer("oak-test");
+ }
+
+ @Test
+ public void testManifest() throws URISyntaxException, IOException {
+ ManifestFile manifestFile = new AzurePersistence(container.getDirectoryReference("oak")).getManifestFile();
+ assertFalse(manifestFile.exists());
+
+ Properties props = new Properties();
+ props.setProperty("xyz", "abc");
+ props.setProperty("version", "123");
+ manifestFile.save(props);
+
+ Properties loaded = manifestFile.load();
+ assertEquals(props, loaded);
+ }
+
+}
diff --git a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureTarFileTest.java b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureTarFileTest.java
new file mode 100644
index 0000000000..bba4713c9f
--- /dev/null
+++ b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureTarFileTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.azure;
+
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudBlobContainer;
+import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitorAdapter;
+import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitorAdapter;
+import org.apache.jackrabbit.oak.segment.file.tar.TarFileTest;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.security.InvalidKeyException;
+
+public class AzureTarFileTest extends TarFileTest {
+
+ @ClassRule
+ public static AzuriteDockerRule azurite = new AzuriteDockerRule();
+
+ private CloudBlobContainer container;
+
+ @Before
+ @Override
+ public void setUp() throws IOException {
+ try {
+ container = azurite.getContainer("oak-test");
+ archiveManager = new AzurePersistence(container.getDirectoryReference("oak")).createArchiveManager(true, new IOMonitorAdapter(), new FileStoreMonitorAdapter());
+ } catch (StorageException | InvalidKeyException | URISyntaxException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ protected long getWriteAndReadExpectedSize() {
+ return 45;
+ }
+
+ @Test
+ @Ignore
+ @Override
+ public void graphShouldBeTrimmedDownOnSweep() throws Exception {
+ super.graphShouldBeTrimmedDownOnSweep();
+ }
+}
\ No newline at end of file
diff --git a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureTarFilesTest.java b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureTarFilesTest.java
new file mode 100644
index 0000000000..eb9462906b
--- /dev/null
+++ b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureTarFilesTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.azure;
+
+import com.microsoft.azure.storage.blob.CloudBlobContainer;
+import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitorAdapter;
+import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitorAdapter;
+import org.apache.jackrabbit.oak.segment.file.tar.TarFiles;
+import org.apache.jackrabbit.oak.segment.file.tar.TarFilesTest;
+import org.junit.Before;
+import org.junit.ClassRule;
+
+public class AzureTarFilesTest extends TarFilesTest {
+
+ @ClassRule
+ public static AzuriteDockerRule azurite = new AzuriteDockerRule();
+
+ private CloudBlobContainer container;
+
+ @Before
+ @Override
+ public void setUp() throws Exception {
+ container = azurite.getContainer("oak-test");
+ tarFiles = TarFiles.builder()
+ .withDirectory(folder.newFolder())
+ .withTarRecovery((id, data, recovery) -> {
+ // Intentionally left blank
+ })
+ .withIOMonitor(new IOMonitorAdapter())
+ .withFileStoreMonitor(new FileStoreMonitorAdapter())
+ .withMaxFileSize(MAX_FILE_SIZE)
+ .withPersistence(new AzurePersistence(container.getDirectoryReference("oak")))
+ .build();
+ }
+}
diff --git a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureTarWriterTest.java b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureTarWriterTest.java
new file mode 100644
index 0000000000..ab4ee852c2
--- /dev/null
+++ b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureTarWriterTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.azure;
+
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudBlobContainer;
+import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitorAdapter;
+import org.apache.jackrabbit.oak.segment.file.tar.TarWriterTest;
+import org.junit.Before;
+import org.junit.ClassRule;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.security.InvalidKeyException;
+
+public class AzureTarWriterTest extends TarWriterTest {
+
+ @ClassRule
+ public static AzuriteDockerRule azurite = new AzuriteDockerRule();
+
+ private CloudBlobContainer container;
+
+ @Before
+ @Override
+ public void setUp() throws IOException {
+ try {
+ monitor = new TestFileStoreMonitor();
+ container = azurite.getContainer("oak-test");
+ archiveManager = new AzurePersistence(container.getDirectoryReference("oak")).createArchiveManager(true, new IOMonitorAdapter(), monitor);
+ } catch (StorageException | InvalidKeyException | URISyntaxException e) {
+ throw new IOException(e);
+ }
+ }
+}
diff --git a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzuriteDockerRule.java b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzuriteDockerRule.java
new file mode 100644
index 0000000000..7e957d5fb3
--- /dev/null
+++ b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzuriteDockerRule.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.azure;
+
+import com.arakelian.docker.junit.DockerRule;
+import com.arakelian.docker.junit.model.ImmutableDockerConfig;
+import com.microsoft.azure.storage.CloudStorageAccount;
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudBlobContainer;
+import com.spotify.docker.client.DefaultDockerClient;
+import org.junit.Assume;
+import org.junit.rules.TestRule;
+import org.junit.runner.Description;
+import org.junit.runners.model.Statement;
+
+import java.net.URISyntaxException;
+import java.security.InvalidKeyException;
+
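+/**
+ * A JUnit rule starting the Azurite blob storage emulator in a Docker
+ * container. If no Docker daemon is reachable, the tests using this rule are
+ * skipped via a JUnit assumption.
+ */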
+public class AzuriteDockerRule implements TestRule {
+
+ private final DockerRule wrappedRule;
+
+ public AzuriteDockerRule() {
+ wrappedRule = new DockerRule(ImmutableDockerConfig.builder()
+ .image("trekawek/azurite")
+ .name("oak-test-azurite")
+ .ports("10000")
+ .addStartedListener(container -> {
+ container.waitForPort("10000/tcp");
+ container.waitForLog("Azure Blob Storage Emulator listening on port 10000");
+ })
+ .addContainerConfigurer(builder -> builder.env("executable=blob"))
+ .alwaysRemoveContainer(true)
+ .build());
+ }
+
+ public CloudBlobContainer getContainer(String name) throws URISyntaxException, StorageException, InvalidKeyException {
+ int mappedPort = wrappedRule.getContainer().getPortBinding("10000/tcp").getPort();
+ CloudStorageAccount cloud = CloudStorageAccount.parse("DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://127.0.0.1:" + mappedPort + "/devstoreaccount1;");
+ CloudBlobContainer container = cloud.createCloudBlobClient().getContainerReference(name);
+ container.deleteIfExists();
+ container.create();
+ return container;
+ }
+
+ @Override
+ public Statement apply(Statement statement, Description description) {
+ try {
+ DefaultDockerClient client = DefaultDockerClient.fromEnv().connectTimeoutMillis(5000L).readTimeoutMillis(20000L).build();
+ client.ping();
+ client.close();
+ } catch (Exception e) {
+ Assume.assumeNoException(e);
+ }
+
+ return wrappedRule.apply(statement, description);
+ }
+}
diff --git a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/fixture/SegmentAzureFixture.java b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/fixture/SegmentAzureFixture.java
new file mode 100644
index 0000000000..78f8fbcfd6
--- /dev/null
+++ b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/fixture/SegmentAzureFixture.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.azure.fixture;
+
+import com.google.common.io.Files;
+import com.microsoft.azure.storage.CloudStorageAccount;
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudBlobContainer;
+import com.microsoft.azure.storage.blob.CloudBlobDirectory;
+import org.apache.jackrabbit.oak.fixture.NodeStoreFixture;
+import org.apache.jackrabbit.oak.segment.SegmentNodeStoreBuilders;
+import org.apache.jackrabbit.oak.segment.azure.AzurePersistence;
+import org.apache.jackrabbit.oak.segment.file.FileStore;
+import org.apache.jackrabbit.oak.segment.file.FileStoreBuilder;
+import org.apache.jackrabbit.oak.segment.file.InvalidFileStoreVersionException;
+import org.apache.jackrabbit.oak.spi.state.NodeStore;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.security.InvalidKeyException;
+import java.util.HashMap;
+import java.util.Map;
+
+public class SegmentAzureFixture extends NodeStoreFixture {
+
+ private static final String AZURE_CONNECTION_STRING = System.getProperty("oak.segment.azure.connection", "DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;");
+
+ private static final String AZURE_CONTAINER = System.getProperty("oak.segment.azure.container", "oak");
+
+ private static final String AZURE_ROOT_PATH = System.getProperty("oak.segment.azure.rootPath", "/oak");
+
+ private Map<NodeStore, FileStore> fileStoreMap = new HashMap<>();
+
+ private Map<NodeStore, CloudBlobContainer> containerMap = new HashMap<>();
+
+ @Override
+ public NodeStore createNodeStore() {
+ AzurePersistence persistence;
+ CloudBlobContainer container;
+ try {
+ CloudStorageAccount cloud = CloudStorageAccount.parse(AZURE_CONNECTION_STRING);
+
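+ // Pick the first free container name (oak, oak_2, oak_3, ...) so that
+ // every created node store gets a fresh, private container.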
+ int i = 1;
+ while (true) {
+ String containerName;
+ if (i == 1) {
+ containerName = AZURE_CONTAINER;
+ } else {
+ containerName = AZURE_CONTAINER + "_" + i;
+ }
+ container = cloud.createCloudBlobClient().getContainerReference(containerName);
+ if (!container.exists()) {
+ container.create();
+ break;
+ }
+ i++;
+ }
+ CloudBlobDirectory directory = container.getDirectoryReference(AZURE_ROOT_PATH);
+ persistence = new AzurePersistence(directory);
+ } catch (StorageException | URISyntaxException | InvalidKeyException e) {
+ throw new RuntimeException(e);
+ }
+
+ try {
+ FileStore fileStore = FileStoreBuilder.fileStoreBuilder(Files.createTempDir()).withCustomPersistence(persistence).build();
+ NodeStore nodeStore = SegmentNodeStoreBuilders.builder(fileStore).build();
+ fileStoreMap.put(nodeStore, fileStore);
+ containerMap.put(nodeStore, container);
+ return nodeStore;
+ } catch (IOException | InvalidFileStoreVersionException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void dispose(NodeStore nodeStore) {
+ FileStore fs = fileStoreMap.remove(nodeStore);
+ if (fs != null) {
+ fs.close();
+ }
+ try {
+ CloudBlobContainer container = containerMap.remove(nodeStore);
+ if (container != null) {
+ container.deleteIfExists();
+ }
+ } catch (StorageException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "SegmentAzure";
+ }
+}
diff --git a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/journal/AzureJournalReaderTest.java b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/journal/AzureJournalReaderTest.java
new file mode 100644
index 0000000000..fcf766d1a3
--- /dev/null
+++ b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/journal/AzureJournalReaderTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.azure.journal;
+
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudAppendBlob;
+import com.microsoft.azure.storage.blob.CloudBlobContainer;
+import org.apache.jackrabbit.oak.segment.azure.AzuriteDockerRule;
+import org.apache.jackrabbit.oak.segment.file.JournalReader;
+import org.apache.jackrabbit.oak.segment.file.JournalReaderTest;
+import org.apache.jackrabbit.oak.segment.azure.AzureJournalFile;
+import org.junit.Before;
+import org.junit.ClassRule;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.security.InvalidKeyException;
+
+public class AzureJournalReaderTest extends JournalReaderTest {
+
+ @ClassRule
+ public static AzuriteDockerRule azurite = new AzuriteDockerRule();
+
+ private CloudBlobContainer container;
+
+ @Before
+ public void setup() throws StorageException, InvalidKeyException, URISyntaxException {
+ container = azurite.getContainer("oak-test");
+ }
+
+ protected JournalReader createJournalReader(String s) throws IOException {
+ try {
+ CloudAppendBlob blob = container.getAppendBlobReference("journal/journal.log.001");
+ blob.createOrReplace();
+ blob.appendText(s);
+ return new JournalReader(new AzureJournalFile(container.getDirectoryReference("journal"), "journal.log"));
+ } catch (StorageException | URISyntaxException e) {
+ throw new IOException(e);
+ }
+ }
+}
diff --git a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/journal/AzureTarRevisionsTest.java b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/journal/AzureTarRevisionsTest.java
new file mode 100644
index 0000000000..910afe2dbc
--- /dev/null
+++ b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/journal/AzureTarRevisionsTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.azure.journal;
+
+import com.microsoft.azure.storage.blob.CloudBlobContainer;
+import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentNodeStorePersistence;
+import org.apache.jackrabbit.oak.segment.azure.AzuriteDockerRule;
+import org.apache.jackrabbit.oak.segment.file.TarRevisionsTest;
+import org.apache.jackrabbit.oak.segment.azure.AzurePersistence;
+import org.junit.Before;
+import org.junit.ClassRule;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+
+public class AzureTarRevisionsTest extends TarRevisionsTest {
+
+ @ClassRule
+ public static AzuriteDockerRule azurite = new AzuriteDockerRule();
+
+ private CloudBlobContainer container;
+
+ @Before
+ public void setup() throws Exception {
+ container = azurite.getContainer("oak-test");
+ super.setup();
+ }
+
+ @Override
+ protected SegmentNodeStorePersistence getPersistence() throws IOException {
+ try {
+ return new AzurePersistence(container.getDirectoryReference("oak"));
+ } catch (URISyntaxException e) {
+ throw new IOException(e);
+ }
+ }
+}
diff --git a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/journal/ReverseFileReaderTest.java b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/journal/ReverseFileReaderTest.java
new file mode 100644
index 0000000000..f3d1425a60
--- /dev/null
+++ b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/journal/ReverseFileReaderTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.azure.journal;
+
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudAppendBlob;
+import com.microsoft.azure.storage.blob.CloudBlobContainer;
+import org.apache.jackrabbit.oak.segment.azure.AzuriteDockerRule;
+import org.apache.jackrabbit.oak.segment.azure.ReverseFileReader;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.security.InvalidKeyException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+
+public class ReverseFileReaderTest {
+
+ @ClassRule
+ public static AzuriteDockerRule azurite = new AzuriteDockerRule();
+
+ private CloudBlobContainer container;
+
+ @Before
+ public void setup() throws StorageException, InvalidKeyException, URISyntaxException {
+ container = azurite.getContainer("oak-test");
+ getBlob().createOrReplace();
+ }
+
+ private CloudAppendBlob getBlob() throws URISyntaxException, StorageException {
+ return container.getAppendBlobReference("test-blob");
+ }
+
+ @Test
+ public void testReverseReader() throws IOException, URISyntaxException, StorageException {
+ List<String> entries = createFile(1024, 80);
+ ReverseFileReader reader = new ReverseFileReader(getBlob(), 256);
+ assertEquals(entries, reader);
+ }
+
+ @Test
+ public void testEmptyFile() throws IOException, URISyntaxException, StorageException {
+ List<String> entries = createFile(0, 80);
+ ReverseFileReader reader = new ReverseFileReader(getBlob(), 256);
+ assertEquals(entries, reader);
+ }
+
+ @Test
+ public void test1ByteBlock() throws IOException, URISyntaxException, StorageException {
+ List<String> entries = createFile(10, 16);
+ ReverseFileReader reader = new ReverseFileReader(getBlob(), 1);
+ assertEquals(entries, reader);
+ }
+
+ private List<String> createFile(int lines, int maxLineLength) throws IOException, URISyntaxException, StorageException {
+ Random random = new Random();
+ List<String> entries = new ArrayList<>();
+ CloudAppendBlob blob = getBlob();
+ for (int i = 0; i < lines; i++) {
+ int entrySize = random.nextInt(maxLineLength) + 1;
+ String entry = randomString(entrySize);
+ try {
+ blob.appendText(entry + '\n');
+ } catch (StorageException e) {
+ throw new IOException(e);
+ }
+ entries.add(entry);
+ }
+
+ entries.add("");
+ Collections.reverse(entries);
+ return entries;
+ }
+
+ private static void assertEquals(List<String> entries, ReverseFileReader reader) throws IOException {
+ int i = entries.size();
+ for (String e : entries) {
+ Assert.assertEquals("line " + (--i), e, reader.readLine());
+ }
+ Assert.assertNull(reader.readLine());
+ }
+
+ private static String randomString(int entrySize) {
+ Random r = new Random();
+
+ StringBuilder result = new StringBuilder();
+ for (int i = 0; i < entrySize; i++) {
+ result.append((char) ('a' + r.nextInt('z' - 'a' + 1))); // a random lowercase letter, 'a'..'z' inclusive
+ }
+
+ return result.toString();
+ }
+}
diff --git a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/queue/SegmentWriteQueueTest.java b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/queue/SegmentWriteQueueTest.java
new file mode 100644
index 0000000000..54c8329e04
--- /dev/null
+++ b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/queue/SegmentWriteQueueTest.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.azure.queue;
+
+import org.apache.jackrabbit.oak.segment.azure.AzureSegmentArchiveEntry;
+import org.junit.After;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class SegmentWriteQueueTest {
+
+ private static final byte[] EMPTY_DATA = new byte[0];
+
+ private SegmentWriteQueue queue;
+
+ @After
+ public void shutdown() throws IOException {
+ if (queue != null) {
+ queue.close();
+ }
+ }
+
+ @Test
+ public void testQueue() throws IOException, InterruptedException {
+ Set<UUID> added = Collections.synchronizedSet(new HashSet<>());
+ Semaphore semaphore = new Semaphore(0);
+ queue = new SegmentWriteQueue((tarEntry, data, offset, size) -> {
+ try {
+ semaphore.acquire();
+ } catch (InterruptedException e) {
+ }
+ added.add(new UUID(tarEntry.getMsb(), tarEntry.getLsb()));
+ });
+
+ for (int i = 0; i < 10; i++) {
+ queue.addToQueue(tarEntry(i), EMPTY_DATA, 0, 0);
+ }
+
+ for (int i = 0; i < 10; i++) {
+ assertNotNull("Segments should be available for read", queue.read(uuid(i)));
+ }
+ assertFalse("Queue shouldn't be empty", queue.isEmpty());
+
+ semaphore.release(Integer.MAX_VALUE);
+ while (!queue.isEmpty()) {
+ Thread.sleep(10);
+ }
+
+ assertEquals("There should be 10 segments consumed",10, added.size());
+ for (int i = 0; i < 10; i++) {
+ assertTrue("Missing consumed segment", added.contains(uuid(i)));
+ }
+ }
+
+ @Test(timeout = 1000)
+ public void testFlush() throws IOException, InterruptedException {
+ Set<UUID> added = Collections.synchronizedSet(new HashSet<>());
+ Semaphore semaphore = new Semaphore(0);
+ queue = new SegmentWriteQueue((tarEntry, data, offset, size) -> {
+ try {
+ semaphore.acquire();
+ } catch (InterruptedException e) {
+ }
+ added.add(new UUID(tarEntry.getMsb(), tarEntry.getLsb()));
+ });
+
+ for (int i = 0; i < 3; i++) {
+ queue.addToQueue(tarEntry(i), EMPTY_DATA, 0, 0);
+ }
+
+ AtomicBoolean flushFinished = new AtomicBoolean(false);
+ Set<UUID> addedAfterFlush = new HashSet<>();
+ new Thread(() -> {
+ try {
+ queue.flush();
+ flushFinished.set(true);
+ addedAfterFlush.addAll(added);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }).start();
+
+ Thread.sleep(100);
+ assertFalse("Flush should be blocked", flushFinished.get());
+
+ AtomicBoolean addFinished = new AtomicBoolean(false);
+ new Thread(() -> {
+ try {
+ queue.addToQueue(tarEntry(10), EMPTY_DATA, 0, 0);
+ addFinished.set(true);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }).start();
+
+ Thread.sleep(100);
+ assertFalse("Adding segments should be blocked until the flush is finished", addFinished.get());
+
+ semaphore.release(Integer.MAX_VALUE);
+
+ while (!addFinished.get()) {
+ Thread.sleep(10);
+ }
+ assertTrue("Flush should be finished once the ", flushFinished.get());
+ assertTrue("Adding segments should be blocked until the flush is finished", addFinished.get());
+
+ for (int i = 0; i < 3; i++) {
+ assertTrue(addedAfterFlush.contains(uuid(i)));
+ }
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testClose() throws IOException, InterruptedException {
+ queue = new SegmentWriteQueue((tarEntry, data, offset, size) -> {});
+ queue.close();
+ queue.addToQueue(tarEntry(10), EMPTY_DATA, 0, 0);
+ }
+
+ @Test
+ public void testRecoveryMode() throws IOException, InterruptedException {
+ Set<UUID> added = Collections.synchronizedSet(new HashSet<>());
+ Semaphore semaphore = new Semaphore(0);
+ AtomicBoolean doBreak = new AtomicBoolean(true);
+ List<Long> writeAttempts = Collections.synchronizedList(new ArrayList<>());
+ queue = new SegmentWriteQueue((tarEntry, data, offset, size) -> {
+ writeAttempts.add(System.currentTimeMillis());
+ try {
+ semaphore.acquire();
+ } catch (InterruptedException e) {
+ }
+ if (doBreak.get()) {
+ throw new IOException();
+ }
+ added.add(new UUID(tarEntry.getMsb(), tarEntry.getLsb()));
+ });
+
+ for (int i = 0; i < 10; i++) {
+ queue.addToQueue(tarEntry(i), EMPTY_DATA, 0, 0);
+ }
+
+ semaphore.release(Integer.MAX_VALUE);
+ Thread.sleep(100);
+
+ assertTrue(queue.isBroken());
+ assertEquals(9, queue.getSize()); // the 10th segment is handled by the recovery thread
+
+ writeAttempts.clear();
+ while (writeAttempts.size() < 5) {
+ Thread.sleep(100);
+ }
+ long lastAttempt = writeAttempts.get(0);
+ for (int i = 1; i < 5; i++) {
+ long delay = writeAttempts.get(i) - lastAttempt;
+ assertTrue("The delay between attempts to persist segment should be larger than 1s. Actual: " + delay, delay >= 1000);
+ lastAttempt = writeAttempts.get(i);
+ }
+
+ AtomicBoolean addFinished = new AtomicBoolean(false);
+ new Thread(() -> {
+ try {
+ queue.addToQueue(tarEntry(10), EMPTY_DATA, 0, 0);
+ addFinished.set(true);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }).start();
+
+ Thread.sleep(100);
+ assertFalse("Adding segments should be blocked until the recovery mode is finished", addFinished.get());
+
+ doBreak.set(false);
+ while (queue.isBroken()) {
+ Thread.sleep(10);
+ }
+ assertFalse("Queue shouldn't be broken anymore", queue.isBroken());
+
+ while (added.size() < 11) {
+ Thread.sleep(10);
+ }
+ assertEquals("All segments should be consumed",11, added.size());
+ for (int i = 0; i < 11; i++) {
+ assertTrue("All segments should be consumed", added.contains(uuid(i)));
+ }
+
+ int i = writeAttempts.size() - 10;
+ lastAttempt = writeAttempts.get(i);
+ for (; i < writeAttempts.size(); i++) {
+ long delay = writeAttempts.get(i) - lastAttempt;
+ assertTrue("Segments should be persisted immediately", delay < 1000);
+ lastAttempt = writeAttempts.get(i);
+ }
+ }
+
+ private static AzureSegmentArchiveEntry tarEntry(long i) {
+ return new AzureSegmentArchiveEntry(0, i, 0, 0, 0, 0, false);
+ }
+
+ private static UUID uuid(long i) {
+ return new UUID(0, i);
+ }
+
+}
diff --git a/oak-segment-azure/start-azurite.sh b/oak-segment-azure/start-azurite.sh
new file mode 100755
index 0000000000..06e5e61674
--- /dev/null
+++ b/oak-segment-azure/start-azurite.sh
@@ -0,0 +1,3 @@
+#!/bin/bash
+
+docker run -e executable=blob --rm -t -p 10000:10000 trekawek/azurite
diff --git a/pom.xml b/pom.xml
index 6443bc6f36..4ecf979ea7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -70,6 +70,7 @@
oak-examples
oak-it
oak-segment-tar
+ oak-segment-azure
oak-benchmarks