Index: oak-segment/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- oak-segment/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java	(revision f4f4e01dd8f708801883260481d37fdcd5868deb)
+++ oak-segment/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java	(revision )
@@ -59,6 +59,8 @@
 import org.apache.jackrabbit.oak.api.Descriptors;
 import org.apache.jackrabbit.oak.api.jmx.CacheStatsMBean;
 import org.apache.jackrabbit.oak.api.jmx.CheckpointMBean;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.S3DataStore;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats.S3DataStoreStatsMBean;
 import org.apache.jackrabbit.oak.cache.CacheStats;
 import org.apache.jackrabbit.oak.commons.PropertiesUtil;
 import org.apache.jackrabbit.oak.osgi.ObserverTracker;
@@ -70,6 +72,8 @@
 import org.apache.jackrabbit.oak.plugins.blob.MarkSweepGarbageCollector;
 import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore;
 import org.apache.jackrabbit.oak.plugins.blob.datastore.BlobIdTracker;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreBlobStore;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.S3DataStoreStats;
 import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils;
 import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils.SharedStoreRecordType;
 import org.apache.jackrabbit.oak.plugins.identifier.ClusterRepositoryInfo;
@@ -289,6 +293,7 @@
     private Registration stringCacheMBean;
     private Registration fsgcMonitorMBean;
     private Registration fileStoreStatsMBean;
+    private Registration s3DataStoreStatsRegistration;
     private WhiteboardExecutor executor;
     private boolean customBlobStore;
 
@@ -536,6 +541,23 @@
                 scheduleWithFixedDelay(whiteboard, fsgcm, 1)
         );
 
+        // Expose statistics about S3DataStore, if one is being used
+
+        if (null != blobStore && blobStore instanceof DataStoreBlobStore) {
+            final DataStoreBlobStore dsbs = (DataStoreBlobStore)blobStore;
+            if (null != dsbs.getDataStore() && dsbs.getDataStore() instanceof S3DataStore) {
+                final S3DataStore s3ds = (S3DataStore)dsbs.getDataStore();
+                final S3DataStoreStats s3dsStats = new S3DataStoreStats(s3ds, segmentNodeStore);
+                s3DataStoreStatsRegistration = registerMBean(
+                        whiteboard,
+                        S3DataStoreStatsMBean.class,
+                        s3dsStats,
+                        S3DataStoreStatsMBean.TYPE,
+                        s3dsStats.getClass().getSimpleName()
+                );
+            }
+        }
+
         // Register a factory service to expose the FileStore
 
         providerRegistration = context.getBundleContext().registerService(SegmentStoreProvider.class.getName(), this, null);
