diff --git a/oak-blob-cloud/pom.xml b/oak-blob-cloud/pom.xml index 800f716..ad3cb3c 100644 --- a/oak-blob-cloud/pom.xml +++ b/oak-blob-cloud/pom.xml @@ -41,7 +41,7 @@ maven-bundle-plugin - org.apache.jackrabbit.oak.blob.cloud.aws.s3 + org.apache.jackrabbit.oak.blob.cloud.aws.s3,org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats sun.io @@ -101,6 +101,13 @@ ${jackrabbit.version} + + + org.apache.jackrabbit + oak-commons + ${project.version} + + com.amazonaws @@ -140,6 +147,12 @@ logback-classic test + + org.mockito + mockito-core + 1.10.19 + test + diff --git a/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/S3DataStore.java b/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/S3DataStore.java index fc21bf6..042eb31 100644 --- a/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/S3DataStore.java +++ b/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/S3DataStore.java @@ -17,14 +17,26 @@ package org.apache.jackrabbit.oak.blob.cloud.aws.s3; import java.util.Properties; + +import com.google.common.base.Strings; import org.apache.jackrabbit.core.data.Backend; import org.apache.jackrabbit.core.data.CachingDataStore; +import org.apache.jackrabbit.core.data.DataIdentifier; +import org.apache.jackrabbit.core.data.DataStoreException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * An Amazon S3 data store. */ public class S3DataStore extends CachingDataStore { + + /** + * Logger instance. + */ + private static final Logger LOG = LoggerFactory.getLogger(S3DataStore.class); + protected Properties properties; @Override @@ -47,4 +59,24 @@ public class S3DataStore extends CachingDataStore { public void setProperties(Properties properties) { this.properties = properties; } + + /** + * Look in the backend for a record matching the given identifier. Returns true + * if such a record exists. + * + * @param identifier - A path-like identifier that represents the path to + * the record in question. + * @return true if a record for the provided identifier can be found. + */ + public boolean haveRecordForIdentifier(final String identifier) { + try { + if (!Strings.isNullOrEmpty(identifier)) { + return this.getBackend().exists(new DataIdentifier(identifier)); + } + } + catch (DataStoreException e) { + LOG.warn(String.format("Data Store Exception caught checking for %s in pending uploads", identifier), e); + } + return false; + } } diff --git a/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/stats/S3DataStoreStatsMBean.java b/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/stats/S3DataStoreStatsMBean.java new file mode 100644 index 0000000..70d1a5f --- /dev/null +++ b/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/stats/S3DataStoreStatsMBean.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats; + +/** + * MBean for JMX statistics pertaining to an S3DataStore. + */ +public interface S3DataStoreStatsMBean { + String TYPE = "S3DataStoreStats"; + + /** + * Obtains the number of records that are in the process + * of being "synced", meaning they are either scheduled to + * be copied to S3 or are actively being copied to S3 + * but the copy of these files has not yet completed. + * + * @return number of syncs in progress (active). + */ + long getActiveSyncs(); + + /** + * Determines whether a file-like entity with the given name + * has been "synced" (completely copied) to S3. + * + * @param nodePathName - Path to the entity to check. This is + * the repository node path, not an external file path. + * @return true if the file is synced to S3. + */ + boolean isFileSynced(final String nodePathName); +} diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobStoreBlob.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobStoreBlob.java index 4281f0e..03e6773 100644 --- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobStoreBlob.java +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobStoreBlob.java @@ -24,8 +24,8 @@ import java.io.InputStream; import javax.annotation.CheckForNull; import javax.annotation.Nonnull; -import org.apache.jackrabbit.oak.spi.blob.BlobStore; import org.apache.jackrabbit.oak.api.Blob; +import org.apache.jackrabbit.oak.spi.blob.BlobStore; /** * A blob implementation. diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/S3DataStoreStats.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/S3DataStoreStats.java new file mode 100644 index 0000000..bca6abb --- /dev/null +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/S3DataStoreStats.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.plugins.blob.datastore; + +import com.google.common.base.Strings; +import com.google.common.collect.Lists; +import org.apache.jackrabbit.oak.api.Blob; +import org.apache.jackrabbit.oak.api.PropertyState; +import org.apache.jackrabbit.oak.api.Type; +import org.apache.jackrabbit.oak.blob.cloud.aws.s3.S3DataStore; +import org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats.S3DataStoreStatsMBean; +import org.apache.jackrabbit.oak.commons.PathUtils; +import org.apache.jackrabbit.oak.commons.jmx.AnnotatedStandardMBean; +import org.apache.jackrabbit.oak.spi.state.NodeState; +import org.apache.jackrabbit.oak.spi.state.NodeStore; + +import java.io.File; +import java.util.List; + +public class S3DataStoreStats extends AnnotatedStandardMBean implements S3DataStoreStatsMBean { + private final S3DataStore s3ds; + + protected NodeStore nodeStore; + + public S3DataStoreStats(final S3DataStore s3ds, final NodeStore nodeStore) { + super(S3DataStoreStatsMBean.class); + this.s3ds = s3ds; + this.nodeStore = nodeStore; + } + + /** + * Obtains the number of records that are in the process + * of being "synced", meaning they are either scheduled to + * be copied to S3 or are actively being copied to S3 + * but the copy of these files has not yet completed. + * + * @return number of syncs in progress (active). + */ + @Override + public long getActiveSyncs() { + return s3ds.getPendingUploads().size(); + } + + /** + * Determines whether a file-like entity with the given name + * has been "synced" (completely copied) to S3. + * + * Determination of "synced": + * - A nodeName of null or "" is always "not synced". + * - A nodeName that does not map to a valid node is always "not synced". + * - If the node for this nodeName does not have a "jcr:data" property, + * this node is always "not synced" since such a node would never be + * copied to S3. + * - If the node for this nodeName is not in the nodeStore, this node is + * always "not synced". + * - Otherwise, the state is "synced" if the corresponding blob is + * completely stored in S3. + * + * @param nodePathName - Path to the entity to check. This is + * the repository node path, not an external file path. + * @return true if the file is synced to S3. + */ + @Override + public boolean isFileSynced(final String nodePathName) { + if (Strings.isNullOrEmpty(nodePathName)) { + return false; + } + + if (null == nodeStore) { + return false; + } + + final NodeState leafNode = findLeafNode(nodePathName); + if (! leafNode.exists()) { + return false; + } + + boolean nodeHasBinaryProperties = false; + for (final PropertyState propertyState : leafNode.getProperties()) { + nodeHasBinaryProperties |= (propertyState.getType() == Type.BINARY || propertyState.getType() == Type.BINARIES); + try { + if (propertyState.getType() == Type.BINARY) { + final Blob blob = (Blob) propertyState.getValue(propertyState.getType()); + if (null == blob || !haveRecordForBlob(blob)) { + return false; + } + } else if (propertyState.getType() == Type.BINARIES) { + final List blobs = (List) propertyState.getValue(propertyState.getType()); + if (null == blobs) { + return false; + } + for (final Blob blob : blobs) { + if (!haveRecordForBlob(blob)) { + return false; + } + } + } + } + catch (ClassCastException e) { + return false; + } + } + + // If we got here and nodeHasBinaryProperties is true, + // it means at least one binary property was found for + // the leaf node and that we were able to locate a + // record for it. + return nodeHasBinaryProperties; + } + + private NodeState findLeafNode(final String nodePathName) { + final Iterable pathNodes = PathUtils.elements(PathUtils.getParentPath(nodePathName)); + final String leafNodeName = PathUtils.getName(nodePathName); + + NodeState currentNode = nodeStore.getRoot(); + for (final String pathNodeName : pathNodes) { + if (pathNodeName.length() > 0) { + final NodeState childNode = currentNode.getChildNode(pathNodeName); + if (!childNode.exists()) { + break; + } + currentNode = childNode; + } + } + return currentNode.getChildNode(leafNodeName); + } + + private boolean haveRecordForBlob(final Blob blob) { + final String fullBlobId = blob.getContentIdentity(); + if (! Strings.isNullOrEmpty(fullBlobId) + && !InMemoryDataRecord.isInstance(fullBlobId)) { + String blobId = DataStoreBlobStore.BlobId.of(fullBlobId).blobId; + return s3ds.haveRecordForIdentifier(blobId); + } + return false; + } +} diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java index 6086e4b..5456c3c 100644 --- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java @@ -60,6 +60,8 @@ import org.apache.jackrabbit.oak.api.Descriptors; import org.apache.jackrabbit.oak.api.jmx.CacheStatsMBean; import org.apache.jackrabbit.oak.api.jmx.CheckpointMBean; import org.apache.jackrabbit.oak.api.jmx.PersistentCacheStatsMBean; +import org.apache.jackrabbit.oak.blob.cloud.aws.s3.S3DataStore; +import org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats.S3DataStoreStatsMBean; import org.apache.jackrabbit.oak.cache.CacheStats; import org.apache.jackrabbit.oak.commons.PropertiesUtil; import org.apache.jackrabbit.oak.osgi.ObserverTracker; @@ -71,6 +73,8 @@ import org.apache.jackrabbit.oak.plugins.blob.BlobStoreStats; import org.apache.jackrabbit.oak.plugins.blob.BlobTrackingStore; import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore; import org.apache.jackrabbit.oak.plugins.blob.datastore.BlobIdTracker; +import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreBlobStore; +import org.apache.jackrabbit.oak.plugins.blob.datastore.S3DataStoreStats; import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils; import org.apache.jackrabbit.oak.plugins.document.persistentCache.CacheType; import org.apache.jackrabbit.oak.plugins.document.persistentCache.PersistentCacheStats; @@ -796,6 +800,21 @@ public class DocumentNodeStoreService { BlobGCMBean.TYPE, "Document node store blob garbage collection")); } + // Expose statistics about S3DataStore, if one is being used + + if (null != store.getBlobStore() && store.getBlobStore() instanceof DataStoreBlobStore) { + final DataStoreBlobStore dsbs = (DataStoreBlobStore)store.getBlobStore(); + if (null != dsbs.getDataStore() && dsbs.getDataStore() instanceof S3DataStore) { + final S3DataStore s3ds = (S3DataStore)dsbs.getDataStore(); + final S3DataStoreStats s3dsStats = new S3DataStoreStats(s3ds, nodeStore); + registrations.add(registerMBean(whiteboard, + S3DataStoreStatsMBean.class, + s3dsStats, + S3DataStoreStatsMBean.TYPE, + s3dsStats.getClass().getSimpleName())); + } + } + RevisionGC revisionGC = new RevisionGC(new Runnable() { @Override public void run() { diff --git a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/S3DataStoreStatsTest.java b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/S3DataStoreStatsTest.java new file mode 100644 index 0000000..8420fd5 --- /dev/null +++ b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/S3DataStoreStatsTest.java @@ -0,0 +1,862 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.plugins.blob.datastore; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assume.assumeFalse; +import static org.junit.Assume.assumeNotNull; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.amazonaws.util.StringInputStream; +import com.google.common.base.Optional; +import com.google.common.base.Strings; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import org.apache.commons.io.IOUtils; +import org.apache.commons.io.output.NullOutputStream; +import org.apache.jackrabbit.core.data.AsyncTouchCallback; +import org.apache.jackrabbit.core.data.AsyncTouchResult; +import org.apache.jackrabbit.core.data.AsyncUploadCallback; +import org.apache.jackrabbit.core.data.AsyncUploadResult; +import org.apache.jackrabbit.core.data.Backend; +import org.apache.jackrabbit.core.data.CachingDataStore; +import org.apache.jackrabbit.core.data.DataIdentifier; +import org.apache.jackrabbit.core.data.DataRecord; +import org.apache.jackrabbit.core.data.DataStoreException; +import org.apache.jackrabbit.oak.api.Blob; +import org.apache.jackrabbit.oak.api.CommitFailedException; +import org.apache.jackrabbit.oak.blob.cloud.aws.s3.S3DataStore; +import org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats.S3DataStoreStatsMBean; +import org.apache.jackrabbit.oak.commons.PathUtils; +import org.apache.jackrabbit.oak.plugins.memory.MemoryNodeStore; +import org.apache.jackrabbit.oak.plugins.memory.MultiBinaryPropertyState; +import org.apache.jackrabbit.oak.spi.commit.CommitInfo; +import org.apache.jackrabbit.oak.spi.commit.EmptyHook; +import org.apache.jackrabbit.oak.spi.state.NodeBuilder; +import org.apache.jackrabbit.oak.spi.state.NodeStore; +import org.joda.time.DateTime; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import javax.jcr.RepositoryException; +import javax.management.InstanceAlreadyExistsException; +import javax.management.InstanceNotFoundException; +import javax.management.JMX; +import javax.management.MBeanRegistrationException; +import javax.management.MBeanServer; +import javax.management.MBeanServerConnection; +import javax.management.MBeanServerFactory; +import javax.management.MalformedObjectNameException; +import javax.management.NotCompliantMBeanException; +import javax.management.ObjectName; +import javax.management.remote.JMXConnector; +import javax.management.remote.JMXConnectorFactory; +import javax.management.remote.JMXServiceURL; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.lang.management.ManagementFactory; +import java.security.DigestOutputStream; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class S3DataStoreStatsTest { + @Rule + public TemporaryFolder folder = new TemporaryFolder(new File("target")); + + private static MBeanServer jmxServer; + private static ObjectName mBeanName; + private static NodeStore nodeStore; + private static Blob mockBlob; + + private static String testNodePathName = "test/node/path/name"; + private static String testNodePathContents = "testNodePathContents"; + + private S3DataStore defaultS3ds; + private S3DataStore autoSyncMockS3ds; + private S3DataStore manualSyncMockS3ds; + private S3DataStoreStatsMBean mBean; + + @BeforeClass + public static void preClass() throws IOException, RepositoryException, MalformedObjectNameException, + NoSuchAlgorithmException, CommitFailedException + { + + // This will cause all tests in this file to be ignored if JMX properties + // are not passed into the test execution. + // + // If you want to run this unit test suite you will need to + // pass the following settings into the command-line. + // Example: + // -Djava.rmi.server.hostname=localhost + // -Dcom.sun.management.jmxremote.port=9999 + // -Dcom.sun.management.jmxremote.ssl=false + // -Dcom.sun.management.jmxremote.authenticate=false + for (final String property : Lists.newArrayList("java.rmi.server.hostname", + "com.sun.management.jmxremote.port", + "com.sun.management.jmxremote.ssl", + "com.sun.management.jmxremote.authenticate")) { + assumeFalse(Strings.isNullOrEmpty(System.getProperty(property))); + } + + // This will cause all tests in this file to be ignored if no JMX + // server could be found. + jmxServer = ManagementFactory.getPlatformMBeanServer(); + if (null == jmxServer) { + jmxServer = MBeanServerFactory.newMBeanServer(); + } + assumeNotNull(jmxServer); + + mBeanName = new ObjectName("org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats:type=S3DataStoreStats"); + + final String testNodeId = getIdForInputStream(new StringInputStream(testNodePathContents)); + + mockBlob = mock(Blob.class); + when(mockBlob.getContentIdentity()).thenReturn(testNodeId); + + nodeStore = initNodeStore(Optional.of(mockBlob), + Optional.absent(), + Optional.absent(), + Optional.absent(), + Optional.>absent()); + } + + @Before + public void setup() throws IOException, InstanceAlreadyExistsException, + MBeanRegistrationException, NotCompliantMBeanException, RepositoryException { + // Set up JMX connection and mbean + final JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://:9999/jmxrmi"); + final JMXConnector connector = JMXConnectorFactory.connect(url, null); + final MBeanServerConnection connection = connector.getMBeanServerConnection(); + mBean = JMX.newMBeanProxy(connection, mBeanName, S3DataStoreStatsMBean.class, true); + + defaultS3ds = mock(S3DataStore.class); + autoSyncMockS3ds = new CustomBackendS3DataStore(new InMemoryBackend()); + autoSyncMockS3ds.init(folder.newFolder().getAbsolutePath()); + manualSyncMockS3ds = new CustomBackendS3DataStore(new ManuallySyncingInMemoryBackend()); + manualSyncMockS3ds.init(folder.newFolder().getAbsolutePath()); + } + + @After + public void teardown() throws InstanceNotFoundException, MBeanRegistrationException { + jmxServer.unregisterMBean(mBeanName); + } + + private static String getIdForInputStream(final InputStream in) throws IOException, NoSuchAlgorithmException { + final char[] HEX = "0123456789abcdef".toCharArray(); + MessageDigest digest = MessageDigest.getInstance("SHA-1"); + OutputStream out = new DigestOutputStream(new NullOutputStream(), digest); + IOUtils.copyLarge(in, out); + out.close(); + byte[] digestBytes = digest.digest(); + char[] buffer = new char[digestBytes.length * 2]; + for (int i=0; i> 4) & 0x0f]; + buffer[2*i+1] = HEX[digestBytes[i] & 0x0f]; + } + return new String(buffer); + } + + private static NodeStore initNodeStore(final Optional blobProp1, + final Optional blobProp2, + final Optional stringProp, + final Optional intProp, + final Optional> blobPropList) + throws CommitFailedException { + final NodeStore nodeStore = new MemoryNodeStore(); + NodeBuilder rootBuilder = nodeStore.getRoot().builder(); + NodeBuilder builder = initNodeBuilder(rootBuilder); + + if (blobProp1.isPresent()) { + builder.setProperty("blobProp1", blobProp1.get()); + } + if (blobProp2.isPresent()) { + builder.setProperty("blobProp2", blobProp2.get()); + } + if (stringProp.isPresent()) { + builder.setProperty("stringProp", stringProp.get()); + } + if (intProp.isPresent()) { + builder.setProperty("intProp", intProp.get()); + } + if (blobPropList.isPresent()) { + builder.setProperty(MultiBinaryPropertyState + .binaryPropertyFromBlob("blobPropList", blobPropList.get())); + } + + nodeStore.merge(rootBuilder, EmptyHook.INSTANCE, CommitInfo.EMPTY); + + return nodeStore; + } + + private static NodeBuilder initNodeBuilder(final NodeBuilder rootBuilder) { + NodeBuilder builder = rootBuilder; + for (final String nodeName : PathUtils.elements(testNodePathName)) { + builder = builder.child(nodeName); + } + return builder; + } + + @Test + public void testGetActiveS3FileSyncMetricExists() throws RepositoryException, IOException, MalformedObjectNameException, + InstanceAlreadyExistsException, MBeanRegistrationException, NotCompliantMBeanException { + jmxServer.registerMBean(new S3DataStoreStats(defaultS3ds, nodeStore), mBeanName); + + assert(0 == mBean.getActiveSyncs()); + } + + @Test + public void testGetSingleActiveS3FileSyncMetric() throws IOException, RepositoryException, MalformedObjectNameException, + InstanceAlreadyExistsException, MBeanRegistrationException, NotCompliantMBeanException { + jmxServer.registerMBean(new S3DataStoreStats(manualSyncMockS3ds, nodeStore), mBeanName); + + DataRecord record = null; + try { + record = manualSyncMockS3ds.addRecord(new StringInputStream("test")); + assert(1 == mBean.getActiveSyncs()); + } + finally { + if (null != record) { + manualSyncMockS3ds.deleteRecord(record.getIdentifier()); + } + } + + ((ManuallySyncingInMemoryBackend) manualSyncMockS3ds.getBackend()).clearInProgressWrites(); + + assert(0 == mBean.getActiveSyncs()); + } + + @Test + public void testGetMultilpleActiveS3FileSyncMetric() throws IOException, RepositoryException, MalformedObjectNameException, + InstanceAlreadyExistsException, MBeanRegistrationException, NotCompliantMBeanException { + jmxServer.registerMBean(new S3DataStoreStats(manualSyncMockS3ds, nodeStore), mBeanName); + + final Set records = Sets.newHashSet(); + try { + records.add(manualSyncMockS3ds.addRecord(new StringInputStream("test1"))); + records.add(manualSyncMockS3ds.addRecord(new StringInputStream("test2"))); + records.add(manualSyncMockS3ds.addRecord(new StringInputStream("test3"))); + + assert (3 == mBean.getActiveSyncs()); + } + finally { + for (final DataRecord record : records) { + manualSyncMockS3ds.deleteRecord(record.getIdentifier()); + } + } + + ((ManuallySyncingInMemoryBackend) manualSyncMockS3ds.getBackend()).clearInProgressWrites(); + + assert(0 == mBean.getActiveSyncs()); + } + + @Test + public void testIsFileSyncedMetricExists() throws IOException, RepositoryException, InstanceAlreadyExistsException, + MBeanRegistrationException, NotCompliantMBeanException { + jmxServer.registerMBean(new S3DataStoreStats(defaultS3ds, nodeStore), mBeanName); + + assertFalse(mBean.isFileSynced(testNodePathName)); + } + + @Test + public void testIsFileSyncedNullFileReturnsFalse() throws IOException, RepositoryException, InstanceAlreadyExistsException, + MBeanRegistrationException, NotCompliantMBeanException { + jmxServer.registerMBean(new S3DataStoreStats(defaultS3ds, nodeStore), mBeanName); + + assertFalse(mBean.isFileSynced(null)); + } + + @Test + public void testIsFileSyncedEmptyStringReturnsFalse() throws IOException, RepositoryException, InstanceAlreadyExistsException, + MBeanRegistrationException, NotCompliantMBeanException { + jmxServer.registerMBean(new S3DataStoreStats(defaultS3ds, nodeStore), mBeanName); + + assertFalse(mBean.isFileSynced("")); + } + + @Test + public void testIsFileSyncedInvalidFilenameReturnsFalse() throws IOException, RepositoryException, InstanceAlreadyExistsException, + MBeanRegistrationException, NotCompliantMBeanException { + jmxServer.registerMBean(new S3DataStoreStats(defaultS3ds, nodeStore), mBeanName); + + assertFalse(mBean.isFileSynced("invalid")); + } + + @Test + public void testIsFileSyncedFileNotAddedReturnsFalse() throws IOException, RepositoryException, InstanceAlreadyExistsException, + MBeanRegistrationException, NotCompliantMBeanException { + jmxServer.registerMBean(new S3DataStoreStats(autoSyncMockS3ds, nodeStore), mBeanName); + + assertFalse(mBean.isFileSynced(testNodePathName)); + } + + @Test + public void testIsFileSyncedSyncIncompleteReturnsFalse() throws IOException, RepositoryException, InstanceAlreadyExistsException, + MBeanRegistrationException, NotCompliantMBeanException { + jmxServer.registerMBean(new S3DataStoreStats(manualSyncMockS3ds, nodeStore), mBeanName); + + DataRecord record = null; + try { + record = manualSyncMockS3ds.addRecord(new StringInputStream(testNodePathContents)); + assertFalse(mBean.isFileSynced(testNodePathName)); + } + finally { + if (null != record) { + manualSyncMockS3ds.deleteRecord(record.getIdentifier()); + } + } + } + + @Test + public void testIsFileSyncedSyncCompleteReturnsTrue() throws IOException, RepositoryException, InstanceAlreadyExistsException, + MBeanRegistrationException, NotCompliantMBeanException { + jmxServer.registerMBean(new S3DataStoreStats(autoSyncMockS3ds, nodeStore), mBeanName); + + DataRecord record = null; + try { + record = autoSyncMockS3ds.addRecord(new StringInputStream(testNodePathContents)); + assert(mBean.isFileSynced(testNodePathName)); + } + finally { + if (null != record) { + autoSyncMockS3ds.deleteRecord(record.getIdentifier()); + } + } + } + + @Test + public void testIsFileSyncedDifferentPaths() throws IOException, RepositoryException, InstanceAlreadyExistsException, + MBeanRegistrationException, NotCompliantMBeanException, CommitFailedException, NoSuchAlgorithmException { + final String path1 = "path/to/node/1"; + final String path2 = "path/to/node/2"; + final String path3 = "shortpath"; + final String path4 = "a/very/very/long/path/leads/to/node/4"; + final List paths = Lists.newArrayList(path1, path2, path3, path4); + final String leadingSlashPath = "/" + path1; + + final List blobContents = Lists.newArrayList("1", "2", "3", "4"); + final List blobs = Lists.newArrayList( + mock(Blob.class), + mock(Blob.class), + mock(Blob.class), + mock(Blob.class) + ); + final List blobIds = Lists.newArrayList( + getIdForInputStream(new StringInputStream(blobContents.get(0))), + getIdForInputStream(new StringInputStream(blobContents.get(1))), + getIdForInputStream(new StringInputStream(blobContents.get(2))), + getIdForInputStream(new StringInputStream(blobContents.get(3))) + ); + when(blobs.get(0).getContentIdentity()).thenReturn(blobIds.get(0)); + when(blobs.get(1).getContentIdentity()).thenReturn(blobIds.get(1)); + when(blobs.get(2).getContentIdentity()).thenReturn(blobIds.get(2)); + when(blobs.get(3).getContentIdentity()).thenReturn(blobIds.get(3)); + + final NodeStore nodeStore = new MemoryNodeStore(); + final NodeBuilder rootBuilder = nodeStore.getRoot().builder(); + + final List builders = Lists.newArrayList(); + for (final String path : paths) { + NodeBuilder builder = rootBuilder; + for (final String nodeName : PathUtils.elements(path)) { + builder = builder.child(nodeName); + } + builders.add(builder); + } + builders.get(0).setProperty("blob1", blobs.get(0)); + builders.get(1).setProperty("blob2", blobs.get(1)); + builders.get(2).setProperty("blob3", blobs.get(2)); + builders.get(3).setProperty("blob4", blobs.get(3)); + + nodeStore.merge(rootBuilder, EmptyHook.INSTANCE, CommitInfo.EMPTY); + + jmxServer.registerMBean(new S3DataStoreStats(autoSyncMockS3ds, nodeStore), mBeanName); + + final List records = Lists.newArrayList(); + try { + for (final String s : blobContents) { + records.add(autoSyncMockS3ds.addRecord(new StringInputStream(s))); + } + + for (final String path : Lists.newArrayList(path1, path2, path3, path4, leadingSlashPath)) { + assert(mBean.isFileSynced(path)); + } + + for (final String invalidPath : Lists.newArrayList(path1 + "/", "/" + path1 + "/", "/path//to/node///1")) { + try { + mBean.isFileSynced(invalidPath); + assert(false); // shouldn't get here on an invalid path + } + catch (AssertionError e) { + // expected + } + } + } + finally { + for (final DataRecord record : records) { + autoSyncMockS3ds.deleteRecord(record.getIdentifier()); + } + } + } + + @Test + public void testIsFileSyncedFileDeletedReturnsFalse() throws IOException, RepositoryException, InstanceAlreadyExistsException, + MBeanRegistrationException, NotCompliantMBeanException { + jmxServer.registerMBean(new S3DataStoreStats(autoSyncMockS3ds, nodeStore), mBeanName); + + DataRecord record = null; + try { + record = autoSyncMockS3ds.addRecord(new StringInputStream(testNodePathContents)); + } + finally { + if (null != record) { + autoSyncMockS3ds.deleteRecord(record.getIdentifier()); + } + } + + assertFalse(mBean.isFileSynced(testNodePathName)); + } + + @Test + public void testIsFileSyncedMultiplePropertiesReturnsTrue() throws IOException, RepositoryException, InstanceAlreadyExistsException, + MBeanRegistrationException, NotCompliantMBeanException, CommitFailedException { + NodeStore nodeStore = initNodeStore(Optional.of(mockBlob), + Optional.absent(), + Optional.of("abc"), + Optional.of(123), + Optional.>absent()); + + jmxServer.registerMBean(new S3DataStoreStats(autoSyncMockS3ds, nodeStore), mBeanName); + + DataRecord record = null; + try { + record = autoSyncMockS3ds.addRecord(new StringInputStream(testNodePathContents)); + + assert(mBean.isFileSynced(testNodePathName)); + } + finally { + if (null != record) { + autoSyncMockS3ds.deleteRecord(record.getIdentifier()); + } + } + } + + @Test + public void testIsFileSyncedMultipleBinaryPropertiesAllSyncedReturnsTrue() throws IOException, RepositoryException, InstanceAlreadyExistsException, + MBeanRegistrationException, NotCompliantMBeanException, CommitFailedException, NoSuchAlgorithmException { + Blob mockBlob2 = mock(Blob.class); + final String id2 = getIdForInputStream(new StringInputStream("testContents2")); + when(mockBlob2.getContentIdentity()).thenReturn(id2); + NodeStore nodeStore = initNodeStore(Optional.of(mockBlob), + Optional.of(mockBlob2), + Optional.absent(), + Optional.absent(), + Optional.>absent()); + + jmxServer.registerMBean(new S3DataStoreStats(autoSyncMockS3ds, nodeStore), mBeanName); + + DataRecord record = null; + DataRecord record2 = null; + try { + record = autoSyncMockS3ds.addRecord(new StringInputStream(testNodePathContents)); + record2 = autoSyncMockS3ds.addRecord(new StringInputStream("testContents2")); + + assert(mBean.isFileSynced(testNodePathName)); + } + finally { + if (null != record) { + autoSyncMockS3ds.deleteRecord(record.getIdentifier()); + } + if (null != record2) { + autoSyncMockS3ds.deleteRecord(record2.getIdentifier()); + } + } + } + + @Test + public void testIsFileSyncedMultipleBinaryPropertiesNotAllSyncedReturnsFalse() throws IOException, RepositoryException, InstanceAlreadyExistsException, + MBeanRegistrationException, NotCompliantMBeanException, CommitFailedException, NoSuchAlgorithmException { + Blob mockBlob2 = mock(Blob.class); + final String id2 = getIdForInputStream(new StringInputStream("testContents2")); + when(mockBlob2.getContentIdentity()).thenReturn(id2); + NodeStore nodeStore = initNodeStore(Optional.of(mockBlob), + Optional.of(mockBlob2), + Optional.absent(), + Optional.absent(), + Optional.>absent()); + + jmxServer.registerMBean(new S3DataStoreStats(autoSyncMockS3ds, nodeStore), mBeanName); + + DataRecord record = null; + try { + record = autoSyncMockS3ds.addRecord(new StringInputStream(testNodePathContents)); + + assertFalse(mBean.isFileSynced(testNodePathName)); + } + finally { + if (null != record) { + autoSyncMockS3ds.deleteRecord(record.getIdentifier()); + } + } + } + + @Test + public void testIsFileSyncedBinariesPropertySingleReturnsTrue() throws IOException, RepositoryException, InstanceAlreadyExistsException, + MBeanRegistrationException, NotCompliantMBeanException, CommitFailedException, NoSuchAlgorithmException { + List blobPropList = Lists.newArrayList(mockBlob); + NodeStore nodeStore = initNodeStore(Optional.absent(), + Optional.absent(), + Optional.absent(), + Optional.absent(), + Optional.of(blobPropList)); + + jmxServer.registerMBean(new S3DataStoreStats(autoSyncMockS3ds, nodeStore), mBeanName); + + DataRecord record = null; + try { + record = autoSyncMockS3ds.addRecord(new StringInputStream(testNodePathContents)); + + assert(mBean.isFileSynced(testNodePathName)); + } + finally { + if (null != record) { + autoSyncMockS3ds.deleteRecord(record.getIdentifier()); + } + } + } + + @Test + public void testIsFileSyncedBinariesPropertyMultipleReturnsTrue() throws IOException, RepositoryException, InstanceAlreadyExistsException, + MBeanRegistrationException, NotCompliantMBeanException, CommitFailedException, NoSuchAlgorithmException { + Blob mockBlob2 = mock(Blob.class); + final String id2 = getIdForInputStream(new StringInputStream("testContents2")); + when(mockBlob2.getContentIdentity()).thenReturn(id2); + List blobPropList = Lists.newArrayList(mockBlob, mockBlob2); + NodeStore nodeStore = initNodeStore(Optional.absent(), + Optional.absent(), + Optional.absent(), + Optional.absent(), + Optional.of(blobPropList)); + + jmxServer.registerMBean(new S3DataStoreStats(autoSyncMockS3ds, nodeStore), mBeanName); + + DataRecord record1 = null; + DataRecord record2 = null; + try { + record1 = autoSyncMockS3ds.addRecord(new StringInputStream(testNodePathContents)); + record2 = autoSyncMockS3ds.addRecord(new StringInputStream("testContents2")); + + assert(mBean.isFileSynced(testNodePathName)); + } + finally { + if (null != record1) { + autoSyncMockS3ds.deleteRecord(record1.getIdentifier()); + } + if (null != record2) { + autoSyncMockS3ds.deleteRecord(record2.getIdentifier()); + } + } + } + + @Test + public void testIsFileSyncedBinariesPropertyNotAllSyncedReturnsFalse() throws IOException, RepositoryException, InstanceAlreadyExistsException, + MBeanRegistrationException, NotCompliantMBeanException, CommitFailedException, NoSuchAlgorithmException { + Blob mockBlob2 = mock(Blob.class); + final String id2 = getIdForInputStream(new StringInputStream("testContents2")); + when(mockBlob2.getContentIdentity()).thenReturn(id2); + List blobPropList = Lists.newArrayList(mockBlob, mockBlob2); + NodeStore nodeStore = initNodeStore(Optional.absent(), + Optional.absent(), + Optional.absent(), + Optional.absent(), + Optional.of(blobPropList)); + + jmxServer.registerMBean(new S3DataStoreStats(autoSyncMockS3ds, nodeStore), mBeanName); + + DataRecord record = null; + try { + record = autoSyncMockS3ds.addRecord(new StringInputStream(testNodePathContents)); + + assertFalse(mBean.isFileSynced(testNodePathName)); + } + finally { + if (null != record) { + autoSyncMockS3ds.deleteRecord(record.getIdentifier()); + } + } + } + + @Test + public void testIsFileSyncedBinarySyncedAndBinariesNotSyncedReturnsFalse() throws IOException, RepositoryException, InstanceAlreadyExistsException, + MBeanRegistrationException, NotCompliantMBeanException, CommitFailedException, NoSuchAlgorithmException { + Blob mockBlob2 = mock(Blob.class); + final String id2 = getIdForInputStream(new StringInputStream("testContents2")); + when(mockBlob2.getContentIdentity()).thenReturn(id2); + Blob mockBlob3 = mock(Blob.class); + final String id3 = getIdForInputStream(new StringInputStream("testContents3")); + when(mockBlob2.getContentIdentity()).thenReturn(id3); + List blobPropList = Lists.newArrayList(mockBlob2, mockBlob3); + NodeStore nodeStore = initNodeStore(Optional.of(mockBlob), + Optional.absent(), + Optional.absent(), + Optional.absent(), + Optional.of(blobPropList)); + + jmxServer.registerMBean(new S3DataStoreStats(autoSyncMockS3ds, nodeStore), mBeanName); + + DataRecord record1 = null; + DataRecord record2 = null; + try { + record1 = autoSyncMockS3ds.addRecord(new StringInputStream(testNodePathContents)); + record2 = autoSyncMockS3ds.addRecord(new StringInputStream("testContents2")); + + assertFalse(mBean.isFileSynced(testNodePathName)); + } + finally { + if (null != record1) { + autoSyncMockS3ds.deleteRecord(record1.getIdentifier()); + } + if (null != record2) { + autoSyncMockS3ds.deleteRecord(record2.getIdentifier()); + } + } + } + + @Test + public void testIsFileSyncedBinaryNotSyncedAndBinariesSyncedReturnsFalse() throws IOException, RepositoryException, InstanceAlreadyExistsException, + MBeanRegistrationException, NotCompliantMBeanException, CommitFailedException, NoSuchAlgorithmException { + Blob mockBlob2 = mock(Blob.class); + final String id2 = getIdForInputStream(new StringInputStream("testContents2")); + when(mockBlob2.getContentIdentity()).thenReturn(id2); + Blob mockBlob3 = mock(Blob.class); + final String id3 = getIdForInputStream(new StringInputStream("testContents3")); + when(mockBlob2.getContentIdentity()).thenReturn(id3); + List blobPropList = Lists.newArrayList(mockBlob2, mockBlob3); + NodeStore nodeStore = initNodeStore(Optional.of(mockBlob), + Optional.absent(), + Optional.absent(), + Optional.absent(), + Optional.of(blobPropList)); + + jmxServer.registerMBean(new S3DataStoreStats(autoSyncMockS3ds, nodeStore), mBeanName); + + DataRecord record1 = null; + DataRecord record2 = null; + try { + record1 = autoSyncMockS3ds.addRecord(new StringInputStream("testContents2")); + record2 = autoSyncMockS3ds.addRecord(new StringInputStream("testContents3")); + + assertFalse(mBean.isFileSynced(testNodePathName)); + } + finally { + if (null != record1) { + autoSyncMockS3ds.deleteRecord(record1.getIdentifier()); + } + if (null != record2) { + autoSyncMockS3ds.deleteRecord(record2.getIdentifier()); + } + } + } + + @Test + public void testIsFileSyncedBinaryAndBinariesSyncedReturnsTrue() throws IOException, RepositoryException, InstanceAlreadyExistsException, + MBeanRegistrationException, NotCompliantMBeanException, CommitFailedException, NoSuchAlgorithmException { + Blob mockBlob2 = mock(Blob.class); + final String id2 = getIdForInputStream(new StringInputStream("testContents2")); + when(mockBlob2.getContentIdentity()).thenReturn(id2); + Blob mockBlob3 = mock(Blob.class); + final String id3 = getIdForInputStream(new StringInputStream("testContents3")); + when(mockBlob2.getContentIdentity()).thenReturn(id3); + List blobPropList = Lists.newArrayList(mockBlob2, mockBlob3); + NodeStore nodeStore = initNodeStore(Optional.of(mockBlob), + Optional.absent(), + Optional.absent(), + Optional.absent(), + Optional.of(blobPropList)); + + jmxServer.registerMBean(new S3DataStoreStats(autoSyncMockS3ds, nodeStore), mBeanName); + + DataRecord record1 = null; + DataRecord record2 = null; + DataRecord record3 = null; + try { + record1 = autoSyncMockS3ds.addRecord(new StringInputStream(testNodePathContents)); + record2 = autoSyncMockS3ds.addRecord(new StringInputStream("testContents2")); + record3 = autoSyncMockS3ds.addRecord(new StringInputStream("testContents3")); + + assertFalse(mBean.isFileSynced(testNodePathName)); + } + finally { + if (null != record1) { + autoSyncMockS3ds.deleteRecord(record1.getIdentifier()); + } + if (null != record2) { + autoSyncMockS3ds.deleteRecord(record2.getIdentifier()); + } + if (null != record3) { + autoSyncMockS3ds.deleteRecord(record3.getIdentifier()); + } + } + } + + + // A mock S3DataStore that allows us to replace the default + // S3Backend with our own backend, for test purposes only. + private class CustomBackendS3DataStore extends S3DataStore { + private Backend _localBackend; + + CustomBackendS3DataStore(final Backend backend) { _localBackend = backend; } + + @Override + protected Backend createBackend() { + return _localBackend; + } + } + + // A mock Backend implementation that uses a Map to keep track of what + // records have been added and removed, for test purposes only. + private class InMemoryBackend implements Backend { + final Map _backend = Maps.newHashMap(); + + @Override + public void init(CachingDataStore store, String homeDir, String config) throws DataStoreException { + + } + + @Override + public InputStream read(DataIdentifier identifier) throws DataStoreException { + try { + return new FileInputStream(_backend.get(identifier)); + } + catch (FileNotFoundException e) { + throw new DataStoreException(e); + } + } + + @Override + public long getLength(DataIdentifier identifier) throws DataStoreException { + return _backend.get(identifier).length(); + } + + @Override + public long getLastModified(DataIdentifier identifier) throws DataStoreException { + return _backend.get(identifier).lastModified(); + } + + @Override + public void write(DataIdentifier identifier, File file) throws DataStoreException { + _backend.put(identifier, file); + } + + @Override + public void writeAsync(final DataIdentifier identifier, final File file, AsyncUploadCallback callback) throws DataStoreException { + write(identifier, file); + callback.onSuccess(new AsyncUploadResult(identifier, file)); + } + + @Override + public Iterator getAllIdentifiers() throws DataStoreException { + return _backend.keySet().iterator(); + } + + @Override + public boolean exists(DataIdentifier identifier, boolean touch) throws DataStoreException { + if (_backend.containsKey(identifier) && touch) { + touch(identifier, new DateTime().getMillis()); + } + return exists(identifier); + } + + @Override + public boolean exists(DataIdentifier identifier) throws DataStoreException { + return _backend.containsKey(identifier); + } + + @Override + public void touch(DataIdentifier identifier, long minModifiedDate) throws DataStoreException { + + } + + @Override + public void touchAsync(DataIdentifier identifier, long minModifiedDate, AsyncTouchCallback callback) throws DataStoreException { + callback.onSuccess(new AsyncTouchResult(identifier)); + } + + @Override + public void close() throws DataStoreException { + + } + + @Override + public Set deleteAllOlderThan(long timestamp) throws DataStoreException { + final Set toDelete = Sets.newHashSet(); + for (final DataIdentifier identifier : _backend.keySet()) { + if (_backend.get(identifier).lastModified() < timestamp) { + toDelete.add(identifier); + } + } + for (final DataIdentifier identifier : toDelete) { + _backend.remove(identifier); + } + return toDelete; + } + + @Override + public void deleteRecord(DataIdentifier identifier) throws DataStoreException { + if (_backend.containsKey(identifier)) { + _backend.remove(identifier); + } + } + } + + // A modified InMemoryBackend that, when writeAsync() is called, does not + // actually store the record but keeps track that it was intended to be + // stored, and allows the test to tell it when it expects the record + // to be "synced". + private class ManuallySyncingInMemoryBackend extends InMemoryBackend { + final Map inProgessWrites = Maps.newHashMap(); + final Map asyncCallbacks = Maps.newHashMap(); + + @Override + public void writeAsync(final DataIdentifier identifier, final File file, AsyncUploadCallback callback) throws DataStoreException { + inProgessWrites.put(identifier, file); + asyncCallbacks.put(identifier, callback); + } + + void clearInProgressWrites() { + for (final DataIdentifier identifier : inProgessWrites.keySet()) { + final File file = inProgessWrites.get(identifier); + asyncCallbacks.get(identifier).onSuccess(new AsyncUploadResult(identifier, file)); + _backend.put(identifier, file); + } + inProgessWrites.clear(); + } + } +} diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreService.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreService.java index 35c4206..3f2256e 100644 --- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreService.java +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreService.java @@ -59,6 +59,8 @@ import org.apache.jackrabbit.commons.SimpleValueFactory; import org.apache.jackrabbit.oak.api.Descriptors; import org.apache.jackrabbit.oak.api.jmx.CacheStatsMBean; import org.apache.jackrabbit.oak.api.jmx.CheckpointMBean; +import org.apache.jackrabbit.oak.blob.cloud.aws.s3.S3DataStore; +import org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats.S3DataStoreStatsMBean; import org.apache.jackrabbit.oak.cache.CacheStats; import org.apache.jackrabbit.oak.osgi.ObserverTracker; import org.apache.jackrabbit.oak.osgi.OsgiWhiteboard; @@ -69,6 +71,8 @@ import org.apache.jackrabbit.oak.plugins.blob.BlobTrackingStore; import org.apache.jackrabbit.oak.plugins.blob.MarkSweepGarbageCollector; import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore; import org.apache.jackrabbit.oak.plugins.blob.datastore.BlobIdTracker; +import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreBlobStore; +import org.apache.jackrabbit.oak.plugins.blob.datastore.S3DataStoreStats; import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils; import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils.SharedStoreRecordType; import org.apache.jackrabbit.oak.plugins.identifier.ClusterRepositoryInfo; @@ -635,6 +639,23 @@ public class SegmentNodeStoreService extends ProxyNodeStore )); } + // Expose statistics about S3DataStore, if one is being used + + if (null != blobStore && blobStore instanceof DataStoreBlobStore) { + final DataStoreBlobStore dsbs = (DataStoreBlobStore)blobStore; + if (null != dsbs.getDataStore() && dsbs.getDataStore() instanceof S3DataStore) { + final S3DataStore s3ds = (S3DataStore)dsbs.getDataStore(); + final S3DataStoreStats s3dsStats = new S3DataStoreStats(s3ds, segmentNodeStore); + registrations.add(registerMBean( + whiteboard, + S3DataStoreStatsMBean.class, + s3dsStats, + S3DataStoreStatsMBean.TYPE, + s3dsStats.getClass().getSimpleName() + )); + } + } + log.info("SegmentNodeStore initialized"); return true; } diff --git a/oak-segment/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java b/oak-segment/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java index 11abf49..66bd33e 100644 --- a/oak-segment/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java +++ b/oak-segment/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java @@ -59,6 +59,8 @@ import org.apache.jackrabbit.commons.SimpleValueFactory; import org.apache.jackrabbit.oak.api.Descriptors; import org.apache.jackrabbit.oak.api.jmx.CacheStatsMBean; import org.apache.jackrabbit.oak.api.jmx.CheckpointMBean; +import org.apache.jackrabbit.oak.blob.cloud.aws.s3.S3DataStore; +import org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats.S3DataStoreStatsMBean; import org.apache.jackrabbit.oak.cache.CacheStats; import org.apache.jackrabbit.oak.commons.PropertiesUtil; import org.apache.jackrabbit.oak.osgi.ObserverTracker; @@ -70,6 +72,8 @@ import org.apache.jackrabbit.oak.plugins.blob.BlobTrackingStore; import org.apache.jackrabbit.oak.plugins.blob.MarkSweepGarbageCollector; import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore; import org.apache.jackrabbit.oak.plugins.blob.datastore.BlobIdTracker; +import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreBlobStore; +import org.apache.jackrabbit.oak.plugins.blob.datastore.S3DataStoreStats; import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils; import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils.SharedStoreRecordType; import org.apache.jackrabbit.oak.plugins.identifier.ClusterRepositoryInfo; @@ -289,6 +293,7 @@ public class SegmentNodeStoreService extends ProxyNodeStore private Registration stringCacheMBean; private Registration fsgcMonitorMBean; private Registration fileStoreStatsMBean; + private Registration s3DataStoreStatsRegistration; private WhiteboardExecutor executor; private boolean customBlobStore; @@ -536,6 +541,23 @@ public class SegmentNodeStoreService extends ProxyNodeStore scheduleWithFixedDelay(whiteboard, fsgcm, 1) ); + // Expose statistics about S3DataStore, if one is being used + + if (null != blobStore && blobStore instanceof DataStoreBlobStore) { + final DataStoreBlobStore dsbs = (DataStoreBlobStore)blobStore; + if (null != dsbs.getDataStore() && dsbs.getDataStore() instanceof S3DataStore) { + final S3DataStore s3ds = (S3DataStore)dsbs.getDataStore(); + final S3DataStoreStats s3dsStats = new S3DataStoreStats(s3ds, segmentNodeStore); + s3DataStoreStatsRegistration = registerMBean( + whiteboard, + S3DataStoreStatsMBean.class, + s3dsStats, + S3DataStoreStatsMBean.TYPE, + s3dsStats.getClass().getSimpleName() + ); + } + } + // Register a factory service to expose the FileStore providerRegistration = context.getBundleContext().registerService(SegmentStoreProvider.class.getName(), this, null); @@ -714,6 +736,10 @@ public class SegmentNodeStoreService extends ProxyNodeStore fileStoreStatsMBean.unregister(); fileStoreStatsMBean = null; } + if (s3DataStoreStatsRegistration != null) { + s3DataStoreStatsRegistration.unregister(); + s3DataStoreStatsRegistration = null; + } if (executor != null) { executor.stop(); executor = null;