From 53a6be1e917521503d4eafec1b483b52d050556a Mon Sep 17 00:00:00 2001
From: sunyerui
Date: Fri, 18 Sep 2015 19:11:31 +0800
Subject: [PATCH] KYLIN-978 GarbageCollectionStep dropped Hive Intermediate
 Table but didn't drop external hdfs path

---
 .../java/org/apache/kylin/job/JoinedFlatTable.java |   2 +-
 .../org/apache/kylin/source/hive/HiveMRInput.java  |  19 ++-
 .../kylin/storage/hbase/steps/HBaseMROutput.java   |   5 +-
 .../kylin/storage/hbase/steps/HBaseMRSteps.java    |  50 ++++++++
 .../hbase/steps/HDFSPathGarbageCollectionStep.java | 138 +++++++++++++++++++++
 5 files changed, 209 insertions(+), 5 deletions(-)
 create mode 100644 storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HDFSPathGarbageCollectionStep.java

diff --git a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
index 6ae3ccb..5886325 100644
--- a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
@@ -72,7 +72,7 @@ public class JoinedFlatTable {
 
         ddl.append("ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\177'" + "\n");
         ddl.append("STORED AS SEQUENCEFILE" + "\n");
-        ddl.append("LOCATION '" + storageDfsDir + "/" + intermediateTableDesc.getTableName() + "';").append("\n");
+        ddl.append("LOCATION '" + getTableDir(intermediateTableDesc, storageDfsDir) + "';").append("\n");
         // ddl.append("TBLPROPERTIES ('serialization.null.format'='\\\\N')" +
         // ";\n");
         return ddl.toString();
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index 4491e2b..3322060 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -21,6 +21,8 @@ package org.apache.kylin.source.hive;
 import java.io.IOException;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hive.hcatalog.data.HCatRecord;
 import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
@@ -135,6 +137,7 @@
             GarbageCollectionStep step = new GarbageCollectionStep();
             step.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION);
             step.setIntermediateTableIdentity(getIntermediateTableIdentity());
+            step.setExternalDataPath(JoinedFlatTable.getTableDir(flatHiveTableDesc, JobBuilderSupport.getJobWorkingDir(conf, jobFlow.getId())));
             jobFlow.addTask(step);
         }
 