@@ -713,6 +735,10 @@
         if (fileStoreStatsMBean != null) {
             fileStoreStatsMBean.unregister();
             fileStoreStatsMBean = null;
+        }
+        if (s3DataStoreStatsRegistration != null) {
+            s3DataStoreStatsRegistration.unregister();
+            s3DataStoreStatsRegistration = null;
         }
         if (executor != null) {
             executor.stop();
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/S3DataStoreStats.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/S3DataStoreStats.java	(revision )
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/S3DataStoreStats.java	(revision )
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.blob.datastore;
+
+import com.google.common.base.Strings;
+import org.apache.jackrabbit.oak.api.Blob;
+import org.apache.jackrabbit.oak.api.PropertyState;
+import org.apache.jackrabbit.oak.api.Type;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.S3DataStore;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats.S3DataStoreStatsMBean;
+import org.apache.jackrabbit.oak.commons.PathUtils;
+import org.apache.jackrabbit.oak.commons.jmx.AnnotatedStandardMBean;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.apache.jackrabbit.oak.spi.state.NodeStore;
+
+import java.util.List;
+
+public class S3DataStoreStats extends AnnotatedStandardMBean implements S3DataStoreStatsMBean {
+    private final S3DataStore s3ds;
+
+    protected NodeStore nodeStore;
+
+    public S3DataStoreStats(final S3DataStore s3ds, final NodeStore nodeStore) {
+        super(S3DataStoreStatsMBean.class);
+        this.s3ds = s3ds;
+        this.nodeStore = nodeStore;
+    }
+
+    /**
+     * Obtains the number of records that are in the process
+     * of being "synced", meaning they are either scheduled to
+     * be copied to S3 or are actively being copied to S3
+     * but the copy of these files has not yet completed.
+     *
+     * @return number of syncs in progress (active).
+     */
+    @Override
+    public long getActiveSyncs() {
+        return s3ds.getPendingUploads().size();
+    }
+
+    /**
+     * Determines whether a file-like entity with the given name
+     * has been "synced" (completely copied) to S3.
+     *
+     * Determination of "synced":
+     * - A nodeName of null or "" is always "not synced".
+     * - A nodeName that does not map to a valid node is always "not synced".
+     * - If the node for this nodeName does not have a binary property,
+     * this node is always "not synced" since such a node would never be
+     * copied to S3.
+     * - If the node for this nodeName is not in the nodeStore, this node is
+     * always "not synced".
+     * - Otherwise, the state is "synced" if the corresponding blob is
+     * completely stored in S3.
+     *
+     * @param nodePathName - Path to the entity to check.  This is
+     *                       a node path, not an external file path.
+     * @return true if the file is synced to S3.
+     */
+    @Override
+    public boolean isFileSynced(final String nodePathName) {
+        if (Strings.isNullOrEmpty(nodePathName)) {
+            return false;
+        }
+
+        if (null == nodeStore) {
+            return false;
+        }
+
+        final NodeState leafNode = findLeafNode(nodePathName);
+        if (!leafNode.exists()) {
+            return false;
+        }
+
+        boolean nodeHasBinaryProperties = false;
+        for (final PropertyState propertyState : leafNode.getProperties()) {
+            nodeHasBinaryProperties |= (propertyState.getType() == Type.BINARY || propertyState.getType() == Type.BINARIES);
+            try {
+                if (propertyState.getType() == Type.BINARY) {
+                    final Blob blob = (Blob) propertyState.getValue(propertyState.getType());
+                    if (null == blob || !haveRecordForBlob(blob)) {
+                        return false;
+                    }
+                } else if (propertyState.getType() == Type.BINARIES) {
+                    final List<Blob> blobs = (List<Blob>) propertyState.getValue(propertyState.getType());
+                    if (null == blobs) {
+                        return false;
+                    }
+                    for (final Blob blob : blobs) {
+                        if (!haveRecordForBlob(blob)) {
+                            return false;
+                        }
+                    }
+                }
+            }
+            catch (ClassCastException e) {
+                return false;
+            }
+        }
+
+        // If we got here and nodeHasBinaryProperties is true,
+        // it means at least one binary property was found for
+        // the leaf node and that we were able to locate
+        // records for the binaries found.
+        return nodeHasBinaryProperties;
+    }
+
+    private NodeState findLeafNode(final String nodePathName) {
+        final Iterable<String> pathNodes = PathUtils.elements(PathUtils.getParentPath(nodePathName));
+        final String leafNodeName = PathUtils.getName(nodePathName);
+
+        NodeState currentNode = nodeStore.getRoot();
+        for (final String pathNodeName : pathNodes) {
+            final NodeState childNode = currentNode.getChildNode(pathNodeName);
+            if (!childNode.exists()) {
+                break;
+            }
+            currentNode = childNode;
+        }
+        return currentNode.getChildNode(leafNodeName);
+    }
+
+    private boolean haveRecordForBlob(final Blob blob) {
+        final String fullBlobId = blob.getContentIdentity();
+        if (!Strings.isNullOrEmpty(fullBlobId)
+            && !InMemoryDataRecord.isInstance(fullBlobId)) {
+            String blobId = DataStoreBlobStore.BlobId.of(fullBlobId).blobId;
+            return s3ds.haveRecordForIdentifier(blobId);
+        }
+        return false;
+    }
+}
Index: oak-blob-cloud/pom.xml
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- oak-blob-cloud/pom.xml	(revision f4f4e01dd8f708801883260481d37fdcd5868deb)
+++ oak-blob-cloud/pom.xml	(revision )
@@ -41,7 +41,7 @@
                 <artifactId>maven-bundle-plugin</artifactId>
                 <configuration>
                     <instructions>
-                        <Export-Package>org.apache.jackrabbit.oak.blob.cloud.aws.s3</Export-Package>
+                        <Export-Package>org.apache.jackrabbit.oak.blob.cloud.aws.s3,org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats</Export-Package>
                         <DynamicImport-Package>sun.io</DynamicImport-Package>
                     </instructions>
                 </configuration>
@@ -101,6 +101,13 @@
             <version>${jackrabbit.version}</version>
         </dependency>
 
+        <!-- Dependencies to other Oak components -->
+        <dependency>
+            <groupId>org.apache.jackrabbit</groupId>
+            <artifactId>oak-commons</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
         <!-- Amazon AWS dependency -->
         <dependency>
             <groupId>com.amazonaws</groupId>
@@ -138,6 +145,12 @@
         <dependency>
             <groupId>ch.qos.logback</groupId>
             <artifactId>logback-classic</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <version>1.10.19</version>
             <scope>test</scope>
         </dependency>
     </dependencies>
Index: oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/stats/S3DataStoreStatsMBean.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/stats/S3DataStoreStatsMBean.java	(revision )
+++ oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/stats/S3DataStoreStatsMBean.java	(revision )
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats;
+
+/**
+ * MBean for JMX statistics pertaining to an S3DataStore.
+ */
+public interface S3DataStoreStatsMBean {
+    String TYPE = "S3DataStoreStats";
+
+    /**
+     * Obtains the number of records that are in the process
+     * of being "synced", meaning they are either scheduled to
+     * be copied to S3 or are actively being copied to S3
+     * but the copy of these files has not yet completed.
+     *
+     * @return number of syncs in progress (active).
+     */
+    long getActiveSyncs();
+
+    /**
+     * Determines whether all blobs in the given node path
+ * have been "synced" (completely copied) to S3.
+     *
+     * @param nodePathName - OAK path to the entity to check.  This is
+     *                       the repository node path, not an external file path.
+ * @return true if all blobs at the given node path are synced to S3.
+     */
+    boolean isFileSynced(final String nodePathName);
+}
Index: oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/S3DataStore.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/S3DataStore.java	(revision f4f4e01dd8f708801883260481d37fdcd5868deb)
+++ oak-blob-cloud/src/main/java/org/apache/jackrabbit/oak/blob/cloud/aws/s3/S3DataStore.java	(revision )
@@ -17,14 +17,26 @@
 package org.apache.jackrabbit.oak.blob.cloud.aws.s3;
 
 import java.util.Properties;
+
+import com.google.common.base.Strings;
 import org.apache.jackrabbit.core.data.Backend;
 import org.apache.jackrabbit.core.data.CachingDataStore;
+import org.apache.jackrabbit.core.data.DataIdentifier;
+import org.apache.jackrabbit.core.data.DataStoreException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
  * An Amazon S3 data store.
  */
 public class S3DataStore extends CachingDataStore {
+
+    /**
+     * Logger instance.
+     */
+    private static final Logger LOG = LoggerFactory.getLogger(S3DataStore.class);
+
     protected Properties properties;
 
     @Override
@@ -46,5 +58,25 @@
      */
     public void setProperties(Properties properties) {
         this.properties = properties;
+    }
+
+    /**
+     * Look in the backend for a record matching the given identifier.  Returns true
+     * if such a record exists.
+     *
+     * @param identifier - A path-like identifier that represents the path to
+     *                   the record in question.
+     * @return true if a record for the provided identifier can be found.
+     */
+    public boolean haveRecordForIdentifier(final String identifier) {
+        try {
+            if (!Strings.isNullOrEmpty(identifier)) {
+                return this.getBackend().exists(new DataIdentifier(identifier));
+            }
+        }
+        catch (DataStoreException e) {
+            LOG.warn(String.format("Data Store Exception caught checking for %s in pending uploads", identifier), e);
+        }
+        return false;
     }
 }
Index: oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreService.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreService.java	(revision f4f4e01dd8f708801883260481d37fdcd5868deb)
+++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreService.java	(revision )
@@ -59,6 +59,8 @@
 import org.apache.jackrabbit.oak.api.Descriptors;
 import org.apache.jackrabbit.oak.api.jmx.CacheStatsMBean;
 import org.apache.jackrabbit.oak.api.jmx.CheckpointMBean;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.S3DataStore;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats.S3DataStoreStatsMBean;
 import org.apache.jackrabbit.oak.cache.CacheStats;
 import org.apache.jackrabbit.oak.osgi.ObserverTracker;
 import org.apache.jackrabbit.oak.osgi.OsgiWhiteboard;
@@ -68,6 +70,8 @@
 import org.apache.jackrabbit.oak.plugins.blob.BlobTrackingStore;
 import org.apache.jackrabbit.oak.plugins.blob.MarkSweepGarbageCollector;
 import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreBlobStore;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.S3DataStoreStats;
 import org.apache.jackrabbit.oak.plugins.blob.datastore.BlobIdTracker;
 import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils;
 import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils.SharedStoreRecordType;
@@ -633,6 +637,23 @@
                     BlobGCMBean.TYPE,
                     "Segment node store blob garbage collection"
             ));
