From 3eed36491bfcb0055260bbd65929277b80723de2 Mon Sep 17 00:00:00 2001 From: Zhong Date: Wed, 8 Nov 2017 11:41:13 +0800 Subject: [PATCH] APACHE-KYLIN-2986: add a cube instance lock in promoteNewlyBuiltSegments() in CubeManager for concurrently call --- .../java/org/apache/kylin/cube/CubeManager.java | 33 ++++---- .../org/apache/kylin/cube/CubeManagerTest.java | 89 ++++++++++++++++++++++ 2 files changed, 107 insertions(+), 15 deletions(-) diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java index 5e72721..b97aec7 100755 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java @@ -736,26 +736,29 @@ public class CubeManager implements IRealizationProvider { logger.warn("For cube " + cube + ", segment " + newSegment + " state should be NEW but is READY"); } - List tobe = cube.calculateToBeSegments(newSegment); + synchronized (cube) { + List tobe = cube.calculateToBeSegments(newSegment); - if (tobe.contains(newSegment) == false) - throw new IllegalStateException( - "For cube " + cube + ", segment " + newSegment + " is expected but not in the tobe " + tobe); + if (tobe.contains(newSegment) == false) + throw new IllegalStateException( + "For cube " + cube + ", segment " + newSegment + " is expected but not in the tobe " + tobe); - newSegment.setStatus(SegmentStatusEnum.READY); + newSegment.setStatus(SegmentStatusEnum.READY); - List toRemoveSegs = Lists.newArrayList(); - for (CubeSegment segment : cube.getSegments()) { - if (!tobe.contains(segment)) - toRemoveSegs.add(segment); - } + List toRemoveSegs = Lists.newArrayList(); + for (CubeSegment segment : cube.getSegments()) { + if (!tobe.contains(segment)) + toRemoveSegs.add(segment); + } - logger.info("Promoting cube " + cube + ", new segment " + newSegment + ", to remove segments " + toRemoveSegs); + logger.info( + "Promoting cube " + cube + ", new segment " + newSegment + ", to remove segments " + toRemoveSegs); - CubeUpdate cubeBuilder = new CubeUpdate(cube); - cubeBuilder.setToRemoveSegs(toRemoveSegs.toArray(new CubeSegment[toRemoveSegs.size()])) - .setToUpdateSegs(newSegment).setStatus(RealizationStatusEnum.READY); - updateCube(cubeBuilder); + CubeUpdate cubeBuilder = new CubeUpdate(cube); + cubeBuilder.setToRemoveSegs(toRemoveSegs.toArray(new CubeSegment[toRemoveSegs.size()])) + .setToUpdateSegs(newSegment).setStatus(RealizationStatusEnum.READY); + updateCube(cubeBuilder); + } } public void promoteNewlyOptimizeSegments(CubeInstance cube, CubeSegment... optimizedSegments) throws IOException { diff --git a/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java b/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java index bb91650..4e7c730 100644 --- a/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java +++ b/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java @@ -25,6 +25,7 @@ import static org.junit.Assert.assertTrue; import java.util.List; import java.util.Map; import java.util.NavigableSet; +import java.util.concurrent.CountDownLatch; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.ResourceStore; @@ -33,12 +34,14 @@ import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.metadata.model.SegmentRange; import org.apache.kylin.metadata.model.SegmentRange.TSRange; import org.apache.kylin.metadata.model.SegmentStatusEnum; +import org.apache.kylin.metadata.model.Segments; import org.apache.kylin.metadata.project.ProjectInstance; import org.apache.kylin.metadata.project.ProjectManager; import org.junit.After; import org.junit.Before; import org.junit.Test; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; /** @@ -244,6 +247,92 @@ public class CubeManagerTest extends LocalFileMetadataTestCase { } @Test + public void testConcurrentPromoteNewlyBuiltSegments() throws Exception { + for (int i = 0; i < 5; i++) { + testConcurrentPromoteNewlyBuiltSegmentsUnit(); + } + } + + private void testConcurrentPromoteNewlyBuiltSegmentsUnit() throws Exception { + System.setProperty("kylin.cube.max-building-segments", "10"); + final CubeManager mgr = CubeManager.getInstance(getTestConfig()); + final CubeInstance cube = mgr.getCube("test_kylin_cube_with_slr_empty"); + cube.setSegments(new Segments()); + + // no segment at first + assertEquals(0, cube.getSegments().size()); + Map m1 = Maps.newHashMap(); + m1.put(1, 1000L); + Map m2 = Maps.newHashMap(); + m2.put(1, 2000L); + Map m3 = Maps.newHashMap(); + m3.put(1, 3000L); + Map m4 = Maps.newHashMap(); + m4.put(1, 4000L); + + // append first + CubeSegment seg1 = mgr.appendSegment(cube, null, new SegmentRange(0L, 1000L), null, m1); + seg1.setStatus(SegmentStatusEnum.READY); + + CubeSegment seg2 = mgr.appendSegment(cube, null, new SegmentRange(1000L, 2000L), m1, m2); + seg2.setStatus(SegmentStatusEnum.READY); + + CubeSegment seg3 = mgr.appendSegment(cube, null, new SegmentRange(2000L, 3000L), m2, m3); + seg3.setStatus(SegmentStatusEnum.READY); + + CubeSegment seg4 = mgr.appendSegment(cube, null, new SegmentRange(3000L, 4000L), m3, m4); + seg4.setStatus(SegmentStatusEnum.READY); + + CubeSegment merge1 = mgr.mergeSegments(cube, null, new SegmentRange(0L, 2000L), true); + merge1.setStatus(SegmentStatusEnum.NEW); + merge1.setLastBuildJobID("test"); + merge1.setStorageLocationIdentifier("test"); + + CubeSegment merge2 = mgr.mergeSegments(cube, null, new SegmentRange(2000L, 4000L), true); + merge2.setStatus(SegmentStatusEnum.NEW); + merge2.setLastBuildJobID("test"); + merge2.setStorageLocationIdentifier("test"); + + CubeUpdate cubeBuilder = new CubeUpdate(cube); + mgr.updateCube(cubeBuilder); + + List segmentsToPromote = Lists.newArrayList(merge1, merge2); + List threads = Lists.newArrayListWithExpectedSize(segmentsToPromote.size()); + final CountDownLatch startLatch = new CountDownLatch(1); + for (final CubeSegment segmentToPromote : segmentsToPromote) { + threads.add(new Thread() { + public void run() { + try { + startLatch.await(); + mgr.promoteNewlyBuiltSegments(cube, segmentToPromote); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }); + } + for (Thread t : threads) { + t.start(); + } + startLatch.countDown(); + + for (Thread t : threads) { + t.join(); + } + + assertTrue(cube.getSegments().size() == 2); + + assertTrue(cube.getSegmentById(seg1.getUuid()) == null); + assertTrue(cube.getSegmentById(seg2.getUuid()) == null); + assertTrue(cube.getSegmentById(merge1.getUuid()) != null + && cube.getSegmentById(merge1.getUuid()).getStatus() == SegmentStatusEnum.READY); + assertTrue(cube.getSegmentById(seg3.getUuid()) == null); + assertTrue(cube.getSegmentById(seg4.getUuid()) == null); + assertTrue(cube.getSegmentById(merge2.getUuid()) != null + && cube.getSegmentById(merge2.getUuid()).getStatus() == SegmentStatusEnum.READY); + } + + @Test public void testGetAllCubes() throws Exception { final ResourceStore store = ResourceStore.getStore(getTestConfig()); final NavigableSet cubePath = store.listResources(ResourceStore.CUBE_RESOURCE_ROOT); -- 2.5.4 (Apple Git-61)