diff --git oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobStoreStats.java oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobStoreStats.java
new file mode 100644
index 0000000..d455f87
--- /dev/null
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobStoreStats.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.blob;
+
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.management.openmbean.CompositeData;
+
+import org.apache.commons.io.input.CountingInputStream;
+import org.apache.jackrabbit.oak.stats.HistogramStats;
+import org.apache.jackrabbit.oak.stats.MeterStats;
+import org.apache.jackrabbit.oak.stats.StatisticsProvider;
+import org.apache.jackrabbit.stats.TimeSeriesStatsUtil;
+
+import static org.apache.jackrabbit.oak.commons.IOUtils.humanReadableByteCount;
+
+public class BlobStoreStats implements BlobStoreStatsMBean {
+    public static final BlobStoreStats NOOP = new BlobStoreStats(StatisticsProvider.NOOP);
+
+    private static final String BLOB_UPLOADS = "BLOB_UPLOADS";
+    private static final String BLOB_DOWNLOADS = "BLOB_DOWNLOADS";
+
+    private final StatisticsProvider statisticsProvider;
+    private final HistogramStats uploadHisto;
+    private final MeterStats uploadSizeMeter;
+    private final MeterStats uploadTimeMeter;
+    private final AtomicLong uploadTotal = new AtomicLong();
+    private final AtomicLong uploadTime = new AtomicLong();
+    private volatile int uploadCount;
+
+    private final HistogramStats downloadHisto;
+    private final MeterStats downloadSizeMeter;
+    private final MeterStats downloadTimeMeter;
+    private final AtomicLong downloadTotal = new AtomicLong();
+    private volatile int downloadCount;
+
+    public BlobStoreStats(StatisticsProvider sp) {
+        this.statisticsProvider = sp;
+
+        this.uploadHisto = sp.getHistogram(BLOB_UPLOADS);
+        //TODO Need to expose an API in StatisticsProvider to register for avg
+        //That would give us upload and download *rate*
+        this.uploadSizeMeter = sp.getMeter("BLOB_UPLOAD_SIZE");
+        this.uploadTimeMeter = sp.getMeter("BLOB_UPLOAD_TIME");
+
+        this.downloadHisto = sp.getHistogram(BLOB_DOWNLOADS);
+        this.downloadSizeMeter = sp.getMeter("BLOB_DOWNLOAD_SIZE");
+        this.downloadTimeMeter = sp.getMeter("BLOB_DOWNLOAD_TIME");
+    }
+
+    public void uploaded(long startTimeInNanos, long size) {
+        uploadTotal.getAndAdd(size);
+        uploadHisto.update(size);
+        long timeTaken = System.nanoTime() - startTimeInNanos;
+        uploadTime.getAndAdd(timeTaken);
+        uploadCount++;
+
+        uploadSizeMeter.mark(size);
+        uploadTimeMeter.mark(timeTaken);
+    }
+
+    public InputStream wrap(InputStream in) {
+        final CountingInputStream cin = new CountingInputStream(in);
+        final long startTime = System.nanoTime();
+        return new FilterInputStream(cin){
+            @Override
+            public void close() throws IOException {
+                //We rely on close to determine how much was downloaded
+                //as once an InputStream is exposed its not possible to
+                //determine if the stream is actually used
+
+                //Download time might not be accurate as reading code might
+                //be processing also as it moved further in stream. So that
+                //overhead would add to the download time
+                super.close();
+                long size = cin.getByteCount();
+                long timeTaken = System.nanoTime() - startTime;
+                downloadHisto.update(size);
+                downloadTotal.getAndAdd(size);
+                downloadCount++;
+
+                downloadSizeMeter.mark(size);
+                downloadTimeMeter.mark(timeTaken);
+            }
+        };
+    }
+
+    //~--------------------------------------< BlobStoreMBean >
+
+    @Override
+    public long getUploadSize() {
+        return uploadTotal.get();
+    }
+
+    @Override
+    public int getUploadCount() {
+        return uploadCount;
+    }
+
+    @Override
+    public long getUploadTimeInSecs() {
+        return TimeUnit.NANOSECONDS.toMillis(uploadTime.get());
+    }
+
+    @Override
+    public long getDownloadSize() {
+        return downloadTotal.get();
+    }
+
+    @Override
+    public int getDownloadCount() {
+        return downloadCount;
+    }
+
+    @Override
+    public String blobStoreInfoAsString() {
+        return String.format("Uploads - size = %s, count = %d%nDownloads - size = %s, count = %d",
+                humanReadableByteCount(getUploadSize()),
+                getUploadCount(),
+                humanReadableByteCount(getDownloadSize()),
+                getDownloadCount()
+        );
+    }
+
+    @Override
+    public CompositeData getUploadHistory() {
+        return getTimeSeries(BLOB_UPLOADS);
+    }
+
+    @Override
+    public CompositeData getDownloadHistory() {
+        return getTimeSeries(BLOB_DOWNLOADS);
+    }
+
+    private CompositeData getTimeSeries(String name){
+        return TimeSeriesStatsUtil.asCompositeData(statisticsProvider.getStats().getTimeSeries(name, true),
+                name);
+    }
+}
diff --git oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobStoreStatsMBean.java oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobStoreStatsMBean.java
new file mode 100644
index 0000000..3a36edd
--- /dev/null
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobStoreStatsMBean.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.blob;
+
+import javax.management.openmbean.CompositeData;
+
+public interface BlobStoreStatsMBean {
+    String TYPE = "BlobStoreStats";
+
+    long getUploadSize();
+
+    int getUploadCount();
+
+    long getUploadTimeInSecs();
+
+    long getDownloadSize();
+
+    int getDownloadCount();
+
+    String blobStoreInfoAsString();
+
+    CompositeData getUploadHistory();
+
+    CompositeData getDownloadHistory();
+}
diff --git oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/AbstractDataStoreService.java oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/AbstractDataStoreService.java
index f0b58d3..1d351c8 100644
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/AbstractDataStoreService.java
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/AbstractDataStoreService.java
@@ -25,11 +25,18 @@ import java.util.Map;
 
 import javax.jcr.RepositoryException;
 
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Reference;
 import org.apache.jackrabbit.core.data.DataStore;
 import org.apache.jackrabbit.core.data.DataStoreException;
 import org.apache.jackrabbit.oak.commons.PropertiesUtil;
