From cfbac72e6f901b634749e0a011a623a0213c7e51 Mon Sep 17 00:00:00 2001 From: nichunen@mininglamp.com Date: Sat, 5 Sep 2015 23:47:53 -0400 Subject: [PATCH] Finish cleanUnusedIntermediateHiveTable() in org.apache.kylin.job.hadoop.cube.StorageCleanupJob --- .../kylin/job/hadoop/cube/StorageCleanupJob.java | 71 ++++++++++++++++++++ 1 files changed, 71 insertions(+), 0 deletions(-) diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/StorageCleanupJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/StorageCleanupJob.java index 46f0849..b4b3ffe 100644 --- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/StorageCleanupJob.java +++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/StorageCleanupJob.java @@ -18,7 +18,9 @@ package org.apache.kylin.job.hadoop.cube; +import java.io.BufferedReader; import java.io.IOException; +import java.io.StringReader; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -44,7 +46,10 @@ import org.apache.kylin.invertedindex.IIInstance; import org.apache.kylin.invertedindex.IIManager; import org.apache.kylin.invertedindex.IISegment; import org.apache.kylin.job.JobInstance; +import org.apache.kylin.job.cmd.ICommandOutput; +import org.apache.kylin.job.cmd.ShellCmd; import org.apache.kylin.job.engine.JobEngineConfig; +import org.apache.kylin.job.exception.JobException; import org.apache.kylin.job.execution.ExecutableState; import org.apache.kylin.job.hadoop.AbstractHadoopJob; import org.apache.kylin.job.manager.ExecutableManager; @@ -220,7 +225,73 @@ public class StorageCleanupJob extends AbstractHadoopJob { } private void cleanUnusedIntermediateHiveTable(Configuration conf) throws IOException { + StringBuilder buf = new StringBuilder(); + buf.append("hive -e \""); + buf.append("show tables " + "\'kylin_intermediate_*\'" + "; "); + buf.append("\""); + ShellCmd cmd = new ShellCmd(buf.toString(), null, null, null, false); + ICommandOutput output = null; + + try { + output = cmd.execute(); + } catch (JobException e) { + e.printStackTrace(); + } + + if(output == null) + return; + String outputStr = output.getOutput(); + BufferedReader reader = new BufferedReader(new StringReader(outputStr)); + String line = null; + List allJobs = executableManager.getAllJobIds(); + List allHiveTablesNeedToBeDeleted = new ArrayList(); + List workingJobList = new ArrayList(); + + for (String jobId : allJobs) { + // only remove FINISHED and DISCARDED job intermediate table + final ExecutableState state = executableManager.getOutput(jobId).getState(); + + if (!state.isFinalState()) { + workingJobList.add(jobId); + log.info("Remove intermediate hive table with job id " + jobId + " with job status " + state); + } + } + + while ((line = reader.readLine()) != null) { + if(line.startsWith("kylin_intermediate_")){ + boolean isNeedDel = true; + + for(String jobId : workingJobList){ + if(line.contains(jobId)){ + isNeedDel = false; + break; + } + } + + if(isNeedDel) + allHiveTablesNeedToBeDeleted.add(line); + } + } + + buf.delete(0, buf.length()); + buf.append("hive -e \""); + + for(String delHive : allHiveTablesNeedToBeDeleted){ + buf.append("drop table if exists " + delHive + "; "); + log.info("Remove " + delHive + " from hive tables."); + } + buf.append("\""); + cmd = new ShellCmd(buf.toString(), null, null, null, false); + + try { + cmd.execute(); + } catch (JobException e) { + e.printStackTrace(); + } + + if(reader != null) + reader.close(); } public static void main(String[] args) throws Exception { -- 1.7.1