From e8c7019693924b65d91eed6d519cdb40323db8a5 Mon Sep 17 00:00:00 2001
From: Vikas Saurabh <vsaurabh@adobe.com>
Date: Sun, 17 Sep 2017 13:39:59 +0530
Subject: [PATCH 2/3] OAK-6269: Support non chunk storage in OakDirectory

Implement OakStreamingIndexFile and the changes required along with it.
---
 .../lucene/directory/BufferedOakDirectory.java     |   6 +-
 .../lucene/directory/OakBufferedIndexFile.java     |  11 +
 .../index/lucene/directory/OakDirectory.java       |  12 +-
 .../index/lucene/directory/OakIndexFile.java       |  38 +++
 .../index/lucene/directory/OakIndexInput.java      |   6 +-
 .../index/lucene/directory/OakIndexOutput.java     |  19 +-
 .../lucene/directory/OakStreamingIndexFile.java    | 363 +++++++++++++++++++++
 7 files changed, 447 insertions(+), 8 deletions(-)
 create mode 100644 oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakStreamingIndexFile.java

diff --git a/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/BufferedOakDirectory.java b/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/BufferedOakDirectory.java
index f795c7021c..b247ca9803 100644
--- a/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/BufferedOakDirectory.java
+++ b/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/BufferedOakDirectory.java
@@ -48,6 +48,8 @@ import static org.apache.jackrabbit.oak.plugins.memory.ModifiedNodeState.squeeze
  * except for blob values. Those are written immediately to the store.
  */
 public final class BufferedOakDirectory extends Directory {
+    boolean ENABLE_WRITING_SINGLE_BLOB_INDEX_FILE =
+            Boolean.parseBoolean(System.getProperty("oak.lucene.enableSingleBlobIndexFiles", "true"));
 
     static final int DELETE_THRESHOLD_UNTIL_REOPEN = 100;
 
@@ -91,7 +93,7 @@ public final class BufferedOakDirectory extends Directory {
         this.dataNodeName = checkNotNull(dataNodeName);
         this.definition = checkNotNull(definition);
         this.base = new OakDirectory(checkNotNull(builder), dataNodeName,
-                definition, false, blobFactory, blobDeletionCallback);
+                definition, false, blobFactory, blobDeletionCallback, ENABLE_WRITING_SINGLE_BLOB_INDEX_FILE);
         reopenBuffered();
     }
 
@@ -224,6 +226,6 @@ public final class BufferedOakDirectory extends Directory {
         // those are files that were created and later deleted again
         bufferedBuilder = squeeze(bufferedBuilder.getNodeState()).builder();
         buffered = new OakDirectory(bufferedBuilder, dataNodeName,
-                definition, false, blobFactory, blobDeletionCallback);
+                definition, false, blobFactory, blobDeletionCallback, ENABLE_WRITING_SINGLE_BLOB_INDEX_FILE);
     }
 }
diff --git a/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakBufferedIndexFile.java b/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakBufferedIndexFile.java
index 8feeaedcad..e77001fff9 100644
--- a/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakBufferedIndexFile.java
+++ b/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakBufferedIndexFile.java
@@ -23,6 +23,7 @@ import org.apache.jackrabbit.oak.api.PropertyState;
 import org.apache.jackrabbit.oak.api.Type;
 import org.apache.jackrabbit.oak.commons.StringUtils;
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.lucene.store.DataInput;
 
 import javax.annotation.Nonnull;
 import java.io.ByteArrayInputStream;
@@ -298,6 +299,16 @@ class OakBufferedIndexFile implements OakIndexFile {
         }
     }
 