+        }
+
+        // Expose statistics about S3DataStore, if one is being used
+
+        if (null != blobStore && blobStore instanceof DataStoreBlobStore) {
+            final DataStoreBlobStore dsbs = (DataStoreBlobStore)blobStore;
+            if (null != dsbs.getDataStore() && dsbs.getDataStore() instanceof S3DataStore) {
+                final S3DataStore s3ds = (S3DataStore)dsbs.getDataStore();
+                final S3DataStoreStats s3dsStats = new S3DataStoreStats(s3ds, segmentNodeStore);
+                registrations.add(registerMBean(
+                        whiteboard,
+                        S3DataStoreStatsMBean.class,
+                        s3dsStats,
+                        S3DataStoreStatsMBean.TYPE,
+                        s3dsStats.getClass().getSimpleName()
+                ));
+            }
         }
 
         log.info("SegmentNodeStore initialized");
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java	(revision f4f4e01dd8f708801883260481d37fdcd5868deb)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java	(revision )
@@ -60,6 +60,8 @@
 import org.apache.jackrabbit.oak.api.jmx.CacheStatsMBean;
 import org.apache.jackrabbit.oak.api.jmx.CheckpointMBean;
 import org.apache.jackrabbit.oak.api.jmx.PersistentCacheStatsMBean;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.S3DataStore;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats.S3DataStoreStatsMBean;
 import org.apache.jackrabbit.oak.cache.CacheStats;
 import org.apache.jackrabbit.oak.commons.PropertiesUtil;
 import org.apache.jackrabbit.oak.osgi.ObserverTracker;
