From 107ba84d94ed36cbe7dd5aaeb9a95991a62a41b2 Mon Sep 17 00:00:00 2001 From: terry Date: Tue, 8 Dec 2015 22:12:45 +0800 Subject: [PATCH 8/8] job package part patch,KYLIN-1172 Signed-off-by: terry --- .../org/apache/kylin/job/AbstractJobBuilder.java | 29 +++- .../kylin/job/common/DistcpShellExecutable.java | 102 +++++++++++++ .../org/apache/kylin/job/common/HqlExecutable.java | 16 ++- .../kylin/job/constant/ExecutableConstants.java | 1 + .../apache/kylin/job/cube/CubingJobBuilder.java | 62 ++++++-- .../kylin/job/cube/GarbageCollectionStep.java | 20 ++- .../apache/kylin/job/hadoop/AbstractHadoopJob.java | 25 ++-- .../kylin/job/hadoop/cube/BaseCuboidMapper.java | 2 +- .../apache/kylin/job/hadoop/cube/CubeHFileJob.java | 6 +- .../apache/kylin/job/hadoop/cube/CuboidJob.java | 19 ++- .../job/hadoop/cube/NewFactDistinctColumnsJob.java | 153 ++++++++++++++++++++ .../hadoop/cube/NewFactDistinctColumnsMapper.java | 157 +++++++++++++++++++++ .../dict/CreateInvertedIndexDictionaryJob.java | 3 + .../apache/kylin/job/hadoop/hbase/BulkLoadJob.java | 3 +- .../kylin/job/hadoop/hbase/CreateHTableJob.java | 6 +- .../kylin/job/tools/DeployCoprocessorCLI.java | 2 +- .../test/java/org/apache/kylin/job/DeployUtil.java | 4 +- 17 files changed, 568 insertions(+), 42 deletions(-) create mode 100644 job/src/main/java/org/apache/kylin/job/common/DistcpShellExecutable.java create mode 100644 job/src/main/java/org/apache/kylin/job/hadoop/cube/NewFactDistinctColumnsJob.java create mode 100644 job/src/main/java/org/apache/kylin/job/hadoop/cube/NewFactDistinctColumnsMapper.java diff --git a/job/src/main/java/org/apache/kylin/job/AbstractJobBuilder.java b/job/src/main/java/org/apache/kylin/job/AbstractJobBuilder.java index 7d70de7..6624c57 100644 --- a/job/src/main/java/org/apache/kylin/job/AbstractJobBuilder.java +++ b/job/src/main/java/org/apache/kylin/job/AbstractJobBuilder.java @@ -20,11 +20,17 @@ package org.apache.kylin.job; import java.io.IOException; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.HiveManager; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.job.common.DistcpShellExecutable; +import org.apache.kylin.job.common.HadoopShellExecutable; import org.apache.kylin.job.common.ShellExecutable; import org.apache.kylin.job.constant.ExecutableConstants; import org.apache.kylin.job.engine.JobEngineConfig; import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.hadoop.hive.IJoinedFlatTableDesc; +import org.apache.kylin.metadata.project.ProjectManager; public abstract class AbstractJobBuilder { @@ -71,9 +77,18 @@ public abstract class AbstractJobBuilder { throw new RuntimeException("Failed to generate insert data SQL for intermediate table."); } + String hiveName = ProjectManager.getInstance(KylinConfig.getInstanceFromEnv()).getProject(HiveManager.getInstance(). 
+ getCurrentProject()).getHiveName(); + String hiveCmd = "hive"; + if(hiveName != null) { + hiveCmd = HiveManager.getInstance().getHiveCommand(hiveName); + if(hiveCmd == null) + hiveCmd = "hive"; + } ShellExecutable step = new ShellExecutable(); StringBuffer buf = new StringBuffer(); - buf.append("hive -e \""); + buf.append(hiveCmd + " "); + buf.append(" -e \""); buf.append(useDatabaseHql + "\n"); buf.append(dropTableHql + "\n"); buf.append(createTableHql + "\n"); @@ -85,6 +100,18 @@ public abstract class AbstractJobBuilder { return step; } + + + protected ShellExecutable createCopyHiveTableStep(CubeSegment seg, String intermediateHiveTableName, String output) { + DistcpShellExecutable copyHiveTableSetp = new DistcpShellExecutable(); + copyHiveTableSetp.setName(ExecutableConstants.STEP_NAME_COPY_INTERMEDIATE_TABLE); + StringBuilder cmd = new StringBuilder(); + copyHiveTableSetp.setCubeName(seg.getCubeInstance().getName()); + copyHiveTableSetp.setOutputPath(output); + copyHiveTableSetp.setTableName(intermediateHiveTableName); + + return copyHiveTableSetp; + } protected String getJobWorkingDir(String uuid) { return engineConfig.getHdfsWorkingDirectory() + "/" + JOB_WORKING_DIR_PREFIX + uuid; diff --git a/job/src/main/java/org/apache/kylin/job/common/DistcpShellExecutable.java b/job/src/main/java/org/apache/kylin/job/common/DistcpShellExecutable.java new file mode 100644 index 0000000..d3857a0 --- /dev/null +++ b/job/src/main/java/org/apache/kylin/job/common/DistcpShellExecutable.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.job.common; + +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.HadoopUtil; +import org.apache.kylin.common.util.HiveManager; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.job.exception.ExecuteException; +import org.apache.kylin.job.execution.ExecutableContext; +import org.apache.kylin.job.execution.ExecuteResult; +import org.apache.kylin.metadata.project.ProjectManager; + +public class DistcpShellExecutable extends ShellExecutable { + private static final String CUBE_NAME = "cubename"; + private static final String TABLE_NAME = "tablename"; + private static final String OUTPUT_PATH = "output"; + + @Override + protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { + String cubeName = getCubeName().toUpperCase(); + CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()); + CubeInstance cubeInstance = cubeMgr.getCube(cubeName); + + String tableName = getTableName(); + String hiveName = ProjectManager.getInstance(KylinConfig.getInstanceFromEnv()).getProject(cubeInstance.getProjectName()). 
+                getHiveName();
+        String input = null;
+        try {
+            input = HiveManager.getInstance().getHiveTableLocation(tableName, hiveName);
+            input = HadoopUtil.transformHdfsPath(input);
+        } catch (Exception e) {
+            logger.error("Cannot get location of hive table " + tableName + " using hive name " + hiveName);
+            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+        }
+        String output = getOutputPath();
+
+        logger.info("Copy Intermediate Hive Table input : " + input);
+        logger.info("Copy Intermediate Hive Table output : " + output);
+        //copy the hive table only when hive is in a different hadoop cluster...
+        if(hiveName == null) {
+            String inputPath = HadoopUtil.makeQualifiedPathInHadoopCluster(input);
+            String outputPath = HadoopUtil.makeQualifiedPathInHadoopCluster(output);
+            if(inputPath.equals(outputPath)) {
+                return new ExecuteResult(ExecuteResult.State.SUCCEED, null);
+            }
+        }
+        String cmd = String.format("hadoop distcp %s %s", input, output);
+        setCmd(cmd);
+
+        return super.doWork(context);
+    }
+
+    public void setCubeName(String name) {
+        setParam(CUBE_NAME, name);
+    }
+
+    public void setTableName(String name) {
+        setParam(TABLE_NAME, name);
+    }
+
+    public void setOutputPath(String output) {
+        setParam(OUTPUT_PATH, output);
+    }
+
+    public String getCubeName() {
+        return getParam(CUBE_NAME);
+    }
+
+    public String getTableName() {
+        return getParam(TABLE_NAME);
+    }
+
+    public String getOutputPath() {
+        return getParam(OUTPUT_PATH);
+    }
+
+    public String getParameters() {
+        StringBuffer buf = new StringBuffer();
+        buf.append(" -").append(CUBE_NAME).append(" ").append(getCubeName()).append(" -").append(TABLE_NAME).append(" ").
+            append(getTableName()).append(" -").append(OUTPUT_PATH).append(" ").append(getOutputPath());
+
+        return buf.toString();
+    }
+}
diff --git a/job/src/main/java/org/apache/kylin/job/common/HqlExecutable.java b/job/src/main/java/org/apache/kylin/job/common/HqlExecutable.java
index 6147bd6..1e41919 100644
--- a/job/src/main/java/org/apache/kylin/job/common/HqlExecutable.java
+++ b/job/src/main/java/org/apache/kylin/job/common/HqlExecutable.java
@@ -24,6 +24,7 @@ import java.util.Map;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.kylin.common.util.HiveClient;
+import org.apache.kylin.common.util.HiveManager;
 import org.apache.kylin.common.util.JsonUtil;
 import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.AbstractExecutable;
@@ -41,6 +42,7 @@ public class HqlExecutable extends AbstractExecutable {
 
     private static final String HQL = "hql";
     private static final String HIVE_CONFIG = "hive-config";
+    private static final String HIVE_ENV_NAME = "hive-name";
 
     public HqlExecutable() {
         super();
@@ -50,7 +52,10 @@ public class HqlExecutable extends AbstractExecutable {
     protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
         try {
             Map<String, String> configMap = getConfiguration();
-            HiveClient hiveClient = new HiveClient(configMap);
+            String hiveName = getHiveName();
+//            HiveClient hiveClient = new HiveClient(configMap);
+            HiveClient hiveClient = HiveManager.getInstance().createHiveClient(hiveName);
+            hiveClient.appendConfiguration(configMap);
 
             for (String hql : getHqls()) {
                 hiveClient.executeHQL(hql);
@@ -101,5 +106,12 @@ public class HqlExecutable extends AbstractExecutable {
             return Collections.emptyList();
         }
     }
-
+
+    public void setHiveName(String hiveName) {
+        setParam(HIVE_ENV_NAME, hiveName);
+    }
+
+    private String getHiveName() {
+        return this.getParam(HIVE_ENV_NAME);
+    }
 }
diff --git
a/job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java index cf5b112..1d7ab4f 100644 --- a/job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java +++ b/job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java @@ -43,6 +43,7 @@ public final class ExecutableConstants { public static final String STEP_NAME_BUILD_DICTIONARY = "Build Dimension Dictionary"; public static final String STEP_NAME_CREATE_FLAT_HIVE_TABLE = "Create Intermediate Flat Hive Table"; + public static final String STEP_NAME_COPY_INTERMEDIATE_TABLE = "Copy Intermediate table to local Hadoop"; public static final String STEP_NAME_FACT_DISTINCT_COLUMNS = "Extract Fact Table Distinct Columns"; public static final String STEP_NAME_BUILD_BASE_CUBOID = "Build Base Cuboid Data"; public static final String STEP_NAME_BUILD_N_D_CUBOID = "Build N-Dimension Cuboid Data"; diff --git a/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java b/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java index 70d7be2..03c18f6 100644 --- a/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java +++ b/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java @@ -25,6 +25,8 @@ import java.util.List; import java.util.TimeZone; import org.apache.commons.lang3.StringUtils; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.HadoopUtil; import org.apache.kylin.common.util.Pair; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.job.AbstractJobBuilder; @@ -38,6 +40,7 @@ import org.apache.kylin.job.hadoop.cube.CubeHFileJob; import org.apache.kylin.job.hadoop.cube.FactDistinctColumnsJob; import org.apache.kylin.job.hadoop.cube.MergeCuboidJob; import org.apache.kylin.job.hadoop.cube.NDCuboidJob; +import org.apache.kylin.job.hadoop.cube.NewFactDistinctColumnsJob; import org.apache.kylin.job.hadoop.cube.RangeKeyDistributionJob; import org.apache.kylin.job.hadoop.dict.CreateDictionaryJob; import org.apache.kylin.job.hadoop.hbase.BulkLoadJob; @@ -175,19 +178,27 @@ public final class CubingJobBuilder extends AbstractJobBuilder { final String jobId = result.getId(); final CubeJoinedFlatTableDesc intermediateTableDesc = new CubeJoinedFlatTableDesc(seg.getCubeDesc(), seg); final String intermediateHiveTableName = getIntermediateHiveTableName(intermediateTableDesc, jobId); - final String intermediateHiveTableLocation = getIntermediateHiveTableLocation(intermediateTableDesc, jobId); final String factDistinctColumnsPath = getFactDistinctColumnsPath(seg, jobId); final String[] cuboidOutputTempPath = getCuboidOutputPaths(cuboidRootPath, totalRowkeyColumnsCount, groupRowkeyColumnsCount); + String database = engineConfig.getConfig().getHiveDatabaseForIntermediateTable(); final AbstractExecutable intermediateHiveTableStep = createIntermediateHiveTableStep(intermediateTableDesc, jobId); result.addTask(intermediateHiveTableStep); - result.addTask(createFactDistinctColumnsStep(seg, intermediateHiveTableName, jobId)); + String fullTableName = database + "." 
+ intermediateHiveTableName; + String intermediateHiveTableLocation = null; + if(KylinConfig.getInstanceFromEnv().isCopyIntermediateTable()) { + intermediateHiveTableLocation = this.getIntermediateHiveTableLocation(intermediateTableDesc, jobId); + result.addTask(createCopyHiveTableStep(seg, fullTableName, intermediateHiveTableLocation)); + } + //create fast distinct column job and base cuboid job using intermediate table,transform to location in run +// result.addTask(createFactDistinctColumnsStep(seg, intermediateHiveTableName, jobId)); + result.addTask(createNewFactDistinctColumnsStep(seg, fullTableName, jobId, intermediateHiveTableLocation)); result.addTask(createBuildDictionaryStep(seg, factDistinctColumnsPath)); // base cuboid step - final MapReduceExecutable baseCuboidStep = createBaseCuboidStep(seg, intermediateHiveTableLocation, cuboidOutputTempPath); + final MapReduceExecutable baseCuboidStep = createBaseCuboidStep(seg, fullTableName, cuboidOutputTempPath, intermediateHiveTableLocation); result.addTask(baseCuboidStep); // n dim cuboid steps @@ -203,9 +214,9 @@ public final class CubingJobBuilder extends AbstractJobBuilder { final String jobId = result.getId(); final String cuboidPath = cuboidRootPath + "*"; - result.addTask(createRangeRowkeyDistributionStep(seg, cuboidPath)); + result.addTask(createRangeRowkeyDistributionStep(seg, cuboidPath, jobId)); // create htable step - result.addTask(createCreateHTableStep(seg)); + result.addTask(createCreateHTableStep(seg, result.getId())); // generate hfiles step final MapReduceExecutable convertCuboidToHfileStep = createConvertCuboidToHfileStep(seg, cuboidPath, jobId); result.addTask(convertCuboidToHfileStep); @@ -262,8 +273,8 @@ public final class CubingJobBuilder extends AbstractJobBuilder { return getJobWorkingDir(seg.getLastBuildJobID()) + "/" + seg.getCubeInstance().getName() + "/cuboid/*"; } - private String getRowkeyDistributionOutputPath(CubeSegment seg) { - return getJobWorkingDir(seg.getLastBuildJobID()) + "/" + seg.getCubeInstance().getName() + "/rowkey_stats"; + private String getRowkeyDistributionOutputPath(CubeSegment seg, String uuid) { + return getJobWorkingDir(uuid) + "/" + seg.getCubeInstance().getName() + "/rowkey_stats"; } private String getFactDistinctColumnsPath(CubeSegment seg, String jobUuid) { @@ -271,7 +282,7 @@ public final class CubingJobBuilder extends AbstractJobBuilder { } private String getHFilePath(CubeSegment seg, String jobId) { - return getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/hfile/"; + return HadoopUtil.makeQualifiedPathInHBaseCluster(getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/hfile/"); } private MapReduceExecutable createFactDistinctColumnsStep(CubeSegment seg, String intermediateHiveTableName, String jobId) { @@ -288,6 +299,25 @@ public final class CubingJobBuilder extends AbstractJobBuilder { result.setMapReduceParams(cmd.toString()); return result; } + + private MapReduceExecutable createNewFactDistinctColumnsStep(CubeSegment seg, String intermediateHiveTableName, String jobId, + String intermediateHiveTableLocation) { + MapReduceExecutable result = new MapReduceExecutable(); + result.setName(ExecutableConstants.STEP_NAME_FACT_DISTINCT_COLUMNS); + result.setMapReduceJobClass(NewFactDistinctColumnsJob.class); + StringBuilder cmd = new StringBuilder(); + appendMapReduceParameters(cmd, seg); + appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName()); + appendExecCmdParameters(cmd, "segmentname", seg.getName()); + 
appendExecCmdParameters(cmd, "output", getFactDistinctColumnsPath(seg, jobId)); + appendExecCmdParameters(cmd, "jobname", "Kylin_Fact_Distinct_Columns_" + seg.getCubeInstance().getName() + "_Step"); + appendExecCmdParameters(cmd, "tablename", intermediateHiveTableName); + if(intermediateHiveTableLocation != null) + appendExecCmdParameters(cmd, "input", intermediateHiveTableLocation); + + result.setMapReduceParams(cmd.toString()); + return result; + } private HadoopShellExecutable createBuildDictionaryStep(CubeSegment seg, String factDistinctColumnsPath) { // base cuboid job @@ -303,7 +333,8 @@ public final class CubingJobBuilder extends AbstractJobBuilder { return buildDictionaryStep; } - private MapReduceExecutable createBaseCuboidStep(CubeSegment seg, String intermediateHiveTableLocation, String[] cuboidOutputTempPath) { + private MapReduceExecutable createBaseCuboidStep(CubeSegment seg, String intermediateHiveTableName, String[] cuboidOutputTempPath, + String intermediateHiveTableLocation) { // base cuboid job MapReduceExecutable baseCuboidStep = new MapReduceExecutable(); @@ -314,7 +345,9 @@ public final class CubingJobBuilder extends AbstractJobBuilder { appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName()); appendExecCmdParameters(cmd, "segmentname", seg.getName()); - appendExecCmdParameters(cmd, "input", intermediateHiveTableLocation); + appendExecCmdParameters(cmd, "tablename", intermediateHiveTableName); + if(intermediateHiveTableLocation != null) + appendExecCmdParameters(cmd, "input", intermediateHiveTableLocation); appendExecCmdParameters(cmd, "output", cuboidOutputTempPath[0]); appendExecCmdParameters(cmd, "jobname", "Kylin_Base_Cuboid_Builder_" + seg.getCubeInstance().getName()); appendExecCmdParameters(cmd, "level", "0"); @@ -344,14 +377,14 @@ public final class CubingJobBuilder extends AbstractJobBuilder { return ndCuboidStep; } - private MapReduceExecutable createRangeRowkeyDistributionStep(CubeSegment seg, String inputPath) { + private MapReduceExecutable createRangeRowkeyDistributionStep(CubeSegment seg, String inputPath, String uuid) { MapReduceExecutable rowkeyDistributionStep = new MapReduceExecutable(); rowkeyDistributionStep.setName(ExecutableConstants.STEP_NAME_GET_CUBOID_KEY_DISTRIBUTION); StringBuilder cmd = new StringBuilder(); appendMapReduceParameters(cmd, seg); appendExecCmdParameters(cmd, "input", inputPath); - appendExecCmdParameters(cmd, "output", getRowkeyDistributionOutputPath(seg)); + appendExecCmdParameters(cmd, "output", getRowkeyDistributionOutputPath(seg, uuid)); appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName()); appendExecCmdParameters(cmd, "jobname", "Kylin_Region_Splits_Calculator_" + seg.getCubeInstance().getName() + "_Step"); @@ -360,12 +393,12 @@ public final class CubingJobBuilder extends AbstractJobBuilder { return rowkeyDistributionStep; } - private HadoopShellExecutable createCreateHTableStep(CubeSegment seg) { + private HadoopShellExecutable createCreateHTableStep(CubeSegment seg, String uuid) { HadoopShellExecutable createHtableStep = new HadoopShellExecutable(); createHtableStep.setName(ExecutableConstants.STEP_NAME_CREATE_HBASE_TABLE); StringBuilder cmd = new StringBuilder(); appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName()); - appendExecCmdParameters(cmd, "input", getRowkeyDistributionOutputPath(seg) + "/part-r-00000"); + appendExecCmdParameters(cmd, "input", getRowkeyDistributionOutputPath(seg, uuid) + "/part-r-00000"); appendExecCmdParameters(cmd, "htablename", 
seg.getStorageLocationIdentifier()); createHtableStep.setJobParams(cmd.toString()); @@ -463,6 +496,7 @@ public final class CubingJobBuilder extends AbstractJobBuilder { result.setOldHTables(oldHtables); result.setOldHiveTable(hiveIntermediateTable); result.setOldHdsfPaths(oldHdsfPaths); + result.setProjectName(seg.getCubeInstance().getProjectName()); return result; } diff --git a/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java b/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java index 37f9630..c120dae 100644 --- a/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java +++ b/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java @@ -33,11 +33,13 @@ import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.Bytes; import org.apache.kylin.common.util.HadoopUtil; +import org.apache.kylin.common.util.HiveManager; import org.apache.kylin.job.cmd.ShellCmdOutput; import org.apache.kylin.job.exception.ExecuteException; import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.ExecutableContext; import org.apache.kylin.job.execution.ExecuteResult; +import org.apache.kylin.metadata.project.ProjectManager; import org.apache.kylin.metadata.realization.IRealizationConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,6 +56,8 @@ public class GarbageCollectionStep extends AbstractExecutable { private static final String OLD_HIVE_TABLE = "oldHiveTable"; private static final String OLD_HDFS_PATHS = "oldHdfsPaths"; + + private static final String PROJECT_NAME = "projectName"; private static final Logger logger = LoggerFactory.getLogger(GarbageCollectionStep.class); @@ -83,10 +87,16 @@ public class GarbageCollectionStep extends AbstractExecutable { private void dropHiveTable(ExecutableContext context) throws IOException { final String hiveTable = this.getOldHiveTable(); if (StringUtils.isNotEmpty(hiveTable)) { + String projectName = this.getProjectName(); + String hiveName = ProjectManager.getInstance(KylinConfig.getInstanceFromEnv()).getProject(projectName).getHiveName(); + String hiveCmd = HiveManager.getInstance().getHiveCommand(hiveName); final String dropSQL = "USE " + KylinConfig.getInstanceFromEnv().getHiveDatabaseForIntermediateTable() + ";" + " DROP TABLE IF EXISTS " + hiveTable + ";"; - final String dropHiveCMD = "hive -e \"" + dropSQL + "\""; + final String dropHiveCMD = hiveCmd + " -e \"" + dropSQL + "\""; + logger.info("executing: " + dropHiveCMD); ShellCmdOutput shellCmdOutput = new ShellCmdOutput(); context.getConfig().getCliCommandExecutor().execute(dropHiveCMD, shellCmdOutput); + logger.debug("Dropped Hive table " + hiveTable + " \n"); + output.append(shellCmdOutput.getOutput() + " \n"); output.append("Dropped Hive table " + hiveTable + " \n"); } @@ -194,5 +204,13 @@ public class GarbageCollectionStep extends AbstractExecutable { private String getOldHiveTable() { return getParam(OLD_HIVE_TABLE); } + + public void setProjectName(String projectName) { + setParam(PROJECT_NAME, projectName); + } + + private String getProjectName() { + return getParam(PROJECT_NAME); + } } diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java index e814155..9db3ed3 100644 --- a/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java +++ 
b/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
@@ -74,8 +74,8 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
     protected static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("Cube name. For exmaple, flat_item_cube").create("cubename");
     protected static final Option OPTION_II_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("II name. For exmaple, some_ii").create("iiname");
     protected static final Option OPTION_SEGMENT_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("Cube segment name)").create("segmentname");
-    protected static final Option OPTION_TABLE_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("Hive table name.").create("tablename");
-    protected static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Input path").create("input");
+    protected static final Option OPTION_TABLE_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(false).withDescription("Hive table name.").create("tablename");
+    protected static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(false).withDescription("Input path").create("input");
     protected static final Option OPTION_INPUT_FORMAT = OptionBuilder.withArgName("inputformat").hasArg().isRequired(false).withDescription("Input format").create("inputformat");
     protected static final Option OPTION_INPUT_DELIM = OptionBuilder.withArgName("inputdelim").hasArg().isRequired(false).withDescription("Input delimeter").create("inputdelim");
     protected static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Output path").create("output");
@@ -145,18 +145,19 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
             logger.info("Hadoop job classpath is: " + job.getConfiguration().get(MAP_REDUCE_CLASSPATH));
             return ;
         }
-        String hdfsLibDir = KylinConfig.getInstanceFromEnv().getHadoopDependencyJarsLocation();
+        String hdfsLibDir = KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + "kylin-job-dependencies";
         String normalClasspath = null;
         String jarClasspath = null;
         try {
             FileSystem fs = FileSystem.get(new Configuration(jobConf));
             Path path = new Path(hdfsLibDir);
             if(!fs.exists(path)) {
-                //upload all files inly if the directory is empty, if it is not contain all jars may get error too...
-                if(!fs.mkdirs(path)) {
-                    logger.warn("Create directory for uploading hadoop dependency jars failed , location " + hdfsLibDir);
-                    return ;
-                }
+                fs.mkdirs(path);
+            }
+
+            FileStatus[] fList = fs.listStatus(path);
+            //upload all files only if the directory is empty; if it does not contain all the jars, errors may occur too...
+ if(fList.length == 0) { long start = System.currentTimeMillis(); uploadJobClasspath(classpath, hdfsLibDir); logger.info("Upload all dependency files to HDFS " + hdfsLibDir + ", cost " + (System.currentTimeMillis() - start)); @@ -164,7 +165,7 @@ public abstract class AbstractHadoopJob extends Configured implements Tool { StringBuffer jarClasspathBuffer = new StringBuffer(); StringBuffer normalClasspathBuffer = new StringBuffer(); - FileStatus[] fList = fs.listStatus(path); + fList = fs.listStatus(path); for(FileStatus file : fList) { Path p = file.getPath(); if(p.getName().endsWith(".jar")) { @@ -176,8 +177,7 @@ public abstract class AbstractHadoopJob extends Configured implements Tool { jarClasspath = jarClasspathBuffer.length() > 1 ? jarClasspathBuffer.substring(0, jarClasspathBuffer.length() - 1) : null; normalClasspath = normalClasspathBuffer.length() > 1 ? normalClasspathBuffer.substring(0, normalClasspathBuffer.length() - 1) : null; } catch (IOException e) { - logger.error("Upload all kylin job dependency file to HDFS failed !", e); - return ; + logger.error("Upload all kylin job dependency file to HDFS failed !"); } if(jarClasspath != null) { @@ -400,7 +400,7 @@ public abstract class AbstractHadoopJob extends Configured implements Tool { } protected void deletePath(Configuration conf, Path path) throws IOException { - FileSystem fs = FileSystem.get(conf); + FileSystem fs = FileSystem.get(path.toUri(), conf); if (fs.exists(path)) { fs.delete(path, true); } @@ -485,5 +485,4 @@ public abstract class AbstractHadoopJob extends Configured implements Tool { public Job getJob() { return this.job; } - } diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapper.java index 5286fbc..08a87d6 100644 --- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapper.java +++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapper.java @@ -226,8 +226,8 @@ public class BaseCuboidMapper extends KylinMapper BatchConstants.ERROR_RECORD_THRESHOLD) { diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileJob.java index 4154d31..578d237 100644 --- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileJob.java +++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileJob.java @@ -31,6 +31,7 @@ import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.ToolRunner; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.HadoopUtil; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.job.constant.BatchConstants; @@ -55,8 +56,9 @@ public class CubeHFileJob extends AbstractHadoopJob { options.addOption(OPTION_OUTPUT_PATH); options.addOption(OPTION_HTABLE_NAME); parseOptions(options, args); - - Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH)); + + String outputPath = HadoopUtil.transformHdfsPath(getOptionValue(OPTION_OUTPUT_PATH)); + Path output = new Path(outputPath); String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase(); CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()); diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidJob.java index 2010b25..a2ef01a 100644 --- 
a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidJob.java +++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidJob.java @@ -32,6 +32,8 @@ import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.HadoopUtil; +import org.apache.kylin.common.util.HiveManager; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.cuboid.CuboidCLI; @@ -39,6 +41,7 @@ import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.job.constant.BatchConstants; import org.apache.kylin.job.exception.JobException; import org.apache.kylin.job.hadoop.AbstractHadoopJob; +import org.apache.kylin.metadata.project.ProjectManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,13 +64,13 @@ public class CuboidJob extends AbstractHadoopJob { options.addOption(OPTION_JOB_NAME); options.addOption(OPTION_CUBE_NAME); options.addOption(OPTION_SEGMENT_NAME); - options.addOption(OPTION_INPUT_PATH); + options.addOption(OPTION_TABLE_NAME); options.addOption(OPTION_OUTPUT_PATH); options.addOption(OPTION_NCUBOID_LEVEL); options.addOption(OPTION_INPUT_FORMAT); + options.addOption(OPTION_INPUT_PATH); parseOptions(options, args); - Path input = new Path(getOptionValue(OPTION_INPUT_PATH)); Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH)); String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase(); int nCuboidLevel = Integer.parseInt(getOptionValue(OPTION_NCUBOID_LEVEL)); @@ -76,7 +79,17 @@ public class CuboidJob extends AbstractHadoopJob { KylinConfig config = KylinConfig.getInstanceFromEnv(); CubeManager cubeMgr = CubeManager.getInstance(config); CubeInstance cube = cubeMgr.getCube(cubeName); - + + String inputPath = getOptionValue(OPTION_INPUT_PATH); + if(inputPath == null) { + String tableName = getOptionValue(OPTION_TABLE_NAME); + String hiveName = ProjectManager.getInstance(KylinConfig.getInstanceFromEnv()).getProject(cube.getProjectName()). + getHiveName(); + inputPath = HiveManager.getInstance().getHiveTableLocation(tableName, hiveName); + inputPath = HadoopUtil.transformHdfsPath(inputPath); + } + Path input = new Path(inputPath); + job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME)); logger.info("Starting: " + job.getJobName()); FileInputFormat.setInputPaths(job, input); diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/NewFactDistinctColumnsJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/NewFactDistinctColumnsJob.java new file mode 100644 index 0000000..8c7dfa9 --- /dev/null +++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/NewFactDistinctColumnsJob.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// by hzfengyu +package org.apache.kylin.job.hadoop.cube; + +import java.io.IOException; + +import org.apache.commons.cli.Options; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.ShortWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; +import org.apache.hadoop.util.ToolRunner; +import org.apache.hive.hcatalog.mapreduce.HCatInputFormat; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.HadoopUtil; +import org.apache.kylin.common.util.HiveManager; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.job.constant.BatchConstants; +import org.apache.kylin.job.hadoop.AbstractHadoopJob; +import org.apache.kylin.metadata.project.ProjectManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class NewFactDistinctColumnsJob extends AbstractHadoopJob { + protected static final Logger log = LoggerFactory.getLogger(NewFactDistinctColumnsJob.class); + + @Override + public int run(String[] args) throws Exception { + Options options = new Options(); + + try { + options.addOption(OPTION_JOB_NAME); + options.addOption(OPTION_CUBE_NAME); + options.addOption(OPTION_OUTPUT_PATH); + options.addOption(OPTION_TABLE_NAME); + options.addOption(OPTION_SEGMENT_NAME); + options.addOption(OPTION_INPUT_PATH); + + parseOptions(options, args); + + job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME)); + Configuration jobConf = job.getConfiguration(); + + String cubeName = getOptionValue(OPTION_CUBE_NAME); + Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH)); + String segmentName = getOptionValue(OPTION_SEGMENT_NAME); + String inputPath = getOptionValue(OPTION_INPUT_PATH); + + // ---------------------------------------------------------------------------- + // add metadata to distributed cache + CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()); + CubeInstance cubeInstance = cubeMgr.getCube(cubeName); + + if(inputPath == null) { + String tableName = getOptionValue(OPTION_TABLE_NAME); + String hiveName = ProjectManager.getInstance(KylinConfig.getInstanceFromEnv()).getProject(cubeInstance.getProjectName()). 
+ getHiveName(); + inputPath = HiveManager.getInstance().getHiveTableLocation(tableName, hiveName); + inputPath = HadoopUtil.transformHdfsPath(inputPath); + } + Path input = new Path(inputPath); + + jobConf.set(BatchConstants.CFG_CUBE_NAME, cubeName); + + FileInputFormat.setInputPaths(job, input); + System.out.println("Starting: " + job.getJobName()); + + setJobClasspath(job); + + setupMapper(input); + setupReducer(output); + + job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName); + job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName); + // CubeSegment seg = cubeMgr.getCube(cubeName).getTheOnlySegment(); + attachKylinPropsAndMetadata(cubeInstance, job.getConfiguration()); + + return waitForCompletion(job); + } catch (Exception e) { + logger.error("error in NewFactDistinctColumnsJob", e); + printUsage(options); + throw e; + } finally { + if (job != null) { + cleanupTempConfFile(job.getConfiguration()); + } + } + } + + private void setupMapper(Path intermediateTableLocation) throws IOException { + // FileInputFormat.setInputPaths(job, input); + boolean isInputTextFormat = false; + if (hasOption(OPTION_INPUT_FORMAT) && ("textinputformat".equalsIgnoreCase(getOptionValue(OPTION_INPUT_FORMAT)))) { + isInputTextFormat = true; + } + + if (isInputTextFormat) { + job.setInputFormatClass(TextInputFormat.class); + } else { + job.setInputFormatClass(SequenceFileInputFormat.class); + } + job.setMapperClass(NewFactDistinctColumnsMapper.class); + job.setCombinerClass(FactDistinctColumnsCombiner.class); + job.setMapOutputKeyClass(ShortWritable.class); + job.setMapOutputValueClass(Text.class); + } + + private void setupReducer(Path output) throws IOException { + job.setReducerClass(FactDistinctColumnsReducer.class); + job.setOutputFormatClass(SequenceFileOutputFormat.class); + job.setOutputKeyClass(NullWritable.class); + job.setOutputValueClass(Text.class); + + FileOutputFormat.setOutputPath(job, output); + job.getConfiguration().set(BatchConstants.OUTPUT_PATH, output.toString()); + + job.setNumReduceTasks(1); + + deletePath(job.getConfiguration(), output); + } + + public static void main(String[] args) throws Exception { + NewFactDistinctColumnsJob job = new NewFactDistinctColumnsJob(); + int exitCode = ToolRunner.run(job, args); + System.exit(exitCode); + } + +} diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/NewFactDistinctColumnsMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/NewFactDistinctColumnsMapper.java new file mode 100644 index 0000000..e349fe1 --- /dev/null +++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/NewFactDistinctColumnsMapper.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+// added by hzfengyu
+package org.apache.kylin.job.hadoop.cube;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.ShortWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.mr.KylinMapper;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.BytesSplitter;
+import org.apache.kylin.common.util.SplittedBytes;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.RowKeyDesc;
+import org.apache.kylin.dict.DictionaryManager;
+import org.apache.kylin.job.constant.BatchConstants;
+import org.apache.kylin.job.hadoop.AbstractHadoopJob;
+import org.apache.kylin.job.hadoop.hive.CubeJoinedFlatTableDesc;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NewFactDistinctColumnsMapper<KEYIN> extends KylinMapper<KEYIN, Text, ShortWritable, Text> {
+    private static final Logger logger = LoggerFactory.getLogger(NewFactDistinctColumnsMapper.class);
+
+    private String cubeName;
+    private String segmentName;
+    private String intermediateTableRowDelimiter;
+    private byte byteRowDelimiter;
+    private CubeInstance cube;
+    private CubeDesc cubeDesc;
+    private int[] factDictCols;
+    private CubeSegment cubeSegment;
+    private int counter;
+
+    private ShortWritable outputKey = new ShortWritable();
+    private Text outputValue = new Text();
+    private int errorRecordCounter;
+
+    private BytesSplitter bytesSplitter;
+    private CubeJoinedFlatTableDesc intermediateTableDesc;
+
+    @Override
+    protected void setup(Context context) throws IOException {
+        super.publishConfiguration(context.getConfiguration());
+        Configuration conf = context.getConfiguration();
+        cubeName = conf.get(BatchConstants.CFG_CUBE_NAME).toUpperCase();
+        segmentName = conf.get(BatchConstants.CFG_CUBE_SEGMENT_NAME);
+        intermediateTableRowDelimiter = context.getConfiguration().get(BatchConstants.CFG_CUBE_INTERMEDIATE_TABLE_ROW_DELIMITER, Character.toString(BatchConstants.INTERMEDIATE_TABLE_ROW_DELIMITER));
+        if (Bytes.toBytes(intermediateTableRowDelimiter).length > 1) {
+            throw new RuntimeException("Expected delimiter byte length is 1, but got " + Bytes.toBytes(intermediateTableRowDelimiter).length);
+        }
+
+        byteRowDelimiter = Bytes.toBytes(intermediateTableRowDelimiter)[0];
+
+        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(context.getConfiguration());
+
+        cube = CubeManager.getInstance(config).getCube(cubeName);
+        cubeDesc = cube.getDescriptor();
+        cubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
+
+        intermediateTableDesc = new CubeJoinedFlatTableDesc(cube.getDescriptor(), cubeSegment);
+        bytesSplitter = new BytesSplitter(200, 4096);
+
+        long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
+        Cuboid baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId);
+        List<TblColRef> columns = baseCuboid.getColumns();
+
+        logger.info("Basecuboid columns : " + columns);
+        ArrayList<Integer> factDictCols = new ArrayList<Integer>();
+        RowKeyDesc rowkey = cubeDesc.getRowkey();
+        DictionaryManager dictMgr = DictionaryManager.getInstance(config);
+        for (int i = 0; i < columns.size(); i++) {
+            TblColRef col = columns.get(i);
+            if (rowkey.isUseDictionary(col) == false)
+
continue; + + String scanTable = (String) dictMgr.decideSourceData(cubeDesc.getModel(), cubeDesc.getRowkey().getDictionary(col), col, null)[0]; + if (cubeDesc.getModel().isFactTable(scanTable)) { + factDictCols.add(i); + } + } + logger.info("Fact dict columns : " + factDictCols); + this.factDictCols = new int[factDictCols.size()]; + for (int i = 0; i < factDictCols.size(); i++) + this.factDictCols[i] = factDictCols.get(i); + } + + @Override + public void map(KEYIN key, Text value, Context context) throws IOException, InterruptedException { + counter++; + if (counter % BatchConstants.COUNTER_MAX == 0) { + logger.info("Handled " + counter + " records!"); + } + + try { + bytesSplitter.split(value.getBytes(), value.getLength(), byteRowDelimiter); + intermediateTableDesc.sanityCheck(bytesSplitter); + + SplittedBytes[] columnValues = bytesSplitter.getSplitBuffers(); + int[] flatTableIndexes = intermediateTableDesc.getRowKeyColumnIndexes(); + SplittedBytes fieldValue = null; + for (int i : factDictCols) { + outputKey.set((short) i); + fieldValue = columnValues[flatTableIndexes[i]]; + if (fieldValue == null) + continue; + outputValue.set(fieldValue.value, 0, fieldValue.length); + context.write(outputKey, outputValue); + } + } catch (Exception ex) { + handleErrorRecord(bytesSplitter, ex); + } + } + + private void handleErrorRecord(BytesSplitter bytesSplitter, Exception ex) throws IOException { + + ex.printStackTrace(System.err); + System.err.println("Insane record: " + bytesSplitter); + + errorRecordCounter++; + if (errorRecordCounter > BatchConstants.ERROR_RECORD_THRESHOLD) { + if (ex instanceof IOException) + throw (IOException) ex; + else if (ex instanceof RuntimeException) + throw (RuntimeException) ex; + else + throw new RuntimeException("", ex); + } + } +} diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/dict/CreateInvertedIndexDictionaryJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/dict/CreateInvertedIndexDictionaryJob.java index f60313d..94943d4 100644 --- a/job/src/main/java/org/apache/kylin/job/hadoop/dict/CreateInvertedIndexDictionaryJob.java +++ b/job/src/main/java/org/apache/kylin/job/hadoop/dict/CreateInvertedIndexDictionaryJob.java @@ -21,6 +21,7 @@ package org.apache.kylin.job.hadoop.dict; import org.apache.commons.cli.Options; import org.apache.hadoop.util.ToolRunner; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.HiveManager; import org.apache.kylin.invertedindex.IIInstance; import org.apache.kylin.invertedindex.IIManager; import org.apache.kylin.job.hadoop.AbstractHadoopJob; @@ -46,6 +47,8 @@ public class CreateInvertedIndexDictionaryJob extends AbstractHadoopJob { IIManager mgr = IIManager.getInstance(config); IIInstance ii = mgr.getII(iiname); + //set local thread project name + HiveManager.getInstance().setCurrentProject(ii.getProjectName()); mgr.buildInvertedIndexDictionary(ii.getFirstSegment(), factColumnsInputPath); return 0; diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/hbase/BulkLoadJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/hbase/BulkLoadJob.java index e51f8cb..0a2c2ba 100644 --- a/job/src/main/java/org/apache/kylin/job/hadoop/hbase/BulkLoadJob.java +++ b/job/src/main/java/org/apache/kylin/job/hadoop/hbase/BulkLoadJob.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; import org.apache.hadoop.util.ToolRunner; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.HadoopUtil; 
import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.model.CubeDesc; @@ -59,7 +60,7 @@ public class BulkLoadJob extends AbstractHadoopJob { // end with "/" String input = getOptionValue(OPTION_INPUT_PATH); - Configuration conf = HBaseConfiguration.create(getConf()); + Configuration conf = HadoopUtil.getCurrentHBaseConfiguration(); FileSystem fs = FileSystem.get(conf); String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase(); diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java index 1350077..b8e57d4 100644 --- a/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java +++ b/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java @@ -42,6 +42,7 @@ import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.ToolRunner; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.HadoopUtil; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.model.CubeDesc; @@ -80,7 +81,8 @@ public class CreateHTableJob extends AbstractHadoopJob { tableDesc.setValue(HTableDescriptor.SPLIT_POLICY, ConstantSizeRegionSplitPolicy.class.getName()); tableDesc.setValue(IRealizationConstants.HTableTag, config.getMetadataUrlPrefix()); - Configuration conf = HBaseConfiguration.create(getConf()); + Configuration conf = HadoopUtil.getCurrentHBaseConfiguration(); + Configuration hadoopConf = HadoopUtil.getCurrentConfiguration(); HBaseAdmin admin = new HBaseAdmin(conf); try { @@ -106,7 +108,7 @@ public class CreateHTableJob extends AbstractHadoopJob { tableDesc.addFamily(cf); } - byte[][] splitKeys = getSplits(conf, partitionFilePath); + byte[][] splitKeys = getSplits(hadoopConf, partitionFilePath); if (admin.tableExists(tableName)) { // admin.disableTable(tableName); diff --git a/job/src/main/java/org/apache/kylin/job/tools/DeployCoprocessorCLI.java b/job/src/main/java/org/apache/kylin/job/tools/DeployCoprocessorCLI.java index 0465865..b399462 100644 --- a/job/src/main/java/org/apache/kylin/job/tools/DeployCoprocessorCLI.java +++ b/job/src/main/java/org/apache/kylin/job/tools/DeployCoprocessorCLI.java @@ -106,7 +106,7 @@ public class DeployCoprocessorCLI { private static void initHTableCoprocessor(HTableDescriptor desc) throws IOException { KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); - Configuration hconf = HadoopUtil.getCurrentConfiguration(); + Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration(); FileSystem fileSystem = FileSystem.get(hconf); String localCoprocessorJar = kylinConfig.getCoprocessorLocalJar(); diff --git a/job/src/test/java/org/apache/kylin/job/DeployUtil.java b/job/src/test/java/org/apache/kylin/job/DeployUtil.java index 41399c7..6f1ef3e 100644 --- a/job/src/test/java/org/apache/kylin/job/DeployUtil.java +++ b/job/src/test/java/org/apache/kylin/job/DeployUtil.java @@ -33,6 +33,7 @@ import org.apache.kylin.common.persistence.ResourceTool; import org.apache.kylin.common.util.AbstractKylinTestCase; import org.apache.kylin.common.util.CliCommandExecutor; import org.apache.kylin.common.util.HiveClient; +import org.apache.kylin.common.util.HiveManager; import org.apache.kylin.common.util.Pair; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; @@ -215,7 +216,8 @@ public class DeployUtil { String 
tableFileDir = temp.getParent(); temp.delete(); - HiveClient hiveClient = new HiveClient(); +// HiveClient hiveClient = new HiveClient(); + HiveClient hiveClient = HiveManager.getInstance().createHiveClient(null); // create hive tables hiveClient.executeHQL("CREATE DATABASE IF NOT EXISTS EDW"); -- 1.9.4.msysgit.2