From 8f4a98fbe2f30e6ce298835f3de264d9e2b45b52 Mon Sep 17 00:00:00 2001 From: "qianhao.zhou" Date: Wed, 7 Jan 2015 16:02:26 +0800 Subject: [PATCH 1/4] add merge job & refactor --- .../java/com/kylinolap/job2/cube/BuildCubeJob.java | 39 -- .../kylinolap/job2/cube/BuildCubeJobBuilder.java | 323 ---------------- .../java/com/kylinolap/job2/cube/CubingJob.java | 39 ++ .../com/kylinolap/job2/cube/CubingJobBuilder.java | 420 +++++++++++++++++++++ .../cube/UpdateCubeInfoAfterBuildExecutable.java | 114 ++++++ .../cube/UpdateCubeInfoAfterMergeExecutable.java | 116 ++++++ .../job2/cube/UpdateCubeInfoExecutable.java | 113 ------ .../kylinolap/job2/execution/ExecuteResult.java | 4 + .../job2/cube/BuildCubeJobBuilderTest.java | 131 ------- .../kylinolap/job2/cube/CubingJobBuilderTest.java | 130 +++++++ .../com/kylinolap/rest/service/BasicService.java | 24 +- .../com/kylinolap/rest/service/CubeService.java | 27 +- .../com/kylinolap/rest/service/JobService.java | 39 +- 13 files changed, 865 insertions(+), 654 deletions(-) delete mode 100644 job/src/main/java/com/kylinolap/job2/cube/BuildCubeJob.java delete mode 100644 job/src/main/java/com/kylinolap/job2/cube/BuildCubeJobBuilder.java create mode 100644 job/src/main/java/com/kylinolap/job2/cube/CubingJob.java create mode 100644 job/src/main/java/com/kylinolap/job2/cube/CubingJobBuilder.java create mode 100644 job/src/main/java/com/kylinolap/job2/cube/UpdateCubeInfoAfterBuildExecutable.java create mode 100644 job/src/main/java/com/kylinolap/job2/cube/UpdateCubeInfoAfterMergeExecutable.java delete mode 100644 job/src/main/java/com/kylinolap/job2/cube/UpdateCubeInfoExecutable.java delete mode 100644 job/src/test/java/com/kylinolap/job2/cube/BuildCubeJobBuilderTest.java create mode 100644 job/src/test/java/com/kylinolap/job2/cube/CubingJobBuilderTest.java diff --git a/job/src/main/java/com/kylinolap/job2/cube/BuildCubeJob.java b/job/src/main/java/com/kylinolap/job2/cube/BuildCubeJob.java deleted file mode 100644 index 7c7b3f6..0000000 --- a/job/src/main/java/com/kylinolap/job2/cube/BuildCubeJob.java +++ /dev/null @@ -1,39 +0,0 @@ -package com.kylinolap.job2.cube; - -import com.kylinolap.job2.dao.JobPO; -import com.kylinolap.job2.impl.threadpool.DefaultChainedExecutable; - -/** - * Created by qianzhou on 12/25/14. 
- */ -public class BuildCubeJob extends DefaultChainedExecutable { - - public BuildCubeJob() { - super(); - } - - public BuildCubeJob(JobPO job) { - super(job); - } - - public static final String CUBE_INSTANCE_NAME = "cubeName"; - public static final String SEGMENT_ID = "segmentId"; - - - void setCubeName(String name) { - setParam(CUBE_INSTANCE_NAME, name); - } - - public String getCubeName() { - return getParam(CUBE_INSTANCE_NAME); - } - - public void setSegmentId(String segmentId) { - setParam(SEGMENT_ID, segmentId); - } - - public String getSegmentId() { - return getParam(SEGMENT_ID); - } - -} diff --git a/job/src/main/java/com/kylinolap/job2/cube/BuildCubeJobBuilder.java b/job/src/main/java/com/kylinolap/job2/cube/BuildCubeJobBuilder.java deleted file mode 100644 index 0c032e5..0000000 --- a/job/src/main/java/com/kylinolap/job2/cube/BuildCubeJobBuilder.java +++ /dev/null @@ -1,323 +0,0 @@ -package com.kylinolap.job2.cube; - -import com.kylinolap.cube.CubeSegment; -import com.kylinolap.cube.model.CubeBuildTypeEnum; -import com.kylinolap.job.JobInstance; -import com.kylinolap.job.JoinedFlatTable; -import com.kylinolap.job.engine.JobEngineConfig; -import com.kylinolap.job.hadoop.cube.*; -import com.kylinolap.job.hadoop.dict.CreateDictionaryJob; -import com.kylinolap.job.hadoop.hbase.BulkLoadJob; -import com.kylinolap.job.hadoop.hbase.CreateHTableJob; -import com.kylinolap.job.hadoop.hive.CubeJoinedFlatTableDesc; -import com.kylinolap.job2.common.HadoopShellExecutable; -import com.kylinolap.job2.common.MapReduceExecutable; -import com.kylinolap.job2.common.ShellExecutable; -import com.kylinolap.job2.constants.ExecutableConstants; - -import java.io.IOException; -import java.text.SimpleDateFormat; -import java.util.Date; -import java.util.TimeZone; - -/** - * Created by qianzhou on 12/25/14. 
- */ -public final class BuildCubeJobBuilder { - - private static final String JOB_WORKING_DIR_PREFIX = "kylin-"; - - private final JobEngineConfig jobEngineConfig; - private final CubeSegment segment; - - private BuildCubeJobBuilder(JobEngineConfig engineCfg, CubeSegment segment) { - this.jobEngineConfig = engineCfg; - this.segment = segment; - } - - public static BuildCubeJobBuilder newBuilder(JobEngineConfig engineCfg, CubeSegment segment) { - return new BuildCubeJobBuilder(engineCfg, segment); - } - - public BuildCubeJob build() { - final int groupRowkeyColumnsCount = segment.getCubeDesc().getRowkey().getNCuboidBuildLevels(); - final int totalRowkeyColumnsCount = segment.getCubeDesc().getRowkey().getRowKeyColumns().length; - - SimpleDateFormat format = new SimpleDateFormat("z yyyy-MM-dd HH:mm:ss"); - format.setTimeZone(TimeZone.getTimeZone(jobEngineConfig.getTimeZone())); - - BuildCubeJob result = new BuildCubeJob(); - result.setCubeName(getCubeName()); - result.setSegmentId(segment.getUuid()); - result.setName(getCubeName() + " - " + segment.getName() + " - BUILD - " + format.format(new Date(System.currentTimeMillis()))); - result.setSubmitter(null); - final String jobId = result.getId(); - final CubeJoinedFlatTableDesc intermediateTableDesc = new CubeJoinedFlatTableDesc(segment.getCubeDesc(), this.segment); - final String intermediateHiveTableName = getIntermediateHiveTableName(intermediateTableDesc, jobId); - final String factDistinctColumnsPath = getFactDistinctColumnsPath(jobId); - final String cuboidRootPath = getJobWorkingDir(jobId) + "/" + getCubeName() + "/cuboid/"; - final String cuboidPath = cuboidRootPath + "*"; - final String[] cuboidOutputTempPath = getCuboidOutputPaths(cuboidRootPath, totalRowkeyColumnsCount, groupRowkeyColumnsCount); - - final ShellExecutable intermediateHiveTableStep = createIntermediateHiveTableStep(intermediateTableDesc, jobId); - result.addTask(intermediateHiveTableStep); - - result.addTask(createFactDistinctColumnsStep(intermediateHiveTableName, jobId)); - - result.addTask(createBuildDictionaryStep(factDistinctColumnsPath)); - - // base cuboid step - final MapReduceExecutable baseCuboidStep = createBaseCuboidStep(intermediateHiveTableName, cuboidOutputTempPath); - result.addTask(baseCuboidStep); - - // n dim cuboid steps - for (int i = 1; i <= groupRowkeyColumnsCount; i++) { - int dimNum = totalRowkeyColumnsCount - i; - result.addTask(createNDimensionCuboidStep(cuboidOutputTempPath, dimNum, totalRowkeyColumnsCount)); - } - - result.addTask(createRangeRowkeyDistributionStep(cuboidPath)); - // create htable step - result.addTask(createCreateHTableStep()); - // generate hfiles step - final MapReduceExecutable convertCuboidToHfileStep = createConvertCuboidToHfileStep(cuboidPath, jobId); - result.addTask(convertCuboidToHfileStep); - // bulk load step - result.addTask(createBulkLoadStep(jobId)); - - result.addTask(createUpdateCubeInfoStep(intermediateHiveTableStep.getId(), baseCuboidStep.getId(), convertCuboidToHfileStep.getId())); - - return result; - } - - private String getJobWorkingDir(String jobUuid) { - return jobEngineConfig.getHdfsWorkingDirectory() + "/" + JOB_WORKING_DIR_PREFIX + jobUuid; - } - - private String getCubeName() { - return segment.getCubeInstance().getName(); - } - - private String getSegmentName() { - return segment.getName(); - } - - private String getRowkeyDistributionOutputPath() { - return jobEngineConfig.getHdfsWorkingDirectory() + "/" + getCubeName() + "/rowkey_stats"; - } - - private void 
appendMapReduceParameters(StringBuilder builder, JobEngineConfig engineConfig) { - try { - String jobConf = engineConfig.getHadoopJobConfFilePath(segment.getCubeDesc().getCapacity()); - if (jobConf != null && jobConf.length() > 0) { - builder.append(" -conf ").append(jobConf); - } - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - private String[] getCuboidOutputPaths(String cuboidRootPath, int totalRowkeyColumnCount, int groupRowkeyColumnsCount) { - String[] paths = new String[groupRowkeyColumnsCount + 1]; - for (int i = 0; i <= groupRowkeyColumnsCount; i++) { - int dimNum = totalRowkeyColumnCount - i; - if (dimNum == totalRowkeyColumnCount) { - paths[i] = cuboidRootPath + "base_cuboid"; - } else { - paths[i] = cuboidRootPath + dimNum + "d_cuboid"; - } - } - return paths; - } - - private StringBuilder appendExecCmdParameters(StringBuilder cmd, String paraName, String paraValue) { - return cmd.append(" -").append(paraName).append(" ").append(paraValue); - } - - private String getIntermediateHiveTableName(CubeJoinedFlatTableDesc intermediateTableDesc, String jobUuid) { - return JoinedFlatTable.getTableDir(intermediateTableDesc, getJobWorkingDir(jobUuid), jobUuid); - } - - private String getFactDistinctColumnsPath(String jobUuid) { - return getJobWorkingDir(jobUuid) + "/" + getCubeName() + "/fact_distinct_columns"; - } - - private String getHTableName() { - return segment.getStorageLocationIdentifier(); - } - - private String getHFilePath(String jobId) { - return getJobWorkingDir(jobId) + "/" + getCubeName() + "/hfile/"; - } - - private ShellExecutable createIntermediateHiveTableStep(CubeJoinedFlatTableDesc intermediateTableDesc, String jobId) { - try { - ShellExecutable result = new ShellExecutable(); - result.setName(ExecutableConstants.STEP_NAME_CREATE_FLAT_HIVE_TABLE); - String dropTableHql = JoinedFlatTable.generateDropTableStatement(intermediateTableDesc, jobId); - String createTableHql = JoinedFlatTable.generateCreateTableStatement(intermediateTableDesc, getJobWorkingDir(jobId), jobId); - String insertDataHql = JoinedFlatTable.generateInsertDataStatement(intermediateTableDesc, jobId, this.jobEngineConfig); - - - StringBuilder buf = new StringBuilder(); - buf.append("hive -e \""); - buf.append(dropTableHql + "\n"); - buf.append(createTableHql + "\n"); - buf.append(insertDataHql + "\n"); - buf.append("\""); - - result.setCmd(buf.toString()); - return result; - } catch (IOException e) { - throw new RuntimeException("fail to create job", e); - } - } - - private MapReduceExecutable createFactDistinctColumnsStep(String intermediateHiveTableName, String jobId) { - MapReduceExecutable result = new MapReduceExecutable(); - result.setName(ExecutableConstants.STEP_NAME_FACT_DISTINCT_COLUMNS); - result.setMapReduceJobClass(FactDistinctColumnsJob.class); - StringBuilder cmd = new StringBuilder(); - appendMapReduceParameters(cmd, jobEngineConfig); - appendExecCmdParameters(cmd, "cubename", segment.getCubeInstance().getName()); - appendExecCmdParameters(cmd, "input", intermediateHiveTableName); - appendExecCmdParameters(cmd, "output", getFactDistinctColumnsPath(jobId)); - appendExecCmdParameters(cmd, "jobname", "Kylin_Fact_Distinct_Columns_" + getCubeName() + "_Step"); - appendExecCmdParameters(cmd, "htablename", new CubeJoinedFlatTableDesc(segment.getCubeDesc(), segment).getTableName(jobId)); - - result.setMapReduceParams(cmd.toString()); - return result; - } - - private HadoopShellExecutable createBuildDictionaryStep(String factDistinctColumnsPath) { - // base cuboid job - 
HadoopShellExecutable buildDictionaryStep = new HadoopShellExecutable(); - buildDictionaryStep.setName(ExecutableConstants.STEP_NAME_BUILD_DICTIONARY); - StringBuilder cmd = new StringBuilder(); - appendExecCmdParameters(cmd, "cubename", getCubeName()); - appendExecCmdParameters(cmd, "segmentname", segment.getName()); - appendExecCmdParameters(cmd, "input", factDistinctColumnsPath); - - buildDictionaryStep.setJobParams(cmd.toString()); - buildDictionaryStep.setJobClass(CreateDictionaryJob.class); - return buildDictionaryStep; - } - - private MapReduceExecutable createBaseCuboidStep(String intermediateHiveTableName, String[] cuboidOutputTempPath) { - // base cuboid job - MapReduceExecutable baseCuboidStep = new MapReduceExecutable(); - - StringBuilder cmd = new StringBuilder(); - appendMapReduceParameters(cmd, jobEngineConfig); - - baseCuboidStep.setName(ExecutableConstants.STEP_NAME_BUILD_BASE_CUBOID); - - appendExecCmdParameters(cmd, "cubename", getCubeName()); - appendExecCmdParameters(cmd, "segmentname", getSegmentName()); - appendExecCmdParameters(cmd, "input", intermediateHiveTableName); - appendExecCmdParameters(cmd, "output", cuboidOutputTempPath[0]); - appendExecCmdParameters(cmd, "jobname", "Kylin_Base_Cuboid_Builder_" + getCubeName()); - appendExecCmdParameters(cmd, "level", "0"); - - baseCuboidStep.setMapReduceParams(cmd.toString()); - baseCuboidStep.setMapReduceJobClass(BaseCuboidJob.class); - return baseCuboidStep; - } - - private MapReduceExecutable createNDimensionCuboidStep(String[] cuboidOutputTempPath, int dimNum, int totalRowkeyColumnCount) { - // ND cuboid job - MapReduceExecutable ndCuboidStep = new MapReduceExecutable(); - - ndCuboidStep.setName(ExecutableConstants.STEP_NAME_BUILD_N_D_CUBOID + " : " + dimNum + "-Dimension"); - StringBuilder cmd = new StringBuilder(); - - appendMapReduceParameters(cmd, jobEngineConfig); - appendExecCmdParameters(cmd, "cubename", getCubeName()); - appendExecCmdParameters(cmd, "segmentname", getSegmentName()); - appendExecCmdParameters(cmd, "input", cuboidOutputTempPath[totalRowkeyColumnCount - dimNum - 1]); - appendExecCmdParameters(cmd, "output", cuboidOutputTempPath[totalRowkeyColumnCount - dimNum]); - appendExecCmdParameters(cmd, "jobname", "Kylin_ND-Cuboid_Builder_" + getCubeName() + "_Step"); - appendExecCmdParameters(cmd, "level", "" + (totalRowkeyColumnCount - dimNum)); - - ndCuboidStep.setMapReduceParams(cmd.toString()); - ndCuboidStep.setMapReduceJobClass(NDCuboidJob.class); - return ndCuboidStep; - } - - private MapReduceExecutable createRangeRowkeyDistributionStep(String inputPath) { - MapReduceExecutable rowkeyDistributionStep = new MapReduceExecutable(); - rowkeyDistributionStep.setName(ExecutableConstants.STEP_NAME_GET_CUBOID_KEY_DISTRIBUTION); - StringBuilder cmd = new StringBuilder(); - - appendMapReduceParameters(cmd, jobEngineConfig); - appendExecCmdParameters(cmd, "input", inputPath); - appendExecCmdParameters(cmd, "output", getRowkeyDistributionOutputPath()); - appendExecCmdParameters(cmd, "cubename", getCubeName()); - appendExecCmdParameters(cmd, "jobname", "Kylin_Region_Splits_Calculator_" + getCubeName() + "_Step"); - - rowkeyDistributionStep.setMapReduceParams(cmd.toString()); - rowkeyDistributionStep.setMapReduceJobClass(RangeKeyDistributionJob.class); - return rowkeyDistributionStep; - } - - private HadoopShellExecutable createCreateHTableStep() { - HadoopShellExecutable createHtableStep = new HadoopShellExecutable(); - createHtableStep.setName(ExecutableConstants.STEP_NAME_CREATE_HBASE_TABLE); - StringBuilder 
cmd = new StringBuilder(); - appendExecCmdParameters(cmd, "cubename", getCubeName()); - appendExecCmdParameters(cmd, "input", getRowkeyDistributionOutputPath() + "/part-r-00000"); - appendExecCmdParameters(cmd, "htablename", getHTableName()); - - createHtableStep.setJobParams(cmd.toString()); - createHtableStep.setJobClass(CreateHTableJob.class); - - return createHtableStep; - } - - private MapReduceExecutable createConvertCuboidToHfileStep(String inputPath, String jobId) { - MapReduceExecutable createHFilesStep = new MapReduceExecutable(); - createHFilesStep.setName(ExecutableConstants.STEP_NAME_CONVERT_CUBOID_TO_HFILE); - StringBuilder cmd = new StringBuilder(); - - appendMapReduceParameters(cmd, jobEngineConfig); - appendExecCmdParameters(cmd, "cubename", getCubeName()); - appendExecCmdParameters(cmd, "input", inputPath); - appendExecCmdParameters(cmd, "output", getHFilePath(jobId)); - appendExecCmdParameters(cmd, "htablename", getHTableName()); - appendExecCmdParameters(cmd, "jobname", "Kylin_HFile_Generator_" + getCubeName() + "_Step"); - - createHFilesStep.setMapReduceParams(cmd.toString()); - createHFilesStep.setMapReduceJobClass(CubeHFileJob.class); - - return createHFilesStep; - } - - private HadoopShellExecutable createBulkLoadStep(String jobId) { - HadoopShellExecutable bulkLoadStep = new HadoopShellExecutable(); - bulkLoadStep.setName(ExecutableConstants.STEP_NAME_BULK_LOAD_HFILE); - - StringBuilder cmd = new StringBuilder(); - appendExecCmdParameters(cmd, "input", getHFilePath(jobId)); - appendExecCmdParameters(cmd, "htablename", getHTableName()); - appendExecCmdParameters(cmd, "cubename", getCubeName()); - - bulkLoadStep.setJobParams(cmd.toString()); - bulkLoadStep.setJobClass(BulkLoadJob.class); - - return bulkLoadStep; - - } - - private UpdateCubeInfoExecutable createUpdateCubeInfoStep(String createFlatTableStepId, String baseCuboidStepId, String convertToHFileStepId) { - final UpdateCubeInfoExecutable updateCubeInfoStep = new UpdateCubeInfoExecutable(); - updateCubeInfoStep.setName(ExecutableConstants.STEP_NAME_UPDATE_CUBE_INFO); - updateCubeInfoStep.setCubeName(getCubeName()); - updateCubeInfoStep.setSegmentId(segment.getUuid()); - updateCubeInfoStep.setCreateFlatTableStepId(createFlatTableStepId); - updateCubeInfoStep.setBaseCuboidStepId(baseCuboidStepId); - updateCubeInfoStep.setConvertToHFileStepId(convertToHFileStepId); - return updateCubeInfoStep; - } - - -} diff --git a/job/src/main/java/com/kylinolap/job2/cube/CubingJob.java b/job/src/main/java/com/kylinolap/job2/cube/CubingJob.java new file mode 100644 index 0000000..10b93f2 --- /dev/null +++ b/job/src/main/java/com/kylinolap/job2/cube/CubingJob.java @@ -0,0 +1,39 @@ +package com.kylinolap.job2.cube; + +import com.kylinolap.job2.dao.JobPO; +import com.kylinolap.job2.impl.threadpool.DefaultChainedExecutable; + +/** + * Created by qianzhou on 12/25/14. 
+ */ +public class CubingJob extends DefaultChainedExecutable { + + public CubingJob() { + super(); + } + + public CubingJob(JobPO job) { + super(job); + } + + private static final String CUBE_INSTANCE_NAME = "cubeName"; + private static final String SEGMENT_ID = "segmentId"; + + + void setCubeName(String name) { + setParam(CUBE_INSTANCE_NAME, name); + } + + public String getCubeName() { + return getParam(CUBE_INSTANCE_NAME); + } + + void setSegmentId(String segmentId) { + setParam(SEGMENT_ID, segmentId); + } + + public String getSegmentId() { + return getParam(SEGMENT_ID); + } + +} diff --git a/job/src/main/java/com/kylinolap/job2/cube/CubingJobBuilder.java b/job/src/main/java/com/kylinolap/job2/cube/CubingJobBuilder.java new file mode 100644 index 0000000..666647e --- /dev/null +++ b/job/src/main/java/com/kylinolap/job2/cube/CubingJobBuilder.java @@ -0,0 +1,420 @@ +package com.kylinolap.job2.cube; + +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.kylinolap.cube.CubeSegment; +import com.kylinolap.job.JoinedFlatTable; +import com.kylinolap.job.constant.JobConstants; +import com.kylinolap.job.engine.JobEngineConfig; +import com.kylinolap.job.hadoop.cube.*; +import com.kylinolap.job.hadoop.dict.CreateDictionaryJob; +import com.kylinolap.job.hadoop.hbase.BulkLoadJob; +import com.kylinolap.job.hadoop.hbase.CreateHTableJob; +import com.kylinolap.job.hadoop.hive.CubeJoinedFlatTableDesc; +import com.kylinolap.job2.common.HadoopShellExecutable; +import com.kylinolap.job2.common.MapReduceExecutable; +import com.kylinolap.job2.common.ShellExecutable; +import com.kylinolap.job2.constants.ExecutableConstants; +import org.apache.commons.lang3.StringUtils; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.List; +import java.util.TimeZone; + +/** + * Created by qianzhou on 12/25/14. 
+ */ +public final class CubingJobBuilder { + + private static final String JOB_WORKING_DIR_PREFIX = "kylin-"; + + private JobEngineConfig jobEngineConfig; + private CubeSegment segment; + private String submitter; + + private CubingJobBuilder() {} + + public static CubingJobBuilder newBuilder() { + return new CubingJobBuilder(); + } + + public CubingJobBuilder setSegment(CubeSegment segment) { + this.segment = segment; + return this; + } + + public CubingJobBuilder setJobEnginConfig(JobEngineConfig enginConfig) { + this.jobEngineConfig = enginConfig; + return this; + } + + public CubingJobBuilder setSubmitter(String submitter) { + this.submitter = submitter; + return this; + } + + public CubingJob buildJob() { + checkPreconditions(); + final int groupRowkeyColumnsCount = segment.getCubeDesc().getRowkey().getNCuboidBuildLevels(); + final int totalRowkeyColumnsCount = segment.getCubeDesc().getRowkey().getRowKeyColumns().length; + + CubingJob result = initialJob("BUILD"); + final String jobId = result.getId(); + final CubeJoinedFlatTableDesc intermediateTableDesc = new CubeJoinedFlatTableDesc(segment.getCubeDesc(), this.segment); + final String intermediateHiveTableName = getIntermediateHiveTableName(intermediateTableDesc, jobId); + final String factDistinctColumnsPath = getFactDistinctColumnsPath(jobId); + final String cuboidRootPath = getJobWorkingDir(jobId) + "/" + getCubeName() + "/cuboid/"; + final String cuboidPath = cuboidRootPath + "*"; + final String[] cuboidOutputTempPath = getCuboidOutputPaths(cuboidRootPath, totalRowkeyColumnsCount, groupRowkeyColumnsCount); + + final ShellExecutable intermediateHiveTableStep = createIntermediateHiveTableStep(intermediateTableDesc, jobId); + result.addTask(intermediateHiveTableStep); + + result.addTask(createFactDistinctColumnsStep(intermediateHiveTableName, jobId)); + + result.addTask(createBuildDictionaryStep(factDistinctColumnsPath)); + + // base cuboid step + final MapReduceExecutable baseCuboidStep = createBaseCuboidStep(intermediateHiveTableName, cuboidOutputTempPath); + result.addTask(baseCuboidStep); + + // n dim cuboid steps + for (int i = 1; i <= groupRowkeyColumnsCount; i++) { + int dimNum = totalRowkeyColumnsCount - i; + result.addTask(createNDimensionCuboidStep(cuboidOutputTempPath, dimNum, totalRowkeyColumnsCount)); + } + + result.addTask(createRangeRowkeyDistributionStep(cuboidPath)); + // create htable step + result.addTask(createCreateHTableStep()); + // generate hfiles step + final MapReduceExecutable convertCuboidToHfileStep = createConvertCuboidToHfileStep(cuboidPath, jobId); + result.addTask(convertCuboidToHfileStep); + // bulk load step + result.addTask(createBulkLoadStep(jobId)); + + result.addTask(createUpdateCubeInfoStep(intermediateHiveTableStep.getId(), baseCuboidStep.getId(), convertCuboidToHfileStep.getId())); + + return result; + } + + public CubingJob mergeJob() { + checkPreconditions(); + CubingJob result = initialJob("MERGE"); + final String jobId = result.getId(); + List mergingSegments = segment.getCubeInstance().getMergingSegments(segment); + Preconditions.checkState(mergingSegments != null && mergingSegments.size() > 1, "there should be more than 2 segments to merge"); + String[] cuboidPaths = new String[mergingSegments.size()]; + for (int i = 0; i < mergingSegments.size(); i++) { + cuboidPaths[i] = getPathToMerge(mergingSegments.get(i)); + } + final String formattedPath = StringUtils.join(cuboidPaths, ","); + final String mergedCuboidPath = getJobWorkingDir(jobId) + "/" + getCubeName() + "/cuboid"; + + 
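+        // merge cuboid data step: combine the cuboid output of all merging segments into the new segment's cuboid path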
result.addTask(createMergeCuboidDataStep(formattedPath, mergedCuboidPath)); + + // get output distribution step + result.addTask(createRangeRowkeyDistributionStep(mergedCuboidPath)); + + // create htable step + result.addTask(createCreateHTableStep()); + + // generate hfiles step + final MapReduceExecutable convertCuboidToHfileStep = createConvertCuboidToHfileStep(mergedCuboidPath, jobId); + result.addTask(convertCuboidToHfileStep); + + // bulk load step + result.addTask(createBulkLoadStep(jobId)); + + final List mergingSegmentIds = Lists.transform(mergingSegments, new Function() { + @Nullable + @Override + public String apply(CubeSegment input) { + return input.getUuid(); + } + }); + result.addTask(createUpdateCubeInfoAfterMergeStep(mergingSegmentIds, convertCuboidToHfileStep.getId())); + + return result; + } + + private CubingJob initialJob(String type) { + CubingJob result = new CubingJob(); + SimpleDateFormat format = new SimpleDateFormat("z yyyy-MM-dd HH:mm:ss"); + format.setTimeZone(TimeZone.getTimeZone(jobEngineConfig.getTimeZone())); + result.setCubeName(getCubeName()); + result.setSegmentId(segment.getUuid()); + result.setName(getCubeName() + " - " + segment.getName() + " - " + type + " - " + format.format(new Date(System.currentTimeMillis()))); + result.setSubmitter(this.submitter); + return result; + } + + private void checkPreconditions() { + Preconditions.checkNotNull(this.segment, "segment cannot be null"); + Preconditions.checkNotNull(this.jobEngineConfig, "jobEngineConfig cannot be null"); + } + + private String getJobWorkingDir(String uuid) { + return jobEngineConfig.getHdfsWorkingDirectory() + "/" + JOB_WORKING_DIR_PREFIX + uuid; + } + + private String getPathToMerge(CubeSegment segment) { + return getJobWorkingDir(segment.getUuid()) + "/" + getCubeName() + "/cuboid/*"; + } + + private String getCubeName() { + return segment.getCubeInstance().getName(); + } + + private String getSegmentName() { + return segment.getName(); + } + + private String getRowkeyDistributionOutputPath() { + return jobEngineConfig.getHdfsWorkingDirectory() + "/" + getCubeName() + "/rowkey_stats"; + } + + private void appendMapReduceParameters(StringBuilder builder, JobEngineConfig engineConfig) { + try { + String jobConf = engineConfig.getHadoopJobConfFilePath(segment.getCubeDesc().getCapacity()); + if (jobConf != null && jobConf.length() > 0) { + builder.append(" -conf ").append(jobConf); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private String[] getCuboidOutputPaths(String cuboidRootPath, int totalRowkeyColumnCount, int groupRowkeyColumnsCount) { + String[] paths = new String[groupRowkeyColumnsCount + 1]; + for (int i = 0; i <= groupRowkeyColumnsCount; i++) { + int dimNum = totalRowkeyColumnCount - i; + if (dimNum == totalRowkeyColumnCount) { + paths[i] = cuboidRootPath + "base_cuboid"; + } else { + paths[i] = cuboidRootPath + dimNum + "d_cuboid"; + } + } + return paths; + } + + private StringBuilder appendExecCmdParameters(StringBuilder cmd, String paraName, String paraValue) { + return cmd.append(" -").append(paraName).append(" ").append(paraValue); + } + + private String getIntermediateHiveTableName(CubeJoinedFlatTableDesc intermediateTableDesc, String jobUuid) { + return JoinedFlatTable.getTableDir(intermediateTableDesc, getJobWorkingDir(jobUuid), jobUuid); + } + + private String getFactDistinctColumnsPath(String jobUuid) { + return getJobWorkingDir(jobUuid) + "/" + getCubeName() + "/fact_distinct_columns"; + } + + private String getHTableName() { + return 
segment.getStorageLocationIdentifier(); + } + + private String getHFilePath(String jobId) { + return getJobWorkingDir(jobId) + "/" + getCubeName() + "/hfile/"; + } + + private ShellExecutable createIntermediateHiveTableStep(CubeJoinedFlatTableDesc intermediateTableDesc, String jobId) { + try { + ShellExecutable result = new ShellExecutable(); + result.setName(ExecutableConstants.STEP_NAME_CREATE_FLAT_HIVE_TABLE); + String dropTableHql = JoinedFlatTable.generateDropTableStatement(intermediateTableDesc, jobId); + String createTableHql = JoinedFlatTable.generateCreateTableStatement(intermediateTableDesc, getJobWorkingDir(jobId), jobId); + String insertDataHql = JoinedFlatTable.generateInsertDataStatement(intermediateTableDesc, jobId, this.jobEngineConfig); + + + StringBuilder buf = new StringBuilder(); + buf.append("hive -e \""); + buf.append(dropTableHql + "\n"); + buf.append(createTableHql + "\n"); + buf.append(insertDataHql + "\n"); + buf.append("\""); + + result.setCmd(buf.toString()); + return result; + } catch (IOException e) { + throw new RuntimeException("fail to create job", e); + } + } + + private MapReduceExecutable createFactDistinctColumnsStep(String intermediateHiveTableName, String jobId) { + MapReduceExecutable result = new MapReduceExecutable(); + result.setName(ExecutableConstants.STEP_NAME_FACT_DISTINCT_COLUMNS); + result.setMapReduceJobClass(FactDistinctColumnsJob.class); + StringBuilder cmd = new StringBuilder(); + appendMapReduceParameters(cmd, jobEngineConfig); + appendExecCmdParameters(cmd, "cubename", getCubeName()); + appendExecCmdParameters(cmd, "input", intermediateHiveTableName); + appendExecCmdParameters(cmd, "output", getFactDistinctColumnsPath(jobId)); + appendExecCmdParameters(cmd, "jobname", "Kylin_Fact_Distinct_Columns_" + getCubeName() + "_Step"); + appendExecCmdParameters(cmd, "htablename", new CubeJoinedFlatTableDesc(segment.getCubeDesc(), segment).getTableName(jobId)); + + result.setMapReduceParams(cmd.toString()); + return result; + } + + private HadoopShellExecutable createBuildDictionaryStep(String factDistinctColumnsPath) { + // base cuboid job + HadoopShellExecutable buildDictionaryStep = new HadoopShellExecutable(); + buildDictionaryStep.setName(ExecutableConstants.STEP_NAME_BUILD_DICTIONARY); + StringBuilder cmd = new StringBuilder(); + appendExecCmdParameters(cmd, "cubename", getCubeName()); + appendExecCmdParameters(cmd, "segmentname", getSegmentName()); + appendExecCmdParameters(cmd, "input", factDistinctColumnsPath); + + buildDictionaryStep.setJobParams(cmd.toString()); + buildDictionaryStep.setJobClass(CreateDictionaryJob.class); + return buildDictionaryStep; + } + + private MapReduceExecutable createBaseCuboidStep(String intermediateHiveTableName, String[] cuboidOutputTempPath) { + // base cuboid job + MapReduceExecutable baseCuboidStep = new MapReduceExecutable(); + + StringBuilder cmd = new StringBuilder(); + appendMapReduceParameters(cmd, jobEngineConfig); + + baseCuboidStep.setName(ExecutableConstants.STEP_NAME_BUILD_BASE_CUBOID); + + appendExecCmdParameters(cmd, "cubename", getCubeName()); + appendExecCmdParameters(cmd, "segmentname", getSegmentName()); + appendExecCmdParameters(cmd, "input", intermediateHiveTableName); + appendExecCmdParameters(cmd, "output", cuboidOutputTempPath[0]); + appendExecCmdParameters(cmd, "jobname", "Kylin_Base_Cuboid_Builder_" + getCubeName()); + appendExecCmdParameters(cmd, "level", "0"); + + baseCuboidStep.setMapReduceParams(cmd.toString()); + baseCuboidStep.setMapReduceJobClass(BaseCuboidJob.class); + 
return baseCuboidStep; + } + + private MapReduceExecutable createNDimensionCuboidStep(String[] cuboidOutputTempPath, int dimNum, int totalRowkeyColumnCount) { + // ND cuboid job + MapReduceExecutable ndCuboidStep = new MapReduceExecutable(); + + ndCuboidStep.setName(ExecutableConstants.STEP_NAME_BUILD_N_D_CUBOID + " : " + dimNum + "-Dimension"); + StringBuilder cmd = new StringBuilder(); + + appendMapReduceParameters(cmd, jobEngineConfig); + appendExecCmdParameters(cmd, "cubename", getCubeName()); + appendExecCmdParameters(cmd, "segmentname", getSegmentName()); + appendExecCmdParameters(cmd, "input", cuboidOutputTempPath[totalRowkeyColumnCount - dimNum - 1]); + appendExecCmdParameters(cmd, "output", cuboidOutputTempPath[totalRowkeyColumnCount - dimNum]); + appendExecCmdParameters(cmd, "jobname", "Kylin_ND-Cuboid_Builder_" + getCubeName() + "_Step"); + appendExecCmdParameters(cmd, "level", "" + (totalRowkeyColumnCount - dimNum)); + + ndCuboidStep.setMapReduceParams(cmd.toString()); + ndCuboidStep.setMapReduceJobClass(NDCuboidJob.class); + return ndCuboidStep; + } + + private MapReduceExecutable createRangeRowkeyDistributionStep(String inputPath) { + MapReduceExecutable rowkeyDistributionStep = new MapReduceExecutable(); + rowkeyDistributionStep.setName(ExecutableConstants.STEP_NAME_GET_CUBOID_KEY_DISTRIBUTION); + StringBuilder cmd = new StringBuilder(); + + appendMapReduceParameters(cmd, jobEngineConfig); + appendExecCmdParameters(cmd, "input", inputPath); + appendExecCmdParameters(cmd, "output", getRowkeyDistributionOutputPath()); + appendExecCmdParameters(cmd, "cubename", getCubeName()); + appendExecCmdParameters(cmd, "jobname", "Kylin_Region_Splits_Calculator_" + getCubeName() + "_Step"); + + rowkeyDistributionStep.setMapReduceParams(cmd.toString()); + rowkeyDistributionStep.setMapReduceJobClass(RangeKeyDistributionJob.class); + return rowkeyDistributionStep; + } + + private HadoopShellExecutable createCreateHTableStep() { + HadoopShellExecutable createHtableStep = new HadoopShellExecutable(); + createHtableStep.setName(ExecutableConstants.STEP_NAME_CREATE_HBASE_TABLE); + StringBuilder cmd = new StringBuilder(); + appendExecCmdParameters(cmd, "cubename", getCubeName()); + appendExecCmdParameters(cmd, "input", getRowkeyDistributionOutputPath() + "/part-r-00000"); + appendExecCmdParameters(cmd, "htablename", getHTableName()); + + createHtableStep.setJobParams(cmd.toString()); + createHtableStep.setJobClass(CreateHTableJob.class); + + return createHtableStep; + } + + private MapReduceExecutable createConvertCuboidToHfileStep(String inputPath, String jobId) { + MapReduceExecutable createHFilesStep = new MapReduceExecutable(); + createHFilesStep.setName(ExecutableConstants.STEP_NAME_CONVERT_CUBOID_TO_HFILE); + StringBuilder cmd = new StringBuilder(); + + appendMapReduceParameters(cmd, jobEngineConfig); + appendExecCmdParameters(cmd, "cubename", getCubeName()); + appendExecCmdParameters(cmd, "input", inputPath); + appendExecCmdParameters(cmd, "output", getHFilePath(jobId)); + appendExecCmdParameters(cmd, "htablename", getHTableName()); + appendExecCmdParameters(cmd, "jobname", "Kylin_HFile_Generator_" + getCubeName() + "_Step"); + + createHFilesStep.setMapReduceParams(cmd.toString()); + createHFilesStep.setMapReduceJobClass(CubeHFileJob.class); + + return createHFilesStep; + } + + private HadoopShellExecutable createBulkLoadStep(String jobId) { + HadoopShellExecutable bulkLoadStep = new HadoopShellExecutable(); + bulkLoadStep.setName(ExecutableConstants.STEP_NAME_BULK_LOAD_HFILE); + + 
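+        // load the HFiles generated under the job working dir into the segment's HTable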
StringBuilder cmd = new StringBuilder(); + appendExecCmdParameters(cmd, "input", getHFilePath(jobId)); + appendExecCmdParameters(cmd, "htablename", getHTableName()); + appendExecCmdParameters(cmd, "cubename", getCubeName()); + + bulkLoadStep.setJobParams(cmd.toString()); + bulkLoadStep.setJobClass(BulkLoadJob.class); + + return bulkLoadStep; + + } + + private UpdateCubeInfoAfterBuildExecutable createUpdateCubeInfoStep(String createFlatTableStepId, String baseCuboidStepId, String convertToHFileStepId) { + final UpdateCubeInfoAfterBuildExecutable updateCubeInfoStep = new UpdateCubeInfoAfterBuildExecutable(); + updateCubeInfoStep.setName(ExecutableConstants.STEP_NAME_UPDATE_CUBE_INFO); + updateCubeInfoStep.setCubeName(getCubeName()); + updateCubeInfoStep.setSegmentId(segment.getUuid()); + updateCubeInfoStep.setCreateFlatTableStepId(createFlatTableStepId); + updateCubeInfoStep.setBaseCuboidStepId(baseCuboidStepId); + updateCubeInfoStep.setConvertToHFileStepId(convertToHFileStepId); + return updateCubeInfoStep; + } + + private MapReduceExecutable createMergeCuboidDataStep(String inputPath, String outputPath) { + MapReduceExecutable mergeCuboidDataStep = new MapReduceExecutable(); + mergeCuboidDataStep.setName(JobConstants.STEP_NAME_MERGE_CUBOID); + StringBuilder cmd = new StringBuilder(); + + appendMapReduceParameters(cmd, jobEngineConfig); + appendExecCmdParameters(cmd, "cubename", getCubeName()); + appendExecCmdParameters(cmd, "segmentname", getSegmentName()); + appendExecCmdParameters(cmd, "input", inputPath); + appendExecCmdParameters(cmd, "output", outputPath); + appendExecCmdParameters(cmd, "jobname", "Kylin_Merge_Cuboid_" + getCubeName() + "_Step"); + + mergeCuboidDataStep.setMapReduceParams(cmd.toString()); + mergeCuboidDataStep.setMapReduceJobClass(MergeCuboidJob.class); + return mergeCuboidDataStep; + } + + private UpdateCubeInfoAfterMergeExecutable createUpdateCubeInfoAfterMergeStep(List mergingSegmentIds, String convertToHFileStepId) { + UpdateCubeInfoAfterMergeExecutable result = new UpdateCubeInfoAfterMergeExecutable(); + result.setCubeName(getCubeName()); + result.setSegmentId(segment.getUuid()); + result.setMergingSegmentIds(mergingSegmentIds); + result.setConvertToHFileStepId(convertToHFileStepId); + return result; + } + +} diff --git a/job/src/main/java/com/kylinolap/job2/cube/UpdateCubeInfoAfterBuildExecutable.java b/job/src/main/java/com/kylinolap/job2/cube/UpdateCubeInfoAfterBuildExecutable.java new file mode 100644 index 0000000..37a4495 --- /dev/null +++ b/job/src/main/java/com/kylinolap/job2/cube/UpdateCubeInfoAfterBuildExecutable.java @@ -0,0 +1,114 @@ +package com.kylinolap.job2.cube; + +import com.google.common.base.Preconditions; +import com.kylinolap.common.KylinConfig; +import com.kylinolap.cube.CubeInstance; +import com.kylinolap.cube.CubeManager; +import com.kylinolap.cube.CubeSegment; +import com.kylinolap.job2.constants.ExecutableConstants; +import com.kylinolap.job2.dao.JobPO; +import com.kylinolap.job2.exception.ExecuteException; +import com.kylinolap.job2.execution.ExecutableContext; +import com.kylinolap.job2.execution.ExecuteResult; +import com.kylinolap.job2.impl.threadpool.AbstractExecutable; +import com.kylinolap.metadata.model.SegmentStatusEnum; +import com.kylinolap.metadata.realization.RealizationStatusEnum; +import org.apache.commons.lang.StringUtils; + +import java.io.IOException; + +/** + * Created by qianzhou on 1/4/15. 
+ */ +public class UpdateCubeInfoAfterBuildExecutable extends AbstractExecutable { + + private static final String SEGMENT_ID = "segmentId"; + private static final String CUBE_NAME = "cubeName"; + private static final String CONVERT_TO_HFILE_STEP_ID = "convertToHFileStepId"; + private static final String BASE_CUBOID_STEP_ID = "baseCuboidStepId"; + private static final String CREATE_FLAT_TABLE_STEP_ID = "createFlatTableStepId"; + + private final CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()); + + public UpdateCubeInfoAfterBuildExecutable() { + } + + public UpdateCubeInfoAfterBuildExecutable(JobPO job) { + super(job); + } + + public void setCubeName(String cubeName) { + this.setParam(CUBE_NAME, cubeName); + } + + private String getCubeName() { + return getParam(CUBE_NAME); + } + + public void setSegmentId(String segmentId) { + this.setParam(SEGMENT_ID, segmentId); + } + + private String getSegmentId() { + return getParam(SEGMENT_ID); + } + + public void setConvertToHFileStepId(String id) { + setParam(CONVERT_TO_HFILE_STEP_ID, id); + } + + private String getConvertToHfileStepId() { + return getParam(CONVERT_TO_HFILE_STEP_ID); + } + + public void setBaseCuboidStepId(String id) { + setParam(BASE_CUBOID_STEP_ID, id); + } + + private String getBaseCuboidStepId() { + return getParam(BASE_CUBOID_STEP_ID); + } + + public void setCreateFlatTableStepId(String id) { + setParam(CREATE_FLAT_TABLE_STEP_ID, id); + } + + private String getCreateFlatTableStepId() { + return getParam(CREATE_FLAT_TABLE_STEP_ID); + } + + @Override + protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { + final CubeInstance cube = cubeManager.getCube(getCubeName()); + final CubeSegment segment = cube.getSegmentById(getSegmentId()); + + String sourceRecordsSize = jobService.getOutput(getCreateFlatTableStepId()).getExtra().get(ExecutableConstants.SOURCE_RECORDS_SIZE); + Preconditions.checkState(StringUtils.isNotEmpty(sourceRecordsSize), "Can't get cube source record size."); + long sourceSize = Long.parseLong(sourceRecordsSize); + + String sourceRecordsCount = jobService.getOutput(getBaseCuboidStepId()).getExtra().get(ExecutableConstants.SOURCE_RECORDS_COUNT); + Preconditions.checkState(StringUtils.isNotEmpty(sourceRecordsCount), "Can't get cube source record count."); + long sourceCount = Long.parseLong(sourceRecordsCount); + + String cubeSizeString = jobService.getOutput(getConvertToHfileStepId()).getExtra().get(ExecutableConstants.HDFS_BYTES_WRITTEN); + Preconditions.checkState(StringUtils.isNotEmpty(cubeSizeString), "Can't get cube segment size."); + long size = Long.parseLong(cubeSizeString) / 1024; + + + segment.setLastBuildJobID(getId()); + segment.setLastBuildTime(System.currentTimeMillis()); + segment.setSizeKB(size); + segment.setSourceRecords(sourceCount); + segment.setSourceRecordsSize(sourceSize); + segment.setStatus(SegmentStatusEnum.READY); + cube.setStatus(RealizationStatusEnum.READY); + + try { + cubeManager.updateCube(cube); + return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed"); + } catch (IOException e) { + logger.error("fail to update cube after build", e); + return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage()); + } + } +} diff --git a/job/src/main/java/com/kylinolap/job2/cube/UpdateCubeInfoAfterMergeExecutable.java b/job/src/main/java/com/kylinolap/job2/cube/UpdateCubeInfoAfterMergeExecutable.java new file mode 100644 index 0000000..f821516 --- /dev/null +++ 
b/job/src/main/java/com/kylinolap/job2/cube/UpdateCubeInfoAfterMergeExecutable.java @@ -0,0 +1,116 @@ +package com.kylinolap.job2.cube; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.kylinolap.common.KylinConfig; +import com.kylinolap.cube.CubeInstance; +import com.kylinolap.cube.CubeManager; +import com.kylinolap.cube.CubeSegment; +import com.kylinolap.job2.constants.ExecutableConstants; +import com.kylinolap.job2.exception.ExecuteException; +import com.kylinolap.job2.execution.ExecutableContext; +import com.kylinolap.job2.execution.ExecuteResult; +import com.kylinolap.job2.impl.threadpool.AbstractExecutable; +import org.apache.commons.lang.StringUtils; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * Created by qianzhou on 1/7/15. + */ +public class UpdateCubeInfoAfterMergeExecutable extends AbstractExecutable { + + private static final String CUBE_NAME = "cubeName"; + private static final String SEGMENT_ID = "segmentId"; + private static final String MERGING_SEGMENT_IDS = "mergingSegmentIds"; + private static final String CONVERT_TO_HFILE_STEP_ID = "convertToHFileStepId"; + + private final CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()); + @Override + protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { + final CubeInstance cube = cubeManager.getCube(getCubeName()); + List mergingSegmentIds = getMergingSegmentIds(); + if (mergingSegmentIds.isEmpty()) { + return new ExecuteResult(ExecuteResult.State.FAILED, "there are no merging segments"); + } + CubeSegment mergedSegment = cube.getSegmentById(getSegmentId()); + if (mergedSegment == null) { + return new ExecuteResult(ExecuteResult.State.FAILED, "there is no segment with id:" + getSegmentId()); + } + String cubeSizeString = jobService.getOutput(getConvertToHfileStepId()).getExtra().get(ExecutableConstants.HDFS_BYTES_WRITTEN); + Preconditions.checkState(StringUtils.isNotEmpty(cubeSizeString), "Can't get cube segment size."); + long cubeSize = Long.parseLong(cubeSizeString) / 1024; + + List toBeRemoved = Lists.newArrayListWithExpectedSize(mergingSegmentIds.size()); + for (CubeSegment segment : cube.getSegments()) { + if (mergingSegmentIds.contains(segment.getUuid())) { + toBeRemoved.add(segment); + } + } + + long sourceCount = 0L; + long sourceSize = 0L; + for (CubeSegment segment : toBeRemoved) { + sourceCount += segment.getSourceRecords(); + sourceSize += segment.getSourceRecordsSize(); + } + //update segment info + mergedSegment.setSizeKB(cubeSize); + mergedSegment.setSourceRecords(sourceCount); + mergedSegment.setSourceRecordsSize(sourceSize); + //remove old segment + cube.getSegments().removeAll(toBeRemoved); + try { + cubeManager.updateCube(cube); + return new ExecuteResult(ExecuteResult.State.SUCCEED); + } catch (IOException e) { + logger.error("fail to update cube after merge", e); + return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage()); + } + } + + public void setSegmentId(String segmentId) { + this.setParam(SEGMENT_ID, segmentId); + } + + private String getSegmentId() { + return getParam(SEGMENT_ID); + } + + public void setCubeName(String cubeName) { + this.setParam(CUBE_NAME, cubeName); + } + + private String getCubeName() { + return getParam(CUBE_NAME); + } + + public void setMergingSegmentIds(List ids) { + setParam(MERGING_SEGMENT_IDS, StringUtils.join(ids, ",")); + } + + private List getMergingSegmentIds() { + 
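+        // the merging segment ids are persisted as a single comma-separated parameter; split them back into a list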
final String ids = getParam(MERGING_SEGMENT_IDS); + if (ids != null) { + final String[] splitted = StringUtils.split(ids, ","); + ArrayList result = Lists.newArrayListWithExpectedSize(splitted.length); + for (String id: splitted) { + result.add(id); + } + return result; + } else { + return Collections.emptyList(); + } + } + + public void setConvertToHFileStepId(String id) { + setParam(CONVERT_TO_HFILE_STEP_ID, id); + } + + private String getConvertToHfileStepId() { + return getParam(CONVERT_TO_HFILE_STEP_ID); + } +} diff --git a/job/src/main/java/com/kylinolap/job2/cube/UpdateCubeInfoExecutable.java b/job/src/main/java/com/kylinolap/job2/cube/UpdateCubeInfoExecutable.java deleted file mode 100644 index 03e6ad7..0000000 --- a/job/src/main/java/com/kylinolap/job2/cube/UpdateCubeInfoExecutable.java +++ /dev/null @@ -1,113 +0,0 @@ -package com.kylinolap.job2.cube; - -import com.google.common.base.Preconditions; -import com.kylinolap.common.KylinConfig; -import com.kylinolap.cube.CubeInstance; -import com.kylinolap.cube.CubeManager; -import com.kylinolap.cube.CubeSegment; -import com.kylinolap.job2.constants.ExecutableConstants; -import com.kylinolap.job2.dao.JobPO; -import com.kylinolap.job2.exception.ExecuteException; -import com.kylinolap.job2.execution.ExecutableContext; -import com.kylinolap.job2.execution.ExecuteResult; -import com.kylinolap.job2.impl.threadpool.AbstractExecutable; -import com.kylinolap.metadata.model.SegmentStatusEnum; -import com.kylinolap.metadata.realization.RealizationStatusEnum; -import org.apache.commons.lang.StringUtils; - -import java.io.IOException; - -/** - * Created by qianzhou on 1/4/15. - */ -public class UpdateCubeInfoExecutable extends AbstractExecutable { - - private static final String SEGMENT_ID = "segmentId"; - private static final String CUBE_NAME = "cubeName"; - private static final String CONVERT_TO_HFILE_STEP_ID = "convertToHFileStepId"; - private static final String BASE_CUBOID_STEP_ID = "baseCuboidStepId"; - private static final String CREATE_FLAT_TABLE_STEP_ID = "createFlatTableStepId"; - - private final CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()); - - public UpdateCubeInfoExecutable() { - } - - public UpdateCubeInfoExecutable(JobPO job) { - super(job); - } - - public void setCubeName(String cubeName) { - this.setParam(CUBE_NAME, cubeName); - } - - private String getCubeName() { - return getParam(CUBE_NAME); - } - - public void setSegmentId(String segmentId) { - this.setParam(SEGMENT_ID, segmentId); - } - - private String getSegmentId() { - return getParam(SEGMENT_ID); - } - - public void setConvertToHFileStepId(String id) { - setParam(CONVERT_TO_HFILE_STEP_ID, id); - } - - private String getConvertToHfileStepId() { - return getParam(CONVERT_TO_HFILE_STEP_ID); - } - - public void setBaseCuboidStepId(String id) { - setParam(BASE_CUBOID_STEP_ID, id); - } - - private String getBaseCuboidStepId() { - return getParam(BASE_CUBOID_STEP_ID); - } - - public void setCreateFlatTableStepId(String id) { - setParam(CREATE_FLAT_TABLE_STEP_ID, id); - } - - private String getCreateFlatTableStepId() { - return getParam(CREATE_FLAT_TABLE_STEP_ID); - } - - @Override - protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { - final CubeInstance cube = cubeManager.getCube(getCubeName()); - final CubeSegment segment = cube.getSegmentById(getSegmentId()); - - String sourceRecordsSize = jobService.getOutput(getCreateFlatTableStepId()).getExtra().get(ExecutableConstants.SOURCE_RECORDS_SIZE); - 
Preconditions.checkState(StringUtils.isNotEmpty(sourceRecordsSize), "Can't get cube source record size."); - long sourceSize = Long.parseLong(sourceRecordsSize); - - String sourceRecordsCount = jobService.getOutput(getBaseCuboidStepId()).getExtra().get(ExecutableConstants.SOURCE_RECORDS_COUNT); - Preconditions.checkState(StringUtils.isNotEmpty(sourceRecordsCount), "Can't get cube source record count."); - long sourceCount = Long.parseLong(sourceRecordsCount); - - String cubeSizeString = jobService.getOutput(getConvertToHfileStepId()).getExtra().get(ExecutableConstants.HDFS_BYTES_WRITTEN); - Preconditions.checkState(StringUtils.isNotEmpty(cubeSizeString), "Can't get cube segment size."); - long size = Long.parseLong(cubeSizeString) / 1024; - - - segment.setLastBuildJobID(getId()); - segment.setLastBuildTime(System.currentTimeMillis()); - segment.setSizeKB(size); - segment.setSourceRecords(sourceCount); - segment.setSourceRecordsSize(sourceSize); - segment.setStatus(SegmentStatusEnum.READY); - cube.setStatus(RealizationStatusEnum.READY); - - try { - cubeManager.updateCube(cube); - return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed"); - } catch (IOException e) { - return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage()); - } - } -} diff --git a/job/src/main/java/com/kylinolap/job2/execution/ExecuteResult.java b/job/src/main/java/com/kylinolap/job2/execution/ExecuteResult.java index 86b9744..9fa7371 100644 --- a/job/src/main/java/com/kylinolap/job2/execution/ExecuteResult.java +++ b/job/src/main/java/com/kylinolap/job2/execution/ExecuteResult.java @@ -12,6 +12,10 @@ private final State state; private final String output; + public ExecuteResult(State state) { + this(state, ""); + } + public ExecuteResult(State state, String output) { Preconditions.checkArgument(state != null, "state cannot be null"); this.state = state; diff --git a/job/src/test/java/com/kylinolap/job2/cube/BuildCubeJobBuilderTest.java b/job/src/test/java/com/kylinolap/job2/cube/BuildCubeJobBuilderTest.java deleted file mode 100644 index 454f53f..0000000 --- a/job/src/test/java/com/kylinolap/job2/cube/BuildCubeJobBuilderTest.java +++ /dev/null @@ -1,131 +0,0 @@ -package com.kylinolap.job2.cube; - -import com.kylinolap.common.KylinConfig; -import com.kylinolap.common.util.AbstractKylinTestCase; -import com.kylinolap.common.util.ClasspathUtil; -import com.kylinolap.common.util.HBaseMetadataTestCase; -import com.kylinolap.cube.CubeInstance; -import com.kylinolap.cube.CubeManager; -import com.kylinolap.cube.CubeSegment; -import com.kylinolap.job.DeployUtil; -import com.kylinolap.job.ExportHBaseData; -import com.kylinolap.job.constant.JobConstants; -import com.kylinolap.job.engine.JobEngineConfig; -import com.kylinolap.job.hadoop.cube.StorageCleanupJob; -import com.kylinolap.job2.execution.ExecutableState; -import com.kylinolap.job2.impl.threadpool.AbstractExecutable; -import com.kylinolap.job2.impl.threadpool.DefaultScheduler; -import com.kylinolap.job2.service.ExecutableManager; -import org.apache.hadoop.util.ToolRunner; -import org.junit.*; - -import java.io.File; -import java.io.IOException; -import java.lang.reflect.Field; -import java.lang.reflect.Modifier; -import java.util.List; - -import static org.junit.Assert.*; - -@Ignore -public class BuildCubeJobBuilderTest { - - private JobEngineConfig jobEngineConfig; - - private CubeManager cubeManager; - - private DefaultScheduler scheduler; - - protected ExecutableManager jobService; - - static void setFinalStatic(Field field, Object newValue) 
throws Exception { - field.setAccessible(true); - - Field modifiersField = Field.class.getDeclaredField("modifiers"); - modifiersField.setAccessible(true); - modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL); - - field.set(null, newValue); - } - - protected void waitForJob(String jobId) { - while (true) { - AbstractExecutable job = jobService.getJob(jobId); - if (job.getStatus() == ExecutableState.SUCCEED || job.getStatus() == ExecutableState.ERROR) { - break; - } else { - try { - Thread.sleep(5000); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - } - } - - @BeforeClass - public static void beforeClass() throws Exception { - ClasspathUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath()); - } - - @Before - public void before() throws Exception { - HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA); - - DeployUtil.initCliWorkDir(); - DeployUtil.deployMetadata(); - DeployUtil.overrideJobJarLocations(); - DeployUtil.overrideJobConf(HBaseMetadataTestCase.SANDBOX_TEST_DATA); - - setFinalStatic(JobConstants.class.getField("DEFAULT_SCHEDULER_INTERVAL_SECONDS"), 10); - final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); - jobService = ExecutableManager.getInstance(kylinConfig); - scheduler = DefaultScheduler.getInstance(); - scheduler.init(new JobEngineConfig(kylinConfig)); - if (!scheduler.hasStarted()) { - throw new RuntimeException("scheduler has not been started"); - } - cubeManager = CubeManager.getInstance(kylinConfig); - jobEngineConfig = new JobEngineConfig(kylinConfig); - for (AbstractExecutable job: jobService.getAllExecutables()) { - jobService.deleteJob(job); - } - final CubeInstance testCube = cubeManager.getCube("test_kylin_cube_without_slr_left_join_empty"); - testCube.getSegments().clear(); - cubeManager.updateCube(testCube); - } - - @After - public void after() throws Exception { -// int exitCode = cleanupOldCubes(); -// if (exitCode == 0) { -// exportHBaseData(); -// } -// -// HBaseMetadataTestCase.staticCleanupTestMetadata(); - } - - @Test - public void testBuild() throws Exception { - final CubeInstance cubeInstance = cubeManager.getCube("test_kylin_cube_without_slr_left_join_empty"); - assertNotNull(cubeInstance); - final CubeSegment cubeSegment = cubeManager.appendSegments(cubeInstance, 0, System.currentTimeMillis()); - final BuildCubeJobBuilder buildCubeJobBuilder = BuildCubeJobBuilder.newBuilder(jobEngineConfig, cubeSegment); - final BuildCubeJob job = buildCubeJobBuilder.build(); - jobService.addJob(job); - waitForJob(job.getId()); - assertEquals(ExecutableState.SUCCEED, jobService.getOutput(job.getId()).getState()); - } - - private int cleanupOldCubes() throws Exception { - String[] args = { "--delete", "true" }; - - int exitCode = ToolRunner.run(new StorageCleanupJob(), args); - return exitCode; - } - - private void exportHBaseData() throws IOException { - ExportHBaseData export = new ExportHBaseData(); - export.exportTables(); - } -} \ No newline at end of file diff --git a/job/src/test/java/com/kylinolap/job2/cube/CubingJobBuilderTest.java b/job/src/test/java/com/kylinolap/job2/cube/CubingJobBuilderTest.java new file mode 100644 index 0000000..35cf05b --- /dev/null +++ b/job/src/test/java/com/kylinolap/job2/cube/CubingJobBuilderTest.java @@ -0,0 +1,130 @@ +package com.kylinolap.job2.cube; + +import com.kylinolap.common.KylinConfig; +import com.kylinolap.common.util.AbstractKylinTestCase; +import com.kylinolap.common.util.ClasspathUtil; +import 
com.kylinolap.common.util.HBaseMetadataTestCase; +import com.kylinolap.cube.CubeInstance; +import com.kylinolap.cube.CubeManager; +import com.kylinolap.cube.CubeSegment; +import com.kylinolap.job.DeployUtil; +import com.kylinolap.job.ExportHBaseData; +import com.kylinolap.job.constant.JobConstants; +import com.kylinolap.job.engine.JobEngineConfig; +import com.kylinolap.job.hadoop.cube.StorageCleanupJob; +import com.kylinolap.job2.execution.ExecutableState; +import com.kylinolap.job2.impl.threadpool.AbstractExecutable; +import com.kylinolap.job2.impl.threadpool.DefaultScheduler; +import com.kylinolap.job2.service.ExecutableManager; +import org.apache.hadoop.util.ToolRunner; +import org.junit.*; + +import java.io.File; +import java.io.IOException; +import java.lang.reflect.Field; +import java.lang.reflect.Modifier; + +import static org.junit.Assert.*; + +@Ignore +public class CubingJobBuilderTest { + + private JobEngineConfig jobEngineConfig; + + private CubeManager cubeManager; + + private DefaultScheduler scheduler; + + protected ExecutableManager jobService; + + static void setFinalStatic(Field field, Object newValue) throws Exception { + field.setAccessible(true); + + Field modifiersField = Field.class.getDeclaredField("modifiers"); + modifiersField.setAccessible(true); + modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL); + + field.set(null, newValue); + } + + protected void waitForJob(String jobId) { + while (true) { + AbstractExecutable job = jobService.getJob(jobId); + if (job.getStatus() == ExecutableState.SUCCEED || job.getStatus() == ExecutableState.ERROR) { + break; + } else { + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + } + + @BeforeClass + public static void beforeClass() throws Exception { + ClasspathUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath()); + } + + @Before + public void before() throws Exception { + HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA); + + DeployUtil.initCliWorkDir(); + DeployUtil.deployMetadata(); + DeployUtil.overrideJobJarLocations(); + DeployUtil.overrideJobConf(HBaseMetadataTestCase.SANDBOX_TEST_DATA); + + setFinalStatic(JobConstants.class.getField("DEFAULT_SCHEDULER_INTERVAL_SECONDS"), 10); + final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); + jobService = ExecutableManager.getInstance(kylinConfig); + scheduler = DefaultScheduler.getInstance(); + scheduler.init(new JobEngineConfig(kylinConfig)); + if (!scheduler.hasStarted()) { + throw new RuntimeException("scheduler has not been started"); + } + cubeManager = CubeManager.getInstance(kylinConfig); + jobEngineConfig = new JobEngineConfig(kylinConfig); + for (AbstractExecutable job: jobService.getAllExecutables()) { + jobService.deleteJob(job); + } + final CubeInstance testCube = cubeManager.getCube("test_kylin_cube_without_slr_left_join_empty"); + testCube.getSegments().clear(); + cubeManager.updateCube(testCube); + } + + @After + public void after() throws Exception { +// int exitCode = cleanupOldCubes(); +// if (exitCode == 0) { +// exportHBaseData(); +// } +// +// HBaseMetadataTestCase.staticCleanupTestMetadata(); + } + + @Test + public void testBuild() throws Exception { + final CubeInstance cubeInstance = cubeManager.getCube("test_kylin_cube_without_slr_left_join_empty"); + assertNotNull(cubeInstance); + final CubeSegment cubeSegment = cubeManager.appendSegments(cubeInstance, 0, System.currentTimeMillis()); + final 
CubingJobBuilder cubingJobBuilder = CubingJobBuilder.newBuilder().setJobEnginConfig(jobEngineConfig).setSegment(cubeSegment); + final CubingJob job = cubingJobBuilder.buildJob(); + jobService.addJob(job); + waitForJob(job.getId()); + assertEquals(ExecutableState.SUCCEED, jobService.getOutput(job.getId()).getState()); + } + + private int cleanupOldCubes() throws Exception { + String[] args = { "--delete", "true" }; + + int exitCode = ToolRunner.run(new StorageCleanupJob(), args); + return exitCode; + } + + private void exportHBaseData() throws IOException { + ExportHBaseData export = new ExportHBaseData(); + export.exportTables(); + } +} \ No newline at end of file diff --git a/server/src/main/java/com/kylinolap/rest/service/BasicService.java b/server/src/main/java/com/kylinolap/rest/service/BasicService.java index 5fa91ea..221a8b2 100644 --- a/server/src/main/java/com/kylinolap/rest/service/BasicService.java +++ b/server/src/main/java/com/kylinolap/rest/service/BasicService.java @@ -27,7 +27,7 @@ import com.kylinolap.job.JobManager; import com.kylinolap.job.engine.JobEngineConfig; import com.kylinolap.job.exception.JobException; -import com.kylinolap.job2.cube.BuildCubeJob; +import com.kylinolap.job2.cube.CubingJob; import com.kylinolap.job2.execution.ExecutableState; import com.kylinolap.job2.impl.threadpool.AbstractExecutable; import com.kylinolap.job2.service.ExecutableManager; @@ -181,23 +181,23 @@ public final ExecutableManager getExecutableManager() { return ExecutableManager.getInstance(getConfig()); } - protected List listAllCubingJobs(final String cubeName, final String projectName, final Set statusList) { - List results = Lists.newArrayList(FluentIterable.from(getExecutableManager().getAllExecutables()).filter(new Predicate() { + protected List listAllCubingJobs(final String cubeName, final String projectName, final Set statusList) { + List results = Lists.newArrayList(FluentIterable.from(getExecutableManager().getAllExecutables()).filter(new Predicate() { @Override public boolean apply(AbstractExecutable executable) { if (cubeName == null) { return true; } - return executable instanceof BuildCubeJob && ((BuildCubeJob) executable).getCubeName().equalsIgnoreCase(cubeName); + return executable instanceof CubingJob && ((CubingJob) executable).getCubeName().equalsIgnoreCase(cubeName); } - }).transform(new Function() { + }).transform(new Function() { @Override - public BuildCubeJob apply(AbstractExecutable executable) { - return (BuildCubeJob) executable; + public CubingJob apply(AbstractExecutable executable) { + return (CubingJob) executable; } - }).filter(new Predicate() { + }).filter(new Predicate() { @Override - public boolean apply(BuildCubeJob executable) { + public boolean apply(CubingJob executable) { if (null == projectName || null == getProjectManager().getProject(projectName)) { return true; } else { @@ -206,16 +206,16 @@ public boolean apply(BuildCubeJob executable) { return project.containsRealization(RealizationType.CUBE, executable.getCubeName()); } } - }).filter(new Predicate() { + }).filter(new Predicate() { @Override - public boolean apply(BuildCubeJob executable) { + public boolean apply(CubingJob executable) { return statusList.contains(executable.getStatus()); } })); return results; } - protected List listAllCubingJobs(final String cubeName, final String projectName) { + protected List listAllCubingJobs(final String cubeName, final String projectName) { return listAllCubingJobs(cubeName, projectName, EnumSet.allOf(ExecutableState.class)); } diff --git 
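
The listAllCubingJobs hunk just above has lost its generic type parameters in the mail formatting (the angle-bracketed parts were stripped). A sketch of how the chain is presumably meant to read against Guava's FluentIterable, with the project filter omitted for brevity:

    protected List<CubingJob> listAllCubingJobs(final String cubeName, final String projectName,
                                                final Set<ExecutableState> statusList) {
        return Lists.newArrayList(FluentIterable
                .from(getExecutableManager().getAllExecutables())
                .filter(new Predicate<AbstractExecutable>() {
                    @Override
                    public boolean apply(AbstractExecutable executable) {
                        // keep every job when no cube is given, otherwise only cubing jobs of that cube
                        if (cubeName == null) {
                            return true;
                        }
                        return executable instanceof CubingJob
                                && ((CubingJob) executable).getCubeName().equalsIgnoreCase(cubeName);
                    }
                })
                .transform(new Function<AbstractExecutable, CubingJob>() {
                    @Override
                    public CubingJob apply(AbstractExecutable executable) {
                        return (CubingJob) executable;
                    }
                })
                .filter(new Predicate<CubingJob>() {
                    @Override
                    public boolean apply(CubingJob job) {
                        return statusList.contains(job.getStatus());
                    }
                }));
    }
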
a/server/src/main/java/com/kylinolap/rest/service/CubeService.java b/server/src/main/java/com/kylinolap/rest/service/CubeService.java index 7041ddd..50a0f7e 100644 --- a/server/src/main/java/com/kylinolap/rest/service/CubeService.java +++ b/server/src/main/java/com/kylinolap/rest/service/CubeService.java @@ -25,7 +25,7 @@ import java.text.SimpleDateFormat; import java.util.*; -import com.kylinolap.job2.cube.BuildCubeJob; +import com.kylinolap.job2.cube.CubingJob; import com.kylinolap.job2.execution.ExecutableState; import com.kylinolap.metadata.realization.RealizationType; import com.kylinolap.metadata.project.RealizationEntry; @@ -61,11 +61,6 @@ import com.kylinolap.cube.cuboid.CuboidCLI; import com.kylinolap.cube.exception.CubeIntegrityException; import com.kylinolap.cube.model.CubeDesc; -import com.kylinolap.job.JobDAO; -import com.kylinolap.job.JobInstance; -import com.kylinolap.job.JobInstance.JobStep; -import com.kylinolap.job.constant.JobStatusEnum; -import com.kylinolap.job.constant.JobStepStatusEnum; import com.kylinolap.job.exception.JobException; import com.kylinolap.job.hadoop.cardinality.HiveColumnCardinalityJob; import com.kylinolap.metadata.MetadataConstances; @@ -224,8 +219,8 @@ private boolean isCubeInProject(String projectName, CubeInstance target) { @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#cube, 'ADMINISTRATION') or hasPermission(#cube, 'MANAGEMENT')") public CubeDesc updateCubeAndDesc(CubeInstance cube, CubeDesc desc, String newProjectName) throws UnknownHostException, IOException, JobException { - final List buildCubeJobs = listAllCubingJobs(cube.getName(), null, EnumSet.of(ExecutableState.READY, ExecutableState.RUNNING)); - if (!buildCubeJobs.isEmpty()) { + final List cubingJobs = listAllCubingJobs(cube.getName(), null, EnumSet.of(ExecutableState.READY, ExecutableState.RUNNING)); + if (!cubingJobs.isEmpty()) { throw new JobException("Cube schema shouldn't be changed with running job."); } @@ -256,8 +251,8 @@ public CubeDesc updateCubeAndDesc(CubeInstance cube, CubeDesc desc, String newPr @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#cube, 'ADMINISTRATION') or hasPermission(#cube, 'MANAGEMENT')") public void deleteCube(CubeInstance cube) throws IOException, JobException, CubeIntegrityException { - final List buildCubeJobs = listAllCubingJobs(cube.getName(), null, EnumSet.of(ExecutableState.READY, ExecutableState.RUNNING)); - if (!buildCubeJobs.isEmpty()) { + final List cubingJobs = listAllCubingJobs(cube.getName(), null, EnumSet.of(ExecutableState.READY, ExecutableState.RUNNING)); + if (!cubingJobs.isEmpty()) { throw new JobException("The cube " + cube.getName() + " has running job, please discard it and try again."); } @@ -383,8 +378,8 @@ public CubeInstance enableCube(CubeInstance cube) throws IOException, CubeIntegr throw new InternalErrorException("Cube " + cubeName + " dosen't contain any READY segment"); } - final List buildCubeJobs = listAllCubingJobs(cube.getName(), null, EnumSet.of(ExecutableState.READY, ExecutableState.RUNNING)); - if (!buildCubeJobs.isEmpty()) { + final List cubingJobs = listAllCubingJobs(cube.getName(), null, EnumSet.of(ExecutableState.READY, ExecutableState.RUNNING)); + if (!cubingJobs.isEmpty()) { throw new JobException("Enable is not allowed with a running job."); } if (!cube.getDescriptor().calculateSignature().equals(cube.getDescriptor().getSignature())) { @@ -601,11 +596,11 @@ public CubeInstance rebuildLookupSnapshot(String cubeName, String segmentName, S * @throws CubeIntegrityException 
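
The three CubeService call sites above (updateCubeAndDesc, deleteCube, enableCube) now share the same guard; a condensed sketch of the pattern, with only the exception message differing per call site:

    // Refuse the operation while any cubing job for this cube is pending or running.
    final List<CubingJob> cubingJobs = listAllCubingJobs(cube.getName(), null,
            EnumSet.of(ExecutableState.READY, ExecutableState.RUNNING));
    if (!cubingJobs.isEmpty()) {
        throw new JobException("Cube schema shouldn't be changed with running job.");
    }
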
*/ private void releaseAllSegments(CubeInstance cube) throws IOException, JobException, UnknownHostException, CubeIntegrityException { - final List buildCubeJobs = listAllCubingJobs(cube.getName(), null); - for (BuildCubeJob buildCubeJob : buildCubeJobs) { - final ExecutableState status = buildCubeJob.getStatus(); + final List cubingJobs = listAllCubingJobs(cube.getName(), null); + for (CubingJob cubingJob : cubingJobs) { + final ExecutableState status = cubingJob.getStatus(); if (status != ExecutableState.SUCCEED && status != ExecutableState.STOPPED && status != ExecutableState.DISCARDED) { - getExecutableManager().discardJob(buildCubeJob.getId()); + getExecutableManager().discardJob(cubingJob.getId()); } } cube.getSegments().clear(); diff --git a/server/src/main/java/com/kylinolap/rest/service/JobService.java b/server/src/main/java/com/kylinolap/rest/service/JobService.java index 8f70f59..a1fe3da 100644 --- a/server/src/main/java/com/kylinolap/rest/service/JobService.java +++ b/server/src/main/java/com/kylinolap/rest/service/JobService.java @@ -21,7 +21,6 @@ import com.google.common.base.Function; import com.google.common.base.Preconditions; -import com.google.common.base.Predicate; import com.google.common.collect.FluentIterable; import com.google.common.collect.Lists; @@ -31,15 +30,12 @@ import com.kylinolap.job2.common.HadoopShellExecutable; import com.kylinolap.job2.common.MapReduceExecutable; import com.kylinolap.job2.common.ShellExecutable; -import com.kylinolap.job2.cube.BuildCubeJob; -import com.kylinolap.job2.cube.BuildCubeJobBuilder; -import com.kylinolap.job2.execution.Executable; +import com.kylinolap.job2.cube.CubingJob; +import com.kylinolap.job2.cube.CubingJobBuilder; import com.kylinolap.job2.execution.ExecutableState; import com.kylinolap.job2.execution.Output; import com.kylinolap.job2.impl.threadpool.AbstractExecutable; import com.kylinolap.metadata.model.SegmentStatusEnum; -import com.kylinolap.metadata.project.ProjectInstance; -import com.kylinolap.metadata.realization.RealizationType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.security.access.prepost.PreAuthorize; @@ -95,10 +91,10 @@ states.add(parseToExecutableState(status)); } } - return Lists.newArrayList(FluentIterable.from(listAllCubingJobs(cubeName, projectName, states)).transform(new Function() { + return Lists.newArrayList(FluentIterable.from(listAllCubingJobs(cubeName, projectName, states)).transform(new Function() { @Override - public JobInstance apply(BuildCubeJob buildCubeJob) { - return parseToJobInstance(buildCubeJob); + public JobInstance apply(CubingJob cubingJob) { + return parseToJobInstance(cubingJob); } })); } @@ -126,26 +122,29 @@ private ExecutableState parseToExecutableState(JobStatusEnum status) { @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#cube, 'ADMINISTRATION') or hasPermission(#cube, 'OPERATION') or hasPermission(#cube, 'MANAGEMENT')") public JobInstance submitJob(CubeInstance cube, long startDate, long endDate, CubeBuildTypeEnum buildType, String submitter) throws IOException, JobException, InvalidJobInstanceException { - final List buildCubeJobs = listAllCubingJobs(cube.getName(), null, EnumSet.allOf(ExecutableState.class)); - for (BuildCubeJob job : buildCubeJobs) { + final List cubingJobs = listAllCubingJobs(cube.getName(), null, EnumSet.allOf(ExecutableState.class)); + for (CubingJob job : cubingJobs) { if (job.getStatus() == ExecutableState.READY || job.getStatus() == ExecutableState.RUNNING) { throw new 
JobException("The cube " + cube.getName() + " has running job(" + job.getId() + ") please discard it and try again."); } } try { - CubeSegment cubeSegment = null; + CubingJob job; + CubeSegment segment; + CubingJobBuilder builder = CubingJobBuilder.newBuilder().setJobEnginConfig(new JobEngineConfig(getConfig())).setSubmitter(submitter); if (buildType == CubeBuildTypeEnum.BUILD) { - cubeSegment = this.getCubeManager().appendSegments(cube, startDate, endDate); + segment = getCubeManager().appendSegments(cube, startDate, endDate); + builder.setSegment(segment); + job = builder.buildJob(); } else if (buildType == CubeBuildTypeEnum.MERGE) { - throw new RuntimeException("has not implemented yet"); -// cubeSegment = this.getCubeManager().mergeSegments(cube, startDate, endDate); + segment = getCubeManager().mergeSegments(cube, startDate, endDate); + builder.setSegment(segment); + job = builder.mergeJob(); } else { throw new JobException("invalid build type:" + buildType); } - BuildCubeJobBuilder builder = BuildCubeJobBuilder.newBuilder(new JobEngineConfig(getConfig()), cubeSegment); - final BuildCubeJob job = builder.build(); - cubeSegment.setLastBuildJobID(job.getId()); + segment.setLastBuildJobID(job.getId()); getCubeManager().updateCube(cube); getExecutableManager().addJob(job); return parseToJobInstance(job); @@ -159,8 +158,8 @@ public JobInstance getJobInstance(String uuid) throws IOException, JobException } private JobInstance parseToJobInstance(AbstractExecutable job) { - Preconditions.checkState(job instanceof BuildCubeJob, "illegal job type, id:" + job.getId()); - BuildCubeJob cubeJob = (BuildCubeJob) job; + Preconditions.checkState(job instanceof CubingJob, "illegal job type, id:" + job.getId()); + CubingJob cubeJob = (CubingJob) job; final JobInstance result = new JobInstance(); result.setName(job.getName()); result.setRelatedCube(cubeJob.getCubeName()); From 7a92c28c13eb8206b70a8b5a99c3ea53e1adf219 Mon Sep 17 00:00:00 2001 From: "qianhao.zhou" Date: Wed, 7 Jan 2015 17:04:26 +0800 Subject: [PATCH 2/4] refactor --- .../java/com/kylinolap/job2/common/MapReduceExecutable.java | 10 ++++++++++ .../main/java/com/kylinolap/job2/cube/CubingJobBuilder.java | 1 + .../com/kylinolap/job2/impl/threadpool/AbstractExecutable.java | 10 ---------- job/src/test/java/com/kylinolap/job2/SelfStopExecutable.java | 6 ++++++ 4 files changed, 17 insertions(+), 10 deletions(-) diff --git a/job/src/main/java/com/kylinolap/job2/common/MapReduceExecutable.java b/job/src/main/java/com/kylinolap/job2/common/MapReduceExecutable.java index 0a688f9..13bfc61 100644 --- a/job/src/main/java/com/kylinolap/job2/common/MapReduceExecutable.java +++ b/job/src/main/java/com/kylinolap/job2/common/MapReduceExecutable.java @@ -7,6 +7,7 @@ import com.kylinolap.job2.dao.JobPO; import com.kylinolap.job2.exception.ExecuteException; import com.kylinolap.job2.execution.ExecutableContext; +import com.kylinolap.job2.execution.ExecutableState; import com.kylinolap.job2.execution.ExecuteResult; import com.kylinolap.job2.impl.threadpool.AbstractExecutable; import org.apache.hadoop.util.ToolRunner; @@ -79,6 +80,15 @@ protected ExecuteResult doWork(ExecutableContext context) throws ExecuteExceptio } } + /* + * stop is triggered by JobService, the Scheduler is not awake of that, so much get the latest value from service + * + * */ + private boolean isStopped() { + final ExecutableState status = jobService.getOutput(getId()).getState(); + return status == ExecutableState.STOPPED || status == ExecutableState.DISCARDED; + } + public void 
setMapReduceJobClass(Class clazzName) { setParam(KEY_MR_JOB, clazzName.getName()); } diff --git a/job/src/main/java/com/kylinolap/job2/cube/CubingJobBuilder.java b/job/src/main/java/com/kylinolap/job2/cube/CubingJobBuilder.java index 666647e..d7aa04b 100644 --- a/job/src/main/java/com/kylinolap/job2/cube/CubingJobBuilder.java +++ b/job/src/main/java/com/kylinolap/job2/cube/CubingJobBuilder.java @@ -410,6 +410,7 @@ private MapReduceExecutable createMergeCuboidDataStep(String inputPath, String o private UpdateCubeInfoAfterMergeExecutable createUpdateCubeInfoAfterMergeStep(List mergingSegmentIds, String convertToHFileStepId) { UpdateCubeInfoAfterMergeExecutable result = new UpdateCubeInfoAfterMergeExecutable(); + result.setName(ExecutableConstants.STEP_NAME_UPDATE_CUBE_INFO); result.setCubeName(getCubeName()); result.setSegmentId(segment.getUuid()); result.setMergingSegmentIds(mergingSegmentIds); diff --git a/job/src/main/java/com/kylinolap/job2/impl/threadpool/AbstractExecutable.java b/job/src/main/java/com/kylinolap/job2/impl/threadpool/AbstractExecutable.java index 70eb696..c26eb72 100644 --- a/job/src/main/java/com/kylinolap/job2/impl/threadpool/AbstractExecutable.java +++ b/job/src/main/java/com/kylinolap/job2/impl/threadpool/AbstractExecutable.java @@ -34,7 +34,6 @@ public AbstractExecutable() { this.job = new JobPO(); this.job.setType(this.getClass().getName()); this.job.setUuid(uuid); - } protected AbstractExecutable(JobPO job) { @@ -166,15 +165,6 @@ public JobPO getJobPO() { return job; } - /* - * stop is triggered by JobService, the Scheduler is not awake of that, so - * - * */ - protected final boolean isStopped() { - final ExecutableState status = getStatus(); - return status == ExecutableState.STOPPED || status == ExecutableState.DISCARDED; - } - @Override public String toString() { return Objects.toStringHelper(this).add("id", getId()).add("name", getName()).add("state", getStatus()).toString(); diff --git a/job/src/test/java/com/kylinolap/job2/SelfStopExecutable.java b/job/src/test/java/com/kylinolap/job2/SelfStopExecutable.java index b32c547..6bd2a89 100644 --- a/job/src/test/java/com/kylinolap/job2/SelfStopExecutable.java +++ b/job/src/test/java/com/kylinolap/job2/SelfStopExecutable.java @@ -3,6 +3,7 @@ import com.kylinolap.job2.dao.JobPO; import com.kylinolap.job2.exception.ExecuteException; import com.kylinolap.job2.execution.ExecutableContext; +import com.kylinolap.job2.execution.ExecutableState; import com.kylinolap.job2.execution.ExecuteResult; /** @@ -30,4 +31,9 @@ protected ExecuteResult doWork(ExecutableContext context) throws ExecuteExceptio } } + private boolean isStopped() { + final ExecutableState status = jobService.getOutput(getId()).getState(); + return status == ExecutableState.STOPPED || status == ExecutableState.DISCARDED; + } + } From a36bfd19087209de9c61f020455817d361af31d5 Mon Sep 17 00:00:00 2001 From: "qianhao.zhou" Date: Wed, 7 Jan 2015 17:49:04 +0800 Subject: [PATCH 3/4] fix bug --- .../com/kylinolap/job2/cube/CubingJobBuilder.java | 12 +++++++----- .../job2/cube/UpdateCubeInfoAfterBuildExecutable.java | 11 ++++++++++- .../job2/cube/UpdateCubeInfoAfterMergeExecutable.java | 19 +++++++++++++++++++ 3 files changed, 36 insertions(+), 6 deletions(-) diff --git a/job/src/main/java/com/kylinolap/job2/cube/CubingJobBuilder.java b/job/src/main/java/com/kylinolap/job2/cube/CubingJobBuilder.java index d7aa04b..209f634 100644 --- a/job/src/main/java/com/kylinolap/job2/cube/CubingJobBuilder.java +++ 
b/job/src/main/java/com/kylinolap/job2/cube/CubingJobBuilder.java @@ -97,7 +97,7 @@ public CubingJob buildJob() { // bulk load step result.addTask(createBulkLoadStep(jobId)); - result.addTask(createUpdateCubeInfoStep(intermediateHiveTableStep.getId(), baseCuboidStep.getId(), convertCuboidToHfileStep.getId())); + result.addTask(createUpdateCubeInfoStep(intermediateHiveTableStep.getId(), baseCuboidStep.getId(), convertCuboidToHfileStep.getId(), jobId)); return result; } @@ -137,7 +137,7 @@ public String apply(CubeSegment input) { return input.getUuid(); } }); - result.addTask(createUpdateCubeInfoAfterMergeStep(mergingSegmentIds, convertCuboidToHfileStep.getId())); + result.addTask(createUpdateCubeInfoAfterMergeStep(mergingSegmentIds, convertCuboidToHfileStep.getId(), jobId)); return result; } @@ -163,7 +163,7 @@ private String getJobWorkingDir(String uuid) { } private String getPathToMerge(CubeSegment segment) { - return getJobWorkingDir(segment.getUuid()) + "/" + getCubeName() + "/cuboid/*"; + return getJobWorkingDir(segment.getLastBuildJobID()) + "/" + getCubeName() + "/cuboid/*"; } private String getCubeName() { @@ -380,7 +380,7 @@ private HadoopShellExecutable createBulkLoadStep(String jobId) { } - private UpdateCubeInfoAfterBuildExecutable createUpdateCubeInfoStep(String createFlatTableStepId, String baseCuboidStepId, String convertToHFileStepId) { + private UpdateCubeInfoAfterBuildExecutable createUpdateCubeInfoStep(String createFlatTableStepId, String baseCuboidStepId, String convertToHFileStepId, String jobId) { final UpdateCubeInfoAfterBuildExecutable updateCubeInfoStep = new UpdateCubeInfoAfterBuildExecutable(); updateCubeInfoStep.setName(ExecutableConstants.STEP_NAME_UPDATE_CUBE_INFO); updateCubeInfoStep.setCubeName(getCubeName()); @@ -388,6 +388,7 @@ private UpdateCubeInfoAfterBuildExecutable createUpdateCubeInfoStep(String creat updateCubeInfoStep.setCreateFlatTableStepId(createFlatTableStepId); updateCubeInfoStep.setBaseCuboidStepId(baseCuboidStepId); updateCubeInfoStep.setConvertToHFileStepId(convertToHFileStepId); + updateCubeInfoStep.setCubingJobId(jobId); return updateCubeInfoStep; } @@ -408,13 +409,14 @@ private MapReduceExecutable createMergeCuboidDataStep(String inputPath, String o return mergeCuboidDataStep; } - private UpdateCubeInfoAfterMergeExecutable createUpdateCubeInfoAfterMergeStep(List mergingSegmentIds, String convertToHFileStepId) { + private UpdateCubeInfoAfterMergeExecutable createUpdateCubeInfoAfterMergeStep(List mergingSegmentIds, String convertToHFileStepId, String jobId) { UpdateCubeInfoAfterMergeExecutable result = new UpdateCubeInfoAfterMergeExecutable(); result.setName(ExecutableConstants.STEP_NAME_UPDATE_CUBE_INFO); result.setCubeName(getCubeName()); result.setSegmentId(segment.getUuid()); result.setMergingSegmentIds(mergingSegmentIds); result.setConvertToHFileStepId(convertToHFileStepId); + result.setCubingJobId(jobId); return result; } diff --git a/job/src/main/java/com/kylinolap/job2/cube/UpdateCubeInfoAfterBuildExecutable.java b/job/src/main/java/com/kylinolap/job2/cube/UpdateCubeInfoAfterBuildExecutable.java index 37a4495..7c4b1d2 100644 --- a/job/src/main/java/com/kylinolap/job2/cube/UpdateCubeInfoAfterBuildExecutable.java +++ b/job/src/main/java/com/kylinolap/job2/cube/UpdateCubeInfoAfterBuildExecutable.java @@ -27,6 +27,7 @@ private static final String CONVERT_TO_HFILE_STEP_ID = "convertToHFileStepId"; private static final String BASE_CUBOID_STEP_ID = "baseCuboidStepId"; private static final String CREATE_FLAT_TABLE_STEP_ID = 
"createFlatTableStepId"; + private static final String CUBING_JOB_ID = "cubingJobId"; private final CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()); @@ -77,6 +78,14 @@ private String getCreateFlatTableStepId() { return getParam(CREATE_FLAT_TABLE_STEP_ID); } + public void setCubingJobId(String id) { + setParam(CUBING_JOB_ID, id); + } + + private String getCubingJobId() { + return CUBING_JOB_ID; + } + @Override protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { final CubeInstance cube = cubeManager.getCube(getCubeName()); @@ -95,7 +104,7 @@ protected ExecuteResult doWork(ExecutableContext context) throws ExecuteExceptio long size = Long.parseLong(cubeSizeString) / 1024; - segment.setLastBuildJobID(getId()); + segment.setLastBuildJobID(getCubingJobId()); segment.setLastBuildTime(System.currentTimeMillis()); segment.setSizeKB(size); segment.setSourceRecords(sourceCount); diff --git a/job/src/main/java/com/kylinolap/job2/cube/UpdateCubeInfoAfterMergeExecutable.java b/job/src/main/java/com/kylinolap/job2/cube/UpdateCubeInfoAfterMergeExecutable.java index f821516..7d274bb 100644 --- a/job/src/main/java/com/kylinolap/job2/cube/UpdateCubeInfoAfterMergeExecutable.java +++ b/job/src/main/java/com/kylinolap/job2/cube/UpdateCubeInfoAfterMergeExecutable.java @@ -7,6 +7,7 @@ import com.kylinolap.cube.CubeManager; import com.kylinolap.cube.CubeSegment; import com.kylinolap.job2.constants.ExecutableConstants; +import com.kylinolap.job2.dao.JobPO; import com.kylinolap.job2.exception.ExecuteException; import com.kylinolap.job2.execution.ExecutableContext; import com.kylinolap.job2.execution.ExecuteResult; @@ -27,8 +28,17 @@ private static final String SEGMENT_ID = "segmentId"; private static final String MERGING_SEGMENT_IDS = "mergingSegmentIds"; private static final String CONVERT_TO_HFILE_STEP_ID = "convertToHFileStepId"; + private static final String CUBING_JOB_ID = "cubingJobId"; private final CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()); + + public UpdateCubeInfoAfterMergeExecutable() { + } + + public UpdateCubeInfoAfterMergeExecutable(JobPO job) { + super(job); + } + @Override protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { final CubeInstance cube = cubeManager.getCube(getCubeName()); @@ -61,6 +71,7 @@ protected ExecuteResult doWork(ExecutableContext context) throws ExecuteExceptio mergedSegment.setSizeKB(cubeSize); mergedSegment.setSourceRecords(sourceCount); mergedSegment.setSourceRecordsSize(sourceSize); + mergedSegment.setLastBuildJobID(getCubingJobId()); //remove old segment cube.getSegments().removeAll(toBeRemoved); try { @@ -113,4 +124,12 @@ public void setConvertToHFileStepId(String id) { private String getConvertToHfileStepId() { return getParam(CONVERT_TO_HFILE_STEP_ID); } + + public void setCubingJobId(String id) { + setParam(CUBING_JOB_ID, id); + } + + private String getCubingJobId() { + return CUBING_JOB_ID; + } } From ee9d9e547e19e46ab71bfa910e934501d4944a9f Mon Sep 17 00:00:00 2001 From: "qianhao.zhou" Date: Wed, 7 Jan 2015 18:06:13 +0800 Subject: [PATCH 4/4] finish CubingJobBuilderTest --- .../kylinolap/job2/cube/CubingJobBuilderTest.java | 22 ++++++++++++++++++++-- .../com/kylinolap/rest/service/JobService.java | 9 ++------- 2 files changed, 22 insertions(+), 9 deletions(-) diff --git a/job/src/test/java/com/kylinolap/job2/cube/CubingJobBuilderTest.java b/job/src/test/java/com/kylinolap/job2/cube/CubingJobBuilderTest.java index 35cf05b..f0a3fbb 
100644 --- a/job/src/test/java/com/kylinolap/job2/cube/CubingJobBuilderTest.java +++ b/job/src/test/java/com/kylinolap/job2/cube/CubingJobBuilderTest.java @@ -23,6 +23,8 @@ import java.io.IOException; import java.lang.reflect.Field; import java.lang.reflect.Modifier; +import java.text.SimpleDateFormat; +import java.util.Date; import static org.junit.Assert.*; @@ -108,12 +110,28 @@ public void after() throws Exception { public void testBuild() throws Exception { final CubeInstance cubeInstance = cubeManager.getCube("test_kylin_cube_without_slr_left_join_empty"); assertNotNull(cubeInstance); - final CubeSegment cubeSegment = cubeManager.appendSegments(cubeInstance, 0, System.currentTimeMillis()); - final CubingJobBuilder cubingJobBuilder = CubingJobBuilder.newBuilder().setJobEnginConfig(jobEngineConfig).setSegment(cubeSegment); + SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd"); + Date date1 = dateFormat.parse("2013-01-01"); + final CubeSegment cubeSegment1 = cubeManager.appendSegments(cubeInstance, 0, date1.getTime()); + final CubingJobBuilder cubingJobBuilder = CubingJobBuilder.newBuilder().setJobEnginConfig(jobEngineConfig).setSegment(cubeSegment1); final CubingJob job = cubingJobBuilder.buildJob(); jobService.addJob(job); waitForJob(job.getId()); assertEquals(ExecutableState.SUCCEED, jobService.getOutput(job.getId()).getState()); + + Date date2 = dateFormat.parse("2013-04-01"); + final CubeSegment cubeSegment2 = cubeManager.appendSegments(cubeInstance, date1.getTime(), date2.getTime()); + final CubingJob job2 = CubingJobBuilder.newBuilder().setJobEnginConfig(jobEngineConfig).setSegment(cubeSegment2).buildJob(); + jobService.addJob(job2); + waitForJob(job2.getId()); + assertEquals(ExecutableState.SUCCEED, jobService.getOutput(job2.getId()).getState()); + + final CubeSegment cubeSegment3 = cubeManager.mergeSegments(cubeInstance, 0, date2.getTime()); + final CubingJob job3 = CubingJobBuilder.newBuilder().setJobEnginConfig(jobEngineConfig).setSegment(cubeSegment3).mergeJob(); + jobService.addJob(job3); + waitForJob(job3.getId()); + assertEquals(ExecutableState.SUCCEED, jobService.getOutput(job3.getId()).getState()); + } private int cleanupOldCubes() throws Exception { diff --git a/server/src/main/java/com/kylinolap/rest/service/JobService.java b/server/src/main/java/com/kylinolap/rest/service/JobService.java index a1fe3da..b901251 100644 --- a/server/src/main/java/com/kylinolap/rest/service/JobService.java +++ b/server/src/main/java/com/kylinolap/rest/service/JobService.java @@ -131,21 +131,16 @@ public JobInstance submitJob(CubeInstance cube, long startDate, long endDate, Cu try { CubingJob job; - CubeSegment segment; CubingJobBuilder builder = CubingJobBuilder.newBuilder().setJobEnginConfig(new JobEngineConfig(getConfig())).setSubmitter(submitter); if (buildType == CubeBuildTypeEnum.BUILD) { - segment = getCubeManager().appendSegments(cube, startDate, endDate); - builder.setSegment(segment); + builder.setSegment(getCubeManager().appendSegments(cube, startDate, endDate)); job = builder.buildJob(); } else if (buildType == CubeBuildTypeEnum.MERGE) { - segment = getCubeManager().mergeSegments(cube, startDate, endDate); - builder.setSegment(segment); + builder.setSegment(getCubeManager().mergeSegments(cube, startDate, endDate)); job = builder.mergeJob(); } else { throw new JobException("invalid build type:" + buildType); } - segment.setLastBuildJobID(job.getId()); - getCubeManager().updateCube(cube); getExecutableManager().addJob(job); return parseToJobInstance(job); } catch 
(CubeIntegrityException e) {
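
Taken together, the series converges on a single fluent entry point for both full builds and merges. A usage sketch of the API as exercised by the test and by submitJob (method names as they appear in the patch, including setJobEnginConfig; the config, cube, time range, and submitter values here are placeholders):

    CubingJobBuilder builder = CubingJobBuilder.newBuilder()
            .setJobEnginConfig(new JobEngineConfig(kylinConfig))
            .setSubmitter("ADMIN"); // placeholder submitter

    // Full build of a newly appended segment:
    CubingJob buildJob = builder
            .setSegment(cubeManager.appendSegments(cube, startTime, endTime))
            .buildJob();

    // Merge of the existing segments covering [startTime, endTime):
    CubingJob mergeJob = builder
            .setSegment(cubeManager.mergeSegments(cube, startTime, endTime))
            .mergeJob();

    executableManager.addJob(buildJob);
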