From 91acf8f849015fedfc61816f29168a6ae34a29ae Mon Sep 17 00:00:00 2001 From: Zhong Date: Wed, 8 Nov 2017 13:23:51 +0800 Subject: [PATCH] APACHE-KYLIN-3022: Add clone() for ISegment, which is needed when updating cube segment --- .../java/org/apache/kylin/cube/CubeManager.java | 36 +++++++++++++++++----- .../java/org/apache/kylin/cube/CubeSegment.java | 13 ++++++++ .../org/apache/kylin/cube/CubeSegmentsTest.java | 12 ++++++++ .../org/apache/kylin/metadata/model/ISegment.java | 5 +-- .../org/apache/kylin/metadata/model/Segments.java | 9 ++++++ .../kylin/engine/mr/common/CuboidShardUtil.java | 14 +++++++-- .../kylin/engine/mr/steps/CopyDictionaryStep.java | 8 ++++- .../kylin/engine/mr/steps/MergeDictionaryStep.java | 2 +- .../mr/steps/UpdateCubeInfoAfterBuildStep.java | 8 ++++- .../steps/UpdateCubeInfoAfterCheckpointStep.java | 6 ++-- .../mr/steps/UpdateCubeInfoAfterMergeStep.java | 1 + .../mr/steps/UpdateCubeInfoAfterOptimizeStep.java | 8 ++++- .../engine/mr/steps/MergeCuboidMapperTest.java | 2 +- .../kylin/source/kafka/job/MergeOffsetStep.java | 2 +- 14 files changed, 105 insertions(+), 21 deletions(-) diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java index 5e72721..18aeffe 100755 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java @@ -245,13 +245,18 @@ public class CubeManager implements IRealizationProvider { private void saveDictionaryInfo(CubeSegment cubeSeg, TblColRef col, DictionaryInfo dictInfo) throws IOException { if (dictInfo != null) { + CubeSegment updateSegment = cubeSeg.clone(); Dictionary dict = dictInfo.getDictionaryObject(); - cubeSeg.putDictResPath(col, dictInfo.getResourcePath()); - cubeSeg.getRowkeyStats().add(new Object[] { col.getIdentity(), dict.getSize(), dict.getSizeOfId() }); + updateSegment.putDictResPath(col, dictInfo.getResourcePath()); + updateSegment.getRowkeyStats().add(new Object[] { col.getIdentity(), dict.getSize(), dict.getSizeOfId() }); CubeUpdate update = new CubeUpdate(cubeSeg.getCubeInstance()); - update.setToUpdateSegs(cubeSeg); + update.setToUpdateSegs(updateSegment); updateCube(update); + + // Update the input cubeSeg after the resource store updated + cubeSeg.putDictResPath(col, updateSegment.getDictResPath(col)); + cubeSeg.getRowkeyStats().addAll(updateSegment.getRowkeyStats()); } } @@ -277,6 +282,9 @@ public class CubeManager implements IRealizationProvider { return (Dictionary) info.getDictionaryObject(); } + /** + * @param cubeSeg needs to be updated after finishing this call + * */ public SnapshotTable buildSnapshotTable(CubeSegment cubeSeg, String lookupTable) throws IOException { TableMetadataManager metaMgr = getTableManager(); SnapshotManager snapshotMgr = getSnapshotManager(); @@ -285,11 +293,14 @@ public class CubeManager implements IRealizationProvider { IReadableTable hiveTable = SourceFactory.createReadableTable(tableDesc); SnapshotTable snapshot = snapshotMgr.buildSnapshot(hiveTable, tableDesc); - cubeSeg.putSnapshotResPath(lookupTable, snapshot.getResourcePath()); + CubeSegment updateSegment = cubeSeg.clone(); + updateSegment.putSnapshotResPath(lookupTable, snapshot.getResourcePath()); CubeUpdate cubeBuilder = new CubeUpdate(cubeSeg.getCubeInstance()); - cubeBuilder.setToUpdateSegs(cubeSeg); + cubeBuilder.setToUpdateSegs(updateSegment); updateCube(cubeBuilder); + // Update the input cubeSeg after the resource store updated + cubeSeg.putSnapshotResPath(lookupTable, updateSegment.getSnapshotResPath(lookupTable)); return snapshot; } @@ -724,6 +735,10 @@ public class CubeManager implements IRealizationProvider { return tableName; } + /** + * @param newSegment changes will not influence cube instance + * @throws IOException + */ public void promoteNewlyBuiltSegments(CubeInstance cube, CubeSegment newSegment) throws IOException { if (StringUtils.isBlank(newSegment.getStorageLocationIdentifier())) throw new IllegalStateException( @@ -742,13 +757,12 @@ public class CubeManager implements IRealizationProvider { throw new IllegalStateException( "For cube " + cube + ", segment " + newSegment + " is expected but not in the tobe " + tobe); - newSegment.setStatus(SegmentStatusEnum.READY); - List toRemoveSegs = Lists.newArrayList(); for (CubeSegment segment : cube.getSegments()) { if (!tobe.contains(segment)) toRemoveSegs.add(segment); } + newSegment.setStatus(SegmentStatusEnum.READY); logger.info("Promoting cube " + cube + ", new segment " + newSegment + ", to remove segments " + toRemoveSegs); @@ -758,6 +772,10 @@ public class CubeManager implements IRealizationProvider { updateCube(cubeBuilder); } + /** + * @param optimizedSegments changes will not influence cube instance + * @throws IOException + */ public void promoteNewlyOptimizeSegments(CubeInstance cube, CubeSegment... optimizedSegments) throws IOException { for (CubeSegment seg : optimizedSegments) { seg.setStatus(SegmentStatusEnum.READY_PENDING); @@ -768,6 +786,10 @@ public class CubeManager implements IRealizationProvider { updateCube(cubeBuilder); } + /** + * @param optimizedSegments changes will not influence cube instance + * @throws IOException + */ public void promoteCheckpointOptimizeSegments(CubeInstance cube, Map recommendCuboids, CubeSegment... optimizedSegments) throws IOException { if (cube.getSegments().size() != optimizedSegments.length * 2) { diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java index 003d006..b8a5ad2 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java @@ -18,6 +18,7 @@ package org.apache.kylin.cube; +import java.io.IOException; import java.io.Serializable; import java.text.SimpleDateFormat; import java.util.Collection; @@ -31,6 +32,7 @@ import java.util.concurrent.ConcurrentHashMap; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.common.util.Dictionary; +import org.apache.kylin.common.util.JsonUtil; import org.apache.kylin.common.util.ShardingHash; import org.apache.kylin.cube.cuboid.CuboidScheduler; import org.apache.kylin.cube.kv.CubeDimEncMap; @@ -571,4 +573,15 @@ public class CubeSegment implements IBuildable, ISegment, Serializable { public void setSourcePartitionOffsetStart(Map sourcePartitionOffsetStart) { this.sourcePartitionOffsetStart = sourcePartitionOffsetStart; } + + @Override + public CubeSegment clone() { + try { + CubeSegment cloneSeg = JsonUtil.readValue(JsonUtil.writeValueAsString(this), CubeSegment.class); + cloneSeg.setCubeInstance(cubeInstance); + return cloneSeg; + } catch (IOException e) { + throw new RuntimeException(e); + } + } } diff --git a/core-cube/src/test/java/org/apache/kylin/cube/CubeSegmentsTest.java b/core-cube/src/test/java/org/apache/kylin/cube/CubeSegmentsTest.java index 64c6d68..e930f38 100644 --- a/core-cube/src/test/java/org/apache/kylin/cube/CubeSegmentsTest.java +++ b/core-cube/src/test/java/org/apache/kylin/cube/CubeSegmentsTest.java @@ -179,6 +179,18 @@ public class CubeSegmentsTest extends LocalFileMetadataTestCase { assertEquals(3, cube.getSegments().size()); } + @Test + public void testClone() throws IOException { + CubeManager mgr = mgr(); + CubeInstance cube = mgr.getCube("test_kylin_cube_with_slr_empty"); + + CubeSegment seg = mgr.appendSegment(cube, new TSRange(0L, 1000L)); + seg.setStatus(SegmentStatusEnum.READY); + + CubeSegment segClone = seg.clone(); + assertEquals(seg.getConfig(), segClone.getConfig()); + } + private CubeManager mgr() { return CubeManager.getInstance(getTestConfig()); } diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISegment.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISegment.java index b8bee36..4f275d6 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISegment.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISegment.java @@ -21,7 +21,7 @@ package org.apache.kylin.metadata.model; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.metadata.model.SegmentRange.TSRange; -public interface ISegment extends Comparable { +public interface ISegment extends Comparable, Cloneable { public KylinConfig getConfig(); @@ -40,5 +40,6 @@ public interface ISegment extends Comparable { public long getLastBuildTime(); public void validate() throws IllegalStateException; - + + public ISegment clone(); } diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/Segments.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/Segments.java index 126d5f9..e035963 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/Segments.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/Segments.java @@ -470,4 +470,13 @@ public class Segments extends ArrayList implements Serial return Pair.newPair(startFit, endFit); } + + @Override + public Segments clone() { + Segments segments = new Segments<>(); + for (T segment : this) { + segments.add((T) segment.clone()); + } + return segments; + } } \ No newline at end of file diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidShardUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidShardUtil.java index b6dbd5d..6b42333 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidShardUtil.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidShardUtil.java @@ -32,6 +32,9 @@ import com.google.common.collect.Maps; public class CuboidShardUtil { protected static final Logger logger = LoggerFactory.getLogger(CuboidShardUtil.class); + /** + * @param segment needs to be updated after finishing this call + * */ public static void saveCuboidShards(CubeSegment segment, Map cuboidShards, int totalShards) throws IOException { CubeManager cubeManager = CubeManager.getInstance(segment.getConfig()); @@ -42,11 +45,16 @@ public class CuboidShardUtil { } } - segment.setCuboidShardNums(filtered); - segment.setTotalShards(totalShards); + CubeSegment updateSegment = segment.clone(); + updateSegment.setCuboidShardNums(filtered); + updateSegment.setTotalShards(totalShards); CubeUpdate cubeBuilder = new CubeUpdate(segment.getCubeInstance()); - cubeBuilder.setToUpdateSegs(segment); + cubeBuilder.setToUpdateSegs(updateSegment); cubeManager.updateCube(cubeBuilder); + + // Update the input segment after the resource store updated + segment.setCuboidShardNums(filtered); + segment.setTotalShards(totalShards); } } diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CopyDictionaryStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CopyDictionaryStep.java index 3341be9..2e4c6a3 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CopyDictionaryStep.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CopyDictionaryStep.java @@ -24,6 +24,7 @@ import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.CubeUpdate; +import org.apache.kylin.engine.mr.exception.SegmentNotFoundException; import org.apache.kylin.job.exception.ExecuteException; import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.ExecutableContext; @@ -45,7 +46,12 @@ public class CopyDictionaryStep extends AbstractExecutable { protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { final CubeManager mgr = CubeManager.getInstance(context.getConfig()); final CubeInstance cube = mgr.getCube(CubingExecutableUtil.getCubeName(this.getParams())); - final CubeSegment optimizeSegment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams())); + CubeSegment optimizeSegment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams())); + if (optimizeSegment == null) { + return ExecuteResult.createFailed(new SegmentNotFoundException( + "there is no segment with id:" + CubingExecutableUtil.getSegmentId(this.getParams()))); + } + optimizeSegment = optimizeSegment.clone(); CubeSegment oldSegment = optimizeSegment.getCubeInstance().getOriginalSegmentToOptimize(optimizeSegment); Preconditions.checkNotNull(oldSegment, diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java index 3a1f852..acd0b9e 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java @@ -54,7 +54,7 @@ public class MergeDictionaryStep extends AbstractExecutable { protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { final CubeManager mgr = CubeManager.getInstance(context.getConfig()); final CubeInstance cube = mgr.getCube(CubingExecutableUtil.getCubeName(this.getParams())); - final CubeSegment newSegment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams())); + final CubeSegment newSegment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams())).clone(); final List mergingSegments = getMergingSegments(cube); KylinConfig conf = cube.getConfig(); diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java index 7d36643..a701310 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java @@ -32,6 +32,7 @@ import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.engine.mr.CubingJob; import org.apache.kylin.engine.mr.common.BatchConstants; +import org.apache.kylin.engine.mr.exception.SegmentNotFoundException; import org.apache.kylin.job.exception.ExecuteException; import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.ExecutableContext; @@ -54,7 +55,12 @@ public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable { protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { final CubeManager cubeManager = CubeManager.getInstance(context.getConfig()); final CubeInstance cube = cubeManager.getCube(CubingExecutableUtil.getCubeName(this.getParams())); - final CubeSegment segment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams())); + CubeSegment segment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams())); + if (segment == null) { + return ExecuteResult.createFailed(new SegmentNotFoundException( + "there is no segment with id:" + CubingExecutableUtil.getSegmentId(this.getParams()))); + } + segment = segment.clone(); CubingJob cubingJob = (CubingJob) getManager().getJob(CubingExecutableUtil.getCubingJobId(this.getParams())); long sourceCount = cubingJob.findSourceRecordCount(); diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterCheckpointStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterCheckpointStep.java index ed61b4a..e2759bb 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterCheckpointStep.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterCheckpointStep.java @@ -18,7 +18,6 @@ package org.apache.kylin.engine.mr.steps; -import java.util.List; import java.util.Map; import java.util.Set; @@ -31,6 +30,7 @@ import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.ExecutableContext; import org.apache.kylin.job.execution.ExecuteResult; import org.apache.kylin.metadata.model.SegmentStatusEnum; +import org.apache.kylin.metadata.model.Segments; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,14 +51,14 @@ public class UpdateCubeInfoAfterCheckpointStep extends AbstractExecutable { Set recommendCuboids = cube.getCuboidsRecommend(); try { - List newSegments = cube.getSegments(SegmentStatusEnum.READY_PENDING); + Segments newSegments = cube.getSegments(SegmentStatusEnum.READY_PENDING); Map recommendCuboidsWithStats = CuboidStatsReaderUtil .readCuboidStatsFromSegments(recommendCuboids, newSegments); if (recommendCuboidsWithStats == null) { throw new RuntimeException("Fail to get statistics info for recommended cuboids after optimization!!!"); } cubeManager.promoteCheckpointOptimizeSegments(cube, recommendCuboidsWithStats, - newSegments.toArray(new CubeSegment[newSegments.size()])); + newSegments.clone().toArray(new CubeSegment[newSegments.size()])); return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed"); } catch (Exception e) { logger.error("fail to update cube after build", e); diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java index 3185bec..44128f2 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java @@ -55,6 +55,7 @@ public class UpdateCubeInfoAfterMergeStep extends AbstractExecutable { return ExecuteResult.createFailed(new SegmentNotFoundException( "there is no segment with id:" + CubingExecutableUtil.getSegmentId(this.getParams()))); } + mergedSegment = mergedSegment.clone(); CubingJob cubingJob = (CubingJob) getManager().getJob(CubingExecutableUtil.getCubingJobId(this.getParams())); long cubeSizeBytes = cubingJob.findCubeSizeBytes(); diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterOptimizeStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterOptimizeStep.java index 13c4f40..9b43edc 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterOptimizeStep.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterOptimizeStep.java @@ -24,6 +24,7 @@ import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.engine.mr.CubingJob; +import org.apache.kylin.engine.mr.exception.SegmentNotFoundException; import org.apache.kylin.job.exception.ExecuteException; import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.ExecutableContext; @@ -45,7 +46,12 @@ public class UpdateCubeInfoAfterOptimizeStep extends AbstractExecutable { protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { final CubeManager cubeManager = CubeManager.getInstance(context.getConfig()); final CubeInstance cube = cubeManager.getCube(CubingExecutableUtil.getCubeName(this.getParams())); - final CubeSegment segment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams())); + CubeSegment segment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams())); + if (segment == null) { + return ExecuteResult.createFailed(new SegmentNotFoundException( + "there is no segment with id:" + CubingExecutableUtil.getSegmentId(this.getParams()))); + } + segment = segment.clone(); CubeSegment originalSegment = cube.getOriginalSegmentToOptimize(segment); long sourceCount = originalSegment.getInputRecords(); diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapperTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapperTest.java index 4d92f8e..619508f 100644 --- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapperTest.java +++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapperTest.java @@ -113,7 +113,7 @@ public class MergeCuboidMapperTest extends LocalFileMetadataTestCase { DictionaryInfo sharedDict = makeSharedDict(); boolean isFirstSegment = true; - for (CubeSegment segment : cube.getSegments()) { + for (CubeSegment segment : cube.getSegments().clone()) { TableSignature signature = new TableSignature(); signature.setSize(100); diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java index 8139342..2204364 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java @@ -52,7 +52,7 @@ public class MergeOffsetStep extends AbstractExecutable { final CubeManager cubeManager = CubeManager.getInstance(context.getConfig()); final CubeInstance cube = cubeManager.getCube(CubingExecutableUtil.getCubeName(this.getParams())); final String segmentId = CubingExecutableUtil.getSegmentId(this.getParams()); - final CubeSegment segment = cube.getSegmentById(segmentId); + final CubeSegment segment = cube.getSegmentById(segmentId).clone(); Preconditions.checkNotNull(segment, "Cube segment '" + segmentId + "' not found."); Segments mergingSegs = cube.getMergingSegments(segment); -- 2.5.4 (Apple Git-61)