From f3d39707e2b99f3d42b866fb188ac84f335e905f Mon Sep 17 00:00:00 2001 From: gaodayue Date: Fri, 3 Jun 2016 18:17:43 +0800 Subject: [PATCH] KYLIN-1752 Add an option to fail cube build job when source table is empty --- .../org/apache/kylin/common/KylinConfigBase.java | 4 ++++ .../kylin/source/hive/CreateFlatHiveTableStep.java | 22 ++++++++++++++-------- 2 files changed, 18 insertions(+), 8 deletions(-) diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 68e3b6c..9aaa89f 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -363,6 +363,10 @@ abstract public class KylinConfigBase implements Serializable { return getOptional("kylin.job.cmd.extra.args"); } + public boolean isJobFailOnEmptyEnabled() { + return Boolean.parseBoolean(getOptional("kylin.job.fail_on_empty.enabled", "false")); + } + public String getOverrideHiveTableLocation(String table) { return getOptional("hive.table.location." + table.toUpperCase()); } diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java index e9b9994..4e98d31 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java @@ -23,9 +23,11 @@ import java.io.InputStream; public class CreateFlatHiveTableStep extends AbstractExecutable { private final BufferedLogger stepLogger = new BufferedLogger(logger); - private long readRowCountFromFile(Path file) throws IOException { - FileSystem fs = FileSystem.get(file.toUri(), HadoopUtil.getCurrentConfiguration()); - InputStream in = fs.open(file); + private long readRowCountFromFile() throws IOException { + Path rowCountFile = new Path(getRowCountOutputDir(), "000000_0"); + + FileSystem fs = FileSystem.get(rowCountFile.toUri(), HadoopUtil.getCurrentConfiguration()); + InputStream in = fs.open(rowCountFile); try { String content = IOUtils.toString(in); return Long.valueOf(content.trim()); // strip the '\n' character @@ -35,9 +37,7 @@ public class CreateFlatHiveTableStep extends AbstractExecutable { } } - private int determineNumReducer(KylinConfig config) throws IOException { - Path rowCountFile = new Path(getRowCountOutputDir(), "000000_0"); - long rowCount = readRowCountFromFile(rowCountFile); + private int determineNumReducer(KylinConfig config, long rowCount) throws IOException { int mapperInputRows = config.getHadoopJobMapperInputRows(); int numReducers = Math.round(rowCount / ((float) mapperInputRows)); @@ -78,8 +78,14 @@ public class CreateFlatHiveTableStep extends AbstractExecutable { protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { KylinConfig config = getCubeSpecificConfig(); try { - - int numReducers = determineNumReducer(config); + long rowCount = readRowCountFromFile(); + if (config.isJobFailOnEmptyEnabled() && rowCount == 0) { + stepLogger.log("Detect upstream hive table is empty, " + + "fail the job because \"kylin.job.fail_on_empty.enabled\" = \"true\""); + return new ExecuteResult(ExecuteResult.State.ERROR, stepLogger.getBufferedLog()); + } + + int numReducers = determineNumReducer(config, rowCount); createFlatHiveTable(config, numReducers); return new ExecuteResult(ExecuteResult.State.SUCCEED, stepLogger.getBufferedLog()); -- 2.7.4 (Apple Git-66)