@@ -149,7 +152,6 @@
     }
 
     public static class GarbageCollectionStep extends AbstractExecutable {
-
         @Override
         protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
             KylinConfig config = context.getConfig();
@@ -163,6 +165,13 @@
             try {
                 config.getCliCommandExecutor().execute(dropHiveCMD, shellCmdOutput);
                 output.append("Hive table " + hiveTable + " is dropped. \n");
+
+                Path externalDataPath = new Path(getExternalDataPath());
+                FileSystem fs = FileSystem.get(externalDataPath.toUri(), HadoopUtil.getCurrentConfiguration());
+                if (fs.exists(externalDataPath)) {
+                    fs.delete(externalDataPath, true);
+                    output.append("Hive table " + hiveTable + " external data path " + externalDataPath + " is deleted. \n");
\n"); + } } catch (IOException e) { logger.error("job:" + getId() + " execute finished with exception", e); output.append(shellCmdOutput.getOutput()).append("\n").append(e.getLocalizedMessage()); @@ -180,6 +189,14 @@ public class HiveMRInput implements IMRInput { private String getIntermediateTableIdentity() { return getParam("oldHiveTable"); } + + public void setExternalDataPath(String externalDataPath) { + setParam("externalDataPath", externalDataPath); + } + + private String getExternalDataPath() { + return getParam("externalDataPath"); + } } } diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput.java index 8cbb7ff..c634a1d 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput.java @@ -21,7 +21,6 @@ package org.apache.kylin.storage.hbase.steps; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.engine.mr.IMROutput; import org.apache.kylin.job.execution.DefaultChainedExecutable; -import org.apache.kylin.storage.hbase.steps.HBaseMRSteps; public class HBaseMROutput implements IMROutput { @@ -37,7 +36,7 @@ public class HBaseMROutput implements IMROutput { @Override public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) { - // nothing to do + steps.addCubingGarbageCollectionSteps(jobFlow); } }; } @@ -54,7 +53,7 @@ public class HBaseMROutput implements IMROutput { @Override public void addStepPhase3_Cleanup(DefaultChainedExecutable jobFlow) { - jobFlow.addTask(steps.createMergeGCStep()); + steps.addMergingGarbageCollectionSteps(jobFlow); } }; } diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java index dfb4f33..f9e9b15 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java @@ -1,5 +1,6 @@ package org.apache.kylin.storage.hbase.steps; +import java.util.ArrayList; import java.util.List; import org.apache.kylin.cube.CubeSegment; @@ -129,6 +130,16 @@ public class HBaseMRSteps extends JobBuilderSupport { return mergingHTables; } + public List getMergingHDFSPaths() { + final List mergingSegments = seg.getCubeInstance().getMergingSegments(seg); + Preconditions.checkState(mergingSegments.size() > 1, "there should be more than 2 segments to merge"); + final List mergingHDFSPaths = Lists.newArrayList(); + for (CubeSegment merging : mergingSegments) { + mergingHDFSPaths.add(getJobWorkingDir(merging.getLastBuildJobID())); + } + return mergingHDFSPaths; + } + public String getHFilePath(String jobId) { return HadoopUtil.makeQualifiedPathInHBaseCluster(getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/hfile/"); } @@ -137,4 +148,43 @@ public class HBaseMRSteps extends JobBuilderSupport { return HadoopUtil.makeQualifiedPathInHBaseCluster(getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/rowkey_stats"); } + public void addMergingGarbageCollectionSteps(DefaultChainedExecutable jobFlow) { + String jobId = jobFlow.getId(); + + jobFlow.addTask(createMergeGCStep()); + + List toDeletePathsOnHadoopCluster = new ArrayList<>(); + toDeletePathsOnHadoopCluster.addAll(getMergingHDFSPaths()); + + List toDeletePathsOnHbaseCluster = new ArrayList<>(); + 
+        toDeletePathsOnHbaseCluster.add(getRowkeyDistributionOutputPath(jobId));
+        toDeletePathsOnHbaseCluster.add(getHFilePath(jobId));
+
+        HDFSPathGarbageCollectionStep step = new HDFSPathGarbageCollectionStep();
+        step.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION);
+        step.setDeletePathsOnHadoopCluster(toDeletePathsOnHadoopCluster);
+        step.setDeletePathsOnHBaseCluster(toDeletePathsOnHbaseCluster);
+        step.setJobId(jobId);
+
+        jobFlow.addTask(step);
+    }
+
+    public void addCubingGarbageCollectionSteps(DefaultChainedExecutable jobFlow) {
+        String jobId = jobFlow.getId();
+
+        List<String> toDeletePathsOnHadoopCluster = new ArrayList<>();
+        toDeletePathsOnHadoopCluster.add(getFactDistinctColumnsPath(jobId));
+
+        List<String> toDeletePathsOnHbaseCluster = new ArrayList<>();
+        toDeletePathsOnHbaseCluster.add(getRowkeyDistributionOutputPath(jobId));
+        toDeletePathsOnHbaseCluster.add(getHFilePath(jobId));
+
+        HDFSPathGarbageCollectionStep step = new HDFSPathGarbageCollectionStep();
+        step.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION);
+        step.setDeletePathsOnHadoopCluster(toDeletePathsOnHadoopCluster);
+        step.setDeletePathsOnHBaseCluster(toDeletePathsOnHbaseCluster);
+        step.setJobId(jobId);
+
+        jobFlow.addTask(step);
+    }
 }
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HDFSPathGarbageCollectionStep.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HDFSPathGarbageCollectionStep.java
new file mode 100644
index 0000000..2ae8ca8
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HDFSPathGarbageCollectionStep.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.storage.hbase.steps;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.engine.mr.JobBuilderSupport;
+import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Created by sunyerui on 15/9/17.
+ */ +public class HDFSPathGarbageCollectionStep extends AbstractExecutable { + + private StringBuffer output; + private JobEngineConfig config; + + public HDFSPathGarbageCollectionStep() { + super(); + output = new StringBuffer(); + } + + @Override + protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { + try { + config = new JobEngineConfig(context.getConfig()); + dropHdfsPathOnCluster(getDeletePathsOnHadoopCluster(), FileSystem.get(HadoopUtil.getCurrentConfiguration())); + dropHdfsPathOnCluster(getDeletePathsOnHBaseCluster(), FileSystem.get(HadoopUtil.getCurrentHBaseConfiguration())); + } catch (IOException e) { + logger.error("job:" + getId() + " execute finished with exception", e); + output.append("\n").append(e.getLocalizedMessage()); + return new ExecuteResult(ExecuteResult.State.ERROR, output.toString()); + } + + return new ExecuteResult(ExecuteResult.State.SUCCEED, output.toString()); + } + + private void dropHdfsPathOnCluster(List oldHdfsPaths, FileSystem fileSystem) throws IOException { + if (oldHdfsPaths != null && oldHdfsPaths.size() > 0) { + logger.debug("Drop HDFS path on FileSystem: " + fileSystem.getUri()); + output.append("Drop HDFS path on FileSystem: \"" + fileSystem.getUri() + "\" \n"); + for (String path : oldHdfsPaths) { + if (path.endsWith("*")) + path = path.substring(0, path.length() - 1); + + Path oldPath = new Path(path); + if (fileSystem.exists(oldPath)) { + fileSystem.delete(oldPath, true); + logger.debug("HDFS path " + path + " is dropped."); + output.append("HDFS path " + path + " is dropped.\n"); + } else { + logger.debug("HDFS path " + path + " not exists."); + output.append("HDFS path " + path + " not exists.\n"); + } + // If hbase was deployed on another cluster, the job dir is empty and should be dropped, + // because of rowkey_stats and hfile dirs are both dropped. + if (fileSystem.listStatus(oldPath.getParent()).length == 0) { + Path emptyJobPath = new Path(JobBuilderSupport.getJobWorkingDir(config, getJobId())); + if (fileSystem.exists(emptyJobPath)) { + fileSystem.delete(emptyJobPath, true); + logger.debug("HDFS path " + emptyJobPath + " is empty and dropped."); + output.append("HDFS path " + emptyJobPath + " is empty and dropped.\n"); + } + } + } + } + } + + public void setDeletePathsOnHadoopCluster(List deletePaths) { + setArrayParam("toDeletePathsOnHadoopCluster", deletePaths); + } + + public void setDeletePathsOnHBaseCluster(List deletePaths) { + setArrayParam("toDeletePathsOnHBaseCluster", deletePaths); + } + + public void setJobId(String jobId) { + setParam("jobId", jobId); + } + + public List getDeletePathsOnHadoopCluster() { + return getArrayParam("toDeletePathsOnHadoopCluster"); + } + + public List getDeletePathsOnHBaseCluster() { + return getArrayParam("toDeletePathsOnHBaseCluster"); + } + + public String getJobId() { + return getParam("jobId"); + } + + private void setArrayParam(String paramKey, List paramValues) { + setParam(paramKey, StringUtils.join(paramValues, ",")); + } + + private List getArrayParam(String paramKey) { + final String ids = getParam(paramKey); + if (ids != null) { + final String[] splitted = StringUtils.split(ids, ","); + ArrayList result = Lists.newArrayListWithExpectedSize(splitted.length); + for (String id : splitted) { + result.add(id); + } + return result; + } else { + return Collections.emptyList(); + } + } +} -- 2.3.2 (Apple Git-55)