From 55352b30293b20f6252d6a9e3e00243a369c1eb3 Mon Sep 17 00:00:00 2001 From: sunyerui Date: Thu, 10 Sep 2015 11:23:16 +0800 Subject: [PATCH] KYLIN-968 CubeSegment.lastBuildJobID is null in new instance but used for rowkey_stats path --- .../apache/kylin/job/cube/CubingJobBuilder.java | 16 +-- .../kylin/job/cube/CubingJobBuilderTest.java | 138 +++++++++++++++++++++ 2 files changed, 146 insertions(+), 8 deletions(-) create mode 100644 job/src/test/java/org/apache/kylin/job/cube/CubingJobBuilderTest.java diff --git a/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java b/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java index 5c3c277..ff79286 100644 --- a/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java +++ b/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java @@ -206,9 +206,9 @@ public final class CubingJobBuilder extends AbstractJobBuilder { final String jobId = result.getId(); final String cuboidPath = cuboidRootPath + "*"; - result.addTask(createRangeRowkeyDistributionStep(seg, cuboidPath)); + result.addTask(createRangeRowkeyDistributionStep(seg, cuboidPath, jobId)); // create htable step - result.addTask(createCreateHTableStep(seg)); + result.addTask(createCreateHTableStep(seg, jobId)); // generate hfiles step final MapReduceExecutable convertCuboidToHfileStep = createConvertCuboidToHfileStep(seg, cuboidPath, jobId); result.addTask(convertCuboidToHfileStep); @@ -265,8 +265,8 @@ public final class CubingJobBuilder extends AbstractJobBuilder { return getJobWorkingDir(seg.getLastBuildJobID()) + "/" + seg.getCubeInstance().getName() + "/cuboid/*"; } - private String getRowkeyDistributionOutputPath(CubeSegment seg) { - return getJobWorkingDir(seg.getLastBuildJobID()) + "/" + seg.getCubeInstance().getName() + "/rowkey_stats"; + private String getRowkeyDistributionOutputPath(CubeSegment seg, String jobId) { + return getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/rowkey_stats"; } private 
String getFactDistinctColumnsPath(CubeSegment seg, String jobUuid) { @@ -347,14 +347,14 @@ public final class CubingJobBuilder extends AbstractJobBuilder { return ndCuboidStep; } - private MapReduceExecutable createRangeRowkeyDistributionStep(CubeSegment seg, String inputPath) { + private MapReduceExecutable createRangeRowkeyDistributionStep(CubeSegment seg, String inputPath, String jobId) { MapReduceExecutable rowkeyDistributionStep = new MapReduceExecutable(); rowkeyDistributionStep.setName(ExecutableConstants.STEP_NAME_GET_CUBOID_KEY_DISTRIBUTION); StringBuilder cmd = new StringBuilder(); appendMapReduceParameters(cmd, seg); appendExecCmdParameters(cmd, "input", inputPath); - appendExecCmdParameters(cmd, "output", getRowkeyDistributionOutputPath(seg)); + appendExecCmdParameters(cmd, "output", getRowkeyDistributionOutputPath(seg, jobId)); appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName()); appendExecCmdParameters(cmd, "jobname", "Kylin_Region_Splits_Calculator_" + seg.getCubeInstance().getName() + "_Step"); @@ -363,12 +363,12 @@ public final class CubingJobBuilder extends AbstractJobBuilder { return rowkeyDistributionStep; } - private HadoopShellExecutable createCreateHTableStep(CubeSegment seg) { + private HadoopShellExecutable createCreateHTableStep(CubeSegment seg, String jobId) { HadoopShellExecutable createHtableStep = new HadoopShellExecutable(); createHtableStep.setName(ExecutableConstants.STEP_NAME_CREATE_HBASE_TABLE); StringBuilder cmd = new StringBuilder(); appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName()); - appendExecCmdParameters(cmd, "input", getRowkeyDistributionOutputPath(seg) + "/part-r-00000"); + appendExecCmdParameters(cmd, "input", getRowkeyDistributionOutputPath(seg, jobId) + "/part-r-00000"); appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier()); createHtableStep.setJobParams(cmd.toString()); diff --git 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.kylin.job.cube;

import org.apache.commons.cli.Options;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.HBaseMetadataTestCase;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.job.DeployUtil;
import org.apache.kylin.job.common.MapReduceExecutable;
import org.apache.kylin.job.constant.ExecutableConstants;
import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.job.hadoop.AbstractHadoopJob;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.TimeZone;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

/**
 * Regression test for KYLIN-968: {@code CubeSegment.lastBuildJobID} is null on a
 * freshly appended segment, but it used to be consulted when computing the
 * {@code rowkey_stats} output path. This test builds a cubing job for a brand-new
 * segment and asserts that the rowkey-distribution step's output path embeds the
 * NEW job's id (a UUID) rather than a null last-build-job id.
 *
 * Created by sunyerui on 15/8/31.
 */
public class CubingJobBuilderTest extends HBaseMetadataTestCase {

    private JobEngineConfig jobEngineConfig;

    private CubeManager cubeManager;

    private static final Log logger = LogFactory.getLog(CubingJobBuilderTest.class);

    /** Matches ".../kylin-<uuid>/.../rowkey_stats"; group 1 captures the job UUID. */
    private static final Pattern UUID_PATTERN =
            Pattern.compile(".*kylin-([0-9a-f]{8}(-[0-9a-f]{4}){3}-[0-9a-f]{12}).*rowkey_stats");

    @BeforeClass
    public static void beforeClass() throws Exception {
        staticCreateTestMetadata();
    }

    @Before
    public void before() throws Exception {
        DeployUtil.deployMetadata();

        final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
        cubeManager = CubeManager.getInstance(kylinConfig);
        jobEngineConfig = new JobEngineConfig(kylinConfig);
    }

    @AfterClass
    public static void afterClass() {
        staticCleanupTestMetadata();
    }

    /**
     * Appends a new segment to the test cube and marks it READY so that the
     * building-segments check in CubeManager does not reject it.
     *
     * @return the freshly created segment (its lastBuildJobID is still null)
     */
    private CubeSegment buildSegment() throws ParseException, IOException {
        SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd");
        f.setTimeZone(TimeZone.getTimeZone("GMT"));
        // this cube's start date is 0, end date is 20501112000000
        long date2 = f.parse("2022-01-01").getTime();

        CubeSegment segment = cubeManager.appendSegments(cubeManager.getCube("test_kylin_cube_without_slr_empty"), date2);
        // just to cheat the cubeManager.getBuildingSegments checking
        segment.setStatus(SegmentStatusEnum.READY);

        return segment;
    }

    /**
     * Minimal AbstractHadoopJob subclass used only to reuse its option-parsing
     * machinery so we can extract the "-output" value from the MR step's params.
     */
    private static class ParseOptionHelperJob extends AbstractHadoopJob {
        @Override
        public int run(String[] strings) throws Exception {
            return 0;
        }

        /**
         * Parses a space-separated command line using the same options the
         * rowkey-distribution step declares (input/output/jobname/cubename).
         */
        public void parseOptionsForRowkeyDistributionStep(String arg) throws org.apache.commons.cli.ParseException, IOException {
            Options options = new Options();
            options.addOption(OPTION_INPUT_PATH);
            options.addOption(OPTION_OUTPUT_PATH);
            options.addOption(OPTION_JOB_NAME);
            options.addOption(OPTION_CUBE_NAME);

            GenericOptionsParser hadoopParser = new GenericOptionsParser(new Configuration(), arg.trim().split("\\s+"));
            String[] toolArgs = hadoopParser.getRemainingArgs();
            parseOptions(options, toolArgs);
        }

        /** @return the parsed "-output" option value */
        public String getOutputPath() {
            return getOptionValue(OPTION_OUTPUT_PATH);
        }
    }

    /**
     * Builds a cubing job for a new segment and verifies that the
     * rowkey-distribution step writes under the new job's working directory.
     */
    @Test
    public void testBuildJob() throws ParseException, IOException, org.apache.commons.cli.ParseException {
        CubeSegment segment = buildSegment();
        CubingJobBuilder cubingJobBuilder = new CubingJobBuilder(jobEngineConfig);
        CubingJob job = cubingJobBuilder.buildJob(segment);
        assertNotNull(job);

        // here should be more asserts for every step in building
        // only check rowkey distribution step for now
        MapReduceExecutable rowkeyDistributionStep = (MapReduceExecutable) job.getTaskByName(ExecutableConstants.STEP_NAME_GET_CUBOID_KEY_DISTRIBUTION);
        assertNotNull(rowkeyDistributionStep);
        String mrParams = rowkeyDistributionStep.getMapReduceParams();
        assertNotNull(mrParams);
        logger.info("mrParams: " + mrParams);
        // parse output path and check
        ParseOptionHelperJob parseHelper = new ParseOptionHelperJob();
        parseHelper.parseOptionsForRowkeyDistributionStep(mrParams);
        String outputPath = parseHelper.getOutputPath();
        logger.info("output: " + outputPath);
        Matcher m = UUID_PATTERN.matcher(outputPath);
        assertTrue(m.find());
        assertEquals(2, m.groupCount());
        // KYLIN-968: the UUID in the path must be the NEW job's id, not the
        // (null) lastBuildJobID of the freshly created segment
        assertEquals(job.getId(), m.group(1));
    }
}