From 78f518128333c10cb0494cd75daa08bfbb412765 Mon Sep 17 00:00:00 2001
From: Vikas Saurabh <vsaurabh@adobe.com>
Date: Sat, 10 Jun 2017 05:40:24 +0530
Subject: [PATCH] OAK-6314: ActiveDeletedBlobCollectorTest.multiThreadedCommits
 is failing intermittently for a few users

There were 2 issues that we fixed here:
* Blocking queue sometimes won't give all items to "consumer" immediately (we tried iterating a few times and yet the items didn't show up)
** For this, we now keep deleting some "MARKER*" blobs and waiting for them to be purged
* Data appended to deletedBlobFile doesn't immediately show up during
read while purging
** For this, we change the implementation a bit to not delete the last file that might have been used for writing when purge is called. This file would be read/processed/deleted in the next run now.
---
 .../ActiveDeletedBlobCollectorFactory.java         | 31 ++++++++---
 .../directory/ActiveDeletedBlobCollectorTest.java  | 62 ++++++++++++++++++----
 2 files changed, 76 insertions(+), 17 deletions(-)

diff --git a/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/ActiveDeletedBlobCollectorFactory.java b/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/ActiveDeletedBlobCollectorFactory.java
index b6a6e46805..777a148401 100644
--- a/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/ActiveDeletedBlobCollectorFactory.java
+++ b/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/ActiveDeletedBlobCollectorFactory.java
@@ -155,13 +155,14 @@ public class ActiveDeletedBlobCollectorFactory {
         /**
          * Purges blobs form blob-store which were tracked earlier to deleted.
          * @param before only purge blobs which were deleted before this timestamps
-         * @param blobStore
+         * @param blobStore used to purge blobs/chunks
          */
         public void purgeBlobsDeleted(long before, @Nonnull GarbageCollectableBlobStore blobStore) {
             long numBlobsDeleted = 0;
             long numChunksDeleted = 0;
 
             long lastCheckedBlobTimestamp = readLastCheckedBlobTimestamp();
+            String currInUseFileName = deletedBlobsFileWriter.inUseFileName;
             deletedBlobsFileWriter.releaseInUseFile();
             for (File deletedBlobListFile : FileUtils.listFiles(rootDirectory, blobFileNameFilter, null)) {
                 if (deletedBlobListFile.getName().equals(deletedBlobsFileWriter.inUseFileName)) {
@@ -219,7 +220,10 @@ public class ActiveDeletedBlobCollectorFactory {
                         LOG.warn("Couldn't read deleted blob list file - " + deletedBlobListFile, ioe);
                     }
 
-                    if (!deletedBlobListFile.delete()) {
+                    // OAK-6314 revealed that blobs appended might not be immediately available. So, we'd skip
+                    // the file that was being processed when purge started - next cycle would re-process and
+                    // delete
+                    if (!deletedBlobListFile.getName().equals(currInUseFileName) && !deletedBlobListFile.delete()) {
                         LOG.warn("File {} couldn't be deleted while all blobs listed in it have been purged", deletedBlobListFile);
                     }
                 } else {
@@ -232,6 +236,10 @@ public class ActiveDeletedBlobCollectorFactory {
 
         private long readLastCheckedBlobTimestamp() {
             File blobCollectorInfoFile = new File(rootDirectory, "collection-info.txt");
+            if (!blobCollectorInfoFile.exists()) {
+                LOG.debug("Couldn't read last checked blob timestamp (file not found). Would do a bit more scan");
+                return -1;
+            }
             InputStream is = null;
             Properties p;
             try {
@@ -239,7 +247,8 @@ public class ActiveDeletedBlobCollectorFactory {
                 p = new Properties();
                 p.load(is);
             } catch (IOException e) {
-                LOG.info("Couldn't read last checked blob timestamp... would do a bit more scan");
+                LOG.warn("Couldn't read last checked blob timestamp from {} ... would do a bit more scan",
+                        blobCollectorInfoFile, e);
                 return -1;
             } finally {
                 org.apache.commons.io.IOUtils.closeQuietly(is);
@@ -292,16 +301,23 @@ public class ActiveDeletedBlobCollectorFactory {
         }
 
         private void addDeletedBlobs(Collection<BlobIdInfoStruct> deletedBlobs) {
+            int addedForFlush = 0;
             for (BlobIdInfoStruct info : deletedBlobs) {
                 try {
                     if (!this.deletedBlobs.offer(info, 1, TimeUnit.SECONDS)) {
                         LOG.warn("Timed out while offer-ing {} into queue.", info);
                     }
+                    if (LOG.isDebugEnabled()) {
+                        addedForFlush++;
+                    }
                 } catch (InterruptedException e) {
                     LOG.warn("Interrupted while adding " + info, e);
                 }
             }
-            LOG.debug("Added {} to be flushed", deletedBlobs.size());
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Added {} (out of {} tried) to be flushed. QSize: {}",
+                        addedForFlush, deletedBlobs.size(), this.deletedBlobs.size());
+            }
             deletedBlobsFileWriter.scheduleFileFlushIfNeeded();
         }
 
@@ -312,9 +328,7 @@ public class ActiveDeletedBlobCollectorFactory {
 
             private synchronized void flushDeletedBlobs() {
                 List<BlobIdInfoStruct> localDeletedBlobs = new LinkedList<>();
-                while (deletedBlobs.peek() != null) {
-                    localDeletedBlobs.add(deletedBlobs.poll());
-                }
+                deletedBlobs.drainTo(localDeletedBlobs);
                 if (localDeletedBlobs.size() > 0) {
                     File outFile = new File(rootDirectory, getBlobFileName());
                     try {
@@ -324,6 +338,9 @@ public class ActiveDeletedBlobCollectorFactory {
                     } catch (IOException e) {
                         LOG.error("Couldn't write out to " + outFile, e);
                     }
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Flushed {} blobs to {}", localDeletedBlobs.size(), outFile.getName());
+                    }
                 }
             }
 
diff --git a/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/ActiveDeletedBlobCollectorTest.java b/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/ActiveDeletedBlobCollectorTest.java
index 47cd797a8d..6c9b8f9f5d 100644
--- a/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/ActiveDeletedBlobCollectorTest.java
+++ b/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/ActiveDeletedBlobCollectorTest.java
@@ -27,7 +27,6 @@ import org.apache.jackrabbit.oak.spi.blob.BlobOptions;
 import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore;
 import org.apache.jackrabbit.oak.stats.Clock;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -43,6 +42,7 @@ import java.nio.file.Path;
 import java.nio.file.attribute.PosixFilePermission;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
@@ -55,7 +55,9 @@ import static org.apache.jackrabbit.oak.plugins.index.IndexCommitCallback.IndexP
 import static org.apache.jackrabbit.oak.plugins.index.IndexCommitCallback.IndexProgress.COMMIT_SUCCEDED;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assume.assumeTrue;
 
 public class ActiveDeletedBlobCollectorTest {
@@ -166,13 +168,12 @@ public class ActiveDeletedBlobCollectorTest {
         verifyBlobsDeleted("blobId1", "blobId2", "blobId3");
     }
 
-    @Ignore("OAK-6314")
     @Test
     public void multiThreadedCommits() throws Exception {
-        clock = Clock.SIMPLE;
         ExecutorService executorService = Executors.newFixedThreadPool(3);
-        adbc = ActiveDeletedBlobCollectorFactory.newInstance(
-                new File(blobCollectionRoot.getRoot(), "b"), executorService);
+        File rootDirectory = new File(blobCollectionRoot.getRoot(), "b");
+        FileUtils.forceMkdir(rootDirectory);
+        adbc = new ActiveDeletedBlobCollectorImpl(clock, rootDirectory, executorService);
 
         int numThreads = 4;
         int numBlobsPerThread = 500;
@@ -214,10 +215,8 @@ public class ActiveDeletedBlobCollectorTest {
             t.join();
         }
 
-        // Push one more commit to flush out any remaining ones
-        adbc.getBlobDeletionCallback().commitProgress(COMMIT_SUCCEDED);
-
-        executorService.awaitTermination(100, TimeUnit.MILLISECONDS);
+        boolean timeout = executorService.awaitTermination(100, TimeUnit.MILLISECONDS);
+        assertFalse(timeout);
 
         List<String> deletedChunks = new ArrayList<>(numThreads*numBlobsPerThread*2);
         for (int threadNum = 0; threadNum < numThreads; threadNum++) {
@@ -227,7 +226,36 @@ public class ActiveDeletedBlobCollectorTest {
             }
         }
 
-        adbc.purgeBlobsDeleted(clock.getTimeIncreasing(), blobStore);
+        // Blocking queue doesn't supply all the items immediately.
+        // So, we'd push "MARKER*" blob ids and purge until some marker blob
+        // gets purged. BUT, we'd time-out this activity in 3 seconds
+        long until = Clock.SIMPLE.getTime() + TimeUnit.SECONDS.toMillis(3);
+        List<String> markerChunks = Lists.newArrayList();
+        int i = 0;
+        while (Clock.SIMPLE.getTime() < until) {
+            // Push commit with a marker blob-id and wait for it to be purged
+            BlobDeletionCallback bdc = adbc.getBlobDeletionCallback();
+            String markerBlobId = "MARKER-" + (i++);
+            bdc.deleted(markerBlobId, Lists.newArrayList(markerBlobId));
+            bdc.commitProgress(COMMIT_SUCCEDED);
+
+            Iterators.addAll(markerChunks, blobStore.resolveChunks(markerBlobId));
+            clock.waitUntil(clock.getTime() + TimeUnit.SECONDS.toMillis(5));
+            adbc.purgeBlobsDeleted(clock.getTimeIncreasing(), blobStore);
+
+            if (blobStore.markerChunkDeleted) {
+                break;
+            }
+        }
+
+        assertTrue("Timed out while waiting for marker chunk to be purged", blobStore.markerChunkDeleted);
+
+        // don't care how many marker blobs are purged
+        blobStore.deletedChunkIds.removeAll(markerChunks);
+
+        HashSet<String> list = new HashSet<>(deletedChunks);
+        list.removeAll(blobStore.deletedChunkIds);
+        assertTrue("size: " + list.size() + "; list: " + list.toString(), list.isEmpty());
 
         assertThat(blobStore.deletedChunkIds, containsInAnyOrder(deletedChunks.toArray()));
     }
@@ -271,6 +299,7 @@ public class ActiveDeletedBlobCollectorTest {
 
     class ChunkDeletionTrackingBlobStore implements GarbageCollectableBlobStore {
         List<String> deletedChunkIds = Lists.newArrayList();
+        volatile boolean markerChunkDeleted = false;
 
         @Override
         public String writeBlob(InputStream in) throws IOException {
@@ -350,15 +379,28 @@ public class ActiveDeletedBlobCollectorTest {
         @Override
         public boolean deleteChunks(List<String> chunkIds, long maxLastModifiedTime) throws Exception {
             deletedChunkIds.addAll(chunkIds);
+            setMarkerChunkDeletedFlag(chunkIds);
             return true;
         }
 
         @Override
         public long countDeleteChunks(List<String> chunkIds, long maxLastModifiedTime) throws Exception {
             deletedChunkIds.addAll(chunkIds);
+            setMarkerChunkDeletedFlag(chunkIds);
             return chunkIds.size();
         }
 
+        private void setMarkerChunkDeletedFlag(List<String> deletedChunkIds) {
+            if (!markerChunkDeleted) {
+                for (String chunkId : deletedChunkIds) {
+                    if (chunkId.startsWith("MARKER")) {
+                        markerChunkDeleted = true;
+                        break;
+                    }
+                }
+            }
+        }
+
         @Override
         public Iterator<String> resolveChunks(String blobId) throws IOException {
             return Iterators.forArray(blobId + "-1", blobId + "-2");
-- 
2.11.0

