From 6cb77a0ed64eb812cc0b9ce8e0be6d9a412da317 Mon Sep 17 00:00:00 2001
From: Mohit Kataria <tihom88@gmail.com>
Date: Tue, 23 Apr 2019 13:46:33 +0530
Subject: [PATCH] OAK-8261: Indexing lane failing but the index is not marked
 corrupt

---
 .../oak/plugins/blob/BlobStoreBlob.java       |  32 +-
 .../index/lucene/LuceneIndexEditor.java       |  42 ++-
 .../AsyncIndexUpdateCorruptMarkingTest.java   | 354 ++++++++++++++++++
 3 files changed, 399 insertions(+), 29 deletions(-)
 create mode 100644 oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/AsyncIndexUpdateCorruptMarkingTest.java

diff --git a/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobStoreBlob.java b/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobStoreBlob.java
index 2fcec3f53a..dd550a968c 100644
--- a/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobStoreBlob.java
+++ b/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/BlobStoreBlob.java
@@ -18,22 +18,25 @@
  */
 package org.apache.jackrabbit.oak.plugins.blob;
 
-import java.io.IOException;
-import java.io.InputStream;
-
-import org.apache.jackrabbit.oak.spi.blob.BlobStore;
 import org.apache.jackrabbit.oak.api.Blob;
