diff --git a/oak-segment-azure/pom.xml b/oak-segment-azure/pom.xml
new file mode 100644
index 0000000000..beca0095ca
--- /dev/null
+++ b/oak-segment-azure/pom.xml
@@ -0,0 +1,152 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.jackrabbit</groupId>
+ <artifactId>oak-parent</artifactId>
+ <version>1.10-SNAPSHOT</version>
+ <relativePath>../oak-parent/pom.xml</relativePath>
+ </parent>
+
+ <artifactId>oak-segment-azure</artifactId>
+ <packaging>bundle</packaging>
+
+ <name>Oak Segment Azure</name>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>maven-bundle-plugin</artifactId>
+ <configuration>
+ <instructions>
+ <Export-Package>
+ org.apache.jackrabbit.oak.segment.azure
+ </Export-Package>
+ <Embed-Dependency>
+ azure-storage,
+ azure-keyvault-core
+ </Embed-Dependency>
+ </instructions>
+ </configuration>
+ <executions>
+ <execution>
+ <id>baseline</id>
+ <goals>
+ <goal>baseline</goal>
+ </goals>
+ <phase>pre-integration-test</phase>
+ <configuration>
+ <skip>true</skip>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+ <dependencies>
+
+ <!-- OSGi -->
+ <dependency>
+ <groupId>org.osgi</groupId>
+ <artifactId>org.osgi.core</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.osgi</groupId>
+ <artifactId>org.osgi.compendium</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>org.apache.felix.scr.annotations</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- Oak -->
+ <dependency>
+ <groupId>org.apache.jackrabbit</groupId>
+ <artifactId>oak-segment-tar</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.jackrabbit</groupId>
+ <artifactId>oak-store-spi</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- Azure Blob Storage -->
+ <dependency>
+ <groupId>com.microsoft.azure</groupId>
+ <artifactId>azure-storage</artifactId>
+ <version>5.0.0</version>
+ </dependency>
+ <dependency>
+ <groupId>com.microsoft.azure</groupId>
+ <artifactId>azure-keyvault-core</artifactId>
+ <version>0.9.7</version>
+ </dependency>
+
+ <!-- Test dependencies -->
+ <dependency>
+ <groupId>org.apache.jackrabbit</groupId>
+ <artifactId>oak-segment-tar</artifactId>
+ <version>${project.version}</version>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.jackrabbit</groupId>
+ <artifactId>oak-store-spi</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <version>1.10.19</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.arakelian</groupId>
+ <artifactId>docker-junit-rule</artifactId>
+ <version>2.1.0</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+</project>
diff --git a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManager.java b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManager.java
new file mode 100644
index 0000000000..7c8058d6d9
--- /dev/null
+++ b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManager.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.azure;
+
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.BlobListingDetails;
+import com.microsoft.azure.storage.blob.CloudBlob;
+import com.microsoft.azure.storage.blob.CloudBlobDirectory;
+import com.microsoft.azure.storage.blob.CloudBlockBlob;
+import com.microsoft.azure.storage.blob.CopyStatus;
+import org.apache.jackrabbit.oak.segment.SegmentArchiveManager;
+import org.apache.jackrabbit.oak.segment.file.tar.FileStoreMonitor;
+import org.apache.jackrabbit.oak.segment.file.tar.IOMonitor;
+import org.apache.jackrabbit.oak.segment.file.tar.index.Index;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.UUID;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.jackrabbit.oak.segment.azure.AzureUtilities.getName;
+
+public class AzureArchiveManager implements SegmentArchiveManager {
+
+ private static final Logger log = LoggerFactory.getLogger(AzureArchiveManager.class);
+
+ private final CloudBlobDirectory cloudBlobDirectory;
+
+ private final IOMonitor ioMonitor;
+
+ private final FileStoreMonitor monitor;
+
+ public AzureArchiveManager(CloudBlobDirectory cloudBlobDirectory, IOMonitor ioMonitor, FileStoreMonitor fileStoreMonitor) {
+ this.cloudBlobDirectory = cloudBlobDirectory;
+ this.ioMonitor = ioMonitor;
+ this.monitor = fileStoreMonitor;
+ }
+
+ @Override
+ public List<String> listArchives() throws IOException {
+ try {
+ return StreamSupport.stream(cloudBlobDirectory
+ .listBlobs(null, false, EnumSet.noneOf(BlobListingDetails.class), null, null)
+ .spliterator(), false)
+ .filter(i -> i instanceof CloudBlobDirectory)
+ .map(i -> (CloudBlobDirectory) i)
+ .map(CloudBlobDirectory::getPrefix)
+ .map(Paths::get)
+ .map(Path::getFileName)
+ .map(Path::toString)
+ .collect(Collectors.toList());
+ } catch (URISyntaxException | StorageException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public SegmentArchiveReader open(String archiveName) throws IOException {
+ CloudBlobDirectory archiveDirectory = getDirectory(archiveName);
+ Index index = AzureSegmentArchiveReader.loadAndValidateIndex(archiveDirectory);
+ if (index == null) {
+ return null;
+ }
+ return new AzureSegmentArchiveReader(archiveDirectory, ioMonitor, monitor, index);
+ }
+
+ @Override
+ public SegmentArchiveWriter create(String archiveName) throws IOException {
+ return new AzureSegmentArchiveWriter(getDirectory(archiveName), ioMonitor, monitor);
+ }
+
+ @Override
+ public boolean delete(String archiveName) {
+ try {
+ getBlobs(archiveName)
+ .forEach(cloudBlob -> {
+ try {
+ cloudBlob.delete();
+ } catch (StorageException e) {
+ log.error("Can't delete segment {}", cloudBlob.getUri().getPath(), e);
+ }
+ });
+ return true;
+ } catch (IOException e) {
+ log.error("Can't delete archive {}", archiveName, e);
+ return false;
+ }
+ }
+
+ @Override
+ public boolean renameTo(String from, String to) {
+ try {
+ CloudBlobDirectory targetDirectory = getDirectory(to);
+ getBlobs(from)
+ .forEach(cloudBlob -> {
+ try {
+ renameBlob(cloudBlob, targetDirectory);
+ } catch (IOException e) {
+ log.error("Can't rename segment {}", cloudBlob.getUri().getPath(), e);
+ }
+ });
+ return true;
+ } catch (IOException e) {
+ log.error("Can't rename archive {} to {}", from, to, e);
+ return false;
+ }
+ }
+
+ @Override
+ public void copyFile(String from, String to) throws IOException {
+ CloudBlobDirectory targetDirectory = getDirectory(to);
+ getBlobs(from)
+ .forEach(cloudBlob -> {
+ try {
+ copyBlob(cloudBlob, targetDirectory);
+ } catch (IOException e) {
+ log.error("Can't copy segment {}", cloudBlob.getUri().getPath(), e);
+ }
+ });
+ }
+
+ @Override
+ public boolean exists(String archiveName) {
+ try {
+ return listArchives().contains(archiveName);
+ } catch (IOException e) {
+ log.error("Can't check the existence of {}", archiveName, e);
+ return false;
+ }
+ }
+
+ @Override
+ public void recoverEntries(String archiveName, LinkedHashMap<UUID, byte[]> entries) throws IOException {
+ Pattern pattern = Pattern.compile(AzureUtilities.SEGMENT_FILE_NAME_PATTERN);
+ List<RecoveredEntry> entryList = new ArrayList<>();
+
+ for (CloudBlob b : getBlobList(archiveName)) {
+ String name = getName(b);
+ Matcher m = pattern.matcher(name);
+ if (!m.matches()) {
+ continue;
+ }
+ int position = Integer.parseInt(m.group(1), 16);
+ UUID uuid = UUID.fromString(m.group(2));
+ long length = b.getProperties().getLength();
+ if (length > 0) {
+ byte[] data = new byte[(int) length];
+ try {
+ b.downloadToByteArray(data, 0);
+ } catch (StorageException e) {
+ throw new IOException(e);
+ }
+ entryList.add(new RecoveredEntry(position, uuid, data, name));
+ }
+ }
+ Collections.sort(entryList);
+
+ int i = 0;
+ for (RecoveredEntry e : entryList) {
+ if (e.position != i) {
+ log.warn("Missing entry {}.??? when recovering {}. No more segments will be read.", String.format("%04X", i), archiveName);
+ break;
+ }
+ log.info("Recovering segment {}/{}", archiveName, e.fileName);
+ entries.put(e.uuid, e.data);
+ i++;
+ }
+ }
+
+
+ private CloudBlobDirectory getDirectory(String archiveName) throws IOException {
+ try {
+ return cloudBlobDirectory.getDirectoryReference(archiveName);
+ } catch (URISyntaxException e) {
+ throw new IOException(e);
+ }
+ }
+
+ private Stream<CloudBlob> getBlobs(String archiveName) throws IOException {
+ return AzureUtilities.getBlobs(getDirectory(archiveName));
+ }
+
+ private List<CloudBlob> getBlobList(String archiveName) throws IOException {
+ return getBlobs(archiveName).collect(Collectors.toList());
+ }
+
+ private void renameBlob(CloudBlob blob, CloudBlobDirectory newParent) throws IOException {
+ copyBlob(blob, newParent);
+ try {
+ blob.delete();
+ } catch (StorageException e) {
+ throw new IOException(e);
+ }
+ }
+
+ private void copyBlob(CloudBlob blob, CloudBlobDirectory newParent) throws IOException {
+ checkArgument(blob instanceof CloudBlockBlob, "Only block blobs are supported for the rename");
+ try {
+ String blobName = getName(blob);
+ CloudBlockBlob newBlob = newParent.getBlockBlobReference(blobName);
+ newBlob.startCopy(blob.getUri());
+ while (newBlob.getCopyState().getStatus() == CopyStatus.PENDING) {
+ Thread.sleep(100);
+ }
+
+ CopyStatus finalStatus = newBlob.getCopyState().getStatus();
+ if (finalStatus != CopyStatus.SUCCESS) {
+ throw new IOException("Invalid copy status for " + blob.getUri().getPath() + ": " + finalStatus);
+ }
+ } catch (StorageException | InterruptedException | URISyntaxException e) {
+ throw new IOException(e);
+ }
+ }
+
+ private static class RecoveredEntry implements Comparable<RecoveredEntry> {
+
+ private final byte[] data;
+
+ private final UUID uuid;
+
+ private final int position;
+
+ private final String fileName;
+
+ public RecoveredEntry(int position, UUID uuid, byte[] data, String fileName) {
+ this.data = data;
+ this.uuid = uuid;
+ this.position = position;
+ this.fileName = fileName;
+ }
+
+ @Override
+ public int compareTo(RecoveredEntry o) {
+ return Integer.compare(this.position, o.position);
+ }
+ }
+
+}
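
For reference, a self-contained sketch (not part of the patch) of the naming convention `recoverEntries()` relies on: each segment blob is named `<4-hex-digit position>.<uuid>`, and the hex prefix defines the recovery order. The blob names below are made up.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RecoveryOrderDemo {

    // same pattern as AzureUtilities.SEGMENT_FILE_NAME_PATTERN
    private static final Pattern SEGMENT_NAME =
            Pattern.compile("^([0-9a-f]{4})\\.([0-9a-f-]+)$");

    public static void main(String[] args) {
        // hypothetical blob names, listed in arbitrary order
        List<String> names = Arrays.asList(
                "0002.cc8f7895-a24e-4d30-b56a-c7c9e9e2a0b3",
                "0000.5b3f1c2a-9d4e-4f6a-8b1c-2d3e4f5a6b7c",
                "0001.7e2d9a4b-1c3f-4a5b-9d6e-8f7a6b5c4d3e");
        names.stream()
                .map(SEGMENT_NAME::matcher)
                .filter(Matcher::matches)
                // the hex prefix encodes the entry position, as in recoverEntries()
                .sorted(Comparator.comparingInt(m -> Integer.parseInt(m.group(1), 16)))
                .forEach(m -> System.out.println(
                        Integer.parseInt(m.group(1), 16) + " -> " + m.group(2)));
    }
}
```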
diff --git a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureGCJournalFile.java b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureGCJournalFile.java
new file mode 100644
index 0000000000..851da686de
--- /dev/null
+++ b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureGCJournalFile.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.azure;
+
+import com.google.common.base.Charsets;
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudAppendBlob;
+import org.apache.commons.io.IOUtils;
+import org.apache.jackrabbit.oak.segment.SegmentNodeStorePersistence;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+public class AzureGCJournalFile implements SegmentNodeStorePersistence.GCJournalFile {
+
+ private final CloudAppendBlob gcJournal;
+
+ public AzureGCJournalFile(CloudAppendBlob gcJournal) {
+ this.gcJournal = gcJournal;
+ }
+
+ @Override
+ public void writeLine(String line) throws IOException {
+ try {
+ if (!gcJournal.exists()) {
+ gcJournal.createOrReplace();
+ }
+ gcJournal.appendText(line + "\n", Charsets.UTF_8.name(), null, null, null);
+ } catch (StorageException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public List<String> readLines() throws IOException {
+ try {
+ if (!gcJournal.exists()) {
+ return Collections.emptyList();
+ }
+ byte[] data = new byte[(int) gcJournal.getProperties().getLength()];
+ gcJournal.downloadToByteArray(data, 0);
+ return IOUtils.readLines(new ByteArrayInputStream(data), Charsets.UTF_8);
+ } catch (StorageException e) {
+ throw new IOException(e);
+ }
+ }
+}
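
A hedged usage sketch for the append-blob-backed GC journal; it assumes a locally running storage emulator (e.g. Azurite) and an illustrative container name. `writeLine()` creates the blob lazily, so no further setup is needed.

```java
import com.microsoft.azure.storage.CloudStorageAccount;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import org.apache.jackrabbit.oak.segment.azure.AzureGCJournalFile;

public class GcJournalDemo {

    public static void main(String[] args) throws Exception {
        CloudStorageAccount account = CloudStorageAccount.getDevelopmentStorageAccount();
        CloudBlobContainer container = account.createCloudBlobClient()
                .getContainerReference("oak-demo");
        container.createIfNotExists();

        AzureGCJournalFile gcJournal =
                new AzureGCJournalFile(container.getAppendBlobReference("gc.log"));
        gcJournal.writeLine("compaction #1");
        gcJournal.writeLine("compaction #2");
        // readLines() downloads the whole blob and splits it into lines
        gcJournal.readLines().forEach(System.out::println);
    }
}
```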
diff --git a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureJournalFile.java b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureJournalFile.java
new file mode 100644
index 0000000000..8c969e26e3
--- /dev/null
+++ b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureJournalFile.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.azure;
+
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudAppendBlob;
+import com.microsoft.azure.storage.blob.CloudBlob;
+import com.microsoft.azure.storage.blob.CloudBlobDirectory;
+import com.microsoft.azure.storage.blob.ListBlobItem;
+import org.apache.jackrabbit.oak.segment.SegmentNodeStorePersistence;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class AzureJournalFile implements SegmentNodeStorePersistence.JournalFile {
+
+ private static final Logger log = LoggerFactory.getLogger(AzureJournalFile.class);
+
+ private static final int JOURNAL_LINE_LIMIT = Integer.getInteger("org.apache.jackrabbit.oak.segment.azure.journal.lines", 40_000);
+
+ private final CloudBlobDirectory directory;
+
+ private final String journalNamePrefix;
+
+ private final int lineLimit;
+
+ AzureJournalFile(CloudBlobDirectory directory, String journalNamePrefix, int lineLimit) {
+ this.directory = directory;
+ this.journalNamePrefix = journalNamePrefix;
+ this.lineLimit = lineLimit;
+ }
+
+ public AzureJournalFile(CloudBlobDirectory directory, String journalNamePrefix) {
+ this(directory, journalNamePrefix, JOURNAL_LINE_LIMIT);
+ }
+
+ @Override
+ public SegmentNodeStorePersistence.JournalFileReader openJournalReader() throws IOException {
+ return new CombinedReader(getJournalBlobs());
+ }
+
+ @Override
+ public SegmentNodeStorePersistence.JournalFileWriter openJournalWriter() throws IOException {
+ return new AzureJournalWriter();
+ }
+
+ @Override
+ public String getName() {
+ return journalNamePrefix;
+ }
+
+ @Override
+ public boolean exists() {
+ try {
+ return !getJournalBlobs().isEmpty();
+ } catch (IOException e) {
+ log.error("Can't check if the file exists", e);
+ return false;
+ }
+ }
+
+ private String getJournalFileName(int index) {
+ return String.format("%s.%03d", journalNamePrefix, index);
+ }
+
+ private List<CloudAppendBlob> getJournalBlobs() throws IOException {
+ try {
+ List<CloudAppendBlob> result = new ArrayList<>();
+ for (ListBlobItem b : directory.listBlobs(journalNamePrefix)) {
+ if (b instanceof CloudAppendBlob) {
+ result.add((CloudAppendBlob) b);
+ } else {
+ log.warn("Invalid blob type: {} {}", b.getUri(), b.getClass());
+ }
+ }
+ result.sort(Comparator.comparing(AzureUtilities::getName).reversed());
+ return result;
+ } catch (URISyntaxException | StorageException e) {
+ throw new IOException(e);
+ }
+ }
+
+ private static class AzureJournalReader implements SegmentNodeStorePersistence.JournalFileReader {
+
+ private final CloudBlob blob;
+
+ private ReverseFileReader reader;
+
+ private AzureJournalReader(CloudBlob blob) {
+ this.blob = blob;
+ }
+
+ @Override
+ public String readLine() throws IOException {
+ if (reader == null) {
+ try {
+ reader = new ReverseFileReader(blob);
+ } catch (StorageException e) {
+ throw new IOException(e);
+ }
+ }
+ return reader.readLine();
+ }
+
+ @Override
+ public void close() throws IOException {
+ }
+ }
+
+ private class AzureJournalWriter implements SegmentNodeStorePersistence.JournalFileWriter {
+
+ private CloudAppendBlob currentBlob;
+
+ private int blockCount;
+
+ public AzureJournalWriter() throws IOException {
+ List<CloudAppendBlob> blobs = getJournalBlobs();
+ if (blobs.isEmpty()) {
+ try {
+ currentBlob = directory.getAppendBlobReference(getJournalFileName(1));
+ currentBlob.createOrReplace();
+ } catch (URISyntaxException | StorageException e) {
+ throw new IOException(e);
+ }
+ } else {
+ currentBlob = blobs.get(0);
+ }
+ Integer bc = currentBlob.getProperties().getAppendBlobCommittedBlockCount();
+ blockCount = bc == null ? 0 : bc;
+ }
+
+ @Override
+ public void truncate() throws IOException {
+ try {
+ for (CloudAppendBlob cloudAppendBlob : getJournalBlobs()) {
+ cloudAppendBlob.delete();
+ }
+ } catch (StorageException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public void writeLine(String line) throws IOException {
+ if (blockCount >= lineLimit) {
+ createNewFile();
+ }
+ try {
+ currentBlob.appendText(line + "\n");
+ blockCount++;
+ } catch (StorageException e) {
+ throw new IOException(e);
+ }
+ }
+
+ private void createNewFile() throws IOException {
+ String name = AzureUtilities.getName(currentBlob);
+ Pattern pattern = Pattern.compile(Pattern.quote(journalNamePrefix) + "\\.(\\d+)");
+ Matcher matcher = pattern.matcher(name);
+ int parsedSuffix;
+ if (matcher.find()) {
+ String suffix = matcher.group(1);
+ try {
+ parsedSuffix = Integer.parseInt(suffix);
+ } catch (NumberFormatException e) {
+ log.warn("Can't parse suffix for journal file {}", name);
+ parsedSuffix = 0;
+ }
+ } else {
+ log.warn("Can't parse journal file name {}", name);
+ parsedSuffix = 0;
+ }
+ try {
+ currentBlob = directory.getAppendBlobReference(getJournalFileName(parsedSuffix + 1));
+ currentBlob.createOrReplace();
+ blockCount = 0;
+ } catch (URISyntaxException | StorageException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ // do nothing
+ }
+ }
+
+ private static class CombinedReader implements SegmentNodeStorePersistence.JournalFileReader {
+
+ private final Iterator<AzureJournalReader> readers;
+
+ private SegmentNodeStorePersistence.JournalFileReader currentReader;
+
+ private CombinedReader(List<CloudAppendBlob> blobs) {
+ readers = blobs.stream().map(AzureJournalReader::new).iterator();
+ }
+
+ @Override
+ public String readLine() throws IOException {
+ String line;
+ do {
+ if (currentReader == null) {
+ if (!readers.hasNext()) {
+ return null;
+ }
+ currentReader = readers.next();
+ }
+ do {
+ line = currentReader.readLine();
+ } while ("".equals(line));
+ if (line == null) {
+ currentReader.close();
+ currentReader = null;
+ }
+ } while (line == null);
+ return line;
+ }
+
+ @Override
+ public void close() throws IOException {
+ while (readers.hasNext()) {
+ readers.next().close();
+ }
+ if (currentReader != null) {
+ currentReader.close();
+ currentReader = null;
+ }
+ }
+ }
+}
\ No newline at end of file
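
A small self-contained sketch of the journal naming scheme: suffixes are zero-padded (`journal.log.001`, `journal.log.002`, ...) and `getJournalBlobs()` sorts the listing in reverse, so the most recent journal is consulted first.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class JournalNamingDemo {

    public static void main(String[] args) {
        String prefix = "journal.log";
        // names as produced by getJournalFileName(int)
        List<String> names = Arrays.asList(
                String.format("%s.%03d", prefix, 1),
                String.format("%s.%03d", prefix, 2),
                String.format("%s.%03d", prefix, 3));
        // the reversed natural order mirrors the sort in getJournalBlobs()
        names.sort(Comparator.<String>naturalOrder().reversed());
        System.out.println(names); // [journal.log.003, journal.log.002, journal.log.001]
    }
}
```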
diff --git a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureManifestFile.java b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureManifestFile.java
new file mode 100644
index 0000000000..b9174078d9
--- /dev/null
+++ b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureManifestFile.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.azure;
+
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudBlockBlob;
+import org.apache.jackrabbit.oak.segment.SegmentNodeStorePersistence;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Properties;
+
+public class AzureManifestFile implements SegmentNodeStorePersistence.ManifestFile {
+
+ private static final Logger log = LoggerFactory.getLogger(AzureManifestFile.class);
+
+ private final CloudBlockBlob manifestBlob;
+
+ public AzureManifestFile(CloudBlockBlob manifestBlob) {
+ this.manifestBlob = manifestBlob;
+ }
+
+ @Override
+ public boolean exists() {
+ try {
+ return manifestBlob.exists();
+ } catch (StorageException e) {
+ log.error("Can't check if the manifest exists", e);
+ return false;
+ }
+ }
+
+ @Override
+ public Properties load() throws IOException {
+ Properties properties = new Properties();
+ if (exists()) {
+ long length = manifestBlob.getProperties().getLength();
+ byte[] data = new byte[(int) length];
+ try {
+ manifestBlob.downloadToByteArray(data, 0);
+ } catch (StorageException e) {
+ throw new IOException(e);
+ }
+ properties.load(new ByteArrayInputStream(data));
+ }
+ return properties;
+ }
+
+ @Override
+ public void save(Properties properties) throws IOException {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ properties.store(bos, null);
+
+ byte[] data = bos.toByteArray();
+ try {
+ manifestBlob.uploadFromByteArray(data, 0, data.length);
+ } catch (StorageException e) {
+ throw new IOException(e);
+ }
+ }
+}
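
The manifest is a plain `java.util.Properties` object serialized to a byte array; a minimal sketch of the round trip performed by `save()` and `load()` above (the key is illustrative):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Properties;

public class ManifestRoundTripDemo {

    public static void main(String[] args) throws IOException {
        Properties manifest = new Properties();
        manifest.setProperty("store.version", "1");

        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        manifest.store(bos, null);                   // the bytes save() uploads
        byte[] data = bos.toByteArray();

        Properties loaded = new Properties();
        loaded.load(new ByteArrayInputStream(data)); // the bytes load() downloads
        System.out.println(loaded.getProperty("store.version")); // 1
    }
}
```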
diff --git a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzurePersistence.java b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzurePersistence.java
new file mode 100644
index 0000000000..8a4de9615c
--- /dev/null
+++ b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzurePersistence.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.azure;
+
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.BlobListingDetails;
+import com.microsoft.azure.storage.blob.CloudAppendBlob;
+import com.microsoft.azure.storage.blob.CloudBlobDirectory;
+import com.microsoft.azure.storage.blob.CloudBlockBlob;
+import com.microsoft.azure.storage.blob.ListBlobItem;
+import org.apache.jackrabbit.oak.segment.SegmentArchiveManager;
+import org.apache.jackrabbit.oak.segment.SegmentNodeStorePersistence;
+import org.apache.jackrabbit.oak.segment.file.tar.FileStoreMonitor;
+import org.apache.jackrabbit.oak.segment.file.tar.IOMonitor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.file.Paths;
+import java.util.EnumSet;
+
+public class AzurePersistence implements SegmentNodeStorePersistence {
+
+ private static final Logger log = LoggerFactory.getLogger(AzurePersistence.class);
+
+ private final CloudBlobDirectory segmentstoreDirectory;
+
+ public AzurePersistence(CloudBlobDirectory segmentstoreDirectory) {
+ this.segmentstoreDirectory = segmentstoreDirectory;
+ }
+
+ @Override
+ public SegmentArchiveManager createArchiveManager(boolean mmap, IOMonitor ioMonitor, FileStoreMonitor fileStoreMonitor) {
+ return new AzureArchiveManager(segmentstoreDirectory, ioMonitor, fileStoreMonitor);
+ }
+
+ @Override
+ public boolean segmentFilesExist() {
+ try {
+ for (ListBlobItem i : segmentstoreDirectory.listBlobs(null, false, EnumSet.noneOf(BlobListingDetails.class), null, null)) {
+ if (i instanceof CloudBlobDirectory) {
+ CloudBlobDirectory dir = (CloudBlobDirectory) i;
+ String name = Paths.get(dir.getPrefix()).getFileName().toString();
+ if (name.endsWith(".tar")) {
+ return true;
+ }
+ }
+ }
+ return false;
+ } catch (StorageException | URISyntaxException e) {
+ log.error("Can't check if the segment archives exists", e);
+ return false;
+ }
+ }
+
+ @Override
+ public JournalFile getJournalFile() {
+ return new AzureJournalFile(segmentstoreDirectory, "journal.log");
+ }
+
+ @Override
+ public GCJournalFile getGCJournalFile() throws IOException {
+ return new AzureGCJournalFile(getAppendBlob("gc.log"));
+ }
+
+ @Override
+ public ManifestFile getManifestFile() throws IOException {
+ return new AzureManifestFile(getBlockBlob("manifest"));
+ }
+
+ @Override
+ public RepositoryLock lockRepository() throws IOException {
+ return new AzureRepositoryLock(getBlockBlob("repo.lock"), () -> {
+ log.warn("Lost connection to Azure Storage. The client will be closed.");
+ // TODO close the connection
+ }).lock();
+ }
+
+ private CloudBlockBlob getBlockBlob(String path) throws IOException {
+ try {
+ return segmentstoreDirectory.getBlockBlobReference(path);
+ } catch (URISyntaxException | StorageException e) {
+ throw new IOException(e);
+ }
+ }
+
+ private CloudAppendBlob getAppendBlob(String path) throws IOException {
+ try {
+ return segmentstoreDirectory.getAppendBlobReference(path);
+ } catch (URISyntaxException | StorageException e) {
+ throw new IOException(e);
+ }
+ }
+
+}
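
A hedged wiring sketch (not part of the patch) showing how this persistence could be plugged into a segment store. It assumes `FileStoreBuilder#withCustomPersistence` from the corresponding oak-segment-tar SPI; the connection string, container, and directory names are placeholders.

```java
import com.microsoft.azure.storage.CloudStorageAccount;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import com.microsoft.azure.storage.blob.CloudBlobDirectory;
import org.apache.jackrabbit.oak.segment.azure.AzurePersistence;
import org.apache.jackrabbit.oak.segment.file.FileStore;
import org.apache.jackrabbit.oak.segment.file.FileStoreBuilder;

import java.io.File;

public class AzureFileStoreDemo {

    public static void main(String[] args) throws Exception {
        CloudStorageAccount account =
                CloudStorageAccount.parse(System.getenv("AZURE_CONNECTION_STRING"));
        CloudBlobContainer container =
                account.createCloudBlobClient().getContainerReference("oak");
        container.createIfNotExists();
        CloudBlobDirectory directory = container.getDirectoryReference("oak");

        FileStore fileStore = FileStoreBuilder
                .fileStoreBuilder(new File("target/segmentstore")) // local dir still required
                .withCustomPersistence(new AzurePersistence(directory))
                .build();
        try {
            // read and write through the SegmentNodeStore as usual
        } finally {
            fileStore.close();
        }
    }
}
```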
diff --git a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureRepositoryLock.java b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureRepositoryLock.java
new file mode 100644
index 0000000000..41f3d54ed6
--- /dev/null
+++ b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureRepositoryLock.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.azure;
+
+import com.microsoft.azure.storage.AccessCondition;
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudBlockBlob;
+import org.apache.jackrabbit.oak.segment.SegmentNodeStorePersistence;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+public class AzureRepositoryLock implements SegmentNodeStorePersistence.RepositoryLock {
+
+ private static final Logger log = LoggerFactory.getLogger(AzureRepositoryLock.class);
+
+ private static final int INTERVAL = 60;
+
+ private final Runnable shutdownHook;
+
+ private final CloudBlockBlob blob;
+
+ private final ExecutorService executor;
+
+ private String leaseId;
+
+ private volatile boolean doUpdate;
+
+ public AzureRepositoryLock(CloudBlockBlob blob, Runnable shutdownHook) {
+ this.shutdownHook = shutdownHook;
+ this.blob = blob;
+ this.executor = Executors.newSingleThreadExecutor();
+ }
+
+ public AzureRepositoryLock lock() throws IOException {
+ try {
+ blob.openOutputStream().close();
+ leaseId = blob.acquireLease(INTERVAL, null);
+ log.info("Acquired lease {}", leaseId);
+ } catch (StorageException e) {
+ throw new IOException(e);
+ }
+ executor.submit(this::refreshLease);
+ return this;
+ }
+
+ private void refreshLease() {
+ doUpdate = true;
+ long lastUpdate = 0;
+ while (doUpdate) {
+ try {
+ long timeSinceLastUpdate = (System.currentTimeMillis() - lastUpdate) / 1000;
+ if (timeSinceLastUpdate > INTERVAL / 2) {
+ blob.renewLease(AccessCondition.generateLeaseCondition(leaseId));
+ lastUpdate = System.currentTimeMillis();
+ }
+ } catch (StorageException e) {
+ log.error("Can't renew the lease", e);
+ shutdownHook.run();
+ doUpdate = false;
+ return;
+ }
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ log.error("Interrupted the lease renewal loop", e);
+ }
+ }
+ }
+
+ @Override
+ public void unlock() throws IOException {
+ doUpdate = false;
+ executor.shutdown();
+ try {
+ executor.awaitTermination(1, TimeUnit.MINUTES);
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ } finally {
+ releaseLease();
+ }
+ }
+
+ private void releaseLease() throws IOException {
+ try {
+ blob.releaseLease(AccessCondition.generateLeaseCondition(leaseId));
+ blob.delete();
+ log.info("Released lease {}", leaseId);
+ } catch (StorageException e) {
+ throw new IOException(e);
+ }
+ }
+}
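
A hedged usage sketch of the lease-based lock against the storage emulator; the container and blob names are illustrative. The 60-second lease is renewed in the background for as long as the lock is held.

```java
import com.microsoft.azure.storage.CloudStorageAccount;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import com.microsoft.azure.storage.blob.CloudBlockBlob;
import org.apache.jackrabbit.oak.segment.azure.AzureRepositoryLock;

public class RepositoryLockDemo {

    public static void main(String[] args) throws Exception {
        CloudBlobContainer container = CloudStorageAccount.getDevelopmentStorageAccount()
                .createCloudBlobClient().getContainerReference("oak-demo");
        container.createIfNotExists();
        CloudBlockBlob lockBlob = container.getBlockBlobReference("repo.lock");

        AzureRepositoryLock lock = new AzureRepositoryLock(
                lockBlob, () -> System.err.println("lease lost")).lock();
        try {
            // exclusive access to the repository while the lease is held
        } finally {
            lock.unlock(); // releases the lease and deletes the blob
        }
    }
}
```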
diff --git a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveReader.java b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveReader.java
new file mode 100644
index 0000000000..248bec2722
--- /dev/null
+++ b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveReader.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.azure;
+
+import com.google.common.base.Stopwatch;
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.BlobProperties;
+import com.microsoft.azure.storage.blob.CloudBlob;
+import com.microsoft.azure.storage.blob.CloudBlobDirectory;
+import com.microsoft.azure.storage.blob.CloudBlockBlob;
+import org.apache.jackrabbit.oak.segment.SegmentArchiveManager;
+import org.apache.jackrabbit.oak.segment.file.tar.FileStoreMonitor;
+import org.apache.jackrabbit.oak.segment.file.tar.GraphLoader;
+import org.apache.jackrabbit.oak.segment.file.tar.IOMonitor;
+import org.apache.jackrabbit.oak.segment.file.tar.binaries.BinaryReferencesIndex;
+import org.apache.jackrabbit.oak.segment.file.tar.binaries.BinaryReferencesIndexLoader;
+import org.apache.jackrabbit.oak.segment.file.tar.binaries.InvalidBinaryReferencesIndexException;
+import org.apache.jackrabbit.oak.segment.file.tar.index.Index;
+import org.apache.jackrabbit.oak.segment.file.tar.index.IndexEntry;
+import org.apache.jackrabbit.oak.segment.file.tar.index.IndexLoader;
+import org.apache.jackrabbit.oak.segment.file.tar.index.InvalidIndexException;
+import org.apache.jackrabbit.oak.segment.util.ReaderAtEnd;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.jackrabbit.oak.segment.file.tar.TarConstants.BLOCK_SIZE;
+import static org.apache.jackrabbit.oak.segment.file.tar.index.IndexLoader.newIndexLoader;
+import static org.apache.jackrabbit.oak.segment.azure.AzureUtilities.getSegmentFileName;
+import static org.apache.jackrabbit.oak.segment.azure.AzureUtilities.readBufferFully;
+
+public class AzureSegmentArchiveReader implements SegmentArchiveManager.SegmentArchiveReader {
+
+ private static final Logger log = LoggerFactory.getLogger(AzureSegmentArchiveReader.class);
+
+ private static final IndexLoader indexLoader = newIndexLoader(BLOCK_SIZE);
+
+ private final CloudBlobDirectory archiveDirectory;
+
+ private final IOMonitor ioMonitor;
+
+ private final FileStoreMonitor monitor;
+
+ private final Index index;
+
+ private final long length;
+
+ private Boolean hasGraph;
+
+ AzureSegmentArchiveReader(CloudBlobDirectory archiveDirectory, IOMonitor ioMonitor, FileStoreMonitor monitor, Index index) throws IOException {
+ this.archiveDirectory = archiveDirectory;
+ this.ioMonitor = ioMonitor;
+ this.monitor = monitor;
+ this.index = index;
+ this.length = AzureUtilities.getBlobs(archiveDirectory)
+ .map(CloudBlob::getProperties)
+ .mapToLong(BlobProperties::getLength)
+ .sum();
+ }
+
+ @Override
+ public ByteBuffer readSegment(long msb, long lsb) throws IOException {
+ int i = index.findEntry(msb, lsb);
+ if (i == -1) {
+ return null;
+ }
+ IndexEntry entry = index.entry(i);
+
+ ByteBuffer buffer = ByteBuffer.allocate(entry.getLength());
+ ioMonitor.beforeSegmentRead(pathAsFile(), msb, lsb, entry.getLength());
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ readBufferFully(getBlob(getSegmentFileName(entry)), buffer);
+ long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS);
+ ioMonitor.afterSegmentRead(pathAsFile(), msb, lsb, entry.getLength(), elapsed);
+ return buffer;
+ }
+
+ @Override
+ public Index getIndex() {
+ return index;
+ }
+
+ @Override
+ public Map<UUID, List<UUID>> getGraph() throws IOException {
+ ByteBuffer graph = loadGraph();
+ if (graph == null) {
+ return null;
+ } else {
+ return GraphLoader.parseGraph(graph);
+ }
+ }
+
+ private ByteBuffer loadGraph() throws IOException {
+ ByteBuffer graph = GraphLoader.loadGraph(openAsReaderAtEnd(getName() + ".gph"));
+ hasGraph = graph != null;
+ return graph;
+ }
+
+ @Override
+ public boolean hasGraph() {
+ if (hasGraph == null) {
+ try {
+ loadGraph();
+ } catch (IOException ignore) { }
+ }
+ return hasGraph != null && hasGraph;
+ }
+
+ @Override
+ public BinaryReferencesIndex getBinaryReferences() {
+ BinaryReferencesIndex binaryReferences = null;
+ try {
+ binaryReferences = loadBinaryReferences();
+ } catch (InvalidBinaryReferencesIndexException | IOException e) {
+ log.warn("Exception while loading the binary references index", e);
+ }
+ return binaryReferences;
+ }
+
+ private BinaryReferencesIndex loadBinaryReferences() throws IOException, InvalidBinaryReferencesIndexException {
+ return BinaryReferencesIndexLoader.loadBinaryReferencesIndex(openAsReaderAtEnd(getName() + ".brf"));
+ }
+
+ @Override
+ public long length() {
+ return length;
+ }
+
+ @Override
+ public String getName() {
+ return AzureUtilities.getName(archiveDirectory);
+ }
+
+ @Override
+ public void close() throws IOException {
+ // do nothing
+ }
+
+ @Override
+ public int getEntrySize(int size) {
+ return size;
+ }
+
+ private ReaderAtEnd openAsReaderAtEnd(String name) throws IOException {
+ return openAsReaderAtEnd(getBlob(name));
+ }
+
+ private static ReaderAtEnd openAsReaderAtEnd(CloudBlob cloudBlob) throws IOException {
+ try {
+ if (!cloudBlob.exists()) {
+ return null;
+ }
+ int length = (int) cloudBlob.getProperties().getLength();
+ ByteBuffer buffer = ByteBuffer.allocate(length);
+ cloudBlob.downloadToByteArray(buffer.array(), 0);
+
+ return (whence, amount) -> {
+ ByteBuffer result = buffer.duplicate();
+ result.position(length - whence);
+ result.limit(length - whence + amount);
+ return result.slice();
+ };
+ } catch (StorageException e) {
+ throw new IOException(e);
+ }
+ }
+
+ private File pathAsFile() {
+ return new File(archiveDirectory.getUri().getPath());
+ }
+
+ public static Index loadAndValidateIndex(CloudBlobDirectory archiveDirectory) throws IOException {
+ CloudBlockBlob blob;
+ try {
+ blob = archiveDirectory.getBlockBlobReference(AzureUtilities.getName(archiveDirectory) + ".idx");
+ } catch (StorageException | URISyntaxException e) {
+ log.error("Can't open index", e);
+ return null;
+ }
+ ReaderAtEnd reader = openAsReaderAtEnd(blob);
+ if (reader == null) {
+ return null;
+ } else {
+ try {
+ return indexLoader.loadIndex(reader);
+ } catch (InvalidIndexException e) {
+ log.warn("Can't open index file: {}", blob.getUri().getPath(), e);
+ return null;
+ }
+ }
+ }
+
+ private CloudBlockBlob getBlob(String name) throws IOException {
+ try {
+ return archiveDirectory.getBlockBlobReference(name);
+ } catch (URISyntaxException | StorageException e) {
+ throw new IOException(e);
+ }
+ }
+}
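
The `ReaderAtEnd` returned by `openAsReaderAtEnd()` answers requests of the form "`amount` bytes, starting `whence` bytes before the end of the blob". A self-contained sketch of that buffer arithmetic:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class ReaderAtEndDemo {

    public static void main(String[] args) {
        byte[] data = "....PAYLOAD".getBytes(StandardCharsets.UTF_8);
        ByteBuffer buffer = ByteBuffer.wrap(data); // stands in for the downloaded blob
        int length = data.length;

        int whence = 7; // bytes before the end
        int amount = 7; // bytes to read
        ByteBuffer result = buffer.duplicate();
        result.position(length - whence);
        result.limit(length - whence + amount);
        System.out.println(StandardCharsets.UTF_8.decode(result.slice())); // PAYLOAD
    }
}
```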
diff --git a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveWriter.java b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveWriter.java
new file mode 100644
index 0000000000..9b337348dc
--- /dev/null
+++ b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveWriter.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.azure;
+
+import com.google.common.base.Stopwatch;
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudBlobDirectory;
+import com.microsoft.azure.storage.blob.CloudBlockBlob;
+import org.apache.jackrabbit.oak.segment.SegmentArchiveManager;
+import org.apache.jackrabbit.oak.segment.file.tar.FileStoreMonitor;
+import org.apache.jackrabbit.oak.segment.file.tar.GCGeneration;
+import org.apache.jackrabbit.oak.segment.file.tar.IOMonitor;
+import org.apache.jackrabbit.oak.segment.file.tar.TarEntry;
+import org.apache.jackrabbit.oak.segment.azure.queue.SegmentWriteAction;
+import org.apache.jackrabbit.oak.segment.azure.queue.SegmentWriteQueue;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.jackrabbit.oak.segment.file.tar.TarConstants.BLOCK_SIZE;
+import static org.apache.jackrabbit.oak.segment.azure.AzureUtilities.getSegmentFileName;
+import static org.apache.jackrabbit.oak.segment.azure.AzureUtilities.readBufferFully;
+
+public class AzureSegmentArchiveWriter implements SegmentArchiveManager.SegmentArchiveWriter {
+
+ private final CloudBlobDirectory archiveDirectory;
+
+ private final IOMonitor ioMonitor;
+
+ private final FileStoreMonitor monitor;
+
+ private final Optional<SegmentWriteQueue> queue;
+
+ private int entries;
+
+ private long totalLength;
+
+ private volatile boolean created = false;
+
+ public AzureSegmentArchiveWriter(CloudBlobDirectory archiveDirectory, IOMonitor ioMonitor, FileStoreMonitor monitor) {
+ this.archiveDirectory = archiveDirectory;
+ this.ioMonitor = ioMonitor;
+ this.monitor = monitor;
+ this.queue = SegmentWriteQueue.THREADS > 0 ? Optional.of(new SegmentWriteQueue(this::doWriteEntry)) : Optional.empty();
+ }
+
+ @Override
+ public TarEntry writeSegment(long msb, long lsb, byte[] data, int offset, int size, GCGeneration generation) throws IOException {
+ created = true;
+
+ TarEntry tarEntry = new TarEntry(msb, lsb, (entries++) * BLOCK_SIZE, size, generation);
+ if (queue.isPresent()) {
+ queue.get().addToQueue(tarEntry, data, offset, size);
+ } else {
+ doWriteEntry(tarEntry, data, offset, size);
+ }
+ totalLength += size;
+ monitor.written(size);
+ return tarEntry;
+ }
+
+ private void doWriteEntry(TarEntry tarEntry, byte[] data, int offset, int size) throws IOException {
+ long msb = tarEntry.msb();
+ long lsb = tarEntry.lsb();
+ ioMonitor.beforeSegmentWrite(pathAsFile(), msb, lsb, size);
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ try {
+ CloudBlockBlob blob = getBlob(getSegmentFileName(tarEntry));
+ blob.uploadFromByteArray(data, offset, size);
+ } catch (StorageException e) {
+ throw new IOException(e);
+ }
+ ioMonitor.afterSegmentWrite(pathAsFile(), msb, lsb, size, stopwatch.elapsed(TimeUnit.NANOSECONDS));
+ }
+
+ @Override
+ public ByteBuffer readSegment(TarEntry tarEntry) throws IOException {
+ UUID uuid = new UUID(tarEntry.msb(), tarEntry.lsb());
+ Optional<SegmentWriteAction> segment = queue.map(q -> q.read(uuid));
+ if (segment.isPresent()) {
+ return segment.get().toByteBuffer();
+ }
+ ByteBuffer buffer = ByteBuffer.allocate(tarEntry.size());
+ readBufferFully(getBlob(getSegmentFileName(tarEntry)), buffer);
+ return buffer;
+ }
+
+ @Override
+ public void writeIndex(byte[] data) throws IOException {
+ writeDataFile(data, ".idx");
+ }
+
+ @Override
+ public void writeGraph(byte[] data) throws IOException {
+ writeDataFile(data, ".gph");
+ }
+
+ @Override
+ public void writeBinaryReferences(byte[] data) throws IOException {
+ writeDataFile(data, ".brf");
+ }
+
+ private void writeDataFile(byte[] data, String extension) throws IOException {
+ try {
+ getBlob(getName() + extension).uploadFromByteArray(data, 0, data.length);
+ } catch (StorageException e) {
+ throw new IOException(e);
+ }
+ totalLength += data.length;
+ monitor.written(data.length);
+ }
+
+ @Override
+ public long getLength() {
+ return totalLength;
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (queue.isPresent()) { // can't use Optional#ifPresent(): flush() and close() throw IOException
+ SegmentWriteQueue q = queue.get();
+ q.flush();
+ q.close();
+ }
+ }
+
+ @Override
+ public boolean isCreated() {
+ return created || !queueIsEmpty();
+ }
+
+ @Override
+ public void flush() throws IOException {
+ if (queue.isPresent()) { // can't use Optional#ifPresent(): flush() throws IOException
+ queue.get().flush();
+ }
+ }
+
+ private boolean queueIsEmpty() {
+ return queue.map(SegmentWriteQueue::isEmpty).orElse(true);
+ }
+
+ @Override
+ public String getName() {
+ return AzureUtilities.getName(archiveDirectory);
+ }
+
+ private File pathAsFile() {
+ return new File(archiveDirectory.getUri().getPath());
+ }
+
+ private CloudBlockBlob getBlob(String name) throws IOException {
+ try {
+ return archiveDirectory.getBlockBlobReference(name);
+ } catch (URISyntaxException | StorageException e) {
+ throw new IOException(e);
+ }
+ }
+}
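
Because `writeSegment()` assigns `offset = entryIndex * BLOCK_SIZE`, the hex prefix of each segment blob name is simply its write position, so lexicographic order matches write order (which `recoverEntries()` depends on). A minimal sketch:

```java
import java.util.UUID;

import static org.apache.jackrabbit.oak.segment.file.tar.TarConstants.BLOCK_SIZE;

public class SegmentNamingDemo {

    public static void main(String[] args) {
        for (int entry = 0; entry < 3; entry++) {
            long offset = (long) entry * BLOCK_SIZE; // as assigned in writeSegment()
            UUID uuid = UUID.randomUUID();
            // same format as AzureUtilities.getSegmentFileName()
            System.out.println(String.format("%04x.%s", offset / BLOCK_SIZE, uuid));
            // prints 0000.<uuid>, 0001.<uuid>, 0002.<uuid>
        }
    }
}
```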
diff --git a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentStoreService.java b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentStoreService.java
new file mode 100644
index 0000000000..9c4ae1ade4
--- /dev/null
+++ b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentStoreService.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.segment.azure;
+
+import com.microsoft.azure.storage.CloudStorageAccount;
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudBlobContainer;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.ConfigurationPolicy;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.jackrabbit.oak.commons.PropertiesUtil;
+import org.apache.jackrabbit.oak.segment.SegmentNodeStorePersistence;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.component.ComponentContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.security.InvalidKeyException;
+import java.util.Properties;
+
+import static org.apache.jackrabbit.oak.osgi.OsgiUtil.lookupConfigurationThenFramework;
+import static org.apache.jackrabbit.oak.segment.azure.AzureSegmentStoreService.AZURE_ACCESS_KEY;
+import static org.apache.jackrabbit.oak.segment.azure.AzureSegmentStoreService.AZURE_ACCOUNT_NAME;
+import static org.apache.jackrabbit.oak.segment.azure.AzureSegmentStoreService.AZURE_CONNECTION_URL;
+import static org.apache.jackrabbit.oak.segment.azure.AzureSegmentStoreService.AZURE_CONTAINER_NAME;
+import static org.apache.jackrabbit.oak.segment.azure.AzureSegmentStoreService.AZURE_ROOT_PATH;
+
+@Component(policy = ConfigurationPolicy.REQUIRE,
+ metatype = true,
+ label = "Oak Azure Segment Store",
+ description = "Azure backend for the Oak Segment Node Store"
+)
+public class AzureSegmentStoreService {
+
+ private static final Logger log = LoggerFactory.getLogger(AzureSegmentStoreService.class);
+
+ @Property(
+ label = "Azure account name"
+ )
+ public static final String AZURE_ACCOUNT_NAME = "azure.accountName";
+
+ @Property(
+ label = "Azure container name"
+ )
+ public static final String AZURE_CONTAINER_NAME = "azure.containerName";
+
+ @Property(
+ label = "Azure access key"
+ )
+ public static final String AZURE_ACCESS_KEY = "azure.accessKey";
+
+ @Property(
+ label = "Azure root path",
+ value = "/oak"
+ )
+ public static final String AZURE_ROOT_PATH = "azure.rootPath";
+
+ @Property(
+ label = "Azure connection URL (optional)",
+ description = "The connection URL to be used (it overrides all the other Azure properties)."
+ )
+ public static final String AZURE_CONNECTION_URL = "azure.connectionUrl";
+
+ private ServiceRegistration registration;
+
+ private SegmentNodeStorePersistence persistence;
+
+ @Activate
+ public void activate(ComponentContext context) throws IOException {
+ Configuration config = new Configuration(context);
+ persistence = createAzurePersistence(config);
+ registration = context.getBundleContext().registerService(SegmentNodeStorePersistence.class.getName(), persistence, new Properties());
+ }
+
+ @Deactivate
+ public void deactivate() throws IOException {
+ if (registration != null) {
+ registration.unregister();
+ registration = null;
+ }
+ persistence = null;
+ }
+
+ private static SegmentNodeStorePersistence createAzurePersistence(Configuration configuration) throws IOException {
+ try {
+ StringBuilder connectionString = new StringBuilder();
+ if (configuration.getAzureConnectionUrl() != null) {
+ connectionString.append(configuration.getAzureConnectionUrl());
+ } else {
+ connectionString.append("DefaultEndpointsProtocol=https;");
+ connectionString.append("AccountName=").append(configuration.getAzureAccountName()).append(';');
+ connectionString.append("AccountKey=").append(configuration.getAzureAccessKey()).append(';');
+ }
+ log.info("Connection string: {}", connectionString.toString());
+ CloudStorageAccount cloud = CloudStorageAccount.parse(connectionString.toString());
+ CloudBlobContainer container = cloud.createCloudBlobClient().getContainerReference(configuration.getAzureContainerName());
+ container.createIfNotExists();
+
+ String path = configuration.getAzureRootPath();
+ if (path != null && path.length() > 0 && path.charAt(0) == '/') {
+ path = path.substring(1);
+ }
+
+ return new AzurePersistence(container.getDirectoryReference(path));
+ } catch (StorageException | URISyntaxException | InvalidKeyException e) {
+ throw new IOException(e);
+ }
+ }
+
+}
+
+class Configuration {
+
+ private final ComponentContext context;
+
+ Configuration(ComponentContext context) {
+ this.context = context;
+ }
+
+ String property(String name) {
+ return lookupConfigurationThenFramework(context, name);
+ }
+
+ String getAzureAccountName() {
+ return property(AZURE_ACCOUNT_NAME);
+ }
+
+ String getAzureContainerName() {
+ return property(AZURE_CONTAINER_NAME);
+ }
+
+ String getAzureAccessKey() {
+ return property(AZURE_ACCESS_KEY);
+ }
+
+ String getAzureRootPath() {
+ return PropertiesUtil.toString(property(AZURE_ROOT_PATH), "/oak");
+ }
+
+ String getAzureConnectionUrl() {
+ return property(AZURE_CONNECTION_URL);
+ }
+
+}
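
A small sketch of the connection-string precedence implemented in `createAzurePersistence()`: an explicit `azure.connectionUrl` wins; otherwise the string is assembled from the account name and access key. The values below are placeholders.

```java
public class ConnectionStringDemo {

    static String connectionString(String connectionUrl, String accountName, String accessKey) {
        if (connectionUrl != null) {
            return connectionUrl;
        }
        return "DefaultEndpointsProtocol=https;"
                + "AccountName=" + accountName + ';'
                + "AccountKey=" + accessKey + ';';
    }

    public static void main(String[] args) {
        System.out.println(connectionString(null, "myaccount", "placeholder-key"));
        System.out.println(connectionString("UseDevelopmentStorage=true", null, null));
    }
}
```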
diff --git a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureUtilities.java b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureUtilities.java
new file mode 100644
index 0000000000..90962452ec
--- /dev/null
+++ b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureUtilities.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.azure;
+
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudBlob;
+import com.microsoft.azure.storage.blob.CloudBlobDirectory;
+import org.apache.jackrabbit.oak.segment.file.tar.TarEntry;
+import org.apache.jackrabbit.oak.segment.file.tar.index.IndexEntry;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.nio.file.Paths;
+import java.util.UUID;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import static org.apache.jackrabbit.oak.segment.file.tar.TarConstants.BLOCK_SIZE;
+
+public final class AzureUtilities {
+
+ public static final String SEGMENT_FILE_NAME_PATTERN = "^([0-9a-f]{4})\\.([0-9a-f-]+)$";
+
+ private AzureUtilities() {
+ }
+
+ public static String getSegmentFileName(TarEntry tarEntry) {
+ return getSegmentFileName(tarEntry.offset(), tarEntry.msb(), tarEntry.lsb());
+ }
+
+ public static String getSegmentFileName(IndexEntry tarEntry) {
+ return getSegmentFileName(tarEntry.getPosition(), tarEntry.getMsb(), tarEntry.getLsb());
+ }
+
+ public static String getSegmentFileName(long offset, long msb, long lsb) {
+ return String.format("%04x.%s", offset / BLOCK_SIZE, new UUID(msb, lsb).toString());
+ }
+
+ public static String getName(CloudBlob blob) {
+ return Paths.get(blob.getName()).getFileName().toString();
+ }
+
+ public static String getName(CloudBlobDirectory directory) {
+ return Paths.get(directory.getUri().getPath()).getFileName().toString();
+ }
+
+ public static Stream<CloudBlob> getBlobs(CloudBlobDirectory directory) throws IOException {
+ try {
+ return StreamSupport.stream(directory.listBlobs().spliterator(), false)
+ .filter(i -> i instanceof CloudBlob)
+ .map(i -> (CloudBlob) i);
+ } catch (StorageException | URISyntaxException e) {
+ throw new IOException(e);
+ }
+ }
+
+ public static void readBufferFully(CloudBlob blob, ByteBuffer buffer) throws IOException {
+ try {
+ buffer.rewind();
+ long readBytes = blob.downloadToByteArray(buffer.array(), 0);
+ if (buffer.limit() != readBytes) {
+ throw new IOException("Buffer size: " + buffer.limit() + ", read bytes: " + readBytes);
+ }
+ } catch (StorageException e) {
+ throw new IOException(e);
+ }
+ }
+}
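
A short sketch exercising the helpers above: `getName()` reduces a blob path to its last segment, and `getSegmentFileName()` produces the `<position>.<uuid>` convention (assuming `TarConstants.BLOCK_SIZE` = 512; the path and UUID values are made up).

```java
import org.apache.jackrabbit.oak.segment.azure.AzureUtilities;

import java.nio.file.Paths;

public class AzureUtilitiesDemo {

    public static void main(String[] args) {
        // getName(CloudBlob) boils down to taking the last path segment:
        System.out.println(Paths.get(
                "oak/data00000a.tar/0000.5b3f1c2a-9d4e-4f6a-8b1c-2d3e4f5a6b7c").getFileName());

        // getSegmentFileName(offset, msb, lsb):
        System.out.println(AzureUtilities.getSegmentFileName(2 * 512, 0x1234L, 0x5678L));
        // prints 0002.00000000-0000-1234-0000-000000005678
    }
}
```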
diff --git a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/ReverseFileReader.java b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/ReverseFileReader.java
new file mode 100644
index 0000000000..439fab5f9c
--- /dev/null
+++ b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/ReverseFileReader.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.azure;
+
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudBlob;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static java.lang.Math.min;
+
+public class ReverseFileReader {
+
+ private static final int BUFFER_SIZE = 16 * 1024;
+
+ private final int bufferSize;
+
+ private final CloudBlob blob;
+
+ private byte[] buffer;
+
+ private int bufferOffset;
+
+ private int fileOffset;
+
+ public ReverseFileReader(CloudBlob blob) throws StorageException {
+ this(blob, BUFFER_SIZE);
+ }
+
+ public ReverseFileReader(CloudBlob blob, int bufferSize) throws StorageException {
+ this.blob = blob;
+ if (blob.exists()) {
+ this.fileOffset = (int) blob.getProperties().getLength();
+ } else {
+ this.fileOffset = 0;
+ }
+ this.bufferSize = bufferSize;
+ }
+
+ private void readBlock() throws IOException {
+ if (buffer == null) {
+ buffer = new byte[min(fileOffset, bufferSize)];
+ } else if (fileOffset < buffer.length) {
+ buffer = new byte[fileOffset];
+ }
+
+ if (buffer.length > 0) {
+ fileOffset -= buffer.length;
+ try {
+ blob.downloadRangeToByteArray(fileOffset, Long.valueOf(buffer.length), buffer, 0);
+ } catch (StorageException e) {
+ throw new IOException(e);
+ }
+ }
+ bufferOffset = buffer.length;
+ }
+
+ private String readUntilNewLine() {
+ if (bufferOffset == -1) {
+ return "";
+ }
+ int stop = bufferOffset;
+ while (--bufferOffset >= 0) {
+ if (buffer[bufferOffset] == '\n') {
+ break;
+ }
+ }
+ // bufferOffset now points either to the previous '\n' character or to -1
+ return new String(buffer, bufferOffset + 1, stop - bufferOffset - 1, Charset.defaultCharset());
+ }
+
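+ /**
+ * Returns the previous line of the blob, or {@code null} once the
+ * beginning of the file has been reached. A line spanning several
+ * buffers is assembled from the partial reads, which arrive in
+ * reverse order.
+ */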
+ public String readLine() throws IOException {
+ if (bufferOffset == -1 && fileOffset == 0) {
+ return null;
+ }
+
+ if (buffer == null) {
+ readBlock();
+ }
+
+ List<String> result = new ArrayList<>(1);
+ while (true) {
+ result.add(readUntilNewLine());
+ if (bufferOffset > -1) { // stopped on the '\n'
+ break;
+ }
+ if (fileOffset == 0) { // reached the beginning of the file
+ break;
+ }
+ readBlock();
+ }
+ Collections.reverse(result);
+ return String.join("", result);
+ }
+}
diff --git a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/queue/SegmentWriteAction.java b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/queue/SegmentWriteAction.java
new file mode 100644
index 0000000000..5bc29f98f3
--- /dev/null
+++ b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/queue/SegmentWriteAction.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.azure.queue;
+
+import org.apache.jackrabbit.oak.segment.file.tar.TarEntry;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.UUID;
+
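+/**
+ * A single segment write deferred by the queue: the tar entry metadata
+ * together with a private copy of the segment bytes.
+ */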
+public class SegmentWriteAction {
+
+ private final TarEntry tarEntry;
+
+ private final byte[] buffer;
+
+ private final int offset;
+
+ private final int length;
+
+ public SegmentWriteAction(TarEntry tarEntry, byte[] buffer, int offset, int length) {
+ this.tarEntry = tarEntry;
+
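+ // defensive copy, so the caller may reuse its buffer after this returns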
+ this.buffer = new byte[length];
+ for (int i = 0; i < length; i++) {
+ this.buffer[i] = buffer[i + offset];
+ }
+ this.offset = 0;
+ this.length = length;
+ }
+
+ public UUID getUuid() {
+ return new UUID(tarEntry.msb(), tarEntry.lsb());
+ }
+
+ public byte[] getBuffer() {
+ return buffer;
+ }
+
+ public int getOffset() {
+ return offset;
+ }
+
+ public int getLength() {
+ return length;
+ }
+
+ public ByteBuffer toByteBuffer() {
+ return ByteBuffer.wrap(buffer, offset, length);
+ }
+
+ void passTo(SegmentWriteQueue.SegmentConsumer consumer) throws IOException {
+ consumer.consume(tarEntry, buffer, offset, length);
+ }
+
+ @Override
+ public String toString() {
+ return getUuid().toString();
+ }
+}
diff --git a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/queue/SegmentWriteQueue.java b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/queue/SegmentWriteQueue.java
new file mode 100644
index 0000000000..4ae8e3d8a2
--- /dev/null
+++ b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/queue/SegmentWriteQueue.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.azure.queue;
+
+import org.apache.jackrabbit.oak.segment.file.tar.TarEntry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
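+/**
+ * An asynchronous queue for writing segments to the remote storage. Writes
+ * are consumed by a pool of background threads. If persisting a segment
+ * fails, the queue is marked as broken: new writes are blocked and a
+ * dedicated emergency thread keeps retrying the failed segment until it
+ * succeeds.
+ */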
+public class SegmentWriteQueue implements Closeable {
+
+ public static final int THREADS = Integer.getInteger("oak.segment.azure.threads", 5);
+
+ private static final int QUEUE_SIZE = Integer.getInteger("oak.segment.azure.queue", 20);
+
+ private static final Logger log = LoggerFactory.getLogger(SegmentWriteQueue.class);
+
+ private final BlockingDeque<SegmentWriteAction> queue;
+
+ private final Map<UUID, SegmentWriteAction> segmentsByUUID;
+
+ private final ExecutorService executor;
+
+ private final ReadWriteLock flushLock;
+
+ private final SegmentConsumer writer;
+
+ private volatile boolean shutdown;
+
+ private final Object brokenMonitor = new Object();
+
+ private volatile boolean broken;
+
+ public SegmentWriteQueue(SegmentConsumer writer) {
+ this(writer, QUEUE_SIZE, THREADS);
+ }
+
+ SegmentWriteQueue(SegmentConsumer writer, int queueSize, int threadNo) {
+ this.writer = writer;
+ segmentsByUUID = new ConcurrentHashMap<>();
+ flushLock = new ReentrantReadWriteLock();
+
+ queue = new LinkedBlockingDeque<>(queueSize);
+ executor = Executors.newFixedThreadPool(threadNo + 1);
+ for (int i = 0; i < threadNo; i++) {
+ executor.submit(this::mainLoop);
+ }
+ executor.submit(this::emergencyLoop);
+ }
+
+ private void mainLoop() {
+ while (!shutdown) {
+ try {
+ waitWhileBroken();
+ if (shutdown) {
+ break;
+ }
+ consume();
+ } catch (SegmentConsumeException e) {
+ SegmentWriteAction segment = e.segment;
+ log.error("Can't persist the segment {}", segment.getUuid(), e.getCause());
+ try {
+ queue.put(segment);
+ } catch (InterruptedException e1) {
+ log.error("Can't re-add the segment {} to the queue. It'll be dropped.", segment.getUuid(), e1);
+ }
+ }
+ }
+ }
+
+ private void consume() throws SegmentConsumeException {
+ SegmentWriteAction segment = null;
+ try {
+ segment = queue.poll(100, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ log.error("Poll from queue interrupted", e);
+ }
+ if (segment != null) {
+ consume(segment);
+ }
+ }
+
+ private void consume(SegmentWriteAction segment) throws SegmentConsumeException {
+ try {
+ segment.passTo(writer);
+ } catch (IOException e) {
+ setBroken(true);
+ throw new SegmentConsumeException(segment, e);
+ }
+ synchronized (segmentsByUUID) {
+ segmentsByUUID.remove(segment.getUuid());
+ segmentsByUUID.notifyAll();
+ }
+ setBroken(false);
+ }
+
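+ /**
+ * Run by a dedicated thread: once the queue becomes broken, it keeps
+ * retrying the failed segment with a 1s pause between the attempts,
+ * until a write finally succeeds or shutdown is requested.
+ */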
+ private void emergencyLoop() {
+ while (!shutdown) {
+ waitUntilBroken();
+ if (shutdown) {
+ break;
+ }
+
+ boolean success = false;
+ SegmentWriteAction segmentToRetry = null;
+ do {
+ try {
+ if (segmentToRetry == null) {
+ consume();
+ } else {
+ consume(segmentToRetry);
+ }
+ success = true;
+ } catch (SegmentConsumeException e) {
+ segmentToRetry = e.segment;
+ log.error("Can't persist the segment {}", segmentToRetry.getUuid(), e.getCause());
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e1) {
+ log.warn("Interrupted", e);
+ }
+ if (shutdown) {
+ log.error("Shutdown initiated. The segment {} will be dropped.", segmentToRetry.getUuid());
+ }
+ }
+ } while (!success && !shutdown);
+ }
+ }
+
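+ /**
+ * Enqueues a segment for an asynchronous write. Blocks while the queue
+ * is broken or a flush is in progress; gives up with an IOException if
+ * no space becomes available in the queue within a minute.
+ */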
+ public void addToQueue(TarEntry tarEntry, byte[] data, int offset, int size) throws IOException {
+ waitWhileBroken();
+ if (shutdown) {
+ throw new IllegalStateException("Can't accept the new segment - shutdown in progress");
+ }
+
+ SegmentWriteAction action = new SegmentWriteAction(tarEntry, data, offset, size);
+ flushLock.readLock().lock();
+ try {
+ segmentsByUUID.put(action.getUuid(), action);
+ if (!queue.offer(action, 1, TimeUnit.MINUTES)) {
+ segmentsByUUID.remove(action.getUuid());
+ throw new IOException("Can't add segment to the queue");
+ }
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ } finally {
+ flushLock.readLock().unlock();
+ }
+ }
+
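+ /**
+ * Blocks new writes and waits until all the already-queued segments
+ * are persisted, logging an error every minute the queue can't be
+ * drained.
+ */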
+ public void flush() throws IOException {
+ flushLock.writeLock().lock();
+ try {
+ synchronized (segmentsByUUID) {
+ long start = System.currentTimeMillis();
+ while (!segmentsByUUID.isEmpty()) {
+ segmentsByUUID.wait(100);
+ if (System.currentTimeMillis() - start > TimeUnit.MINUTES.toMillis(1)) {
+ log.error("Can't flush the queue in 1 minute. Queue: {}. Segment map: {}", queue, segmentsByUUID);
+ start = System.currentTimeMillis();
+ }
+ }
+ }
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ } finally {
+ flushLock.writeLock().unlock();
+ }
+ }
+
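+ /**
+ * Returns the segment still waiting in the queue, or {@code null} if
+ * it has already been persisted.
+ */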
+ public SegmentWriteAction read(UUID id) {
+ return segmentsByUUID.get(id);
+ }
+
+ public void close() throws IOException {
+ shutdown = true;
+ try {
+ executor.shutdown();
+ if (!executor.awaitTermination(1, TimeUnit.MINUTES)) {
+ throw new IOException("The write wasn't able to shut down clearly");
+ }
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ }
+
+ public boolean isEmpty() {
+ return segmentsByUUID.isEmpty();
+ }
+
+ boolean isBroken() {
+ return broken;
+ }
+
+ int getSize() {
+ return queue.size();
+ }
+
+ private void setBroken(boolean broken) {
+ synchronized (brokenMonitor) {
+ this.broken = broken;
+ brokenMonitor.notifyAll();
+ }
+ }
+
+ private void waitWhileBroken() {
+ if (!broken) {
+ return;
+ }
+ synchronized (brokenMonitor) {
+ while (broken && !shutdown) {
+ try {
+ brokenMonitor.wait(100);
+ } catch (InterruptedException e) {
+ log.warn("Interrupted", e);
+ }
+ }
+ }
+ }
+
+ private void waitUntilBroken() {
+ if (broken) {
+ return;
+ }
+ synchronized (brokenMonitor) {
+ while (!broken && !shutdown) {
+ try {
+ brokenMonitor.wait(100);
+ } catch (InterruptedException e) {
+ log.warn("Interrupted", e);
+ }
+ }
+ }
+ }
+
+ public interface SegmentConsumer {
+
+ void consume(TarEntry tarEntry, byte[] data, int offset, int size) throws IOException;
+
+ }
+
+ public static class SegmentConsumeException extends Exception {
+
+ private final SegmentWriteAction segment;
+
+ public SegmentConsumeException(SegmentWriteAction segment, IOException cause) {
+ super(cause);
+ this.segment = segment;
+ }
+ }
+}
diff --git a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManagerTest.java b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManagerTest.java
new file mode 100644
index 0000000000..c6d4d85fbd
--- /dev/null
+++ b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManagerTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.azure;
+
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudBlobContainer;
+import org.apache.jackrabbit.oak.segment.SegmentArchiveManager;
+import org.apache.jackrabbit.oak.segment.SegmentArchiveManager.SegmentArchiveWriter;
+import org.apache.jackrabbit.oak.segment.file.tar.FileStoreMonitorAdapter;
+import org.apache.jackrabbit.oak.segment.file.tar.GCGeneration;
+import org.apache.jackrabbit.oak.segment.file.tar.IOMonitorAdapter;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.security.InvalidKeyException;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.UUID;
+
+import static com.google.common.collect.Lists.newArrayList;
+import static org.junit.Assert.assertEquals;
+
+public class AzureArchiveManagerTest {
+
+ @ClassRule
+ public static AzuriteDockerRule azurite = new AzuriteDockerRule();
+
+ private CloudBlobContainer container;
+
+ @Before
+ public void setup() throws StorageException, InvalidKeyException, URISyntaxException {
+ container = azurite.getContainer("oak-test");
+ }
+
+ @Test
+ public void testRecovery() throws StorageException, URISyntaxException, IOException {
+ SegmentArchiveManager manager = new AzurePersistence(container.getDirectoryReference("oak")).createArchiveManager(false, new IOMonitorAdapter(), new FileStoreMonitorAdapter());
+ SegmentArchiveWriter writer = manager.create("data00000a.tar");
+
+ List<UUID> uuids = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ UUID u = UUID.randomUUID();
+ writer.writeSegment(u.getMostSignificantBits(), u.getLeastSignificantBits(), new byte[10], 0, 10, GCGeneration.NULL);
+ uuids.add(u);
+ }
+
+ writer.flush();
+ writer.close();
+
+ container.getBlockBlobReference("oak/data00000a.tar/0005." + uuids.get(5).toString()).delete();
+
+ LinkedHashMap<UUID, byte[]> recovered = new LinkedHashMap<>();
+ manager.recoverEntries("data00000a.tar", recovered);
+ assertEquals(uuids.subList(0, 5), newArrayList(recovered.keySet()));
+ }
+
+}
diff --git a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureGCJournalTest.java b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureGCJournalTest.java
new file mode 100644
index 0000000000..fae3337b5e
--- /dev/null
+++ b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureGCJournalTest.java
@@ -0,0 +1,45 @@
+package org.apache.jackrabbit.oak.segment.azure;
+
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudBlobContainer;
+import org.apache.jackrabbit.oak.segment.SegmentNodeStorePersistence;
+import org.apache.jackrabbit.oak.segment.file.GcJournalTest;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.net.URISyntaxException;
+import java.security.InvalidKeyException;
+
+public class AzureGCJournalTest extends GcJournalTest {
+
+ @ClassRule
+ public static AzuriteDockerRule azurite = new AzuriteDockerRule();
+
+ private CloudBlobContainer container;
+
+ @Before
+ public void setup() throws StorageException, InvalidKeyException, URISyntaxException {
+ container = azurite.getContainer("oak-test");
+ }
+
+ @Override
+ protected SegmentNodeStorePersistence getPersistence() throws Exception {
+ return new AzurePersistence(container.getDirectoryReference("oak"));
+ }
+
+ @Test
+ @Ignore
+ @Override
+ public void testReadOak16GCLog() throws Exception {
+ super.testReadOak16GCLog();
+ }
+
+ @Test
+ @Ignore
+ @Override
+ public void testUpdateOak16GCLog() throws Exception {
+ super.testUpdateOak16GCLog();
+ }
+}
diff --git a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureJournalFileTest.java b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureJournalFileTest.java
new file mode 100644
index 0000000000..934f5a3cfe
--- /dev/null
+++ b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureJournalFileTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.azure;
+
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudBlobContainer;
+import org.apache.jackrabbit.oak.segment.SegmentNodeStorePersistence;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.security.InvalidKeyException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class AzureJournalFileTest {
+
+ @ClassRule
+ public static AzuriteDockerRule azurite = new AzuriteDockerRule();
+
+ private CloudBlobContainer container;
+
+ private AzureJournalFile journal;
+
+ @Before
+ public void setup() throws StorageException, InvalidKeyException, URISyntaxException {
+ container = azurite.getContainer("oak-test");
+ journal = new AzureJournalFile(container.getDirectoryReference("journal"), "journal.log", 10);
+ }
+
+ @Test
+ public void testSplitJournalFiles() throws IOException {
+ assertFalse(journal.exists());
+
+ SegmentNodeStorePersistence.JournalFileWriter writer = journal.openJournalWriter();
+ for (int i = 0; i < 100; i++) {
+ writer.writeLine("line " + i);
+ }
+
+ assertTrue(journal.exists());
+
+ writer = journal.openJournalWriter();
+ for (int i = 100; i < 200; i++) {
+ writer.writeLine("line " + i);
+ }
+
+ SegmentNodeStorePersistence.JournalFileReader reader = journal.openJournalReader();
+ for (int i = 199; i >= 0; i--) {
+ assertEquals("line " + i, reader.readLine());
+ }
+
+ }
+
+}
diff --git a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureManifestFileTest.java b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureManifestFileTest.java
new file mode 100644
index 0000000000..67784dbc6a
--- /dev/null
+++ b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureManifestFileTest.java
@@ -0,0 +1,44 @@
+package org.apache.jackrabbit.oak.segment.azure;
+
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudBlobContainer;
+import org.apache.jackrabbit.oak.segment.SegmentNodeStorePersistence.ManifestFile;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.security.InvalidKeyException;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+public class AzureManifestFileTest {
+
+ @ClassRule
+ public static AzuriteDockerRule azurite = new AzuriteDockerRule();
+
+ private CloudBlobContainer container;
+
+ @Before
+ public void setup() throws StorageException, InvalidKeyException, URISyntaxException {
+ container = azurite.getContainer("oak-test");
+ }
+
+ @Test
+ public void testManifest() throws URISyntaxException, IOException {
+ ManifestFile manifestFile = new AzurePersistence(container.getDirectoryReference("oak")).getManifestFile();
+ assertFalse(manifestFile.exists());
+
+ Properties props = new Properties();
+ props.setProperty("xyz", "abc");
+ props.setProperty("version", "123");
+ manifestFile.save(props);
+
+ Properties loaded = manifestFile.load();
+ assertEquals(props, loaded);
+ }
+
+}
diff --git a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureTarFileTest.java b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureTarFileTest.java
new file mode 100644
index 0000000000..939c2b5331
--- /dev/null
+++ b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureTarFileTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.azure;
+
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudBlobContainer;
+import org.apache.jackrabbit.oak.segment.file.tar.FileStoreMonitorAdapter;
+import org.apache.jackrabbit.oak.segment.file.tar.IOMonitorAdapter;
+import org.apache.jackrabbit.oak.segment.file.tar.TarFileTest;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.security.InvalidKeyException;
+
+public class AzureTarFileTest extends TarFileTest {
+
+ @ClassRule
+ public static AzuriteDockerRule azurite = new AzuriteDockerRule();
+
+ private CloudBlobContainer container;
+
+ @Before
+ @Override
+ public void setUp() throws IOException {
+ try {
+ container = azurite.getContainer("oak-test");
+ archiveManager = new AzurePersistence(container.getDirectoryReference("oak")).createArchiveManager(true, new IOMonitorAdapter(), new FileStoreMonitorAdapter());
+ } catch (StorageException | InvalidKeyException | URISyntaxException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ protected long getWriteAndReadExpectedSize() {
+ return 557;
+ }
+
+ @Test
+ @Ignore
+ @Override
+ public void graphShouldBeTrimmedDownOnSweep() throws Exception {
+ super.graphShouldBeTrimmedDownOnSweep();
+ }
+}
\ No newline at end of file
diff --git a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureTarFilesTest.java b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureTarFilesTest.java
new file mode 100644
index 0000000000..c98ff581b6
--- /dev/null
+++ b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureTarFilesTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.azure;
+
+import com.microsoft.azure.storage.blob.CloudBlobContainer;
+import org.apache.jackrabbit.oak.segment.file.tar.FileStoreMonitorAdapter;
+import org.apache.jackrabbit.oak.segment.file.tar.IOMonitorAdapter;
+import org.apache.jackrabbit.oak.segment.file.tar.TarFiles;
+import org.apache.jackrabbit.oak.segment.file.tar.TarFilesTest;
+import org.junit.Before;
+import org.junit.ClassRule;
+
+public class AzureTarFilesTest extends TarFilesTest {
+
+ @ClassRule
+ public static AzuriteDockerRule azurite = new AzuriteDockerRule();
+
+ private CloudBlobContainer container;
+
+ @Before
+ @Override
+ public void setUp() throws Exception {
+ container = azurite.getContainer("oak-test");
+ tarFiles = TarFiles.builder()
+ .withDirectory(folder.newFolder())
+ .withTarRecovery((id, data, recovery) -> {
+ // Intentionally left blank
+ })
+ .withIOMonitor(new IOMonitorAdapter())
+ .withFileStoreMonitor(new FileStoreMonitorAdapter())
+ .withMaxFileSize(MAX_FILE_SIZE)
+ .withPersistence(new AzurePersistence(container.getDirectoryReference("oak")))
+ .build();
+ }
+}
diff --git a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureTarWriterTest.java b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureTarWriterTest.java
new file mode 100644
index 0000000000..01685e36c0
--- /dev/null
+++ b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureTarWriterTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.azure;
+
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudBlobContainer;
+import org.apache.jackrabbit.oak.segment.file.tar.IOMonitorAdapter;
+import org.apache.jackrabbit.oak.segment.file.tar.TarWriterTest;
+import org.junit.Before;
+import org.junit.ClassRule;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.security.InvalidKeyException;
+
+public class AzureTarWriterTest extends TarWriterTest {
+
+ @ClassRule
+ public static AzuriteDockerRule azurite = new AzuriteDockerRule();
+
+ private CloudBlobContainer container;
+
+ @Before
+ @Override
+ public void setUp() throws IOException {
+ try {
+ monitor = new TestFileStoreMonitor();
+ container = azurite.getContainer("oak-test");
+ archiveManager = new AzurePersistence(container.getDirectoryReference("oak")).createArchiveManager(true, new IOMonitorAdapter(), monitor);
+ } catch (StorageException | InvalidKeyException | URISyntaxException e) {
+ throw new IOException(e);
+ }
+ }
+}
diff --git a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzuriteDockerRule.java b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzuriteDockerRule.java
new file mode 100644
index 0000000000..7e957d5fb3
--- /dev/null
+++ b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzuriteDockerRule.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.azure;
+
+import com.arakelian.docker.junit.DockerRule;
+import com.arakelian.docker.junit.model.ImmutableDockerConfig;
+import com.microsoft.azure.storage.CloudStorageAccount;
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudBlobContainer;
+import com.spotify.docker.client.DefaultDockerClient;
+import org.junit.Assume;
+import org.junit.rules.TestRule;
+import org.junit.runner.Description;
+import org.junit.runners.model.Statement;
+
+import java.net.URISyntaxException;
+import java.security.InvalidKeyException;
+
+public class AzuriteDockerRule implements TestRule {
+
+ private final DockerRule wrappedRule;
+
+ public AzuriteDockerRule() {
+ wrappedRule = new DockerRule(ImmutableDockerConfig.builder()
+ .image("trekawek/azurite")
+ .name("oak-test-azurite")
+ .ports("10000")
+ .addStartedListener(container -> {
+ container.waitForPort("10000/tcp");
+ container.waitForLog("Azure Blob Storage Emulator listening on port 10000");
+ })
+ .addContainerConfigurer(builder -> builder.env("executable=blob"))
+ .alwaysRemoveContainer(true)
+ .build());
+
+ }
+
+ public CloudBlobContainer getContainer(String name) throws URISyntaxException, StorageException, InvalidKeyException {
+ int mappedPort = wrappedRule.getContainer().getPortBinding("10000/tcp").getPort();
+ CloudStorageAccount cloud = CloudStorageAccount.parse("DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://127.0.0.1:" + mappedPort + "/devstoreaccount1;");
+ CloudBlobContainer container = cloud.createCloudBlobClient().getContainerReference(name);
+ container.deleteIfExists();
+ container.create();
+ return container;
+ }
+
+ @Override
+ public Statement apply(Statement statement, Description description) {
+ try {
+ DefaultDockerClient client = DefaultDockerClient.fromEnv().connectTimeoutMillis(5000L).readTimeoutMillis(20000L).build();
+ client.ping();
+ client.close();
+ } catch (Exception e) {
+ Assume.assumeNoException(e);
+ }
+
+ return wrappedRule.apply(statement, description);
+ }
+}
diff --git a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/fixture/SegmentAzureFixture.java b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/fixture/SegmentAzureFixture.java
new file mode 100644
index 0000000000..3f9900a775
--- /dev/null
+++ b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/fixture/SegmentAzureFixture.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.azure.fixture;
+
+import com.google.common.io.Files;
+import com.microsoft.azure.storage.CloudStorageAccount;
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudBlobContainer;
+import com.microsoft.azure.storage.blob.CloudBlobDirectory;
+import org.apache.jackrabbit.oak.fixture.NodeStoreFixture;
+import org.apache.jackrabbit.oak.segment.SegmentNodeStoreBuilders;
+import org.apache.jackrabbit.oak.segment.azure.AzurePersistence;
+import org.apache.jackrabbit.oak.segment.file.FileStore;
+import org.apache.jackrabbit.oak.segment.file.FileStoreBuilder;
+import org.apache.jackrabbit.oak.segment.file.InvalidFileStoreVersionException;
+import org.apache.jackrabbit.oak.spi.state.NodeStore;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.security.InvalidKeyException;
+import java.util.HashMap;
+import java.util.Map;
+
+public class SegmentAzureFixture extends NodeStoreFixture {
+
+ private static final String AZURE_CONNECTION_STRING = System.getProperty("oak.segment.azure.connection", "DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;");
+
+ private static final String AZURE_CONTAINER = System.getProperty("oak.segment.azure.container", "oak");
+
+ private static final String AZURE_ROOT_PATH = System.getProperty("oak.segment.azure.rootPath", "/oak");
+
+ private Map<NodeStore, FileStore> fileStoreMap = new HashMap<>();
+
+ @Override
+ public NodeStore createNodeStore() {
+ AzurePersistence persistence;
+ try {
+ CloudStorageAccount cloud = CloudStorageAccount.parse(AZURE_CONNECTION_STRING);
+ CloudBlobContainer container = cloud.createCloudBlobClient().getContainerReference(AZURE_CONTAINER);
+ container.deleteIfExists();
+ container.create();
+ CloudBlobDirectory directory = container.getDirectoryReference(AZURE_ROOT_PATH);
+ persistence = new AzurePersistence(directory);
+ } catch (StorageException | URISyntaxException | InvalidKeyException e) {
+ throw new RuntimeException(e);
+ }
+
+ try {
+ FileStore fileStore = FileStoreBuilder.fileStoreBuilder(Files.createTempDir()).withCustomPersistence(persistence).build();
+ NodeStore nodeStore = SegmentNodeStoreBuilders.builder(fileStore).build();
+ fileStoreMap.put(nodeStore, fileStore);
+ return nodeStore;
+ } catch (IOException | InvalidFileStoreVersionException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void dispose(NodeStore nodeStore) {
+ FileStore fs = fileStoreMap.remove(nodeStore);
+ if (fs != null) {
+ fs.close();
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "SegmentAzure";
+ }
+}
diff --git a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/journal/AzureJournalReaderTest.java b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/journal/AzureJournalReaderTest.java
new file mode 100644
index 0000000000..fcf766d1a3
--- /dev/null
+++ b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/journal/AzureJournalReaderTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.azure.journal;
+
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudAppendBlob;
+import com.microsoft.azure.storage.blob.CloudBlobContainer;
+import org.apache.jackrabbit.oak.segment.azure.AzuriteDockerRule;
+import org.apache.jackrabbit.oak.segment.file.JournalReader;
+import org.apache.jackrabbit.oak.segment.file.JournalReaderTest;
+import org.apache.jackrabbit.oak.segment.azure.AzureJournalFile;
+import org.junit.Before;
+import org.junit.ClassRule;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.security.InvalidKeyException;
+
+public class AzureJournalReaderTest extends JournalReaderTest {
+
+ @ClassRule
+ public static AzuriteDockerRule azurite = new AzuriteDockerRule();
+
+ private CloudBlobContainer container;
+
+ @Before
+ public void setup() throws StorageException, InvalidKeyException, URISyntaxException {
+ container = azurite.getContainer("oak-test");
+ }
+
+ protected JournalReader createJournalReader(String s) throws IOException {
+ try {
+ CloudAppendBlob blob = container.getAppendBlobReference("journal/journal.log.001");
+ blob.createOrReplace();
+ blob.appendText(s);
+ return new JournalReader(new AzureJournalFile(container.getDirectoryReference("journal"), "journal.log"));
+ } catch (StorageException | URISyntaxException e) {
+ throw new IOException(e);
+ }
+ }
+}
diff --git a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/journal/AzureTarRevisionsTest.java b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/journal/AzureTarRevisionsTest.java
new file mode 100644
index 0000000000..3d0a12245b
--- /dev/null
+++ b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/journal/AzureTarRevisionsTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.azure.journal;
+
+import com.microsoft.azure.storage.blob.CloudBlobContainer;
+import org.apache.jackrabbit.oak.segment.SegmentNodeStorePersistence;
+import org.apache.jackrabbit.oak.segment.azure.AzuriteDockerRule;
+import org.apache.jackrabbit.oak.segment.file.TarRevisionsTest;
+import org.apache.jackrabbit.oak.segment.azure.AzurePersistence;
+import org.junit.Before;
+import org.junit.ClassRule;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+
+public class AzureTarRevisionsTest extends TarRevisionsTest {
+
+ @ClassRule
+ public static AzuriteDockerRule azurite = new AzuriteDockerRule();
+
+ private CloudBlobContainer container;
+
+ @Before
+ public void setup() throws Exception {
+ container = azurite.getContainer("oak-test");
+ super.setup();
+ }
+
+ @Override
+ protected SegmentNodeStorePersistence getPersistence() throws IOException {
+ try {
+ return new AzurePersistence(container.getDirectoryReference("oak"));
+ } catch (URISyntaxException e) {
+ throw new IOException(e);
+ }
+ }
+}
diff --git a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/journal/ReverseFileReaderTest.java b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/journal/ReverseFileReaderTest.java
new file mode 100644
index 0000000000..f3d1425a60
--- /dev/null
+++ b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/journal/ReverseFileReaderTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.azure.journal;
+
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudAppendBlob;
+import com.microsoft.azure.storage.blob.CloudBlobContainer;
+import org.apache.jackrabbit.oak.segment.azure.AzuriteDockerRule;
+import org.apache.jackrabbit.oak.segment.azure.ReverseFileReader;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.security.InvalidKeyException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+
+public class ReverseFileReaderTest {
+
+ @ClassRule
+ public static AzuriteDockerRule azurite = new AzuriteDockerRule();
+
+ private CloudBlobContainer container;
+
+ @Before
+ public void setup() throws StorageException, InvalidKeyException, URISyntaxException {
+ container = azurite.getContainer("oak-test");
+ getBlob().createOrReplace();
+ }
+
+ private CloudAppendBlob getBlob() throws URISyntaxException, StorageException {
+ return container.getAppendBlobReference("test-blob");
+ }
+
+ @Test
+ public void testReverseReader() throws IOException, URISyntaxException, StorageException {
+ List<String> entries = createFile(1024, 80);
+ ReverseFileReader reader = new ReverseFileReader(getBlob(), 256);
+ assertEquals(entries, reader);
+ }
+
+ @Test
+ public void testEmptyFile() throws IOException, URISyntaxException, StorageException {
+ List<String> entries = createFile(0, 80);
+ ReverseFileReader reader = new ReverseFileReader(getBlob(), 256);
+ assertEquals(entries, reader);
+ }
+
+ @Test
+ public void test1ByteBlock() throws IOException, URISyntaxException, StorageException {
+ List<String> entries = createFile(10, 16);
+ ReverseFileReader reader = new ReverseFileReader(getBlob(), 1);
+ assertEquals(entries, reader);
+ }
+
+ private List<String> createFile(int lines, int maxLineLength) throws IOException, URISyntaxException, StorageException {
+ Random random = new Random();
+ List<String> entries = new ArrayList<>();
+ CloudAppendBlob blob = getBlob();
+ for (int i = 0; i < lines; i++) {
+ int entrySize = random.nextInt(maxLineLength) + 1;
+ String entry = randomString(entrySize);
+ try {
+ blob.appendText(entry + '\n');
+ } catch (StorageException e) {
+ throw new IOException(e);
+ }
+ entries.add(entry);
+ }
+
+ entries.add("");
+ Collections.reverse(entries);
+ return entries;
+ }
+
+ private static void assertEquals(List<String> entries, ReverseFileReader reader) throws IOException {
+ int i = entries.size();
+ for (String e : entries) {
+ Assert.assertEquals("line " + (--i), e, reader.readLine());
+ }
+ Assert.assertNull(reader.readLine());
+ }
+
+ private static String randomString(int entrySize) {
+ Random r = new Random();
+
+ StringBuilder result = new StringBuilder();
+ for (int i = 0; i < entrySize; i++) {
+ result.append((char) ('a' + r.nextInt('z' - 'a')));
+ }
+
+ return result.toString();
+ }
+}
diff --git a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/queue/SegmentWriteQueueTest.java b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/queue/SegmentWriteQueueTest.java
new file mode 100644
index 0000000000..0272b4e532
--- /dev/null
+++ b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/queue/SegmentWriteQueueTest.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.azure.queue;
+
+import org.apache.jackrabbit.oak.segment.file.tar.GCGeneration;
+import org.apache.jackrabbit.oak.segment.file.tar.TarEntry;
+import org.junit.After;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class SegmentWriteQueueTest {
+
+ private static final byte[] EMPTY_DATA = new byte[0];
+
+ private SegmentWriteQueue queue;
+
+ @After
+ public void shutdown() throws IOException {
+ if (queue != null) {
+ queue.close();
+ }
+ }
+
+ @Test
+ public void testQueue() throws IOException, InterruptedException {
+ Set<UUID> added = Collections.synchronizedSet(new HashSet<>());
+ Semaphore semaphore = new Semaphore(0);
+ queue = new SegmentWriteQueue((tarEntry, data, offset, size) -> {
+ try {
+ semaphore.acquire();
+ } catch (InterruptedException e) {
+ }
+ added.add(new UUID(tarEntry.msb(), tarEntry.lsb()));
+ });
+
+ for (int i = 0; i < 10; i++) {
+ queue.addToQueue(tarEntry(i), EMPTY_DATA, 0, 0);
+ }
+
+ for (int i = 0; i < 10; i++) {
+ assertNotNull("Segments should be available for read", queue.read(uuid(i)));
+ }
+ assertFalse("Queue shouldn't be empty", queue.isEmpty());
+
+ semaphore.release(Integer.MAX_VALUE);
+ while (!queue.isEmpty()) {
+ Thread.sleep(10);
+ }
+
+ assertEquals("There should be 10 segments consumed",10, added.size());
+ for (int i = 0; i < 10; i++) {
+ assertTrue("Missing consumed segment", added.contains(uuid(i)));
+ }
+ }
+
+ @Test(timeout = 1000)
+ public void testFlush() throws IOException, InterruptedException {
+ Set<UUID> added = Collections.synchronizedSet(new HashSet<>());
+ Semaphore semaphore = new Semaphore(0);
+ queue = new SegmentWriteQueue((tarEntry, data, offset, size) -> {
+ try {
+ semaphore.acquire();
+ } catch (InterruptedException e) {
+ }
+ added.add(new UUID(tarEntry.msb(), tarEntry.lsb()));
+ });
+
+ for (int i = 0; i < 3; i++) {
+ queue.addToQueue(tarEntry(i), EMPTY_DATA, 0, 0);
+ }
+
+ AtomicBoolean flushFinished = new AtomicBoolean(false);
+ Set<UUID> addedAfterFlush = new HashSet<>();
+ new Thread(() -> {
+ try {
+ queue.flush();
+ flushFinished.set(true);
+ addedAfterFlush.addAll(added);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }).start();
+
+ Thread.sleep(100);
+ assertFalse("Flush should be blocked", flushFinished.get());
+
+ AtomicBoolean addFinished = new AtomicBoolean(false);
+ new Thread(() -> {
+ try {
+ queue.addToQueue(tarEntry(10), EMPTY_DATA, 0, 0);
+ addFinished.set(true);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }).start();
+
+ Thread.sleep(100);
+ assertFalse("Adding segments should be blocked until the flush is finished", addFinished.get());
+
+ semaphore.release(Integer.MAX_VALUE);
+
+ while (!addFinished.get()) {
+ Thread.sleep(10);
+ }
+ assertTrue("Flush should be finished once the ", flushFinished.get());
+ assertTrue("Adding segments should be blocked until the flush is finished", addFinished.get());
+
+ for (int i = 0; i < 3; i++) {
+ assertTrue(addedAfterFlush.contains(uuid(i)));
+ }
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testClose() throws IOException, InterruptedException {
+ queue = new SegmentWriteQueue((tarEntry, data, offset, size) -> {});
+ queue.close();
+ queue.addToQueue(tarEntry(10), EMPTY_DATA, 0, 0);
+ }
+
+ @Test
+ public void testRecoveryMode() throws IOException, InterruptedException {
+ Set<UUID> added = Collections.synchronizedSet(new HashSet<>());
+ Semaphore semaphore = new Semaphore(0);
+ AtomicBoolean doBreak = new AtomicBoolean(true);
+ List<Long> writeAttempts = Collections.synchronizedList(new ArrayList<>());
+ queue = new SegmentWriteQueue((tarEntry, data, offset, size) -> {
+ writeAttempts.add(System.currentTimeMillis());
+ try {
+ semaphore.acquire();
+ } catch (InterruptedException e) {
+ }
+ if (doBreak.get()) {
+ throw new IOException();
+ }
+ added.add(new UUID(tarEntry.msb(), tarEntry.lsb()));
+ });
+
+ for (int i = 0; i < 10; i++) {
+ queue.addToQueue(tarEntry(i), EMPTY_DATA, 0, 0);
+ }
+
+ semaphore.release(Integer.MAX_VALUE);
+ Thread.sleep(100);
+
+ assertTrue(queue.isBroken());
+ assertEquals(9, queue.getSize()); // the 10th segment is handled by the recovery thread
+
+ writeAttempts.clear();
+ while (writeAttempts.size() < 5) {
+ Thread.sleep(100);
+ }
+ long lastAttempt = writeAttempts.get(0);
+ for (int i = 1; i < 5; i++) {
+ long delay = writeAttempts.get(i) - lastAttempt;
+ assertTrue("The delay between attempts to persist segment should be larger than 1s. Actual: " + delay, delay >= 1000);
+ lastAttempt = writeAttempts.get(i);
+ }
+
+ AtomicBoolean addFinished = new AtomicBoolean(false);
+ new Thread(() -> {
+ try {
+ queue.addToQueue(tarEntry(10), EMPTY_DATA, 0, 0);
+ addFinished.set(true);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }).start();
+
+ Thread.sleep(100);
+ assertFalse("Adding segments should be blocked until the recovery mode is finished", addFinished.get());
+
+ doBreak.set(false);
+ while (queue.isBroken()) {
+ Thread.sleep(10);
+ }
+ assertFalse("Queue shouldn't be broken anymore", queue.isBroken());
+
+ while (added.size() < 11) {
+ Thread.sleep(10);
+ }
+ assertEquals("All segments should be consumed",11, added.size());
+ for (int i = 0; i < 11; i++) {
+ assertTrue("All segments should be consumed", added.contains(uuid(i)));
+ }
+
+ int i = writeAttempts.size() - 10;
+ lastAttempt = writeAttempts.get(i);
+ for (; i < writeAttempts.size(); i++) {
+ long delay = writeAttempts.get(i) - lastAttempt;
+ assertTrue("Segments should be persisted immediately", delay < 1000);
+ lastAttempt = writeAttempts.get(i);
+ }
+ }
+
+ private static TarEntry tarEntry(long i) {
+ return new TarEntry(0, i, 0, 0, GCGeneration.NULL);
+ }
+
+ private static UUID uuid(long i) {
+ return new UUID(0, i);
+ }
+
+}
diff --git a/oak-segment-azure/start-azurite.sh b/oak-segment-azure/start-azurite.sh
new file mode 100755
index 0000000000..06e5e61674
--- /dev/null
+++ b/oak-segment-azure/start-azurite.sh
@@ -0,0 +1,3 @@
+#!/bin/bash
+
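+# Starts the Azurite blob storage emulator used by the oak-segment-azure tests.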
+docker run -e executable=blob --rm -t -p 10000:10000 trekawek/azurite
diff --git a/oak-segment-tar/pom.xml b/oak-segment-tar/pom.xml
index 17915f82a9..d59f8fb5bc 100644
--- a/oak-segment-tar/pom.xml
+++ b/oak-segment-tar/pom.xml
@@ -43,7 +43,13 @@
maven-bundle-plugin
-
+
+ org.apache.jackrabbit.oak.segment,
+ org.apache.jackrabbit.oak.segment.file.tar,
+ org.apache.jackrabbit.oak.segment.file.tar.binaries,
+ org.apache.jackrabbit.oak.segment.file.tar.index,
+ org.apache.jackrabbit.oak.segment.util
+
netty-*
diff --git a/pom.xml b/pom.xml
index 6443bc6f36..4ecf979ea7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -70,6 +70,7 @@
oak-examples
oak-it
oak-segment-tar
+ oak-segment-azure
oak-benchmarks