From 005c89c76955988ea8e584f293799d04911469e1 Mon Sep 17 00:00:00 2001 From: gaodayue Date: Sun, 30 Aug 2015 22:56:32 +0800 Subject: [PATCH] KYLIN-975 fix bug when user configures kylin.job.hive.database.for.intermediatetable --- .../kylin/job/invertedindex/IIJobBuilder.java | 10 +++++--- .../org/apache/kylin/source/hive/HiveMRInput.java | 28 ++++++++++++++-------- 2 files changed, 25 insertions(+), 13 deletions(-) diff --git a/job/src/main/java/org/apache/kylin/job/invertedindex/IIJobBuilder.java b/job/src/main/java/org/apache/kylin/job/invertedindex/IIJobBuilder.java index b18cc61..b4f69d6 100644 --- a/job/src/main/java/org/apache/kylin/job/invertedindex/IIJobBuilder.java +++ b/job/src/main/java/org/apache/kylin/job/invertedindex/IIJobBuilder.java @@ -57,7 +57,7 @@ public final class IIJobBuilder { IIJob result = initialJob(seg, "BUILD", submitter); final String jobId = result.getId(); final IIJoinedFlatTableDesc intermediateTableDesc = new IIJoinedFlatTableDesc(seg.getIIDesc()); - final String intermediateHiveTableName = intermediateTableDesc.getTableName(); + final String intermediateTableIdentity = getIntermediateTableIdentity(intermediateTableDesc); final String factDistinctColumnsPath = getIIDistinctColumnsPath(seg, jobId); final String iiRootPath = getJobWorkingDir(jobId) + "/" + seg.getIIInstance().getName() + "/"; final String iiPath = iiRootPath + "*"; @@ -65,11 +65,11 @@ public final class IIJobBuilder { final AbstractExecutable intermediateHiveTableStep = createFlatHiveTableStep(intermediateTableDesc, jobId); result.addTask(intermediateHiveTableStep); - result.addTask(createFactDistinctColumnsStep(seg, intermediateHiveTableName, jobId, factDistinctColumnsPath)); + result.addTask(createFactDistinctColumnsStep(seg, intermediateTableIdentity, jobId, factDistinctColumnsPath)); result.addTask(createBuildDictionaryStep(seg, factDistinctColumnsPath)); - result.addTask(createInvertedIndexStep(seg, intermediateHiveTableName, iiRootPath)); + result.addTask(createInvertedIndexStep(seg, intermediateTableIdentity, iiRootPath)); // create htable step result.addTask(createCreateHTableStep(seg)); @@ -223,4 +223,8 @@ public final class IIJobBuilder { private String getJobWorkingDir(String uuid) { return engineConfig.getHdfsWorkingDirectory() + "/" + "kylin-" + uuid; } + + private String getIntermediateTableIdentity(IIJoinedFlatTableDesc intermediateTableDesc) { + return engineConfig.getConfig().getHiveDatabaseForIntermediateTable() + "." + intermediateTableDesc.getTableName(); + } } diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java index fefd76f..ec9b432 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java @@ -59,8 +59,12 @@ public class HiveMRInput implements IMRInput { final String dbName; final String tableName; - public HiveTableInputFormat(String hiveTable) { - String[] parts = HadoopUtil.parseHiveTableName(hiveTable); + /** + * Construct a HiveTableInputFormat to read hive table. + * @param fullQualifiedTableName "databaseName.tableName" + */ + public HiveTableInputFormat(String fullQualifiedTableName) { + String[] parts = HadoopUtil.parseHiveTableName(fullQualifiedTableName); dbName = parts[0]; tableName = parts[1]; } @@ -112,8 +116,9 @@ public class HiveMRInput implements IMRInput { } ShellExecutable step = new ShellExecutable(); - StringBuffer buf = new StringBuffer(); + StringBuilder buf = new StringBuilder(); buf.append("hive -e \""); + buf.append(useDatabaseHql + "\n"); buf.append(dropTableHql + "\n"); buf.append(createTableHql + "\n"); buf.append(insertDataHqls + "\n"); @@ -129,15 +134,18 @@ public class HiveMRInput implements IMRInput { public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) { GarbageCollectionStep step = new GarbageCollectionStep(); step.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION); - step.setOldHiveTable(flatHiveTableDesc.getTableName()); + step.setIntermediateTableIdentity(getIntermediateTableIdentity()); jobFlow.addTask(step); } @Override public IMRTableInputFormat getFlatTableInputFormat() { - return new HiveTableInputFormat(flatHiveTableDesc.getTableName()); + return new HiveTableInputFormat(getIntermediateTableIdentity()); } + private String getIntermediateTableIdentity() { + return conf.getConfig().getHiveDatabaseForIntermediateTable() + "." + flatHiveTableDesc.getTableName(); + } } public static class GarbageCollectionStep extends AbstractExecutable { @@ -146,9 +154,9 @@ public class HiveMRInput implements IMRInput { protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { StringBuffer output = new StringBuffer(); - final String hiveTable = this.getOldHiveTable(); + final String hiveTable = this.getIntermediateTableIdentity(); if (StringUtils.isNotEmpty(hiveTable)) { - final String dropSQL = "USE " + context.getConfig().getHiveDatabaseForIntermediateTable() + ";" + " DROP TABLE IF EXISTS " + hiveTable + ";"; + final String dropSQL = "DROP TABLE IF EXISTS " + hiveTable + ";"; final String dropHiveCMD = "hive -e \"" + dropSQL + "\""; ShellCmdOutput shellCmdOutput = new ShellCmdOutput(); try { @@ -164,11 +172,11 @@ public class HiveMRInput implements IMRInput { return new ExecuteResult(ExecuteResult.State.SUCCEED, output.toString()); } - public void setOldHiveTable(String hiveTable) { - setParam("oldHiveTable", hiveTable); + public void setIntermediateTableIdentity(String tableIdentity) { + setParam("oldHiveTable", tableIdentity); } - private String getOldHiveTable() { + private String getIntermediateTableIdentity() { return getParam("oldHiveTable"); } } -- 2.3.2 (Apple Git-55)