+import org.apache.jackrabbit.oak.spi.blob.BlobStore;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
 
 /**
  * A blob implementation.
  */
 public class BlobStoreBlob implements Blob {
-    
+    private static final Logger LOG = LoggerFactory.getLogger(BlobStoreBlob.class);
+
     private final BlobStore blobStore;
     private final String blobId;
-    
+
     public BlobStoreBlob(BlobStore blobStore, String blobId) {
         this.blobStore = blobStore;
         this.blobId = blobId;
@@ -45,8 +48,14 @@ public class BlobStoreBlob implements Blob {
         try {
             return blobStore.getInputStream(blobId);
         } catch (IOException e) {
-            throw new RuntimeException("Error occurred while obtaining " +
-                    "InputStream for blobId ["+ blobId +"]",e);
+            LOG.warn("Error occurred while obtaining " +
+                    "InputStream for blobId [" + blobId + "]", e);
+            return new InputStream() {
+                @Override
+                public int read() throws IOException {
+                    throw new IOException(e);
+                }
+            };
         }
     }
 
@@ -59,7 +68,8 @@ public class BlobStoreBlob implements Blob {
         }
     }
 
-    @Override @Nullable
+    @Override
+    @Nullable
     public String getReference() {
         return blobStore.getReference(blobId);
     }
@@ -83,7 +93,7 @@ public class BlobStoreBlob implements Blob {
     public String toString() {
         return blobId;
     }
-    
+
     @Override
     public int hashCode() {
         return blobId.hashCode();
@@ -93,7 +103,7 @@ public class BlobStoreBlob implements Blob {
     public boolean equals(Object other) {
         if (this == other) {
             return true;
-        } 
+        }
         if (other instanceof BlobStoreBlob) {
             BlobStoreBlob b = (BlobStoreBlob) other;
             // theoretically, the data could be the same  
diff --git a/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditor.java b/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditor.java
index 14481af7fa..7eba981b9c 100644
--- a/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditor.java
+++ b/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditor.java
@@ -16,11 +16,6 @@
  */
 package org.apache.jackrabbit.oak.plugins.index.lucene;
 
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-import java.util.Set;
-
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
@@ -42,6 +37,11 @@ import org.apache.lucene.document.Document;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
 import static org.apache.jackrabbit.oak.commons.PathUtils.concat;
 
 /**
@@ -57,13 +57,19 @@ public class LuceneIndexEditor implements IndexEditor, Aggregate.AggregateRoot {
 
     private final LuceneIndexEditorContext context;
 
-    /** Name of this node, or {@code null} for the root node. */
+    /**
+     * Name of this node, or {@code null} for the root node.
+     */
     private final String name;
 
-    /** Parent editor or {@code null} if this is the root editor. */
+    /**
+     * Parent editor or {@code null} if this is the root editor.
+     */
     private final LuceneIndexEditor parent;
 
-    /** Path of this editor, built lazily in {@link #getPath()}. */
+    /**
+     * Path of this editor, built lazily in {@link #getPath()}.
+     */
     private String path;
 
     private boolean propertiesChanged = false;
@@ -96,7 +102,7 @@ public class LuceneIndexEditor implements IndexEditor, Aggregate.AggregateRoot {
     private LuceneIndexEditor(LuceneIndexEditor parent, String name,
                               MatcherState matcherState,
                               PathFilter.Result pathFilterResult,
-            boolean isDeleted) {
+                              boolean isDeleted) {
         this.parent = parent;
         this.name = name;
         this.path = null;
@@ -116,7 +122,7 @@ public class LuceneIndexEditor implements IndexEditor, Aggregate.AggregateRoot {
     @Override
     public void enter(NodeState before, NodeState after)
             throws CommitFailedException {
-        if (EmptyNodeState.MISSING_NODE == before && parent == null){
+        if (EmptyNodeState.MISSING_NODE == before && parent == null) {
             context.enableReindexMode();
         }
 
@@ -147,7 +153,7 @@ public class LuceneIndexEditor implements IndexEditor, Aggregate.AggregateRoot {
             }
         }
 
-        for (Matcher m : matcherState.affectedMatchers){
+        for (Matcher m : matcherState.affectedMatchers) {
             m.markRootDirty();
         }
 
@@ -231,14 +237,14 @@ public class LuceneIndexEditor implements IndexEditor, Aggregate.AggregateRoot {
                 this.context.indexUpdate();
             } catch (IOException e) {
                 CommitFailedException ce = new CommitFailedException("Lucene", 5, "Failed to remove the index entries of"
-                                + " the removed subtree " + path + "for index " + context.getIndexingContext().getIndexPath(), e);
+                        + " the removed subtree " + path + "for index " + context.getIndexingContext().getIndexPath(), e);
                 context.getIndexingContext().indexUpdateFailed(ce);
                 throw ce;
             }
         }
 
         MatcherState ms = getMatcherState(name, before);
-        if (!ms.isEmpty()){
+        if (!ms.isEmpty()) {
             return new LuceneIndexEditor(this, name, ms, filterResult, true);
         }
         return null; // no need to recurse down the removed subtree
@@ -291,11 +297,11 @@ public class LuceneIndexEditor implements IndexEditor, Aggregate.AggregateRoot {
         List<Matcher> inherited = Lists.newArrayList();
         for (Matcher m : Iterables.concat(matcherState.inherited, currentMatchers)) {
             Matcher result = m.match(name, after);
-            if (result.getStatus() == Matcher.Status.MATCH_FOUND){
+            if (result.getStatus() == Matcher.Status.MATCH_FOUND) {
                 matched.add(result);
             }
 
-            if (result.getStatus() != Matcher.Status.FAIL){
+            if (result.getStatus() != Matcher.Status.FAIL) {
                 inherited.addAll(result.nextSet());
             }
         }
@@ -330,13 +336,13 @@ public class LuceneIndexEditor implements IndexEditor, Aggregate.AggregateRoot {
         final Set<Matcher> affectedMatchers;
 
         public MatcherState(List<Matcher> matched,
-                            List<Matcher> inherited){
+                            List<Matcher> inherited) {
             this.matched = matched;
             this.inherited = inherited;
 
             //Affected matches would only be used when there are
             //some matched matchers
-            if (matched.isEmpty()){
+            if (matched.isEmpty()) {
                 affectedMatchers = Collections.emptySet();
             } else {
                 affectedMatchers = Sets.newIdentityHashSet();
@@ -390,7 +396,7 @@ public class LuceneIndexEditor implements IndexEditor, Aggregate.AggregateRoot {
         return context.getDefinition();
     }
 
-    private boolean isIndexable(){
+    private boolean isIndexable() {
         return indexingRule != null;
     }
 
diff --git a/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/AsyncIndexUpdateCorruptMarkingTest.java b/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/AsyncIndexUpdateCorruptMarkingTest.java
new file mode 100644
index 0000000000..24c08caf05
--- /dev/null
+++ b/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/AsyncIndexUpdateCorruptMarkingTest.java
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.index.lucene;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.jackrabbit.core.data.FileDataStore;
+import org.apache.jackrabbit.oak.InitialContent;
+import org.apache.jackrabbit.oak.Oak;
+import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.apache.jackrabbit.oak.api.ContentRepository;
+import org.apache.jackrabbit.oak.api.ContentSession;
+import org.apache.jackrabbit.oak.api.Root;
+import org.apache.jackrabbit.oak.api.Tree;
+import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser;
+import org.apache.jackrabbit.oak.plugins.blob.BlobStoreStats;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreBlobStore;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.OakFileDataStore;
+import org.apache.jackrabbit.oak.plugins.index.AsyncIndexUpdate;
+import org.apache.jackrabbit.oak.plugins.index.TrackingCorruptIndexHandler;
+import org.apache.jackrabbit.oak.plugins.index.lucene.directory.CopyOnReadDirectory;
+import org.apache.jackrabbit.oak.plugins.index.lucene.util.IndexDefinitionBuilder;
+import org.apache.jackrabbit.oak.plugins.index.nodetype.NodeTypeIndexProvider;
+import org.apache.jackrabbit.oak.plugins.index.property.PropertyIndexEditorProvider;
+import org.apache.jackrabbit.oak.plugins.index.search.ExtractedTextCache;
+import org.apache.jackrabbit.oak.plugins.index.search.IndexDefinition;
+import org.apache.jackrabbit.oak.segment.SegmentNodeStoreBuilders;
+import org.apache.jackrabbit.oak.segment.file.FileStore;
+import org.apache.jackrabbit.oak.segment.file.FileStoreBuilder;
+import org.apache.jackrabbit.oak.segment.file.InvalidFileStoreVersionException;
+import org.apache.jackrabbit.oak.spi.blob.BlobStore;
+import org.apache.jackrabbit.oak.spi.blob.stats.BlobStatsCollector;
+import org.apache.jackrabbit.oak.spi.commit.Observer;
+import org.apache.jackrabbit.oak.spi.query.QueryIndexProvider;
+import org.apache.jackrabbit.oak.spi.security.OpenSecurityProvider;
+import org.apache.jackrabbit.oak.spi.state.NodeStore;
+import org.apache.jackrabbit.oak.stats.DefaultStatisticsProvider;
+import org.apache.jackrabbit.oak.stats.StatisticsProvider;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FSDirectory;
+import org.apache.lucene.store.FilterDirectory;
+import org.junit.*;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests marking index as corrupt if blob is missing.
+ * {@link org.apache.jackrabbit.oak.segment.SegmentNodeStore}.
+ */
+@RunWith(Parameterized.class)
+public class AsyncIndexUpdateCorruptMarkingTest {
+
+    private static final File DIRECTORY = new File("target/fs");
+    private static String FOO = "foo";
+    private static final String FOO_QUERY = "select [jcr:path] from [nt:base] where contains('foo', '*')";
+    private final long INDEX_CORRUPT_INTERVAL_IN_SECONDS = 2;
+    private long INDEX_ERROR_WARN_INTERVAL_IN_SECONDS = 1;
+
+    private final boolean copyOnRW;
+    private final String codec;
+    private final boolean indexOnFS;
+    private final int minRecordLength;
+    private final String mergePolicy;
+    protected ContentSession session;
+    protected Root root;
+
+    @Before
+    public void before() throws Exception {
+        session = createRepository().login(null, null);
+        root = session.getLatestRoot();
+    }
+
+    private ExecutorService executorService = Executors.newFixedThreadPool(2);
+    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
+    @Rule
+    public TemporaryFolder temporaryFolder = new TemporaryFolder(new File("target"));
+    private String corDir = null;
+    private String cowDir = null;
+
+    private TestUtil.OptionalEditorProvider optionalEditorProvider = new TestUtil.OptionalEditorProvider();
+    private FileStore fileStore;
+    private DataStoreBlobStore dataStoreBlobStore;
+    private DefaultStatisticsProvider statisticsProvider;
+    private String fdsDir;
+    private String indexPath;
+    private AsyncIndexUpdate asyncIndexUpdate;
+
+
+    public AsyncIndexUpdateCorruptMarkingTest(boolean copyOnRW, String codec, boolean indexOnFS, int minRecordLength, String mergePolicy) {
+        this.copyOnRW = copyOnRW;
+        this.codec = codec;
+        this.indexOnFS = indexOnFS;
+        this.minRecordLength = minRecordLength;
+        this.mergePolicy = mergePolicy;
+    }
+
+    @Parameterized.Parameters
+    public static Collection<Object[]> data() {
+        return Arrays.asList(new Object[][]{
+                {false, "oakCodec", false, 4096, "tiered"},
+        });
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        if (!DIRECTORY.exists()) {
+            assert DIRECTORY.mkdirs();
+        }
+    }
+
+    @After
+    public void after() {
+        new ExecutorCloser(executorService).close();
+        IndexDefinition.setDisableStoredIndexDefinition(false);
+        fileStore.close();
+        if (DIRECTORY.exists()) {
+            try {
+                FileUtils.deleteDirectory(DIRECTORY);
+            } catch (IOException e) {
+                // do nothing
+            }
+        }
+    }
+
+    protected ContentRepository createRepository() {
+        LuceneIndexEditorProvider editorProvider;
+        LuceneIndexProvider provider;
+        if (copyOnRW) {
+            IndexCopier copier = createIndexCopier();
+            editorProvider = new LuceneIndexEditorProvider(copier, new ExtractedTextCache(10 * FileUtils.ONE_MB, 100));
+            provider = new LuceneIndexProvider(copier);
+        } else {
+            editorProvider = new LuceneIndexEditorProvider();
+            provider = new LuceneIndexProvider();
+        }
+
+        NodeStore nodeStore;
+        try {
+            statisticsProvider = new DefaultStatisticsProvider(scheduledExecutorService);
+            fileStore = FileStoreBuilder.fileStoreBuilder(DIRECTORY)
+                    .withStatisticsProvider(statisticsProvider)
+                    .withBlobStore(createBlobStore())
+                    .build();
+            nodeStore = SegmentNodeStoreBuilders.builder(fileStore).build();
+        } catch (IOException | InvalidFileStoreVersionException e) {
+            throw new RuntimeException(e);
+        }
+
+        asyncIndexUpdate = new AsyncIndexUpdate("async", nodeStore, editorProvider);
+        TrackingCorruptIndexHandler trackingCorruptIndexHandler = new TrackingCorruptIndexHandler();
+        trackingCorruptIndexHandler.setCorruptInterval(INDEX_CORRUPT_INTERVAL_IN_SECONDS, TimeUnit.SECONDS);
+        trackingCorruptIndexHandler.setErrorWarnInterval(INDEX_ERROR_WARN_INTERVAL_IN_SECONDS, TimeUnit.SECONDS);
+        asyncIndexUpdate.setCorruptIndexHandler(trackingCorruptIndexHandler);
+        return new Oak(nodeStore)
+                .with(new InitialContent())
+                .with(new OpenSecurityProvider())
+                .with((QueryIndexProvider) provider)
+                .with((Observer) provider)
+                .with(editorProvider)
+                .with(optionalEditorProvider)
+                .with(new PropertyIndexEditorProvider())
+                .with(new NodeTypeIndexProvider())
+                .createContentRepository();
+    }
+
+    private BlobStore createBlobStore() {
+        FileDataStore fds = new OakFileDataStore();
+        fdsDir = "target/fds-" + codec + copyOnRW + minRecordLength + mergePolicy;
+        fds.setPath(fdsDir);
+        if (minRecordLength > 0) {
+            fds.setMinRecordLength(minRecordLength);
+        }
+        fds.init(null);
+        dataStoreBlobStore = new DataStoreBlobStore(fds);
+        StatisticsProvider sp = new DefaultStatisticsProvider(scheduledExecutorService);
+        BlobStatsCollector collector = new BlobStoreStats(sp);
+        dataStoreBlobStore.setBlobStatsCollector(collector);
+        return dataStoreBlobStore;
+    }
+
+    private IndexCopier createIndexCopier() {
+        try {
+            return new IndexCopier(executorService, temporaryFolder.getRoot()) {
+                @Override
+                public Directory wrapForRead(String indexPath, LuceneIndexDefinition definition,
+                                             Directory remote, String dirName) throws IOException {
+                    Directory ret = super.wrapForRead(indexPath, definition, remote, dirName);
+                    corDir = getFSDirPath(ret);
+                    return ret;
+                }
+
+                @Override
+                public Directory wrapForWrite(LuceneIndexDefinition definition,
+                                              Directory remote, boolean reindexMode, String dirName) throws IOException {
+                    Directory ret = super.wrapForWrite(definition, remote, reindexMode, dirName);
+                    cowDir = getFSDirPath(ret);
+                    return ret;
+                }
+
+                private String getFSDirPath(Directory dir) {
+                    if (dir instanceof CopyOnReadDirectory) {
+                        dir = ((CopyOnReadDirectory) dir).getLocal();
+                    }
+
+                    dir = unwrap(dir);
+
+                    if (dir instanceof FSDirectory) {
+                        return ((FSDirectory) dir).getDirectory().getAbsolutePath();
+                    }
+                    return null;
+                }
+
+                private Directory unwrap(Directory dir) {
+                    if (dir instanceof FilterDirectory) {
+                        return unwrap(((FilterDirectory) dir).getDelegate());
+                    }
+                    return dir;
+                }
+
+            };
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @After
+    public void shutdownExecutor() {
+        executorService.shutdown();
+        scheduledExecutorService.shutdown();
+    }
+
+    private void deleteBlobs(String path) {
+        File file = new File(path);
+        while (file.listFiles().length > 0) {
+            File folder = file.listFiles()[0];
+            try {
+                FileUtils.deleteDirectory(folder);
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    @Test
+    public void testLuceneIndexSegmentStats() throws Exception {
+        root.commit();
+        root.getTree("/oak:index/counter").remove();
+        root.commit();
+
+        IndexDefinitionBuilder idxb = new IndexDefinitionBuilder()
+                //.noAsync()
+                .codec(codec)
+                .mergePolicy(mergePolicy);
+        idxb.indexRule("nt:base").property(FOO).analyzed().nodeScopeIndex().ordered().useInExcerpt().propertyIndex();
+        idxb.indexRule("nt:base").property("bin").analyzed().nodeScopeIndex().ordered().useInExcerpt().propertyIndex();
+        Tree idx = root.getTree("/").getChild("oak:index").addChild("lucenePropertyIndex");
+        Tree idxDef = idxb.build(idx);
+        if (!codec.equals("oakCodec") && indexOnFS) {
+            idxDef.setProperty("persistence", "file");
+            indexPath = "target/index-" + codec + copyOnRW;
+            idxDef.setProperty("path", indexPath);
+        }
+        System.out.println("***");
+        System.out.println(codec + "," + copyOnRW + "," + indexOnFS + "," + minRecordLength + "," + mergePolicy);
+        root.getTree("/").addChild("content");
+        ContentCreator contentCreator = new ContentCreator();
+        contentCreator.run();
+        root.commit();
+        asyncIndexUpdate.run();
+        ScheduledExecutorService executorService = Executors
+                .newSingleThreadScheduledExecutor();
+        executorService.scheduleAtFixedRate(contentCreator, 0, 10, TimeUnit.MILLISECONDS);
+
+        Thread.sleep(200);
+        contentCreator.setStopContentCreator();
+        Thread.sleep(50);
+        deleteBlobs(fdsDir);
+        asyncIndexUpdate.run(); // As blobs are deleted at this point index will be marked as bad.
+        Thread.sleep(100);
+        Thread.sleep(INDEX_CORRUPT_INTERVAL_IN_SECONDS *1000);
+        asyncIndexUpdate.run(); // after corrupt interval index will be marked as corrupt.
+        Thread.sleep(100);
+        assertTrue(null != root.getTree("/oak:index/lucenePropertyIndex").getProperty("corrupt"));
+
+    }
+
+    private class ContentCreator implements Runnable {
+        private static final String STRINGSET = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
+        private volatile boolean stopContentCreator = false;
+
+        private long numberOfNodes = 100;
+        private int randomStringLength = 100;
+        private int randomNodeNameLength = 8;
+
+        private String randomString(int count) {
+            StringBuilder builder = new StringBuilder();
+            while (count-- != 0) {
+                int character = (int) (Math.random() * STRINGSET.length());
+                builder.append(STRINGSET.charAt(character));
+            }
+            return builder.toString();
+        }
+
+        public void setStopContentCreator() {
+            stopContentCreator = true;
+        }
+
+        public void run() {
+            if (!stopContentCreator) {
+                Tree rootTree = root.getTree("/content");
+                for (int i = 0; i < numberOfNodes; i++) {
+                    String text = randomString(randomStringLength);
+                    Tree tree = rootTree.addChild(String.valueOf(randomString(randomNodeNameLength).trim() + i));
+                    tree.setProperty(FOO, text);
+                }
+            } else {
+                try {
+                    root.commit();
+                    fileStore.flush();
+                } catch (IOException | CommitFailedException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+    }
+}
-- 
2.17.1