@@ -71,6 +73,8 @@
 import org.apache.jackrabbit.oak.plugins.blob.BlobTrackingStore;
 import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore;
 import org.apache.jackrabbit.oak.plugins.blob.datastore.BlobIdTracker;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreBlobStore;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.S3DataStoreStats;
 import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils;
 import org.apache.jackrabbit.oak.plugins.document.persistentCache.CacheType;
 import org.apache.jackrabbit.oak.plugins.document.persistentCache.PersistentCacheStats;
@@ -794,6 +798,21 @@
                                                         ClusterRepositoryInfo.getOrCreateId(nodeStore));
             registrations.add(registerMBean(whiteboard, BlobGCMBean.class, new BlobGC(gc, executor),
                     BlobGCMBean.TYPE, "Document node store blob garbage collection"));
+        }
+
+        // Expose statistics about S3DataStore, if one is being used
+
+        if (null != store.getBlobStore() && store.getBlobStore() instanceof DataStoreBlobStore) {
+            final DataStoreBlobStore dsbs = (DataStoreBlobStore)store.getBlobStore();
+            if (null != dsbs.getDataStore() && dsbs.getDataStore() instanceof S3DataStore) {
+                final S3DataStore s3ds = (S3DataStore)dsbs.getDataStore();
+                final S3DataStoreStats s3dsStats = new S3DataStoreStats(s3ds, nodeStore);
+                registrations.add(registerMBean(whiteboard,
+                        S3DataStoreStatsMBean.class,
+                        s3dsStats,
+                        S3DataStoreStatsMBean.TYPE,
+                        s3dsStats.getClass().getSimpleName()));
+            }
         }
 
         RevisionGC revisionGC = new RevisionGC(new Runnable() {
Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/S3DataStoreStatsTest.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/S3DataStoreStatsTest.java	(revision )
+++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/S3DataStoreStatsTest.java	(revision )
@@ -0,0 +1,449 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.blob.datastore;
+
+import static org.apache.commons.codec.binary.Hex.encodeHexString;
+import static org.apache.commons.io.FileUtils.copyInputStreamToFile;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assume.assumeFalse;
+import static org.junit.Assume.assumeNotNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.amazonaws.util.StringInputStream;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.output.NullOutputStream;
+import org.apache.jackrabbit.core.data.AsyncUploadCallback;
+import org.apache.jackrabbit.core.data.Backend;
+import org.apache.jackrabbit.core.data.DataIdentifier;
+import org.apache.jackrabbit.core.data.DataRecord;
+import org.apache.jackrabbit.core.data.DataStoreException;
+import org.apache.jackrabbit.oak.api.Blob;
+import org.apache.jackrabbit.oak.api.PropertyState;
+import org.apache.jackrabbit.oak.api.Type;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.S3Backend;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.S3Constants;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.S3DataStore;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.Utils;
+import org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats.S3DataStoreStatsMBean;
+import org.apache.jackrabbit.oak.commons.FileIOUtils;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.apache.jackrabbit.oak.spi.state.NodeStore;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
+
+import javax.jcr.RepositoryException;
+import javax.management.InstanceAlreadyExistsException;
+import javax.management.InstanceNotFoundException;
+import javax.management.JMX;
+import javax.management.MBeanRegistrationException;
+import javax.management.MBeanServer;
+import javax.management.MBeanServerConnection;
+import javax.management.MBeanServerFactory;
+import javax.management.MalformedObjectNameException;
+import javax.management.NotCompliantMBeanException;
+import javax.management.ObjectName;
+import javax.management.remote.JMXConnector;
+import javax.management.remote.JMXConnectorFactory;
+import javax.management.remote.JMXServiceURL;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.lang.management.ManagementFactory;
+import java.security.DigestOutputStream;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
+
+public class S3DataStoreStatsTest {
+    @Rule
+    public TemporaryFolder folder = new TemporaryFolder(new File("target"));
+
+    private static Properties properties;
+    private static MBeanServer jmxServer;
+    private static ObjectName mBeanName;
+    private static File syncfile1;
+    private static NodeStore mockNodeStore;
+
+    private S3DataStoreStatsMBean mBean;
+
+    @BeforeClass
+    public static void preClass() throws IOException, RepositoryException, MalformedObjectNameException,
+            NoSuchAlgorithmException
+    {
+
+        // This will cause all tests in this file to be ignored if JMX properties
+        // are not passed into the test execution.
+        //
+        // If you want to run this unit test suite you will need to
+        // pass the following settings into the command-line.
+        // Example:
+        //   -Djava.rmi.server.hostname=localhost
+        //   -Dcom.sun.management.jmxremote.port=9999
+        //   -Dcom.sun.management.jmxremote.ssl=false
+        //   -Dcom.sun.management.jmxremote.authenticate=false
+/*        for (final String property : Lists.newArrayList("java.rmi.server.hostname",
+                "com.sun.management.jmxremote.port",
+                "com.sun.management.jmxremote.ssl",
+                "com.sun.management.jmxremote.authenticate")) {
+            assumeFalse(Strings.isNullOrEmpty(System.getProperty(property)));
+        }*/
+
+        // This will cause all tests in this file to be ignored if no JMX
+        // server could be found.
+        jmxServer = ManagementFactory.getPlatformMBeanServer();
+        if (null == jmxServer) {
+            jmxServer = MBeanServerFactory.newMBeanServer();
+        }
+        assumeNotNull(jmxServer);
+
+        // This will cause all tests in this file to be ignored if no S3
+        // configuration has been provided.
+        //
+        // If you want to run this unit test suite you will need to
+        // pass the following setting into the command-line.
+        // Example:
+        //   -Dconfig=/path/to/aws/properties
+        //
+        // Properties file uses the same format as for S3DataStore configuration.
+        //assumeFalse(Strings.isNullOrEmpty(System.getProperty("config")));
+
+        //TODO: Move to oak-blob-cloud
+        properties = Utils.readConfig(System.getProperty("config"));
+
+        mBeanName = new ObjectName("org.apache.jackrabbit.oak.blob.cloud.aws.s3.stats:type=S3DataStoreStats");
+    }
+
+    @Before
+    public void setup()
+        throws IOException, InstanceAlreadyExistsException, MBeanRegistrationException,
+        NotCompliantMBeanException, NoSuchAlgorithmException {
+        System.setProperty("java.rmi.server.hostname", "localhost");
+        System.setProperty("com.sun.management.jmxremote.port", "9999");
+        System.setProperty("com.sun.management.jmxremote.ssl", "false");
+        System.setProperty("com.sun.management.jmxremote.authenticate", "false");
+
+        syncfile1 = folder.newFile();
+        copyInputStreamToFile(randomStream(0, 16384), folder.newFile());
+        mockNodeStore = mock(NodeStore.class);
+        MessageDigest digest = MessageDigest.getInstance("SHA-1");
+        OutputStream output = new DigestOutputStream(new FileOutputStream(syncfile1), digest);
+        try {
+            IOUtils.copyLarge(new FileInputStream(syncfile1), output);
+        } finally {
+            output.close();
+        }
+        String syncfile1Id = encodeHexString(digest.digest());
+
+        final NodeState mockRootState = mock(NodeState.class);
+        final NodeState mockLeafState = mock(NodeState.class);
+        final PropertyState mockLeafPropertyState = mock(PropertyState.class);
+        final Blob mockBlob = mock(Blob.class);
+        when(mockNodeStore.getRoot()).thenReturn(mockRootState);
+        when(mockRootState.getChildNode(anyString())).thenReturn(mockLeafState);
+        when(mockLeafState.exists()).thenReturn(true);
+        when(mockLeafState.getProperty(anyString())).thenReturn(mockLeafPropertyState);
+        doReturn(Lists.newArrayList(mockLeafPropertyState)).when(mockLeafState).getProperties();
+        doReturn(Type.BINARY).when(mockLeafPropertyState).getType();
+        when(mockLeafPropertyState.getValue(Type.BINARY)).thenReturn(mockBlob);
+        when(mockBlob.getContentIdentity()).thenReturn(syncfile1Id);
+
+        // Set up JMX connection and mbean
+        final JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://:9999/jmxrmi");
+        final JMXConnector connector = JMXConnectorFactory.connect(url, null);
+        final MBeanServerConnection connection = connector.getMBeanServerConnection();
+        mBean = JMX.newMBeanProxy(connection, mBeanName, S3DataStoreStatsMBean.class, true);
+    }
+
+    @After
+    public void teardown() throws InstanceNotFoundException, MBeanRegistrationException {
+        System.clearProperty("java.rmi.server.hostname");
+        System.clearProperty("com.sun.management.jmxremote.port");
+        System.clearProperty("com.sun.management.jmxremote.ssl");
+        System.clearProperty("com.sun.management.jmxremote.authenticate");
+
+        jmxServer.unregisterMBean(mBeanName);
+    }
+
+    private S3Backend getMockS3Backend() throws DataStoreException {
+        S3Backend backend = mock(S3Backend.class, Mockito.CALLS_REAL_METHODS);
+        doNothing().when(backend).writeAsync(any(DataIdentifier.class), any(File.class), any(AsyncUploadCallback.class));
+        doNothing().when(backend).write(any(DataIdentifier.class), any(File.class));
+        return backend;
+    }
+
    //TODO: Move to oak-blob-cloud and possibly do a MOCK here
    /**
     * Applies the shared test configuration to the given data store and
     * initializes it against a fresh temporary folder.
     */
    private void setupTestS3DS(final S3DataStore s3ds) throws IOException, RepositoryException {
        s3ds.setProperties(properties);
        // NOTE(review): the secret is set explicitly in addition to the
        // properties map -- presumably setProperties() alone does not
        // propagate it; confirm against S3DataStore.
        s3ds.setSecret((String) properties.get(S3Constants.SECRET_KEY));
        s3ds.init(folder.newFolder().getAbsolutePath());
    }
+
+    private S3DataStore getDefaultS3DS() throws IOException, RepositoryException {
+        final S3DataStore s3ds = new S3DataStore();
+        setupTestS3DS(s3ds);
+        return s3ds;
+    }
+
+    private S3DataStore getCustomBackendS3DS(final S3Backend backend) throws IOException, RepositoryException {
+        final S3DataStore s3ds = new CustomBackendS3DataStore(backend);
+        setupTestS3DS(s3ds);
+        return s3ds;
+    }
+
+    @Test
+    public void testGetActiveS3FileSyncMetricExists() throws RepositoryException, IOException, MalformedObjectNameException,
+            InstanceAlreadyExistsException, MBeanRegistrationException, NotCompliantMBeanException {
+        final S3DataStore s3ds = getDefaultS3DS();
+        final S3DataStoreStats stats = new S3DataStoreStats(s3ds, mockNodeStore);
+        jmxServer.registerMBean(stats, mBeanName);
+
+        assert(0 == mBean.getActiveSyncs());
+    }
+
+    @Test
+    public void testGetSingleActiveS3FileSyncMetric() throws IOException, RepositoryException, MalformedObjectNameException,
+            InstanceAlreadyExistsException, MBeanRegistrationException, NotCompliantMBeanException {
+        final S3Backend backend = getMockS3Backend();
+        final S3DataStore s3ds = getCustomBackendS3DS(backend);
+        final S3DataStoreStats stats = new S3DataStoreStats(s3ds, mockNodeStore);
+        jmxServer.registerMBean(stats, mBeanName);
+
+        DataRecord record = null;
+        try {
+            record = s3ds.addRecord(new StringInputStream("test"));
+            assert(1 == mBean.getActiveSyncs());
+        }
+        finally {
+            if (null != record) {
+                s3ds.deleteRecord(record.getIdentifier());
+            }
+        }
+        assert(0 == mBean.getActiveSyncs());
+    }
+
+    @Test
+    public void testGetMultilpleActiveS3FileSyncMetric() throws IOException, RepositoryException, MalformedObjectNameException,
+            InstanceAlreadyExistsException, MBeanRegistrationException, NotCompliantMBeanException {
+        final S3Backend backend = getMockS3Backend();
+        final S3DataStore s3ds = getCustomBackendS3DS(backend);
+        final S3DataStoreStats stats = new S3DataStoreStats(s3ds, mockNodeStore);
+        jmxServer.registerMBean(stats, mBeanName);
+
+        final Set<DataRecord> records = Sets.newHashSet();
+        try {
+            records.add(s3ds.addRecord(new StringInputStream("test1")));
+            records.add(s3ds.addRecord(new StringInputStream("test2")));
+            records.add(s3ds.addRecord(new StringInputStream("test3")));
+
+            assert (3 == mBean.getActiveSyncs());
+        }
+        finally {
+            for (final DataRecord record : records) {
+                s3ds.deleteRecord(record.getIdentifier());
+            }
+        }
+
+        assert(0 == mBean.getActiveSyncs());
+    }
+
+    @Test
+    public void testIsFileSyncedMetricExists() throws IOException, RepositoryException, InstanceAlreadyExistsException,
+            MBeanRegistrationException, NotCompliantMBeanException {
+        final S3DataStore s3ds = getDefaultS3DS();
+        final S3DataStoreStats stats = new S3DataStoreStats(s3ds, mockNodeStore);
+        jmxServer.registerMBean(stats, mBeanName);
+
+        assertFalse(mBean.isFileSynced(syncfile1.getName()));
+    }
+
+    @Test
+    public void testIsFileSyncedNullFileReturnsFalse() throws IOException, RepositoryException, InstanceAlreadyExistsException,
+            MBeanRegistrationException, NotCompliantMBeanException {
+        final S3DataStore s3ds = getDefaultS3DS();
+        final S3DataStoreStats stats = new S3DataStoreStats(s3ds, mockNodeStore);
+        jmxServer.registerMBean(stats, mBeanName);
+
+        assertFalse(mBean.isFileSynced(null));
+    }
+
+    @Test
+    public void testIsFileSyncedEmptyStringReturnsFalse() throws IOException, RepositoryException, InstanceAlreadyExistsException,
+            MBeanRegistrationException, NotCompliantMBeanException {
+        final S3DataStore s3ds = getDefaultS3DS();
+        final S3DataStoreStats stats = new S3DataStoreStats(s3ds, mockNodeStore);
+        jmxServer.registerMBean(stats, mBeanName);
+
+        assertFalse(mBean.isFileSynced(""));
+    }
+
+    @Test
+    public void testIsFileSyncedInvalidFilenameReturnsFalse()  throws IOException, RepositoryException, InstanceAlreadyExistsException,
+            MBeanRegistrationException, NotCompliantMBeanException {
+        final S3DataStore s3ds = getDefaultS3DS();
+        final S3DataStoreStats stats = new S3DataStoreStats(s3ds, mockNodeStore);
+        jmxServer.registerMBean(stats, mBeanName);
+
+        assertFalse(mBean.isFileSynced("invalid"));
+    }
+
+    @Test
+    public void testIsFileSyncedFileNotAddedReturnsFalse()  throws IOException, RepositoryException, InstanceAlreadyExistsException,
+            MBeanRegistrationException, NotCompliantMBeanException {
+        final S3Backend backend = getMockS3Backend();
+        final S3DataStore s3ds = getCustomBackendS3DS(backend);
+        final S3DataStoreStats stats = new S3DataStoreStats(s3ds, mockNodeStore);
+        jmxServer.registerMBean(stats, mBeanName);
+
+        assertFalse(mBean.isFileSynced(syncfile1.getName()));
+    }
+
+    @Test
+    public void testIsFileSyncedSyncIncompleteReturnsFalse() throws IOException, RepositoryException, InstanceAlreadyExistsException,
+            MBeanRegistrationException, NotCompliantMBeanException {
+        final S3Backend backend = getMockS3Backend();
+        final S3DataStore s3ds = getCustomBackendS3DS(backend);
+        final S3DataStoreStats stats = new S3DataStoreStats(s3ds, mockNodeStore);
+        jmxServer.registerMBean(stats, mBeanName);
+
+        FileInputStream inputStream = null;
+        DataRecord record = null;
+        try {
+            inputStream = new FileInputStream(syncfile1);
+            record = s3ds.addRecord(new FileInputStream(syncfile1));
+
+            assertFalse(mBean.isFileSynced(syncfile1.getName()));
+
+        }
+        finally {
+            if (null != record) {
+                s3ds.deleteRecord(record.getIdentifier());
+            }
+            if (null != inputStream) {
+                inputStream.close();
+            }
+        }
+    }
+
+    @Test
+    public void testIsFileSyncedSyncCompleteReturnsTrue() throws IOException, RepositoryException, InstanceAlreadyExistsException,
+            MBeanRegistrationException, NotCompliantMBeanException {
+        final S3DataStore s3ds = getDefaultS3DS();
+
+        final S3DataStoreStats stats = new S3DataStoreStats(s3ds, mockNodeStore);
+
+        jmxServer.registerMBean(stats, mBeanName);
+
+        FileInputStream inputStream = null;
+        DataRecord record = null;
+        try {
+            inputStream = new FileInputStream(syncfile1);
+
+            record = s3ds.addRecord(new FileInputStream(syncfile1));
+
+            int tries = 0;
+            while (stats.getActiveSyncs() > 0 && 50 > tries++) {
+                try {
+                    Thread.sleep(100);
+                }
+                catch (InterruptedException e) { }
+            }
+
+            assert(mBean.isFileSynced(syncfile1.getName()));
+        }
+        finally {
+            if (null != record) {
+                s3ds.deleteRecord(record.getIdentifier());
+            }
+            if (null != inputStream) {
+                inputStream.close();
+            }
+        }
+    }
+
+    @Test
+    public void testIsFileSyncedFileDeletedReturnsFalse() throws IOException, RepositoryException, InstanceAlreadyExistsException,
+            MBeanRegistrationException, NotCompliantMBeanException {
+        final S3DataStore s3ds = getDefaultS3DS();
+        final S3DataStoreStats stats = new S3DataStoreStats(s3ds, mockNodeStore);
+        jmxServer.registerMBean(stats, mBeanName);
+
+        FileInputStream inputStream = null;
+        DataRecord record = null;
+        try {
+            inputStream = new FileInputStream(syncfile1);
+
+            record = s3ds.addRecord(new FileInputStream(syncfile1));
+
+            int tries = 0;
+            while (stats.getActiveSyncs() > 0 && 50 > tries++) {
+                try {
+                    Thread.sleep(100);
+                }
+                catch (InterruptedException e) { }
+            }
+        }
+        finally {
+            if (null != record) {
+                s3ds.deleteRecord(record.getIdentifier());
+            }
+            if (null != inputStream) {
+                inputStream.close();
+            }
+        }
+
+        assertFalse(mBean.isFileSynced(syncfile1.getName()));
+    }
+
+    private class CustomBackendS3DataStore extends S3DataStore {
+        private S3Backend _localBackend;
+        CustomBackendS3DataStore(final S3Backend backend) { _localBackend = backend; }
+        @Override
+        protected Backend createBackend() {
+            if(properties != null){
+                _localBackend.setProperties(properties);
+            }
+            return _localBackend;
+        }
+    }
+
+    static InputStream randomStream(int seed, int size) {
+        Random r = new Random(seed);
+        byte[] data = new byte[size];
+        r.nextBytes(data);
+        return new ByteArrayInputStream(data);
+    }
+}
