diff --git oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/AbstractDataStoreService.java oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/AbstractDataStoreService.java
index ead168d..4a6ee47 100644
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/AbstractDataStoreService.java
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/AbstractDataStoreService.java
@@ -39,6 +39,8 @@ import org.slf4j.LoggerFactory;
 public abstract class AbstractDataStoreService {
     private static final String PROP_HOME = "repository.home";
 
+    public static final String PROP_ENCODE_LENGTH = "encodeLengthInId";
+
     private ServiceRegistration reg;
 
     private Logger log = LoggerFactory.getLogger(getClass());
@@ -47,14 +49,14 @@ public abstract class AbstractDataStoreService {
 
     protected void activate(ComponentContext context, Map<String, Object> config) throws RepositoryException {
         DataStore ds = createDataStore(context, config);
-
+        boolean encodeLengthInId = PropertiesUtil.toBoolean(config.get(PROP_ENCODE_LENGTH), false);
         String homeDir = lookup(context, PROP_HOME);
         if (homeDir != null) {
             log.info("Initializing the DataStore with homeDir [{}]", homeDir);
         }
         PropertiesUtil.populate(ds, config, false);
         ds.init(homeDir);
-        this.dataStore = new DataStoreBlobStore(ds);
+        this.dataStore = new DataStoreBlobStore(ds, encodeLengthInId);
 
         Dictionary<String, String> props = new Hashtable<String, String>();
         props.put(Constants.SERVICE_PID, ds.getClass().getName());
diff --git oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreBlobStore.java oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreBlobStore.java
index 0880a19..9eb2452 100644
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreBlobStore.java
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreBlobStore.java
@@ -61,8 +61,25 @@ public class DataStoreBlobStore implements DataStore, BlobStore, GarbageCollecta
 
     private final DataStore delegate;
 
+    /**
+     * If set to true then the blob length information would be encoded as part of blobId
+     * and thus no extra call would be made to DataStore to determine the length
+     *
+     * <b>Implementation Note</b>If enabled the length would be encoded in blobid by appending it at the end.
+     * This would be done for the methods which are part of BlobStore and GarbageCollectableBlobStore interface
+     *
+     * DataIdentifiers which are part of DataStore would not be affected by this as DataStore interface
+     * is not used in Oak and all access is via BlobStore interface
+     */
+    private final boolean encodeLengthInId;
+
     public DataStoreBlobStore(DataStore delegate) {
+        this(delegate, false);
+    }
+
+    public DataStoreBlobStore(DataStore delegate, boolean encodeLengthInId) {
         this.delegate = delegate;
+        this.encodeLengthInId = encodeLengthInId;
     }
 
     //~----------------------------------< DataStore >
@@ -134,7 +151,8 @@ public class DataStoreBlobStore implements DataStore, BlobStore, GarbageCollecta
         boolean threw = true;
         try {
             checkNotNull(stream);
-            String id = writeStream(stream).getIdentifier().toString();
+            DataRecord dr = writeStream(stream);
+            String id = getBlobId(dr);
             threw = false;
             return id;
         } catch (DataStoreException e) {
@@ -147,11 +165,11 @@ public class DataStoreBlobStore implements DataStore, BlobStore, GarbageCollecta
     }
 
     @Override
-    public int readBlob(String blobId, long pos, byte[] buff, int off, int length) throws IOException {
+    public int readBlob(String encodedBlobId, long pos, byte[] buff, int off, int length) throws IOException {
         //This is inefficient as repeated calls for same blobId would involve opening new Stream
         //instead clients should directly access the stream from DataRecord by special casing for
         //BlobStore which implements DataStore
-        InputStream stream = getStream(blobId);
+        InputStream stream = getStream(extractBlobId(encodedBlobId));
         boolean threw = true;
         try {
             ByteStreams.skipFully(stream, pos);
@@ -164,10 +182,16 @@ public class DataStoreBlobStore implements DataStore, BlobStore, GarbageCollecta
     }
 
     @Override
-    public long getBlobLength(String blobId) throws IOException {
+    public long getBlobLength(String encodedBlobId) throws IOException {
         try {
-            checkNotNull(blobId, "BlobId must be specified");
-            return getDataRecord(blobId).getLength();
+            checkNotNull(encodedBlobId, "BlobId must be specified");
+            if(encodeLengthInId){
+                BlobId id = BlobId.of(encodedBlobId);
+                if(id.hasLengthInfo()){
+                    return id.length;
+                }
+            }
+            return getDataRecord(encodedBlobId).getLength();
         } catch (DataStoreException e) {
             throw new IOException(e);
         }
@@ -180,7 +204,7 @@ public class DataStoreBlobStore implements DataStore, BlobStore, GarbageCollecta
         try {
             record = delegate.getRecordFromReference(reference);
             if (record != null) {
-                return record.getIdentifier().toString();
+                return getBlobId(record);
             }
         } catch (DataStoreException e) {
             log.warn("Unable to access the blobId for  [{}]", reference, e);
@@ -189,8 +213,9 @@ public class DataStoreBlobStore implements DataStore, BlobStore, GarbageCollecta
     }
 
     @Override
-    public String getReference(String blobId) {
-        checkNotNull(blobId);
+    public String getReference(String encodedBlobId) {
+        checkNotNull(encodedBlobId);
+        String blobId = extractBlobId(encodedBlobId);
         //Reference are not created for in memory record
         if(InMemoryDataRecord.isInstance(blobId)){
             return null;
@@ -211,8 +236,8 @@ public class DataStoreBlobStore implements DataStore, BlobStore, GarbageCollecta
     }
 
     @Override
-    public InputStream getInputStream(String blobId) throws IOException {
-        return getStream(blobId);
+    public InputStream getInputStream(String encodedBlobId) throws IOException {
+        return getStream(extractBlobId(encodedBlobId));
     }
 
     //~-------------------------------------------< GarbageCollectableBlobStore >
@@ -262,24 +287,33 @@ public class DataStoreBlobStore implements DataStore, BlobStore, GarbageCollecta
 
     @Override
     public Iterator<String> getAllChunkIds(final long maxLastModifiedTime) throws Exception {
-        return transform(filter(delegate.getAllIdentifiers(), new Predicate<DataIdentifier>() {
+        return transform(filter(transform(delegate.getAllIdentifiers(), new Function<DataIdentifier, DataRecord>() {
+            @Nullable
             @Override
-            public boolean apply(DataIdentifier input) {
+            public DataRecord apply(@Nullable DataIdentifier input) {
                 try {
-                    DataRecord dr = delegate.getRecord(input);
-                    if(dr != null && (maxLastModifiedTime <=0
-                            || dr.getLastModified() < maxLastModifiedTime)){
-                        return true;
-                    }
+                    return delegate.getRecord(input);
                 } catch (DataStoreException e) {
-                    log.warn("Error occurred while fetching DataRecord for identifier {}",input, e);
+                    log.warn("Error occurred while fetching DataRecord for identifier {}", input, e);
+                }
+                return null;
+            }
+        }), new Predicate<DataRecord>() {
+            @Override
+            public boolean apply(@Nullable DataRecord input) {
+                if (input != null && (maxLastModifiedTime <= 0
+                        || input.getLastModified() < maxLastModifiedTime)) {
+                    return true;
                 }
                 return false;
             }
-        }),new Function<DataIdentifier, String>() {
+        }),new Function<DataRecord, String>() {
             @Override
-            public String apply(DataIdentifier input) {
-                return input.toString();
+            public String apply(DataRecord input) {
+                if(encodeLengthInId) {
+                    return BlobId.of(input).encodedValue();
+                }
+                return input.getIdentifier().toString();
             }
         });
     }
@@ -288,7 +322,8 @@ public class DataStoreBlobStore implements DataStore, BlobStore, GarbageCollecta
     public boolean deleteChunks(List<String> chunkIds, long maxLastModifiedTime) throws Exception {
         if (delegate instanceof MultiDataStoreAware) {
             for (String chunkId : chunkIds) {
-                DataIdentifier identifier = new DataIdentifier(chunkId);
+                String blobId = extractBlobId(chunkId);
+                DataIdentifier identifier = new DataIdentifier(blobId);
                 DataRecord dataRecord = delegate.getRecord(identifier);
                 boolean success = (maxLastModifiedTime <= 0)
                         || dataRecord.getLastModified() <= maxLastModifiedTime;
@@ -330,7 +365,7 @@ public class DataStoreBlobStore implements DataStore, BlobStore, GarbageCollecta
         }else{
             id = delegate.getRecord(new DataIdentifier(blobId));
         }
-        checkNotNull(id, "No DataRecord found for blodId [%s]", blobId);
+        checkNotNull(id, "No DataRecord found for blobId [%s]", blobId);
         return id;
     }
 
@@ -370,4 +405,76 @@ public class DataStoreBlobStore implements DataStore, BlobStore, GarbageCollecta
         }
         return record;
     }
+
+    private String getBlobId(DataRecord dr){
+        if(encodeLengthInId){
+            return BlobId.of(dr).blobId;
+        }
+        return dr.getIdentifier().toString();
+    }
+
+    private String extractBlobId(String encodedBlobId){
+        if(encodeLengthInId){
+            return BlobId.of(encodedBlobId).blobId;
+        }
+        return encodedBlobId;
+    }
+
+    static class BlobId {
+        static final String SEP = "#";
+        final String blobId;
+        final long length;
+
+        BlobId(String blobId, long length) {
+            this.blobId = blobId;
+            this.length = length;
+        }
+
+        BlobId(DataRecord dr) {
+            this.blobId = dr.getIdentifier().toString();
+            long len;
+            try {
+                len = dr.getLength();
+            } catch (DataStoreException e) {
+                //Cannot determine length
+                len = -1;
+            }
+            this.length = len;
+        }
+
+        BlobId(String encodedBlobId) {
+            int indexOfSep = encodedBlobId.lastIndexOf(SEP);
+            if(indexOfSep != -1){
+                this.blobId = encodedBlobId.substring(0, indexOfSep);
+                this.length = Long.valueOf(encodedBlobId.substring(indexOfSep+SEP.length()));
+            }else{
+                this.blobId = encodedBlobId;
+                this.length = -1;
+            }
+        }
+
+        String encodedValue(){
+            if(hasLengthInfo()){
+                return blobId + SEP + String.valueOf(length);
+            } else{
+                return blobId;
+            }
+        }
+
+        boolean hasLengthInfo(){
+            return length != -1;
+        }
+
+        static boolean isEncoded(String encodedBlobId){
+            return encodedBlobId.contains(SEP);
+        }
+
+        static BlobId of(String encodedValue){
+            return new BlobId(encodedValue);
+        }
+
+        static BlobId of(DataRecord dr){
+            return new BlobId(dr);
+        }
+    }
 }
diff --git oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreBlobStoreTest.java oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreBlobStoreTest.java
index 9311b93..fea9384 100644
--- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreBlobStoreTest.java
+++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreBlobStoreTest.java
@@ -37,6 +37,7 @@ import org.apache.jackrabbit.core.data.DataStoreException;
 import org.apache.jackrabbit.oak.spi.blob.BlobStoreInputStream;
 import org.junit.Test;
 
+import static org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreBlobStore.BlobId;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
@@ -69,7 +70,7 @@ public class DataStoreBlobStoreTest {
 
         //Check for BlobStore methods
         assertEquals(maxInlineSize, ds.getBlobLength(dr.getIdentifier().toString()));
-        assertEquals(dr.getIdentifier().toString(), ds.writeBlob(new ByteArrayInputStream(data)));
+        assertEquals(dr.getIdentifier().toString(), BlobId.of(ds.writeBlob(new ByteArrayInputStream(data))).blobId);
     }
 
     @Test
@@ -102,7 +103,7 @@ public class DataStoreBlobStoreTest {
         assertEquals(dr, ds.getRecord(dr.getIdentifier()));
 
         assertEquals(actualSize, ds.getBlobLength(dr.getIdentifier().toString()));
-        assertEquals(testDI.toString(), ds.writeBlob(new ByteArrayInputStream(data)));
+        assertEquals(testDI.toString(), BlobId.of(ds.writeBlob(new ByteArrayInputStream(data))).blobId);
     }
 
     @Test
@@ -118,7 +119,7 @@ public class DataStoreBlobStoreTest {
         DataStoreBlobStore ds = new DataStoreBlobStore(mockedDS);
 
         assertEquals(reference,ds.getReference(blobId));
-        assertEquals(blobId, ds.getBlobId(reference));
+        assertEquals(blobId, BlobId.of(ds.getBlobId(reference)).blobId);
 
         String inMemBlobId = InMemoryDataRecord.getInstance("foo".getBytes())
                 .getIdentifier().toString();
@@ -147,6 +148,23 @@ public class DataStoreBlobStoreTest {
 
     }
 
+    @Test
+    public void testEncodedBlobId() throws Exception{
+        BlobId blobId = new BlobId("abc"+BlobId.SEP+"123");
+        assertEquals("abc", blobId.blobId);
+        assertEquals(123, blobId.length);
+
+        blobId = new BlobId("abc"+BlobId.SEP+"abc"+BlobId.SEP+"123");
+        assertEquals("abc"+BlobId.SEP+"abc", blobId.blobId);
+        assertEquals(123, blobId.length);
+
+        blobId = new BlobId("abc",123);
+        assertEquals("abc"+BlobId.SEP+"123", blobId.encodedValue());
+
+        assertTrue(BlobId.isEncoded("abc"+BlobId.SEP+"123"));
+        assertFalse(BlobId.isEncoded("abc"));
+    }
+
     private static class ByteArrayDataRecord implements DataRecord {
         private final byte[] data;
         private final DataIdentifier identifier;
@@ -203,7 +221,7 @@ public class DataStoreBlobStoreTest {
 
         @Override
         public long getLength() throws DataStoreException {
-            throw new UnsupportedOperationException();
+            throw new DataStoreException(new UnsupportedOperationException());
         }
 
         @Override
