From b00665fddfd93c953aa3e156716cbd54fbbc2038 Mon Sep 17 00:00:00 2001
From: gaodayue
Date: Sun, 2 Aug 2015 23:02:16 +0800
Subject: [PATCH] KYLIN-921 Dimension with all nulls cause
 BuildDimensionDictionary failed due to FileNotFoundException

---
 .../apache/kylin/job/constant/BatchConstants.java  |  3 ++
 .../job/hadoop/cube/FactDistinctColumnsJob.java    | 41 ++++++++++++++++++++-
 .../job/hadoop/cube/FactDistinctColumnsMapper.java | 43 +++++-----------------
 .../hadoop/cube/FactDistinctColumnsReducer.java    | 32 +++++++++++++---
 4 files changed, 77 insertions(+), 42 deletions(-)

diff --git a/job/src/main/java/org/apache/kylin/job/constant/BatchConstants.java b/job/src/main/java/org/apache/kylin/job/constant/BatchConstants.java
index e6b16a9..7333753 100644
--- a/job/src/main/java/org/apache/kylin/job/constant/BatchConstants.java
+++ b/job/src/main/java/org/apache/kylin/job/constant/BatchConstants.java
@@ -32,6 +32,9 @@ public interface BatchConstants {
     public static final String CFG_II_NAME = "ii.name";
     public static final String CFG_II_SEGMENT_NAME = "ii.segment.name";
 
+    public static final String CFG_FACT_DICT_COLUMN_NAMES = "fact.dict.column.names";
+    public static final String CFG_FACT_DICT_COLUMN_ROWKEY_INDEXES = "fact.dict.column.rowkey.indexes";
+
     public static final String INPUT_DELIM = "input.delim";
 
     public static final String OUTPUT_PATH = "output.path";
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java
index ccc2e0d..04c638b 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java
@@ -19,8 +19,12 @@
 package org.apache.kylin.job.hadoop.cube;
 
 import java.io.IOException;
+import java.util.List;
 
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
 import org.apache.commons.cli.Options;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.ShortWritable;
@@ -34,8 +38,13 @@
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.RowKeyDesc;
+import org.apache.kylin.dict.DictionaryManager;
 import org.apache.kylin.job.constant.BatchConstants;
 import org.apache.kylin.job.hadoop.AbstractHadoopJob;
+import org.apache.kylin.metadata.model.TblColRef;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,6 +53,7 @@ import org.slf4j.LoggerFactory;
  */
 public class FactDistinctColumnsJob extends AbstractHadoopJob {
     protected static final Logger log = LoggerFactory.getLogger(FactDistinctColumnsJob.class);
+    private static final Joiner joiner = Joiner.on(",");
 
     @Override
     public int run(String[] args) throws Exception {
@@ -57,6 +67,8 @@
             parseOptions(options, args);
 
             job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
+            Configuration jobConf = job.getConfiguration();
+
             String cubeName = getOptionValue(OPTION_CUBE_NAME);
             Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
             String intermediateTable = getOptionValue(OPTION_TABLE_NAME);
@@ -65,8 +77,9 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
             // add metadata to distributed cache
             CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
             CubeInstance cubeInstance = cubeMgr.getCube(cubeName);
+            CubeDesc cubeDesc = cubeInstance.getDescriptor();
 
-            job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
+            jobConf.set(BatchConstants.CFG_CUBE_NAME, cubeName);
             System.out.println("Starting: " + job.getJobName());
 
             setJobClasspath(job);
@@ -75,7 +88,31 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
             setupReducer(output);
 
             // CubeSegment seg = cubeMgr.getCube(cubeName).getTheOnlySegment();
-            attachKylinPropsAndMetadata(cubeInstance, job.getConfiguration());
+            attachKylinPropsAndMetadata(cubeInstance, jobConf);
+
+            // set names and row key indexes of fact table's dictionary columns in Configuration,
+            // so that mapper & reducer task can use them
+            List<String> factDictColNames = Lists.newArrayList();
+            List<Integer> factDictColRowKeyIndexes = Lists.newArrayList();
+
+            Cuboid baseCuboid = Cuboid.findById(cubeDesc, Cuboid.getBaseCuboidId(cubeDesc));
+            List<TblColRef> columns = baseCuboid.getColumns();
+
+            RowKeyDesc rowkey = cubeDesc.getRowkey();
+            DictionaryManager dictMgr = DictionaryManager.getInstance(KylinConfig.getInstanceFromEnv());
+            for (int i = 0; i < columns.size(); i++) {
+                TblColRef col = columns.get(i);
+                if (!rowkey.isUseDictionary(col))
+                    continue;
+
+                String scanTable = (String) dictMgr.decideSourceData(cubeDesc.getModel(), rowkey.getDictionary(col), col, null)[0];
+                if (cubeDesc.getModel().isFactTable(scanTable)) {
+                    factDictColNames.add(col.getName());
+                    factDictColRowKeyIndexes.add(i);
+                }
+            }
+            jobConf.set(BatchConstants.CFG_FACT_DICT_COLUMN_NAMES, joiner.join(factDictColNames));
+            jobConf.set(BatchConstants.CFG_FACT_DICT_COLUMN_ROWKEY_INDEXES, joiner.join(factDictColRowKeyIndexes));
 
             return waitForCompletion(job);
 
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsMapper.java
index 6f1264d..9345171 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsMapper.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsMapper.java
@@ -19,8 +19,6 @@
 package org.apache.kylin.job.hadoop.cube;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.kylin.common.util.Bytes;
@@ -35,26 +33,18 @@
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.mr.KylinMapper;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.RowKeyDesc;
-import org.apache.kylin.dict.DictionaryManager;
 import org.apache.kylin.job.constant.BatchConstants;
 import org.apache.kylin.job.hadoop.AbstractHadoopJob;
 import org.apache.kylin.job.hadoop.hive.CubeJoinedFlatTableDesc;
-import org.apache.kylin.metadata.model.TblColRef;
 
 /**
  * @author yangli9
 */
 public class FactDistinctColumnsMapper<KEYIN> extends KylinMapper<KEYIN, HCatRecord, ShortWritable, Text> {
 
-    private String cubeName;
-    private CubeInstance cube;
-    private CubeDesc cubeDesc;
-    private int[] factDictCols;
-    private CubeJoinedFlatTableDesc intermediateTableDesc;
+    private int[] factDictColRowKeyIndexes;
 
     private ShortWritable outputKey = new ShortWritable();
     private Text outputValue = new Text();
@@ -69,31 +59,16 @@ public class FactDistinctColumnsMapper<KEYIN> extends KylinMapper<KEYIN, HCatRecord, ShortWritable, Text> {
baseCuboid.getColumns(); - - ArrayList factDictCols = new ArrayList(); - RowKeyDesc rowkey = cubeDesc.getRowkey(); - DictionaryManager dictMgr = DictionaryManager.getInstance(config); - for (int i = 0; i < columns.size(); i++) { - TblColRef col = columns.get(i); - if (rowkey.isUseDictionary(col) == false) - continue; - - String scanTable = (String) dictMgr.decideSourceData(cubeDesc.getModel(), cubeDesc.getRowkey().getDictionary(col), col, null)[0]; - if (cubeDesc.getModel().isFactTable(scanTable)) { - factDictCols.add(i); - } + String[] rowKeyIndexes = conf.get(BatchConstants.CFG_FACT_DICT_COLUMN_ROWKEY_INDEXES).split(","); + factDictColRowKeyIndexes = new int[rowKeyIndexes.length]; + for (int i = 0; i < rowKeyIndexes.length; i++) { + factDictColRowKeyIndexes[i] = Integer.parseInt(rowKeyIndexes[i]); } - this.factDictCols = new int[factDictCols.size()]; - for (int i = 0; i < factDictCols.size(); i++) - this.factDictCols[i] = factDictCols.get(i); schema = HCatInputFormat.getTableSchema(context.getConfiguration()); } @@ -105,7 +80,7 @@ public class FactDistinctColumnsMapper extends KylinMapper { + private static Logger logger = LoggerFactory.getLogger(FactDistinctColumnsReducer.class); + private String outputPath; + private FileSystem fs; private List columnList = new ArrayList(); + private Set toBeExtractedColumns = new HashSet(); @Override protected void setup(Context context) throws IOException { @@ -63,11 +68,18 @@ public class FactDistinctColumnsReducer extends KylinReducer values, Context context) throws IOException, InterruptedException { TblColRef col = columnList.get(key.get()); + toBeExtractedColumns.remove(col.getName()); HashSet set = new HashSet(); for (Text textValue : values) { @@ -75,9 +87,6 @@ public class FactDistinctColumnsReducer extends KylinReducer