From 4c68479808d223293e3ced9a20cae4c7c00453b4 Mon Sep 17 00:00:00 2001
From: sunyerui
Date: Fri, 11 Sep 2015 00:04:42 +0800
Subject: [PATCH] KYLIN-978 GarbageCollectionStep dropped Hive Intermediate Table but didn't drop external hdfs path

---
 .../apache/kylin/job/cube/CubingJobBuilder.java   | 47 +++++++++++++---------
 .../kylin/job/cube/GarbageCollectionStep.java     | 38 ++++++++++++-----
 2 files changed, 58 insertions(+), 27 deletions(-)

diff --git a/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java b/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
index ff79286..de75f7d 100644
--- a/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
@@ -25,8 +25,6 @@ import java.util.List;
 import java.util.TimeZone;
 
 import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.CubeSegment;
@@ -65,9 +63,11 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
         final CubingJob result = initialJob(seg, "BUILD");
         final String jobId = result.getId();
         final String cuboidRootPath = getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/cuboid/";
+        final List<String> toDeletePathsOnHadoopCluster = Lists.newArrayList();
+        final List<String> toDeletePathsOnHBaseCluster = Lists.newArrayList();
 
         // cubing
-        Pair<AbstractExecutable, AbstractExecutable> twoSteps = addCubingSteps(seg, cuboidRootPath, result);
+        Pair<AbstractExecutable, AbstractExecutable> twoSteps = addCubingSteps(seg, cuboidRootPath, result, toDeletePathsOnHadoopCluster);
         String intermediateHiveTableStepId = twoSteps.getFirst().getId();
         String baseCuboidStepId = twoSteps.getSecond().getId();
 
@@ -79,7 +79,8 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
 
         final CubeJoinedFlatTableDesc intermediateTableDesc = new CubeJoinedFlatTableDesc(seg.getCubeDesc(), seg);
         final String hiveIntermediateTable = this.getIntermediateHiveTableName(intermediateTableDesc, jobId);
-        result.addTask(createGarbageCollectionStep(seg, null, hiveIntermediateTable, null));
+        toDeletePathsOnHBaseCluster.add(getJobWorkingDir(jobId));
+        result.addTask(createGarbageCollectionStep(seg, null, hiveIntermediateTable, toDeletePathsOnHadoopCluster, toDeletePathsOnHBaseCluster));
 
         return result;
     }
@@ -92,9 +93,14 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
         final String jobId = result.getId();
         final String appendRootPath = getJobWorkingDir(jobId) + "/" + appendSegment.getCubeInstance().getName() + "/append_cuboid/";
         final String mergedRootPath = getJobWorkingDir(jobId) + "/" + appendSegment.getCubeInstance().getName() + "/cuboid/";
+        List<String> mergingSegmentIds = Lists.newArrayList();
+        List<String> mergingCuboidPaths = Lists.newArrayList();
+        List<String> mergingHTables = Lists.newArrayList();
+        final List<String> toDeletePathsOnHadoopCluster = Lists.newArrayList();
+        final List<String> toDeletePathsOnHBaseCluster = Lists.newArrayList();
 
         // cubing the incremental segment
-        Pair<AbstractExecutable, AbstractExecutable> twoSteps = addCubingSteps(appendSegment, appendRootPath, result);
+        Pair<AbstractExecutable, AbstractExecutable> twoSteps = addCubingSteps(appendSegment, appendRootPath, result, toDeletePathsOnHadoopCluster);
         final String intermediateHiveTableStepId = twoSteps.getFirst().getId();
         final String baseCuboidStepId = twoSteps.getSecond().getId();
 
@@ -103,10 +109,7 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
 
         List<CubeSegment> mergingSegments = mergeSegment.getCubeInstance().getMergingSegments(mergeSegment);
         Preconditions.checkState(mergingSegments.size() > 1, "there should be more than 2 segments to merge");
-        List<String> mergingSegmentIds = Lists.newArrayList();
-        List<String> mergingCuboidPaths = Lists.newArrayList();
-        List<String> mergingHTables = Lists.newArrayList();
-        List<String> toDeletePaths = Lists.newArrayList();
+
         for (CubeSegment merging : mergingSegments) {
             mergingSegmentIds.add(merging.getUuid());
             mergingHTables.add(merging.getStorageLocationIdentifier());
@@ -115,7 +118,7 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
             } else {
                 mergingCuboidPaths.add(getPathToMerge(merging));
             }
-            toDeletePaths.add(getJobWorkingDir(merging.getLastBuildJobID()));
+            toDeletePathsOnHadoopCluster.add(getJobWorkingDir(merging.getLastBuildJobID()));
         }
 
         // merge cuboid
@@ -126,7 +129,8 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
 
         // update cube info
         result.addTask(createUpdateCubeInfoAfterMergeStep(mergeSegment, mergingSegmentIds, convertCuboidToHfileStep.getId(), jobId));
-        result.addTask(createGarbageCollectionStep(mergeSegment, mergingHTables, null, toDeletePaths));
+        toDeletePathsOnHBaseCluster.add(getJobWorkingDir(jobId));
+        result.addTask(createGarbageCollectionStep(mergeSegment, mergingHTables, null, toDeletePathsOnHadoopCluster, toDeletePathsOnHBaseCluster));
 
         return result;
     }
@@ -143,12 +147,14 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
         List<String> mergingSegmentIds = Lists.newArrayList();
         List<String> mergingCuboidPaths = Lists.newArrayList();
         List<String> mergingHTables = Lists.newArrayList();
-        List<String> toDeletePaths = Lists.newArrayList();
+        final List<String> toDeletePathsOnHadoopCluster = Lists.newArrayList();
+        final List<String> toDeletePathsOnHBaseCluster = Lists.newArrayList();
+
         for (CubeSegment merging : mergingSegments) {
             mergingSegmentIds.add(merging.getUuid());
             mergingCuboidPaths.add(getPathToMerge(merging));
             mergingHTables.add(merging.getStorageLocationIdentifier());
-            toDeletePaths.add(getJobWorkingDir(merging.getLastBuildJobID()));
+            toDeletePathsOnHadoopCluster.add(getJobWorkingDir(merging.getLastBuildJobID()));
         }
 
         // merge cuboid
@@ -159,7 +165,8 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
 
         // update cube info
         result.addTask(createUpdateCubeInfoAfterMergeStep(seg, mergingSegmentIds, convertCuboidToHfileStep.getId(), jobId));
-        result.addTask(createGarbageCollectionStep(seg, mergingHTables, null, toDeletePaths));
+        toDeletePathsOnHBaseCluster.add(getJobWorkingDir(jobId));
+        result.addTask(createGarbageCollectionStep(seg, mergingHTables, null, toDeletePathsOnHadoopCluster, toDeletePathsOnHBaseCluster));
 
         return result;
     }
@@ -171,7 +178,7 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
         result.addTask(createMergeCuboidDataStep(seg, formattedPath, mergedCuboidPath));
     }
 
-    Pair<AbstractExecutable, AbstractExecutable> addCubingSteps(CubeSegment seg, String cuboidRootPath, CubingJob result) {
+    Pair<AbstractExecutable, AbstractExecutable> addCubingSteps(CubeSegment seg, String cuboidRootPath, CubingJob result, List<String> toDeletePaths) {
         final int groupRowkeyColumnsCount = seg.getCubeDesc().getRowkey().getNCuboidBuildLevels();
         final int totalRowkeyColumnsCount = seg.getCubeDesc().getRowkey().getRowKeyColumns().length;
 
@@ -199,6 +206,9 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
             result.addTask(createNDimensionCuboidStep(seg, cuboidOutputTempPath, dimNum, totalRowkeyColumnsCount));
         }
 
+        toDeletePaths.add(intermediateHiveTableLocation);
+        toDeletePaths.add(factDistinctColumnsPath);
+
         return new Pair<AbstractExecutable, AbstractExecutable>(intermediateHiveTableStep, baseCuboidStep);
     }
 
@@ -266,7 +276,7 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
     }
 
     private String getRowkeyDistributionOutputPath(CubeSegment seg, String jobId) {
-        return getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/rowkey_stats";
+        return HadoopUtil.makeQualifiedPathInHBaseCluster(getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/rowkey_stats");
     }
 
     private String getFactDistinctColumnsPath(CubeSegment seg, String jobUuid) {
@@ -460,12 +470,13 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
         return result;
     }
 
-    private GarbageCollectionStep createGarbageCollectionStep(CubeSegment seg, List<String> oldHtables, String hiveIntermediateTable, List<String> oldHdsfPaths) {
+    private GarbageCollectionStep createGarbageCollectionStep(CubeSegment seg, List<String> oldHtables, String hiveIntermediateTable, List<String> oldHdsfPaths, List<String> oldHdfsPathsOnHBaseCluster) {
         GarbageCollectionStep result = new GarbageCollectionStep();
         result.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION);
         result.setOldHTables(oldHtables);
         result.setOldHiveTable(hiveIntermediateTable);
-        result.setOldHdsfPaths(oldHdsfPaths);
+        result.setOldHdfsPaths(oldHdsfPaths);
+        result.setOldHdfsPathsOnHBaseCluster(oldHdfsPathsOnHBaseCluster);
         return result;
     }
 
diff --git a/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java b/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java
index 72cad96..641454c 100644
--- a/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java
+++ b/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java
@@ -55,6 +55,8 @@ public class GarbageCollectionStep extends AbstractExecutable {
 
     private static final String OLD_HDFS_PATHS = "oldHdfsPaths";
 
+    private static final String OLD_HDFS_PATHS_ON_HBASE_CLUSTER = "oldHdfsPathsOnHBaseCluster";
+
     private static final Logger logger = LoggerFactory.getLogger(GarbageCollectionStep.class);
 
     private StringBuffer output;
@@ -69,8 +71,9 @@ public class GarbageCollectionStep extends AbstractExecutable {
 
         try {
             dropHBaseTable(context);
-            dropHdfsPath(context);
             dropHiveTable(context);
+            dropHdfsPath(context);
+            dropHdfsPathOnHBaseCluster(context);
         } catch (IOException e) {
             logger.error("job:" + getId() + " execute finished with exception", e);
             output.append("\n").append(e.getLocalizedMessage());
@@ -131,13 +134,11 @@ public class GarbageCollectionStep extends AbstractExecutable {
             }
         }
     }
-
-    private void dropHdfsPath(ExecutableContext context) throws IOException {
-        List<String> oldHdfsPaths = this.getOldHdsfPaths();
+    private void dropHdfsPathOnCluster(List<String> oldHdfsPaths, FileSystem fileSystem) throws IOException {
         if (oldHdfsPaths != null && oldHdfsPaths.size() > 0) {
-            Configuration hconf = HadoopUtil.getCurrentConfiguration();
-            FileSystem fileSystem = FileSystem.get(hconf);
+            logger.debug("Drop HDFS path on FileSystem: " + fileSystem.getUri());
+            output.append("Drop HDFS path on FileSystem: \"" + fileSystem.getUri() + "\" \n");
             for (String path : oldHdfsPaths) {
                 if (path.endsWith("*"))
                     path = path.substring(0, path.length() - 1);
 
@@ -152,10 +153,21 @@ public class GarbageCollectionStep extends AbstractExecutable {
                     output.append("HDFS path not exists: \"" + path + "\" \n");
                 }
             }
-
         }
     }
 
+    private void dropHdfsPath(ExecutableContext context) throws IOException {
+        List<String> oldHdfsPaths = this.getOldHdfsPaths();
+        FileSystem fileSystem = FileSystem.get(HadoopUtil.getCurrentConfiguration());
+        dropHdfsPathOnCluster(oldHdfsPaths, fileSystem);
+    }
+
+    private void dropHdfsPathOnHBaseCluster(ExecutableContext context) throws IOException {
+        List<String> oldHdfsPaths = this.getOldHdfsPathsOnHBaseCluster();
+        FileSystem fileSystem = FileSystem.get(HadoopUtil.getCurrentHBaseConfiguration());
+        dropHdfsPathOnCluster(oldHdfsPaths, fileSystem);
+    }
+
     public void setOldHTables(List<String> tables) {
         setArrayParam(OLD_HTABLES, tables);
     }
@@ -164,14 +176,22 @@ public class GarbageCollectionStep extends AbstractExecutable {
         return getArrayParam(OLD_HTABLES);
     }
 
-    public void setOldHdsfPaths(List<String> paths) {
+    public void setOldHdfsPaths(List<String> paths) {
         setArrayParam(OLD_HDFS_PATHS, paths);
     }
 
-    private List<String> getOldHdsfPaths() {
+    private List<String> getOldHdfsPaths() {
         return getArrayParam(OLD_HDFS_PATHS);
     }
 
+    public void setOldHdfsPathsOnHBaseCluster(List<String> paths) {
+        setArrayParam(OLD_HDFS_PATHS_ON_HBASE_CLUSTER, paths);
+    }
+
+    private List<String> getOldHdfsPathsOnHBaseCluster() {
+        return getArrayParam(OLD_HDFS_PATHS_ON_HBASE_CLUSTER);
+    }
+
     private void setArrayParam(String paramKey, List<String> paramValues) {
         setParam(paramKey, StringUtils.join(paramValues, ","));
     }
-- 
2.3.2 (Apple Git-55)
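For reviewers, below is a minimal, self-contained sketch of the cleanup pattern this patch introduces: the loop that the new GarbageCollectionStep.dropHdfsPathOnCluster(...) runs once against the job (Hadoop) cluster FileSystem and once against the HBase cluster FileSystem. Only standard Hadoop FileSystem/Path/Configuration calls are used; the class name, the sample paths, and the plain new Configuration() setup are illustrative assumptions, not part of the patch (Kylin itself obtains the two configurations via HadoopUtil.getCurrentConfiguration() and HadoopUtil.getCurrentHBaseConfiguration()).

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Illustrative sketch only; mirrors the per-cluster deletion loop added by this patch.
public class DropPathsSketch {

    // Delete each old job path on the given cluster's FileSystem, tolerating a
    // trailing "*" wildcard and paths that no longer exist.
    static void dropPathsOnCluster(List<String> oldHdfsPaths, FileSystem fileSystem) throws IOException {
        if (oldHdfsPaths == null || oldHdfsPaths.isEmpty()) {
            return;
        }
        System.out.println("Drop HDFS path on FileSystem: " + fileSystem.getUri());
        for (String path : oldHdfsPaths) {
            if (path.endsWith("*")) {
                path = path.substring(0, path.length() - 1); // strip the wildcard, keep the parent dir
            }
            Path oldPath = new Path(path);
            if (fileSystem.exists(oldPath)) {
                fileSystem.delete(oldPath, true); // recursive delete of the old working dir
            } else {
                System.out.println("HDFS path not exists: " + path);
            }
        }
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical configurations standing in for the job cluster and the HBase cluster.
        Configuration jobClusterConf = new Configuration();
        Configuration hbaseClusterConf = new Configuration();

        // Paths collected while building: the intermediate Hive table location and the
        // fact-distinct-columns output live on the job cluster; the job working dir
        // (e.g. the rowkey_stats output) lives on the HBase cluster.
        dropPathsOnCluster(Arrays.asList("/tmp/kylin/kylin-job-1/fact_distinct/*"), FileSystem.get(jobClusterConf));
        dropPathsOnCluster(Arrays.asList("/tmp/kylin/kylin-job-1"), FileSystem.get(hbaseClusterConf));
    }
}

The design point the sketch highlights is the reason for the fix: with a separate HBase cluster, paths written through the HBase cluster's filesystem (such as the rowkey_stats directory now qualified via HadoopUtil.makeQualifiedPathInHBaseCluster) are not visible to the job cluster's FileSystem, so garbage collection has to run once per cluster with the matching FileSystem instance.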