/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.jackrabbit.oak.segment;

import static com.google.common.collect.Maps.newConcurrentMap;
import static com.google.common.collect.Sets.intersection;
import static com.google.common.collect.Sets.newHashSet;
import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.jackrabbit.oak.segment.SegmentVersion.LATEST_VERSION;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;

import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;

import com.google.common.base.Suppliers;
import org.apache.jackrabbit.oak.segment.WriteOperationHandler.WriteOperation;
import org.apache.jackrabbit.oak.segment.memory.MemoryStore;
import org.junit.After;
import org.junit.Test;

/**
 * Tests for {@link SegmentBufferWriterPool}: verifies that the pool pins a
 * {@link SegmentBufferWriter} per thread, hands out fresh writers after a
 * {@code flush()}, and that {@code flush()} blocks while a write operation is
 * still in progress.
 * <p>
 * The tests submit operations on three dedicated single-threaded executors so
 * that each operation runs on a distinct, stable thread. The short
 * {@code sleepUninterruptibly} calls merely give the tasks a head start; the
 * subsequent {@code Future.get()} calls are what actually synchronise on
 * completion.
 */
public class SegmentBufferWriterPoolTest {
    private final MemoryStore store = new MemoryStore();

    private final RecordId rootId = store.getRevisions().getHead();

    private final SegmentBufferWriterPool pool = new SegmentBufferWriterPool(
        store, store.getTracker(), store.getReader(), LATEST_VERSION, "", Suppliers.ofInstance(0));

    // Three single-threaded executors: each index maps to one fixed thread,
    // which is what makes the thread-affinity assertions meaningful.
    private final ExecutorService[] executors = new ExecutorService[] {
        newSingleThreadExecutor(), newSingleThreadExecutor(), newSingleThreadExecutor()};

    // MemoryStore's constructor declares IOException, so field initialisation
    // forces this constructor to declare it as well.
    public SegmentBufferWriterPoolTest() throws IOException { }

    /**
     * Submits {@code op} to the pool on the thread backing
     * {@code executors[executor]}.
     *
     * @param op        operation to run through {@link SegmentBufferWriterPool#execute}
     * @param executor  index into {@link #executors} selecting the thread
     * @return future for the {@link RecordId} produced by the operation
     */
    private Future<RecordId> execute(final WriteOperation op, int executor) {
        return executors[executor].submit(new Callable<RecordId>() {
            @Override
            public RecordId call() throws Exception {
                return pool.execute(op);
            }
        });
    }

    /**
     * Creates a write operation that records the writer handed to it under
     * {@code key} in {@code map} and returns {@link #rootId}. The map lets the
     * tests inspect which {@link SegmentBufferWriter} instance each thread
     * received.
     */
    private WriteOperation createOp(final String key, final ConcurrentMap<String, SegmentBufferWriter> map) {
        return new WriteOperation() {
            @Nonnull @Override
            public RecordId execute(@Nonnull SegmentBufferWriter writer) {
                map.put(key, writer);
                return rootId;
            }
        };
    }

    /**
     * Runs one recording operation per executor thread (keys "a", "b", "c"),
     * waits for all of them, and returns the key-to-writer map. Asserts that
     * every operation returned {@link #rootId} and that all three writers were
     * recorded.
     */
    private ConcurrentMap<String, SegmentBufferWriter> executeRecordingOps()
            throws ExecutionException, InterruptedException {
        ConcurrentMap<String, SegmentBufferWriter> map = newConcurrentMap();
        Future<RecordId> res1 = execute(createOp("a", map), 0);
        Future<RecordId> res2 = execute(createOp("b", map), 1);
        Future<RecordId> res3 = execute(createOp("c", map), 2);

        // Give the tasks some time to complete
        sleepUninterruptibly(10, MILLISECONDS);

        assertEquals(rootId, res1.get());
        assertEquals(rootId, res2.get());
        assertEquals(rootId, res3.get());
        assertEquals(3, map.size());
        return map;
    }

    @After
    public void tearDown() {
        for (ExecutorService executor : executors) {
            executor.shutdown();
        }
    }

    /**
     * Running the same keys on the same threads twice must yield the same
     * writer per thread: the pool binds a writer to its thread.
     */
    @Test
    public void testThreadAffinity() throws IOException, ExecutionException, InterruptedException {
        ConcurrentMap<String, SegmentBufferWriter> map1 = executeRecordingOps();
        ConcurrentMap<String, SegmentBufferWriter> map2 = executeRecordingOps();
        assertEquals(map1, map2);
    }

    /**
     * After a {@code flush()} the pool must hand out fresh writers: the writer
     * sets observed before and after the flush must be disjoint.
     */
    @Test
    public void testFlush() throws ExecutionException, InterruptedException, IOException {
        ConcurrentMap<String, SegmentBufferWriter> map1 = executeRecordingOps();

        pool.flush();

        ConcurrentMap<String, SegmentBufferWriter> map2 = executeRecordingOps();
        assertTrue(intersection(newHashSet(map1.values()), newHashSet(map2.values())).isEmpty());
    }

    /**
     * {@code flush()} must block until in-flight write operations finish. The
     * write operation below triggers a flush from another thread while it is
     * itself still running; that flush must not return within the 100 ms
     * timeout, i.e. {@code get} must throw {@link TimeoutException}.
     */
    @Test
    public void testFlushBlocks() throws ExecutionException, InterruptedException {
        Future<RecordId> res = execute(new WriteOperation() {
            // Deliberately @CheckForNull: returning null signals test failure.
            @CheckForNull @Override
            public RecordId execute(@Nonnull SegmentBufferWriter writer) {
                try {
                    // This should deadlock as flush waits for this write
                    // operation to finish, which in this case contains the
                    // call to flush itself.
                    executors[1].submit(new Callable<Void>() {
                        @Override
                        public Void call() throws Exception {
                            pool.flush();
                            return null;
                        }
                    }).get(100, MILLISECONDS);
                    return null;    // No deadlock -> null indicates test failure
                } catch (InterruptedException | ExecutionException ignore) {
                    return null;    // No deadlock -> null indicates test failure
                } catch (TimeoutException e) {
                    return rootId;  // Deadlock -> rootId indicates test pass
                }
            }
        }, 0);

        // Give the tasks some time to complete
        sleepUninterruptibly(10, MILLISECONDS);
        assertEquals(rootId, res.get());
    }

}