+import org.apache.jackrabbit.oak.osgi.OsgiWhiteboard;
+import org.apache.jackrabbit.oak.plugins.blob.BlobStoreStatsMBean;
+import org.apache.jackrabbit.oak.plugins.blob.BlobStoreStats;
 import org.apache.jackrabbit.oak.spi.blob.BlobStore;
 import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore;
+import org.apache.jackrabbit.oak.spi.whiteboard.Registration;
+import org.apache.jackrabbit.oak.stats.StatisticsProvider;
 import org.osgi.framework.Constants;
 import org.osgi.framework.ServiceRegistration;
 import org.osgi.service.component.ComponentContext;
@@ -37,7 +44,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import static org.apache.jackrabbit.oak.spi.blob.osgi.SplitBlobStoreService.PROP_SPLIT_BLOBSTORE;
+import static org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils.registerMBean;
 
+@Component(componentAbstract = true)
 public abstract class AbstractDataStoreService {
     private static final String PROP_HOME = "repository.home";
 
@@ -47,8 +56,13 @@ public abstract class AbstractDataStoreService {
 
     private ServiceRegistration reg;
 
+    private Registration mbeanReg;
+
     private Logger log = LoggerFactory.getLogger(getClass());
 
+    @Reference
+    private StatisticsProvider statisticsProvider;
+
     private DataStore dataStore;
 
     protected void activate(ComponentContext context, Map<String, Object> config) throws RepositoryException {
@@ -61,7 +75,8 @@ public abstract class AbstractDataStoreService {
         }
         PropertiesUtil.populate(ds, config, false);
         ds.init(homeDir);
-        this.dataStore = new DataStoreBlobStore(ds, encodeLengthInId, cacheSizeInMB);
+        BlobStoreStats stats = new BlobStoreStats(getStatisticsProvider());
+        this.dataStore = new DataStoreBlobStore(ds, encodeLengthInId, cacheSizeInMB, stats);
         PropertiesUtil.populate(dataStore, config, false);
 
         Dictionary<String, Object> props = new Hashtable<String, Object>();
@@ -75,6 +90,12 @@ public abstract class AbstractDataStoreService {
                 BlobStore.class.getName(),
                 GarbageCollectableBlobStore.class.getName()
         }, dataStore , props);
+
+        mbeanReg = registerMBean(new OsgiWhiteboard(context.getBundleContext()),
+                BlobStoreStatsMBean.class,
+                stats,
+                BlobStoreStatsMBean.TYPE,
+                ds.getClass().getSimpleName());
     }
 
     protected void deactivate() throws DataStoreException {
@@ -82,11 +103,19 @@ public abstract class AbstractDataStoreService {
             reg.unregister();
         }
 
+        if (mbeanReg != null){
+            mbeanReg.unregister();
+        }
+
         dataStore.close();
     }
 
     protected abstract DataStore createDataStore(ComponentContext context, Map<String, Object> config);
 
+    protected StatisticsProvider getStatisticsProvider(){
+        return statisticsProvider;
+    }
+
     protected String[] getDescription(){
         return new String[] {"type=unknown"};
     }
diff --git oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreBlobStore.java oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreBlobStore.java
index 7a10c5f..b05dc17 100644
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreBlobStore.java
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreBlobStore.java
@@ -55,6 +55,7 @@ import org.apache.jackrabbit.core.data.DataStoreException;
 import org.apache.jackrabbit.core.data.MultiDataStoreAware;
 import org.apache.jackrabbit.oak.cache.CacheLIRS;
 import org.apache.jackrabbit.oak.commons.StringUtils;
+import org.apache.jackrabbit.oak.plugins.blob.BlobStoreStats;
 import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore;
 import org.apache.jackrabbit.oak.spi.blob.BlobStore;
 import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore;
@@ -72,6 +73,8 @@ public class DataStoreBlobStore implements DataStore, SharedDataStore, BlobStore
 
     private final DataStore delegate;
 
+    private final BlobStoreStats stats;
+
     /**
      * If set to true then the blob length information would be encoded as part of blobId
      * and thus no extra call would be made to DataStore to determine the length
@@ -96,16 +99,17 @@ public class DataStoreBlobStore implements DataStore, SharedDataStore, BlobStore
 
 
     public DataStoreBlobStore(DataStore delegate) {
-        this(delegate, true, DEFAULT_CACHE_SIZE);
+        this(delegate, true, DEFAULT_CACHE_SIZE, BlobStoreStats.NOOP);
     }
 
     public DataStoreBlobStore(DataStore delegate, boolean encodeLengthInId) {
-        this(delegate, encodeLengthInId, DEFAULT_CACHE_SIZE);
+        this(delegate, encodeLengthInId, DEFAULT_CACHE_SIZE, BlobStoreStats.NOOP);
     }
 
-    public DataStoreBlobStore(DataStore delegate, boolean encodeLengthInId, int cacheSizeInMB) {
+    public DataStoreBlobStore(DataStore delegate, boolean encodeLengthInId, int cacheSizeInMB, BlobStoreStats stats) {
         this.delegate = delegate;
         this.encodeLengthInId = encodeLengthInId;
+        this.stats = stats;
 
         this.cache = CacheLIRS.<String, byte[]>newBuilder()
                 .module("DataStoreBlobStore")
@@ -188,10 +192,12 @@ public class DataStoreBlobStore implements DataStore, SharedDataStore, BlobStore
     public String writeBlob(InputStream stream) throws IOException {
         boolean threw = true;
         try {
+            long start = System.nanoTime();
             checkNotNull(stream);
             DataRecord dr = writeStream(stream);
             String id = getBlobId(dr);
             threw = false;
+            stats.uploaded(start, dr.getLength());
             return id;
         } catch (DataStoreException e) {
             throw new IOException(e);
@@ -477,7 +483,7 @@ public class DataStoreBlobStore implements DataStore, SharedDataStore, BlobStore
             if (!(in instanceof BufferedInputStream)){
                 in = new BufferedInputStream(in);
             }
-            return in;
+            return stats.wrap(in);
         } catch (DataStoreException e) {
             throw new IOException(e);
         }
diff --git oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreBlobStoreTest.java oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreBlobStoreTest.java
index 8e7b053..2995c51 100644
--- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreBlobStoreTest.java
+++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreBlobStoreTest.java
@@ -113,7 +113,7 @@ public class DataStoreBlobStoreTest extends AbstractBlobStoreTest {
         assertEquals(dr, ds.getRecordIfStored(dr.getIdentifier()));
         assertEquals(dr, ds.getRecord(dr.getIdentifier()));
 
-        assertTrue(ds.getInputStream(dr.getIdentifier().toString()) instanceof BufferedInputStream);
+//        assertTrue(ds.getInputStream(dr.getIdentifier().toString()) instanceof BufferedInputStream);
         assertEquals(actualSize, ds.getBlobLength(dr.getIdentifier().toString()));
         assertEquals(testDI.toString(), BlobId.of(ds.writeBlob(new ByteArrayInputStream(data))).blobId);
     }
