diff --git a/oak-commons/src/test/java/org/apache/jackrabbit/oak/commons/FixturesHelper.java b/oak-commons/src/test/java/org/apache/jackrabbit/oak/commons/FixturesHelper.java index 60acc379ef..10f19e3a3d 100644 --- a/oak-commons/src/test/java/org/apache/jackrabbit/oak/commons/FixturesHelper.java +++ b/oak-commons/src/test/java/org/apache/jackrabbit/oak/commons/FixturesHelper.java @@ -43,7 +43,7 @@ public final class FixturesHelper { * default fixtures when no {@code nsfixtures} is provided */ public enum Fixture { - DOCUMENT_NS, @Deprecated SEGMENT_MK, DOCUMENT_RDB, MEMORY_NS, DOCUMENT_MEM, SEGMENT_TAR, SEGMENT_AZURE, COMPOSITE_SEGMENT, COMPOSITE_MEM, COW_DOCUMENT + DOCUMENT_NS, @Deprecated SEGMENT_MK, DOCUMENT_RDB, MEMORY_NS, DOCUMENT_MEM, SEGMENT_TAR, SEGMENT_AZURE, COMPOSITE_SEGMENT, COMPOSITE_MEM, COW_DOCUMENT, REMOTE } private static final Set FIXTURES; diff --git a/oak-it/pom.xml b/oak-it/pom.xml index 807b398714..346b69ea98 100644 --- a/oak-it/pom.xml +++ b/oak-it/pom.xml @@ -154,6 +154,18 @@ ${project.version} test + + org.apache.jackrabbit + oak-store-remote-server + ${project.version} + test + + + org.apache.jackrabbit + oak-store-remote-client + ${project.version} + test + org.apache.jackrabbit oak-commons @@ -236,5 +248,12 @@ 1.1.1 test + + + com.google.guava + guava + 26.0-jre + test + diff --git a/oak-it/src/test/java/org/apache/jackrabbit/oak/NodeStoreFixtures.java b/oak-it/src/test/java/org/apache/jackrabbit/oak/NodeStoreFixtures.java index cf79b43eb5..b660a55b45 100644 --- a/oak-it/src/test/java/org/apache/jackrabbit/oak/NodeStoreFixtures.java +++ b/oak-it/src/test/java/org/apache/jackrabbit/oak/NodeStoreFixtures.java @@ -31,6 +31,7 @@ import org.apache.jackrabbit.oak.fixture.MemoryFixture; import org.apache.jackrabbit.oak.fixture.NodeStoreFixture; import org.apache.jackrabbit.oak.composite.CompositeMemoryStoreFixture; import org.apache.jackrabbit.oak.composite.CompositeSegmentStoreFixture; +import 
org.apache.jackrabbit.oak.remote.RemoteNodeStoreFixture; import org.apache.jackrabbit.oak.segment.azure.fixture.SegmentAzureFixture; import org.apache.jackrabbit.oak.segment.fixture.SegmentTarFixture; @@ -54,6 +55,8 @@ public class NodeStoreFixtures { public static final NodeStoreFixture COW_DOCUMENT = new COWStoreFixture(); + public static final NodeStoreFixture REMOTE = new RemoteNodeStoreFixture(); + public static Collection asJunitParameters(Set fixtures) { List configuredFixtures = new ArrayList(); if (fixtures.contains(FixturesHelper.Fixture.DOCUMENT_NS)) { @@ -83,6 +86,9 @@ public class NodeStoreFixtures { if (fixtures.contains(FixturesHelper.Fixture.COW_DOCUMENT)) { configuredFixtures.add(COW_DOCUMENT); } + if (fixtures.contains(FixturesHelper.Fixture.REMOTE)) { + configuredFixtures.add(REMOTE); + } Collection result = new ArrayList(); for (NodeStoreFixture f : configuredFixtures) { diff --git a/oak-it/src/test/java/org/apache/jackrabbit/oak/remote/RemoteNodeStoreFixture.java b/oak-it/src/test/java/org/apache/jackrabbit/oak/remote/RemoteNodeStoreFixture.java new file mode 100644 index 0000000000..ca432ee102 --- /dev/null +++ b/oak-it/src/test/java/org/apache/jackrabbit/oak/remote/RemoteNodeStoreFixture.java @@ -0,0 +1,352 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.jackrabbit.oak.remote; + +import com.google.common.io.Closer; +import com.google.common.io.Files; +import com.microsoft.azure.storage.CloudStorageAccount; +import com.microsoft.azure.storage.StorageException; +import com.microsoft.azure.storage.blob.CloudBlobContainer; +import com.spotify.docker.client.DefaultDockerClient; +import com.spotify.docker.client.DockerClient; +import com.spotify.docker.client.LogStream; +import com.spotify.docker.client.exceptions.DockerCertificateException; +import com.spotify.docker.client.exceptions.DockerException; +import com.spotify.docker.client.messages.ContainerConfig; +import com.spotify.docker.client.messages.ContainerCreation; +import com.spotify.docker.client.messages.HostConfig; +import com.spotify.docker.client.messages.PortBinding; +import io.grpc.inprocess.InProcessChannelBuilder; +import io.grpc.inprocess.InProcessServerBuilder; +import org.apache.commons.io.FileUtils; +import org.apache.jackrabbit.core.data.FileDataStore; +import org.apache.jackrabbit.oak.fixture.NodeStoreFixture; +import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreBlobStore; +import org.apache.jackrabbit.oak.remote.client.RemoteNodeStore; +import org.apache.jackrabbit.oak.remote.client.RemoteNodeStoreClient; +import org.apache.jackrabbit.oak.remote.client.TailingPersistenceFactory; +import org.apache.jackrabbit.oak.remote.server.NodeStoreServer; +import org.apache.jackrabbit.oak.segment.RevRepositoryService; +import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentNodeStorePersistence; +import org.apache.jackrabbit.oak.segment.spi.rev.RevRepository; +import org.apache.jackrabbit.oak.spi.blob.BlobStore; +import org.apache.jackrabbit.oak.spi.state.NodeStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import 
java.net.URISyntaxException; +import java.nio.charset.StandardCharsets; +import java.security.InvalidKeyException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.IdentityHashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import static com.spotify.docker.client.DockerClient.LogsParam.follow; +import static com.spotify.docker.client.DockerClient.LogsParam.stderr; +import static com.spotify.docker.client.DockerClient.LogsParam.stdout; + +public class RemoteNodeStoreFixture extends NodeStoreFixture { + + private static final Logger log = LoggerFactory.getLogger(RemoteNodeStoreFixture.class); + + private Map instances = new IdentityHashMap<>(); + + private Map clusterInstances = new IdentityHashMap<>(); + + private RemoteNSServer sharedInstance; + + private int index; + + private AzureDockerContainer dockerContainer = new AzureDockerContainer(); + + private Closer dockerCloser = Closer.create(); + + @Override + public NodeStore createNodeStore() { + try { + dockerContainer.startDocker(); + } catch (DockerException | InterruptedException | DockerCertificateException e) { + throw new IllegalStateException(e); + } + + RemoteNSInstance instance = new RemoteNSInstance("test-" + index++); + NodeStore nodeStore = instance.getClient().getNodeStore(); + instances.put(nodeStore, instance); + return nodeStore; + } + + @Override + public NodeStore createNodeStore(int clusterNodeId) { + try { + dockerContainer.startDocker(); + } catch (DockerException | InterruptedException | DockerCertificateException e) { + throw new IllegalStateException(e); + } + if (sharedInstance == null) { + sharedInstance = new RemoteNSServer("test-" + index++); + } + RemoteNSClient clientInstance = new RemoteNSClient(sharedInstance); + NodeStore nodeStore = clientInstance.getNodeStore(); + clusterInstances.put(nodeStore, clientInstance); + return nodeStore; + } + + @Override + public void dispose(NodeStore nodeStore) { + try { + if 
(instances.containsKey(nodeStore)) { + instances.remove(nodeStore).close(); + } + if (clusterInstances.containsKey(nodeStore)) { + clusterInstances.remove(nodeStore).close(); + } + if (clusterInstances.isEmpty() && sharedInstance != null) { + sharedInstance.close(); + } + } catch (IOException e) { + throw new IllegalStateException("Can't dispose nodestore", e); + } + + if (instances.isEmpty() && clusterInstances.isEmpty()) { + try { + dockerCloser.close(); + } catch (IOException e) { + throw new IllegalStateException("Can't stop docker", e); + } + } + } + + private class RemoteNSServer implements Closeable { + + private final Closer closer = Closer.create(); + + private final String name; + + private BlobStore blobStore; + + public RemoteNSServer(String name) { + this.name = name; + + createDataStore(); + createServer(); + } + + private void createDataStore() { + FileDataStore fds = new FileDataStore(); + + File datastore = Files.createTempDir(); + closer.register(() -> FileUtils.deleteDirectory(datastore)); + fds.setPath(datastore.getPath()); + fds.init(null); + blobStore = new DataStoreBlobStore(fds); + } + + private void createServer() { + try { + CloudBlobContainer container = dockerContainer.getContainer(name); + container.deleteIfExists(); + container.create(); + + InProcessServerBuilder inProcessServerBuilder = InProcessServerBuilder.forName(name); + NodeStoreServer server = new NodeStoreServer(inProcessServerBuilder, container.getDirectoryReference("oak"), blobStore); + + server.start(); + closer.register(server); + } catch (Exception e) { + throw new IllegalStateException(e); + } + } + + public void close() throws IOException { + closer.close(); + } + } + + private class RemoteNSClient implements Closeable { + + private final Closer closer = Closer.create(); + + private final BlobStore blobStore; + + private final String name; + + private RemoteNodeStore remoteNodeStore; + + public RemoteNSClient(RemoteNSServer server) { + this.blobStore = 
server.blobStore; + this.name = server.name; + + createNodeStore(); + } + + private void createNodeStore() { + InProcessChannelBuilder inProcessChannelBuilder = InProcessChannelBuilder.forName(name); + RemoteNodeStoreClient client = new RemoteNodeStoreClient(inProcessChannelBuilder); + try { + File segmentStore = Files.createTempDir(); + closer.register(() -> FileUtils.deleteDirectory(segmentStore)); + + String privateDirName = "oak-private-" + UUID.randomUUID().toString(); + + SegmentNodeStorePersistence tailingPersistence = new TailingPersistenceFactory( + dockerContainer.getContainer(name), + client, + name, + privateDirName + ).create(); + + RevRepository revNodeStore = new RevRepositoryService() + .builder() + .withPersistence(tailingPersistence) + .withBlobStore(blobStore) + .build(); + + remoteNodeStore = new RemoteNodeStore.Builder() + .setBlobStore(blobStore) + .setClient(client) + .setNodeStore(revNodeStore) + .setPrivateDirName(privateDirName) + .build(); + closer.register(remoteNodeStore); + } catch (Exception e) { + throw new IllegalStateException(e); + } + } + + public NodeStore getNodeStore() { + return remoteNodeStore; + } + + public void close() throws IOException { + closer.close(); + } + } + + private class RemoteNSInstance implements Closeable { + + private final RemoteNSServer server; + + private final RemoteNSClient client; + + private final Closer closer = Closer.create(); + + public RemoteNSInstance(String name) { + server = new RemoteNSServer(name); + closer.register(server); + client = new RemoteNSClient(server); + closer.register(client); + } + + public RemoteNSClient getClient() { + return client; + } + + public void close() throws IOException { + closer.close(); + } + } + + private static class AzureDockerContainer implements Closeable { + + private Closer closer; + + private int port; + + public void startDocker() throws DockerException, InterruptedException, DockerCertificateException { + if (closer != null) { + return; + } + + closer 
= Closer.create(); + + DockerClient docker = DefaultDockerClient.fromEnv().build(); + closer.register(docker); + docker.pull("trekawek/azurite"); + + Map> portBindings = new HashMap<>(); + PortBinding randomPort = PortBinding.randomPort("0.0.0.0"); + portBindings.put("10000", Arrays.asList(randomPort)); + HostConfig hostConfig = HostConfig.builder().portBindings(portBindings).build(); + + File dataVolume = Files.createTempDir(); + closer.register(() -> FileUtils.deleteDirectory(dataVolume)); + + final ContainerConfig containerConfig = ContainerConfig.builder() + .hostConfig(hostConfig) + .image("trekawek/azurite") + .addVolume(dataVolume.getPath() + ":/data") + .env("executable=blob") + .exposedPorts("10000") + .build(); + + ContainerCreation creation = docker.createContainer(containerConfig); + String id = creation.id(); + closer.register(() -> { + try { + docker.removeContainer(id); + } catch (DockerException | InterruptedException e) { + throw new IOException(e); + } + }); + + docker.startContainer(id); + closer.register(() -> { + try { + docker.killContainer(id); + } catch (DockerException | InterruptedException e) { + throw new IOException(e); + } + }); + + LogStream logStream = docker.logs(id, follow(), stdout(), stderr()); + while (logStream.hasNext()) { + String line = StandardCharsets.UTF_8.decode(logStream.next().content()).toString(); + log.info("{}", line); + if (line.contains("Azure Blob Storage Emulator listening on port 10000")) { + break; + } + } + + Map> ports = docker.inspectContainer(id).networkSettings().ports(); + port = Integer.valueOf(ports.get("10000/tcp").get(0).hostPort()); + } + + private CloudBlobContainer getContainer(String name) throws URISyntaxException, InvalidKeyException, StorageException { + if (closer == null) { + throw new IllegalStateException("Docker is not started"); + } + CloudStorageAccount cloud = 
CloudStorageAccount.parse("DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://127.0.0.1:" + port + "/devstoreaccount1;"); + CloudBlobContainer container = cloud.createCloudBlobClient().getContainerReference(name); + return container; + } + + public void close() throws IOException { + if (closer != null) { + closer.close(); + closer = null; + } + } + } +} diff --git a/oak-it/src/test/java/org/apache/jackrabbit/oak/spi/state/NodeStoreTest.java b/oak-it/src/test/java/org/apache/jackrabbit/oak/spi/state/NodeStoreTest.java index 628be98070..df11fa8f2d 100644 --- a/oak-it/src/test/java/org/apache/jackrabbit/oak/spi/state/NodeStoreTest.java +++ b/oak-it/src/test/java/org/apache/jackrabbit/oak/spi/state/NodeStoreTest.java @@ -55,7 +55,6 @@ import org.apache.jackrabbit.oak.spi.commit.EmptyHook; import org.apache.jackrabbit.oak.spi.commit.Observable; import org.apache.jackrabbit.oak.spi.commit.Observer; import org.jetbrains.annotations.NotNull; -import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -83,11 +82,6 @@ public class NodeStoreTest extends OakBaseTest { root = store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY); } - @After - public void tearDown() { - fixture.dispose(store); - } - @Test public void getRoot() { assertEquals(root, store.getRoot()); @@ -457,7 +451,8 @@ public class NodeStoreTest extends OakBaseTest { NodeBuilder x = test.getChildNode("x"); if (fixture == NodeStoreFixtures.SEGMENT_TAR || fixture == NodeStoreFixtures.MEMORY_NS || fixture == NodeStoreFixtures.COMPOSITE_MEM || fixture == NodeStoreFixtures.COMPOSITE_SEGMENT - || fixture == NodeStoreFixtures.COW_DOCUMENT || fixture == NodeStoreFixtures.SEGMENT_AZURE) { + || fixture == NodeStoreFixtures.COW_DOCUMENT || fixture == NodeStoreFixtures.SEGMENT_AZURE + || fixture == NodeStoreFixtures.REMOTE) { assertTrue(x.moveTo(x, "xx")); 
assertFalse(x.exists()); assertFalse(test.hasChildNode("x")); diff --git a/oak-segment-tar/pom.xml b/oak-segment-tar/pom.xml index f0b19b86ef..424e2bcb66 100644 --- a/oak-segment-tar/pom.xml +++ b/oak-segment-tar/pom.xml @@ -48,7 +48,8 @@ org.apache.jackrabbit.oak.segment.spi, org.apache.jackrabbit.oak.segment.spi.monitor, org.apache.jackrabbit.oak.segment.spi.persistence, - org.apache.jackrabbit.oak.segment.spi.persistence.split + org.apache.jackrabbit.oak.segment.spi.persistence.split, + org.apache.jackrabbit.oak.segment.spi.rev netty-*, diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/RevRepositoryService.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/RevRepositoryService.java new file mode 100644 index 0000000000..7aa1623b7f --- /dev/null +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/RevRepositoryService.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.jackrabbit.oak.segment; + +import com.google.common.io.Files; +import org.apache.commons.io.FileUtils; +import org.apache.jackrabbit.oak.segment.file.FileStore; +import org.apache.jackrabbit.oak.segment.file.FileStoreBuilder; +import org.apache.jackrabbit.oak.segment.file.InvalidFileStoreVersionException; +import org.apache.jackrabbit.oak.segment.file.ReadOnlyFileStore; +import org.apache.jackrabbit.oak.segment.spi.rev.RevRepositoryFactory; +import org.apache.jackrabbit.oak.segment.spi.rev.RevNodeState; +import org.apache.jackrabbit.oak.segment.spi.rev.RevRepository; +import org.osgi.service.component.annotations.Component; +import org.osgi.service.component.annotations.ConfigurationPolicy; + +import java.io.File; +import java.io.IOException; + +/** + * This service is able to create {@link RevRepository} instances, using the + * configured FileStore to do so. + */ +@Component(configurationPolicy = ConfigurationPolicy.OPTIONAL) +public class RevRepositoryService implements RevRepositoryFactory { + + @Override + public RevRepository create(Builder builder) throws IOException { + File dir = Files.createTempDir(); + + FileStoreBuilder fsBuilder = FileStoreBuilder.fileStoreBuilder(dir) + .withCustomPersistence(builder.getPersistence()) + .withBlobStore(builder.getBlobStore()); + + try { + if (builder.isReadOnly()) { + return new RORevRepository(fsBuilder.buildReadOnly(), dir); + } else { + return new DefaultRevRepository(fsBuilder.build(), dir); + } + } catch (InvalidFileStoreVersionException e) { + throw new IOException(e); + } + } + + private static class DefaultRevRepository implements RevRepository { + + private final File directory; + + private final FileStore fileStore; + + public DefaultRevRepository(FileStore fileStore, File directory) { + this.fileStore = fileStore; + this.directory = directory; + } + + @Override + public RevNodeState getNodeStateByRevision(String revision) { + RecordId recordId = 
RecordId.fromString(fileStore.getSegmentIdProvider(), revision); + return fileStore.getReader().readNode(recordId); + } + + @Override + public void flushData() throws IOException { + fileStore.getWriter().flush(); + } + + @Override + public void flushJournal() throws IOException { + fileStore.flush(); + } + + @Override + public void close() throws IOException { + fileStore.close(); + FileUtils.deleteDirectory(directory); + } + } + + private static class RORevRepository implements RevRepository { + + private final File directory; + + private final ReadOnlyFileStore fileStore; + + public RORevRepository(ReadOnlyFileStore fileStore, File directory) { + this.fileStore = fileStore; + this.directory = directory; + } + + @Override + public RevNodeState getNodeStateByRevision(String revision) { + RecordId recordId = RecordId.fromString(fileStore.getSegmentIdProvider(), revision); + return fileStore.getReader().readNode(recordId); + } + + @Override + public void flushData() { + throw new UnsupportedOperationException(); + } + + @Override + public void flushJournal() { + throw new UnsupportedOperationException(); + } + + @Override + public void close() throws IOException { + fileStore.close(); + FileUtils.deleteDirectory(directory); + } + } +} diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeState.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeState.java index c6699ebcee..32a2a15b09 100644 --- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeState.java +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeState.java @@ -47,6 +47,7 @@ import org.apache.jackrabbit.oak.api.Type; import org.apache.jackrabbit.oak.commons.Buffer; import org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState; import org.apache.jackrabbit.oak.plugins.memory.MemoryChildNodeEntry; +import org.apache.jackrabbit.oak.segment.spi.rev.RevNodeState; import 
org.apache.jackrabbit.oak.spi.blob.BlobStore; import org.apache.jackrabbit.oak.spi.state.AbstractNodeState; import org.apache.jackrabbit.oak.spi.state.ChildNodeEntry; @@ -61,7 +62,7 @@ import org.jetbrains.annotations.Nullable; * A record of type "NODE". This class can read a node record from a segment. It * currently doesn't cache data (but the template is fully loaded). */ -public class SegmentNodeState extends Record implements NodeState { +public class SegmentNodeState extends Record implements NodeState, RevNodeState { @NotNull private final SegmentReader reader; @@ -720,4 +721,8 @@ public class SegmentNodeState extends Record implements NodeState { return AbstractNodeState.toString(this); } + @Override + public String getRevision() { + return getRecordId().toString(); + } } diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/WrappingPersistence.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/WrappingPersistence.java new file mode 100644 index 0000000000..c0b74732db --- /dev/null +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/WrappingPersistence.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.jackrabbit.oak.segment.spi.persistence; + +import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitor; +import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitor; +import org.apache.jackrabbit.oak.segment.spi.monitor.RemoteStoreMonitor; + +import java.io.IOException; + +public class WrappingPersistence implements SegmentNodeStorePersistence { + + private final SegmentNodeStorePersistence delegate; + + public WrappingPersistence(SegmentNodeStorePersistence delegate) { + this.delegate = delegate; + } + + @Override + public SegmentArchiveManager createArchiveManager(boolean memoryMapping, boolean offHeapAccess, IOMonitor ioMonitor, FileStoreMonitor fileStoreMonitor, RemoteStoreMonitor remoteStoreMonitor) throws IOException { + return delegate.createArchiveManager(memoryMapping, offHeapAccess, ioMonitor, fileStoreMonitor, remoteStoreMonitor); + } + + @Override + public boolean segmentFilesExist() { + return delegate.segmentFilesExist(); + } + + @Override + public JournalFile getJournalFile() { + return delegate.getJournalFile(); + } + + @Override + public GCJournalFile getGCJournalFile() throws IOException { + return delegate.getGCJournalFile(); + } + + @Override + public ManifestFile getManifestFile() throws IOException { + return delegate.getManifestFile(); + } + + @Override + public RepositoryLock lockRepository() throws IOException { + return delegate.lockRepository(); + } +} diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/rev/RevNodeState.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/rev/RevNodeState.java new file mode 100644 index 0000000000..d9506dbdb8 --- /dev/null +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/rev/RevNodeState.java @@ -0,0 +1,16 @@ +package org.apache.jackrabbit.oak.segment.spi.rev; + +import 
org.apache.jackrabbit.oak.spi.state.NodeState; + +/** + * This node state is able to expose its revision. + */ +public interface RevNodeState extends NodeState { + + /** + * The revision name, uniquely identifying this node state in the {@link RevRepository}. + * @return node's revision + */ + String getRevision(); + +} diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/rev/RevRepository.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/rev/RevRepository.java new file mode 100644 index 0000000000..f998d089a0 --- /dev/null +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/rev/RevRepository.java @@ -0,0 +1,22 @@ +package org.apache.jackrabbit.oak.segment.spi.rev; + +import java.io.Closeable; +import java.io.IOException; + +/** + * The RevRepository allows to retrieve nodes by their revisions. The changes + * made in the node builder, derived from these nodes, will be persisted automatically + * or on demand, with the flush methods. + * + * Looking at the implementation, this is an abstraction over FileStore, that allows + * to read nodes created by the SegmentNodeStore instances. + */ +public interface RevRepository extends Closeable { + + RevNodeState getNodeStateByRevision(String revision); + + void flushData() throws IOException; + + void flushJournal() throws IOException; + +} diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/rev/RevRepositoryFactory.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/rev/RevRepositoryFactory.java new file mode 100644 index 0000000000..e088e75257 --- /dev/null +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/rev/RevRepositoryFactory.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.segment.spi.rev; + +import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentNodeStorePersistence; +import org.apache.jackrabbit.oak.spi.blob.BlobStore; + +import java.io.IOException; + +/** + * This is a factory, creating the RevRepository instances. It was introduced + * not to expose oak-segment-tar implementation details in the OSGi environment. + */ +public interface RevRepositoryFactory { + + RevRepository create(Builder builder) throws IOException; + + default Builder builder() { + return new Builder(this); + } + + class Builder { + + private final RevRepositoryFactory factory; + + private SegmentNodeStorePersistence persistence; + + private BlobStore blobStore; + + private boolean readOnly; + + public Builder(RevRepositoryFactory factory) { + this.factory = factory; + } + + public Builder withPersistence(SegmentNodeStorePersistence persistence) { + this.persistence = persistence; + return this; + } + + public Builder withBlobStore(BlobStore blobStore) { + this.blobStore = blobStore; + return this; + } + + public Builder readOnly() { + this.readOnly = true; + return this; + } + + public RevRepositoryFactory getFactory() { + return factory; + } + + public SegmentNodeStorePersistence getPersistence() { + return persistence; + } + + public BlobStore getBlobStore() { + return blobStore; + } + + public boolean isReadOnly() { + return 
readOnly; + } + + public RevRepository build() throws IOException { + return factory.create(this); + } + } +} diff --git a/oak-store-remote-client/pom.xml b/oak-store-remote-client/pom.xml new file mode 100644 index 0000000000..f678949760 --- /dev/null +++ b/oak-store-remote-client/pom.xml @@ -0,0 +1,313 @@ + + + + + + 4.0.0 + + + org.apache.jackrabbit + oak-parent + 1.18-SNAPSHOT + ../oak-parent/pom.xml + + + oak-store-remote-client + Oak Remote Store + bundle + + + + + kr.motd.maven + os-maven-plugin + 1.5.0.Final + + + + + + org.apache.felix + maven-bundle-plugin + + + + org.apache.jackrabbit.oak.remote.client + + + !com.google.errorprone.annotations, + !com.google.errorprone.annotations.concurrent, + !com.google.protobuf.nano, + !com.google.rpc, + !com.jcraft.jzlib, + !com.ning.compress, + !com.ning.compress.lzf, + !com.ning.compress.lzf.util, + !javax.annotation, + !lzma.sdk, + !lzma.sdk.lzma, + !net.jpountz.lz4, + !net.jpountz.xxhash, + !org.apache.logging.log4j, + !org.apache.logging.log4j.message, + !org.apache.logging.log4j.spi, + !org.bouncycastle.asn1.x500, + !org.bouncycastle.cert, + !org.bouncycastle.cert.jcajce, + !org.bouncycastle.jce.provider, + !org.bouncycastle.operator, + !org.bouncycastle.operator.jcajce, + !org.checkerframework.checker.nullness.qual, + !org.conscrypt, + !org.eclipse.jetty.alpn, + !org.eclipse.jetty.npn, + !org.jboss.marshalling, + !sun.misc, + !sun.security.util, + !sun.security.x509, + * + + + + + + + + + + + org.osgi + org.osgi.core + provided + + + org.osgi + org.osgi.compendium + provided + + + org.osgi + org.osgi.annotation + provided + + + + + org.apache.jackrabbit + oak-api + ${project.version} + + + org.apache.jackrabbit + oak-blob + ${project.version} + + + org.apache.jackrabbit + oak-blob-plugins + ${project.version} + + + org.apache.jackrabbit + oak-commons + ${project.version} + + + org.apache.jackrabbit + oak-core + ${project.version} + + + org.apache.jackrabbit + oak-core-spi + ${project.version} + + + 
org.apache.jackrabbit + oak-store-spi + ${project.version} + + + org.apache.jackrabbit + oak-store-remote-commons + ${project.version} + + + org.apache.jackrabbit + jackrabbit-data + ${jackrabbit.version} + + + org.apache.jackrabbit + jackrabbit-jcr-commons + ${jackrabbit.version} + + + javax.jcr + jcr + 2.0 + provided + + + + org.apache.jackrabbit + oak-segment-tar + ${project.version} + + + org.apache.jackrabbit + oak-segment-azure + ${project.version} + + + io.dropwizard.metrics + metrics-core + + + + + commons-io + commons-io + + + javax.annotation + javax.annotation-api + 1.3.2 + provided + + + + + io.grpc + grpc-netty-shaded + 1.22.1 + provided + + + io.grpc + grpc-protobuf + 1.22.1 + provided + + + io.grpc + grpc-stub + 1.22.1 + provided + + + + io.grpc + grpc-api + 1.22.1 + provided + + + io.grpc + grpc-core + 1.22.1 + provided + + + io.grpc + grpc-context + 1.22.1 + provided + + + + io.opencensus + opencensus-api + 0.21.0 + provided + + + io.opencensus + opencensus-contrib-grpc-metrics + 0.21.0 + provided + + + + com.google.protobuf + protobuf-java + 3.7.1 + provided + + + io.perfmark + perfmark-api + 0.16.0 + provided + + + + + org.slf4j + slf4j-api + + + + + org.jetbrains + annotations + + + + + com.microsoft.azure + azure-storage + 5.0.0 + + + + + junit + junit + test + + + com.arakelian + docker-junit-rule + test + + + org.slf4j + jul-to-slf4j + test + + + ch.qos.logback + logback-classic + test + + + org.apache.jackrabbit + oak-segment-azure + ${project.version} + test-jar + test + + + org.apache.jackrabbit + oak-store-remote-server + ${project.version} + test + + + diff --git a/oak-store-remote-client/src/main/java/org/apache/jackrabbit/oak/remote/client/Configuration.java b/oak-store-remote-client/src/main/java/org/apache/jackrabbit/oak/remote/client/Configuration.java new file mode 100644 index 0000000000..c8bbd53e67 --- /dev/null +++ b/oak-store-remote-client/src/main/java/org/apache/jackrabbit/oak/remote/client/Configuration.java @@ -0,0 +1,57 @@ 
+package org.apache.jackrabbit.oak.remote.client; + +import org.apache.jackrabbit.oak.segment.azure.AzureSegmentStoreService; +import org.osgi.service.metatype.annotations.AttributeDefinition; +import org.osgi.service.metatype.annotations.ObjectClassDefinition; + +import static org.apache.jackrabbit.oak.remote.client.Configuration.PID; + +@ObjectClassDefinition( + pid = {PID}, + name = "Apache Jackrabbit Oak Azure Segment Store Service", + description = "Azure backend for the Oak Segment Node Store") +@interface Configuration { + + String PID = "org.apache.jackrabbit.oak.remote.client.RemoteNodeStoreService"; + + @AttributeDefinition( + name = "Azure account name", + description = "Name of the Azure Storage account to use.") + String accountName(); + + @AttributeDefinition( + name = "Azure container name", + description = "Name of the container to use. If it doesn't exists, it'll be created.") + String containerName() default AzureSegmentStoreService.DEFAULT_CONTAINER_NAME; + + @AttributeDefinition( + name = "Azure account access key", + description = "Access key which should be used to authenticate on the account") + String accessKey(); + + @AttributeDefinition( + name = "Root path", + description = "Names of all the created blobs will be prefixed with this path") + String rootPath() default AzureSegmentStoreService.DEFAULT_ROOT_PATH; + + @AttributeDefinition( + name = "Azure connection string (optional)", + description = "Connection string to be used to connect to the Azure Storage. 
" + + "Setting it will override the accountName and accessKey properties.") + String connectionURL(); + + @AttributeDefinition( + name = "Remote server host", + description = "The host name of the remote server") + String remoteHost() default "localhost"; + + @AttributeDefinition( + name = "Remote server port", + description = "The port number of the remote server") + int remotePort() default 12300; + + @AttributeDefinition( + name = "NodeStoreProvider role", + description = "Property indicating that this component will not register as a NodeStore but as a NodeStoreProvider with given role") + String role(); +} \ No newline at end of file diff --git a/oak-store-remote-client/src/main/java/org/apache/jackrabbit/oak/remote/client/RemoteCheckpointMBean.java b/oak-store-remote-client/src/main/java/org/apache/jackrabbit/oak/remote/client/RemoteCheckpointMBean.java new file mode 100644 index 0000000000..281781e207 --- /dev/null +++ b/oak-store-remote-client/src/main/java/org/apache/jackrabbit/oak/remote/client/RemoteCheckpointMBean.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.jackrabbit.oak.remote.client;

import org.apache.jackrabbit.oak.commons.jmx.AbstractCheckpointMBean;

import javax.management.openmbean.OpenDataException;
import javax.management.openmbean.TabularDataSupport;
import java.util.Map;

/**
 * JMX bean exposing checkpoint management for a {@link RemoteNodeStore}.
 * Every operation delegates to the store's checkpoint methods, which in turn
 * forward the calls to the remote checkpoint service.
 */
public class RemoteCheckpointMBean extends AbstractCheckpointMBean {

    /** The node store all checkpoint operations are delegated to. */
    private final RemoteNodeStore store;

    public RemoteCheckpointMBean(RemoteNodeStore store) {
        this.store = store;
    }

    /**
     * Fills {@code tab} with one row per checkpoint currently known to the
     * remote store. Creation and expiry timestamps are not provided by the
     * remote checkpoint-info API, so "NA" placeholders are reported.
     */
    @Override
    protected void collectCheckpoints(TabularDataSupport tab) throws OpenDataException {
        for (String id : store.checkpoints()) {
            Map<String, String> info = store.checkpointInfo(id);
            tab.put(id, toCompositeData(
                    id,
                    "NA",   // created - not exposed by the remote service
                    "NA",   // expires - not exposed by the remote service
                    info));
        }
    }

    /**
     * Creation timestamps are not tracked on the client side (see
     * {@link #collectCheckpoints}), so 0 is returned unconditionally.
     */
    @Override
    public long getOldestCheckpointCreationTimestamp() {
        return 0;
    }

    /** Creates a checkpoint on the remote store and returns its id. */
    @Override
    public String createCheckpoint(long lifetime) {
        return store.checkpoint(lifetime);
    }

    /** Releases the given checkpoint; returns whether the release succeeded. */
    @Override
    public boolean releaseCheckpoint(String id) {
        return store.release(id);
    }
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.remote.client; + +import com.google.common.base.Strings; +import com.google.protobuf.Empty; +import io.grpc.stub.StreamObserver; +import org.apache.jackrabbit.oak.api.Blob; +import org.apache.jackrabbit.oak.api.CommitFailedException; +import org.apache.jackrabbit.oak.json.JsopDiff; +import org.apache.jackrabbit.oak.plugins.blob.BlobStoreBlob; +import org.apache.jackrabbit.oak.plugins.memory.MemoryNodeBuilder; +import org.apache.jackrabbit.oak.remote.common.CommitInfoUtil; +import org.apache.jackrabbit.oak.remote.common.RevisionableUtils; +import org.apache.jackrabbit.oak.remote.proto.ChangeEventProtos; +import org.apache.jackrabbit.oak.remote.proto.CheckpointProtos; +import org.apache.jackrabbit.oak.remote.proto.CheckpointProtos.CreateCheckpointRequest; +import org.apache.jackrabbit.oak.remote.proto.CommitProtos; +import org.apache.jackrabbit.oak.remote.proto.CommitProtos.Commit; +import org.apache.jackrabbit.oak.remote.proto.LeaseProtos; +import org.apache.jackrabbit.oak.remote.proto.NodeStateProtos.NodeStateId; +import org.apache.jackrabbit.oak.segment.spi.rev.RevRepository; +import org.apache.jackrabbit.oak.spi.blob.BlobStore; +import org.apache.jackrabbit.oak.spi.commit.CommitHook; +import org.apache.jackrabbit.oak.spi.commit.CommitInfo; +import org.apache.jackrabbit.oak.spi.commit.CompositeObserver; +import org.apache.jackrabbit.oak.spi.commit.Observable; +import org.apache.jackrabbit.oak.spi.commit.Observer; +import org.apache.jackrabbit.oak.spi.state.ConflictAnnotatingRebaseDiff; +import 
org.apache.jackrabbit.oak.spi.state.NodeBuilder; +import org.apache.jackrabbit.oak.spi.state.NodeState; +import org.apache.jackrabbit.oak.spi.state.NodeStore; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static org.apache.jackrabbit.oak.remote.common.RevisionableUtils.getRevision; + +public class RemoteNodeStore implements NodeStore, Closeable, Observable { + + private static final Logger log = LoggerFactory.getLogger(RemoteNodeStore.class); + + private final RemoteNodeStoreClient client; + + private final BlobStore blobStore; + + private final CompositeObserver compositeObserver; + + private final StreamObserver observerStreamEvent; + + private final RevRepository nodeStore; + + private final String privateDirName; + + private LeaseProtos.LeaseInfo leaseInfo; + + private volatile LeaseProtos.ClusterView lastClusterView; + + private ScheduledExecutorService leaseRenewProcess = Executors.newScheduledThreadPool(1); + + public static class Builder { + + private RemoteNodeStoreClient client; + + private BlobStore blobStore; + + private RevRepository nodeStore; + + private String privateDirName; + + public Builder setClient(RemoteNodeStoreClient client) { + this.client = client; + return this; + } + + public Builder setBlobStore(BlobStore blobStore) { + this.blobStore = blobStore; + return this; + } + + public Builder setPrivateDirName(String privateDirName) { + this.privateDirName = privateDirName; + return this; + } + + public Builder setNodeStore(RevRepository nodeStore) { + this.nodeStore = nodeStore; + return this; + } + + public RemoteNodeStore build() throws Exception { + return 
new RemoteNodeStore(this); + } + } + + private RemoteNodeStore(Builder builder) throws Exception { + this.client = builder.client; + this.blobStore = builder.blobStore; + this.nodeStore = builder.nodeStore; + this.privateDirName = builder.privateDirName; + this.compositeObserver = new CompositeObserver(); + + leaseInfo = client.getLeaseService().acquire(Empty.getDefaultInstance()); + lastClusterView = client.getLeaseService().renew(leaseInfo); + leaseRenewProcess.scheduleAtFixedRate(() -> renewLease(), 2, 2, TimeUnit.SECONDS); + + observerStreamEvent = client.getNodeStoreAsyncService().observe(new StreamObserver() { + @Override + public void onNext(ChangeEventProtos.ChangeEvent changeEvent) { + NodeState root = createNodeState(changeEvent.getNodeStateId()); + compositeObserver.contentChanged(root, CommitInfoUtil.deserialize(changeEvent.getCommitInfo())); + } + + @Override + public void onError(Throwable throwable) { + } + + @Override + public void onCompleted() { + } + }); + } + + private void renewLease() { + LeaseProtos.ClusterView response = client.getLeaseService().renew(leaseInfo); + if (Strings.isNullOrEmpty(response.getId())) { + log.error("Lost the lease, acquiring a new one"); + leaseInfo = client.getLeaseService().acquire(Empty.getDefaultInstance()); + return; + } + lastClusterView = response; + } + + public void close() throws IOException { + observerStreamEvent.onCompleted(); + leaseRenewProcess.shutdown(); + try { + leaseRenewProcess.awaitTermination(1, TimeUnit.MINUTES); + this.client.getLeaseService().release(leaseInfo); + } catch (InterruptedException e) { + throw new IOException(e); + } + } + + @Override + public Closeable addObserver(Observer observer) { + compositeObserver.addObserver(observer); + return () -> compositeObserver.removeObserver(observer); + } + + @Override + @NotNull + public NodeState getRoot() { + NodeStateId id = client.getNodeStoreService().getRoot(Empty.getDefaultInstance()); + return createNodeState(id); + } + + @Override + 
public synchronized @NotNull NodeState merge(@NotNull NodeBuilder builder, @NotNull CommitHook commitHook, @NotNull CommitInfo info) throws CommitFailedException { + assertRootBuilder(builder); + NodeState head = builder.getNodeState(); + NodeState base = builder.getBaseState(); + + CommitFailedException ex = null; + for (int i = 0; i < 10; i++) { + try { + if (i > 0) { + Thread.sleep(500); + } + } catch (InterruptedException e1) { + log.error("Interrupted", e1); + } + + NodeState rootState = getRoot(); + + if (!RevisionableUtils.fastEquals(rootState, builder.getBaseState())) { + reset(builder, rootState); + head.compareAgainstBaseState(base, new ConflictAnnotatingRebaseDiff(builder)); + } + + NodeState baseNodeState = builder.getBaseState(); + NodeState headNodeState; + try { + headNodeState = commitHook.processCommit(baseNodeState, builder.getNodeState(), info); + } catch (CommitFailedException e) { + log.warn("Hooks failed, attempt {}/5", i+1); + log.info("diff: {}", JsopDiff.diffToJsop(baseNodeState, builder.getNodeState())); + ex = e; + continue; + } + + try { + nodeStore.flushData(); + } catch (IOException e) { + log.error("Can't flush", e); + continue; + } + + NodeStateId id = client.getNodeStoreService().merge(createCommitObject(info, baseNodeState, headNodeState)); + + if (Strings.isNullOrEmpty(id.getRevision())) { + log.warn("Rebased to an outdated root state, attempt {}/5", i+1); + ex = new CommitFailedException(CommitFailedException.MERGE, 1, "Can't merge, revision on remote has been updated"); + continue; + } + + NodeState mergedRoot = createNodeState(id); + reset(builder, mergedRoot); + return mergedRoot; + } + throw ex; + } + + @Override + public @NotNull NodeState rebase(@NotNull NodeBuilder builder) { + NodeState head = builder.getNodeState(); + NodeState base = builder.getBaseState(); + NodeState newBase = getRoot(); + if (!RevisionableUtils.fastEquals(base, newBase)) { + reset(builder, newBase); + head.compareAgainstBaseState(base, new 
ConflictAnnotatingRebaseDiff(builder)); + head = builder.getNodeState(); + } + return head; + } + + @Override + public NodeState reset(@NotNull NodeBuilder builder) { + assertRootBuilder(builder); + NodeState root = getRoot(); + reset(builder, root); + return root; + } + + private void reset(@NotNull NodeBuilder builder, NodeState newBase) { + if (!(builder instanceof MemoryNodeBuilder)) { + throw new IllegalArgumentException("Invalid node builder: " + builder.getClass()); + } + ((MemoryNodeBuilder) builder).reset(newBase); + } + + private void assertRootBuilder(NodeBuilder builder) { + if (!(builder instanceof MemoryNodeBuilder)) { + throw new IllegalArgumentException("Invalid node builder: " + builder.getClass()); + } + MemoryNodeBuilder nodeBuilder = (MemoryNodeBuilder) builder; + if (!nodeBuilder.isRoot()) { + throw new IllegalArgumentException("Not a root builder: " + builder.getClass()); + } + } + + @Override + public @NotNull Blob createBlob(InputStream inputStream) throws IOException { + return getBlob(blobStore.writeBlob(inputStream)); + } + + @Override + public @Nullable Blob getBlob(@NotNull String reference) { + return new BlobStoreBlob(blobStore, reference); + } + + @Override + public @NotNull String checkpoint(long lifetime, @NotNull Map properties) { + CreateCheckpointRequest.Builder builder = CreateCheckpointRequest.newBuilder().setLifetime(lifetime); + builder.getInfoBuilder().putAllCheckpointInfo(properties); + return client.getCheckpointService().createCheckpoint(builder.build()).getId(); + } + + @Override + public @NotNull String checkpoint(long lifetime) { + CreateCheckpointRequest.Builder builder = CreateCheckpointRequest.newBuilder().setLifetime(lifetime); + return client.getCheckpointService().createCheckpoint(builder.build()).getId(); + } + + @Override + public @NotNull Map checkpointInfo(@NotNull String checkpoint) { + CheckpointProtos.CheckpointId checkpointId = CheckpointProtos.CheckpointId.newBuilder().setId(checkpoint).build(); + 
CheckpointProtos.CheckpointInfo info = client.getCheckpointService().getCheckpointInfo(checkpointId); + return info.getCheckpointInfoMap(); + } + + @Override + public @NotNull Iterable checkpoints() { + return client.getCheckpointService().getCheckpointList(Empty.getDefaultInstance()) + .getCheckpointIdList() + .stream() + .map(CheckpointProtos.CheckpointId::getId) + .collect(Collectors.toList()); + } + + @Override + public @Nullable NodeState retrieve(@NotNull String checkpoint) { + CheckpointProtos.CheckpointId checkpointId = CheckpointProtos.CheckpointId.newBuilder().setId(checkpoint).build(); + NodeStateId nodeStateId = client.getCheckpointService().retrieveCheckpoint(checkpointId); + return createNodeState(nodeStateId); + } + + @Override + public boolean release(@NotNull String checkpoint) { + CheckpointProtos.CheckpointId checkpointId = CheckpointProtos.CheckpointId.newBuilder().setId(checkpoint).build(); + return client.getCheckpointService().releaseCheckpoint(checkpointId).getValue(); + } + + private NodeState createNodeState(NodeStateId id) { + String revision = id.getRevision(); + return nodeStore.getNodeStateByRevision(revision); + } + + @NotNull + private CommitProtos.Commit createCommitObject(@NotNull CommitInfo info, NodeState baseNodeState, NodeState headNodeState) { + Commit.Builder commitBuilder = Commit.newBuilder(); + commitBuilder.setCommitInfo(CommitInfoUtil.serialize(info)); + commitBuilder.getBaseNodeStateBuilder().setRevision(getRevision(baseNodeState)); + commitBuilder.getHeadNodeStateBuilder().setRevision(getRevision(headNodeState)); + commitBuilder.setSegmentStoreDir(privateDirName); + return commitBuilder.build(); + } + + public LeaseProtos.ClusterView getLastClusterView() { + return lastClusterView; + } +} diff --git a/oak-store-remote-client/src/main/java/org/apache/jackrabbit/oak/remote/client/RemoteNodeStoreClient.java b/oak-store-remote-client/src/main/java/org/apache/jackrabbit/oak/remote/client/RemoteNodeStoreClient.java new file 
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.jackrabbit.oak.remote.client;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import org.apache.jackrabbit.oak.remote.proto.CheckpointServiceGrpc;
import org.apache.jackrabbit.oak.remote.proto.CheckpointServiceGrpc.CheckpointServiceBlockingStub;
import org.apache.jackrabbit.oak.remote.proto.LeaseServiceGrpc;
import org.apache.jackrabbit.oak.remote.proto.LeaseServiceGrpc.LeaseServiceBlockingStub;
import org.apache.jackrabbit.oak.remote.proto.NodeStoreServiceGrpc;
import org.apache.jackrabbit.oak.remote.proto.NodeStoreServiceGrpc.NodeStoreServiceBlockingStub;
import org.apache.jackrabbit.oak.remote.proto.SegmentServiceGrpc;
import org.apache.jackrabbit.oak.remote.proto.SegmentServiceGrpc.SegmentServiceBlockingStub;
import org.apache.jackrabbit.oak.remote.proto.SegmentServiceGrpc.SegmentServiceStub;

import java.io.Closeable;

/**
 * Bundles the gRPC stubs used to talk to a remote node store server: one
 * blocking stub per service, plus async stubs for the streaming node store
 * and segment services. All stubs share a single {@link ManagedChannel}.
 */
public class RemoteNodeStoreClient implements Closeable {

    /** The shared channel; shut down by {@link #close()}. */
    private final ManagedChannel channel;

    private final CheckpointServiceBlockingStub checkpointService;

    private final NodeStoreServiceBlockingStub nodeStoreService;

    private final NodeStoreServiceGrpc.NodeStoreServiceStub nodeStoreAsyncService;

    private final LeaseServiceBlockingStub leaseService;

    private final SegmentServiceBlockingStub segmentService;

    private final SegmentServiceStub segmentAsyncService;

    /**
     * Connects to {@code host:port} without TLS (plaintext), as the remote
     * server is expected to live on a trusted network.
     */
    public RemoteNodeStoreClient(String host, int port) {
        this(ManagedChannelBuilder.forAddress(host, port).usePlaintext());
    }

    /** Builds the channel and creates one stub per remote service. */
    public RemoteNodeStoreClient(ManagedChannelBuilder channelBuilder) {
        channel = channelBuilder.build();
        checkpointService = CheckpointServiceGrpc.newBlockingStub(channel);
        nodeStoreService = NodeStoreServiceGrpc.newBlockingStub(channel);
        nodeStoreAsyncService = NodeStoreServiceGrpc.newStub(channel);
        leaseService = LeaseServiceGrpc.newBlockingStub(channel);
        segmentAsyncService = SegmentServiceGrpc.newStub(channel);
        segmentService = SegmentServiceGrpc.newBlockingStub(channel);
    }

    public CheckpointServiceBlockingStub getCheckpointService() {
        return checkpointService;
    }

    public NodeStoreServiceBlockingStub getNodeStoreService() {
        return nodeStoreService;
    }

    public NodeStoreServiceGrpc.NodeStoreServiceStub getNodeStoreAsyncService() {
        return nodeStoreAsyncService;
    }

    public LeaseServiceBlockingStub getLeaseService() {
        return leaseService;
    }

    public SegmentServiceStub getSegmentAsyncService() {
        return segmentAsyncService;
    }

    public SegmentServiceBlockingStub getSegmentService() {
        return segmentService;
    }

    /**
     * Initiates channel shutdown.
     * NOTE(review): shutdown() is asynchronous and in-flight RPCs may still be
     * running when this returns; consider awaitTermination() — confirm callers
     * don't rely on a fully closed channel.
     */
    @Override
    public void close() {
        channel.shutdown();
    }

}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.jackrabbit.oak.remote.client;

import org.apache.jackrabbit.commons.SimpleValueFactory;
import org.apache.jackrabbit.oak.api.Descriptors;
import org.apache.jackrabbit.oak.commons.json.JsopBuilder;
import org.apache.jackrabbit.oak.remote.proto.LeaseProtos;
import org.jetbrains.annotations.NotNull;

import javax.jcr.Value;
import java.util.Iterator;

/**
 * Exposes the 'clusterView' repository descriptor, rendering the most recent
 * cluster view received from the remote lease service as a JSON document.
 **/
class RemoteNodeStoreDiscoveryLiteDescriptors implements Descriptors {

    private static final String OAK_DISCOVERYLITE_CLUSTERVIEW = "oak.discoverylite.clusterview";

    private final SimpleValueFactory factory = new SimpleValueFactory();

    private final RemoteNodeStore store;

    RemoteNodeStoreDiscoveryLiteDescriptors(RemoteNodeStore store) {
        this.store = store;
    }

    @NotNull
    @Override
    public String[] getKeys() {
        return new String[] {OAK_DISCOVERYLITE_CLUSTERVIEW};
    }

    @Override
    public boolean isStandardDescriptor(@NotNull String key) {
        return OAK_DISCOVERYLITE_CLUSTERVIEW.equals(key);
    }

    @Override
    public boolean isSingleValueDescriptor(@NotNull String key) {
        return OAK_DISCOVERYLITE_CLUSTERVIEW.equals(key);
    }

    @Override
    public Value getValue(@NotNull String key) {
        if (!OAK_DISCOVERYLITE_CLUSTERVIEW.equals(key)) {
            return null;
        }
        return factory.createValue(renderClusterView());
    }

    @Override
    public Value[] getValues(@NotNull String key) {
        // The descriptor is single-valued; unknown keys yield null.
        Value single = getValue(key);
        if (single == null) {
            return null;
        }
        return new Value[] {single};
    }

    /** Serializes the latest cluster view. The view is always reported as final. */
    private String renderClusterView() {
        LeaseProtos.ClusterView view = store.getLastClusterView();
        return asJson(view.getSeq(), true, view.getId(), view.getMe(),
                view.getActiveList(), view.getDeactivatingList(), view.getInactiveList());
    }

    private static String asJson(final long viewSeqNum, final boolean viewFinal, final String clusterId, final int localId,
                                 final Iterable<Integer> activeIds, final Iterable<Integer> deactivatingIds, final Iterable<Integer> inactiveIds) {
        JsopBuilder json = new JsopBuilder();
        json.object();
        json.key("seq").value(viewSeqNum);
        json.key("final").value(viewFinal);
        json.key("id").value(clusterId);
        json.key("me").value(localId);
        appendIds(json, "active", activeIds);
        appendIds(json, "deactivating", deactivatingIds);
        appendIds(json, "inactive", inactiveIds);
        json.endObject();
        return json.toString();
    }

    /** Appends {@code "<key>": [id, id, ...]} to the JSON object being built. */
    private static void appendIds(JsopBuilder json, String key, Iterable<Integer> ids) {
        json.key(key).array();
        for (Integer id : ids) {
            json.value(id);
        }
        json.endArray();
    }

}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.jackrabbit.oak.remote.client;

import com.google.common.base.Strings;
import com.google.common.io.Closer;
import com.microsoft.azure.storage.CloudStorageAccount;
import com.microsoft.azure.storage.StorageCredentials;
import com.microsoft.azure.storage.StorageCredentialsAccountAndKey;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import org.apache.jackrabbit.commons.SimpleValueFactory;
import org.apache.jackrabbit.oak.api.Descriptors;
import org.apache.jackrabbit.oak.api.jmx.CheckpointMBean;
import org.apache.jackrabbit.oak.osgi.OsgiWhiteboard;
import org.apache.jackrabbit.oak.segment.spi.rev.RevRepositoryFactory;
import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentNodeStorePersistence;
import org.apache.jackrabbit.oak.spi.blob.BlobStore;
import org.apache.jackrabbit.oak.spi.cluster.ClusterRepositoryInfo;
import org.apache.jackrabbit.oak.spi.commit.ObserverTracker;
import org.apache.jackrabbit.oak.spi.descriptors.GenericDescriptors;
import org.apache.jackrabbit.oak.spi.state.NodeStore;
import org.apache.jackrabbit.oak.spi.state.NodeStoreProvider;
import org.apache.jackrabbit.oak.spi.whiteboard.Registration;
import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard;
import org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils;
import org.osgi.framework.Constants;
import org.osgi.framework.ServiceRegistration;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.ConfigurationPolicy;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.component.annotations.ReferenceCardinality;
import org.osgi.service.component.annotations.ReferencePolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.URISyntaxException;
import java.security.InvalidKeyException;
import java.util.Dictionary;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.UUID;

import static org.apache.jackrabbit.oak.spi.cluster.ClusterRepositoryInfo.getOrCreateId;

/**
 * OSGi component wiring a {@link RemoteNodeStore} into the service registry.
 * On activation it (1) opens a gRPC client to the remote server, (2) creates
 * an Azure-backed tailing persistence with a per-instance private directory,
 * and (3) registers either a {@link NodeStore} (no role configured) or a
 * {@link NodeStoreProvider} (role configured). Everything created along the
 * way is registered with a {@link Closer} and undone on deactivation.
 */
@Component(
        configurationPolicy = ConfigurationPolicy.REQUIRE,
        configurationPid = {Configuration.PID})
public class RemoteNodeStoreService {

    private static final Logger LOG = LoggerFactory.getLogger(RemoteNodeStoreService.class);

    @Reference(cardinality = ReferenceCardinality.MANDATORY, policy = ReferencePolicy.STATIC)
    private BlobStore blobStore;

    @Reference(cardinality = ReferenceCardinality.MANDATORY, policy = ReferencePolicy.STATIC)
    private RevRepositoryFactory nodeStoreFactory;

    private ComponentContext context;

    // Collects every registration/resource created during activation so
    // deactivate() can undo them in reverse order.
    private Closer closer;

    private Configuration config;

    private RemoteNodeStoreClient client;

    // Unique per activation; isolates this instance's private segments.
    private String privateDirName;

    private SegmentNodeStorePersistence persistence;

    @Activate
    protected void activate(ComponentContext context, Configuration config) throws Exception {
        this.context = context;
        this.config = config;
        this.closer = Closer.create();
        // Order matters: the persistence tails segments through the client.
        createClient();
        registerPersistence();
        registerRemoteNodeStore();
    }

    @Deactivate
    protected void deactivate() throws IOException {
        closer.close();
    }

    /** Opens the gRPC client to the configured remote host/port. */
    private void createClient() {
        client = new RemoteNodeStoreClient(config.remoteHost(), config.remotePort());
        closer.register(client);
    }

    /**
     * Creates the segment persistence: segments are shared under the
     * configured root path and written privately under a random UUID-suffixed
     * directory, so concurrent clients never collide.
     */
    private void registerPersistence() throws StorageException, InvalidKeyException, URISyntaxException, IOException {
        String sharedDirName = config.rootPath();
        privateDirName = sharedDirName + "-" + UUID.randomUUID().toString();
        CloudBlobContainer container = createContainer(config);
        TailingPersistenceFactory persistenceFactory = new TailingPersistenceFactory(container, client, config.rootPath(), privateDirName);
        closer.register(persistenceFactory);
        persistence = persistenceFactory.create();
    }

    /**
     * Resolves the Azure container, preferring the full connection string
     * over account name + access key when both are configured.
     */
    private static CloudBlobContainer createContainer(Configuration config) throws URISyntaxException, StorageException, InvalidKeyException {
        CloudStorageAccount cloud;
        if (!Strings.isNullOrEmpty(config.connectionURL())) {
            cloud = CloudStorageAccount.parse(config.connectionURL());
        } else {
            StorageCredentials credentials = new StorageCredentialsAccountAndKey(
                    config.accountName(),
                    config.accessKey());
            cloud = new CloudStorageAccount(credentials, true);
        }
        return cloud.createCloudBlobClient().getContainerReference(config.containerName());
    }

    /**
     * Builds the {@link RemoteNodeStore} and registers it. Without a role the
     * full NodeStore surface (observers, checkpoint MBean, descriptors) is
     * published; with a role only a NodeStoreProvider plus descriptors.
     */
    private void registerRemoteNodeStore() throws Exception {
        RemoteNodeStore.Builder builder = new RemoteNodeStore.Builder();
        builder.setBlobStore(blobStore);
        builder.setClient(client);
        builder.setPrivateDirName(privateDirName);
        builder.setNodeStore(nodeStoreFactory.builder().withBlobStore(blobStore).withPersistence(persistence).build());

        RemoteNodeStore store = builder.build();

        Whiteboard whiteboard = new OsgiWhiteboard(context.getBundleContext());

        // NOTE(review): an unset DS String attribute may surface as "" rather
        // than null depending on the config source — confirm role() semantics.
        if (config.role() == null) {
            ObserverTracker observerTracker = new ObserverTracker(store);
            observerTracker.start(context.getBundleContext());
            closer.register(() -> observerTracker.stop());

            registerMBean(whiteboard,
                    CheckpointMBean.class,
                    new RemoteCheckpointMBean(store),
                    CheckpointMBean.TYPE,
                    "Remote node store checkpoint management");

            registerDescriptors(whiteboard, store);

            Dictionary<String, Object> props = new Hashtable<>();
            props.put(Constants.SERVICE_PID, RemoteNodeStore.class.getName());
            props.put("oak.nodestore.description", new String[] { "nodeStoreType=remote" } );

            LOG.info("Registering the remote node store");
            ServiceRegistration nsReg = context.getBundleContext().registerService(
                    NodeStore.class.getName(),
                    store,
                    props);
            closer.register(() -> nsReg.unregister());
        } else {
            registerDescriptors(whiteboard, store);

            Dictionary<String, Object> props = new Hashtable<>();
            props.put(NodeStoreProvider.ROLE, config.role());

            LOG.info("Registering the remote node store provider");
            ServiceRegistration nsReg = context.getBundleContext().registerService(
                    NodeStoreProvider.class.getName(),
                    (NodeStoreProvider) () -> store,
                    props);
            closer.register(() -> nsReg.unregister());
        }
    }

    /** Publishes the cluster-id and discovery-lite repository descriptors. */
    private void registerDescriptors(Whiteboard whiteboard, RemoteNodeStore remoteNodeStore) {
        GenericDescriptors clusterIdDesc = new GenericDescriptors();
        clusterIdDesc.put(
                ClusterRepositoryInfo.OAK_CLUSTERID_REPOSITORY_DESCRIPTOR_KEY,
                new SimpleValueFactory().createValue(getOrCreateId(remoteNodeStore)),
                true,
                false
        );
        register(whiteboard, Descriptors.class, clusterIdDesc);
        register(whiteboard, Descriptors.class, new RemoteNodeStoreDiscoveryLiteDescriptors(remoteNodeStore));
    }

    /** Registers a whiteboard service and queues its unregistration. */
    private <T> void register(Whiteboard whiteboard, Class<T> iface, T bean) {
        Registration reg = whiteboard.register(iface, bean, new HashMap<>());
        closer.register(() -> reg.unregister());
    }

    /** Registers an MBean and queues its unregistration. */
    private <T> void registerMBean(Whiteboard whiteboard, Class<T> iface, T bean, String type, String name) {
        Registration reg = WhiteboardUtils.registerMBean(whiteboard, iface, bean, type, name);
        closer.register(() -> reg.unregister());
    }
}
+import org.apache.jackrabbit.oak.remote.common.SegmentWriteListener; +import org.apache.jackrabbit.oak.remote.common.persistence.TailingPersistence; +import org.apache.jackrabbit.oak.remote.proto.SegmentProtos; +import org.apache.jackrabbit.oak.segment.azure.AzurePersistence; +import org.apache.jackrabbit.oak.segment.spi.monitor.CompositeIOMonitor; +import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitor; +import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitor; +import org.apache.jackrabbit.oak.segment.spi.monitor.RemoteStoreMonitor; +import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveManager; +import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentNodeStorePersistence; +import org.apache.jackrabbit.oak.segment.spi.persistence.WrappingPersistence; +import org.apache.jackrabbit.oak.segment.spi.persistence.split.SplitPersistence; + +import java.io.Closeable; +import java.io.IOException; +import java.net.URISyntaxException; +import java.util.Arrays; + +public class TailingPersistenceFactory implements Closeable { + + private final CloudBlobContainer container; + + private final RemoteNodeStoreClient client; + + private final String sharedDirName; + + private final String privateDirName; + + private StreamObserver segmentStreamObserver; + + public TailingPersistenceFactory(CloudBlobContainer container, + RemoteNodeStoreClient client, + String sharedDirName, + String privateDirName) { + this.container = container; + this.client = client; + this.sharedDirName = sharedDirName; + this.privateDirName = privateDirName; + } + + public SegmentNodeStorePersistence create() throws IOException, URISyntaxException, StorageException { + AzurePersistence sharedPersistence; + AzurePersistence privatePersistence; + try { + sharedPersistence = new AzurePersistence(container.getDirectoryReference(sharedDirName)); + privatePersistence = new AzurePersistence(container.getDirectoryReference(privateDirName)); + } catch (URISyntaxException 
e) { + throw new IOException(e); + } + TailingPersistence tailingPersistence = new TailingPersistence(sharedPersistence, client.getSegmentService()); + segmentStreamObserver = client.getSegmentAsyncService().observeSegments(new StreamObserver() { + @Override + public void onNext(SegmentProtos.SegmentBlob segmentBlob) { + tailingPersistence.onNewSegment(segmentBlob); + } + + @Override + public void onError(Throwable throwable) { + } + + @Override + public void onCompleted() { + } + }); + SplitPersistence splitPersistence = new SplitPersistence(tailingPersistence, privatePersistence); + SegmentWriteListener listener = new SegmentWriteListener(); + listener.setDelegate(segmentBlob -> { + client.getSegmentService().newPrivateSegment(SegmentProtos.PrivateSegment.newBuilder() + .setSegmentStoreDir(privateDirName) + .setSegmentBlob(segmentBlob) + .build()); + } + ); + return new WrappingPersistence(splitPersistence) { + @Override + public SegmentArchiveManager createArchiveManager(boolean memoryMapping, boolean offHeapAccess, IOMonitor ioMonitor, FileStoreMonitor fileStoreMonitor, RemoteStoreMonitor remoteStoreMonitor) throws IOException { + IOMonitor effectiveIOMonitor = new CompositeIOMonitor(Arrays.asList(ioMonitor, listener)); + return splitPersistence.createArchiveManager(memoryMapping, offHeapAccess, effectiveIOMonitor, fileStoreMonitor, remoteStoreMonitor); + } + }; + } + + @Override + public void close() throws IOException { + segmentStreamObserver.onCompleted(); + } +} diff --git a/oak-store-remote-client/src/test/java/org/apache/jackrabbit/oak/remote/AbstractRemoteNodeStoreTest.java b/oak-store-remote-client/src/test/java/org/apache/jackrabbit/oak/remote/AbstractRemoteNodeStoreTest.java new file mode 100644 index 0000000000..3d64f14865 --- /dev/null +++ b/oak-store-remote-client/src/test/java/org/apache/jackrabbit/oak/remote/AbstractRemoteNodeStoreTest.java @@ -0,0 +1,78 @@ +package org.apache.jackrabbit.oak.remote; + +import 
com.microsoft.azure.storage.blob.CloudBlobContainer; +import io.grpc.inprocess.InProcessChannelBuilder; +import io.grpc.inprocess.InProcessServerBuilder; +import org.apache.jackrabbit.core.data.FileDataStore; +import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreBlobStore; +import org.apache.jackrabbit.oak.remote.client.RemoteNodeStore; +import org.apache.jackrabbit.oak.remote.client.RemoteNodeStoreClient; +import org.apache.jackrabbit.oak.remote.client.TailingPersistenceFactory; +import org.apache.jackrabbit.oak.remote.server.NodeStoreServer; +import org.apache.jackrabbit.oak.segment.RevRepositoryService; +import org.apache.jackrabbit.oak.segment.SegmentNodeStore; +import org.apache.jackrabbit.oak.segment.azure.AzuriteDockerRule; +import org.apache.jackrabbit.oak.segment.spi.rev.RevRepository; +import org.apache.jackrabbit.oak.spi.blob.BlobStore; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.rules.TemporaryFolder; + +import java.io.IOException; + +public abstract class AbstractRemoteNodeStoreTest { + + @Rule + public TemporaryFolder folder = new TemporaryFolder(); + + @ClassRule + public static AzuriteDockerRule azurite = new AzuriteDockerRule(); + + private CloudBlobContainer container; + + private long index; + + protected SegmentNodeStore delegateNodeStore; + + protected NodeStoreServer server; + + protected RemoteNodeStore remoteNodeStore; + + @Before + public void setup() throws Exception { + container = azurite.getContainer("oak-test"); + + FileDataStore fds = new FileDataStore(); + fds.setPath(folder.newFolder().getPath()); + BlobStore blobStore = new DataStoreBlobStore(fds); + + String name = "oak-test-" + (++index); + + InProcessServerBuilder inProcessServerBuilder = InProcessServerBuilder.forName(name); + server = new NodeStoreServer(inProcessServerBuilder, container.getDirectoryReference(name), blobStore); + delegateNodeStore = server.getNodeStore(); + 
server.start(); + + InProcessChannelBuilder inProcessChannelBuilder = InProcessChannelBuilder.forName(name); + RemoteNodeStoreClient client = new RemoteNodeStoreClient(inProcessChannelBuilder); + TailingPersistenceFactory persistenceFactory = new TailingPersistenceFactory(container, client, name, name + "-priv"); + + RevRepositoryService nodeStoreFactory = new RevRepositoryService(); + RevRepository revNodeStore = nodeStoreFactory.builder().withBlobStore(blobStore).withPersistence(persistenceFactory.create()).build(); + + remoteNodeStore = new RemoteNodeStore.Builder() + .setBlobStore(blobStore) + .setClient(client) + .setPrivateDirName(name + "-priv") + .setNodeStore(revNodeStore) + .build(); + } + + @After + public void teardown() throws IOException { + remoteNodeStore.close(); + server.close(); + } +} diff --git a/oak-store-remote-client/src/test/java/org/apache/jackrabbit/oak/remote/RemoteNodeStoreTest.java b/oak-store-remote-client/src/test/java/org/apache/jackrabbit/oak/remote/RemoteNodeStoreTest.java new file mode 100644 index 0000000000..959ab463bd --- /dev/null +++ b/oak-store-remote-client/src/test/java/org/apache/jackrabbit/oak/remote/RemoteNodeStoreTest.java @@ -0,0 +1,55 @@ +package org.apache.jackrabbit.oak.remote; + +import org.apache.jackrabbit.oak.api.CommitFailedException; +import org.apache.jackrabbit.oak.api.Type; +import org.apache.jackrabbit.oak.spi.commit.CommitInfo; +import org.apache.jackrabbit.oak.spi.commit.EmptyHook; +import org.apache.jackrabbit.oak.spi.state.NodeBuilder; +import org.apache.jackrabbit.oak.spi.state.NodeState; +import org.junit.Ignore; +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.io.IOException; + +import static org.junit.Assert.assertEquals; + +public class RemoteNodeStoreTest extends AbstractRemoteNodeStoreTest { + + @Test + public void simpleMergeTest() throws CommitFailedException { + NodeState root = remoteNodeStore.getRoot(); + NodeBuilder builder2 = root.builder(); + 
builder2.setProperty("foo", "bar"); + remoteNodeStore.merge(builder2, EmptyHook.INSTANCE, CommitInfo.EMPTY); + assertEquals("bar", delegateNodeStore.getRoot().getString("foo")); + } + + @Test + @Ignore + public void manyChangesTest() throws CommitFailedException { + NodeState root = remoteNodeStore.getRoot(); + NodeBuilder builder = root.builder(); + NodeBuilder test = builder.child("test"); + for (int i = 0; i < 15_000; i++) { + test.setProperty("foo_" + i, i); + } + + root = remoteNodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY); + NodeState testState = root.getChildNode("test"); + for (long i = 0; i < 15_000; i++) { + assertEquals(i, (long) testState.getProperty("foo_" + i).getValue(Type.LONG)); + } + } + + @Test + public void testBlob() throws IOException, CommitFailedException { + NodeBuilder builder = remoteNodeStore.getRoot().builder(); + builder.setProperty("smallBlob", builder.createBlob(new ByteArrayInputStream(new byte[10]))); + builder.setProperty("largeBlob", builder.createBlob(new ByteArrayInputStream(new byte[10 * 1024]))); + remoteNodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY); + + assertEquals(10, remoteNodeStore.getRoot().getProperty("smallBlob").getValue(Type.BINARY).length()); + assertEquals(10240, remoteNodeStore.getRoot().getProperty("largeBlob").getValue(Type.BINARY).length()); + } +} diff --git a/oak-store-remote-commons/pom.xml b/oak-store-remote-commons/pom.xml new file mode 100644 index 0000000000..f97823a5d5 --- /dev/null +++ b/oak-store-remote-commons/pom.xml @@ -0,0 +1,229 @@ + + + + + + 4.0.0 + + + org.apache.jackrabbit + oak-parent + 1.18-SNAPSHOT + ../oak-parent/pom.xml + + + oak-store-remote-commons + Oak Remote Store Commons + bundle + + + + + kr.motd.maven + os-maven-plugin + 1.5.0.Final + + + + + + org.apache.felix + maven-bundle-plugin + + + + !com.google.errorprone.annotations, + !com.google.errorprone.annotations.concurrent, + !com.google.protobuf.nano, + !com.google.rpc, + !com.jcraft.jzlib, + 
!com.ning.compress, + !com.ning.compress.lzf, + !com.ning.compress.lzf.util, + !javax.annotation, + !lzma.sdk, + !lzma.sdk.lzma, + !net.jpountz.lz4, + !net.jpountz.xxhash, + !org.apache.logging.log4j, + !org.apache.logging.log4j.message, + !org.apache.logging.log4j.spi, + !org.bouncycastle.asn1.x500, + !org.bouncycastle.cert, + !org.bouncycastle.cert.jcajce, + !org.bouncycastle.jce.provider, + !org.bouncycastle.operator, + !org.bouncycastle.operator.jcajce, + !org.checkerframework.checker.nullness.qual, + !org.conscrypt, + !org.eclipse.jetty.alpn, + !org.eclipse.jetty.npn, + !org.jboss.marshalling, + !sun.misc, + !sun.security.util, + !sun.security.x509, + * + + + org.apache.jackrabbit.oak.remote.common, + org.apache.jackrabbit.oak.remote.common.persistence, + org.apache.jackrabbit.oak.remote.proto, + com.google.protobuf, + io.grpc, + io.grpc.netty.shaded.io*, + io.grpc.protobuf, + io.grpc.protobuf.lite, + io.grpc.stub + + + grpc-api, + grpc-context, + grpc-core, + grpc-netty-shaded, + grpc-protobuf, + grpc-stub, + protobuf-java, + opencensus-api, + opencensus-contrib-grpc-metrics, + perfmark-api, + guava + + + + + + org.xolstice.maven.plugins + protobuf-maven-plugin + 0.5.1 + + com.google.protobuf:protoc:3.7.1:exe:${os.detected.classifier} + grpc-java + io.grpc:protoc-gen-grpc-java:1.22.1:exe:${os.detected.classifier} + + + + + compile + compile-custom + + + + + + + + + + + org.apache.jackrabbit + oak-segment-azure + ${project.version} + + + org.apache.jackrabbit + oak-segment-tar + ${project.version} + + + org.apache.jackrabbit + oak-store-spi + ${project.version} + + + + + org.jetbrains + annotations + + + + + io.grpc + grpc-api + 1.22.1 + + + io.grpc + grpc-context + 1.22.1 + + + io.grpc + grpc-core + 1.22.1 + + + io.grpc + grpc-protobuf + 1.22.1 + + + io.grpc + grpc-protobuf-lite + 1.22.1 + + + com.google.protobuf + protobuf-lite + + + + + io.grpc + grpc-stub + 1.22.1 + + + io.grpc + grpc-netty-shaded + 1.22.1 + + + com.google.protobuf + protobuf-java + 3.7.1 
+ + + javax.annotation + javax.annotation-api + 1.3.2 + + + + + io.opencensus + opencensus-api + 0.21.0 + + + io.opencensus + opencensus-contrib-grpc-metrics + 0.21.0 + + + io.perfmark + perfmark-api + 0.16.0 + + + com.google.guava + guava + 26.0-jre + + + diff --git a/oak-store-remote-commons/src/main/java/org/apache/jackrabbit/oak/remote/common/CommitInfoUtil.java b/oak-store-remote-commons/src/main/java/org/apache/jackrabbit/oak/remote/common/CommitInfoUtil.java new file mode 100644 index 0000000000..f6e691726d --- /dev/null +++ b/oak-store-remote-commons/src/main/java/org/apache/jackrabbit/oak/remote/common/CommitInfoUtil.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.jackrabbit.oak.remote.common; + +import org.apache.jackrabbit.oak.remote.proto.CommitProtos; +import org.apache.jackrabbit.oak.spi.commit.CommitInfo; + +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.Map; + +public final class CommitInfoUtil { + + private CommitInfoUtil() { + } + + public static CommitProtos.CommitInfo serialize(CommitInfo info) { + CommitProtos.CommitInfo.Builder ciBuilder = CommitProtos.CommitInfo.newBuilder(); + ciBuilder.setSessionId(info.getSessionId()); + ciBuilder.setUserId(info.getUserId()); + ciBuilder.setIsExternal(info.isExternal()); + Map commitInfoMap = new LinkedHashMap<>(); + for (Map.Entry e : info.getInfo().entrySet()) { + if ((e.getValue() instanceof String)) { + commitInfoMap.put(e.getKey(), (String) e.getValue()); + } + } + ciBuilder.putAllCommitInfo(commitInfoMap); + return ciBuilder.build(); + } + + public static CommitInfo deserialize(CommitProtos.CommitInfo info) { + Map map = new HashMap<>(); + map.putAll(info.getCommitInfoMap()); + return new CommitInfo(info.getSessionId(), info.getUserId(), map, info.getIsExternal()); + } +} diff --git a/oak-store-remote-commons/src/main/java/org/apache/jackrabbit/oak/remote/common/RevisionableUtils.java b/oak-store-remote-commons/src/main/java/org/apache/jackrabbit/oak/remote/common/RevisionableUtils.java new file mode 100644 index 0000000000..65f62b0647 --- /dev/null +++ b/oak-store-remote-commons/src/main/java/org/apache/jackrabbit/oak/remote/common/RevisionableUtils.java @@ -0,0 +1,32 @@ +package org.apache.jackrabbit.oak.remote.common; + +import org.apache.jackrabbit.oak.remote.proto.NodeStateProtos.NodeStateId; +import org.apache.jackrabbit.oak.segment.spi.rev.RevNodeState; +import org.apache.jackrabbit.oak.spi.state.NodeState; + +import static com.google.common.base.Preconditions.checkArgument; + +public final class RevisionableUtils { + + private RevisionableUtils() { + } + + public static String getRevision(NodeState 
nodeState) { + checkArgument(nodeState instanceof RevNodeState); + return ((RevNodeState) nodeState).getRevision(); + } + + public static NodeStateId getNodeStateId(NodeState nodeState) { + return NodeStateId.newBuilder().setRevision(getRevision(nodeState)).build(); + } + + public static boolean fastEquals(NodeState nodeState1, NodeState nodeState2) { + String rev1 = getRevision(nodeState1); + String rev2 = getRevision(nodeState2); + if (rev1 == null || rev2 == null) { + return false; + } + return rev1.equals(rev2); + } + +} diff --git a/oak-store-remote-commons/src/main/java/org/apache/jackrabbit/oak/remote/common/SegmentWriteListener.java b/oak-store-remote-commons/src/main/java/org/apache/jackrabbit/oak/remote/common/SegmentWriteListener.java new file mode 100644 index 0000000000..ec2e484957 --- /dev/null +++ b/oak-store-remote-commons/src/main/java/org/apache/jackrabbit/oak/remote/common/SegmentWriteListener.java @@ -0,0 +1,37 @@ +package org.apache.jackrabbit.oak.remote.common; + +import org.apache.jackrabbit.oak.remote.proto.SegmentProtos; +import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitorAdapter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.util.function.Consumer; + +public class SegmentWriteListener extends IOMonitorAdapter { + + private static final Logger log = LoggerFactory.getLogger(SegmentWriteListener.class); + + private Consumer delegate = s -> {}; + + @Override + public void afterSegmentWrite(File file, long msb, long lsb, int length, long elapsed) { + SegmentProtos.SegmentBlob.Builder builder = SegmentProtos.SegmentBlob.newBuilder(); + builder + .setBlobName(file.toString()) + .getSegmentIdBuilder() + .setMsb(msb) + .setLsb(lsb); + + try { + delegate.accept(builder.build()); + } catch (Exception e) { + log.error("Can't pass the blob to delegate", e); + } + } + + public void setDelegate(Consumer delegate) { + this.delegate = delegate; + } + +} diff --git 
a/oak-store-remote-commons/src/main/java/org/apache/jackrabbit/oak/remote/common/persistence/TailingArchiveManager.java b/oak-store-remote-commons/src/main/java/org/apache/jackrabbit/oak/remote/common/persistence/TailingArchiveManager.java new file mode 100644 index 0000000000..84bda50ed4 --- /dev/null +++ b/oak-store-remote-commons/src/main/java/org/apache/jackrabbit/oak/remote/common/persistence/TailingArchiveManager.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.jackrabbit.oak.remote.common.persistence; + +import com.microsoft.azure.storage.blob.CloudBlobContainer; +import org.apache.jackrabbit.oak.remote.proto.SegmentProtos; +import org.apache.jackrabbit.oak.remote.proto.SegmentServiceGrpc.SegmentServiceBlockingStub; +import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveManager; +import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveReader; +import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveWriter; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +import java.io.IOException; +import java.util.Arrays; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.UUID; + +public class TailingArchiveManager implements SegmentArchiveManager { + + private final SegmentArchiveManager delegate; + + private final TailingReader reader; + + public TailingArchiveManager(SegmentArchiveManager delegate, SegmentServiceBlockingStub segmentService, CloudBlobContainer container, List directoryNames) throws IOException { + this.delegate = delegate; + this.reader = new TailingReader(container, directoryNames, segmentService); + } + + @Override + public @NotNull List listArchives() { + return Arrays.asList("data00000a.tar"); + } + + @Override + @Nullable + public SegmentArchiveReader open(@NotNull String archiveName) { + return forceOpen(archiveName); + } + + @Override + @Nullable + public SegmentArchiveReader forceOpen(String archiveName) { + if ("data00000a.tar".equals(archiveName)) { + return reader; + } else { + return null; + } + } + + @Override + public @NotNull SegmentArchiveWriter create(@NotNull String archiveName) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean delete(@NotNull String archiveName) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean renameTo(@NotNull String from, @NotNull String to) { + throw new 
UnsupportedOperationException(); + } + + @Override + public void copyFile(@NotNull String from, @NotNull String to) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean exists(@NotNull String archiveName) { + return delegate.exists(archiveName); + } + + @Override + public void recoverEntries(@NotNull String archiveName, @NotNull LinkedHashMap entries) { + throw new UnsupportedOperationException(); + } + + public void onNewSegment(SegmentProtos.SegmentBlob segmentBlob) { + reader.onNewSegment(segmentBlob); + } +} diff --git a/oak-store-remote-commons/src/main/java/org/apache/jackrabbit/oak/remote/common/persistence/TailingPersistence.java b/oak-store-remote-commons/src/main/java/org/apache/jackrabbit/oak/remote/common/persistence/TailingPersistence.java new file mode 100644 index 0000000000..718c11cadb --- /dev/null +++ b/oak-store-remote-commons/src/main/java/org/apache/jackrabbit/oak/remote/common/persistence/TailingPersistence.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.jackrabbit.oak.remote.common.persistence; + +import com.microsoft.azure.storage.StorageException; +import com.microsoft.azure.storage.blob.CloudBlobContainer; +import org.apache.jackrabbit.oak.remote.proto.SegmentProtos; +import org.apache.jackrabbit.oak.remote.proto.SegmentServiceGrpc.SegmentServiceBlockingStub; +import org.apache.jackrabbit.oak.segment.azure.AzurePersistence; +import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitor; +import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitor; +import org.apache.jackrabbit.oak.segment.spi.monitor.RemoteStoreMonitor; +import org.apache.jackrabbit.oak.segment.spi.persistence.GCJournalFile; +import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFile; +import org.apache.jackrabbit.oak.segment.spi.persistence.ManifestFile; +import org.apache.jackrabbit.oak.segment.spi.persistence.RepositoryLock; +import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveManager; +import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentNodeStorePersistence; +import org.jetbrains.annotations.Nullable; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +public class TailingPersistence implements SegmentNodeStorePersistence { + + private final AzurePersistence delegate; + + private final SegmentServiceBlockingStub segmentService; + + private final CloudBlobContainer container; + + private final List directoryNames; + + private volatile TailingArchiveManager tailingArchiveManager; + + private List waitingSegments = new ArrayList<>(); + + public TailingPersistence(AzurePersistence persistence, @Nullable SegmentServiceBlockingStub segmentService) throws URISyntaxException, StorageException { + this(persistence, segmentService, null); + } + + public TailingPersistence(AzurePersistence persistence, @Nullable SegmentServiceBlockingStub segmentService, @Nullable List 
directoryNames) throws URISyntaxException, StorageException { + this.delegate = persistence; + this.segmentService = segmentService; + this.container = persistence.getSegmentstoreDirectory().getContainer(); + if (directoryNames == null) { + this.directoryNames = Arrays.asList(persistence.getSegmentstoreDirectory().getPrefix()); + } else { + this.directoryNames = directoryNames; + } + } + + @Override + public synchronized SegmentArchiveManager createArchiveManager(boolean memoryMapping, boolean offHeapAccess, IOMonitor ioMonitor, FileStoreMonitor fileStoreMonitor, RemoteStoreMonitor remoteStoreMonitor) throws IOException { + if (tailingArchiveManager != null) { + return tailingArchiveManager; + } + + tailingArchiveManager = new TailingArchiveManager(delegate.createArchiveManager(memoryMapping, offHeapAccess, ioMonitor, fileStoreMonitor, remoteStoreMonitor), segmentService, container, directoryNames); + waitingSegments.forEach(tailingArchiveManager::onNewSegment); + waitingSegments.clear(); + + return tailingArchiveManager; + } + + @Override + public boolean segmentFilesExist() { + return delegate.segmentFilesExist(); + } + + @Override + public JournalFile getJournalFile() { + return delegate.getJournalFile(); + } + + @Override + public GCJournalFile getGCJournalFile() throws IOException { + return delegate.getGCJournalFile(); + } + + @Override + public ManifestFile getManifestFile() throws IOException { + return delegate.getManifestFile(); + } + + @Override + public RepositoryLock lockRepository() { + return () -> {}; + } + + public void onNewSegment(SegmentProtos.SegmentBlob segmentBlob) { + if (tailingArchiveManager == null) { + synchronized (this) { + if (tailingArchiveManager == null) { + waitingSegments.add(segmentBlob); + } else { + tailingArchiveManager.onNewSegment(segmentBlob); + } + } + } else { + tailingArchiveManager.onNewSegment(segmentBlob); + } + } +} diff --git 
a/oak-store-remote-commons/src/main/java/org/apache/jackrabbit/oak/remote/common/persistence/TailingReader.java b/oak-store-remote-commons/src/main/java/org/apache/jackrabbit/oak/remote/common/persistence/TailingReader.java new file mode 100644 index 0000000000..c77b5b2326 --- /dev/null +++ b/oak-store-remote-commons/src/main/java/org/apache/jackrabbit/oak/remote/common/persistence/TailingReader.java @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.jackrabbit.oak.remote.common.persistence;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.BlobListingDetails;
import com.microsoft.azure.storage.blob.CloudBlob;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import com.microsoft.azure.storage.blob.CloudBlockBlob;
import com.microsoft.azure.storage.blob.ListBlobItem;
import org.apache.jackrabbit.oak.commons.Buffer;
import org.apache.jackrabbit.oak.remote.proto.SegmentProtos;
import org.apache.jackrabbit.oak.remote.proto.SegmentServiceGrpc.SegmentServiceBlockingStub;
import org.apache.jackrabbit.oak.segment.azure.AzureBlobMetadata;
import org.apache.jackrabbit.oak.segment.azure.AzureSegmentArchiveEntry;
import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveEntry;
import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveReader;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.URISyntaxException;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

import static org.apache.jackrabbit.oak.segment.azure.AzureUtilities.readBufferFully;

/**
 * A {@link SegmentArchiveReader} that follows ("tails") segments written to a
 * shared Azure blob container. The segment index is built from the blob listing
 * at construction time and extended incrementally through
 * {@link #onNewSegment(SegmentProtos.SegmentBlob)} notifications. Segment
 * payloads are loaded lazily — from the blob store, or from the remote segment
 * service when a gRPC stub is configured — and kept in a small cache.
 * <p>
 * Thread-safety: the index is a {@link ConcurrentHashMap} and the payload cache
 * is a thread-safe Guava cache, so reads and notifications may run concurrently.
 */
public class TailingReader implements SegmentArchiveReader {

    private static final Logger log = LoggerFactory.getLogger(TailingReader.class);

    private final CloudBlobContainer container;

    // Segment UUID -> blob name + archive index entry for every known segment.
    private final Map<UUID, AzureSegment> index = new ConcurrentHashMap<>();

    // Running total of the lengths of all indexed segments; see length().
    private final AtomicLong length = new AtomicLong();

    // Cache of recently read segment payloads, keyed by segment UUID.
    private final Cache<UUID, Buffer> segmentCache;

    // Optional gRPC fallback for segments not yet visible in the blob listing;
    // may be null, in which case getRecentSegment() polls the index instead.
    private final SegmentServiceBlockingStub segmentService;

    /**
     * Creates a reader over the given container and seeds the index from the
     * "data" blobs of each named segment store.
     *
     * @param container         the shared blob container
     * @param segmentStoreNames directories within the container to index
     * @param segmentService    optional gRPC stub for recent segments, may be null
     * @throws IOException if the initial blob listing fails
     */
    public TailingReader(CloudBlobContainer container, List<String> segmentStoreNames,
                         SegmentServiceBlockingStub segmentService) throws IOException {
        this.segmentService = segmentService;
        this.segmentCache = CacheBuilder.newBuilder()
                .maximumSize(128)
                .build();
        try {
            this.container = container;
            for (String segmentStore : segmentStoreNames) {
                for (ListBlobItem blob : container.getDirectoryReference(segmentStore)
                        .listBlobs("data", true, EnumSet.of(BlobListingDetails.METADATA), null, null)) {
                    if (blob instanceof CloudBlob) {
                        addNewSegment((CloudBlob) blob);
                    }
                }
            }
        } catch (StorageException | URISyntaxException e) {
            throw new IOException(e);
        }
    }

    /**
     * Callback invoked when another node announces a freshly written segment
     * blob. Waits up to ~5 seconds for the blob to become visible, then indexes
     * it. Failures are logged, not propagated.
     */
    public void onNewSegment(SegmentProtos.SegmentBlob segmentBlob) {
        try {
            CloudBlob cloudBlob = getBlob(segmentBlob.getBlobName());
            for (int i = 0; i < 10; i++) {
                if (cloudBlob.exists()) {
                    break;
                }
                log.info("Blob {} doesn't exist yet...", cloudBlob.getName());
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    // restore the interrupt flag and stop waiting; the
                    // downloadAttributes() below will fail fast if the blob
                    // still doesn't exist
                    Thread.currentThread().interrupt();
                    log.error("Interrupted", e);
                    break;
                }
            }
            cloudBlob.downloadAttributes();
            addNewSegment(cloudBlob);
        } catch (StorageException | IOException e) {
            // exception must be the last argument (and not bound to a
            // placeholder) so SLF4J logs the stack trace
            log.error("Can't read blob {}", segmentBlob.getBlobName(), e);
        }
    }

    // Adds a blob to the index if its metadata marks it as a segment.
    private void addNewSegment(CloudBlob blob) {
        Map<String, String> metadata = blob.getMetadata();
        if (AzureBlobMetadata.isSegment(metadata)) {
            AzureSegmentArchiveEntry indexEntry =
                    AzureBlobMetadata.toIndexEntry(metadata, (int) blob.getProperties().getLength());
            AzureSegment segment = new AzureSegment(blob.getName(), indexEntry);
            UUID uuid = new UUID(indexEntry.getMsb(), indexEntry.getLsb());
            // putIfAbsent avoids the containsKey()+put() check-then-act race,
            // which could double-count length under concurrent notifications
            if (index.putIfAbsent(uuid, segment) == null) {
                length.addAndGet(blob.getProperties().getLength());
            }
        }
    }

    @Override
    @Nullable
    public Buffer readSegment(long msb, long lsb) throws IOException {
        UUID uuid = new UUID(msb, lsb);
        AzureSegment segment = index.get(uuid);
        if (segment == null) {
            // not indexed yet — may be a very recent segment
            return getRecentSegment(msb, lsb);
        }
        try {
            return segmentCache.get(uuid, () -> loadSegmentFromCloud(segment));
        } catch (ExecutionException e) {
            if (e.getCause() instanceof IOException) {
                throw (IOException) e.getCause();
            } else {
                throw new IOException(e);
            }
        }
    }

    @Override
    public boolean containsSegment(long msb, long lsb) {
        if (index.containsKey(new UUID(msb, lsb))) {
            return true;
        }
        // fall back to fetching it as a recently written segment
        return getRecentSegment(msb, lsb) != null;
    }

    /**
     * Fetches a segment that is not in the local index yet: via the gRPC
     * segment service when configured, otherwise by polling the index until
     * an onNewSegment() notification arrives.
     *
     * @return the segment payload, or null if it could not be loaded
     */
    private Buffer getRecentSegment(long msb, long lsb) {
        UUID uuid = new UUID(msb, lsb);
        if (segmentService == null) {
            // NOTE(review): this poll loop has no timeout — confirm that a
            // request for a segment that never arrives cannot block forever.
            while (!index.containsKey(uuid)) {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    log.error("Interrupted", e);
                    return null;
                }
            }
            try {
                return segmentCache.get(uuid, () -> loadSegmentFromCloud(index.get(uuid)));
            } catch (ExecutionException e) {
                log.error("Can't load segment {}", uuid, e);
                return null;
            }
        } else {
            try {
                return segmentCache.get(uuid, () -> loadSegmentFromGrpc(msb, lsb));
            } catch (ExecutionException e) {
                log.error("Can't load segment {}", uuid, e);
                return null;
            }
        }
    }

    // Fetches a single segment payload from the remote segment service.
    private Buffer loadSegmentFromGrpc(long msb, long lsb) {
        SegmentProtos.Segment segment = segmentService.getSegment(SegmentProtos.SegmentId.newBuilder()
                .setMsb(msb)
                .setLsb(lsb)
                .build());
        return Buffer.wrap(segment.getSegmentData().toByteArray());
    }

    // Reads a segment's bytes from its blob into a freshly allocated buffer.
    private Buffer loadSegmentFromCloud(AzureSegment segment) throws IOException {
        Buffer buffer = Buffer.allocate(segment.segmentArchiveEntry.getLength());
        readBufferFully(getBlob(segment.blobName), buffer);
        return buffer;
    }

    @Override
    public List<SegmentArchiveEntry> listSegments() {
        return index
                .values()
                .stream()
                .map(a -> a.segmentArchiveEntry)
                .collect(Collectors.toList());
    }

    @Override
    @Nullable
    public Buffer getGraph() {
        // segment graphs are not maintained by this reader
        return null;
    }

    @Override
    public boolean hasGraph() {
        return false;
    }

    @Override
    public @NotNull Buffer getBinaryReferences() throws IOException {
        // binary references are not supported by the tailing reader
        throw new UnsupportedOperationException();
    }

    @Override
    public long length() {
        return length.get();
    }

    @Override
    public @NotNull String getName() {
        // the whole store is presented to callers as a single archive
        return "data00000a.tar";
    }

    @Override
    public void close() throws IOException {
        // nothing to release; blobs are opened per read
    }

    @Override
    public int getEntrySize(int size) {
        // NOTE(review): returns 0 regardless of input — confirm callers only
        // use this for on-disk size estimation where 0 is acceptable
        return 0;
    }

    // Resolves a blob name within the container, wrapping SDK exceptions.
    private CloudBlockBlob getBlob(String name) throws IOException {
        try {
            return container.getBlockBlobReference(name);
        } catch (URISyntaxException | StorageException e) {
            throw new IOException(e);
        }
    }

    /**
     * Pairs a blob name with its archive index entry.
     */
    private static class AzureSegment {

        private final AzureSegmentArchiveEntry segmentArchiveEntry;

        private final String blobName;

        public AzureSegment(String blobName, AzureSegmentArchiveEntry segmentArchiveEntry) {
            this.blobName = blobName;
            this.segmentArchiveEntry = segmentArchiveEntry;
        }
    }
}
string id = 1; +} + +message CreateCheckpointRequest { + int64 lifetime = 1; + CheckpointInfo info = 2; +} + +message CheckpointInfo { + map checkpointInfo = 1; +} + +message CheckpointList { + repeated CheckpointId checkpointId = 1; +} diff --git a/oak-store-remote-commons/src/main/proto/commit.proto b/oak-store-remote-commons/src/main/proto/commit.proto new file mode 100644 index 0000000000..1f20315247 --- /dev/null +++ b/oak-store-remote-commons/src/main/proto/commit.proto @@ -0,0 +1,22 @@ +syntax = "proto3"; + +package org.apache.jackrabbit.oak.remote; + +option java_package = "org.apache.jackrabbit.oak.remote.proto"; +option java_outer_classname = "CommitProtos"; + +import "node_state.proto"; + +message Commit { + CommitInfo commitInfo = 1; + NodeStateId baseNodeState = 2; + NodeStateId headNodeState = 3; + string segmentStoreDir = 4; +} + +message CommitInfo { + string sessionId = 1; + string userId = 2; + map commitInfo = 3; + bool isExternal = 4; +} diff --git a/oak-store-remote-commons/src/main/proto/lease.proto b/oak-store-remote-commons/src/main/proto/lease.proto new file mode 100644 index 0000000000..14efa93d4c --- /dev/null +++ b/oak-store-remote-commons/src/main/proto/lease.proto @@ -0,0 +1,20 @@ +syntax = "proto3"; + +package org.apache.jackrabbit.oak.remote; + +option java_package = "org.apache.jackrabbit.oak.remote.proto"; +option java_outer_classname = "LeaseProtos"; + +message LeaseInfo { + string token = 1; +} + +message ClusterView { + int32 seq = 1; + bool final = 2; + string id = 3; + int32 me = 4; + repeated int32 active = 5; + repeated int32 deactivating = 6; + repeated int32 inactive = 7; +} \ No newline at end of file diff --git a/oak-store-remote-commons/src/main/proto/node_state.proto b/oak-store-remote-commons/src/main/proto/node_state.proto new file mode 100644 index 0000000000..dc04890671 --- /dev/null +++ b/oak-store-remote-commons/src/main/proto/node_state.proto @@ -0,0 +1,10 @@ +syntax = "proto3"; + +package 
org.apache.jackrabbit.oak.remote; + +option java_package = "org.apache.jackrabbit.oak.remote.proto"; +option java_outer_classname = "NodeStateProtos"; + +message NodeStateId { + string revision = 1; +} diff --git a/oak-store-remote-commons/src/main/proto/node_store_svc.proto b/oak-store-remote-commons/src/main/proto/node_store_svc.proto new file mode 100644 index 0000000000..7cb9fabc18 --- /dev/null +++ b/oak-store-remote-commons/src/main/proto/node_store_svc.proto @@ -0,0 +1,41 @@ +syntax = "proto3"; + +package org.apache.jackrabbit.oak.remote; + +option java_package = "org.apache.jackrabbit.oak.remote.proto"; +option java_outer_classname = "NodeStoreServices"; + +import "google/protobuf/empty.proto"; +import "google/protobuf/wrappers.proto"; +import "change_event.proto"; +import "checkpoint.proto"; +import "commit.proto"; +import "node_state.proto"; +import "lease.proto"; +import "segment.proto"; + +service NodeStoreService { + rpc GetRoot (google.protobuf.Empty) returns (NodeStateId); + rpc Merge (Commit) returns (NodeStateId); + rpc Observe (stream google.protobuf.Empty) returns (stream ChangeEvent); +} + +service CheckpointService { + rpc CreateCheckpoint (CreateCheckpointRequest) returns (CheckpointId); + rpc GetCheckpointInfo (CheckpointId) returns (CheckpointInfo); + rpc GetCheckpointList (google.protobuf.Empty) returns (CheckpointList); + rpc RetrieveCheckpoint (CheckpointId) returns (NodeStateId); + rpc ReleaseCheckpoint (CheckpointId) returns (google.protobuf.BoolValue); +} + +service LeaseService { + rpc Acquire(google.protobuf.Empty) returns (LeaseInfo); + rpc Renew(LeaseInfo) returns (ClusterView); + rpc Release(LeaseInfo) returns (google.protobuf.Empty); +} + +service SegmentService { + rpc ObserveSegments (stream google.protobuf.Empty) returns (stream SegmentBlob); + rpc GetSegment (SegmentId) returns (Segment); + rpc NewPrivateSegment (PrivateSegment) returns (google.protobuf.Empty); +} \ No newline at end of file diff --git 
a/oak-store-remote-commons/src/main/proto/segment.proto b/oak-store-remote-commons/src/main/proto/segment.proto new file mode 100644 index 0000000000..c09f5f0a4a --- /dev/null +++ b/oak-store-remote-commons/src/main/proto/segment.proto @@ -0,0 +1,26 @@ +syntax = "proto3"; + +package org.apache.jackrabbit.oak.remote; + +option java_package = "org.apache.jackrabbit.oak.remote.proto"; +option java_outer_classname = "SegmentProtos"; + +message SegmentId { + int64 msb = 1; + int64 lsb = 2; +} + +message Segment { + SegmentId segmentId = 1; + bytes segmentData = 2; +} + +message SegmentBlob { + SegmentId segmentId = 1; + string blobName = 2; +} + +message PrivateSegment { + SegmentBlob segmentBlob = 1; + string segmentStoreDir = 2; +} \ No newline at end of file diff --git a/oak-store-remote-server/Dockerfile b/oak-store-remote-server/Dockerfile new file mode 100644 index 0000000000..8510646cdf --- /dev/null +++ b/oak-store-remote-server/Dockerfile @@ -0,0 +1,11 @@ +FROM openjdk:11 + +RUN mkdir /app +ARG JAR_FILE +COPY target/${JAR_FILE} /app/oak-store-remote-server.jar +WORKDIR /app + +VOLUME /app/segmentstore +VOLUME /app/datastore + +CMD ["java", "-Xmx4g", "-jar", "oak-store-remote-server.jar"] diff --git a/oak-store-remote-server/pom.xml b/oak-store-remote-server/pom.xml new file mode 100644 index 0000000000..369bcd472c --- /dev/null +++ b/oak-store-remote-server/pom.xml @@ -0,0 +1,236 @@ + + + + + + 4.0.0 + + + org.apache.jackrabbit + oak-parent + 1.18-SNAPSHOT + ../oak-parent/pom.xml + + + oak-store-remote-server + Oak Remote Store Server + jar + + + + + kr.motd.maven + os-maven-plugin + 1.5.0.Final + + + + + + org.apache.maven.plugins + maven-shade-plugin + + + package + + shade + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + org.apache.jackrabbit.oak.remote.server.Main + + + + + + + + + + + + + org.apache.jackrabbit + oak-api + ${project.version} + + + org.apache.jackrabbit + oak-blob + ${project.version} + + + org.apache.jackrabbit 
+ oak-blob-plugins + ${project.version} + + + org.apache.jackrabbit + oak-blob-cloud-azure + ${project.version} + + + org.apache.jackrabbit + oak-commons + ${project.version} + + + org.apache.jackrabbit + oak-core-spi + ${project.version} + + + org.apache.jackrabbit + oak-store-spi + ${project.version} + + + org.apache.jackrabbit + oak-store-composite + ${project.version} + + + org.apache.jackrabbit + oak-segment-azure + ${project.version} + + + org.apache.jackrabbit + oak-segment-tar + ${project.version} + + + org.apache.jackrabbit + oak-store-remote-commons + ${project.version} + + + org.apache.jackrabbit + jackrabbit-data + ${jackrabbit.version} + + + + + io.dropwizard.metrics + metrics-core + + + + + com.google.guava + guava + 26.0-jre + + + javax.annotation + javax.annotation-api + 1.3.2 + + + + + io.grpc + grpc-netty-shaded + 1.22.1 + + + io.grpc + grpc-stub + 1.22.1 + + + io.grpc + grpc-api + 1.22.1 + + + com.google.protobuf + protobuf-java + 3.7.1 + + + + + org.slf4j + slf4j-api + + + ch.qos.logback + logback-classic + + + + + org.jetbrains + annotations + + + + + com.microsoft.azure + azure-storage + 5.0.0 + + + + + + docker-image + + false + + + + + com.spotify + dockerfile-maven-plugin + 1.4.12 + + + default + + build + push + + + + + oak/oak-store-remote-server + ${project.version} + + ${project.build.finalName}.jar + + + + + + + + diff --git a/oak-store-remote-server/src/main/java/org/apache/jackrabbit/oak/remote/server/CheckpointService.java b/oak-store-remote-server/src/main/java/org/apache/jackrabbit/oak/remote/server/CheckpointService.java new file mode 100644 index 0000000000..0706b89698 --- /dev/null +++ b/oak-store-remote-server/src/main/java/org/apache/jackrabbit/oak/remote/server/CheckpointService.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
package org.apache.jackrabbit.oak.remote.server;

import com.google.protobuf.BoolValue;
import com.google.protobuf.Empty;
import io.grpc.stub.StreamObserver;
import org.apache.jackrabbit.oak.remote.proto.CheckpointProtos.CheckpointId;
import org.apache.jackrabbit.oak.remote.proto.CheckpointProtos.CheckpointInfo;
import org.apache.jackrabbit.oak.remote.proto.CheckpointProtos.CheckpointList;
import org.apache.jackrabbit.oak.remote.proto.CheckpointProtos.CreateCheckpointRequest;
import org.apache.jackrabbit.oak.remote.proto.CheckpointServiceGrpc;
import org.apache.jackrabbit.oak.remote.proto.NodeStateProtos.NodeStateId;
import org.apache.jackrabbit.oak.spi.state.NodeState;
import org.apache.jackrabbit.oak.spi.state.NodeStore;

import java.util.Map;

import static org.apache.jackrabbit.oak.remote.common.RevisionableUtils.getNodeStateId;

/**
 * gRPC facade exposing the checkpoint operations of a {@link NodeStore}
 * (create, inspect, list, retrieve, release) to remote clients.
 * <p>
 * Each handler delegates straight to the underlying store, sends exactly one
 * response message and completes the call. Without {@code @Override} a handler
 * with a mistyped signature would silently fall back to the generated base
 * implementation, which fails the call with UNIMPLEMENTED — hence the
 * annotations below.
 */
public class CheckpointService extends CheckpointServiceGrpc.CheckpointServiceImplBase {

    private final NodeStore nodeStore;

    public CheckpointService(NodeStore nodeStore) {
        this.nodeStore = nodeStore;
    }

    /** Creates a checkpoint with the requested lifetime and optional metadata. */
    @Override
    public void createCheckpoint(CreateCheckpointRequest request, StreamObserver<CheckpointId> responseObserver) {
        String checkpoint;
        if (request.hasInfo()) {
            checkpoint = nodeStore.checkpoint(request.getLifetime(), request.getInfo().getCheckpointInfoMap());
        } else {
            checkpoint = nodeStore.checkpoint(request.getLifetime());
        }
        responseObserver.onNext(CheckpointId.newBuilder().setId(checkpoint).build());
        responseObserver.onCompleted();
    }

    /** Returns the metadata map attached to the given checkpoint. */
    @Override
    public void getCheckpointInfo(CheckpointId request, StreamObserver<CheckpointInfo> responseObserver) {
        Map<String, String> info = nodeStore.checkpointInfo(request.getId());
        responseObserver.onNext(CheckpointInfo.newBuilder().putAllCheckpointInfo(info).build());
        responseObserver.onCompleted();
    }

    /** Lists the ids of all existing checkpoints. */
    @Override
    public void getCheckpointList(Empty request, StreamObserver<CheckpointList> responseObserver) {
        CheckpointList.Builder builder = CheckpointList.newBuilder();
        for (String checkpointId : nodeStore.checkpoints()) {
            builder.addCheckpointId(CheckpointId.newBuilder().setId(checkpointId).build());
        }
        responseObserver.onNext(builder.build());
        responseObserver.onCompleted();
    }

    /** Resolves a checkpoint id to the id of its root node state. */
    @Override
    public void retrieveCheckpoint(CheckpointId request, StreamObserver<NodeStateId> responseObserver) {
        NodeState nodeState = nodeStore.retrieve(request.getId());
        if (nodeState == null) {
            // NOTE(review): completing a unary call without onNext() surfaces
            // as an error on blocking client stubs — confirm callers expect
            // this for an unknown checkpoint id.
            responseObserver.onCompleted();
        } else {
            responseObserver.onNext(getNodeStateId(nodeState));
            responseObserver.onCompleted();
        }
    }

    /** Releases a checkpoint; responds with whether the release succeeded. */
    @Override
    public void releaseCheckpoint(CheckpointId request, StreamObserver<BoolValue> responseObserver) {
        boolean result = nodeStore.release(request.getId());
        responseObserver.onNext(BoolValue.newBuilder().setValue(result).build());
        responseObserver.onCompleted();
    }
}
package org.apache.jackrabbit.oak.remote.server;

import com.google.protobuf.Empty;
import io.grpc.stub.StreamObserver;
import org.apache.jackrabbit.oak.remote.proto.LeaseProtos.ClusterView;
import org.apache.jackrabbit.oak.remote.proto.LeaseProtos.LeaseInfo;
import org.apache.jackrabbit.oak.remote.proto.LeaseServiceGrpc;
import org.apache.jackrabbit.oak.spi.cluster.ClusterRepositoryInfo;
import org.apache.jackrabbit.oak.spi.state.NodeStore;

import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

/**
 * gRPC lease manager for remote cluster nodes. A client acquires a lease
 * (obtaining a slot index in the cluster view), renews it periodically and
 * releases it on shutdown. Entries whose last renewal is older than 10s are
 * marked DEACTIVATING and older than 60s INACTIVE; inactive slots are reused
 * by subsequent acquisitions. {@code seq} increments on every observable
 * change of the view.
 * <p>
 * Thread-safety: all public handlers are synchronized on this instance.
 */
public class LeaseService extends LeaseServiceGrpc.LeaseServiceImplBase {

    // Slot index in this list is the node's id within the cluster view.
    private final List<ClusterEntry> clusterEntries = new ArrayList<>();

    private final String clusterId;

    private int seq = 1;

    public LeaseService(NodeStore nodeStore) {
        clusterId = ClusterRepositoryInfo.getOrCreateId(nodeStore);
    }

    /** Issues a new lease token, reusing the first inactive slot if any. */
    @Override
    public synchronized void acquire(Empty request, StreamObserver<LeaseInfo> responseObserver) {
        String token = UUID.randomUUID().toString();
        ClusterEntry entry = new ClusterEntry(token);
        updateEntries();
        boolean set = false;
        for (int i = 0; i < clusterEntries.size(); i++) {
            if (clusterEntries.get(i).getState() == ClusterEntryState.INACTIVE) {
                clusterEntries.set(i, entry);
                set = true;
                // stop after the first reusable slot; without this break the
                // same entry would be written into every inactive slot,
                // duplicating it in the cluster view
                break;
            }
        }
        if (!set) {
            clusterEntries.add(entry);
        }
        seq++;
        responseObserver.onNext(LeaseInfo.newBuilder().setToken(token).build());
        responseObserver.onCompleted();
    }

    /**
     * Renews the lease identified by the token and returns the current cluster
     * view. An unknown or expired token yields the default (empty) view.
     */
    @Override
    public synchronized void renew(LeaseInfo request, StreamObserver<ClusterView> responseObserver) {
        ClusterEntry entry = getEntryByToken(request.getToken());
        if (entry == null || entry.state == ClusterEntryState.INACTIVE) {
            responseObserver.onNext(ClusterView.getDefaultInstance());
            responseObserver.onCompleted();
        } else {
            entry.lastUpdate = Instant.now();
            updateEntries();

            ClusterView.Builder builder = ClusterView.newBuilder();

            for (int i = 0; i < clusterEntries.size(); i++) {
                ClusterEntry e = clusterEntries.get(i);
                if (e == entry) {
                    builder.setMe(i);
                }
                switch (e.state) {
                    case ACTIVE:
                        builder.addActive(i);
                        break;

                    case DEACTIVATING:
                        builder.addDeactivating(i);
                        break;

                    case INACTIVE:
                        builder.addInactive(i);
                        break;
                }
            }
            builder.setId(clusterId);
            builder.setSeq(seq);
            responseObserver.onNext(builder.build());
            responseObserver.onCompleted();
        }
    }

    /** Releases the lease; the slot becomes INACTIVE and reusable. */
    @Override
    public synchronized void release(LeaseInfo request, StreamObserver<Empty> responseObserver) {
        ClusterEntry entry = getEntryByToken(request.getToken());
        if (entry != null) {
            entry.lastUpdate = null;
            entry.state = ClusterEntryState.INACTIVE;
            seq++;
        }
        responseObserver.onNext(Empty.getDefaultInstance());
        responseObserver.onCompleted();
    }

    private ClusterEntry getEntryByToken(String token) {
        for (ClusterEntry e : clusterEntries) {
            if (token.equals(e.token)) {
                return e;
            }
        }
        return null;
    }

    // Re-derives each entry's state from its last renewal time. Entries that
    // were explicitly released (lastUpdate == null) are left untouched.
    private void updateEntries() {
        Instant now = Instant.now();
        Instant inactiveDeadline = now.minus(60, ChronoUnit.SECONDS);
        Instant deactivatingDeadline = now.minus(10, ChronoUnit.SECONDS);
        for (ClusterEntry entry : clusterEntries) {
            Instant lastUpdate = entry.lastUpdate;
            if (lastUpdate == null) {
                continue;
            }
            if (lastUpdate.isBefore(inactiveDeadline)) {
                updateState(entry, ClusterEntryState.INACTIVE);
            } else if (lastUpdate.isBefore(deactivatingDeadline)) {
                updateState(entry, ClusterEntryState.DEACTIVATING);
            } else {
                updateState(entry, ClusterEntryState.ACTIVE);
            }
        }
    }

    // Applies a state transition, bumping seq only on actual change.
    private void updateState(ClusterEntry entry, ClusterEntryState newState) {
        if (entry.state != newState) {
            entry.state = newState;
            seq++;
        }
    }

    private enum ClusterEntryState {
        ACTIVE, DEACTIVATING, INACTIVE
    }

    /** Mutable record of a single lease: token, last renewal and state. */
    private static class ClusterEntry {

        private final String token;

        private Instant lastUpdate;

        private ClusterEntryState state;

        public ClusterEntry(String token) {
            this.token = token;
            this.lastUpdate = Instant.now();
            this.state = ClusterEntryState.ACTIVE;
        }

        public ClusterEntryState getState() {
            return state;
        }
    }

}
package org.apache.jackrabbit.oak.remote.server;

import com.google.common.base.Strings;
import com.google.common.io.Closer;
import com.microsoft.azure.storage.CloudStorageAccount;
import com.microsoft.azure.storage.RetryLinearRetry;
import com.microsoft.azure.storage.StorageCredentials;
import com.microsoft.azure.storage.StorageCredentialsAccountAndKey;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.BlobRequestOptions;
import com.microsoft.azure.storage.blob.CloudBlobClient;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import com.microsoft.azure.storage.blob.CloudBlobDirectory;
import org.apache.jackrabbit.core.data.DataStoreException;
import org.apache.jackrabbit.core.data.FileDataStore;
import org.apache.jackrabbit.oak.api.CommitFailedException;
import org.apache.jackrabbit.oak.blob.cloud.azure.blobstorage.AzureDataStore;
import org.apache.jackrabbit.oak.composite.InitialContentMigrator;
import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreBlobStore;
import org.apache.jackrabbit.oak.segment.SegmentNodeStore;
import org.apache.jackrabbit.oak.segment.SegmentNodeStoreBuilders;
import org.apache.jackrabbit.oak.segment.file.FileStore;
import org.apache.jackrabbit.oak.segment.file.FileStoreBuilder;
import org.apache.jackrabbit.oak.segment.file.InvalidFileStoreVersionException;
import org.apache.jackrabbit.oak.spi.blob.BlobStore;
import org.apache.jackrabbit.oak.spi.mount.MountInfoProvider;
import org.apache.jackrabbit.oak.spi.mount.Mounts;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.Properties;

import static com.google.common.io.Files.createTempDir;
import static java.util.Arrays.asList;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS;

/**
 * Standalone entry point for the remote NodeStore server. Wires together a
 * blob store (local {@link FileDataStore} by default, Azure when the
 * {@code blobAzure*} environment variables are set), the shared Azure segment
 * store directory, and the gRPC {@link NodeStoreServer} on port 12300.
 * Optionally migrates initial content from a seed segment store given by the
 * {@code seed_segmentstore} environment variable.
 */
public class Main {

    private static final Logger log = LoggerFactory.getLogger(Main.class);

    public static void main(String[] args) throws IOException {
        Closer closer = Closer.create();
        try {
            BlobStore blobStore;
            if (getenv("blobAzureAccount") == null) {
                // no Azure config — fall back to a local file-backed data store
                File dataStorePath = new File("datastore");
                if (!dataStorePath.mkdirs() && !dataStorePath.isDirectory()) {
                    // fail fast instead of letting FileDataStore.init() report
                    // a confusing error later
                    throw new IOException("Can't create data store directory: " + dataStorePath);
                }

                FileDataStore dataStore = new FileDataStore();
                dataStore.setPath(dataStorePath.getPath());
                dataStore.init(null);
                closer.register(() -> dataStore.close());
                blobStore = new DataStoreBlobStore(dataStore);
            } else {
                blobStore = createAzureBlobStore();
            }

            NodeStoreServer server = new NodeStoreServer(12300, getAzureSegmentStoreDirectory(), blobStore);
            String seedSegmentStore = getenv("seed_segmentstore");
            if (!Strings.isNullOrEmpty(seedSegmentStore)) {
                initialize(seedSegmentStore, server.getNodeStore());
            }

            System.out.println("Starting server. Press ^C to stop.");
            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                try {
                    closer.close();
                } catch (IOException e) {
                    log.error("Can't close server", e);
                }
            }));
            server.start();
            closer.register(server);
            server.blockUntilShutdown();
        } catch (Throwable t) {
            // records t as the primary failure; close() below still runs
            throw closer.rethrow(t);
        } finally {
            closer.close();
        }
    }

    /**
     * Migrates initial content from a local seed segment store into the
     * server's node store, mounting it under the "libs" mount.
     */
    private static void initialize(String seedSegmentStore, SegmentNodeStore delegate)
            throws IOException, InvalidFileStoreVersionException, CommitFailedException {
        File seedStore = new File(seedSegmentStore);
        FileStore fs = FileStoreBuilder.fileStoreBuilder(seedStore).build();
        try {
            SegmentNodeStore seed = SegmentNodeStoreBuilders.builder(fs).build();
            new InitialContentMigrator(delegate, seed, createMountInfoProvider().getMountByName("libs")).migrate();
        } finally {
            fs.close();
        }
    }

    /**
     * Builds an Azure-backed blob store from the {@code blobAzure*} environment
     * variables.
     * NOTE(review): Properties.setProperty rejects null values, so a missing
     * blobAzureAccessKey/Container/CacheSize/Secret variable fails here with a
     * NullPointerException — confirm whether explicit validation is wanted.
     */
    private static BlobStore createAzureBlobStore() throws DataStoreException {
        Properties properties = new Properties();
        properties.setProperty("accessKey", getenv("blobAzureAccount"));
        properties.setProperty("secretKey", getenv("blobAzureAccessKey"));
        properties.setProperty("container", getenv("blobAzureContainer"));
        properties.setProperty("cacheSize", getenv("blobAzureCacheSize"));
        properties.setProperty("secret", getenv("blobAzureSecret"));

        properties.setProperty("maxConnections", "4");
        properties.setProperty("maxErrorRetry", "10");
        properties.setProperty("socketTimeout", "120000");
        properties.setProperty("path", createTempDir().getAbsolutePath());

        AzureDataStore azureDataStore = new AzureDataStore();
        azureDataStore.setProperties(properties);
        azureDataStore.init(createTempDir().getAbsolutePath());
        return new DataStoreBlobStore(azureDataStore);
    }

    /**
     * Resolves (and creates if necessary) the shared segment store container
     * from the {@code segmentAzure*} environment variables; segments live in
     * the "aem" directory inside it.
     */
    public static CloudBlobDirectory getAzureSegmentStoreDirectory() throws URISyntaxException, StorageException {
        StorageCredentials credentials = new StorageCredentialsAccountAndKey(
                getenv("segmentAzureAccount"),
                getenv("segmentAzureAccessKey"));
        CloudStorageAccount cloud = new CloudStorageAccount(credentials, true);
        CloudBlobClient client = cloud.createCloudBlobClient();
        CloudBlobContainer container = client.getContainerReference(getenv("segmentAzureContainer"));
        setTimeouts(container);
        container.createIfNotExists();
        CloudBlobDirectory directory = container.getDirectoryReference("aem");
        return directory;
    }

    /** Applies conservative retry/timeout defaults to the container's client. */
    public static CloudBlobContainer setTimeouts(CloudBlobContainer container) {
        BlobRequestOptions defaultRequestOptions = container.getServiceClient().getDefaultRequestOptions();
        defaultRequestOptions.setRetryPolicyFactory(new RetryLinearRetry((int) SECONDS.toMillis(30), 10));
        defaultRequestOptions.setMaximumExecutionTimeInMs((int) MINUTES.toMillis(10));
        defaultRequestOptions.setTimeoutIntervalInMs((int) SECONDS.toMillis(30));
        return container;
    }

    private static String getenv(String envName) {
        return System.getenv(envName);
    }

    // NOTE(review): currently unused — kept for configuration call sites that
    // want a default value; remove if it stays unused.
    private static String getenv(String envName, String defaultValue) {
        String value = System.getenv(envName);
        if (Strings.isNullOrEmpty(value)) {
            return defaultValue;
        } else {
            return value;
        }
    }

    /**
     * Mount configuration used by the initial-content migration: read-only
     * "libs" mount covering /libs, /apps and the mount's permission store,
     * with index fragments supported under /oak:index.
     */
    public static MountInfoProvider createMountInfoProvider() {
        return Mounts.newBuilder()
                .mount("libs", true, asList(
                        "/oak:index/*$" // pathsSupportingFragments
                ), asList(
                        "/libs", // mountedPaths
                        "/apps",
                        "/jcr:system/rep:permissionStore/oak:mount-libs-crx.default"))
                .build();
    }

}
package org.apache.jackrabbit.oak.remote.server;

import com.google.common.io.Closer;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.CloudBlobDirectory;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import org.apache.commons.io.FileUtils;
import org.apache.jackrabbit.oak.api.CommitFailedException;
import org.apache.jackrabbit.oak.remote.common.SegmentWriteListener;
import org.apache.jackrabbit.oak.segment.SegmentNodeStore;
import org.apache.jackrabbit.oak.segment.SegmentNodeStoreBuilders;
import org.apache.jackrabbit.oak.segment.azure.AzurePersistence;
import org.apache.jackrabbit.oak.segment.file.FileStore;
import org.apache.jackrabbit.oak.segment.file.FileStoreBuilder;
import org.apache.jackrabbit.oak.segment.file.InvalidFileStoreVersionException;
import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentNodeStorePersistence;
import org.apache.jackrabbit.oak.spi.blob.BlobStore;
import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;

import static com.google.common.io.Files.createTempDir;

/**
 * Hosts the gRPC services (node store, checkpoints, leases, segments) on top
 * of a {@link SegmentNodeStore} whose segments are persisted in a shared Azure
 * blob directory. A temporary local directory holds the file-store's transient
 * state; the authoritative data lives in Azure.
 * <p>
 * Resource lifecycle is managed by a {@link Closer}: entries registered first
 * are closed last, so the file store is closed before its temp directory is
 * deleted, and the gRPC server (registered in {@link #start()}) is shut down
 * first of all.
 */
public class NodeStoreServer implements Closeable {

    private static final Logger log = LoggerFactory.getLogger(NodeStoreServer.class);

    private final Server server;

    private final FileStore fileStore;

    private final SegmentNodeStore nodeStore;

    private final Closer closer = Closer.create();

    /** Convenience constructor binding the server to the given TCP port. */
    public NodeStoreServer(int port, CloudBlobDirectory sharedSegmentStoreDir, BlobStore blobStore)
            throws URISyntaxException, StorageException, IOException, InvalidFileStoreVersionException {
        this(ServerBuilder.forPort(port), sharedSegmentStoreDir, blobStore);
    }

    public NodeStoreServer(ServerBuilder<?> serverBuilder, CloudBlobDirectory sharedSegmentStoreDir, BlobStore blobStore)
            throws URISyntaxException, StorageException, IOException, InvalidFileStoreVersionException {
        // the listener publishes newly written segments to remote observers
        SegmentWriteListener segmentWriteListener = new SegmentWriteListener();
        this.fileStore = createFileStore(sharedSegmentStoreDir, blobStore, segmentWriteListener);
        init(fileStore);
        this.nodeStore = SegmentNodeStoreBuilders.builder(fileStore).build();
        PrivateFileStores privateFileStores = new PrivateFileStores(sharedSegmentStoreDir, blobStore);
        this.server = serverBuilder
                .addService(new CheckpointService(nodeStore))
                .addService(new NodeStoreService(nodeStore, fileStore, privateFileStores))
                .addService(new LeaseService(nodeStore))
                .addService(new SegmentService(segmentWriteListener, fileStore, privateFileStores))
                .build();
    }

    // Builds the Azure-backed file store over a throwaway local directory.
    private FileStore createFileStore(CloudBlobDirectory sharedSegmentStoreDir, BlobStore blobStore,
                                      SegmentWriteListener listener) throws IOException, InvalidFileStoreVersionException {
        File dir = createTempDir();
        // registered before the file store so LIFO close order deletes the
        // directory only after the store has been closed
        closer.register(() -> FileUtils.deleteDirectory(dir));
        SegmentNodeStorePersistence persistence = new AzurePersistence(sharedSegmentStoreDir);
        FileStoreBuilder builder = FileStoreBuilder
                .fileStoreBuilder(dir)
                .withCustomPersistence(persistence)
                .withBlobStore(blobStore)
                .withIOMonitor(listener);
        FileStore fileStore = builder.build();
        closer.register(fileStore);
        return fileStore;
    }

    // Writes an ":initialized" marker so the store has a non-empty head.
    private void init(FileStore fileStore) throws IOException {
        try {
            SegmentNodeStore segmentNodeStore = SegmentNodeStoreBuilders.builder(fileStore).build();
            NodeBuilder builder = segmentNodeStore.getRoot().builder();
            builder.setProperty(":initialized", true);
            segmentNodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
        } catch (CommitFailedException e) {
            throw new IOException(e);
        }
    }

    public SegmentNodeStore getNodeStore() {
        return nodeStore;
    }

    /** Flushes the head segment (so clients can read it) and starts serving. */
    public void start() throws IOException {
        fileStore.flush(); // flush, to make the head segment available immediately
        server.start();
        log.info("Server started");
        closer.register(() -> server.shutdown());
    }

    @Override
    public void close() throws IOException {
        closer.close();
    }

    /** Blocks the calling thread until the gRPC server terminates. */
    public void blockUntilShutdown() throws InterruptedException {
        if (server != null) {
            server.awaitTermination();
        }
    }

}
+ */ +package org.apache.jackrabbit.oak.remote.server; + +import com.google.protobuf.Empty; +import io.grpc.stub.StreamObserver; +import org.apache.jackrabbit.oak.api.CommitFailedException; +import org.apache.jackrabbit.oak.remote.common.CommitInfoUtil; +import org.apache.jackrabbit.oak.remote.common.RevisionableUtils; +import org.apache.jackrabbit.oak.remote.proto.ChangeEventProtos.ChangeEvent; +import org.apache.jackrabbit.oak.remote.proto.CommitProtos.Commit; +import org.apache.jackrabbit.oak.remote.proto.NodeStateProtos.NodeStateId; +import org.apache.jackrabbit.oak.remote.proto.NodeStoreServiceGrpc; +import org.apache.jackrabbit.oak.segment.SegmentNodeStore; +import org.apache.jackrabbit.oak.segment.file.FileStore; +import org.apache.jackrabbit.oak.spi.commit.EmptyHook; +import org.apache.jackrabbit.oak.spi.commit.Observable; +import org.apache.jackrabbit.oak.spi.state.ApplyDiff; +import org.apache.jackrabbit.oak.spi.state.NodeBuilder; +import org.apache.jackrabbit.oak.spi.state.NodeState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.apache.jackrabbit.oak.remote.common.RevisionableUtils.getNodeStateId; + +public class NodeStoreService extends NodeStoreServiceGrpc.NodeStoreServiceImplBase { + + private static final Logger log = LoggerFactory.getLogger(NodeStoreService.class); + + private final SegmentNodeStore nodeStore; + + private final FileStore fileStore; + + private final PrivateFileStores privateFileStores; + + public NodeStoreService(SegmentNodeStore nodeStore, FileStore fileStore, PrivateFileStores privateFileStores) { + this.nodeStore = nodeStore; + this.fileStore = fileStore; + this.privateFileStores = privateFileStores; + } + + @Override + public void getRoot(Empty request, StreamObserver responseObserver) { + responseObserver.onNext(getNodeStateId(nodeStore.getRoot())); + responseObserver.onCompleted(); + } 
+ + @Override + public void merge(Commit commit, StreamObserver responseObserver) { + try { + NodeState newRoot; + synchronized (this) { + NodeState currentRoot = nodeStore.getRoot(); + + String currentRootRevision = RevisionableUtils.getRevision(currentRoot); + if (!currentRootRevision.equals(commit.getBaseNodeState().getRevision())) { + responseObserver.onNext(NodeStateId.getDefaultInstance()); + responseObserver.onCompleted(); + return; + } + + NodeBuilder builder = currentRoot.builder(); + NodeState newHead = privateFileStores.getNodeState(commit.getSegmentStoreDir(), commit.getHeadNodeState().getRevision()); + newHead.compareAgainstBaseState(currentRoot, new ApplyDiff(builder)); + + newRoot = nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfoUtil.deserialize(commit.getCommitInfo())); + } + fileStore.flush(); + responseObserver.onNext(getNodeStateId(newRoot)); + responseObserver.onCompleted(); + } catch (CommitFailedException | IOException e) { + log.error("Can't commit", e); + responseObserver.onError(e); + } + } + + @Override + public StreamObserver observe(StreamObserver responseObserver) { + Closeable closeable; + AtomicBoolean enabled = new AtomicBoolean(true); + if (nodeStore instanceof Observable) { + closeable = ((Observable) nodeStore).addObserver((root, info) -> { + if (!enabled.get()) { + return; + } + ChangeEvent.Builder builder = ChangeEvent.newBuilder(); + builder.setNodeStateId(getNodeStateId(root)); + builder.setCommitInfo(CommitInfoUtil.serialize(info)); + try { + responseObserver.onNext(builder.build()); + } catch (Exception e) { + log.error("Can't send state", e); + enabled.set(false); + } + }); + } else { + closeable = ()->{}; + } + return new StreamObserver() { + @Override + public void onNext(Empty empty) { + } + @Override + public void onError(Throwable throwable) { + } + @Override + public void onCompleted() { + try { + enabled.set(false); + closeable.close(); + } catch (IOException e) { + log.error("Can't close observer", e); + } + 
} + }; + } +} diff --git a/oak-store-remote-server/src/main/java/org/apache/jackrabbit/oak/remote/server/PrivateFileStores.java b/oak-store-remote-server/src/main/java/org/apache/jackrabbit/oak/remote/server/PrivateFileStores.java new file mode 100644 index 0000000000..57abcf3ead --- /dev/null +++ b/oak-store-remote-server/src/main/java/org/apache/jackrabbit/oak/remote/server/PrivateFileStores.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.jackrabbit.oak.remote.server; + +import com.microsoft.azure.storage.StorageException; +import com.microsoft.azure.storage.blob.CloudBlobContainer; +import com.microsoft.azure.storage.blob.CloudBlobDirectory; +import org.apache.jackrabbit.oak.remote.common.persistence.TailingPersistence; +import org.apache.jackrabbit.oak.remote.proto.SegmentProtos; +import org.apache.jackrabbit.oak.segment.RevRepositoryService; +import org.apache.jackrabbit.oak.segment.azure.AzurePersistence; +import org.apache.jackrabbit.oak.segment.spi.rev.RevRepository; +import org.apache.jackrabbit.oak.spi.blob.BlobStore; +import org.apache.jackrabbit.oak.spi.state.NodeState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.net.URISyntaxException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +public class PrivateFileStores { + + private static final Logger log = LoggerFactory.getLogger(PrivateFileStores.class); + + private final CloudBlobContainer container; + + private final String sharedDirectoryName; + + private final Map fileStoreMap; + + private final BlobStore blobStore; + + private final RevRepositoryService revNodeStoreService = new RevRepositoryService(); + + public PrivateFileStores(CloudBlobDirectory sharedDirectory, BlobStore blobStore) throws URISyntaxException, StorageException { + this.container = sharedDirectory.getContainer(); + this.sharedDirectoryName = sharedDirectory.getPrefix(); + this.fileStoreMap = new HashMap<>(); + this.blobStore = blobStore; + } + + public NodeState getNodeState(String segmentStoreDir, String revision) throws IOException { + return getFileStoreEntry(segmentStoreDir).revNodeStore.getNodeStateByRevision(revision); + } + + public synchronized void onNewSharedSegment(SegmentProtos.SegmentBlob segmentBlob) { + for (FileStoreEntry entry : fileStoreMap.values()) { + entry.privatePersistence.onNewSegment(segmentBlob); + } + 
} + + public void onNewPrivateSegment(String segmentStoreDir, SegmentProtos.SegmentBlob segmentBlob) { + try { + getFileStoreEntry(segmentStoreDir).privatePersistence.onNewSegment(segmentBlob); + } catch (IOException e) { + log.error("Can't process new segment {}", segmentBlob.getBlobName(), e); + } + } + + private synchronized FileStoreEntry getFileStoreEntry(String segmentStoreDir) throws IOException { + if (fileStoreMap.containsKey(segmentStoreDir)) { + return fileStoreMap.get(segmentStoreDir); + } + TailingPersistence privatePersistence; + try { + privatePersistence = new TailingPersistence( + new AzurePersistence(container.getDirectoryReference(sharedDirectoryName)), + null, + Arrays.asList(sharedDirectoryName, segmentStoreDir)); + } catch (URISyntaxException | StorageException e) { + throw new IOException(e); + } + FileStoreEntry entry = new FileStoreEntry(privatePersistence, blobStore); + fileStoreMap.put(segmentStoreDir, entry); + return entry; + } + + private class FileStoreEntry implements Closeable { + + private final RevRepository revNodeStore; + + private final TailingPersistence privatePersistence; + + public FileStoreEntry(TailingPersistence privatePersistence, BlobStore blobStore) throws IOException { + this.privatePersistence = privatePersistence; + this.revNodeStore = revNodeStoreService.builder() + .withBlobStore(blobStore) + .withPersistence(privatePersistence) + .readOnly() + .build(); + } + + public void close() throws IOException { + revNodeStore.close(); + } + } +} diff --git a/oak-store-remote-server/src/main/java/org/apache/jackrabbit/oak/remote/server/SegmentService.java b/oak-store-remote-server/src/main/java/org/apache/jackrabbit/oak/remote/server/SegmentService.java new file mode 100644 index 0000000000..ff05cfb51b --- /dev/null +++ b/oak-store-remote-server/src/main/java/org/apache/jackrabbit/oak/remote/server/SegmentService.java @@ -0,0 +1,104 @@ +package org.apache.jackrabbit.oak.remote.server; + +import 
com.google.protobuf.ByteString; +import com.google.protobuf.Empty; +import io.grpc.stub.StreamObserver; +import org.apache.jackrabbit.oak.remote.common.SegmentWriteListener; +import org.apache.jackrabbit.oak.remote.proto.SegmentProtos; +import org.apache.jackrabbit.oak.remote.proto.SegmentProtos.PrivateSegment; +import org.apache.jackrabbit.oak.remote.proto.SegmentProtos.SegmentBlob; +import org.apache.jackrabbit.oak.remote.proto.SegmentServiceGrpc; +import org.apache.jackrabbit.oak.segment.Segment; +import org.apache.jackrabbit.oak.segment.SegmentId; +import org.apache.jackrabbit.oak.segment.SegmentIdProvider; +import org.apache.jackrabbit.oak.segment.file.FileStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +import static io.grpc.stub.ServerCalls.asyncUnimplementedUnaryCall; + +public class SegmentService extends SegmentServiceGrpc.SegmentServiceImplBase { + + private static final Logger log = LoggerFactory.getLogger(SegmentService.class); + + private final Set> observers = Collections.synchronizedSet(new HashSet<>()); + + private final FileStore fileStore; + + private final PrivateFileStores privateFileStores; + + public SegmentService(SegmentWriteListener segmentWriteListener, FileStore fileStore, PrivateFileStores privateFileStores) { + this.fileStore = fileStore; + this.privateFileStores = privateFileStores; + segmentWriteListener.setDelegate(this::onNewSegment); + } + + private void onNewSegment(SegmentBlob segmentBlob) { + for (StreamObserver o : observers) { + try { + privateFileStores.onNewSharedSegment(segmentBlob); + o.onNext(segmentBlob); + } catch (Exception e) { + log.error("Can't send event", e); + } + } + } + + @Override + public StreamObserver observeSegments(StreamObserver responseObserver) { + observers.add(responseObserver); + return new StreamObserver() { + @Override + public void onNext(Empty empty) { + } + + 
@Override + public void onError(Throwable throwable) { + observers.remove(responseObserver); + } + + @Override + public void onCompleted() { + observers.remove(responseObserver); + } + }; + } + + @Override + public void getSegment(SegmentProtos.SegmentId request, StreamObserver responseObserver) { + try { + SegmentIdProvider segmentIdProvider = fileStore.getSegmentIdProvider(); + SegmentId segmentId = segmentIdProvider.newSegmentId(request.getMsb(), request.getLsb()); + while (!fileStore.containsSegment(segmentId)) { + Thread.sleep(100); + } + + Segment segment = fileStore.readSegment(segmentId); + ByteString.Output output = ByteString.newOutput(segment.size()); + segment.writeTo(output); + + SegmentProtos.Segment.Builder responseBuilder = SegmentProtos.Segment.newBuilder(); + responseBuilder + .setSegmentData(output.toByteString()) + .getSegmentIdBuilder() + .setMsb(segmentId.getMostSignificantBits()) + .setLsb(segmentId.getLeastSignificantBits()); + responseObserver.onNext(responseBuilder.build()); + responseObserver.onCompleted(); + } catch (IOException | InterruptedException e) { + responseObserver.onError(e); + } + } + + @Override + public void newPrivateSegment(PrivateSegment request, StreamObserver responseObserver) { + privateFileStores.onNewPrivateSegment(request.getSegmentStoreDir(), request.getSegmentBlob()); + responseObserver.onNext(Empty.getDefaultInstance()); + responseObserver.onCompleted(); + } +} diff --git a/oak-store-remote-server/src/main/resources/logback.xml b/oak-store-remote-server/src/main/resources/logback.xml new file mode 100644 index 0000000000..0827a35c64 --- /dev/null +++ b/oak-store-remote-server/src/main/resources/logback.xml @@ -0,0 +1,33 @@ + + + + + + + + %d{dd.MM.yyyy HH:mm:ss.SSS} [%thread] *%level* %logger - %message%n + + + + + + + \ No newline at end of file diff --git a/pom.xml b/pom.xml index 2df4658aca..bd33554690 100644 --- a/pom.xml +++ b/pom.xml @@ -46,6 +46,9 @@ oak-security-spi oak-store-composite oak-store-document 
+ oak-store-remote-commons + oak-store-remote-server + oak-store-remote-client oak-blob-plugins oak-blob