From fd172821d9070b1bd704291a164569dc9d920045 Mon Sep 17 00:00:00 2001
From: sunyerui
Date: Fri, 11 Sep 2015 00:04:42 +0800
Subject: [PATCH 1/2] KYLIN-978 GarbageCollectionStep dropped Hive Intermediate Table but didn't drop external hdfs path

---
 .../apache/kylin/job/cube/CubingJobBuilder.java   | 47 +++++++++++++---------
 .../kylin/job/cube/GarbageCollectionStep.java     | 38 ++++++++++++-----
 2 files changed, 58 insertions(+), 27 deletions(-)

diff --git a/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java b/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
index ff79286..de75f7d 100644
--- a/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
@@ -25,8 +25,6 @@ import java.util.List;
 import java.util.TimeZone;

 import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.CubeSegment;
@@ -65,9 +63,11 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
         final CubingJob result = initialJob(seg, "BUILD");
         final String jobId = result.getId();
         final String cuboidRootPath = getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/cuboid/";
+        final List<String> toDeletePathsOnHadoopCluster = Lists.newArrayList();
+        final List<String> toDeletePathsOnHBaseCluster = Lists.newArrayList();

         // cubing
-        Pair<AbstractExecutable, AbstractExecutable> twoSteps = addCubingSteps(seg, cuboidRootPath, result);
+        Pair<AbstractExecutable, AbstractExecutable> twoSteps = addCubingSteps(seg, cuboidRootPath, result, toDeletePathsOnHadoopCluster);
         String intermediateHiveTableStepId = twoSteps.getFirst().getId();
         String baseCuboidStepId = twoSteps.getSecond().getId();

@@ -79,7 +79,8 @@ public final class CubingJobBuilder extends AbstractJobBuilder {

         final CubeJoinedFlatTableDesc intermediateTableDesc = new CubeJoinedFlatTableDesc(seg.getCubeDesc(), seg);
         final String hiveIntermediateTable = this.getIntermediateHiveTableName(intermediateTableDesc, jobId);
-        result.addTask(createGarbageCollectionStep(seg, null, hiveIntermediateTable, null));
+        toDeletePathsOnHBaseCluster.add(getJobWorkingDir(jobId));
+        result.addTask(createGarbageCollectionStep(seg, null, hiveIntermediateTable, toDeletePathsOnHadoopCluster, toDeletePathsOnHBaseCluster));

         return result;
     }
@@ -92,9 +93,14 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
         final String jobId = result.getId();
         final String appendRootPath = getJobWorkingDir(jobId) + "/" + appendSegment.getCubeInstance().getName() + "/append_cuboid/";
         final String mergedRootPath = getJobWorkingDir(jobId) + "/" + appendSegment.getCubeInstance().getName() + "/cuboid/";
+        List<String> mergingSegmentIds = Lists.newArrayList();
+        List<String> mergingCuboidPaths = Lists.newArrayList();
+        List<String> mergingHTables = Lists.newArrayList();
+        final List<String> toDeletePathsOnHadoopCluster = Lists.newArrayList();
+        final List<String> toDeletePathsOnHBaseCluster = Lists.newArrayList();

         // cubing the incremental segment
-        Pair<AbstractExecutable, AbstractExecutable> twoSteps = addCubingSteps(appendSegment, appendRootPath, result);
+        Pair<AbstractExecutable, AbstractExecutable> twoSteps = addCubingSteps(appendSegment, appendRootPath, result, toDeletePathsOnHadoopCluster);
         final String intermediateHiveTableStepId = twoSteps.getFirst().getId();
         final String baseCuboidStepId = twoSteps.getSecond().getId();

@@ -103,10 +109,7 @@ public final class CubingJobBuilder extends AbstractJobBuilder {

         List<CubeSegment> mergingSegments = mergeSegment.getCubeInstance().getMergingSegments(mergeSegment);
         Preconditions.checkState(mergingSegments.size() > 1, "there should be more than 2 segments to merge");
-        List<String> mergingSegmentIds = Lists.newArrayList();
-        List<String> mergingCuboidPaths = Lists.newArrayList();
-        List<String> mergingHTables = Lists.newArrayList();
-        List<String> toDeletePaths = Lists.newArrayList();
+
         for (CubeSegment merging : mergingSegments) {
             mergingSegmentIds.add(merging.getUuid());
             mergingHTables.add(merging.getStorageLocationIdentifier());
@@ -115,7 +118,7 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
             } else {
                 mergingCuboidPaths.add(getPathToMerge(merging));
             }
-            toDeletePaths.add(getJobWorkingDir(merging.getLastBuildJobID()));
+            toDeletePathsOnHadoopCluster.add(getJobWorkingDir(merging.getLastBuildJobID()));
         }

         // merge cuboid
@@ -126,7 +129,8 @@ public final class CubingJobBuilder extends AbstractJobBuilder {

         // update cube info
         result.addTask(createUpdateCubeInfoAfterMergeStep(mergeSegment, mergingSegmentIds, convertCuboidToHfileStep.getId(), jobId));
-        result.addTask(createGarbageCollectionStep(mergeSegment, mergingHTables, null, toDeletePaths));
+        toDeletePathsOnHBaseCluster.add(getJobWorkingDir(jobId));
+        result.addTask(createGarbageCollectionStep(mergeSegment, mergingHTables, null, toDeletePathsOnHadoopCluster, toDeletePathsOnHBaseCluster));

         return result;
     }
@@ -143,12 +147,14 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
         List<String> mergingSegmentIds = Lists.newArrayList();
         List<String> mergingCuboidPaths = Lists.newArrayList();
         List<String> mergingHTables = Lists.newArrayList();
-        List<String> toDeletePaths = Lists.newArrayList();
+        final List<String> toDeletePathsOnHadoopCluster = Lists.newArrayList();
+        final List<String> toDeletePathsOnHBaseCluster = Lists.newArrayList();
+
         for (CubeSegment merging : mergingSegments) {
             mergingSegmentIds.add(merging.getUuid());
             mergingCuboidPaths.add(getPathToMerge(merging));
             mergingHTables.add(merging.getStorageLocationIdentifier());
-            toDeletePaths.add(getJobWorkingDir(merging.getLastBuildJobID()));
+            toDeletePathsOnHadoopCluster.add(getJobWorkingDir(merging.getLastBuildJobID()));
         }

         // merge cuboid
@@ -159,7 +165,8 @@ public final class CubingJobBuilder extends AbstractJobBuilder {

         // update cube info
         result.addTask(createUpdateCubeInfoAfterMergeStep(seg, mergingSegmentIds, convertCuboidToHfileStep.getId(), jobId));
-        result.addTask(createGarbageCollectionStep(seg, mergingHTables, null, toDeletePaths));
+        toDeletePathsOnHBaseCluster.add(getJobWorkingDir(jobId));
+        result.addTask(createGarbageCollectionStep(seg, mergingHTables, null, toDeletePathsOnHadoopCluster, toDeletePathsOnHBaseCluster));

         return result;
     }
@@ -171,7 +178,7 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
         result.addTask(createMergeCuboidDataStep(seg, formattedPath, mergedCuboidPath));
     }

-    Pair<AbstractExecutable, AbstractExecutable> addCubingSteps(CubeSegment seg, String cuboidRootPath, CubingJob result) {
+    Pair<AbstractExecutable, AbstractExecutable> addCubingSteps(CubeSegment seg, String cuboidRootPath, CubingJob result, List<String> toDeletePaths) {
         final int groupRowkeyColumnsCount = seg.getCubeDesc().getRowkey().getNCuboidBuildLevels();
         final int totalRowkeyColumnsCount = seg.getCubeDesc().getRowkey().getRowKeyColumns().length;

@@ -199,6 +206,9 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
             result.addTask(createNDimensionCuboidStep(seg, cuboidOutputTempPath, dimNum, totalRowkeyColumnsCount));
         }

+        toDeletePaths.add(intermediateHiveTableLocation);
+        toDeletePaths.add(factDistinctColumnsPath);
+
         return new Pair<AbstractExecutable, AbstractExecutable>(intermediateHiveTableStep, baseCuboidStep);
     }

@@ -266,7 +276,7 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
     }

     private String getRowkeyDistributionOutputPath(CubeSegment seg, String jobId) {
-        return getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/rowkey_stats";
+        return HadoopUtil.makeQualifiedPathInHBaseCluster(getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/rowkey_stats");
     }

     private String getFactDistinctColumnsPath(CubeSegment seg, String jobUuid) {
@@ -460,12 +470,13 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
         return result;
     }

-    private GarbageCollectionStep createGarbageCollectionStep(CubeSegment seg, List<String> oldHtables, String hiveIntermediateTable, List<String> oldHdsfPaths) {
+    private GarbageCollectionStep createGarbageCollectionStep(CubeSegment seg, List<String> oldHtables, String hiveIntermediateTable, List<String> oldHdsfPaths, List<String> oldHdfsPathsOnHBaseCluster) {
         GarbageCollectionStep result = new GarbageCollectionStep();
         result.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION);
         result.setOldHTables(oldHtables);
         result.setOldHiveTable(hiveIntermediateTable);
-        result.setOldHdsfPaths(oldHdsfPaths);
+        result.setOldHdfsPaths(oldHdsfPaths);
+        result.setOldHdfsPathsOnHBaseCluster(oldHdfsPathsOnHBaseCluster);
         return result;
     }

diff --git a/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java b/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java
index 72cad96..641454c 100644
--- a/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java
+++ b/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java
@@ -55,6 +55,8 @@ public class GarbageCollectionStep extends AbstractExecutable {

     private static final String OLD_HDFS_PATHS = "oldHdfsPaths";

+    private static final String OLD_HDFS_PATHS_ON_HBASE_CLUSTER = "oldHdfsPathsOnHBaseCluster";
+
     private static final Logger logger = LoggerFactory.getLogger(GarbageCollectionStep.class);

     private StringBuffer output;
@@ -69,8 +71,9 @@ public class GarbageCollectionStep extends AbstractExecutable {

         try {
             dropHBaseTable(context);
-            dropHdfsPath(context);
             dropHiveTable(context);
+            dropHdfsPath(context);
+            dropHdfsPathOnHBaseCluster(context);
         } catch (IOException e) {
             logger.error("job:" + getId() + " execute finished with exception", e);
             output.append("\n").append(e.getLocalizedMessage());
@@ -131,13 +134,11 @@ public class GarbageCollectionStep extends AbstractExecutable {
             }
         }
     }
-
-    private void dropHdfsPath(ExecutableContext context) throws IOException {
-        List<String> oldHdfsPaths = this.getOldHdsfPaths();
+    private void dropHdfsPathOnCluster(List<String> oldHdfsPaths, FileSystem fileSystem) throws IOException {
         if (oldHdfsPaths != null && oldHdfsPaths.size() > 0) {
-            Configuration hconf = HadoopUtil.getCurrentConfiguration();
-            FileSystem fileSystem = FileSystem.get(hconf);
+            logger.debug("Drop HDFS path on FileSystem: " + fileSystem.getUri());
+            output.append("Drop HDFS path on FileSystem: \"" + fileSystem.getUri() + "\" \n");
             for (String path : oldHdfsPaths) {
                 if (path.endsWith("*"))
                     path = path.substring(0, path.length() - 1);

@@ -152,10 +153,21 @@ public class GarbageCollectionStep extends AbstractExecutable {
                     output.append("HDFS path not exists: \"" + path + "\" \n");
                 }
             }
-
         }
     }

+    private void dropHdfsPath(ExecutableContext context) throws IOException {
+        List<String> oldHdfsPaths = this.getOldHdfsPaths();
+        FileSystem fileSystem = FileSystem.get(HadoopUtil.getCurrentConfiguration());
+        dropHdfsPathOnCluster(oldHdfsPaths, fileSystem);
+    }
+
+    private void dropHdfsPathOnHBaseCluster(ExecutableContext context) throws IOException {
+        List<String> oldHdfsPaths = this.getOldHdfsPathsOnHBaseCluster();
+        FileSystem fileSystem = FileSystem.get(HadoopUtil.getCurrentHBaseConfiguration());
+        dropHdfsPathOnCluster(oldHdfsPaths, fileSystem);
+    }
+
     public void setOldHTables(List<String> tables) {
         setArrayParam(OLD_HTABLES, tables);
     }
@@ -164,14 +176,22 @@ public class GarbageCollectionStep extends AbstractExecutable {
         return getArrayParam(OLD_HTABLES);
     }

-    public void setOldHdsfPaths(List<String> paths) {
+    public void setOldHdfsPaths(List<String> paths) {
         setArrayParam(OLD_HDFS_PATHS, paths);
     }

-    private List<String> getOldHdsfPaths() {
+    private List<String> getOldHdfsPaths() {
         return getArrayParam(OLD_HDFS_PATHS);
     }

+    public void setOldHdfsPathsOnHBaseCluster(List<String> paths) {
+        setArrayParam(OLD_HDFS_PATHS_ON_HBASE_CLUSTER, paths);
+    }
+
+    private List<String> getOldHdfsPathsOnHBaseCluster() {
+        return getArrayParam(OLD_HDFS_PATHS_ON_HBASE_CLUSTER);
+    }
+
     private void setArrayParam(String paramKey, List<String> paramValues) {
         setParam(paramKey, StringUtils.join(paramValues, ","));
     }
-- 
2.3.2 (Apple Git-55)

From b4ef35e48bd7b4cd7752220bbd779c9bba8ed5f3 Mon Sep 17 00:00:00 2001
From: sunyerui
Date: Fri, 18 Sep 2015 21:54:22 +0800
Subject: [PATCH 2/2] KYLIN-978 GarbageCollectionStep dropped Hive Intermediate Table but didn't drop external hdfs path, fix critical bug in one cluster mode

---
 .../apache/kylin/job/cube/CubingJobBuilder.java   | 33 ++++++++++++----------
 .../kylin/job/cube/GarbageCollectionStep.java     | 19 +++++++++++++
 2 files changed, 37 insertions(+), 15 deletions(-)

diff --git a/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java b/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
index de75f7d..0ab11e3 100644
--- a/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
@@ -72,15 +72,14 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
         String baseCuboidStepId = twoSteps.getSecond().getId();

         // convert htable
-        AbstractExecutable convertCuboidToHfileStep = addHTableSteps(seg, cuboidRootPath, result);
+        AbstractExecutable convertCuboidToHfileStep = addHTableSteps(seg, cuboidRootPath, result, toDeletePathsOnHBaseCluster);

         // update cube info
         result.addTask(createUpdateCubeInfoAfterBuildStep(seg, intermediateHiveTableStepId, baseCuboidStepId, convertCuboidToHfileStep.getId(), jobId));

         final CubeJoinedFlatTableDesc intermediateTableDesc = new CubeJoinedFlatTableDesc(seg.getCubeDesc(), seg);
         final String hiveIntermediateTable = this.getIntermediateHiveTableName(intermediateTableDesc, jobId);
-        toDeletePathsOnHBaseCluster.add(getJobWorkingDir(jobId));
-        result.addTask(createGarbageCollectionStep(seg, null, hiveIntermediateTable, toDeletePathsOnHadoopCluster, toDeletePathsOnHBaseCluster));
+        result.addTask(createGarbageCollectionStep(seg, null, hiveIntermediateTable, toDeletePathsOnHadoopCluster, toDeletePathsOnHBaseCluster, jobId));

         return result;
     }
@@ -125,12 +124,11 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
         addMergeSteps(mergeSegment, mergingSegmentIds, mergingCuboidPaths, mergedRootPath, result);

         // convert htable
-        AbstractExecutable convertCuboidToHfileStep = addHTableSteps(mergeSegment, mergedRootPath, result);
+        AbstractExecutable convertCuboidToHfileStep = addHTableSteps(mergeSegment, mergedRootPath, result, toDeletePathsOnHBaseCluster);

         // update cube info
         result.addTask(createUpdateCubeInfoAfterMergeStep(mergeSegment, mergingSegmentIds, convertCuboidToHfileStep.getId(), jobId));
-        toDeletePathsOnHBaseCluster.add(getJobWorkingDir(jobId));
-        result.addTask(createGarbageCollectionStep(mergeSegment, mergingHTables, null, toDeletePathsOnHadoopCluster, toDeletePathsOnHBaseCluster));
+        result.addTask(createGarbageCollectionStep(mergeSegment, mergingHTables, null, toDeletePathsOnHadoopCluster, toDeletePathsOnHBaseCluster, jobId));

         return result;
     }
@@ -161,12 +159,11 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
         addMergeSteps(seg, mergingSegmentIds, mergingCuboidPaths, mergedCuboidPath, result);

         // convert htable
-        AbstractExecutable convertCuboidToHfileStep = addHTableSteps(seg, mergedCuboidPath, result);
+        AbstractExecutable convertCuboidToHfileStep = addHTableSteps(seg, mergedCuboidPath, result, toDeletePathsOnHBaseCluster);

         // update cube info
         result.addTask(createUpdateCubeInfoAfterMergeStep(seg, mergingSegmentIds, convertCuboidToHfileStep.getId(), jobId));
-        toDeletePathsOnHBaseCluster.add(getJobWorkingDir(jobId));
-        result.addTask(createGarbageCollectionStep(seg, mergingHTables, null, toDeletePathsOnHadoopCluster, toDeletePathsOnHBaseCluster));
+        result.addTask(createGarbageCollectionStep(seg, mergingHTables, null, toDeletePathsOnHadoopCluster, toDeletePathsOnHBaseCluster, jobId));

         return result;
     }
@@ -212,15 +209,15 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
         return new Pair<AbstractExecutable, AbstractExecutable>(intermediateHiveTableStep, baseCuboidStep);
     }

-    AbstractExecutable addHTableSteps(CubeSegment seg, String cuboidRootPath, CubingJob result) {
+    AbstractExecutable addHTableSteps(CubeSegment seg, String cuboidRootPath, CubingJob result, List<String> toDeletePaths) {
         final String jobId = result.getId();
         final String cuboidPath = cuboidRootPath + "*";

-        result.addTask(createRangeRowkeyDistributionStep(seg, cuboidPath, jobId));
+        result.addTask(createRangeRowkeyDistributionStep(seg, cuboidPath, jobId, toDeletePaths));
         // create htable step
         result.addTask(createCreateHTableStep(seg, jobId));
         // generate hfiles step
-        final MapReduceExecutable convertCuboidToHfileStep = createConvertCuboidToHfileStep(seg, cuboidPath, jobId);
+        final MapReduceExecutable convertCuboidToHfileStep = createConvertCuboidToHfileStep(seg, cuboidPath, jobId, toDeletePaths);
         result.addTask(convertCuboidToHfileStep);
         // bulk load step
         result.addTask(createBulkLoadStep(seg, jobId));
@@ -357,7 +354,7 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
         return ndCuboidStep;
     }

-    private MapReduceExecutable createRangeRowkeyDistributionStep(CubeSegment seg, String inputPath, String jobId) {
+    private MapReduceExecutable createRangeRowkeyDistributionStep(CubeSegment seg, String inputPath, String jobId, List<String> toDeletePaths) {
         MapReduceExecutable rowkeyDistributionStep = new MapReduceExecutable();
         rowkeyDistributionStep.setName(ExecutableConstants.STEP_NAME_GET_CUBOID_KEY_DISTRIBUTION);
         StringBuilder cmd = new StringBuilder();
@@ -370,6 +367,8 @@ public final class CubingJobBuilder extends AbstractJobBuilder {

         rowkeyDistributionStep.setMapReduceParams(cmd.toString());
         rowkeyDistributionStep.setMapReduceJobClass(RangeKeyDistributionJob.class);
+
+        toDeletePaths.add(getRowkeyDistributionOutputPath(seg, jobId));
         return rowkeyDistributionStep;
     }

@@ -387,7 +386,7 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
         return createHtableStep;
     }

-    private MapReduceExecutable createConvertCuboidToHfileStep(CubeSegment seg, String inputPath, String jobId) {
+    private MapReduceExecutable createConvertCuboidToHfileStep(CubeSegment seg, String inputPath, String jobId, List<String> toDeletePaths) {
         MapReduceExecutable createHFilesStep = new MapReduceExecutable();
         createHFilesStep.setName(ExecutableConstants.STEP_NAME_CONVERT_CUBOID_TO_HFILE);
         StringBuilder cmd = new StringBuilder();
@@ -402,6 +401,8 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
         createHFilesStep.setMapReduceParams(cmd.toString());
         createHFilesStep.setMapReduceJobClass(CubeHFileJob.class);

+        toDeletePaths.add(getHFilePath(seg, jobId));
+
         return createHFilesStep;
     }

@@ -470,13 +471,15 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
         return result;
     }

-    private GarbageCollectionStep createGarbageCollectionStep(CubeSegment seg, List<String> oldHtables, String hiveIntermediateTable, List<String> oldHdsfPaths, List<String> oldHdfsPathsOnHBaseCluster) {
+    private GarbageCollectionStep createGarbageCollectionStep(CubeSegment seg, List<String> oldHtables,
+            String hiveIntermediateTable, List<String> oldHdsfPaths, List<String> oldHdfsPathsOnHBaseCluster, String jobId) {
         GarbageCollectionStep result = new GarbageCollectionStep();
         result.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION);
         result.setOldHTables(oldHtables);
         result.setOldHiveTable(hiveIntermediateTable);
         result.setOldHdfsPaths(oldHdsfPaths);
         result.setOldHdfsPathsOnHBaseCluster(oldHdfsPathsOnHBaseCluster);
+        result.setJobWorkingDir(getJobWorkingDir(jobId));
         return result;
     }

diff --git a/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java b/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java
index 641454c..b7f2233 100644
--- a/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java
+++ b/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java
@@ -57,6 +57,8 @@ public class GarbageCollectionStep extends AbstractExecutable {

     private static final String OLD_HDFS_PATHS_ON_HBASE_CLUSTER = "oldHdfsPathsOnHBaseCluster";

+    private static final String JOB_WORKING_DIR = "jobWorkingDir";
+
     private static final Logger logger = LoggerFactory.getLogger(GarbageCollectionStep.class);

     private StringBuffer output;
@@ -152,6 +154,16 @@ public class GarbageCollectionStep extends AbstractExecutable {
                     logger.debug("HDFS path not exists: " + path);
                     output.append("HDFS path not exists: \"" + path + "\" \n");
                 }
+                // If HBase is deployed on another cluster, the job dir there is left empty and should be dropped,
+                // because the rowkey_stats and hfile dirs are both dropped.
+                if (fileSystem.listStatus(oldPath.getParent()).length == 0) {
+                    Path emptyJobPath = new Path(getJobWorkingDir());
+                    if (fileSystem.exists(emptyJobPath)) {
+                        fileSystem.delete(emptyJobPath, true);
+                        logger.debug("HDFS path " + emptyJobPath + " is empty and dropped.");
+                        output.append("HDFS path " + emptyJobPath + " is empty and dropped.\n");
+                    }
+                }
             }
         }
     }
@@ -218,4 +230,11 @@ public class GarbageCollectionStep extends AbstractExecutable {
         return getParam(OLD_HIVE_TABLE);
     }

+    public void setJobWorkingDir(String jobWorkingDir) {
+        setParam(JOB_WORKING_DIR, jobWorkingDir);
+    }
+
+    private String getJobWorkingDir() {
+        return getParam(JOB_WORKING_DIR);
+    }
 }
-- 
2.3.2 (Apple Git-55)
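
For readers tracing the change outside of Kylin, below is a minimal standalone sketch (not part of the patches) of the cleanup flow that GarbageCollectionStep ends up with: old HDFS paths are dropped on the Hadoop (Hive/MapReduce) cluster and on the HBase cluster separately, and the per-job working directory on the HBase cluster is removed once it has been emptied. The class name, the example paths, and the way the two FileSystem objects are obtained are illustrative stand-ins; in Kylin the configurations come from HadoopUtil.getCurrentConfiguration() and HadoopUtil.getCurrentHBaseConfiguration(), and on a single-cluster deployment both resolve to the same HDFS.

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class TwoClusterCleanupSketch {

    // Mirrors dropHdfsPathOnCluster(): delete each old path on the given FileSystem.
    static void dropPaths(List<String> oldPaths, FileSystem fs) throws IOException {
        if (oldPaths == null || oldPaths.isEmpty()) {
            return;
        }
        System.out.println("Drop HDFS paths on FileSystem: " + fs.getUri());
        for (String path : oldPaths) {
            if (path.endsWith("*")) {
                path = path.substring(0, path.length() - 1); // strip the glob and drop the parent dir itself
            }
            Path oldPath = new Path(path);
            if (fs.exists(oldPath)) {
                fs.delete(oldPath, true);
            }
        }
    }

    // Simplified form of the patch-2 addition: remove the job working dir once it is empty.
    static void dropJobDirIfEmpty(String jobWorkingDir, FileSystem fs) throws IOException {
        Path jobDir = new Path(jobWorkingDir);
        if (fs.exists(jobDir) && fs.listStatus(jobDir).length == 0) {
            fs.delete(jobDir, true);
        }
    }

    public static void main(String[] args) throws IOException {
        // Illustrative stand-ins for HadoopUtil.getCurrentConfiguration() / getCurrentHBaseConfiguration().
        FileSystem hadoopFs = FileSystem.get(new Configuration());
        FileSystem hbaseFs = FileSystem.get(new Configuration());

        // Hypothetical job working dir and outputs; real paths are derived from the Kylin job id and cube name.
        String jobWorkingDir = "/kylin/kylin_metadata/kylin-job-uuid";
        List<String> onHadoopCluster = Arrays.asList(jobWorkingDir + "/fact_distinct_columns");
        List<String> onHBaseCluster = Arrays.asList(jobWorkingDir + "/rowkey_stats", jobWorkingDir + "/hfile/*");

        dropPaths(onHadoopCluster, hadoopFs);
        dropPaths(onHBaseCluster, hbaseFs);
        dropJobDirIfEmpty(jobWorkingDir, hbaseFs);
    }
}

The sketch simplifies the patch-2 check: the real code tests whether the parent of the just-deleted path is empty before removing the job working dir, which has the same effect once the rowkey_stats and hfile outputs are gone.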