diff --git a/oak-blob-cloud/pom.xml b/oak-blob-cloud/pom.xml
index 800f716..ad3cb3c 100644
--- a/oak-blob-cloud/pom.xml
+++ b/oak-blob-cloud/pom.xml
@@ -41,7 +41,7 @@
maven-bundle-plugin
- org.apache.jackrabbit.oak.blob.cloud.aws.s3
+ org.apache.jackrabbit.oak.blob.cloud.aws.s3,org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats
sun.io
@@ -101,6 +101,13 @@
${jackrabbit.version}
+
+ <dependency>
+ <groupId>org.apache.jackrabbit</groupId>
+ <artifactId>oak-commons</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
com.amazonaws
@@ -140,6 +147,12 @@
logback-classic
test
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <version>1.10.19</version>
+ <scope>test</scope>
+ </dependency>
diff --git a/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/S3DataStore.java b/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/S3DataStore.java
index fc21bf6..042eb31 100644
--- a/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/S3DataStore.java
+++ b/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/S3DataStore.java
@@ -17,14 +17,26 @@
package org.apache.jackrabbit.oak.blob.cloud.aws.s3;
import java.util.Properties;
+
+import com.google.common.base.Strings;
import org.apache.jackrabbit.core.data.Backend;
import org.apache.jackrabbit.core.data.CachingDataStore;
+import org.apache.jackrabbit.core.data.DataIdentifier;
+import org.apache.jackrabbit.core.data.DataStoreException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* An Amazon S3 data store.
*/
public class S3DataStore extends CachingDataStore {
+
+ /**
+ * Logger instance.
+ */
+ private static final Logger LOG = LoggerFactory.getLogger(S3DataStore.class);
+
protected Properties properties;
@Override
@@ -47,4 +59,24 @@ public class S3DataStore extends CachingDataStore {
public void setProperties(Properties properties) {
this.properties = properties;
}
+
+ /**
+ * Look in the backend for a record matching the given identifier. Returns true
+ * if such a record exists.
+ *
+ * @param identifier - A path-like identifier that represents the path to
+ * the record in question.
+ * @return true if a record for the provided identifier can be found.
+ */
+ public boolean haveRecordForIdentifier(final String identifier) {
+ try {
+ if (!Strings.isNullOrEmpty(identifier)) {
+ return this.getBackend().exists(new DataIdentifier(identifier));
+ }
+ }
+ catch (DataStoreException e) {
+ LOG.warn(String.format("Data Store Exception caught checking for %s in pending uploads", identifier), e);
+ }
+ return false;
+ }
}
diff --git a/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/stats/S3DataStoreStatsMBean.java b/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/stats/S3DataStoreStatsMBean.java
new file mode 100644
index 0000000..70d1a5f
--- /dev/null
+++ b/oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/stats/S3DataStoreStatsMBean.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats;
+
+/**
+ * MBean for JMX statistics pertaining to an S3DataStore.
+ */
+public interface S3DataStoreStatsMBean {
+ String TYPE = "S3DataStoreStats";
+
+ /**
+ * Obtains the number of records that are in the process
+ * of being "synced", meaning they are either scheduled to
+ * be copied to S3 or are actively being copied to S3
+ * but the copy of these files has not yet completed.
+ *
+ * @return number of syncs in progress (active).
+ */
+ long getActiveSyncs();
+
+ /**
+ * Determines whether a file-like entity with the given name
+ * has been "synced" (completely copied) to S3.
+ *
+ * @param nodePathName - Path to the entity to check. This is
+ * the repository node path, not an external file path.
+ * @return true if the file is synced to S3.
+ */
+ boolean isFileSynced(final String nodePathName);
+}
diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobStoreBlob.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobStoreBlob.java
index 4281f0e..03e6773 100644
--- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobStoreBlob.java
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobStoreBlob.java
@@ -24,8 +24,8 @@ import java.io.InputStream;
import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;
-import org.apache.jackrabbit.oak.spi.blob.BlobStore;
import org.apache.jackrabbit.oak.api.Blob;
+import org.apache.jackrabbit.oak.spi.blob.BlobStore;
/**
* A blob implementation.
diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/S3DataStoreStats.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/S3DataStoreStats.java
new file mode 100644
index 0000000..bca6abb
--- /dev/null
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/S3DataStoreStats.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.blob.datastore;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import org.apache.jackrabbit.oak.api.Blob;
+import org.apache.jackrabbit.oak.api.PropertyState;
+import org.apache.jackrabbit.oak.api.Type;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.S3DataStore;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats.S3DataStoreStatsMBean;
+import org.apache.jackrabbit.oak.commons.PathUtils;
+import org.apache.jackrabbit.oak.commons.jmx.AnnotatedStandardMBean;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.apache.jackrabbit.oak.spi.state.NodeStore;
+
+import java.io.File;
+import java.util.List;
+
+public class S3DataStoreStats extends AnnotatedStandardMBean implements S3DataStoreStatsMBean {
+ private final S3DataStore s3ds;
+
+ protected NodeStore nodeStore;
+
+ public S3DataStoreStats(final S3DataStore s3ds, final NodeStore nodeStore) {
+ super(S3DataStoreStatsMBean.class);
+ this.s3ds = s3ds;
+ this.nodeStore = nodeStore;
+ }
+
+ /**
+ * Obtains the number of records that are in the process
+ * of being "synced", meaning they are either scheduled to
+ * be copied to S3 or are actively being copied to S3
+ * but the copy of these files has not yet completed.
+ *
+ * @return number of syncs in progress (active).
+ */
+ @Override
+ public long getActiveSyncs() {
+ return s3ds.getPendingUploads().size();
+ }
+
+ /**
+ * Determines whether a file-like entity with the given name
+ * has been "synced" (completely copied) to S3.
+ *
+ * Determination of "synced":
+ * - A nodeName of null or "" is always "not synced".
+ * - A nodeName that does not map to a valid node is always "not synced".
+ * - If the node for this nodeName does not have a "jcr:data" property,
+ * this node is always "not synced" since such a node would never be
+ * copied to S3.
+ * - If the node for this nodeName is not in the nodeStore, this node is
+ * always "not synced".
+ * - Otherwise, the state is "synced" if the corresponding blob is
+ * completely stored in S3.
+ *
+ * @param nodePathName - Path to the entity to check. This is
+ * the repository node path, not an external file path.
+ * @return true if the file is synced to S3.
+ */
+ @Override
+ public boolean isFileSynced(final String nodePathName) {
+ if (Strings.isNullOrEmpty(nodePathName)) {
+ return false;
+ }
+
+ if (null == nodeStore) {
+ return false;
+ }
+
+ final NodeState leafNode = findLeafNode(nodePathName);
+ if (! leafNode.exists()) {
+ return false;
+ }
+
+ boolean nodeHasBinaryProperties = false;
+ for (final PropertyState propertyState : leafNode.getProperties()) {
+ nodeHasBinaryProperties |= (propertyState.getType() == Type.BINARY || propertyState.getType() == Type.BINARIES);
+ try {
+ if (propertyState.getType() == Type.BINARY) {
+ final Blob blob = (Blob) propertyState.getValue(propertyState.getType());
+ if (null == blob || !haveRecordForBlob(blob)) {
+ return false;
+ }
+ } else if (propertyState.getType() == Type.BINARIES) {
+ final List<Blob> blobs = (List<Blob>) propertyState.getValue(propertyState.getType());
+ if (null == blobs) {
+ return false;
+ }
+ for (final Blob blob : blobs) {
+ if (!haveRecordForBlob(blob)) {
+ return false;
+ }
+ }
+ }
+ }
+ catch (ClassCastException e) {
+ return false;
+ }
+ }
+
+ // If we got here and nodeHasBinaryProperties is true,
+ // it means at least one binary property was found for
+ // the leaf node and that we were able to locate a
+ // record for it.
+ return nodeHasBinaryProperties;
+ }
+
+ private NodeState findLeafNode(final String nodePathName) {
+ final Iterable<String> pathNodes = PathUtils.elements(PathUtils.getParentPath(nodePathName));
+ final String leafNodeName = PathUtils.getName(nodePathName);
+
+ NodeState currentNode = nodeStore.getRoot();
+ for (final String pathNodeName : pathNodes) {
+ if (pathNodeName.length() > 0) {
+ final NodeState childNode = currentNode.getChildNode(pathNodeName);
+ if (!childNode.exists()) {
+ break;
+ }
+ currentNode = childNode;
+ }
+ }
+ return currentNode.getChildNode(leafNodeName);
+ }
+
+ private boolean haveRecordForBlob(final Blob blob) {
+ final String fullBlobId = blob.getContentIdentity();
+ if (! Strings.isNullOrEmpty(fullBlobId)
+ && !InMemoryDataRecord.isInstance(fullBlobId)) {
+ String blobId = DataStoreBlobStore.BlobId.of(fullBlobId).blobId;
+ return s3ds.haveRecordForIdentifier(blobId);
+ }
+ return false;
+ }
+}
diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
index 6086e4b..5456c3c 100644
--- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
@@ -60,6 +60,8 @@ import org.apache.jackrabbit.oak.api.Descriptors;
import org.apache.jackrabbit.oak.api.jmx.CacheStatsMBean;
import org.apache.jackrabbit.oak.api.jmx.CheckpointMBean;
import org.apache.jackrabbit.oak.api.jmx.PersistentCacheStatsMBean;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.S3DataStore;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats.S3DataStoreStatsMBean;
import org.apache.jackrabbit.oak.cache.CacheStats;
import org.apache.jackrabbit.oak.commons.PropertiesUtil;
import org.apache.jackrabbit.oak.osgi.ObserverTracker;
@@ -71,6 +73,8 @@ import org.apache.jackrabbit.oak.plugins.blob.BlobStoreStats;
import org.apache.jackrabbit.oak.plugins.blob.BlobTrackingStore;
import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore;
import org.apache.jackrabbit.oak.plugins.blob.datastore.BlobIdTracker;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreBlobStore;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.S3DataStoreStats;
import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils;
import org.apache.jackrabbit.oak.plugins.document.persistentCache.CacheType;
import org.apache.jackrabbit.oak.plugins.document.persistentCache.PersistentCacheStats;
@@ -796,6 +800,21 @@ public class DocumentNodeStoreService {
BlobGCMBean.TYPE, "Document node store blob garbage collection"));
}
+ // Expose statistics about S3DataStore, if one is being used
+
+ if (null != store.getBlobStore() && store.getBlobStore() instanceof DataStoreBlobStore) {
+ final DataStoreBlobStore dsbs = (DataStoreBlobStore)store.getBlobStore();
+ if (null != dsbs.getDataStore() && dsbs.getDataStore() instanceof S3DataStore) {
+ final S3DataStore s3ds = (S3DataStore)dsbs.getDataStore();
+ final S3DataStoreStats s3dsStats = new S3DataStoreStats(s3ds, nodeStore);
+ registrations.add(registerMBean(whiteboard,
+ S3DataStoreStatsMBean.class,
+ s3dsStats,
+ S3DataStoreStatsMBean.TYPE,
+ s3dsStats.getClass().getSimpleName()));
+ }
+ }
+
RevisionGC revisionGC = new RevisionGC(new Runnable() {
@Override
public void run() {
diff --git a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/S3DataStoreStatsTest.java b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/S3DataStoreStatsTest.java
new file mode 100644
index 0000000..1225142
--- /dev/null
+++ b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/S3DataStoreStatsTest.java
@@ -0,0 +1,457 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.blob.datastore;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assume.assumeFalse;
+import static org.junit.Assume.assumeNotNull;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.amazonaws.util.StringInputStream;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.output.NullOutputStream;
+import org.apache.jackrabbit.core.data.AsyncTouchCallback;
+import org.apache.jackrabbit.core.data.AsyncTouchResult;
+import org.apache.jackrabbit.core.data.AsyncUploadCallback;
+import org.apache.jackrabbit.core.data.AsyncUploadResult;
+import org.apache.jackrabbit.core.data.Backend;
+import org.apache.jackrabbit.core.data.CachingDataStore;
+import org.apache.jackrabbit.core.data.DataIdentifier;
+import org.apache.jackrabbit.core.data.DataRecord;
+import org.apache.jackrabbit.core.data.DataStoreException;
+import org.apache.jackrabbit.oak.api.Blob;
+import org.apache.jackrabbit.oak.api.PropertyState;
+import org.apache.jackrabbit.oak.api.Type;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.S3DataStore;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats.S3DataStoreStatsMBean;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.apache.jackrabbit.oak.spi.state.NodeStore;
+import org.joda.time.DateTime;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.jcr.RepositoryException;
+import javax.management.InstanceAlreadyExistsException;
+import javax.management.InstanceNotFoundException;
+import javax.management.JMX;
+import javax.management.MBeanRegistrationException;
+import javax.management.MBeanServer;
+import javax.management.MBeanServerConnection;
+import javax.management.MBeanServerFactory;
+import javax.management.MalformedObjectNameException;
+import javax.management.NotCompliantMBeanException;
+import javax.management.ObjectName;
+import javax.management.remote.JMXConnector;
+import javax.management.remote.JMXConnectorFactory;
+import javax.management.remote.JMXServiceURL;
+import java.io.*;
+import java.lang.management.ManagementFactory;
+import java.security.DigestOutputStream;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+public class S3DataStoreStatsTest {
+ @Rule
+ public TemporaryFolder folder = new TemporaryFolder(new File("target"));
+
+ private static MBeanServer jmxServer;
+ private static ObjectName mBeanName;
+ private static NodeStore mockNodeStore;
+
+ private static String testNodePathName = "test/node/path/name";
+ private static String testNodePathContents = "testNodePathContents";
+
+ private S3DataStore defaultS3ds;
+ private S3DataStore autoSyncMockS3ds;
+ private S3DataStore manualSyncMockS3ds;
+ private S3DataStoreStatsMBean mBean;
+
+ @BeforeClass
+ public static void preClass() throws IOException, RepositoryException, MalformedObjectNameException,
+ NoSuchAlgorithmException
+ {
+
+ // This will cause all tests in this file to be ignored if JMX properties
+ // are not passed into the test execution.
+ //
+ // If you want to run this unit test suite you will need to
+ // pass the following settings into the command-line.
+ // Example:
+ // -Djava.rmi.server.hostname=localhost
+ // -Dcom.sun.management.jmxremote.port=9999
+ // -Dcom.sun.management.jmxremote.ssl=false
+ // -Dcom.sun.management.jmxremote.authenticate=false
+ for (final String property : Lists.newArrayList("java.rmi.server.hostname",
+ "com.sun.management.jmxremote.port",
+ "com.sun.management.jmxremote.ssl",
+ "com.sun.management.jmxremote.authenticate")) {
+ assumeFalse(Strings.isNullOrEmpty(System.getProperty(property)));
+ }
+
+ // This will cause all tests in this file to be ignored if no JMX
+ // server could be found.
+ jmxServer = ManagementFactory.getPlatformMBeanServer();
+ if (null == jmxServer) {
+ jmxServer = MBeanServerFactory.newMBeanServer();
+ }
+ assumeNotNull(jmxServer);
+
+ mBeanName = new ObjectName("org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats:type=S3DataStoreStats");
+
+ final char[] HEX = "0123456789abcdef".toCharArray();
+ MessageDigest digest = MessageDigest.getInstance("SHA-1");
+ OutputStream out = new DigestOutputStream(new NullOutputStream(), digest);
+ IOUtils.copyLarge(new StringInputStream(testNodePathContents), out);
+ out.close();
+ byte[] digestBytes = digest.digest();
+ char[] buffer = new char[digestBytes.length * 2];
+ for (int i=0; i < digestBytes.length; i++) {
+ buffer[2*i] = HEX[(digestBytes[i] >> 4) & 0x0f];
+ buffer[2*i+1] = HEX[digestBytes[i] & 0x0f];
+ }
+ final String testNodeId = new String(buffer);
+
+ mockNodeStore = mock(NodeStore.class);
+ final NodeState mockRootState = mock(NodeState.class);
+ final NodeState mockLeafState = mock(NodeState.class);
+ final PropertyState mockLeafPropertyState = mock(PropertyState.class);
+ final Blob mockBlob = mock(Blob.class);
+ when(mockNodeStore.getRoot()).thenReturn(mockRootState);
+ when(mockRootState.getChildNode(anyString())).thenReturn(mockLeafState);
+ when(mockLeafState.getChildNode(anyString())).thenReturn(mockLeafState);
+ when(mockLeafState.exists()).thenReturn(true);
+ when(mockLeafState.getProperty(anyString())).thenReturn(mockLeafPropertyState);
+ doReturn(Lists.newArrayList(mockLeafPropertyState)).when(mockLeafState).getProperties();
+ doReturn(Type.BINARY).when(mockLeafPropertyState).getType();
+ when(mockLeafPropertyState.getValue(Type.BINARY)).thenReturn(mockBlob);
+ when(mockBlob.getContentIdentity()).thenReturn(testNodeId);
+ }
+
+ @Before
+ public void setup() throws IOException, InstanceAlreadyExistsException,
+ MBeanRegistrationException, NotCompliantMBeanException, RepositoryException {
+ // Set up JMX connection and mbean
+ final JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://:9999/jmxrmi");
+ final JMXConnector connector = JMXConnectorFactory.connect(url, null);
+ final MBeanServerConnection connection = connector.getMBeanServerConnection();
+ mBean = JMX.newMBeanProxy(connection, mBeanName, S3DataStoreStatsMBean.class, true);
+
+ defaultS3ds = mock(S3DataStore.class);
+ autoSyncMockS3ds = new CustomBackendS3DataStore(new InMemoryBackend());
+ autoSyncMockS3ds.init(folder.newFolder().getAbsolutePath());
+ manualSyncMockS3ds = new CustomBackendS3DataStore(new ManuallySyncingInMemoryBackend());
+ manualSyncMockS3ds.init(folder.newFolder().getAbsolutePath());
+ }
+
+ @After
+ public void teardown() throws InstanceNotFoundException, MBeanRegistrationException {
+ jmxServer.unregisterMBean(mBeanName);
+ }
+
+ @Test
+ public void testGetActiveS3FileSyncMetricExists() throws RepositoryException, IOException, MalformedObjectNameException,
+ InstanceAlreadyExistsException, MBeanRegistrationException, NotCompliantMBeanException {
+ jmxServer.registerMBean(new S3DataStoreStats(defaultS3ds, mockNodeStore), mBeanName);
+
+ assert(0 == mBean.getActiveSyncs());
+ }
+
+ @Test
+ public void testGetSingleActiveS3FileSyncMetric() throws IOException, RepositoryException, MalformedObjectNameException,
+ InstanceAlreadyExistsException, MBeanRegistrationException, NotCompliantMBeanException {
+ jmxServer.registerMBean(new S3DataStoreStats(manualSyncMockS3ds, mockNodeStore), mBeanName);
+
+ DataRecord record = null;
+ try {
+ record = manualSyncMockS3ds.addRecord(new StringInputStream("test"));
+ assert(1 == mBean.getActiveSyncs());
+ }
+ finally {
+ if (null != record) {
+ manualSyncMockS3ds.deleteRecord(record.getIdentifier());
+ }
+ }
+
+ ((ManuallySyncingInMemoryBackend) manualSyncMockS3ds.getBackend()).clearInProgressWrites();
+
+ assert(0 == mBean.getActiveSyncs());
+ }
+
+ @Test
+ public void testGetMultipleActiveS3FileSyncMetric() throws IOException, RepositoryException, MalformedObjectNameException,
+ InstanceAlreadyExistsException, MBeanRegistrationException, NotCompliantMBeanException {
+ jmxServer.registerMBean(new S3DataStoreStats(manualSyncMockS3ds, mockNodeStore), mBeanName);
+
+ final Set<DataRecord> records = Sets.newHashSet();
+ try {
+ records.add(manualSyncMockS3ds.addRecord(new StringInputStream("test1")));
+ records.add(manualSyncMockS3ds.addRecord(new StringInputStream("test2")));
+ records.add(manualSyncMockS3ds.addRecord(new StringInputStream("test3")));
+
+ assert (3 == mBean.getActiveSyncs());
+ }
+ finally {
+ for (final DataRecord record : records) {
+ manualSyncMockS3ds.deleteRecord(record.getIdentifier());
+ }
+ }
+
+ ((ManuallySyncingInMemoryBackend) manualSyncMockS3ds.getBackend()).clearInProgressWrites();
+
+ assert(0 == mBean.getActiveSyncs());
+ }
+
+ @Test
+ public void testIsFileSyncedMetricExists() throws IOException, RepositoryException, InstanceAlreadyExistsException,
+ MBeanRegistrationException, NotCompliantMBeanException {
+ jmxServer.registerMBean(new S3DataStoreStats(defaultS3ds, mockNodeStore), mBeanName);
+
+ assertFalse(mBean.isFileSynced(testNodePathName));
+ }
+
+ @Test
+ public void testIsFileSyncedNullFileReturnsFalse() throws IOException, RepositoryException, InstanceAlreadyExistsException,
+ MBeanRegistrationException, NotCompliantMBeanException {
+ jmxServer.registerMBean(new S3DataStoreStats(defaultS3ds, mockNodeStore), mBeanName);
+
+ assertFalse(mBean.isFileSynced(null));
+ }
+
+ @Test
+ public void testIsFileSyncedEmptyStringReturnsFalse() throws IOException, RepositoryException, InstanceAlreadyExistsException,
+ MBeanRegistrationException, NotCompliantMBeanException {
+ jmxServer.registerMBean(new S3DataStoreStats(defaultS3ds, mockNodeStore), mBeanName);
+
+ assertFalse(mBean.isFileSynced(""));
+ }
+
+ @Test
+ public void testIsFileSyncedInvalidFilenameReturnsFalse() throws IOException, RepositoryException, InstanceAlreadyExistsException,
+ MBeanRegistrationException, NotCompliantMBeanException {
+ jmxServer.registerMBean(new S3DataStoreStats(defaultS3ds, mockNodeStore), mBeanName);
+
+ assertFalse(mBean.isFileSynced("invalid"));
+ }
+
+ @Test
+ public void testIsFileSyncedFileNotAddedReturnsFalse() throws IOException, RepositoryException, InstanceAlreadyExistsException,
+ MBeanRegistrationException, NotCompliantMBeanException {
+ jmxServer.registerMBean(new S3DataStoreStats(autoSyncMockS3ds, mockNodeStore), mBeanName);
+
+ assertFalse(mBean.isFileSynced(testNodePathName));
+ }
+
+ @Test
+ public void testIsFileSyncedSyncIncompleteReturnsFalse() throws IOException, RepositoryException, InstanceAlreadyExistsException,
+ MBeanRegistrationException, NotCompliantMBeanException {
+ jmxServer.registerMBean(new S3DataStoreStats(manualSyncMockS3ds, mockNodeStore), mBeanName);
+
+ DataRecord record = null;
+ try {
+ record = manualSyncMockS3ds.addRecord(new StringInputStream(testNodePathContents));
+ assertFalse(mBean.isFileSynced(testNodePathName));
+ }
+ finally {
+ if (null != record) {
+ manualSyncMockS3ds.deleteRecord(record.getIdentifier());
+ }
+ }
+ }
+
+ @Test
+ public void testIsFileSyncedSyncCompleteReturnsTrue() throws IOException, RepositoryException, InstanceAlreadyExistsException,
+ MBeanRegistrationException, NotCompliantMBeanException {
+ jmxServer.registerMBean(new S3DataStoreStats(autoSyncMockS3ds, mockNodeStore), mBeanName);
+
+ DataRecord record = null;
+ try {
+ record = autoSyncMockS3ds.addRecord(new StringInputStream(testNodePathContents));
+ assert(mBean.isFileSynced(testNodePathName));
+ }
+ finally {
+ if (null != record) {
+ autoSyncMockS3ds.deleteRecord(record.getIdentifier());
+ }
+ }
+ }
+
+ @Test
+ public void testIsFileSyncedFileDeletedReturnsFalse() throws IOException, RepositoryException, InstanceAlreadyExistsException,
+ MBeanRegistrationException, NotCompliantMBeanException {
+ jmxServer.registerMBean(new S3DataStoreStats(autoSyncMockS3ds, mockNodeStore), mBeanName);
+
+ DataRecord record = null;
+ try {
+ record = autoSyncMockS3ds.addRecord(new StringInputStream(testNodePathContents));
+ }
+ finally {
+ if (null != record) {
+ autoSyncMockS3ds.deleteRecord(record.getIdentifier());
+ }
+ }
+
+ assertFalse(mBean.isFileSynced(testNodePathName));
+ }
+
+
+ // A mock S3DataStore that allows us to replace the default
+ // S3Backend with our own backend, for test purposes only.
+ private class CustomBackendS3DataStore extends S3DataStore {
+ private Backend _localBackend;
+
+ CustomBackendS3DataStore(final Backend backend) { _localBackend = backend; }
+
+ @Override
+ protected Backend createBackend() {
+ return _localBackend;
+ }
+ }
+
+ // A mock Backend implementation that uses a Map to keep track of what
+ // records have been added and removed, for test purposes only.
+ private class InMemoryBackend implements Backend {
+ final Map<DataIdentifier, File> _backend = Maps.newHashMap();
+
+ @Override
+ public void init(CachingDataStore store, String homeDir, String config) throws DataStoreException {
+
+ }
+
+ @Override
+ public InputStream read(DataIdentifier identifier) throws DataStoreException {
+ try {
+ return new FileInputStream(_backend.get(identifier));
+ }
+ catch (FileNotFoundException e) {
+ throw new DataStoreException(e);
+ }
+ }
+
+ @Override
+ public long getLength(DataIdentifier identifier) throws DataStoreException {
+ return _backend.get(identifier).length();
+ }
+
+ @Override
+ public long getLastModified(DataIdentifier identifier) throws DataStoreException {
+ return _backend.get(identifier).lastModified();
+ }
+
+ @Override
+ public void write(DataIdentifier identifier, File file) throws DataStoreException {
+ _backend.put(identifier, file);
+ }
+
+ @Override
+ public void writeAsync(final DataIdentifier identifier, final File file, AsyncUploadCallback callback) throws DataStoreException {
+ write(identifier, file);
+ callback.onSuccess(new AsyncUploadResult(identifier, file));
+ }
+
+ @Override
+ public Iterator<DataIdentifier> getAllIdentifiers() throws DataStoreException {
+ return _backend.keySet().iterator();
+ }
+
+ @Override
+ public boolean exists(DataIdentifier identifier, boolean touch) throws DataStoreException {
+ if (_backend.containsKey(identifier) && touch) {
+ touch(identifier, new DateTime().getMillis());
+ }
+ return exists(identifier);
+ }
+
+ @Override
+ public boolean exists(DataIdentifier identifier) throws DataStoreException {
+ return _backend.containsKey(identifier);
+ }
+
+ @Override
+ public void touch(DataIdentifier identifier, long minModifiedDate) throws DataStoreException {
+
+ }
+
+ @Override
+ public void touchAsync(DataIdentifier identifier, long minModifiedDate, AsyncTouchCallback callback) throws DataStoreException {
+ callback.onSuccess(new AsyncTouchResult(identifier));
+ }
+
+ @Override
+ public void close() throws DataStoreException {
+
+ }
+
+ @Override
+ public Set<DataIdentifier> deleteAllOlderThan(long timestamp) throws DataStoreException {
+ final Set<DataIdentifier> toDelete = Sets.newHashSet();
+ for (final DataIdentifier identifier : _backend.keySet()) {
+ if (_backend.get(identifier).lastModified() < timestamp) {
+ toDelete.add(identifier);
+ }
+ }
+ for (final DataIdentifier identifier : toDelete) {
+ _backend.remove(identifier);
+ }
+ return toDelete;
+ }
+
+ @Override
+ public void deleteRecord(DataIdentifier identifier) throws DataStoreException {
+ if (_backend.containsKey(identifier)) {
+ _backend.remove(identifier);
+ }
+ }
+ }
+
+ // A modified InMemoryBackend that, when writeAsync() is called, does not
+ // actually store the record but keeps track that it was intended to be
+ // stored, and allows the test to tell it when it expects the record
+ // to be "synced".
+ private class ManuallySyncingInMemoryBackend extends InMemoryBackend {
+ final Map<DataIdentifier, File> inProgressWrites = Maps.newHashMap();
+ final Map<DataIdentifier, AsyncUploadCallback> asyncCallbacks = Maps.newHashMap();
+
+ @Override
+ public void writeAsync(final DataIdentifier identifier, final File file, AsyncUploadCallback callback) throws DataStoreException {
+ inProgressWrites.put(identifier, file);
+ asyncCallbacks.put(identifier, callback);
+ }
+
+ void clearInProgressWrites() {
+ for (final DataIdentifier identifier : inProgressWrites.keySet()) {
+ final File file = inProgressWrites.get(identifier);
+ asyncCallbacks.get(identifier).onSuccess(new AsyncUploadResult(identifier, file));
+ _backend.put(identifier, file);
+ }
+ inProgressWrites.clear();
+ }
+ }
+}
diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreService.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreService.java
index 35c4206..3f2256e 100644
--- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreService.java
+++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreService.java
@@ -59,6 +59,8 @@ import org.apache.jackrabbit.commons.SimpleValueFactory;
import org.apache.jackrabbit.oak.api.Descriptors;
import org.apache.jackrabbit.oak.api.jmx.CacheStatsMBean;
import org.apache.jackrabbit.oak.api.jmx.CheckpointMBean;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.S3DataStore;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats.S3DataStoreStatsMBean;
import org.apache.jackrabbit.oak.cache.CacheStats;
import org.apache.jackrabbit.oak.osgi.ObserverTracker;
import org.apache.jackrabbit.oak.osgi.OsgiWhiteboard;
@@ -69,6 +71,8 @@ import org.apache.jackrabbit.oak.plugins.blob.BlobTrackingStore;
import org.apache.jackrabbit.oak.plugins.blob.MarkSweepGarbageCollector;
import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore;
import org.apache.jackrabbit.oak.plugins.blob.datastore.BlobIdTracker;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreBlobStore;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.S3DataStoreStats;
import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils;
import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils.SharedStoreRecordType;
import org.apache.jackrabbit.oak.plugins.identifier.ClusterRepositoryInfo;
@@ -635,6 +639,23 @@ public class SegmentNodeStoreService extends ProxyNodeStore
));
}
+ // Expose statistics about S3DataStore, if one is being used
+
+ if (null != blobStore && blobStore instanceof DataStoreBlobStore) {
+ final DataStoreBlobStore dsbs = (DataStoreBlobStore)blobStore;
+ if (null != dsbs.getDataStore() && dsbs.getDataStore() instanceof S3DataStore) {
+ final S3DataStore s3ds = (S3DataStore)dsbs.getDataStore();
+ final S3DataStoreStats s3dsStats = new S3DataStoreStats(s3ds, segmentNodeStore);
+ registrations.add(registerMBean(
+ whiteboard,
+ S3DataStoreStatsMBean.class,
+ s3dsStats,
+ S3DataStoreStatsMBean.TYPE,
+ s3dsStats.getClass().getSimpleName()
+ ));
+ }
+ }
+
log.info("SegmentNodeStore initialized");
return true;
}
diff --git a/oak-segment/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java b/oak-segment/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java
index 11abf49..66bd33e 100644
--- a/oak-segment/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java
+++ b/oak-segment/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java
@@ -59,6 +59,8 @@ import org.apache.jackrabbit.commons.SimpleValueFactory;
import org.apache.jackrabbit.oak.api.Descriptors;
import org.apache.jackrabbit.oak.api.jmx.CacheStatsMBean;
import org.apache.jackrabbit.oak.api.jmx.CheckpointMBean;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.S3DataStore;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats.S3DataStoreStatsMBean;
import org.apache.jackrabbit.oak.cache.CacheStats;
import org.apache.jackrabbit.oak.commons.PropertiesUtil;
import org.apache.jackrabbit.oak.osgi.ObserverTracker;
@@ -70,6 +72,8 @@ import org.apache.jackrabbit.oak.plugins.blob.BlobTrackingStore;
import org.apache.jackrabbit.oak.plugins.blob.MarkSweepGarbageCollector;
import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore;
import org.apache.jackrabbit.oak.plugins.blob.datastore.BlobIdTracker;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreBlobStore;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.S3DataStoreStats;
import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils;
import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils.SharedStoreRecordType;
import org.apache.jackrabbit.oak.plugins.identifier.ClusterRepositoryInfo;
@@ -289,6 +293,7 @@ public class SegmentNodeStoreService extends ProxyNodeStore
private Registration stringCacheMBean;
private Registration fsgcMonitorMBean;
private Registration fileStoreStatsMBean;
+ private Registration s3DataStoreStatsRegistration;
private WhiteboardExecutor executor;
private boolean customBlobStore;
@@ -536,6 +541,23 @@ public class SegmentNodeStoreService extends ProxyNodeStore
scheduleWithFixedDelay(whiteboard, fsgcm, 1)
);
+ // Expose statistics about S3DataStore, if one is being used
+
+ if (null != blobStore && blobStore instanceof DataStoreBlobStore) {
+ final DataStoreBlobStore dsbs = (DataStoreBlobStore)blobStore;
+ if (null != dsbs.getDataStore() && dsbs.getDataStore() instanceof S3DataStore) {
+ final S3DataStore s3ds = (S3DataStore)dsbs.getDataStore();
+ final S3DataStoreStats s3dsStats = new S3DataStoreStats(s3ds, segmentNodeStore);
+ s3DataStoreStatsRegistration = registerMBean(
+ whiteboard,
+ S3DataStoreStatsMBean.class,
+ s3dsStats,
+ S3DataStoreStatsMBean.TYPE,
+ s3dsStats.getClass().getSimpleName()
+ );
+ }
+ }
+
// Register a factory service to expose the FileStore
providerRegistration = context.getBundleContext().registerService(SegmentStoreProvider.class.getName(), this, null);
@@ -714,6 +736,10 @@ public class SegmentNodeStoreService extends ProxyNodeStore
fileStoreStatsMBean.unregister();
fileStoreStatsMBean = null;
}
+ if (s3DataStoreStatsRegistration != null) {
+ s3DataStoreStatsRegistration.unregister();
+ s3DataStoreStatsRegistration = null;
+ }
if (executor != null) {
executor.stop();
executor = null;