From f34c0974a96c16bc942c6e3b0226140effdafbb8 Mon Sep 17 00:00:00 2001
From: nichunen@mininglamp.com
Date: Mon, 7 Sep 2015 04:53:52 -0400
Subject: [PATCH] Optimize the storage cleaning job.

---
 .../kylin/job/hadoop/cube/StorageCleanupJob.java   |  369 +++++++++++++++++++-
 1 files changed, 365 insertions(+), 4 deletions(-)

diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/StorageCleanupJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/StorageCleanupJob.java
index 46f0849..9115546 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/StorageCleanupJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/StorageCleanupJob.java
@@ -18,9 +18,13 @@
 
 package org.apache.kylin.job.hadoop.cube;
 
+import java.io.BufferedReader;
+import java.io.FileReader;
 import java.io.IOException;
+import java.io.StringReader;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 
 import org.apache.commons.cli.Option;
@@ -44,11 +48,15 @@ import org.apache.kylin.invertedindex.IIInstance;
 import org.apache.kylin.invertedindex.IIManager;
 import org.apache.kylin.invertedindex.IISegment;
 import org.apache.kylin.job.JobInstance;
+import org.apache.kylin.job.cmd.ICommandOutput;
+import org.apache.kylin.job.cmd.ShellCmd;
 import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.exception.JobException;
 import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.job.hadoop.AbstractHadoopJob;
 import org.apache.kylin.job.manager.ExecutableManager;
 import org.apache.kylin.metadata.realization.IRealizationConstants;
+import org.apache.kylin.metadata.realization.RealizationStatusEnum;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -59,11 +67,14 @@ public class StorageCleanupJob extends AbstractHadoopJob {
 
     @SuppressWarnings("static-access")
     private static final Option OPTION_DELETE = OptionBuilder.withArgName("delete").hasArg().isRequired(false).withDescription("Delete the unused storage").create("delete");
+    private static final Option OPTION_UUID_LIST = OptionBuilder.withArgName("uuids").hasArgs(Option.UNLIMITED_VALUES).isRequired(false).withDescription("Remove storage of the jobs with these uuids, separated by ','").create("uuids");
+    private static final Option OPTION_UUID_FILE = OptionBuilder.withArgName("file").hasArg().isRequired(false).withDescription("Remove storage of the jobs whose uuids are listed in the file").create("file");
 
     protected static final Logger log = LoggerFactory.getLogger(StorageCleanupJob.class);
 
     boolean delete = false;
-
+    String uuids = "";
+    String uuidFile = "";
     protected static ExecutableManager executableManager = ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv());
 
     /*
@@ -78,17 +89,32 @@ public class StorageCleanupJob extends AbstractHadoopJob {
         log.info("----- jobs args: " + Arrays.toString(args));
         try {
             options.addOption(OPTION_DELETE);
+            options.addOption(OPTION_UUID_LIST);
+            options.addOption(OPTION_UUID_FILE);
             parseOptions(options, args);
 
             log.info("options: '" + getOptionsAsString() + "'");
             log.info("delete option value: '" + getOptionValue(OPTION_DELETE) + "'");
+            log.info("uuids option value: '" + getOptionValue(OPTION_UUID_LIST) + "'");
+            log.info("uuidFile option value: '" + getOptionValue(OPTION_UUID_FILE) + "'");
+
+            uuids = getOptionValue(OPTION_UUID_LIST);
+            // get the array of job uuids from the -uuids and -file options
+            String[] uuidArr = getUuidArray();
+
             delete = Boolean.parseBoolean(getOptionValue(OPTION_DELETE));
 
             Configuration conf = HBaseConfiguration.create(getConf());
-            cleanUnusedHdfsFiles(conf);
-            cleanUnusedHBaseTables(conf);
-            cleanUnusedIntermediateHiveTable(conf);
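+            // If job uuids were supplied (via -uuids or -file), clean only the
+            // artifacts of those jobs; with no uuid option at all, fall back to
+            // the original full-scan cleanup.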
+            if (uuidArr != null && uuidArr.length > 0) {
+                cleanUnusedIntermediateHiveTable(conf, uuidArr);
+                cleanUnusedHBaseTables(conf, uuidArr);
+                cleanUnusedHdfsFiles(conf, uuidArr);
+            } else if (uuids == null && uuidFile == null) {
+                cleanUnusedIntermediateHiveTable(conf);
+                cleanUnusedHBaseTables(conf);
+                cleanUnusedHdfsFiles(conf);
+            }
 
             return 0;
         } catch (Exception e) {
@@ -97,6 +123,31 @@ public class StorageCleanupJob extends AbstractHadoopJob {
         }
     }
 
+    private String[] getUuidArray() throws IOException {
+        List<String> uuidList = new ArrayList<String>();
+
+        if (uuids != null) {
+            String[] splittedUuids = uuids.split(",");
+            Collections.addAll(uuidList, splittedUuids);
+        }
+
+        uuidFile = getOptionValue(OPTION_UUID_FILE);
+
+        if (uuidFile != null) {
+            FileReader fileReader = new FileReader(uuidFile);
+            BufferedReader buffReader = new BufferedReader(fileReader);
+            String line = null;
+
+            while ((line = buffReader.readLine()) != null) {
+                // skip blank and truncated lines; a full uuid is 36 characters
+                if (line.length() > 10) {
+                    uuidList.add(line);
+                }
+            }
+
+            buffReader.close();
+            fileReader.close();
+        }
+
+        return uuidList.toArray(new String[0]);
+    }
+
     private void cleanUnusedHBaseTables(Configuration conf) throws MasterNotRunningException, ZooKeeperConnectionException, IOException {
         CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
         IIManager iiManager = IIManager.getInstance(KylinConfig.getInstanceFromEnv());
@@ -155,6 +206,64 @@ public class StorageCleanupJob extends AbstractHadoopJob {
         hbaseAdmin.close();
     }
 
+    private void cleanUnusedHBaseTables(Configuration conf, String[] uuids) throws MasterNotRunningException, ZooKeeperConnectionException, IOException {
+        CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
+        IIManager iiManager = IIManager.getInstance(KylinConfig.getInstanceFromEnv());
+        HBaseAdmin hbaseAdmin = new HBaseAdmin(conf);
+        List<String> allTablesNeedToBeDropped = new ArrayList<String>();
+        List<String> uuidList = new ArrayList<String>();
+
+        Collections.addAll(uuidList, uuids);
+
+        // add the htable of a listed segment to the drop list only if the owning
+        // cube's status is DISABLED or DESCBROKEN
+        for (CubeInstance cube : cubeMgr.listAllCubes()) {
+            if (cube.getStatus() == RealizationStatusEnum.DISABLED || cube.getStatus() == RealizationStatusEnum.DESCBROKEN) {
+                for (CubeSegment seg : cube.getSegments()) {
+                    String segUuid = seg.getUuid();
+
+                    if (uuidList.contains(segUuid)) {
+                        String tablename = seg.getStorageLocationIdentifier();
+                        allTablesNeedToBeDropped.add(tablename);
+                        uuidList.remove(segUuid);
+                        log.info("Table " + tablename + " added to drop list, as the table belongs to cube " + cube.getName() + " with status " + cube.getStatus());
+                    }
+                }
+            }
+        }
+
+        // remove every ii segment htable from the drop list
+        for (IIInstance ii : iiManager.listAllIIs()) {
+            for (IISegment seg : ii.getSegments()) {
+                String tablename = seg.getStorageLocationIdentifier();
+
+                if (allTablesNeedToBeDropped.contains(tablename)) {
+                    allTablesNeedToBeDropped.remove(tablename);
+                    log.info("Removed table " + tablename + " from drop list, as the table belongs to ii " + ii.getName() + " with status " + ii.getStatus());
+                }
+            }
+        }
+
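+        // Dry-run by default: tables are dropped only when "-delete true" is
+        // passed; otherwise the candidate list is just printed for review.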
table" + htableName + " does not exist"); + } + } + } else { + System.out.println("--------------- Tables To Be Dropped ---------------"); + for (String htableName : allTablesNeedToBeDropped) { + System.out.println(htableName); + } + System.out.println("----------------------------------------------------"); + } + + hbaseAdmin.close(); + } + private void cleanUnusedHdfsFiles(Configuration conf) throws IOException { JobEngineConfig engineConfig = new JobEngineConfig(KylinConfig.getInstanceFromEnv()); CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()); @@ -219,8 +328,260 @@ public class StorageCleanupJob extends AbstractHadoopJob { } + private void cleanUnusedHdfsFiles(Configuration conf, String[] uuids) throws IOException { + JobEngineConfig engineConfig = new JobEngineConfig(KylinConfig.getInstanceFromEnv()); + CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()); + FileSystem fs = FileSystem.get(conf); + List allHdfsPathsNeedToBeDeleted = new ArrayList(); + FileStatus[] fStatus = fs.listStatus(new Path(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory())); + List fileNameList = new ArrayList(); + List uuidList = new ArrayList(); + Collections.addAll(uuidList, uuids); + + for(String uuid : uuids){ + fileNameList.add(JobInstance.JOB_WORKING_DIR_PREFIX + uuid); + } + + for (FileStatus status : fStatus) { + String path = status.getPath().getName(); + // System.out.println(path); + if (fileNameList.contains(path)) { + String kylinJobPath = engineConfig.getHdfsWorkingDirectory() + "/" + path; + allHdfsPathsNeedToBeDeleted.add(kylinJobPath); + } + } + + + //Remove files with cubes with status READY or BUILDING + for (CubeInstance cube : cubeMgr.listAllCubes()) { + for (CubeSegment seg : cube.getSegments()) { + String uuid = seg.getUuid(); + + if(uuidList.contains(uuid) && (cube.getStatus() == RealizationStatusEnum.READY || cube.getStatus() == RealizationStatusEnum.BUILDING)){ + String path = engineConfig.getHdfsWorkingDirectory() + "/" + JobInstance.JOB_WORKING_DIR_PREFIX + uuid; + allHdfsPathsNeedToBeDeleted.remove(path); + log.info("Remove " + path + " from deletion list, as the path belongs to cube " + cube.getUuid() + " with status " + cube.getStatus()); + } + } + } + + if (delete == true) { + // remove files + for (String hdfsPath : allHdfsPathsNeedToBeDeleted) { + log.info("Deleting hdfs path " + hdfsPath); + Path p = new Path(hdfsPath); + if (fs.exists(p) == true) { + fs.delete(p, true); + log.info("Deleted hdfs path " + hdfsPath); + } else { + log.info("Hdfs path " + hdfsPath + "does not exist"); + } + } + } else { + System.out.println("--------------- HDFS Path To Be Deleted ---------------"); + for (String hdfsPath : allHdfsPathsNeedToBeDeleted) { + System.out.println(hdfsPath); + } + System.out.println("-------------------------------------------------------"); + } + } + private void cleanUnusedIntermediateHiveTable(Configuration conf) throws IOException { + FileSystem fs = FileSystem.get(conf); + //JobEngineConfig engineConfig = new JobEngineConfig(KylinConfig.getInstanceFromEnv()); + int uuidLength = 36; + + StringBuilder buf = new StringBuilder(); + buf.append("hive -e \""); + buf.append("show tables " + "\'kylin_intermediate_*\'" + "; "); + buf.append("\""); + + ShellCmd cmd = new ShellCmd(buf.toString(), null, null, null, false); + ICommandOutput output = null; + + try { + output = cmd.execute(); + } catch (JobException e) { + e.printStackTrace(); + } + + if(output == null) + return; + String outputStr = 
+        String outputStr = output.getOutput();
+        BufferedReader reader = new BufferedReader(new StringReader(outputStr));
+        String line = null;
+        List<String> allJobs = executableManager.getAllJobIds();
+        List<String> allHiveTablesNeedToBeDeleted = new ArrayList<String>();
+        List<String> workingJobList = new ArrayList<String>();
+
+        // only the intermediate tables of FINISHED and DISCARDED jobs may be removed
+        for (String jobId : allJobs) {
+            final ExecutableState state = executableManager.getOutput(jobId).getState();
+
+            if (!state.isFinalState()) {
+                workingJobList.add(jobId);
+                log.info("Keep intermediate hive table of job " + jobId + " with job status " + state);
+            }
+        }
+
+        while ((line = reader.readLine()) != null) {
+            if (line.startsWith("kylin_intermediate_")) {
+                boolean isNeedDel = true;
+                String uuid = line.substring(line.length() - uuidLength, line.length());
+                uuid = uuid.replace("_", "-");
+
+                // check whether the hive table is still in use
+                if (workingJobList.contains(uuid)) {
+                    isNeedDel = false;
+                    log.info("Hive table with uuid " + uuid + " is in use.");
+                }
+
+                // check whether the hive table belongs to the current Kylin instance
+                String hdfsPath = JobInstance.getJobWorkingDir(uuid, KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory());
+                Path p = new Path(hdfsPath);
+
+                if (!fs.exists(p)) {
+                    isNeedDel = false;
+                    log.info("Hive table with uuid " + uuid + " belongs to a different Kylin instance.");
+                }
+
+                if (isNeedDel)
+                    allHiveTablesNeedToBeDeleted.add(line);
+            }
+        }
+
+        if (delete) {
+            buf.delete(0, buf.length());
+            buf.append("hive -e \"");
+
+            for (String delHive : allHiveTablesNeedToBeDeleted) {
+                buf.append("drop table if exists " + delHive + "; ");
+                log.info("Dropping hive table " + delHive);
+            }
+            buf.append("\"");
+
+            cmd = new ShellCmd(buf.toString(), null, null, null, false);
+
+            try {
+                cmd.execute();
+            } catch (JobException e) {
+                e.printStackTrace();
+            }
+        } else {
+            System.out.println("------ Intermediate Hive Tables To Be Dropped ------");
+            for (String hiveTable : allHiveTablesNeedToBeDeleted) {
+                System.out.println(hiveTable);
+            }
+            System.out.println("----------------------------------------------------");
+        }
+
+        reader.close();
+    }
+
+    private void cleanUnusedIntermediateHiveTable(Configuration conf, String[] uuids) throws IOException {
+        FileSystem fs = FileSystem.get(conf);
+        int uuidLength = 36;
+
+        StringBuilder buf = new StringBuilder();
+        buf.append("hive -e \"");
+        buf.append("show tables 'kylin_intermediate_*'; ");
+        buf.append("\"");
+
+        ShellCmd cmd = new ShellCmd(buf.toString(), null, null, null, false);
+        ICommandOutput output = null;
+
+        try {
+            output = cmd.execute();
+        } catch (JobException e) {
+            e.printStackTrace();
+        }
+
+        if (output == null)
+            return;
+
+        String outputStr = output.getOutput();
+        BufferedReader reader = new BufferedReader(new StringReader(outputStr));
+        String line = null;
+        List<String> allJobs = executableManager.getAllJobIds();
+        List<String> allHiveTablesNeedToBeDeleted = new ArrayList<String>();
+        List<String> workingJobList = new ArrayList<String>();
+
+        // only the intermediate tables of FINISHED and DISCARDED jobs may be removed
+        for (String jobId : allJobs) {
+            final ExecutableState state = executableManager.getOutput(jobId).getState();
+
+            if (!state.isFinalState()) {
+                workingJobList.add(jobId);
+                log.info("Keep intermediate hive table of job " + jobId + " with job status " + state);
+            }
+        }
+
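+        // Unlike the full-scan variant above, a table is a deletion candidate
+        // only if its trailing uuid matches one of the supplied uuids; the
+        // in-use and foreign-instance checks below still apply afterwards.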
+        while ((line = reader.readLine()) != null) {
+            if (line.startsWith("kylin_intermediate_")) {
+                boolean isNeedDel = false;
+                String uuid = line.substring(line.length() - uuidLength, line.length());
+                uuid = uuid.replace("_", "-");
+
+                // check whether the uuid is in the supplied array
+                for (String idInArr : uuids) {
+                    if (idInArr.equals(uuid)) {
+                        isNeedDel = true;
+                        break;
+                    }
+                }
+
+                // check whether the hive table is still in use
+                if (workingJobList.contains(uuid)) {
+                    isNeedDel = false;
+                    log.info("Hive table with uuid " + uuid + " is in use.");
+                }
+
+                // check whether the hive table belongs to the current Kylin instance
+                String hdfsPath = JobInstance.getJobWorkingDir(uuid, KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory());
+                Path p = new Path(hdfsPath);
+
+                if (!fs.exists(p)) {
+                    isNeedDel = false;
+                    log.info("Hive table with uuid " + uuid + " belongs to a different Kylin instance.");
+                }
+
+                if (isNeedDel)
+                    allHiveTablesNeedToBeDeleted.add(line);
+            }
+        }
+
+        if (delete) {
+            buf.delete(0, buf.length());
+            buf.append("hive -e \"");
+
+            for (String delHive : allHiveTablesNeedToBeDeleted) {
+                buf.append("drop table if exists " + delHive + "; ");
+                log.info("Dropping hive table " + delHive);
+            }
+            buf.append("\"");
+
+            cmd = new ShellCmd(buf.toString(), null, null, null, false);
+
+            try {
+                cmd.execute();
+            } catch (JobException e) {
+                e.printStackTrace();
+            }
+        } else {
+            System.out.println("------ Intermediate Hive Tables To Be Dropped ------");
+            for (String hiveTable : allHiveTablesNeedToBeDeleted) {
+                System.out.println(hiveTable);
+            }
+            System.out.println("----------------------------------------------------");
+        }
+
+        reader.close();
+    }
+
     public static void main(String[] args) throws Exception {
-- 
1.7.1