+    @Override
+    public boolean supportsCopying() {
+        return false;
+    }
+
+    @Override
+    public void copyBytes(DataInput input, long numBytes) throws IOException {
+        throw new IllegalArgumentException("Don't call copyBytes for buffered case");
+    }
+
     private static int determineBlobSize(NodeBuilder file){
         if (file.hasProperty(OakDirectory.PROP_BLOB_SIZE)){
             return Ints.checkedCast(file.getProperty(OakDirectory.PROP_BLOB_SIZE).getValue(Type.LONG));
diff --git a/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakDirectory.java b/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakDirectory.java
index 026af1a708..eab1f54903 100644
--- a/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakDirectory.java
+++ b/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakDirectory.java
@@ -73,6 +73,7 @@ public class OakDirectory extends Directory {
     private final IndexDefinition definition;
     private LockFactory lockFactory;
     private final boolean readOnly;
+    private final boolean streamingWriteEnabled;
     private final Set<String> fileNames = Sets.newConcurrentHashSet();
     private final Set<String> fileNamesAtStart;
     private final String indexName;
@@ -109,6 +110,14 @@ public class OakDirectory extends Directory {
     public OakDirectory(NodeBuilder builder, String dataNodeName, IndexDefinition definition,
                         boolean readOnly, BlobFactory blobFactory,
                         @Nonnull BlobDeletionCallback blobDeletionCallback) {
+        this(builder, dataNodeName, definition, readOnly, blobFactory, blobDeletionCallback, false);
+    }
+
+    public OakDirectory(NodeBuilder builder, String dataNodeName, IndexDefinition definition,
+                        boolean readOnly, BlobFactory blobFactory,
+                        @Nonnull BlobDeletionCallback blobDeletionCallback,
+                        boolean streamingWriteEnabled) {
+
         this.lockFactory = NoLockFactory.getNoLockFactory();
         this.builder = builder;
         this.dataNodeName = dataNodeName;
@@ -120,6 +129,7 @@ public class OakDirectory extends Directory {
         this.indexName = definition.getIndexName();
         this.blobFactory = blobFactory;
         this.blobDeletionCallback = blobDeletionCallback;
+        this.streamingWriteEnabled = streamingWriteEnabled;
     }
 
     @Override
@@ -191,7 +201,7 @@ public class OakDirectory extends Directory {
 
         fileNames.add(name);
         markDirty();
-        return new OakIndexOutput(name, file, indexName, blobFactory);
+        return new OakIndexOutput(name, file, indexName, blobFactory, streamingWriteEnabled);
     }
 
 
diff --git a/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakIndexFile.java b/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakIndexFile.java
index d8d09b839c..b91cbb1e95 100644
--- a/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakIndexFile.java
+++ b/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakIndexFile.java
@@ -1,8 +1,42 @@
 package org.apache.jackrabbit.oak.plugins.index.lucene.directory;
 
+import org.apache.jackrabbit.oak.api.PropertyState;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.lucene.store.DataInput;
+
+import javax.annotation.Nonnull;
 import java.io.IOException;
 
+import static org.apache.jackrabbit.JcrConstants.JCR_DATA;
+import static org.apache.jackrabbit.oak.api.Type.BINARY;
+
 public interface OakIndexFile {
+    static OakIndexFile getOakIndexFile(String name, NodeBuilder file, String dirDetails,
+                                        @Nonnull BlobFactory blobFactory) {
+        return getOakIndexFile(name, file, dirDetails, blobFactory, false);
+    }
+
+    static OakIndexFile getOakIndexFile(String name, NodeBuilder file, String dirDetails,
+                                        @Nonnull BlobFactory blobFactory, boolean streamingWriteEnabled) {
+
+        boolean useStreaming;
+        PropertyState property = file.getProperty(JCR_DATA);
+        if (property != null) { //reading
+                useStreaming = property.getType() == BINARY;
+        } else { //writing
+            useStreaming = streamingWriteEnabled;
+        }
+
+        return useStreaming ?
+                new OakStreamingIndexFile(name, file, dirDetails, blobFactory) :
+                new OakBufferedIndexFile(name, file, dirDetails, blobFactory);
+    }
+
+    /**
+     * @return if the file implementation supports copying data from {@link DataInput} directly.
+     */
+    boolean supportsCopying();
+
     /**
      * @return name of the index being accessed
      */
@@ -60,6 +94,10 @@ public interface OakIndexFile {
     void writeBytes(byte[] b, int offset, int len)
             throws IOException;
 
+    /** Copy numBytes bytes from input to ourself. */
+    void copyBytes(DataInput input, long numBytes) throws IOException;
+
+
     /**
      * Flushes the content into storage. Before calling this method, written
      * content only exist in memory
diff --git a/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakIndexInput.java b/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakIndexInput.java
index dfb40c7fb8..be99aaf251 100644
--- a/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakIndexInput.java
+++ b/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakIndexInput.java
@@ -24,9 +24,11 @@ import org.apache.lucene.util.WeakIdentityMap;
 import java.io.IOException;
 import java.util.Iterator;
 
+import static org.apache.jackrabbit.oak.plugins.index.lucene.directory.OakIndexFile.getOakIndexFile;
+
 class OakIndexInput extends IndexInput {
 
-    private final OakIndexFile file;
+    final OakIndexFile file;
     private boolean isClone = false;
     private final WeakIdentityMap<OakIndexInput, Boolean> clones;
     private final String dirDetails;
@@ -35,7 +37,7 @@ class OakIndexInput extends IndexInput {
                          BlobFactory blobFactory) {
         super(name);
         this.dirDetails = dirDetails;
-        this.file = new OakBufferedIndexFile(name, file, dirDetails, blobFactory);
+        this.file = getOakIndexFile(name, file, dirDetails, blobFactory);
         clones = WeakIdentityMap.newConcurrentHashMap();
     }
 
diff --git a/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakIndexOutput.java b/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakIndexOutput.java
index f09d0e2bea..0e6c90b1dd 100644
--- a/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakIndexOutput.java
+++ b/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakIndexOutput.java
@@ -17,18 +17,21 @@
 package org.apache.jackrabbit.oak.plugins.index.lucene.directory;
 
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.lucene.store.DataInput;
 import org.apache.lucene.store.IndexOutput;
 
 import java.io.IOException;
 
+import static org.apache.jackrabbit.oak.plugins.index.lucene.directory.OakIndexFile.getOakIndexFile;
+
 final class OakIndexOutput extends IndexOutput {
     private final String dirDetails;
-    private final OakIndexFile file;
+    final OakIndexFile file;
 
     public OakIndexOutput(String name, NodeBuilder file, String dirDetails,
-                          BlobFactory blobFactory) throws IOException {
+                          BlobFactory blobFactory, boolean streamingWriteEnabled) throws IOException {
         this.dirDetails = dirDetails;
-        this.file = new OakBufferedIndexFile(name, file, dirDetails, blobFactory);
+        this.file = getOakIndexFile(name, file, dirDetails, blobFactory, streamingWriteEnabled);
     }
 
     @Override
@@ -61,6 +64,16 @@ final class OakIndexOutput extends IndexOutput {
         writeBytes(new byte[] { b }, 0, 1);
     }
 
+    /** Copy numBytes bytes from input to ourself. */
+    public void copyBytes(DataInput input, long numBytes) throws IOException {
+        //TODO: Do we know that copyBytes would always reach us via copy??
+        if (file.supportsCopying()) {
+            file.copyBytes(input, numBytes);
+        } else {
+            super.copyBytes(input, numBytes);
+        }
+    }
+
     @Override
     public void flush() throws IOException {
         try {
diff --git a/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakStreamingIndexFile.java b/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakStreamingIndexFile.java
new file mode 100644
index 0000000000..5dc13a7a2e
--- /dev/null
+++ b/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakStreamingIndexFile.java
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.index.lucene.directory;
+
+import com.google.common.io.ByteStreams;
+import org.apache.commons.io.IOUtils;
+import org.apache.jackrabbit.oak.api.Blob;
+import org.apache.jackrabbit.oak.api.PropertyState;
+import org.apache.jackrabbit.oak.commons.StringUtils;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.lucene.store.DataInput;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.SequenceInputStream;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkPositionIndexes;
+import static org.apache.jackrabbit.JcrConstants.JCR_DATA;
+import static org.apache.jackrabbit.JcrConstants.JCR_LASTMODIFIED;
+import static org.apache.jackrabbit.oak.api.Type.BINARY;
+
+/**
+ * A file which streams blob directly off of storage.
+ */
+class OakStreamingIndexFile implements OakIndexFile, AutoCloseable {
+    private static final Logger LOG = LoggerFactory.getLogger(OakStreamingIndexFile.class.getName());
+
+    /**
+     * The file name.
+     */
+    private final String name;
+
+    /**
+     * The node that contains the blob for this file.
+     */
+    private final NodeBuilder file;
+
+    /**
+     * The current position within the file (in streaming case, useful only for reading).
+     */
+    private long position = 0;
+
+    /**
+     * The length of the file.
+     */
+    private long length;
+
+    /**
+     * The blob which has been read for reading case.
+     * For writing case, it contains the blob that's pushed to repository
+     */
+    private Blob blob;
+
+    /**
+     * Whether the blob was modified since it was last flushed. If yes, on a
+     * flush the metadata and the blob to the store.
+     */
+    private boolean blobModified = false;
+
+    /**
+     * The {@link InputStream} to read blob from blob.
+     */
+    private InputStream blobInputStream;
+
+    /**
+     * The unique key that is used to make the content unique (to allow removing binaries from the blob store without
+     * risking to remove binaries that are still needed).
+     */
+    private final byte[] uniqueKey;
+
+    private final String dirDetails;
+
+    private final BlobFactory blobFactory;
+
+    OakStreamingIndexFile(String name, NodeBuilder file, String dirDetails,
+                                 @Nonnull BlobFactory blobFactory) {
+        this.name = name;
+        this.file = file;
+        this.dirDetails = dirDetails;
+        this.uniqueKey = readUniqueKey(file);
+        this.blobFactory = checkNotNull(blobFactory);
+
+        PropertyState property = file.getProperty(JCR_DATA);
+        if (property != null) {
+            if (property.getType() == BINARY) {
+                this.blob = property.getValue(BINARY);
+            } else {
+                throw new IllegalArgumentException("Can't load blob for streaming for " + name + " under " + file);
+            }
+        } else {
+            this.blob = null;
+        }
+
+        if (blob != null) {
+            this.length = blob.length();
+            if (uniqueKey != null) {
+                this.length -= uniqueKey.length;
+            }
+        }
+
+        this.blobInputStream = null;
+    }
+
+    private OakStreamingIndexFile(OakStreamingIndexFile that) {
+        this.name = that.name;
+        this.file = that.file;
+        this.dirDetails = that.dirDetails;
+        this.uniqueKey = that.uniqueKey;
+
+        this.position = that.position;
+        this.length = that.length;
+        this.blob = that.blob;
+        this.blobModified = that.blobModified;
+        this.blobFactory = that.blobFactory;
+    }
+
+    private void setupInputStream() throws IOException {
+        if (blobInputStream == null) {
+            blobInputStream = blob.getNewStream();
+
+            if (position > 0) {
+                long pos = position;
+                position = 0;
+                seek(pos);
+            }
+        }
+    }
+
+    private void releaseInputStream() {
+        if (blobInputStream != null) {
+            try {
+                blobInputStream.close();
+            } catch (Exception ignored) {
+                //ignore
+            }
+            blobInputStream = null;
+        }
+    }
+
+    @Override
+    public OakIndexFile clone() {
+        return new OakStreamingIndexFile(this);
+    }
+
+    @Override
+    public long length() {
+        return length;
+    }
+
+    @Override
+    public long position() {
+        return position;
+    }
+
+    @Override
+    public void close() {
+        IOUtils.closeQuietly(blobInputStream);
+        this.blob = null;
+    }
+
+    @Override
+    public boolean isClosed() {
+        return blobInputStream == null && blob == null;
+    }
+
+    @Override
+    public void seek(long pos) throws IOException {
+        // seek() may be called with pos == length
+        // see https://issues.apache.org/jira/browse/LUCENE-1196
+        if (pos < 0 || pos > length) {
+            String msg = String.format("Invalid seek request for [%s][%s], " +
+                    "position: %d, file length: %d", dirDetails, name, pos, length);
+            releaseInputStream();
+            throw new IOException(msg);
+        } else {
+            if (blobInputStream == null) {
+                position = pos;
+            } else if (pos < position) {
+                LOG.warn("Seeking back on streaming index file {}. Current position {}, requested position {}." +
+                                "Please make sure that CopyOnRead and prefetch of index files are enabled.",
+                        getName(), position(), pos);
+
+                // seeking back on input stream. Close current one
+                IOUtils.closeQuietly(blobInputStream);
+                blobInputStream = null;
+                position = pos;
+            } else {
+                while (position < pos) {
+                    long skipCnt = blobInputStream.skip(pos - position());
+                    if (skipCnt <= 0) {
+                        String msg = String.format("Seek request for [%s][%s], " +
+                                "position: %d, file length: %d failed. InputStream.skip returned %d",
+                                dirDetails, name, pos, length, skipCnt);
+                        releaseInputStream();
+                        throw new IOException(msg);
+                    }
+                    position += skipCnt;
+                }
+            }
+        }
+    }
+
+    @Override
+    public void readBytes(byte[] b, int offset, int len)
+            throws IOException {
+        checkPositionIndexes(offset, offset + len, checkNotNull(b).length);
+
+        if (len < 0 || position + len > length) {
+            String msg = String.format("Invalid byte range request for [%s][%s], " +
+                    "position: %d, file length: %d, len: %d", dirDetails, name, position, length, len);
+            releaseInputStream();
+            throw new IOException(msg);
+        }
+
+        setupInputStream();
+        int readCnt = ByteStreams.read(blobInputStream, b, offset, len);
+        if (readCnt < len) {
+            String msg = String.format("Couldn't read byte range request for [%s][%s], " +
+                    "position: %d, file length: %d, len: %d. Actual read bytes %d",
+                    dirDetails, name, position, length, len, readCnt);
+            releaseInputStream();
+            throw new IOException(msg);
+        }
+
+        position += len;
+    }
+
+    @Override
+    public void writeBytes(final byte[] b, final int offset, final int len)
+            throws IOException {
+        if (blobModified) {
+            throw new IllegalArgumentException("Can't do piece wise upload with streaming access");
+        }
+
+        InputStream in = new InputStream() {
+            int position = offset;
+
+            @Override
+            public int available() throws IOException {
+                return offset + len - position;
+            }
+
+            @Override
+            public int read() throws IOException {
+                if (available() <= 0) {
+                    return -1;
+                } else {
+                    return b[position++];
+                }
+            }
+
+            @Override
+            public int read(@Nonnull byte[] target, int off, int len) throws IOException {
+                if (available() <= 0) {
+                    return -1;
+                }
+
+                int read = (int)Math.min((long)len, available());
+                System.arraycopy(b, position, target, off, read);
+
+                position += read;
+
+                return read;
+            }
+        };
+
+        pushData(in);
+    }
+
+    @Override
+    public boolean supportsCopying() {
+        return true;
+    }
+
+    @Override
+    public void copyBytes(DataInput input, final long numBytes) throws IOException {
+        InputStream in = new InputStream() {
+            long bytesLeftToRead = numBytes;
+
+            @Override
+            public int read() throws IOException {
+                if (bytesLeftToRead <= 0) {
+                    return -1;
+                } else {
+                    bytesLeftToRead--;
+                    return input.readByte();
+                }
+            }
+
+            @Override
+            public int read(@Nonnull byte[] b, int off, int len) throws IOException {
+                if (bytesLeftToRead == 0) {
+                    return 0;
+                }
+
+                int read = (int)Math.min((long)len, bytesLeftToRead);
+                input.readBytes(b, off, read);
+
+                bytesLeftToRead -= read;
+
+                return read;
+            }
+        };
+
+        pushData(in);
+    }
+
+    private void pushData(InputStream in) throws IOException {
+        if (uniqueKey != null) {
+            in = new SequenceInputStream(in,
+                    new ByteArrayInputStream(uniqueKey));
+        }
+
+        blob = blobFactory.createBlob(in);
+        blobModified = true;
+    }
+
+    private static byte[] readUniqueKey(NodeBuilder file) {
+        if (file.hasProperty(OakDirectory.PROP_UNIQUE_KEY)) {
+            String key = file.getString(OakDirectory.PROP_UNIQUE_KEY);
+            return StringUtils.convertHexToBytes(key);
+        }
+        return null;
+    }
+
+    @Override
+    public void flush() throws IOException {
+        if (blobModified) {
+            file.setProperty(JCR_LASTMODIFIED, System.currentTimeMillis());
+            file.setProperty(JCR_DATA, blob, BINARY);
+            blobModified = false;
+        }
+    }
+
+    @Override
+    public String toString() {
+        return name;
+    }
+
+    @Override
+    public String getName() {
+        return name;
+    }
+}
-- 
2.11.0

