From 18c541ebd3a46e4ea8e5aa8a81587e5fcccfbb23 Mon Sep 17 00:00:00 2001
From: Vikas Saurabh <vsaurabh@adobe.com>
Date: Sun, 17 Sep 2017 20:50:31 +0530
Subject: [PATCH 3/3] OAK-6269: Support non chunk storage in OakDirectory

Add test for OakStreamingIndexFile and a few tests in BufferedOakDirectoryTest
---
 .../lucene/directory/BufferedOakDirectoryTest.java | 153 +++++++++++++
 .../directory/OakStreamingIndexFileTest.java       | 246 +++++++++++++++++++++
 2 files changed, 399 insertions(+)
 create mode 100644 oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakStreamingIndexFileTest.java

diff --git a/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/BufferedOakDirectoryTest.java b/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/BufferedOakDirectoryTest.java
index a72ef86696..df4fc51717 100644
--- a/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/BufferedOakDirectoryTest.java
+++ b/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/BufferedOakDirectoryTest.java
@@ -25,10 +25,12 @@ import java.util.Set;
 import com.google.common.collect.Sets;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.jackrabbit.oak.api.PropertyState;
 import org.apache.jackrabbit.oak.plugins.index.lucene.IndexDefinition;
 import org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState;
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.apache.jackrabbit.oak.spi.state.NodeStateUtils;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.IOContext;
 import org.apache.lucene.store.IndexInput;
@@ -143,6 +145,157 @@ public class BufferedOakDirectoryTest {
         dir.close();
     }
 
