From d3dd53e098dd72462cd71f63813428b943e81e19 Mon Sep 17 00:00:00 2001
From: nichunen@mininglamp.com
Date: Mon, 7 Sep 2015 04:15:45 -0400
Subject: [PATCH] Finish the hive intermediate table clean up job in
 org.apache.kylin.job.hadoop.cube.StorageCleanupJob

---
 .../kylin/job/hadoop/cube/StorageCleanupJob.java   |   98 +++++++++++++++++++-
 1 files changed, 97 insertions(+), 1 deletions(-)

diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/StorageCleanupJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/StorageCleanupJob.java
index 53b489e..acb52a2 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/StorageCleanupJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/StorageCleanupJob.java
@@ -18,7 +18,9 @@ package org.apache.kylin.job.hadoop.cube;
 
+import java.io.BufferedReader;
 import java.io.IOException;
+import java.io.StringReader;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -45,7 +47,10 @@ import org.apache.kylin.invertedindex.IIInstance;
 import org.apache.kylin.invertedindex.IIManager;
 import org.apache.kylin.invertedindex.IISegment;
 import org.apache.kylin.job.JobInstance;
+import org.apache.kylin.job.cmd.ICommandOutput;
+import org.apache.kylin.job.cmd.ShellCmd;
 import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.exception.JobException;
 import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.job.manager.ExecutableManager;
 import org.apache.kylin.metadata.realization.IRealizationConstants;
@@ -86,9 +91,9 @@ public class StorageCleanupJob extends AbstractHadoopJob {
 
             Configuration conf = HBaseConfiguration.create(getConf());
 
+            cleanUnusedIntermediateHiveTable(conf);
             cleanUnusedHdfsFiles(conf);
             cleanUnusedHBaseTables(conf);
-            cleanUnusedIntermediateHiveTable(conf);
 
             return 0;
         } catch (Exception e) {
@@ -228,7 +233,98 @@ public class StorageCleanupJob extends AbstractHadoopJob {
     }
 
     private void cleanUnusedIntermediateHiveTable(Configuration conf) throws IOException {
+        FileSystem fs = FileSystem.get(conf);
+        int uuidLength = 36;
+
+        // List all intermediate tables created by Kylin jobs.
+        StringBuilder buf = new StringBuilder();
+        buf.append("hive -e \"");
+        buf.append("show tables 'kylin_intermediate_*'; ");
+        buf.append("\"");
+
+        ShellCmd cmd = new ShellCmd(buf.toString(), null, null, null, false);
+        ICommandOutput output = null;
+
+        try {
+            output = cmd.execute();
+        } catch (JobException e) {
+            log.error("Failed to list intermediate hive tables", e);
+        }
+
+        if (output == null)
+            return;
+
+        String outputStr = output.getOutput();
+        BufferedReader reader = new BufferedReader(new StringReader(outputStr));
+        String line = null;
+        List<String> allJobs = executableManager.getAllJobIds();
+        List<String> allHiveTablesNeedToBeDeleted = new ArrayList<String>();
+        List<String> workingJobList = new ArrayList<String>();
+
+        for (String jobId : allJobs) {
+            // only remove intermediate tables of FINISHED and DISCARDED jobs
+            final ExecutableState state = executableManager.getOutput(jobId).getState();
+
+            if (!state.isFinalState()) {
+                workingJobList.add(jobId);
+                log.info("Skip intermediate hive table of job " + jobId + " with job status " + state);
+            }
+        }
+
+        while ((line = reader.readLine()) != null) {
+            if (line.startsWith("kylin_intermediate_")) {
+                boolean isNeedDel = true;
+                // the table name ends with the job uuid, with '-' replaced by '_'
+                String uuid = line.substring(line.length() - uuidLength, line.length());
+                uuid = uuid.replace("_", "-");
+
+                // check whether the hive table is still in use by a running job
+                if (workingJobList.contains(uuid)) {
+                    isNeedDel = false;
+                    log.info("Hive table with uuid " + uuid + " is in use.");
+                }
+
+                // check whether the hive table belongs to the current Kylin instance
+                String hdfsPath = JobInstance.getJobWorkingDir(uuid, KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory());
+                Path p = new Path(hdfsPath);
+
+                if (!fs.exists(p)) {
+                    isNeedDel = false;
+                    log.info("Hive table with uuid " + uuid + " belongs to a different Kylin instance.");
+                }
+
+                if (isNeedDel)
+                    allHiveTablesNeedToBeDeleted.add(line);
+            }
+        }
+
+        if (delete) {
+            buf.delete(0, buf.length());
+            buf.append("hive -e \"");
+
+            for (String delHive : allHiveTablesNeedToBeDeleted) {
+                buf.append("drop table if exists " + delHive + "; ");
+                log.info("Remove " + delHive + " from hive tables.");
+            }
+            buf.append("\"");
+
+            cmd = new ShellCmd(buf.toString(), null, null, null, false);
+            try {
+                cmd.execute();
+            } catch (JobException e) {
+                log.error("Failed to drop intermediate hive tables", e);
+            }
+        } else {
+            System.out.println("------ Intermediate Hive Tables To Be Dropped ------");
+            for (String hiveTable : allHiveTablesNeedToBeDeleted) {
+                System.out.println(hiveTable);
+            }
+            System.out.println("----------------------------------------------------");
+        }
+
+        reader.close();
     }
 
     public static void main(String[] args) throws Exception {
-- 
1.7.1