+    @Test
+    public void respectCommandLineFlagForSingleBlobWrite() throws Exception {
+        String oldVal = System.getProperty("oak.lucene.enableSingleBlobIndexFiles");
+
+        BufferedOakDirectory bufferedOakDirectory;
+
+        System.setProperty("oak.lucene.enableSingleBlobIndexFiles", "true");
+        bufferedOakDirectory = (BufferedOakDirectory)createDir(builder, true);
+        assertTrue("Flag not setting as set by command line flag",
+                bufferedOakDirectory.ENABLE_WRITING_SINGLE_BLOB_INDEX_FILE);
+
+        System.setProperty("oak.lucene.enableSingleBlobIndexFiles", "false");
+        bufferedOakDirectory = (BufferedOakDirectory)createDir(builder, true);
+        assertFalse("Flag not setting as set by command line flag",
+                bufferedOakDirectory.ENABLE_WRITING_SINGLE_BLOB_INDEX_FILE);
+
+        if (oldVal == null) {
+            System.clearProperty("oak.lucene.enableSingleBlobIndexFiles");
+        } else {
+            System.setProperty("oak.lucene.enableSingleBlobIndexFiles", oldVal);
+        }
+    }
+
+    @Test
+    public void selectWriteStrategyBasedOnFlagAndMode() throws Exception {
+        String oldVal = System.getProperty("oak.lucene.enableSingleBlobIndexFiles");
+
+        System.setProperty("oak.lucene.enableSingleBlobIndexFiles", "false");
+        try (Directory multiBlobDir = createDir(builder, true)) {
+            IndexOutput multiBlobIndexOutput = multiBlobDir.createOutput("foo", IOContext.DEFAULT);
+
+            multiBlobIndexOutput.writeBytes(randomBytes(100), 0, 100);
+            multiBlobIndexOutput.flush();
+        }
+
+        PropertyState jcrData = builder.getChildNode(":data").getChildNode("foo").getProperty("jcr:data");
+        assertTrue("multiple blobs not written", jcrData.isArray());
+
+        System.setProperty("oak.lucene.enableSingleBlobIndexFiles", "true");
+        try (Directory multiBlobDir = createDir(builder, true)) {
+            IndexOutput multiBlobIndexOutput = multiBlobDir.createOutput("foo", IOContext.DEFAULT);
+
+            multiBlobIndexOutput.writeBytes(randomBytes(100), 0, 100);
+            multiBlobIndexOutput.flush();
+        }
+
+        jcrData = builder.getChildNode(":data").getChildNode("foo").getProperty("jcr:data");
+        assertFalse("multiple blobs written", jcrData.isArray());
+
+        try (Directory multiBlobDir = createDir(builder, false)) {
+            IndexOutput multiBlobIndexOutput = multiBlobDir.createOutput("foo", IOContext.DEFAULT);
+
+            multiBlobIndexOutput.writeBytes(randomBytes(100), 0, 100);
+            multiBlobIndexOutput.flush();
+        }
+
+        jcrData = builder.getChildNode(":data").getChildNode("foo").getProperty("jcr:data");
+        assertTrue("multiple blobs not written despite disabled buffered directory", jcrData.isArray());
+
+        if (oldVal == null) {
+            System.clearProperty("oak.lucene.enableSingleBlobIndexFiles");
+        } else {
+            System.setProperty("oak.lucene.enableSingleBlobIndexFiles", oldVal);
+        }
+    }
+
+    @Test
+    public void readNonStreamingWhenMultipleBlobsExist() throws Exception {
+        String oldVal = System.getProperty("oak.lucene.enableSingleBlobIndexFiles");
+
+        System.setProperty("oak.lucene.enableSingleBlobIndexFiles", "false");
+        try (Directory multiBlobDir = createDir(builder, true)) {
+            IndexOutput multiBlobIndexOutput = multiBlobDir.createOutput("foo", IOContext.DEFAULT);
+
+            multiBlobIndexOutput.writeBytes(randomBytes(100), 0, 100);
+            multiBlobIndexOutput.flush();
+        }
+
+        // Enable feature... reader shouldn't care about the flag.
+        // Repo state needs to be used for that
+        System.setProperty("oak.lucene.enableSingleBlobIndexFiles", "true");
+        try (Directory multiBlobDir = createDir(builder, true)) {
+            OakIndexInput multiBlobIndexInput = (OakIndexInput)multiBlobDir.openInput("foo", IOContext.DEFAULT);
+
+            assertTrue("OakBufferedIndexFile must be used",
+                    multiBlobIndexInput.file instanceof OakBufferedIndexFile);
+        }
+
+        if (oldVal == null) {
+            System.clearProperty("oak.lucene.enableSingleBlobIndexFiles");
+        } else {
+            System.setProperty("oak.lucene.enableSingleBlobIndexFiles", oldVal);
+        }
+    }
+
+    @Test
+    public void readStreamingWithSingleBlob() throws Exception {
+        String oldVal = System.getProperty("oak.lucene.enableSingleBlobIndexFiles");
+
+        System.setProperty("oak.lucene.enableSingleBlobIndexFiles", "true");
+        try (Directory multiBlobDir = createDir(builder, true)) {
+            IndexOutput multiBlobIndexOutput = multiBlobDir.createOutput("foo", IOContext.DEFAULT);
+
+            multiBlobIndexOutput.writeBytes(randomBytes(100), 0, 100);
+            multiBlobIndexOutput.flush();
+        }
+
+        // Disable the feature... the reader shouldn't care about the flag.
+        // Repo state needs to be used for that
+        System.setProperty("oak.lucene.enableSingleBlobIndexFiles", "false");
+        try (Directory multiBlobDir = createDir(builder, true)) {
+            OakIndexInput multiBlobIndexInput = (OakIndexInput)multiBlobDir.openInput("foo", IOContext.DEFAULT);
+
+            assertTrue("OakStreamingIndexFile must be used",
+                    multiBlobIndexInput.file instanceof OakStreamingIndexFile);
+        }
+
+        if (oldVal == null) {
+            System.clearProperty("oak.lucene.enableSingleBlobIndexFiles");
+        } else {
+            System.setProperty("oak.lucene.enableSingleBlobIndexFiles", oldVal);
+        }
+    }
+
+    @Test
+    public void writeNonStreamingIfDisabledByFlag() throws Exception {
+        String oldVal = System.getProperty("oak.lucene.enableSingleBlobIndexFiles");
+
+        System.setProperty("oak.lucene.enableSingleBlobIndexFiles", "false");
+        try (Directory multiBlobDir = createDir(builder, true)) {
+            OakIndexOutput multiBlobIndexOutput = (OakIndexOutput)multiBlobDir.createOutput("foo1", IOContext.DEFAULT);
+
+            assertTrue("OakBufferedIndexFile must be used",
+                    multiBlobIndexOutput.file instanceof OakBufferedIndexFile);
+        }
+
+        System.setProperty("oak.lucene.enableSingleBlobIndexFiles", "true");
+        try (Directory multiBlobDir = createDir(builder, true)) {
+            OakIndexOutput multiBlobIndexOutput = (OakIndexOutput)multiBlobDir.createOutput("foo2", IOContext.DEFAULT);
+
+            assertTrue("OakStreamingIndexFile must be used",
+                    multiBlobIndexOutput.file instanceof OakStreamingIndexFile);
+        }
+
+        if (oldVal == null) {
+            System.clearProperty("oak.lucene.enableSingleBlobIndexFiles");
+        } else {
+            System.setProperty("oak.lucene.enableSingleBlobIndexFiles", oldVal);
+        }
+    }
+
     private void assertFile(Directory dir, String file, byte[] expected)
             throws IOException {
         assertTrue(dir.fileExists(file));
diff --git a/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakStreamingIndexFileTest.java b/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakStreamingIndexFileTest.java
new file mode 100644
index 0000000000..f87800beca
--- /dev/null
+++ b/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakStreamingIndexFileTest.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.index.lucene.directory;
+
+import ch.qos.logback.classic.Level;
+import org.apache.jackrabbit.oak.commons.junit.LogCustomizer;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Random;
+
+import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class OakStreamingIndexFileTest {
+
+    private Random rnd = new Random();
+
+    private NodeState root = EMPTY_NODE;
+
+    private NodeBuilder builder = root.builder();
+
+    private int fileSize = 1000 + rnd.nextInt(1000);
+
+    @Test
+    public void readSanity() throws Exception {
+        byte[] fileBytes = writeFile();
+
+        NodeBuilder fooBuilder = builder.child("foo");
+        try (OakStreamingIndexFile readFile = new OakStreamingIndexFile("foo", fooBuilder, "dirDetails",
+                BlobFactory.getNodeBuilderBlobFactory(fooBuilder))
+        ) {
+            byte[] readBytes = new byte[fileBytes.length];
+            readFile.readBytes(readBytes, 0, fileBytes.length);
+
+            assertTrue("Must get back same data", Arrays.equals(readBytes, fileBytes));
+
+            try {
+                readFile.readBytes(readBytes, 0, 1);
+                fail("Must not be able to read past stored number of bytes");
+            } catch (Exception ignored) {
+                // ignore
+            }
+        }
+    }
+
+    @Test
+    public void rangeReadWorks() throws Exception {
+        int numFewBytes = 100;
+
+        byte[] fileBytes = writeFile();
+        byte[] aFewBytes = Arrays.copyOfRange(fileBytes, 1, numFewBytes + 1);
+
+        NodeBuilder fooBuilder = builder.child("foo");
+        try (OakStreamingIndexFile readFile = new OakStreamingIndexFile("foo", fooBuilder, "dirDetails",
+                BlobFactory.getNodeBuilderBlobFactory(fooBuilder))
+        ) {
+            readFile.seek(1);
+            assertEquals("Seeking should move position", 1, readFile.position());
+
+            byte[] readBytes = new byte[numFewBytes];
+            readFile.readBytes(readBytes, 0, numFewBytes);
+            assertTrue("Reading a few bytes should be accurate", Arrays.equals(readBytes, aFewBytes));
+        }
+    }
+
+    @Test
+    public void rangeReadWorksOnSeekingBack() throws Exception {
+        int numFewBytes = 100;
+
+        byte[] fileBytes = writeFile();
+        byte[] aFewBytes = Arrays.copyOfRange(fileBytes, 1, numFewBytes + 1);
+
+        NodeBuilder fooBuilder = builder.child("foo");
+        try (OakStreamingIndexFile readFile = new OakStreamingIndexFile("foo", fooBuilder, "dirDetails",
+                BlobFactory.getNodeBuilderBlobFactory(fooBuilder))
+        ) {
+            byte[] readBytes = new byte[numFewBytes];
+
+            readFile.seek(100);
+            readFile.readBytes(readBytes, 0, 1);
+            readFile.seek(1);
+            assertEquals("Seeking should move position", 1, readFile.position());
+
+            readFile.readBytes(readBytes, 0, numFewBytes);
+            assertTrue("Reading a few bytes should be accurate", Arrays.equals(readBytes, aFewBytes));
+        }
+    }
+
+    @Test
+    public void cloneCreatesSimilarUnrelatedStreams() throws Exception {
+        int numFewBytes = 100;
+
+        byte[] fileBytes = writeFile();
+        byte[] aFewBytes = Arrays.copyOfRange(fileBytes, 2, numFewBytes + 2);
+
+        byte[] readBytes = new byte[numFewBytes];
+
+        OakStreamingIndexFile readFileClone;
+
+        NodeBuilder fooBuilder = builder.child("foo");
+        try (OakStreamingIndexFile readFile = new OakStreamingIndexFile("foo", fooBuilder, "dirDetails",
+                BlobFactory.getNodeBuilderBlobFactory(fooBuilder))
+        ) {
+            readFile.seek(1);
+            readFile.readBytes(readBytes, 0, 1);
+
+            readFileClone = (OakStreamingIndexFile) readFile.clone();
+
+            readFile.readBytes(readBytes, 0, numFewBytes);
+        }
+
+        assertNotNull("Clone reader should have been created", readFileClone);
+
+        try {
+            readFileClone.readBytes(readBytes, 0, numFewBytes);
+            assertTrue("Clone reader should start from same position as source",
+                    Arrays.equals(readBytes, aFewBytes));
+        } finally {
+            readFileClone.close();
+        }
+    }
+
+    @Test
+    public void streamingWritesDontWorkPiecewise() throws Exception {
+        NodeBuilder fooBuilder = builder.child("foo");
+        try (OakStreamingIndexFile writeFile = new OakStreamingIndexFile("foo", fooBuilder, "dirDetails",
+                BlobFactory.getNodeBuilderBlobFactory(fooBuilder))
+        ) {
+            byte[] fileBytes = randomBytes(fileSize);
+
+            writeFile.writeBytes(fileBytes, 0, fileBytes.length);
+            try {
+                writeFile.writeBytes(fileBytes, 0, 1);
+                fail("Multiple write bytes must not be allowed with streaming writes");
+            } catch (Exception ignored) {
+                //ignore
+            }
+            writeFile.flush();
+        }
+    }
+
+    @Test
+    public void seekScenarios() throws Exception {
+        byte[] fileBytes = writeFile();
+
+        NodeBuilder fooBuilder = builder.child("foo");
+        try (OakStreamingIndexFile readFile = new OakStreamingIndexFile("foo", fooBuilder, "dirDetails",
+                BlobFactory.getNodeBuilderBlobFactory(fooBuilder))
+        ) {
+            byte[] aFewBytes = new byte[10];
+            byte[] expectedFewBytes;
+
+            expectedFewBytes = Arrays.copyOfRange(fileBytes, 10, 10 + aFewBytes.length);
+            readFile.seek(10);
+            readFile.readBytes(aFewBytes, 0, aFewBytes.length);
+            assertTrue("Range read after seek should read accurately",
+                    Arrays.equals(expectedFewBytes, aFewBytes));
+
+
+            expectedFewBytes = Arrays.copyOfRange(fileBytes, 25, 25 + aFewBytes.length);
+            readFile.seek(25);
+            readFile.readBytes(aFewBytes, 0, aFewBytes.length);
+            assertTrue("Range read after seek should read accurately",
+                    Arrays.equals(expectedFewBytes, aFewBytes));
+
+
+            expectedFewBytes = Arrays.copyOfRange(fileBytes, 2, 2 + aFewBytes.length);
+            readFile.seek(2);
+            readFile.readBytes(aFewBytes, 0, aFewBytes.length);
+            assertTrue("Range read after backward seek should read accurately",
+                    Arrays.equals(expectedFewBytes, aFewBytes));
+
+        }
+    }
+
+    @Test
+    public void logWarnWhenSeekingBackAfterRead() throws Exception {
+        byte[] fileBytes = writeFile();
+
+        LogCustomizer logRecorder = LogCustomizer
+                .forLogger(OakStreamingIndexFile.class.getName()).enable(Level.WARN)
+                .contains("Seeking back on streaming index file").create();
+
+        NodeBuilder fooBuilder = builder.child("foo");
+        try (OakStreamingIndexFile readFile = new OakStreamingIndexFile("foo", fooBuilder, "dirDetails",
+                BlobFactory.getNodeBuilderBlobFactory(fooBuilder))
+        ) {
+            logRecorder.starting();
+            byte[] readBytes = new byte[fileBytes.length];
+
+            readFile.readBytes(readBytes, 0, 10);
+            assertEquals("Don't log for simple reads", 0, logRecorder.getLogs().size());
+
+            readFile.seek(12);
+            assertEquals("Don't log for forward seeks", 0, logRecorder.getLogs().size());
+
+            readFile.seek(2);
+            assertEquals("Log warning for backward seeks", 1, logRecorder.getLogs().size());
+        }
+
+        logRecorder.finished();
+    }
+
+    private byte[] writeFile() throws Exception {
+        byte[] fileBytes = randomBytes(fileSize);
+
+        NodeBuilder fooBuilder = builder.child("foo");
+        try (OakStreamingIndexFile writeFile = new OakStreamingIndexFile("foo", fooBuilder, "dirDetails",
+                BlobFactory.getNodeBuilderBlobFactory(fooBuilder))
+        ) {
+            writeFile.writeBytes(fileBytes, 0, fileBytes.length);
+            writeFile.flush();
+        }
+
+        return fileBytes;
+    }
+
+    private byte[] randomBytes(int size) {
+        byte[] data = new byte[size];
+        rnd.nextBytes(data);
+        return data;
+    }
+}
-- 
2.11.0

